Jersey SSE - eventOutput.write throws NullPointerException

Posted 2019-05-10 04:28

Question:

I have implemented a RESTful web interface using Jersey to forward messages received from an internal JMS publisher to external clients over HTTP. I can get a test message out to a Java client, but the server thread throws a NullPointerException before the write() call returns, which closes the connection and prevents further communication.

Here is my resource class:

@GET
@Path("/stream_data")
@Produces(SseFeature.SERVER_SENT_EVENTS)
public EventOutput getServerSentEvents(@Context ServletContext context){
    final EventOutput eventOutput = new EventOutput();
    new Thread( new ObserverThread(eventOutput, (MService) context.getAttribute("instance")) ).start();
    return eventOutput;
}

And here is my thread's run method:

public class ObserverThread implements Observer, Runnable {
    // constructor sets eventOutput & mService objects
    // mService notifyObservers() called when JMS message received
    // text added to Thread's message queue to await sending to client

    public void run() {
        try {
            String message = "{'symbol':'test','entryType'='0','price'='test'}";
            Thread.sleep(1000);
            OutboundEvent.Builder builder = new OutboundEvent.Builder();
            builder.mediaType(MediaType.APPLICATION_JSON_TYPE);
            builder.data(String.class, message);
            OutboundEvent event = builder.build();
            eventOutput.write(event);
            System.out.println(">>>>>>SSE CLIENT HAS BEEN REGISTERED!");
            mService.addObserver(this);
            while (!eventOutput.isClosed()) {
                if (!updatesQ.isEmpty()) {
                    pushUpdate(updatesQ.dequeue());
                }
            }
            System.out.println("<<<<<<<SSE CLIENT HAS BEEN DEREGISTERED!");
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
}

Here is my client code:

Client client = ClientBuilder.newBuilder().register(SseFeature.class).build();
WebTarget target = client.target(url);
EventInput eventInput = target.request().get(EventInput.class);
try {
    while (!eventInput.isClosed()) {
        eventInput.setChunkType(MediaType.WILDCARD_TYPE);
        final InboundEvent inboundEvent = eventInput.read();
        if (inboundEvent != null) {
            String theString = inboundEvent.readData();
            System.out.println(theString + "\n");
        }
    }
} catch (Exception e) {
    e.printStackTrace();
}

The "{'symbol':'test','entryType'='0','price'='test'}" test message is printed to the client console, but the server then prints a NullPointerException before it can print the ">>>>>>SSE CLIENT HAS BEEN REGISTERED!" message. This closes the connection, so the client exits the while loop and stops listening for updates.

I converted the project to the web app 3.0 facet so that I could add an async-supported tag to web.xml, but I still get the same NullPointerException. I suspect the servlet is ending the request/response objects once the first message has been returned, as the stack trace suggests:

Exception in thread "Thread-20" java.lang.NullPointerException
at org.apache.coyote.http11.InternalOutputBuffer.realWriteBytes(InternalOutputBuffer.java:741)
at org.apache.tomcat.util.buf.ByteChunk.flushBuffer(ByteChunk.java:434)
at org.apache.coyote.http11.InternalOutputBuffer.flush(InternalOutputBuffer.java:299)
at org.apache.coyote.http11.Http11Processor.action(Http11Processor.java:981)
at org.apache.coyote.Response.action(Response.java:183)
at org.apache.catalina.connector.OutputBuffer.doFlush(OutputBuffer.java:314)
at org.apache.catalina.connector.OutputBuffer.flush(OutputBuffer.java:288)
at org.apache.catalina.connector.CoyoteOutputStream.flush(CoyoteOutputStream.java:98)
at org.glassfish.jersey.message.internal.CommittingOutputStream.flush(CommittingOutputStream.java:292)
at org.glassfish.jersey.server.ChunkedOutput$1.call(ChunkedOutput.java:241)
at org.glassfish.jersey.server.ChunkedOutput$1.call(ChunkedOutput.java:192)
at org.glassfish.jersey.internal.Errors.process(Errors.java:315)
at org.glassfish.jersey.internal.Errors.process(Errors.java:242)
at org.glassfish.jersey.process.internal.RequestScope.runInScope(RequestScope.java:345)
at org.glassfish.jersey.server.ChunkedOutput.flushQueue(ChunkedOutput.java:192)
at org.glassfish.jersey.server.ChunkedOutput.write(ChunkedOutput.java:182)
at com.bpc.services.service.ObserverThread.run(MarketObserverThread.java:32)
at java.lang.Thread.run(Thread.java:745)
<<<<<<<SSE CLIENT HAS BEEN DEREGISTERED!

I have also tried an SSE broadcaster. In that case no exception is thrown, but the connection is still closed once the first message has been received, which leads me to believe that something in the servlet is forcing the connection to close. Can anyone advise me on how to debug this on the server side?

Answer 1:

I had a similar issue, caused by what seems to be a long-standing bug in Jersey's @Context injection for ExecutorService instances. In their current implementation of Sse (version 2.27),

class JerseySse implements Sse {

    @Context
    private ExecutorService executorService;

    @Override
    public OutboundSseEvent.Builder newEventBuilder() {
        return new OutboundEvent.Builder();
    }

    @Override
    public SseBroadcaster newBroadcaster() {
        return new JerseySseBroadcaster(executorService);
    }
}

the executorService field is never initialized, so the JerseySseBroadcaster raises a NullPointerException in my case. I worked around the bug by explicitly triggering the injection.
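
To see the failure mode in isolation, here is a minimal plain-Java sketch with no Jersey on the classpath. FakeSse and FakeBroadcaster are hypothetical stand-ins, not Jersey classes, and the exact point at which the real JerseySseBroadcaster dereferences the executor is an assumption; the sketch only illustrates that a field the container never sets stays null until something assigns it.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class UninjectedFieldDemo {

    /** Hypothetical stand-in for JerseySse: the field is meant to be set by a container. */
    public static class FakeSse {
        public ExecutorService executorService; // stays null unless something injects it

        public FakeBroadcaster newBroadcaster() {
            return new FakeBroadcaster(executorService);
        }
    }

    /** Hypothetical stand-in for a broadcaster that delivers events on the executor. */
    public static class FakeBroadcaster {
        private final ExecutorService executor;

        public FakeBroadcaster(ExecutorService executor) {
            this.executor = executor;
        }

        public void broadcast(Runnable delivery) {
            executor.execute(delivery); // NullPointerException when executor is null
        }
    }

    /** Returns true if broadcasting through a fresh broadcaster throws an NPE. */
    public static boolean broadcastFails(FakeSse sse) {
        try {
            sse.newBroadcaster().broadcast(() -> { });
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        FakeSse sse = new FakeSse();
        System.out.println("fails before injection: " + broadcastFails(sse));

        // Explicit "injection", analogous in spirit to forcing the container to populate the field.
        sse.executorService = Executors.newSingleThreadExecutor();
        System.out.println("fails after injection: " + broadcastFails(sse));
        sse.executorService.shutdown();
    }
}
```

The order matters in the same way it does in the workaround: the broadcaster copies the executor reference when it is created, so the field has to be populated before newBroadcaster() is called.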

If you're using HK2 for dependency injection (Jersey's default), a rough sketch of a solution to the question above could look similar to the following:

@Singleton
@Path("...")
public class JmsPublisher {

    private Sse sse;

    private SseBroadcaster broadcaster;

    private final ExecutorService executor;

    private final BlockingQueue<String> jmsMessageQueue;

    ...

    @Context
    public void setSse(Sse sse, ServiceLocator locator) {
        locator.inject(sse); // Inject sse.executorService
        this.sse = sse;
        this.broadcaster = sse.newBroadcaster();
    }

    ...

    @GET
    @Path("/stream_data")
    @Produces(MediaType.SERVER_SENT_EVENTS)
    public void register(@Context SseEventSink eventSink) { // JAX-RS 2.1 injects the sink via @Context
        broadcaster.register(eventSink);
    }

    ...

    @PostConstruct
    private void postConstruct() {
        executor.submit(() -> {
            try {
                while(true) {
                    String message = jmsMessageQueue.take();
                    broadcaster.broadcast(sse.newEventBuilder()
                        .mediaType(MediaType.APPLICATION_JSON_TYPE)
                        .data(String.class, message)
                        .build());
                }
            } catch(InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
    }

    @PreDestroy
    private void preDestroy() {
        executor.shutdownNow();
    }

}
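
The queue-draining loop in postConstruct() can be tried on its own, without Jersey or JMS. The sketch below is an illustration under stated assumptions: QueueDrainSketch and drain are hypothetical names, and a plain Consumer<String> stands in for broadcaster.broadcast(...). It shows the same pattern as above: a single worker blocks on take(), and shutdownNow() interrupts it so the loop exits cleanly.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

public class QueueDrainSketch {

    /** Pushes the messages through a queue and returns them in the order the worker delivered them. */
    public static List<String> drain(List<String> messages) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        List<String> delivered = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch allDelivered = new CountDownLatch(messages.size());

        // Stand-in for broadcaster.broadcast(...): record the message instead of sending it.
        Consumer<String> sink = message -> {
            delivered.add(message);
            allDelivered.countDown();
        };

        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.submit(() -> {
            try {
                while (true) {
                    sink.accept(queue.take()); // blocks until a message is available
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore the flag and leave the loop
            }
        });

        try {
            for (String message : messages) {
                queue.put(message);
            }
            boolean done = allDelivered.await(5, TimeUnit.SECONDS);
            executor.shutdownNow(); // interrupts the worker blocked in take()
            executor.awaitTermination(5, TimeUnit.SECONDS);
            if (!done) {
                throw new IllegalStateException("timed out waiting for delivery");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while draining", e);
        }
        return new ArrayList<>(delivered);
    }

    public static void main(String[] args) {
        System.out.println(drain(Arrays.asList("first", "second", "third")));
    }
}
```

Because take() blocks, the worker uses no CPU while the queue is empty, unlike a loop that repeatedly polls isEmpty().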