From d8387e07d1930ff6076feede5ed16212463f1323 Mon Sep 17 00:00:00 2001 From: Jaroslav Tulach Date: Sat, 18 Sep 2021 08:44:55 +0200 Subject: [PATCH] Better synchronization of the simple server --- .../html/presenters/browser/SimpleServer.java | 451 +++++++++--------- 1 file changed, 235 insertions(+), 216 deletions(-) diff --git a/browser/src/main/java/org/netbeans/html/presenters/browser/SimpleServer.java b/browser/src/main/java/org/netbeans/html/presenters/browser/SimpleServer.java index ee3a381b..747e0fec 100644 --- a/browser/src/main/java/org/netbeans/html/presenters/browser/SimpleServer.java +++ b/browser/src/main/java/org/netbeans/html/presenters/browser/SimpleServer.java @@ -29,18 +29,19 @@ import java.nio.Buffer; import java.nio.ByteBuffer; import java.nio.channels.ClosedByInterruptException; -import java.nio.channels.SelectableChannel; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.nio.charset.StandardCharsets; import java.text.DateFormat; +import java.util.ArrayList; import java.util.Collections; import java.util.Date; import java.util.HashMap; import java.util.Iterator; import java.util.LinkedHashMap; +import java.util.List; import java.util.Locale; import java.util.Map; import java.util.Random; @@ -56,7 +57,7 @@ import java.util.regex.Pattern; import org.netbeans.html.boot.spi.Fn; -final class SimpleServer extends HttpServer implements Runnable { +final class SimpleServer extends HttpServer { private final Map maps = new TreeMap<>((s1, s2) -> { if (s1.length() != s2.length()) { return s2.length() - s1.length(); @@ -65,9 +66,19 @@ final class SimpleServer extends HttpServer pendingActions = new ArrayList<>(); private static final Pattern PATTERN_GET = Pattern.compile("(OPTIONS|HEAD|GET|POST|PUT|DELETE) */([^ \\?]*)(\\?[^ ]*)?"); private static final Pattern PATTERN_HOST = Pattern.compile(".*^Host: *(.*):([0-9]+)$", Pattern.MULTILINE); @@ -93,39 +104,53 @@ void init(int from, int to) throws IOException { } @Override - void start() throws IOException { + synchronized void start() throws IOException { LOG.log(Level.INFO, "Listening for HTTP connections on port {0}", getServer().socket().getLocalPort()); - processor = new Thread(this, "HTTP server"); + processor = new Thread(this::mainLoop, "HTTP server"); processor.start(); } + private final synchronized Thread getProcessorThread() { + return processor; + } + + final void assertThread() { + assert Thread.currentThread() == getProcessorThread(); + } + @Override String getRequestURI(ReqRes r) { + assertThread(); return "/" + r.url; } @Override String getServerName(ReqRes r) { + assertThread(); return r.hostName; } @Override int getServerPort(ReqRes r) { + assertThread(); return r.hostPort; } @Override String getParameter(ReqRes r, String id) { + assertThread(); return (String) r.args.get(id); } @Override String getMethod(ReqRes r) { + assertThread(); return r.method; } @Override String getBody(ReqRes r) { + assertThread(); if (r.body == null) { return ""; } else { @@ -139,6 +164,7 @@ static int endOfHeader(String header) { @Override String getHeader(ReqRes r, String key) { + assertThread(); for (String l : r.header.split("\r\n")) { if (l.isEmpty()) { break; @@ -152,36 +178,43 @@ String getHeader(ReqRes r, String key) { @Override Writer getWriter(ReqRes r) { + assertThread(); return r.writer; } @Override void setContentType(ReqRes r, String contentType) { + assertThread(); r.contentType = contentType; } @Override void setStatus(ReqRes r, int status) { + assertThread(); r.status = status; } @Override OutputStream getOutputStream(ReqRes r) { + assertThread(); return r.os; } @Override void suspend(ReqRes r) { - r.interestOps(0); + assertThread(); r.suspended = true; + r.updateOperations(); } @Override void resume(ReqRes r, Runnable whenReady) { - whenReady.run(); - r.suspended = false; - r.interestOps(SelectionKey.OP_WRITE); - connectionWakeup(); + connectionWakeup(() -> { + assertThread(); + r.suspended = false; + r.updateOperations(); + whenReady.run(); + }); } @Override @@ -193,6 +226,7 @@ void setCharacterEncoding(ReqRes r, String encoding) { @Override void addHeader(ReqRes r, String name, String value) { + assertThread(); r.headers.put(name, value); } @@ -212,51 +246,47 @@ public int getPort() { } } - void connectionWakeup() { + synchronized void connectionWakeup(Runnable runOnMainLoop) { Selector localConnection = this.connection; + this.pendingActions.add(runOnMainLoop); if (localConnection != null) { localConnection.wakeup(); } } - @Override - public void run() { + private void mainLoop() { ByteBuffer bb = ByteBuffer.allocate(2048); - int sleep = 10; - - while (Thread.currentThread() == processor) { + while (Thread.currentThread() == getProcessorThread()) { ServerSocketChannel localServer; Selector localConnection; + Runnable[] pendings; SocketChannel toClose = null; try { synchronized (this) { localServer = this.getServer(); localConnection = this.connection; + pendings = this.pendingActions.toArray(new Runnable[0]); + this.pendingActions.clear(); } - LOG.log(Level.FINE, "Before select {0}", localConnection.isOpen()); - LOG.log(Level.FINE, "Server {0}", localServer.isOpen()); + LOG.log(Level.FINEST, "Before select status: open server{0}, open connection {1}, pending {2}", + new Object[]{localServer.isOpen(), localConnection.isOpen(), pendings.length} + ); + + for (Runnable r : pendings) { + r.run(); + } int amount = localConnection.select(); - LOG.log(Level.FINE, "After select: {0}", amount); + LOG.log(Level.FINEST, "After select: {0}", amount); if (amount == 0) { - try { - Thread.sleep(sleep); - } catch (InterruptedException ex) { - } - sleep *= 2; - if (sleep > 1000) { - sleep = 1000; - } - } else { - sleep = 10; + LOG.log(Level.FINE, "No amount after select: {0}", amount); } Set readyKeys = localConnection.selectedKeys(); Iterator it = readyKeys.iterator(); - PROCESS: while (it.hasNext()) { SelectionKey key = it.next(); LOG.log(Level.FINEST, "Handling key {0}", key.attachment()); @@ -269,82 +299,48 @@ public void run() { SelectionKey another = channel.register( localConnection, SelectionKey.OP_READ ); + another.attach(new ReadHeader()); } catch (ClosedByInterruptException ex) { LOG.log(Level.WARNING, "Interrupted while accepting", ex); server.close(); server = null; LOG.log(Level.INFO, "Accept server reset"); } - continue PROCESS; - } - - if (key.isReadable()) { - ((Buffer)bb).clear(); + } else if (key.isReadable()) { + ((Buffer) bb).clear(); SocketChannel channel = (SocketChannel) key.channel(); toClose = channel; channel.read(bb); - if (key.attachment() instanceof ReqRes) { - ((Buffer)bb).flip(); - ReqRes req = (ReqRes) key.attachment(); - req.bodyToFill().put(bb); - if (req.bodyToFill().remaining() == 0) { - key.interestOps(SelectionKey.OP_WRITE); + ((Buffer) bb).flip(); + + if (key.attachment() instanceof ReadHeader) { + ReadHeader readHeader = (ReadHeader) key.attachment(); + ReqRes nextKey = readHeader.process(key, bb); + if (nextKey != null) { + key.attach(nextKey); + nextKey.updateOperations(); } - continue PROCESS; - } - - - ((Buffer)bb).flip(); - String text = new String(bb.array(), 0, bb.limit()); - int fullHeader = endOfHeader(text); - if (channel.isOpen() && fullHeader == -1) { - continue PROCESS; - } - String header = text.substring(0, fullHeader); - - Matcher m = PATTERN_GET.matcher(header); - String url = m.find() ? m.group(2) : null; - String args = url != null && m.groupCount() == 3 ? m.group(3) : null; - String method = m.group(1); - - Map context; - if (args != null) { - Map c = new HashMap<>(); - parseArgs(c, args); - context = Collections.unmodifiableMap(c); - } else { - context = Collections.emptyMap(); - } - - Matcher length = PATTERN_LENGTH.matcher(header); - ByteBuffer body = null; - if (length.find()) { - int contentLength = Integer.parseInt(length.group(1)); - body = ByteBuffer.allocate(contentLength); - ((Buffer)bb).position(fullHeader + 4); - body.put(bb); - } - - ReqRes req = findRequest(url, context, header, method, body); - key.attach(req); - if (body != null && body.remaining() > 0) { - key.interestOps(SelectionKey.OP_READ); - continue PROCESS; + } else if (key.attachment() instanceof ReqRes) { + ReqRes req = (ReqRes) key.attachment(); + req.readBody(key, bb); + req.updateOperations(); } - key.interestOps(SelectionKey.OP_WRITE); - continue PROCESS; - } - - if (key.isWritable()) { + } else if (key.isWritable()) { SocketChannel channel = (SocketChannel) key.channel(); toClose = channel; - ReqRes reply = (ReqRes) key.attachment(); - if (reply == null) { - continue PROCESS; + if (key.attachment() instanceof ReqRes) { + ReqRes request = (ReqRes) key.attachment(); + WriteReply write = request.handle(channel); + if (write != null) { + key.attach(write); + write.updateOperations(); + } + } else if (key.attachment() instanceof WriteReply) { + WriteReply write = (WriteReply) key.attachment(); + write.output(channel); } - reply.handle(key, channel); } - } + } } catch (ThreadDeath td) { throw td; } catch (Throwable t) { @@ -359,41 +355,26 @@ public void run() { } } - try { - LOG.fine("Closing connection"); - this.connection.close(); - LOG.fine("Closing server"); - this.getServer().close(); - } catch (IOException ex) { - LOG.log(Level.WARNING, null, ex); - } - synchronized (this) { - notifyAll(); + try { + LOG.fine("Closing connection"); + this.connection.close(); + LOG.fine("Closing server"); + this.getServer().close(); + } catch (IOException ex) { + LOG.log(Level.WARNING, null, ex); + } finally { + notifyAll(); + } } LOG.fine("All notified, exiting server"); } - private ReqRes findRequest( - String url, Map args, String header, - String method, ByteBuffer bodyToFill - ) { - LOG.log(Level.FINE, "Searching for page {0}", url); - Matcher hostMatch = PATTERN_HOST.matcher(header); - String host = null; - int port = -1; - if (hostMatch.find()) { - host = hostMatch.group(1); - port = Integer.parseInt(hostMatch.group(2)); - } - if (host != null) { - LOG.log(Level.FINE, "Host {0}:{1}", new Object[] { host, port }); - } - + private Handler findHandler(String url) { + LOG.log(Level.FINE, "Searching for handler for page {0}", url); for (Map.Entry entry : maps.entrySet()) { if (url.startsWith(entry.getKey())) { - final Handler h = entry.getValue(); - return new ReqRes(h, url, args, host, port, header, method, bodyToFill); + return entry.getValue(); } } throw new IllegalStateException("No mapping for " + url + " among " + maps); @@ -455,10 +436,7 @@ static byte[] date(String prefix, Date date) { } } - /** - * @return the server - */ - public ServerSocketChannel getServer() throws IOException { + public synchronized ServerSocketChannel getServer() throws IOException { if (server == null) { ServerSocketChannel s = ServerSocketChannel.open(); s.configureBlocking(false); @@ -482,7 +460,8 @@ public ServerSocketChannel getServer() throws IOException { return server; } - class Context implements ThreadFactory { + final class Context implements ThreadFactory { + private final String id; Executor RUN; Thread RUNNER; @@ -509,10 +488,11 @@ Context initializeRunner(String id) { @Override void runSafe(Context c, Runnable r, Fn.Presenter presenter) { class Wrap implements Runnable { + @Override public void run() { if (presenter != null) { - try (Closeable c = Fn.activate(presenter)) { + try ( Closeable c = Fn.activate(presenter)) { r.run(); } catch (IOException ex) { // go on @@ -535,30 +515,83 @@ public void run() { } } - final class ReqRes extends SelectionKey { + final class ReadHeader { + + private final StringBuilder buffer = new StringBuilder(); + + final ReqRes process(SelectionKey key, ByteBuffer chunk) { + String text = new String(chunk.array(), 0, chunk.limit(), StandardCharsets.US_ASCII); + buffer.append(text); + int fullHeader = buffer.indexOf("\r\n\r\n"); + if (fullHeader == -1) { + return null; + } + String header = text.substring(0, fullHeader); + + Matcher m = PATTERN_GET.matcher(header); + String url = m.find() ? m.group(2) : null; + String args = url != null && m.groupCount() == 3 ? m.group(3) : null; + String method = m.group(1); + + Map context; + if (args != null) { + Map c = new HashMap<>(); + parseArgs(c, args); + context = Collections.unmodifiableMap(c); + } else { + context = Collections.emptyMap(); + } + + Matcher length = PATTERN_LENGTH.matcher(header); + ByteBuffer body = null; + if (length.find()) { + int contentLength = Integer.parseInt(length.group(1)); + body = ByteBuffer.allocate(contentLength); + ((Buffer) chunk).position(fullHeader + 4); + body.put(chunk); + } + + Handler h = findHandler(url); + Matcher hostMatch = PATTERN_HOST.matcher(header); + String host = null; + int port = -1; + if (hostMatch.find()) { + host = hostMatch.group(1); + port = Integer.parseInt(hostMatch.group(2)); + } + if (host != null) { + LOG.log(Level.FINER, "Host {0}:{1}", new Object[]{host, port}); + } + return new ReqRes(h, key, url, context, host, port, header, method, body); + } + } + + final class ReqRes { + + private final SelectionKey delegate; private final Handler h; final String url; final String hostName; final int hostPort; - final Map args; + final Map args; final String header; final String method; final ByteBuffer body; - private ByteBuffer bb = ByteBuffer.allocate(8192); - private SelectionKey delegate; private final ByteArrayOutputStream os = new ByteArrayOutputStream(); final Writer writer = new OutputStreamWriter(os, StandardCharsets.UTF_8); - final Map headers = new LinkedHashMap<>(); + final Map headers = new LinkedHashMap<>(); String contentType; int status = 200; + boolean computed; boolean suspended; public ReqRes( - Handler h, - String url, Map args, String host, - int port, String header, String method, ByteBuffer body + Handler h, SelectionKey delegate, + String url, Map args, String host, + int port, String header, String method, ByteBuffer body ) { this.h = h; + this.delegate = delegate; this.url = url; this.hostName = host; this.hostPort = port; @@ -568,68 +601,49 @@ public ReqRes( this.body = body; } - public void handle(SelectionKey key, SocketChannel channel) throws IOException { - delegate = key; + void updateOperations() { + if (body != null && body.remaining() > 0) { + delegate.interestOps(SelectionKey.OP_READ); + } else if (suspended) { + delegate.interestOps(0); + } else { + delegate.interestOps(SelectionKey.OP_WRITE); + } + } - if (bb != null) { - Map headerAttrs = Collections.emptyMap(); - String mime; + public WriteReply handle(SocketChannel channel) throws IOException { + if (!computed) { + computed = true; h.service(SimpleServer.this, this, this); - headerAttrs = headers; + } + if (suspended) { + channel.write(ByteBuffer.allocate(0)); + return null; + } - mime = contentType; - if (mime == null) { - mime = "content/unknown"; // NOI18N - } - ((Buffer)bb).clear(); - - LOG.log(Level.FINE, "Found page request {0}", url); // NOI18N - ((Buffer)bb).clear(); - bb.put(("HTTP/1.1 " + status + "\r\n").getBytes()); - bb.put("Connection: close\r\n".getBytes()); - bb.put("Server: Browser PReqenter\r\n".getBytes()); - bb.put(date(null)); - bb.put("\r\n".getBytes()); - bb.put(("Content-Type: " + mime + "\r\n").getBytes()); - for (Map.Entry entry : headerAttrs.entrySet()) { - bb.put((entry.getKey() + ":" + entry.getValue() + "\r\n").getBytes()); - } - bb.put("Pragma: no-cache\r\nCache-control: no-cache\r\n".getBytes()); - bb.put("\r\n".getBytes()); - ((Buffer)bb).flip(); - channel.write(bb); - LOG.log(Level.FINER, "Written header, type {0}", mime); - bb = null; - - if ("HEAD".equals(method) || "OPTIONS".equals(method)) { - LOG.fine("Writer flushed and closed, closing channel"); - channel.close(); - return; - } + if (contentType == null) { + contentType = "content/unknown"; // NOI18N } - try { - if (attachment() == null) { - if (suspended) { - channel.write(ByteBuffer.allocate(0)); - return; - } - ByteBuffer out = ByteBuffer.wrap(toByteArray()); - attach(out); - } - ByteBuffer bb = (ByteBuffer) attachment(); - if (bb.remaining() > 0) { - channel.write(bb); - } else { - channel.close(); - } - } finally { - if (!channel.isOpen()) { - LOG.log(Level.FINE, "channel not open, closing"); - key.attach(null); - key.cancel(); - } + ByteBuffer bb = ByteBuffer.allocate(8192); + ((Buffer) bb).clear(); + + LOG.log(Level.FINE, "Serving page request {0}", url); // NOI18N + ((Buffer) bb).clear(); + bb.put(("HTTP/1.1 " + status + "\r\n").getBytes()); + bb.put("Connection: close\r\n".getBytes()); + bb.put("Server: Browser presenter\r\n".getBytes()); + bb.put(date(null)); + bb.put("\r\n".getBytes()); + bb.put(("Content-Type: " + contentType + "\r\n").getBytes()); + for (Map.Entry entry : headers.entrySet()) { + bb.put((entry.getKey() + ":" + entry.getValue() + "\r\n").getBytes()); } + bb.put("Pragma: no-cache\r\nCache-control: no-cache\r\n".getBytes()); + bb.put("\r\n".getBytes()); + ((Buffer) bb).flip(); + + return new WriteReply(delegate, url, bb, ByteBuffer.wrap(toByteArray())); } byte[] toByteArray() throws IOException { @@ -637,48 +651,53 @@ byte[] toByteArray() throws IOException { return os.toByteArray(); } - public ByteBuffer bodyToFill() { - return body; + void readBody(SelectionKey key, ByteBuffer chunk) { + body.put(chunk); } @Override public String toString() { return "Request[" + method + ":" + url + "]"; } + } - @Override - public SelectableChannel channel() { - return delegate.channel(); - } - - @Override - public Selector selector() { - return delegate.selector(); - } + final class WriteReply { - @Override - public boolean isValid() { - return delegate.isValid(); - } + private final SelectionKey delegate; + private final String url; + private final ByteBuffer header; + private final ByteBuffer body; - @Override - public void cancel() { - delegate.cancel(); + WriteReply(SelectionKey delegate, String url, ByteBuffer header, ByteBuffer body) { + this.delegate = delegate; + this.url = url; + this.header = header; + this.body = body; } - @Override - public int interestOps() { - return delegate.interestOps(); + void updateOperations() { + delegate.interestOps(SelectionKey.OP_WRITE); } - @Override - public SelectionKey interestOps(int arg0) { - return delegate.interestOps(arg0); - } + void output(SocketChannel channel) throws IOException { + try { + if (header.remaining() > 0) { + channel.write(header); + return; + } + if (body.remaining() > 0) { + channel.write(body); + } else { + channel.close(); + } + } finally { + if (!channel.isOpen()) { + LOG.log(Level.FINE, "channel for {0} not open, closing", url); + delegate.attach(null); + delegate.cancel(); + } + } - @Override - public int readyOps() { - return delegate.readyOps(); } } }