Akka-http documentation says:
Apart from regarding a socket bound on the server-side as a Source[IncomingConnection] and each connection as a Source[HttpRequest] with a Sink[HttpResponse]
Assume we get the merged source containing incoming connections from many Source[IncomingConnection].
Then, assume, we get Source[HttpRequest] from Source[IncomingConnection] (see the code below).
Then, no problem, we can provide a flow to convert HttpRequest to HttpResponse.
And here is the problem - how can we properly Sink the responses ? How can we join the responses to connections?
The whole idea behind the use case is the possibility to prioritize the processing of incoming requests from different connections. Should be useful in many cases I guess...
Thanks in advance!
Edit: Solution based on the answer from @RamonJRomeroyVigil:
Server code:
val in1 = Http().bind(interface = "localhost", port = 8200)
val in2 = Http().bind(interface = "localhost", port = 8201)
val connSrc = Source.fromGraph(FlowGraph.create() { implicit b =>
import FlowGraph.Implicits._
val merge = b.add(Merge[IncomingConnection](2))
in1 ~> print("in1") ~> merge.in(0)
in2 ~> print("in2") ~> merge.in(1)
SourceShape(merge.out)
})
val reqSrc : Source[(HttpRequest, IncomingConnection), _] =
connSrc.flatMapConcat { conn =>
Source.empty[HttpResponse]
.via(conn.flow)
.map(request => (request, conn))
}
val flow: Flow[(HttpRequest, IncomingConnection), (HttpResponse, IncomingConnection), _] =
Flow[(HttpRequest, IncomingConnection)].map{
case (HttpRequest(HttpMethods.GET, Uri.Path("/ping"), _, entity, _), conn: IncomingConnection) =>
println(s"${System.currentTimeMillis()}: " +
s"process request from ${conn.remoteAddress.getHostName}:${conn.remoteAddress.getPort}")
(HttpResponse(entity = "pong"), conn)
}
reqSrc.via(flow).to(Sink.foreach { case (resp, conn) =>
Source.single(resp).via(conn.flow).runWith(Sink.ignore)
}).run()
def print(prefix: String) = Flow[IncomingConnection].map { s =>
println(s"$prefix [ ${System.currentTimeMillis()} ]: ${s.remoteAddress}"); s
}
So, I am using curl from console and see the following:
% curl http://localhost:8200/ping
curl: (52) Empty reply from server
Second curl request fails:
% curl http://localhost:8200/ping
curl: (7) Failed to connect to localhost port 8200: Connection refused
On the server console I see the following when sending 1st request:
in1 [ 1450287301512 ]: /127.0.0.1:52461
1450287301626: process request from localhost:52461
[INFO] [12/16/2015 20:35:01.641] [default-akka.actor.default-dispatcher-6] [akka://default/system/IO-TCP-STREAM/server-1-localhost%2F127.0.0.1%3A8200] Message [akka.io.Tcp$Unbound$] from Actor[akka://default/system/IO-TCP/selectors/$a/0#119537130] to Actor[akka://default/system/IO-TCP-STREAM/server-1-localhost%2F127.0.0.1%3A8200#-1438663077] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [12/16/2015 20:35:01.641] [default-akka.actor.default-dispatcher-6] [akka://default/system/IO-TCP-STREAM/server-2-localhost%2F127.0.0.1%3A8201] Message [akka.io.Tcp$Unbound$] from Actor[akka://default/system/IO-TCP/selectors/$a/1#679898594] to Actor[akka://default/system/IO-TCP-STREAM/server-2-localhost%2F127.0.0.1%3A8201#1414174163] was not delivered. [2] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
And nothing when sending 2nd request.
So, it looks like there is some problem with the internal connection flow (as stated @RamonJRomeroyVigil) or with something else...
Basically the code does not work.
Still investigating the problem.
The below solution is based on further information provided in the question comments.
Given
The
flatMapConcat
method solves the specific question stated:This provides a Source of
(HttpRequest, IncomingConnection)
tuples.Assuming you have a processing step that converts requests to respones
You can send a response back to the client:
Warning: This solution calls
conn.flow
twice: once to create a flow that generates requests and again to create a flow to send responses to. I do not know if this type of use-case will break something within theIncomingConnection
logic.