I am trying to combine AsyncHttpClient and Scalaz Task together. Normally, if I am using AsyncHttpClient, I can invoke client.close to stop the client.
val asyncHttpClient = new AsyncHttpClient()
println(asyncHttpClient.prepareGet("http://www.google.com"))
asyncHttpClient.close()
So current will be stopped. However, if I wrap the api call into the Task. I dont know how to stop it.
def get(s: String) = Task.async[Int](k => {
asyncHttpClient.prepareGet(s).execute(toHandler)
Thread.sleep(5000)
asyncHttpClient.closeAsynchronously()
} )
def toHandler[A] = new AsyncCompletionHandler[Response] {
def onCompleted(r: Response) = {
println("get response ", r.getResponseBody)
r
}
def onError(e: Throwable) = {
println("some error")
e
}
}
println(get("http://www.google.com").run)
The current Process is still running. I am thinking the reason is that Task and AsynClient are both async. I dont know what I should do to close it
Many thanks in advance
The problem is that Task.async
takes a function that can register callbacks. This is a little confusing, and the types aren't much help because there's so much damn Unit
in there, but what it means here is that you want something more like this:
import com.ning.http.client._
import scalaz.syntax.either._
import scalaz.concurrent.Task
val asyncHttpClient = new AsyncHttpClient()
def get(s: String): Task[Response] = Task.async[Response](callback =>
asyncHttpClient.prepareGet(s).execute(
new AsyncCompletionHandler[Unit] {
def onCompleted(r: Response): Unit = callback(r.right)
def onError(e: Throwable): Unit = callback(e.left)
}
)
)
This doesn't handle closing the client—it's just intended to show the general idea. You could close the client in the handler, but I'd suggest something more like this:
import com.ning.http.client._
import scalaz.syntax.either._
import scalaz.concurrent.Task
def get(client: AsyncHttpClient)(s: String): Task[Response] =
Task.async[Response](callback =>
client.prepareGet(s).execute(
new AsyncCompletionHandler[Unit] {
def onCompleted(r: Response): Unit = callback(r.right)
def onError(e: Throwable): Unit = callback(e.left)
}
)
)
def initClient: Task[AsyncHttpClient] = Task(new AsyncHttpClient())
def closeClient(client: AsyncHttpClient): Task[Unit] = Task(client.close())
And then:
val res = for {
c <- initClient
r <- get(c)("http://www.google.com")
_ <- closeClient(c)
} yield r
res.unsafePerformAsync(
_.fold(
_ => println("some error"),
r => println("get response " + r.getResponseBody)
)
)
This avoids closeAsynchronously
(which seems to be going away, anyway).