JMS-Websocket - delayed message delivery

2019-08-17 15:18发布

问题:

This application receives & forwards messages from database events to client applications. Messages are immediately delivered when the client browser has a web socket session.

However, when no web socket session exists and a message is sent by the JMSProducer into the Destination "jms/notificationQueue" in QueueSenderSessionBean, the message is immediately consumed in NotificationEndpoint. This is not my intent.

My intent is for the queue to retain the message until the user connects to NotificationEndpoint. If the user is not connected to the NotificationEndpoint, I think there should be no instance of NotificationEndpoint created to receive the message.

How do I delay the JMSConsumer consuming the message from the queue?

Overview - TomEE Plus 8.0.0-M1 project

  1. Application receives notification in a NotificationServlet HttpServletRequest
  2. String message is put into JMS Queue by QueueSenderSessionBean injected into NotificationServlet
  3. NotificationMessageDrivenBean implements MessageListener to listen to the JMS Queue
  4. An Event annotated with @NotificationServletJMSMessage is fired from NotificationMessageDrivenBean for an Observer in NotificationEndpoint method onJMSMessage.
  5. NotificationEndpoint uses PushContext which gathers all websocket sessions to deliver the message to the user
  6. In PushContext.send, if any websocket sessions have a user uuid property matching the message's user uuid property, the message is delivered to each of those websocket sessions.

My understanding of @ServerEndpoint is that "each new WS session gets its own instance" (quoted from the answer to "Notify only specific user(s) through WebSockets, when something is modified in the database").

Sources: the above link from https://stackoverflow.com/users/157882/balusc and https://blogs.oracle.com/theaquarium/integrating-websockets-and-jms-with-cdi-events-in-java-ee-7-v2

WEB-INF/resources.xml

<?xml version="1.0" encoding="UTF-8"?>
<!-- Container resources declared for this web application (WEB-INF/resources.xml). -->
<resources>
    <!-- JMS connection factory; its id is the name the beans pass to
         @JMSConnectionFactory("jmsConnectionFactory") when injecting a JMSContext. -->
    <Resource id="jmsConnectionFactory" type="javax.jms.ConnectionFactory">
        connectionMaxIdleTime = 15 Minutes
        connectionMaxWaitTime = 5 seconds
        poolMaxSize = 10
        poolMinSize = 0
        resourceAdapter = Default JMS Resource Adapter
        transactionSupport = xa
    </Resource>
</resources>

NotificationServlet.java

import java.io.IOException;
import java.util.UUID;

import javax.annotation.Resource;
import javax.faces.context.FacesContext;
import javax.inject.Inject;
import javax.jms.Queue;
import javax.servlet.ServletException;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

/**
 * Servlet mapped to {@code /notifications}. On GET it reads the
 * {@code notification} request parameter and, when present, hands it to
 * {@link QueueSenderSessionBean#sendMessage(String)} prefixed with
 * {@code "notification="}.
 */
@WebServlet("/notifications")
public class NotificationServlet extends HttpServlet
{
    @Resource(name = "jms/notificationQueue")
    private Queue _notificationQueue;

    @Inject
    private QueueSenderSessionBean _queueSessionSenderBean;

    /**
     * Forwards the notification carried by the request, if any.
     * Any exception raised while doing so is printed and not propagated.
     */
    @Override
    protected void doGet(HttpServletRequest request,
            HttpServletResponse response)
        throws ServletException,
        IOException
    {
        try
        {
            final String payload = extractNotificationJson(request);
            if (payload == null)
            {
                // Nothing to forward for this request.
                return;
            }
            _queueSessionSenderBean.sendMessage("notification=" + payload);
        }
        catch (Exception e)
        {
            e.printStackTrace();
            // handle exception
        }
    }

    /**
     * Returns the first value of the {@code notification} request parameter.
     *
     * @param request the incoming request
     * @return the first parameter value, or {@code null} when the parameter is absent
     * @throws IOException declared for callers; not thrown by this implementation
     */
    public String extractNotificationJson(HttpServletRequest request)
            throws IOException
    {
        if (request.getParameter("notification") == null)
        {
            return null;
        }
        final String[] values = request.getParameterValues("notification");
        return values[0];
    }
}

QueueSenderSessionBean.java

import javax.annotation.Resource;
import javax.ejb.LocalBean;
import javax.ejb.Stateless;
import javax.inject.Inject;
import javax.inject.Named;
import javax.jms.DeliveryMode;
import javax.jms.JMSConnectionFactory;
import javax.jms.JMSContext;
import javax.jms.JMSException;
import javax.jms.JMSProducer;
import javax.jms.Queue;
import javax.jms.TextMessage;

import org.json.JSONObject;

/**
 * Stateless bean that publishes text messages to the
 * {@code jms/notificationQueue} destination.
 */
@Named
@LocalBean
@Stateless
public class QueueSenderSessionBean
{
    @Resource(mappedName = "jms/notificationQueue")
    private Queue _notificationQueue;

    @Inject
    @JMSConnectionFactory("jmsConnectionFactory")
    private JMSContext _jmsContext;

    /**
     * Wraps {@code message} in a {@link TextMessage}, tags it with the string
     * property {@code userProperty} and sends it persistently to the queue.
     * A {@link JMSException} is printed and not propagated.
     *
     * @param message the text body to send
     */
    public void sendMessage(String message)
    {
        try
        {
            final TextMessage outgoing = _jmsContext.createTextMessage(message);
            outgoing.setStringProperty("userProperty", "someValue");

            // JMSProducer setters and send() return the producer, so the calls chain.
            _jmsContext.createProducer()
                    .setDeliveryMode(DeliveryMode.PERSISTENT)
                    .send(_notificationQueue, outgoing);
        }
        catch (JMSException e)
        {
            e.printStackTrace();
            // handle jms exception
        }
    }
}

Qualifier NotificationServletJMSMessage.java

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

import javax.inject.Qualifier;

    /**
     * CDI qualifier used to pair the {@code Event<Message>} fired by
     * {@code NotificationMessageDrivenBean} with the {@code @Observes}
     * parameter of {@code NotificationEndpoint.onJMSMessage}.
     * Retained at runtime and applicable to methods, fields, parameters and types.
     */
    @Qualifier
    @Retention(RetentionPolicy.RUNTIME)
    @Target({ElementType.METHOD, ElementType.FIELD, ElementType.PARAMETER, ElementType.TYPE})
    public @interface NotificationServletJMSMessage
    {

    }

NotificationMessageDrivenBean.java

import javax.ejb.MessageDriven;
import javax.enterprise.event.Event;
import javax.inject.Inject;
import javax.inject.Named;
import javax.jms.Message;
import javax.jms.MessageListener;

/**
 * Message-driven bean mapped to {@code jms/notificationQueue}. Each message it
 * receives is re-published as a CDI event qualified with
 * {@code @NotificationServletJMSMessage}.
 */
@Named
@MessageDriven(mappedName = "jms/notificationQueue")
public class NotificationMessageDrivenBean implements MessageListener
{
    // Event channel for the received JMS messages.
    @Inject
    @NotificationServletJMSMessage
    Event<Message> jmsEvent;

    /**
     * Fires the received message, unchanged, as a CDI event.
     *
     * @param message the JMS message delivered to this bean
     */
    @Override
    public void onMessage(Message message)
    {
        jmsEvent.fire(message);
    }
}

PushContext.java

import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
import javax.jms.JMSConsumer;
import javax.jms.JMSContext;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Queue;
import javax.jms.TextMessage;
import javax.websocket.Session;

/**
 * Application-scoped registry of open WebSocket sessions, keyed by user UUID,
 * used to push JMS text messages to the sessions of the addressed user.
 */
@ApplicationScoped
public class PushContext
{
    @Inject 
    private JMSContext _jmsContext; 

    @Resource(mappedName = "jms/notificationQueue")
    private Queue _notificationQueue;

    /** User UUID -> sessions registered for that user. */
    private Map<String, Set<Session>> _sessions;

    /** Creates the (initially empty) session registry. */
    @PostConstruct 
    public void init()
    {
        _sessions = new ConcurrentHashMap<>();
    }

    /**
     * Registers {@code session} under {@code userUuid}.
     *
     * @param session  the WebSocket session to register
     * @param userUuid the user the session belongs to
     */
    public void add(Session session, String userUuid)
    {
        // The insertion happens inside compute() so it is atomic with respect to
        // removeSession(), which drops a user's entry once its set becomes empty.
        _sessions.compute(userUuid, (key, existing) ->
        {
            Set<Session> userSessions =
                    (existing != null) ? existing : ConcurrentHashMap.newKeySet();
            userSessions.add(session);
            return userSessions;
        });
    }

    /** Removes {@code session} from every user's session set. */
    void remove(Session session)
    {
        _sessions.values().forEach(value -> value.removeIf(e -> e.equals(session)));
    }

    /**
     * Sends the text of {@code message} to every open session registered for
     * any UUID in {@code userUuids}, after adding the message's
     * {@code userUuid} string property to that set.
     *
     * @param userUuids target user UUIDs; this set is modified (the message's
     *                  {@code userUuid} property is added to it)
     * @param message   the JMS message; it is cast to {@link TextMessage} when
     *                  there is at least one open target session
     * @throws JMSException if reading the property or the text fails
     */
    public void send(Set<String> userUuids, Message message) throws JMSException 
    {
        String userUuid = message.getStringProperty("userUuid");
        userUuids.add(userUuid);

        Set<Session> userSessions;

        synchronized(_sessions) 
        {
            userSessions = _sessions.entrySet().stream()
                .filter(e -> userUuids.contains(e.getKey()))
                .flatMap(e -> e.getValue().stream())
                .collect(Collectors.toSet());
        }
        for (Session userSession : userSessions) 
        {
            if (userSession.isOpen()) 
            {
                userSession.getAsyncRemote().sendText(((TextMessage) message).getText());
            }
        }
    }

    /**
     * Unregisters {@code session}, using the {@code userUuid} entry of its
     * user properties to locate the owning user.
     *
     * @param session the session to unregister
     */
    public void removeSession(Session session)
    {
        Object userUuid = session.getUserProperties().get("userUuid");
        if (!(userUuid instanceof String))
        {
            // No usable key on the session: fall back to scanning every user's set.
            remove(session);
            return;
        }
        // Map.remove(key, value) would compare the whole Set<Session> with the
        // Session and never match, so the session is removed from inside the
        // user's set; the entry itself is dropped once that set is empty.
        _sessions.computeIfPresent((String) userUuid, (key, userSessions) ->
        {
            userSessions.remove(session);
            return userSessions.isEmpty() ? null : userSessions;
        });
    }
}

NotificationEndpoint.java

import java.io.IOException;
import java.io.Serializable;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.logging.Level;
import java.util.logging.Logger;

import javax.enterprise.event.Observes;
import javax.inject.Inject;
import javax.inject.Named;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.websocket.CloseReason;
import javax.websocket.EndpointConfig;
import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;

/**
 * WebSocket endpoint at {@code /notificationEndpoint/{tokenId}}. It registers
 * each opened session with {@link PushContext}, forwards text received from
 * the client to {@link QueueSenderSessionBean}, and observes JMS messages
 * fired as CDI events to push them to connected users.
 */
@Named
@ServerEndpoint(value="/notificationEndpoint/{tokenId}")
public class NotificationEndpoint 
{
    /** All sessions opened on this endpoint; read by {@link #sendToAllClients(String)}. */
    private static final Set<Session> SESSIONS =
            Collections.synchronizedSet(new HashSet<Session>()); 
    private QueueSenderSessionBean _senderBean;

    @Inject
    private PushContext _pushContext;

    @Inject
    public NotificationEndpoint(QueueSenderSessionBean senderBean)
    { 
        _senderBean = senderBean;
    }

    /**
     * Stores the {@code tokenId} path parameter on the session as
     * {@code userUuid} and registers the session.
     *
     * @param session        the newly opened session
     * @param configurator   endpoint configuration (unused)
     * @param userUuidString value of the {@code tokenId} path parameter
     */
    @OnOpen
    public void onOpen(Session session,
            EndpointConfig configurator,
            @PathParam(value = "tokenId") String userUuidString) 
    {
        session.getUserProperties().put("userUuid", userUuidString);        
        _pushContext.add(session, userUuidString);
        // Without this registration sendToAllClients() would iterate an empty set.
        SESSIONS.add(session);
    }


    /**
     * Forwards text received from the client to the sender bean.
     *
     * @param message the text received
     * @param session the session it arrived on
     */
    @OnMessage
    public void onMessage(String message, Session session) 
            throws IOException 
    {
        System.out.println("Message received: " + message);
        _senderBean.sendMessage(message);
    }

    /**
     * Closes the session and removes it from both session registries.
     *
     * @param reason  why the session is closing
     * @param session the closing session
     */
    @OnClose
    public void onClose(CloseReason reason, Session session) 
    {
        System.out.println(
                "Closing 'notificationEndpoint' due to " 
                + reason.getReasonPhrase());
        try
        {
            session.close();
        }
        catch (IOException e)
        {
            e.printStackTrace();
        }
        SESSIONS.remove(session);
        _pushContext.removeSession(session);
    }

    /** Prints the stack trace of an error raised on {@code session}. */
    @OnError
    public void error(Session session, Throwable t) 
    {
       t.printStackTrace();
    }

    /**
     * Sends {@code message} asynchronously to every open session in
     * {@link #SESSIONS}.
     *
     * @param message the text to broadcast
     */
    public static void sendToAllClients(String message) 
    {
        // Iterating a Collections.synchronizedSet requires holding its lock.
        synchronized (SESSIONS) 
        {
            for (Session session : SESSIONS) 
            {
                if (session.isOpen()) 
                {
                    session.getAsyncRemote().sendText(message);
                }
            }
        }
    }

    /**
     * CDI observer for {@link Message} events qualified with
     * {@code @NotificationServletJMSMessage}; delegates delivery to
     * {@link PushContext}, starting from an empty set of target UUIDs.
     *
     * @param message the observed JMS message
     */
    public void onJMSMessage(@Observes @NotificationServletJMSMessage Message message) 
    {
        Set<String> userUuids = new HashSet<String>();

        try 
        {
            _pushContext.send(userUuids, message);
        } 
        catch (JMSException ex) 
        {
            // The logger records the stack trace; one report per failure is enough.
            Logger.getLogger(NotificationEndpoint.class.getName()).
            log(Level.SEVERE, null, ex);
        }     
    }
}

Thank you, Ted S

回答1:

Delayed message delivery was accomplished with an approach inspired by the solution here.

The solution was using a local queue to hold messages if the user was not connected to the web socket, then when connected, moving messages from the local queue to a remote queue which gets received/consumed immediately using the MessageDrivenBean.

Also, instead of listening for messages from the database (Postgresql) with a Web Servlet, I changed my DB trigger to NOTIFY and started an asynchronous listener using the pgjdbc-ng driver and the Postgresql LISTEN/NOTIFY pattern described here.

NotificationListener.java

/**
 * Thread that issues {@code LISTEN notifications} on a PostgreSQL connection
 * and forwards every notification payload to {@link QueueSenderSessionBean}.
 */
@Stateless
public class NotificationListener extends Thread
{
    @Inject
    private QueueSenderSessionBean _queueSessionSenderBean;

    @Override
    public void run()
    {
        listenToNotifications();
    }

    /**
     * Opens the database connection, subscribes to the {@code notifications}
     * channel and then keeps the connection open until the current thread is
     * interrupted. Any other failure is printed and ends the method.
     */
    public void listenToNotifications()
    {
        PGNotificationListener listener = new PGNotificationListener()
                {
                    public void notification(int processId, String channelName, String payload)
                    {
                        System.out.println("Received notification from: "
                                + channelName + ", "
                                + payload);
                        _queueSessionSenderBean.sendMessage(payload);
                    }
                };
        // NOTE(review): connection settings and credentials are hard-coded here;
        // consider moving them to container configuration.
        PGDataSource dataSource = new PGDataSource();
        dataSource.setHost("localhost");
        dataSource.setDatabase("db");
        dataSource.setPort(5432);
        dataSource.setUser("user");
        dataSource.setPassword("pass");
        try(PGConnection connection =
                (PGConnection) dataSource.getConnection())
        {
            // Register the callback before LISTEN so that no notification can
            // arrive while no listener is attached.
            connection.addNotificationListener(listener);
            try (Statement statement = connection.createStatement())
            {
                statement.execute("LISTEN notifications");
            }
            // Park the thread instead of spinning; sleep() wakes up with an
            // InterruptedException as soon as the thread is interrupted.
            while (!Thread.currentThread().isInterrupted())
            {
                Thread.sleep(1000L);
            }
        }
        catch (InterruptedException e)
        {
            // Interruption is how this loop is ended; restore the flag for callers.
            Thread.currentThread().interrupt();
        }
        catch (Exception e)
        {
            // TODO: handle exception
            e.printStackTrace();
        }
    }   
}

NotificationsStarter.java

/**
 * Singleton bean annotated {@code @Startup} that ties the lifetime of the
 * injected {@link NotificationListener} to its own lifecycle callbacks.
 */
@Singleton
@Startup
public class NotificationsStarter
{
    @EJB
    private NotificationListener _listener;

    /** Starts the listener once this bean has been constructed. */
    @PostConstruct
    public void startListener()
    {
        _listener.start();
    }

    /** Interrupts the listener before this bean is destroyed. */
    @PreDestroy
    public void shutdown()
    {
        _listener.interrupt();
    }
}

PushContext.java

/**
 * Application-scoped registry of open WebSocket sessions, keyed by user UUID,
 * used to push JMS text messages to the sessions of the addressed user and to
 * answer whether a user currently has an open session.
 */
@ApplicationScoped
public class PushContext
{

    @Resource(mappedName = "jms/localNotificationQueue")
    private Queue _localNotificationQueue;

    @Resource(mappedName = "jms/remoteNotificationQueue")
    private Queue _remoteNotificationQueue;

    /** User UUID -> sessions registered for that user. */
    private Map<String, Set<Session>> _sessions;

    /** Creates the (initially empty) session registry. */
    @PostConstruct 
    public void init()
    {
        _sessions = new ConcurrentHashMap<>();
    }

    /**
     * Registers {@code session} under {@code userUuid}.
     *
     * @param session  the WebSocket session to register
     * @param userUuid the user the session belongs to
     */
    public void add(Session session, String userUuid)
    {
        // The insertion happens inside compute() so it is atomic with respect to
        // removeSession(), which drops a user's entry once its set becomes empty.
        _sessions.compute(userUuid, (key, existing) ->
        {
            Set<Session> userSessions =
                    (existing != null) ? existing : ConcurrentHashMap.newKeySet();
            userSessions.add(session);
            return userSessions;
        });
    }

    /** Removes {@code session} from every user's session set. */
    void remove(Session session)
    {
        _sessions.values().forEach(value -> value.removeIf(e -> e.equals(session)));
    }

    /**
     * Sends the text of {@code message} to every open session registered for
     * any UUID in {@code userUuids}, after adding the message's
     * {@code userUuid} string property to that set.
     *
     * @param userUuids target user UUIDs; this set is modified (the message's
     *                  {@code userUuid} property is added to it)
     * @param message   the JMS message; it is cast to {@link TextMessage} when
     *                  there is at least one open target session
     * @throws JMSException if reading the property or the text fails
     */
    public void send(Set<String> userUuids, Message message) throws JMSException 
    {
        String userUuid = message.getStringProperty("userUuid");
        userUuids.add(userUuid);

        Set<Session> userSessions;

        // The sends stay inside the lock so concurrent calls to this method
        // do not write to the same session at the same time.
        synchronized(_sessions) 
        {
            userSessions = _sessions.entrySet().stream()
                .filter(e -> userUuids.contains(e.getKey()))
                .flatMap(e -> e.getValue().stream())
                .collect(Collectors.toSet());
            for (Session userSession : userSessions) 
            {
                if (userSession.isOpen()) 
                {
                    userSession.getAsyncRemote().sendText(((TextMessage) message).getText());
                }
            }
        }
    }

    /**
     * Unregisters {@code session}, using the {@code userUuid} entry of its
     * user properties to locate the owning user.
     *
     * @param session the session to unregister
     */
    public void removeSession(Session session)
    {
        Object userUuid = session.getUserProperties().get("userUuid");
        if (!(userUuid instanceof String))
        {
            // No usable key on the session: fall back to scanning every user's set.
            remove(session);
            return;
        }
        // Map.remove(key, value) would compare the whole Set<Session> with the
        // Session and never match, so the session is removed from inside the
        // user's set; the entry itself is dropped once that set is empty.
        _sessions.computeIfPresent((String) userUuid, (key, userSessions) ->
        {
            userSessions.remove(session);
            return userSessions.isEmpty() ? null : userSessions;
        });
    }

    /**
     * Tells whether at least one open session is registered for the user.
     *
     * @param userUuid the user to look up; {@code null} yields {@code false}
     * @return {@code true} if any session registered under {@code userUuid}
     *         reports {@code isOpen()}, otherwise {@code false}
     */
    public Boolean userHasWebSocketSession(String userUuid)
    {
        if (userUuid == null)
        {
            // ConcurrentHashMap rejects null keys; no user means no session.
            return false;
        }
        Set<Session> userSessions = _sessions.get(userUuid);
        if (userSessions == null)
        {
            return false;
        }
        for (Session userSession : userSessions)
        {
            if (userSession.isOpen())
            {
                return true;
            }
        }
        return false;
    }
}

QueueSenderSessionBean.java

@Named
@LocalBean
@Stateless
public class QueueSenderSessionBean
{
    @Resource(mappedName = "jms/localNotificationQueue")
    private Queue _localNotificationQueue;

    @Resource(mappedName = "jms/remoteNotificationQueue")
    private Queue _remoteNotificationQueue;

    @Inject
    @JMSConnectionFactory("jmsConnectionFactory")
    private JMSContext _jmsContext; 

    @Inject
    PushContext _pushContext;

    public void sendMessage(String message) 
    {
        JMSProducer messageProducer =
                _jmsContext.createProducer();
        messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);  
        try
        {        
            String userProperty = "someValue";

            TextMessage textMessage = _jmsContext.createTextMessage(message);
            textMessage.setStringProperty("userProperty", userProperty ); 
            Boolean userIsConnected = 
                    _pushContext.userHasWebSocketSession(userUuid);
            if (!userIsConnected)
            {
                messageProducer.send(_localNotificationQueue, textMessage);
            }
            else
            {
                messageProducer.send(_remoteNotificationQueue, textMessage);
            }
        }
        catch (JMSException e)
        {
            e.printStackTrace();
        }
    }    
}

NotificationMessageDrivenBean.java is now listening to only the remote queue

/**
 * Message-driven bean mapped to {@code jms/remoteNotificationQueue}. Each
 * message it receives is re-published as a CDI event qualified with
 * {@code @NotificationServletJMSMessage}.
 */
@Named
@MessageDriven(mappedName = "jms/remoteNotificationQueue")
public class NotificationMessageDrivenBean implements MessageListener
{
    // Event channel for the received JMS messages.
    @Inject
    @NotificationServletJMSMessage
    Event<Message> jmsEvent;

    /**
     * Fires the received message, unchanged, as a CDI event.
     *
     * @param message the JMS message delivered to this bean
     */
    @Override
    public void onMessage(Message message)
    {
        jmsEvent.fire(message);
    }
}

The new QueueReceiverSessionBean.java receives/consumes messages from the localNotificationQueue and places them in the remoteNotificationQueue whenever the user connects to the NotificationEndpoint web socket.

@Named
@LocalBean
@Stateless
public class QueueReceiverSessionBean
{
    @Resource(mappedName = "jms/localNotificationQueue")
    private Queue _localNotificationQueue;

    @Resource(mappedName = "jms/remoteNotificationQueue")
    private Queue _remoteNotificationQueue;

    @Inject
    @JMSConnectionFactory("jmsConnectionFactory")
    private JMSContext _jmsContext; 

    public void receiveQueuedMessages(String userUuidString) throws JMSException
    {
        Set<String> userUuids =
                new HashSet<String>();
        userUuids.add(userUuidString);

        JMSConsumer messageConsumer = 
                _jmsContext.createConsumer(_localNotificationQueue,
                        "userProperty='someValue'",
                        true);

        JMSProducer messageProducer =
                _jmsContext.createProducer();

        Message localMessage =
                messageConsumer.receive(10);
        while(localMessage != null)
        {
            TextMessage textMessage = 
                    _jmsContext.createTextMessage(((TextMessage) localMessage).getText());
            textMessage.setStringProperty("userUuid", userUuidString);            
            messageProducer.send(_remoteNotificationQueue, textMessage);
            localMessage.acknowledge();
            localMessage =
                    messageConsumer.receive(10);
        } 
        messageConsumer.close();
    }

    public void sendMessage(String message) 
    {
        JMSProducer messageProducer =
                _jmsContext.createProducer();
        messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);

        try
        {        
            if (message.startsWith("notification"))
            {
                String messageJson = message.substring(message.indexOf("=") + 1);
                JSONObject notificationJson =
                        new JSONObject(messageJson);
                String userUuid = notificationJson.getString("receivinguseruuid");

                TextMessage textMessage = _jmsContext.createTextMessage(message);
                textMessage.setStringProperty("userUuid", userUuid);            
                messageProducer.send(_remoteNotificationQueue, textMessage);
            }
        }
        catch (JMSException e)
        {
            e.printStackTrace();
        }
    }
}

NotificationEndpoint.java

/**
 * WebSocket endpoint at {@code /notificationEndpoint/{tokenId}}. On open it
 * registers the session with {@link PushContext} and asks
 * {@link QueueReceiverSessionBean} to forward the messages queued while the
 * user was away; it also observes JMS messages fired as CDI events and pushes
 * them to connected users.
 */
@Named
@ServerEndpoint(value="/notificationEndpoint/{tokenId}")
public class NotificationEndpoint implements Serializable
{

    private static final long serialVersionUID = 1L;
    /** All sessions opened on this endpoint; read by {@link #sendToAllClients(String)}. */
    private static final Set<Session> SESSIONS =
            Collections.synchronizedSet(new HashSet<Session>()); 
    private QueueReceiverSessionBean _senderBean;

    @Inject
    private PushContext _pushContext;

    @Inject
    public NotificationEndpoint(QueueReceiverSessionBean senderBean)
    { 
        _senderBean = senderBean;
    }

    /**
     * Stores the {@code tokenId} path parameter on the session as
     * {@code userUuid}, registers the session, then triggers forwarding of
     * queued messages. A {@link JMSException} from forwarding is printed and
     * the session stays open.
     *
     * @param session        the newly opened session
     * @param configurator   endpoint configuration (unused)
     * @param userUuidString value of the {@code tokenId} path parameter
     */
    @OnOpen
    public void onOpen(Session session,
            EndpointConfig configurator,
            @PathParam(value = "tokenId") String userUuidString) 
    {
        session.getUserProperties().put("userUuid", userUuidString );        
        _pushContext.add(session, userUuidString);
        // Without this registration sendToAllClients() would iterate an empty set.
        SESSIONS.add(session);
        try
        {
            _senderBean.receiveQueuedMessages(userUuidString);
        }
        catch (JMSException e)
        {
            e.printStackTrace();
        }
    }


    /**
     * Forwards text received from the client to the receiver bean's
     * {@code sendMessage}.
     *
     * @param message the text received
     * @param session the session it arrived on
     */
    @OnMessage
    public void onMessage(String message, Session session) 
            throws IOException 
    {
        _senderBean.sendMessage(message);
    }

    /**
     * Closes the session and removes it from both session registries.
     *
     * @param reason  why the session is closing
     * @param session the closing session
     */
    @OnClose
    public void onClose(CloseReason reason, Session session) 
    {
        try
        {
            session.close();
        }
        catch (IOException e)
        {
            e.printStackTrace();
        }
        SESSIONS.remove(session);
        _pushContext.removeSession(session);
    }

    /** Prints the stack trace of an error raised on {@code session}. */
    @OnError
    public void error(Session session, Throwable t) 
    {
       t.printStackTrace();
    }

    /**
     * Sends {@code message} asynchronously to every open session in
     * {@link #SESSIONS}.
     *
     * @param message the text to broadcast
     */
    public static void sendToAllClients(String message) 
    {
        // Iterating a Collections.synchronizedSet requires holding its lock.
        synchronized (SESSIONS) 
        {
            for (Session session : SESSIONS) 
            {
                if (session.isOpen()) 
                {
                    session.getAsyncRemote().sendText(message);
                }
            }
        }
    }

    /**
     * CDI observer for {@link Message} events qualified with
     * {@code @NotificationServletJMSMessage}; delegates delivery to
     * {@link PushContext}, starting from an empty set of target UUIDs.
     *
     * @param message the observed JMS message
     */
    public void onJMSMessage(@Observes @NotificationServletJMSMessage Message message) 
    {
        Set<String> userUuids = new HashSet<String>();

        try 
        {
            _pushContext.send(userUuids, message);
        } 
        catch (JMSException ex) 
        {
            // The logger records the stack trace; one report per failure is enough.
            Logger.getLogger(NotificationEndpoint.class.getName()).
            log(Level.SEVERE, null, ex);
        }     
    }    
}

Note: This code was used in the TomEE 8.0 container. Injecting JMSContext into the EJBs uncovered a bug in TomEE where the container fails to release the JMSConnection resource. Issue has been added to TomEE issues tracker