Playframework and Twitter Streaming API

2020-05-03 11:33发布

How to read response data from Twitter Streaming API - POST statuses/filter ? I have established connection and I receive 200 status code, but I don't know how to read tweets. I just want to println tweets as they coming.

ws.url(url)
.sign(OAuthCalculator(consumerKey, requestToken))
.withMethod("POST")
.stream()
.map { response =>
  if(response.headers.status == 200)
    println(response.body)
} 

EDIT: I found this solution

ws.url(url)
.sign(OAuthCalculator(consumerKey, requestToken))
.withMethod("POST")
.stream()
.map { response => 
  if(response.headers.status == 200){
    response.body
      .scan("")((acc, curr) => if (acc.contains("\r\n")) curr.utf8String else acc + curr.utf8String)
      .filter(_.contains("\r\n"))
      .map(json => Try(parse(json).extract[Tweet]))
      .runForeach {
        case Success(tweet) =>
          println("-----")
          println(tweet.text)
        case Failure(e) =>
          println("-----")
          println(e.getStackTrace)
      }
  }
}

2条回答
小情绪 Triste *
2楼-- · 2020-05-03 12:17

calling stream() gives you back a Future[StreamedResponse]. you'll then have to use akka idioms to convert the ByteString chunks within that. something like:

val stream = ws.url(url)
  .sign(OAuthCalculator(consumerKey, requestToken))
  .withMethod("POST")
  .stream()

stream flatMap { res =>
  res.body.runWith(Sink.foreach[ByteString] { bytes =>
    println(bytes.utf8String)
  })
}

note that i didn't test the code above (but it's based off of the streaming response section of https://www.playframework.com/documentation/2.5.x/ScalaWS plus the sink description from http://doc.akka.io/docs/akka/2.4.2/scala/stream/stream-flows-and-basics.html)

also note that this will print each chunk on its own line, and i'm not sure if the twitter API gives back complete json blobs per chunk. you may need to use a Sink.fold if you want to accumulate chunks before printing them.

查看更多
Rolldiameter
3楼-- · 2020-05-03 12:18

The body of the response for a streaming WS request is an Akka Streams Source of bytes. Since Twitter Api responses are newline delimited (usually) you can use Framing.delimiter to split them up into byte chunks, parse the chunks to JSON, and do what you want with them. Something like this should work:

import akka.stream.scaladsl.Framing
import scala.util.{Success, Try}
import akka.util.ByteString
import play.api.libs.json.{JsSuccess, Json, Reads}
import play.api.libs.oauth.{ConsumerKey, OAuthCalculator, RequestToken}

case class Tweet(id: Long, text: String)
object Tweet {
  implicit val reads: Reads[Tweet] = Json.reads[Tweet]
}

def twitter = Action.async { implicit request =>
  ws.url("https://stream.twitter.com/1.1/statuses/filter.json?track=Rio2016")
      .sign(OAuthCalculator(consumerKey, requestToken))
      .withMethod("POST")
      .stream().flatMap { response =>
    response.body
      // Split up the byte stream into delimited chunks. Note
      // that the chunks are quite big
      .via(Framing.delimiter(ByteString.fromString("\n"), 20000))
      // Parse the chunks into JSON, and then to a Tweet.
      // A better parsing strategy would be to account for all
      // the different possible responses, but here we just
      // collect those that match a Tweet.
      .map(bytes => Try(Json.parse(bytes.toArray).validate[Tweet]))
      .collect {
        case Success(JsSuccess(tweet, _)) => tweet.text
      }
      // Print out each chunk
      .runForeach(println).map { _ =>
        Ok("done")
    }
  }
}

Note: to materialize the stream you'll need to inject an implicit Materializer into your controller.

查看更多
登录 后发表回答