I'm still learning RxJava. What is the best way to use a stream within another stream? Or is it against the principles of reactive programming?
A toy example that I'm trying to write includes a TCP client and a server that sends back capitalized input. I'd like to take input from standard input, send it to the server and print out everything received by both the client and the server.
The following is the expected output from the program:
(User input) apple
Server received: apple
Client received: APPLE
(User input) peach
Server received: peach
Client received: PEACH
I was able to achieve this using three observables:
- stdinStream, which emits strings from standard input,
- serverStream, which emits strings the server receives,
- clientStream, which emits strings the client receives,
and then subscribing to inputStream from within the creation of clientStream, like so:
    private Observable<String> createClientStream(String host, int port, Observable<String> inputStream) {
        return Observable.create(sub -> {
            try (Socket socket = new Socket(host, port);
                 BufferedReader inFromServer = new BufferedReader(new InputStreamReader(socket.getInputStream()));
                 DataOutputStream outputStream = new DataOutputStream(socket.getOutputStream());
                 PrintWriter outWriter = new PrintWriter(outputStream, true);
            ) {
                inputStream.subscribe(line -> {
                    outWriter.println(line);
                    try {
                        sub.onNext(inFromServer.readLine());
                    } catch (IOException e) {
                        sub.onError(e);
                    }
                });
            } catch (UnknownHostException e) {
                sub.onError(e);
            } catch (IOException e) {
                sub.onError(e);
            }
        });
    }
Note: I don't want to create multiple clients and would rather keep a single client running and instruct it to send different values to the server based on the input. So the approach of mapping each input line to a new clientStream is NOT desired:
    stdinStream.map(line -> createClientStream(line))
So my questions are:
- Is this a sane way to use RxJava? Are there better alternatives?
- I created the client socket as part of the creation of clientStream. I did this so that I can easily run it asynchronously using schedulers, e.g. clientStream.subscribeOn(Schedulers.newThread()). Maybe I should do it differently given my single-client requirement?
Here is the complete code: https://gist.github.com/lintonye/25af58abdfcc688ad3c3
What you need is the using operator (Observable.using). Put all the socket-related objects into a Connection class and, given the input sequence, map each line to a println/readLine pair while maintaining a single connection. Here is a gist for a runnable example.
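The gist itself is not reproduced here, so the following is only a minimal sketch of the Connection idea. It uses just the JDK so that it runs without RxJava on the classpath. Only the class name Connection comes from the answer. The roundTrip method, the ConnectionDemo class and the throwaway upper-casing server are assumptions made for illustration.

```java
import java.io.BufferedReader;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.io.UncheckedIOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;

/** Hypothetical holder for all socket-related objects of the single client. */
class Connection implements AutoCloseable {
    private final Socket socket;
    private final BufferedReader in;
    private final PrintWriter out;

    Connection(String host, int port) throws IOException {
        socket = new Socket(host, port);
        in = new BufferedReader(
                new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8));
        // autoflush=true so that println() pushes each line out immediately
        out = new PrintWriter(
                new OutputStreamWriter(socket.getOutputStream(), StandardCharsets.UTF_8), true);
    }

    /** One println/readLine pair: send a line, then block until the reply arrives. */
    String roundTrip(String line) throws IOException {
        out.println(line);
        String reply = in.readLine();
        if (reply == null) {
            throw new EOFException("server closed the connection");
        }
        return reply;
    }

    @Override
    public void close() throws IOException {
        socket.close(); // closing the socket also closes its streams
    }
}

class ConnectionDemo {
    /** Starts a throwaway upper-casing server on a free local port (demo only). */
    static ServerSocket startUppercaseServer() throws IOException {
        ServerSocket server = new ServerSocket(0);
        Thread t = new Thread(() -> {
            try (Socket s = server.accept();
                 BufferedReader r = new BufferedReader(
                         new InputStreamReader(s.getInputStream(), StandardCharsets.UTF_8));
                 PrintWriter w = new PrintWriter(
                         new OutputStreamWriter(s.getOutputStream(), StandardCharsets.UTF_8), true)) {
                String line;
                while ((line = r.readLine()) != null) {
                    w.println(line.toUpperCase(Locale.ROOT));
                }
            } catch (IOException ignored) {
                // the demo server simply stops when the connection goes away
            }
        });
        t.setDaemon(true);
        t.start();
        return server;
    }

    /** Sends every input line over ONE connection and collects the replies. */
    static List<String> exchange(List<String> inputs) {
        try (ServerSocket server = startUppercaseServer();
             Connection conn = new Connection("127.0.0.1", server.getLocalPort())) {
            List<String> replies = new ArrayList<>();
            for (String line : inputs) {
                replies.add(conn.roundTrip(line));
            }
            return replies;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(exchange(Arrays.asList("apple", "peach"))); // prints [APPLE, PEACH]
    }
}
```

With RxJava, a resource like this would typically be passed to Observable.using as three arguments: a factory that opens the Connection, a function that maps inputStream through conn.roundTrip(line), and a dispose action that calls close(). In RxJava 1 these lambdas cannot throw checked exceptions, so each IOException would have to be wrapped, much as exchange does above.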