RxJava: Feed one stream (Observable) as the input

Posted 2019-05-31 19:16

Question:

I'm still learning RxJava. What is the best way to use a stream within another stream? Or is it against the principles of reactive programming?

A toy example that I'm trying to write includes a TCP client and a server that sends back capitalized input. I'd like to take input from standard input, send it to the server and print out everything received by both the client and the server.

The following is the expected output from the program:

(User input) apple
Server received: apple
Client received: APPLE
(User input) peach
Server received: peach
Client received: PEACH
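
For reference, the server half of this toy setup can be sketched in plain Java without RxJava. The class name UppercaseEchoServer, the ask helper, the use of an ephemeral port, and the one-thread-per-connection design are assumptions made for illustration; none of them is taken from the gist linked in the question.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Locale;

// Hypothetical helper, not from the original gist: a line-based server that
// logs each received line and replies with its upper-cased form.
class UppercaseEchoServer {

    // Binds to a free port (port 0) and accepts connections on a daemon thread.
    static ServerSocket start() throws IOException {
        ServerSocket server = new ServerSocket(0);
        Thread acceptor = new Thread(() -> {
            while (!server.isClosed()) {
                try {
                    Socket client = server.accept();
                    Thread worker = new Thread(() -> serve(client));
                    worker.setDaemon(true);
                    worker.start();
                } catch (IOException e) {
                    return; // server socket was closed, stop accepting
                }
            }
        });
        acceptor.setDaemon(true);
        acceptor.start();
        return server;
    }

    // Reads lines until the client disconnects, answering each one in upper case.
    private static void serve(Socket client) {
        try (Socket s = client;
             BufferedReader in = new BufferedReader(new InputStreamReader(s.getInputStream()));
             PrintWriter out = new PrintWriter(s.getOutputStream(), true)) {
            String line;
            while ((line = in.readLine()) != null) {
                System.out.println("Server received: " + line);
                out.println(line.toUpperCase(Locale.ROOT));
            }
        } catch (IOException e) {
            // connection dropped; nothing more to do for this client
        }
    }

    // Only for trying out the server: sends one line over a fresh connection
    // and returns the reply.
    static String ask(int port, String line) throws IOException {
        try (Socket socket = new Socket("localhost", port);
             BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
             PrintWriter out = new PrintWriter(socket.getOutputStream(), true)) {
            out.println(line);
            return in.readLine();
        }
    }
}
```

Calling UppercaseEchoServer.start() and then ask(server.getLocalPort(), "apple") returns APPLE, while the server side prints "Server received: apple".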

I was able to achieve this using three observables:

  • stdinStream, which emits strings from standard input,
  • serverStream, which emits strings the server receives,
  • clientStream, which emits strings the client receives,

and then subscribing to stdinStream (passed in as inputStream) from within the creation of clientStream, like so:

private Observable<String> createClientStream(String host, int port, Observable<String> inputStream) {
    return Observable.create(sub -> {
        try (Socket socket = new Socket(host, port);
             BufferedReader inFromServer = new BufferedReader(new InputStreamReader(socket.getInputStream()));
             DataOutputStream outputStream = new DataOutputStream(socket.getOutputStream());
             PrintWriter outWriter = new PrintWriter(outputStream, true);
        ) {
            inputStream.subscribe(line -> {
                outWriter.println(line);
                try {
                    sub.onNext(inFromServer.readLine());
                } catch (IOException e) {
                    sub.onError(e);
                }
            });
        } catch (UnknownHostException e) {
            sub.onError(e);
        } catch (IOException e) {
            sub.onError(e);
        }
    });
}

Note: I don't want to create multiple clients and would rather keep a single client running and instruct it to send different values to the server based on the input. So, the approach of mapping the input to a new clientStream is NOT desired:

stdinStream.map(line -> createClientStream(line))

So my questions are:

  1. Is this a sane way to use RxJava? Are there better alternatives?
  2. I created the client socket as part of the creation of clientStream. I did this so that I can easily run it asynchronously using schedulers, e.g. clientStream.subscribeOn(Schedulers.newThread()). Maybe I should do it differently given my single-client requirement?

Here is the complete code: https://gist.github.com/lintonye/25af58abdfcc688ad3c3

Answer 1:

What you need is the using operator. Put all the socket-related objects into a Connection class and, given the input sequence, map each item to a println/readLine pair while maintaining a single connection. Here is a gist for a runnable example.

static class Connection {
    Socket socket;
    BufferedReader inFromServer;
    DataOutputStream outputStream;
    PrintWriter outWriter;

    public Connection(String host, int port) {
        try {
            socket = new Socket(host, port);
            inFromServer = new BufferedReader(
                new InputStreamReader(socket.getInputStream()));
            outputStream = new DataOutputStream(socket.getOutputStream());
            outWriter = new PrintWriter(outputStream, true);
        } catch (IOException ex) {
            throw Exceptions.propagate(ex);
        }
    }

    public void close() {
        try {
            outWriter.close();
            outputStream.close();
            inFromServer.close();
            socket.close();
        } catch (IOException ex) {
            throw Exceptions.propagate(ex);
        }
    }
}

public static void main(String[] args) {
    runServer();

    Observable<String> source = Observable.just("a", "b", "c");

    String host = "localhost";
    int port = 8080;

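    Observable.<String, Connection>using(
        // Resource factory: open one connection per subscription.
        () -> new Connection(host, port),
        // Observable factory: send each item and wait for the server's reply.
        conn -> source.map(v -> {
            conn.outWriter.println(v);
            try {
                return conn.inFromServer.readLine();
            } catch (IOException ex) {
                throw Exceptions.propagate(ex);
            }
        }),
        // Dispose action: close the connection when the sequence terminates.
        Connection::close)
    .subscribe(System.out::println);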
}
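
What using contributes here is a life cycle: the resource is created once per subscription, shared by every item in the sequence, and disposed of when the sequence terminates. The sketch below imitates that life cycle synchronously in plain Java, without RxJava, so that it runs without a server or any dependency. UsingSketch, FakeConnection, and the list-based using helper are hypothetical stand-ins for illustration; they are not RxJava's implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

class UsingSketch {

    // Stand-in for the Connection class: counts how often it is opened and closed.
    static class FakeConnection {
        static int opened = 0;
        static int closed = 0;

        FakeConnection() { opened++; }

        // Plays the role of println + readLine against the upper-casing server.
        String exchange(String line) { return line.toUpperCase(Locale.ROOT); }

        void close() { closed++; }
    }

    // Synchronous imitation of the using life cycle: create the resource,
    // run the body with it, and always dispose of it afterwards.
    static <R, T> List<T> using(Supplier<R> resourceFactory,
                                Function<R, List<T>> body,
                                Consumer<R> dispose) {
        R resource = resourceFactory.get();
        try {
            return body.apply(resource);
        } finally {
            dispose.accept(resource);
        }
    }

    // Sends every input through one shared connection and collects the replies.
    static List<String> run(List<String> inputs) {
        return UsingSketch.<FakeConnection, String>using(
            FakeConnection::new,
            conn -> {
                List<String> replies = new ArrayList<>();
                for (String line : inputs) {
                    replies.add(conn.exchange(line));
                }
                return replies;
            },
            FakeConnection::close);
    }
}
```

With the inputs a, b, c, run returns [A, B, C], and the fake connection is opened and closed exactly once, which matches the single-client requirement from the question.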