Correct way to stop custom logback async appender

Posted 2019-04-09 09:11

Question:

I've created Amazon SQS and SNS logback appenders using Amazon's Java SDK. The basic appenders use the synchronous Java APIs, but I've also created asynchronous versions of both by extending the ch.qos.logback.classic.AsyncAppender class.

Stopping the logback logger context with the async appenders does not work as expected, though. When the context is stopped, all async appenders try to flush the remaining events before exiting. The problem originates in the ch.qos.logback.core.AsyncAppenderBase#stop method, which interrupts the worker thread. The interrupt is triggered while the Amazon SDK is still processing the queued events and results in a com.amazonaws.AbortedException. In my tests the AbortedException happened while the SDK was processing a response from the API, so the actual message went through, but this might not always be the case.
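To make the failure mode concrete, here is a self-contained sketch that loosely models it without logback or the AWS SDK: a worker thread drains a queue through a blocking `send`, and another thread interrupts it while the first event is in flight. `InterruptDuringSend`, `send` and the latencies are hypothetical stand-ins, not logback's actual worker code; a real SDK would surface the interrupt as its own exception (such as the AbortedException above) rather than a bare InterruptedException.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;

public class InterruptDuringSend {

    // Hypothetical stand-in for a blocking SDK call. Event "a" is deliberately slow
    // so that the interrupt is guaranteed to land while it is in flight.
    static void send(String event, CountDownLatch inFlight, List<String> sent, List<String> aborted) {
        inFlight.countDown();
        try {
            Thread.sleep(event.equals("a") ? 10_000 : 10);
            sent.add(event);
        } catch (InterruptedException e) {
            // Throwing InterruptedException clears the flag, so later events are unaffected.
            aborted.add(event);
        }
    }

    // Returns [sent, aborted] after a worker drains "a", "b", "c" and is interrupted once.
    static List<List<String>> run() {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(List.of("a", "b", "c"));
        List<String> sent = Collections.synchronizedList(new ArrayList<>());
        List<String> aborted = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch inFlight = new CountDownLatch(1);

        // Drain loop, loosely modelled on an async appender worker flushing its queue.
        Thread worker = new Thread(() -> {
            String event;
            while ((event = queue.poll()) != null) {
                send(event, inFlight, sent, aborted);
            }
        });
        worker.start();
        try {
            inFlight.await();   // wait until the first send has started
            worker.interrupt(); // what a stop() that interrupts its worker does
            worker.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return List.of(new ArrayList<>(sent), new ArrayList<>(aborted));
    }

    public static void main(String[] args) {
        System.out.println(run()); // [[b, c], [a]]
    }
}
```

In this model the in-flight event is lost while the rest of the queue still goes out, which matches the "flush continues, but one call is aborted" symptom described above.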

Is it intended that logback interrupts the worker thread even though the worker should still process the remaining event queue? And if so, how can I work around the AbortedException caused by the interrupt? I could override the whole stop method and remove the interrupt, but that would require copy-pasting most of the implementation.

Answer 1:

I finally managed to figure out a solution. I guess it is not optimal and far from simple, but it works.

My first attempt was to use the asynchronous versions of the AWS SDK APIs with the logback-provided executor, because with an internal executor the interrupt problem could be avoided. This didn't work out, because the work queues are shared, and in this case the queue must be appender-specific so that it can be stopped correctly. So I needed to use a separate executor for each appender.

First I needed an executor for the AWS clients. The catch is that the provided thread factory must create daemon threads; otherwise the executor will block indefinitely if logback's JVM shutdown hook is used.

public static ExecutorService newExecutor(Appender<?> appender, int threadPoolSize) {
    final String name = appender.getName();
    return Executors.newFixedThreadPool(threadPoolSize, new ThreadFactory() {

        private final AtomicInteger idx = new AtomicInteger(1);

        @Override
        public Thread newThread(Runnable r) {
            Thread thread = new Thread(r);
            thread.setName(name + "-" + idx.getAndIncrement());
            thread.setDaemon(true);
            return thread;
        }
    });
}
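A quick way to check the thread factory is to run a task and inspect the thread it lands on. The sketch below is a standalone variant of `newExecutor` that takes a plain name instead of a logback `Appender` (an assumption made so it compiles without logback); `DaemonExecutorCheck` and `describeWorker` are hypothetical names.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

public class DaemonExecutorCheck {

    // Same factory idea as newExecutor, but keyed by a plain name instead of an Appender.
    static ExecutorService newExecutor(String name, int threadPoolSize) {
        AtomicInteger idx = new AtomicInteger(1);
        ThreadFactory factory = r -> {
            Thread thread = new Thread(r, name + "-" + idx.getAndIncrement());
            thread.setDaemon(true); // daemon: never keeps the JVM alive on its own
            return thread;
        };
        return Executors.newFixedThreadPool(threadPoolSize, factory);
    }

    // Runs one task and reports the worker thread's name and daemon flag.
    static String describeWorker(String name) {
        ExecutorService executor = newExecutor(name, 2);
        try {
            return executor.submit(() -> {
                Thread t = Thread.currentThread();
                return t.getName() + " daemon=" + t.isDaemon();
            }).get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(describeWorker("sqs")); // sqs-1 daemon=true
    }
}
```

Naming threads after the appender also makes thread dumps taken during a hung shutdown much easier to read.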

The next issue was how to stop the appender correctly despite the interrupt. This required handling InterruptedException with a retry, because otherwise the shutdown would skip waiting for the queue to flush.

public static void shutdown(Appender<?> appender, ExecutorService executor, long waitMillis) {
    executor.shutdown();
    boolean completed = awaitTermination(appender, executor, waitMillis);
    if (!completed) {
        appender.addWarn(format("Executor for %s did not shut down in %d milliseconds, " +
                                "logging events might have been discarded",
                                appender.getName(), waitMillis));
    }
}

private static boolean awaitTermination(Appender<?> appender, ExecutorService executor, long waitMillis) {
    long started = System.currentTimeMillis();
    try {
        return executor.awaitTermination(waitMillis, TimeUnit.MILLISECONDS);
    } catch (InterruptedException ie1) {
        // the worker loop is stopped by interrupt, but the remaining queue should still be handled
        long waited = System.currentTimeMillis() - started;
        if (waited < waitMillis) {
            try {
                return executor.awaitTermination(waitMillis - waited, TimeUnit.MILLISECONDS);
            } catch (InterruptedException ie2) {
                appender.addError(format("Shut down of executor for %s was interrupted",
                                         appender.getName()));
            }
        }
        Thread.currentThread().interrupt();
    }
    return false;
}
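The retry can be exercised in isolation. This sketch is a simplified, hypothetical variant of `awaitTermination` that drops the `Appender` parameter and the status messages; a stopper thread that is already interrupted (standing in for the interrupt from `AsyncAppenderBase#stop`) still waits for an in-flight task to finish. One deliberate difference from the code above: it restores the interrupt flag on every interrupted path, whereas the original only re-sets it when the retry fails or no time is left.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class FlushOnInterrupt {

    // Simplified awaitTermination: no Appender, no status messages.
    static boolean awaitTermination(ExecutorService executor, long waitMillis) {
        long started = System.currentTimeMillis();
        try {
            return executor.awaitTermination(waitMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException first) {
            // Interrupted (e.g. by the async appender's stop): keep waiting for the
            // remaining time so queued sends can still finish.
            try {
                long remaining = waitMillis - (System.currentTimeMillis() - started);
                return remaining > 0 && executor.awaitTermination(remaining, TimeUnit.MILLISECONDS);
            } catch (InterruptedException second) {
                return false;
            } finally {
                // Restore the interrupt status for code further up the stack.
                Thread.currentThread().interrupt();
            }
        }
    }

    // Returns {terminated, interruptStillSet} for a stopper thread that was already interrupted.
    static boolean[] demo() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.execute(() -> {
            try {
                Thread.sleep(200); // a "send" still in flight
            } catch (InterruptedException ignored) {
                Thread.currentThread().interrupt();
            }
        });
        executor.shutdown(); // no new tasks; the running one is allowed to finish
        boolean[] result = new boolean[2];
        Thread stopper = new Thread(() -> {
            Thread.currentThread().interrupt(); // simulate the interrupt from stop()
            result[0] = awaitTermination(executor, 5_000);
            result[1] = Thread.currentThread().isInterrupted();
        });
        stopper.start();
        try {
            stopper.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return result;
    }

    public static void main(String[] args) {
        boolean[] r = demo();
        System.out.println("terminated=" + r[0] + " interrupted=" + r[1]);
    }
}
```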

The normal logback appenders are expected to work synchronously and therefore shouldn't lose logging events even without a proper shutdown hook. The async AWS SDK API calls break this expectation, so I decided to use a CountDownLatch to provide blocking appender behavior.

public class LoggingEventHandler<REQUEST extends AmazonWebServiceRequest, RESULT> implements AsyncHandler<REQUEST, RESULT> {

    private final ContextAware contextAware;
    private final CountDownLatch latch;
    private final String errorMessage;

    public LoggingEventHandler(ContextAware contextAware, CountDownLatch latch, String errorMessage) {
        this.contextAware = contextAware;
        this.latch = latch;
        this.errorMessage = errorMessage;
    }

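    @Override
    public void onError(Exception exception) {
        // report the failure, but still release the thread waiting on the latch
        contextAware.addWarn(errorMessage, exception);
        latch.countDown();
    }

    @Override
    public void onSuccess(REQUEST request, RESULT result) {
        // the request completed, so the appending thread can stop waiting
        latch.countDown();
    }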
}
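The latch-based blocking behavior can be tried without the AWS SDK. In this sketch a hypothetical `Callback` interface stands in for the SDK's `AsyncHandler`, and `sendAsync` is a fake asynchronous client call that completes on its own daemon thread; `append` starts the send and then waits on a `CountDownLatch`, counting it down on both success and error as `LoggingEventHandler` does. All names here are illustrative assumptions.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class BlockingOverAsync {

    // Hypothetical callback shape, loosely modelled on the SDK's AsyncHandler.
    interface Callback {
        void onSuccess(String result);
        void onError(Exception e);
    }

    // One daemon sender thread, so a forgotten shutdown never keeps the JVM alive.
    private final ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "sender-1");
        t.setDaemon(true);
        return t;
    });

    // Messages the fake remote side has "received".
    final List<String> delivered = Collections.synchronizedList(new ArrayList<>());

    // Hypothetical async client call: returns at once, completes later on the sender thread.
    void sendAsync(String message, Callback callback) {
        executor.execute(() -> {
            try {
                Thread.sleep(50); // simulated network latency
                delivered.add(message);
                callback.onSuccess("ok:" + message);
            } catch (InterruptedException e) {
                callback.onError(e);
            }
        });
    }

    // Blocking append: start the async send, then wait on the latch with a timeout.
    boolean append(String message, long waitMillis) {
        CountDownLatch latch = new CountDownLatch(1);
        sendAsync(message, new Callback() {
            @Override public void onSuccess(String result) { latch.countDown(); }
            @Override public void onError(Exception e) { latch.countDown(); }
        });
        try {
            return latch.await(waitMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // same choice as awaitLatch: restore the flag
            return false;
        }
    }

    public static void main(String[] args) {
        BlockingOverAsync appender = new BlockingOverAsync();
        boolean completed = appender.append("hello", 2_000);
        System.out.println(completed + " " + appender.delivered);
    }
}
```

Because the latch is counted down on both outcomes, a failed send releases the caller immediately; only a send that never calls back makes `append` wait for the full timeout.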

And a helper to wait on the latch:

public static void awaitLatch(Appender<?> appender, CountDownLatch latch, long waitMillis) {
    if (latch.getCount() > 0) {
        try {
            boolean completed = latch.await(waitMillis, TimeUnit.MILLISECONDS);
            if (!completed) {
                appender.addWarn(format("Appender '%s' did not complete sending event in %d milliseconds, " +
                                        "the event might have been lost",
                                        appender.getName(), waitMillis));
            }
        } catch (InterruptedException ex) {
            appender.addWarn(format("Appender '%s' was interrupted, " +
                                    "a logging event might have been lost or shutdown was initiated",
                                    appender.getName()));
            Thread.currentThread().interrupt();
        }
    }
}
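The helper has three outcomes. This simplified, hypothetical variant returns a status string instead of calling `addWarn`, and runs each case on a fresh thread so the caller's own interrupt flag stays clean. Note the interrupted case: if the thread is already interrupted when it reaches the latch (as an async appender worker may be during `stop`), `CountDownLatch.await` throws immediately instead of waiting for the send to finish.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class AwaitLatchOutcomes {

    // Simplified awaitLatch: returns a status instead of calling appender.addWarn.
    static String awaitLatch(CountDownLatch latch, long waitMillis) {
        if (latch.getCount() == 0) {
            return "completed"; // callback already fired, nothing to wait for
        }
        try {
            return latch.await(waitMillis, TimeUnit.MILLISECONDS) ? "completed" : "timed out";
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt(); // restore the flag, as the original does
            return "interrupted";
        }
    }

    // Runs one case on a fresh thread; reports the status and whether the flag is still set.
    static String runCase(int latchCount, boolean preInterrupt) {
        String[] out = new String[1];
        Thread t = new Thread(() -> {
            if (preInterrupt) {
                Thread.currentThread().interrupt(); // e.g. the worker interrupted by stop()
            }
            String status = awaitLatch(new CountDownLatch(latchCount), 100);
            out[0] = status + (Thread.currentThread().isInterrupted() ? " (flag set)" : "");
        });
        t.start();
        try {
            t.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return out[0];
    }

    public static void main(String[] args) {
        System.out.println(runCase(0, false)); // completed
        System.out.println(runCase(1, false)); // timed out
        System.out.println(runCase(1, true));  // interrupted (flag set)
    }
}
```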

And then all of it bundled together. The following example is a simplified version of the real implementation, showing only the parts relevant to this issue.

public class SqsAppender extends UnsynchronizedAppenderBase<ILoggingEvent> {

    private AmazonSQSAsyncClient sqs;

    @Override
    public void start() {
        sqs = new AmazonSQSAsyncClient(
                getCredentials(),
                getClientConfiguration(),
                AppenderExecutors.newExecutor(this, getThreadPoolSize()) // daemon threads, see newExecutor above
        );
        super.start();
    }

    @Override
    public void stop() {
        super.stop();
        if (sqs != null) {
            AppenderExecutors.shutdown(this, sqs.getExecutorService(), getMaxFlushTime());
            sqs.shutdown();
            sqs = null;
        }
    }

    @Override
    protected void append(final ILoggingEvent eventObject) {
        SendMessageRequest request = ...
        CountDownLatch latch = new CountDownLatch(1);
        sqs.sendMessageAsync(request, new LoggingEventHandler<SendMessageRequest, SendMessageResult>(this, latch, "Error"));
        AppenderExecutors.awaitLatch(this, latch, getMaxFlushTime());
    }
}

All this was required to handle the following cases properly:

  • Flush the remaining event queue on logback context stop, or when the shutdown hook runs, if the async appender wrapper is used
  • Do not block indefinitely when logback's delayed shutdown hook is used
  • Provide blocking behavior when the async appender is not used
  • Survive the interrupt from the async appender's stop, which interrupted all AWS SDK stream implementations

The above is used in the open source project Logback extensions, which I maintain.