Spring Boot SSL TCPClient ~ StompBrokerRelayMessag

2019-03-27 21:20发布

I'm attempting to build a websocket messaging app based on the Spring Websocket Demo running ActiveMQ as the STOMP message broker with Undertow. The application runs fine on insecure connections. However, I'm having difficulty configuring the STOMP Broker Relay to forward with SSL connections.

As mentioned in the Spring WebSocket Docs...

The "STOMP broker relay" in the above configuration is a Spring MessageHandler that handles messages by forwarding them to an external message broker. To do so it establishes TCP connections to the broker, forwards all messages to it, and then forwards all messages received from the broker to clients through their WebSocket sessions. Essentially it acts as a "relay" that forwards messages in both directions.

Further, the docs state a dependency on reactor-net which I have...

Please add a dependency on org.projectreactor:reactor-net for TCP connection management.

The issue is that my current implementation doesn't initialize the NettyTCPClient via SSL so the ActiveMQ connection fails with an SSLException.


[r.i.n.i.n.t.NettyTcpClient:307] » CONNECTED: 
[id: 0xcfef39e9, /127.0.0.1:17779 => localhost/127.0.0.1:8442]
...
[o.a.a.b.TransportConnection.Transport:245] » 
Transport Connection to: tcp://127.0.0.1:17779 failed:
javax.net.ssl.SSLException: Unrecognized SSL message, plaintext connection?
...

As such I've attempted to research the Project Reactor Docs to set SSL options for the connection but I haven't been successful.

At this point I've found the StompBrokerRelayMessageHandler initializes the NettyTCPClient by default in Reactor2TcpClient yet, it doesn't appear to configurable.

Assistance would be greatly appreciated.

SSCCE


app.props

spring.activemq.in-memory=true
spring.activemq.pooled=false
spring.activemq.broker-url=stomp+ssl://localhost:8442
server.port=8443
server.ssl.enabled=true
server.ssl.protocol=tls
server.ssl.key-alias=undertow
server.ssl.key-store=classpath:undertow.jks
server.ssl.key-store-password=xxx
server.ssl.trust-store=classpath:undertow_certs.jks
server.ssl.trust-store-password=xxx

WebSocketConfig

//... 
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer {

    private static final Logger log = LoggerFactory.getLogger(WebSocketConfig.class);

    private final static String KEYSTORE = "/activemq.jks";
    private final static String KEYSTORE_PASS = "xxx";
    private final static String KEYSTORE_TYPE = "JKS";
    private final static String TRUSTSTORE = "/activemq_certs.jks";
    private final static String TRUSTSTORE_PASS = "xxx";

    private static String getBindLocation() {
        return "stomp+ssl://localhost:8442?transport.needClientAuth=false";
    }

    @Bean(initMethod = "start", destroyMethod = "stop")
    public SslBrokerService activeMQBroker() throws Exception {

        final SslBrokerService service = new SslBrokerService();
        service.setPersistent(false);

        KeyManager[] km = SecurityManager.getKeyManager();
        TrustManager[] tm = SecurityManager.getTrustManager();

        service.addSslConnector(getBindLocation(), km, tm, null);
        final ActiveMQTopic topic = new ActiveMQTopic("jms.topic.test");
        service.setDestinations(new ActiveMQDestination[]{topic});

        return service;
    }


    @Override
    public void configureMessageBroker(MessageBrokerRegistry config) {
        config.enableStompBrokerRelay("/topic").setRelayHost("localhost").setRelayPort(8442);
        config.setApplicationDestinationPrefixes("/app");
    }

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/welcome").withSockJS();
        registry.addEndpoint("/test").withSockJS();
    }

   private static class SecurityManager { 
   //elided...
   }

}

SOLVED Per Rossens Advice. Here's the implementation details for anyone interested.


WebSocketConfig

@Configuration
public class WebSocketConfig extends DelegatingWebSocketMessageBrokerConfiguration {
    ...
    @Bean
    public AbstractBrokerMessageHandler stompBrokerRelayMessageHandler() {
      StompBrokerRelayMessageHandler handler = (StompBrokerRelayMessageHandler) super.stompBrokerRelayMessageHandler();
      ConfigurationReader reader = new StompClientDispatcherConfigReader();
      Environment environment = new Environment(reader).assignErrorJournal();
      TcpOperations<byte[]> client = new Reactor2TcpClient<>(new StompTcpClientSpecFactory(environment,"localhost", 8443));
      handler.setTcpClient(client);
      return handler;
    }
}

StompTCPClientSpecFactory

private static class StompTcpClientSpecFactory
        implements NetStreams.TcpClientFactory<Message<byte[]>, Message<byte[]>> {

    private static final Logger log = LoggerFactory.getLogger(StompTcpClientSpecFactory.class);

    private final String host;
    private final int port;
    private final String KEYSTORE = "src/main/resources/tcpclient.jks";
    private final String KEYSTORE_PASS = "xxx";
    private final String KEYSTORE_TYPE = "JKS";
    private final String TRUSTSTORE = "/src/main/resources/tcpclient_certs.jks";
    private final String TRUSTSTORE_PASS = "xxx";
    private final String TRUSTSTORE_TYPE = "JKS";
    private final Environment environment;

    private final SecurityManager tcpManager = new SecurityManager
            .SSLBuilder(KEYSTORE, KEYSTORE_PASS)
            .keyStoreType(KEYSTORE_TYPE)
            .trustStore(TRUSTSTORE, TRUSTSTORE_PASS)
            .trustStoreType(TRUSTSTORE_TYPE)
            .build();

    public StompTcpClientSpecFactory(Environment environment, String host, int port) {
        this.environment = environment;
        this.host = host;
        this.port = port;
    }

    @Override
    public Spec.TcpClientSpec<Message<byte[]>, Message<byte[]>> apply(
            Spec.TcpClientSpec<Message<byte[]>, Message<byte[]>> tcpClientSpec) {

        return tcpClientSpec
                .ssl(new SslOptions()
                        .sslProtocol("TLS")
                        .keystoreFile(tcpManager.getKeyStore())
                        .keystorePasswd(tcpManager.getKeyStorePass())
                        .trustManagers(tcpManager::getTrustManager)
                        .trustManagerPasswd(tcpManager.getTrustStorePass()))
                .codec(new Reactor2StompCodec(new StompEncoder(), new StompDecoder()))
                .env(this.environment)
                .dispatcher(this.environment.getCachedDispatchers("StompClient").get())
                .connect(this.host, this.port);
    }
}

3条回答
兄弟一词,经得起流年.
2楼-- · 2019-03-27 21:38

The StompBrokerRelayMessageHandler has a tcpClient property you can set. However it looks like we don't expose that through the WebSocketMessageBrokerConfigurer setup.

You can remove @EnableWebSocketMessageBroker and extend DelegatingWebSocketMessageBrokerConfiguration instead. It's effectively the same but you're now extending directly from the configuration class that provides all the beans.

This allows you to then override the stompBrokerRelayMessageHandler() bean and set its TcpClient property directly. Just make sure the overriding method is marked with @Bean.

查看更多
干净又极端
3楼-- · 2019-03-27 21:41

I needed to secure a STOMP broker relay to RabbitMQ using Spring Messaging 4.2.5 with Java 8 and found that the question's follow-up code had become outdated.

When launching my application, I provide truststore environment properties to trust an internal self-signed certificate authority. java -Djavax.net.ssl.trustStore=/etc/pki/java/server.jks -Djavax.net.ssl.trustStorePassword=xxxxx -jar build/libs/server.war

Per Rossen's answer, I changed

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer {

to

@Configuration
public class WebSocketConfig extends DelegatingWebSocketMessageBrokerConfiguration {

Then, in that WebSocketConfig, I provided my own AbstractBrokerMessageHandler bean:

@Bean
public AbstractBrokerMessageHandler stompBrokerRelayMessageHandler() {
    AbstractBrokerMessageHandler handler = super.stompBrokerRelayMessageHandler();
    if (handler instanceof StompBrokerRelayMessageHandler) {
        ((StompBrokerRelayMessageHandler) handler).setTcpClient(new Reactor2TcpClient<>(
                new StompTcpFactory("127.0.0.1", 61614, true)
        ));
    }
    return handler;
}

The instanceof conditional was to simplify use of a NoOpBrokerMessageHandler in unit tests.

And finally, the following is the implementation of the StompTcpFactory used above:

public class StompTcpFactory implements NetStreams.TcpClientFactory<Message<byte[]>, Message<byte[]>> {

    private final Environment environment = new Environment(new SynchronousDispatcherConfigReader());
    private final String host;
    private final int port;
    private final boolean ssl;

    public StompTcpFactory(String host, int port, boolean ssl) {
        this.host = host;
        this.port = port;
        this.ssl = ssl;
    }

    @Override
    public Spec.TcpClientSpec<Message<byte[]>, Message<byte[]>> apply(Spec.TcpClientSpec<Message<byte[]>, Message<byte[]>> tcpClientSpec) {
        return tcpClientSpec
                .env(environment)
                .codec(new Reactor2StompCodec(new StompEncoder(), new StompDecoder()))
                .ssl(ssl ? new SslOptions() : null)
                .connect(host, port);
    }

    private static class SynchronousDispatcherConfigReader implements ConfigurationReader {
        @Override
        public ReactorConfiguration read() {
            return new ReactorConfiguration(Collections.emptyList(), "sync", new Properties());
        }
    }

}
查看更多
女痞
4楼-- · 2019-03-27 21:51

@amoebob answer is great but threads aren't close properly. Each time a connexion from client is open, a new thread is open and never closed. I discover this issue in production and spend few days to resolve it. So I suggest you to change StompTcpFactory to improve threads reuse :

import io.netty.channel.EventLoopGroup;
import org.springframework.messaging.Message;
import org.springframework.messaging.simp.stomp.Reactor2StompCodec;
import org.springframework.messaging.simp.stomp.StompDecoder;
import org.springframework.messaging.simp.stomp.StompEncoder;
import org.springframework.messaging.tcp.reactor.Reactor2TcpClient;
import reactor.Environment;
import reactor.core.config.ReactorConfiguration;
import reactor.io.net.NetStreams;
import reactor.io.net.Spec;
import reactor.io.net.config.SslOptions;
import reactor.io.net.impl.netty.NettyClientSocketOptions;

public class StompTcpFactory implements NetStreams.TcpClientFactory<Message<byte[]>, Message<byte[]>> {

  private final Environment environment;
  private final EventLoopGroup eventLoopGroup;
  private final String host;
  private final int port;
  private final boolean ssl;

  public StompTcpFactory(String host, int port, boolean ssl) {
    this.host = host;
    this.port = port;
    this.ssl = ssl;
    this.environment = new Environment(() -> new ReactorConfiguration(emptyList(), "sync", new Properties()));
    this.eventLoopGroup = Reactor2TcpClient.initEventLoopGroup();
  }

  @Override
  public Spec.TcpClientSpec<Message<byte[]>, Message<byte[]>> apply(Spec.TcpClientSpec<Message<byte[]>, Message<byte[]>> tcpClientSpec) {
    return tcpClientSpec
            .env(environment)
            .options(new NettyClientSocketOptions().eventLoopGroup(eventLoopGroup))
            .codec(new Reactor2StompCodec(new StompEncoder(), new StompDecoder()))
            .ssl(ssl ? new SslOptions() : null)
            .connect(host, port);
  }

}
查看更多
登录 后发表回答