“Closed while Pending/Unready” warnings from Jetty

Posted 2019-08-15 02:13

Question:

After fixing synchronization issues in our async servlet we still got rare

java.io.IOException: Closed while Pending/Unready

warnings from Jetty. The synchronization fixes reduced them from ~90/day to ~5/day in our production system. That is much better, but something minor is probably still missing.

Complete stacktrace:

[jetty-63523] (HttpOutput.java:287)   - 
java.io.IOException: Closed while Pending/Unready
        at org.eclipse.jetty.server.HttpOutput.close(HttpOutput.java:285)
        at org.eclipse.jetty.server.Response.closeOutput(Response.java:1044)
        at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:493)
        at org.eclipse.jetty.server.HttpChannel.run(HttpChannel.java:293)
        at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:762)
        at org.eclipse.jetty.util.thread.QueuedThreadPool$2.run(QueuedThreadPool.java:680)
        at java.lang.Thread.run(Thread.java:748)

The only code that writes to the output stream without synchronization is the setContentType(), setStatus() and flushBuffer() calls here:

@Override
protected void doPost(final HttpServletRequest req, final HttpServletResponse resp)
        throws ServletException, IOException {
    resp.setContentType(MediaType.OCTET_STREAM.type());
    resp.setStatus(HttpServletResponse.SC_OK);
    resp.setBufferSize(4096);
    resp.flushBuffer();
    final AsyncContext async = req.startAsync();
    async.setTimeout(5_000); // millis
    final ServletOutputStream output = resp.getOutputStream();
    final QueueWriteListener writeListener = new QueueWriteListener(async, output);
    async.addListener(writeListener);
    output.setWriteListener(writeListener);
}

This runs before our QueueWriteListener is set, so if flushBuffer() is synchronous it should not be a problem.

Looking at the Jetty source code, Response.flushBuffer() calls HttpOutput.flush(), where the call to new AsyncFlush().iterate() looks suspicious. However, when debugging doPost()/flushBuffer(), that case branch is never executed.

Complete code:

@Override
protected void doPost(final HttpServletRequest req, final HttpServletResponse resp)
        throws ServletException, IOException {
    resp.setContentType(MediaType.OCTET_STREAM.type());
    resp.setStatus(HttpServletResponse.SC_OK);
    resp.setBufferSize(4096);
    resp.flushBuffer();
    final AsyncContext async = req.startAsync();
    async.setTimeout(5_000); // millis
    final ServletOutputStream output = resp.getOutputStream();
    final QueueWriteListener writeListener = new QueueWriteListener(async, output);
    async.addListener(writeListener);
    output.setWriteListener(writeListener);
}

private static class QueueWriteListener implements AsyncListener, WriteListener {

    private static final Logger logger = LoggerFactory.getLogger(QueueWriteListener.class);

    private final AsyncContext asyncContext;
    private final ServletOutputStream output;

    @GuardedBy("this")
    private boolean completed = false;

    public QueueWriteListener(final AsyncContext asyncContext, final ServletOutputStream output) {
        this.asyncContext = checkNotNull(asyncContext, "asyncContext cannot be null");
        this.output = checkNotNull(output, "output cannot be null");
    }

    @Override
    public void onWritePossible() throws IOException {
        writeImpl();
    }

    private synchronized void writeImpl() throws IOException {
        if (completed) {
            return;
        }
        while (output.isReady()) {
            final byte[] message = getNextMessage();
            if (message == null) {
                output.flush();
                return;
            }
            output.write(message);
        }
    }

    private synchronized void completeImpl() {
        // also stops DataFeederThread from calling bufferArrived
        completed = true;
        asyncContext.complete();
    }

    @Override
    public void onError(final Throwable t) {
        logger.error("Writer.onError", t);
        completeImpl();
    }

    public void dataArrived() {
        try {
            writeImpl();
        } catch (RuntimeException | IOException e) {
            ...
        }
    }

    public void noMoreData() {
        completeImpl();
    }

    @Override
    public synchronized void onComplete(final AsyncEvent event) throws IOException {
        completed = true; // might not be needed but does not hurt
    }

    @Override
    public synchronized void onTimeout(final AsyncEvent event) throws IOException {
        completeImpl();
    }

    @Override
    public void onError(final AsyncEvent event) throws IOException {
        logger.error("onError", event.getThrowable());
    }

    ...
}

So it seems that between completing and the (asynchronous) closing of the output by Jetty, no one can write to the output, and therefore its state should not change to pending or unready. Despite that, it still happens somehow. What could be the cause of this Closed while Pending/Unready warning?

I have checked our logs (nothing relevant).

onError(AsyncEvent event) is not synchronized in our code (yet) but it's not relevant since its log message has never shown up in our log.

Related discussion on GitHub: https://github.com/eclipse/jetty.project/issues/2689

Answer 1:

I was able to reproduce the warning with this code:

@Override
protected void doPost(final HttpServletRequest req, final HttpServletResponse resp)
        throws ServletException, IOException {
    resp.setContentType(MediaType.OCTET_STREAM.type());
    resp.setStatus(HttpServletResponse.SC_OK);
    resp.setBufferSize(8192);
    resp.flushBuffer();
    final AsyncContext async = req.startAsync();
    async.setTimeout(10_000); // millis
    final ServletOutputStream output = resp.getOutputStream();
    final QueueWriteListener writeListener = new QueueWriteListener(output);
    async.addListener(writeListener);
    output.setWriteListener(writeListener);
}

private static class QueueWriteListener implements AsyncListener, WriteListener {

    private static final Logger logger = LoggerFactory.getLogger(QueueWriteListener.class);

    private final ServletOutputStream output;

    public QueueWriteListener(final ServletOutputStream output) {
        this.output = checkNotNull(output, "output cannot be null");
    }

    @Override
    public void onWritePossible() throws IOException {
        logger.info("onWritePossible()");
        long written = 0;
        while (output.isReady()) {
            final byte[] data = new byte[8 * 1024 * 1024];
            Arrays.fill(data, (byte) 'W');
            output.write(data);
            written += data.length;
            logger.info("write OK, written: {} KB", written / 1024);
        }
        logger.info("onWritePossible() end");
    }

    @Override
    public void onError(final Throwable t) {
        logger.info("Writer.onError -Error: {}, Message: {}", t.getClass().getName(), t.getMessage());
    }

    @Override
    public void onComplete(final AsyncEvent event) throws IOException {
        logger.warn("onComplete: {}", event);
    }

    @Override
    public void onTimeout(final AsyncEvent event) throws IOException {
        logger.warn("onTimeout()");
    }

    @Override
    public void onError(final AsyncEvent event) throws IOException {
        logger.error("onError: {}", event, event.getThrowable());
    }

    @Override
    public void onStartAsync(final AsyncEvent event) throws IOException {
        logger.info("onStartAsync: {}", event);
    }
}

And with a slow curl client:

curl --limit-rate 16 -XPOST http://localhost:35419/example2
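
If curl is not at hand, a throttled client can be sketched in plain Java. This is only an illustration that I have not run against the servlet: the class name SlowClient and the helper drainSlowly() are made up for this example, the URL is the one from the curl command, and the rate of 16 bytes per second mirrors the curl setting.

```java
import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URL;

// Hypothetical throttled client; not part of the original reproduction.
final class SlowClient {

    /**
     * Reads from the stream in small chunks and sleeps after each chunk so
     * that roughly bytesPerSecond bytes are consumed per second. Stops at
     * end of stream or after maxBytes bytes and returns the bytes read.
     */
    static long drainSlowly(final InputStream in, final int bytesPerSecond, final long maxBytes)
            throws IOException, InterruptedException {
        final byte[] chunk = new byte[Math.max(1, bytesPerSecond)];
        long total = 0;
        while (total < maxBytes) {
            final int wanted = (int) Math.min(chunk.length, maxBytes - total);
            final int n = in.read(chunk, 0, wanted);
            if (n < 0) {
                break; // end of stream
            }
            total += n;
            Thread.sleep(1000L * n / bytesPerSecond);
        }
        return total;
    }

    public static void main(final String[] args) throws Exception {
        // Assumption: the servlet from this answer listens on this URL.
        final URL url = new URL("http://localhost:35419/example2");
        final HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("POST");
        try (InputStream in = conn.getInputStream()) {
            final long read = drainSlowly(in, 16, Long.MAX_VALUE);
            System.out.println("read " + read + " bytes");
        } finally {
            conn.disconnect();
        }
    }
}
```

The only point of the client is to read far more slowly than the server writes, so that the server-side write stays unfinished until the async timeout fires. The result shown next is from the curl run, not from this sketch.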

Result:

10:39:29,063  INFO [jetty-16] {Example2Servlet.java:52} - onWritePossible()
10:39:29,084  INFO [jetty-16] {Example2Servlet.java:59} - write OK, written: 8192 KB
10:39:29,084  INFO [jetty-16] {Example2Servlet.java:61} - onWritePossible() end
10:39:39,085  WARN [jetty-17] {Example2Servlet.java:76} - onTimeout()
10:39:39,088  WARN [jetty-17] {HttpOutput.java:286} - java.io.IOException: Closed while Pending/Unready, requestUrl=http://localhost:35419/example2
10:39:39,090  INFO [jetty-17] {HttpOutput.java:287} - 
java.io.IOException: Closed while Pending/Unready, requestUrl=http://localhost:35419/example2
    at org.eclipse.jetty.server.HttpOutput.close(HttpOutput.java:285)
    at org.eclipse.jetty.server.Response.closeOutput(Response.java:1044)
    at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:493)
    at org.eclipse.jetty.server.HttpChannel.run(HttpChannel.java:293)
    at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:762)
    at org.eclipse.jetty.util.thread.QueuedThreadPool$2.run(QueuedThreadPool.java:680)
    at java.lang.Thread.run(Thread.java:748)
10:39:39,091  WARN [jetty-17] {Example2Servlet.java:71} - onComplete: org.eclipse.jetty.server.AsyncContextEvent@3b51901
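
The timeline in this log can be illustrated with a toy state model. It is a simplified sketch of how I read the behavior, not Jetty's actual HttpOutput code: the class OutputStateModel, its four states and the flushCompleted() method are made up for illustration. The assumption is that write() returns before the data has reached the client, and that isReady() returning false leaves the stream waiting for the next onWritePossible().

```java
// Toy model of an async output stream's state. NOT Jetty's implementation.
final class OutputStateModel {

    enum State { READY, PENDING, UNREADY, CLOSED }

    private State state = State.READY;
    private String warning; // set by close() if a write was still in flight

    /** Models output.write(): returns at once, the flush continues in the background. */
    void write(final byte[] data) {
        if (state != State.READY) {
            throw new IllegalStateException("write() while " + state);
        }
        state = State.PENDING;
    }

    /** Models output.isReady(): false while the previous write is unfinished. */
    boolean isReady() {
        if (state == State.PENDING) {
            state = State.UNREADY; // caller must now wait for onWritePossible()
        }
        return state == State.READY;
    }

    /** Models the network layer finishing the flush to the client. */
    void flushCompleted() {
        if (state == State.PENDING || state == State.UNREADY) {
            state = State.READY;
        }
    }

    /** Models the container closing the output after a timeout or complete(). */
    void close() {
        if (state == State.PENDING || state == State.UNREADY) {
            warning = "Closed while Pending/Unready";
        }
        state = State.CLOSED;
    }

    /** Slow client: the write never finishes before the close. */
    static String slowClientScenario() {
        final OutputStateModel out = new OutputStateModel();
        if (out.isReady()) {
            out.write(new byte[8]);
        }
        out.isReady(); // returns false, the loop in onWritePossible() ends
        out.close();   // the timeout fires while the write is still in flight
        return out.warning;
    }

    /** Fast client: the flush completes before the close. */
    static String fastClientScenario() {
        final OutputStateModel out = new OutputStateModel();
        out.write(new byte[8]);
        out.flushCompleted();
        out.isReady();
        out.close();
        return out.warning;
    }

    public static void main(final String[] args) {
        System.out.println("slow client: " + slowClientScenario());
        System.out.println("fast client: " + fastClientScenario());
    }
}
```

In this model the warning does not require anyone to write after completion. It is enough that an earlier write is still unfinished when the container closes the output, which matches the log: one 8 MB write, isReady() returning false, and the close ten seconds later.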

Things to notice:

  • It does not use any third party threads.
  • In the original code there was a complete() call in onTimeout(), but it does not influence the behavior, so I removed it.
  • I also removed the synchronized keywords from method declarations - they do not influence the behavior.
  • I was able to reproduce the same behavior with different buffer and array sizes; the warning came sooner or later. (For example, a buffer size of 8192 bytes with an array size of 1024 bytes also reproduces the warning, but takes a little longer.) This usually produces additional debug logs, like this one:

    DEBUG [jetty-12] {HttpOutput.java:1271} - EOF of org.eclipse.jetty.server.HttpOutput$AsyncWrite@4b8dff34[PROCESSING]