How to create a custom source app for spring-cloud

2019-07-21 19:58发布

I want to create a web-socket source (for spring-cloud-stream-app-starters) which is currently not available on git hub.

I went through some of the available sources but had some confusions, may be because I'm not familiar with the framework.

Can I just create a spring boot application with Source binding and return the received packets from web-socket clients in an @InboundChannelAdapter(value = Source.OUTPUT) annotated method. ? Also how can I use WebSocketInboundChannelAdapter to start a websocket server and push the packets to the underlying broker.?

2条回答
姐就是有狂的资本
2楼-- · 2019-07-21 20:59

You can get some ideas in the Reference Manual.

The WebSocketInboundChannelAdapter is an event-driven channel adapter, it's not pollable source. So, what you need is just a @Bean for this one and an appropriate reference to the Source.OUTPUT.

The WebSocketInboundChannelAdapter doesn't start server. That is responsibility of the:

/**
 * The {@link IntegrationWebSocketContainer} implementation for the {@code server}
 * {@link org.springframework.web.socket.WebSocketHandler} registration.
 * <p>
 * Registers an internal {@code IntegrationWebSocketContainer.IntegrationWebSocketHandler}
 * for provided {@link #paths} with the {@link WebSocketHandlerRegistry}.
 * <p>
 * The real registration is based on Spring Web-Socket infrastructure via {@link WebSocketConfigurer}
 * implementation of this class.
 *
 * @author Artem Bilan
 * @author Gary Russell
 * @since 4.1
 */
public class ServerWebSocketContainer extends IntegrationWebSocketContainer
        implements WebSocketConfigurer, SmartLifecycle {

We have a documentation on the matter as well.

There is also a stomp-chat sample to demonstrate the server behavior.

I think you don't need "underlying broker" in this kind of source application: you just receive messages over web socket and publish them to the Source.OUTPUT. Why do you need STOMP broker here?

UPDATE

Have just tested this code against Rabbit Binder:

@SpringBootApplication
@EnableBinding(Source.class)
public class CloudStreamWebSocketSourceApplication {

    @Bean
    public WebSocketInboundChannelAdapter webSocketInboundChannelAdapter() {
        WebSocketInboundChannelAdapter webSocketInboundChannelAdapter =
                new WebSocketInboundChannelAdapter(serverWebSocketContainer());
        webSocketInboundChannelAdapter.setOutputChannelName(Source.OUTPUT);
        return webSocketInboundChannelAdapter;
    }

    @Bean
    public IntegrationWebSocketContainer serverWebSocketContainer() {
        return new ServerWebSocketContainer("/test")
                .withSockJs()
                .setAllowedOrigins("*");
    }

    public static void main(String[] args) throws IOException {
        SpringApplication.run(CloudStreamWebSocketSourceApplication.class, args);
        System.out.println("Done");
    }

}

My test-case is like:

@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
public class CloudStreamWebSocketSourceApplicationTests {

    @LocalServerPort
    private String port;

    @Test
    public void testWebSocketStreamSource() throws IOException, InterruptedException {
        StandardWebSocketClient webSocketClient = new StandardWebSocketClient();

        ClientWebSocketContainer clientWebSocketContainer =
                new ClientWebSocketContainer(webSocketClient, "ws://localhost:" + this.port + "/test/websocket");
        clientWebSocketContainer.start();

        WebSocketSession session = clientWebSocketContainer.getSession(null);

        session.sendMessage(new TextMessage("foo"));

        Thread.sleep(10000);
    }

}

This is my dependencies:

<dependencies>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-integration</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-websocket</artifactId>
    </dependency>

    <dependency>
        <groupId>org.springframework.integration</groupId>
        <artifactId>spring-integration-websocket</artifactId>
    </dependency>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>
查看更多
可以哭但决不认输i
3楼-- · 2019-07-21 21:03

@Krishas I would definitely look at the available sources, find the one that closely resembles what you're trying to accomplish and model your new source after it. In general what you are suggesting is correct. . . it should be Spring boot app annotated with @Source. But of course the devil is in the details. So what I would suggest is to create a PR so we can all review it and help you bring it to the state where we can include it in the pool of available starters.

查看更多
登录 后发表回答