Why don't I see high performance with Reactive Kafka?

Posted 2019-06-02 11:37

Why can't I see the high TPS (transactions per second) that the Reactive Kafka project's authors report?

This code, derived from the benchmark code in the Reactive Kafka project, consumes 2M records pre-populated in a single-partition topic. When I run it I get about 140K TPS. Not awful, but far short of the hundreds of thousands I hoped for.

My biggest concern is that this is only a 1-partition topic, which isn't a realistic test case.

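import akka.actor.ActorSystem
import akka.dispatch.ExecutionContexts
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.kafka.ConsumerMessage.CommittableOffsetBatch
import akka.kafka.scaladsl.Consumer
import akka.stream.Materializer
import akka.stream.scaladsl.{Keep, Sink}
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, StringDeserializer}
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._
import scala.util.Success

case class RunTest4(msgCount: Int, producer: com.foo.Producer, kafkaHost: String, groupId: String, topic: String)(implicit system: ActorSystem) {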

  // Pre-populate a topic w/some records (2 million)
  producer.populate(msgCount, topic)
  Thread.sleep(2000)
  partitionInfo(topic)
  // Offset of the last record when all records sit in one partition (offsets start at 0)
  val partitionTarget = msgCount - 1

  val settings = ConsumerSettings(system, new ByteArrayDeserializer, new StringDeserializer)
    .withBootstrapServers(kafkaHost)
    .withGroupId(groupId)
    .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

  def consumerAtLeastOnceBatched(batchSize: Int)(implicit mat: Materializer): Unit = {
    val promise = Promise[Unit]
    val control = Consumer.committableSource(settings, Subscriptions.topics(topic))
      .map {
        msg => msg.committableOffset
      }
      .batch(batchSize.toLong, first => CommittableOffsetBatch.empty.updated(first)) { (batch, elem) =>
        batch.updated(elem)
      }
      .mapAsync(3) { m =>
        m.commitScaladsl().map(_ => m)(ExecutionContexts.sameThreadExecutionContext)
      }
      .toMat(Sink.foreach { batch =>
        if (batch.offsets().head._2 >= partitionTarget)
          promise.complete(Success(()))
      })(Keep.left)
      .run()

    println("Control is: " + control.getClass.getName)
    val now = System.currentTimeMillis()
    Await.result(promise.future, 30.seconds)
    val later = System.currentTimeMillis()
    println("TPS: " + (msgCount / ((later - now) / 1000.0)))
    control.shutdown()

    groupInfo(groupId)
  }

  private def partitionInfo(topic: String) =
    kafka.tools.GetOffsetShell.main(Array("--topic", topic, "--broker-list", kafkaHost, "--time", "-1"))
  private def groupInfo(group: String) =
    kafka.admin.ConsumerGroupCommand.main(Array("--describe", "--group", group, "--bootstrap-server", kafkaHost, "--new-consumer"))

}

The next test is (I hope) a good way to handle multiple partitions per topic, a much more realistic situation. When I run it with a batch size of 10,000 against a topic with 2M records spread across 4 partitions, the test times out after the 30-second wait. Whenever it would have finished, the TPS would have been below 67K (2M / 30 s), which is not great. (The test does succeed with a smaller record population, but that's not the test!)
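The bound quoted above is simple arithmetic: a run that has not finished after 30 seconds cannot have averaged more than 2,000,000 / 30 records per second. A minimal sketch of the same calculation the tests print (`ThroughputMath` and `tps` are names made up for this illustration, not part of the benchmark):

```scala
object ThroughputMath {
  /** Records per second for `msgCount` records consumed in `elapsedMillis` milliseconds. */
  def tps(msgCount: Long, elapsedMillis: Long): Double =
    msgCount / (elapsedMillis / 1000.0)

  def main(args: Array[String]): Unit = {
    // A run that hits the 30-second timeout gives an upper bound on throughput
    println(f"Upper bound at timeout: ${tps(2000000L, 30000L)}%.0f records/s")
    // Time needed to consume 2M records at the ~140K TPS seen on one partition
    println(f"Seconds at 140K TPS: ${2000000L / 140000.0}%.1f")
  }
}
```

At the single-partition rate of about 140K TPS, 2M records would take roughly 14 seconds, so a 30-second timeout means the 4-partition run is at least twice as slow, or never reaches its completion condition at all.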

(For reference, my LateKafka project, which produces a Source and is admittedly skeletal, exceeds 300K TPS on the same test, and a native KafkaConsumer reaches around 500K on my laptop.)

case class RunTest3(msgCount: Int, producer: com.foo.Producer, kafkaHost: String, groupId: String, topic: String)(implicit system: ActorSystem) {

  // Pre-populate a topic w/some records (2 million)
  producer.populate(msgCount, topic)
  Thread.sleep(2000)
  partitionInfo(topic)
  val partitionTarget = msgCount - 1

  val settings = ConsumerSettings(system, new ByteArrayDeserializer, new StringDeserializer)
    .withBootstrapServers(kafkaHost)
    .withGroupId(groupId)
    .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

  def consumerAtLeastOnceBatched(batchSize: Int)(implicit mat: Materializer): Unit = {
    val promise = Promise[Unit]
    val control = Consumer.committablePartitionedSource(settings, Subscriptions.topics(topic))
      .flatMapMerge(4, _._2)
      .map {
        msg => msg.committableOffset
      }
      .batch(batchSize.toLong, first => CommittableOffsetBatch.empty.updated(first)) { (batch, elem) =>
        batch.updated(elem)
      }
      .mapAsync(3) { m =>
        m.commitScaladsl().map(_ => m)(ExecutionContexts.sameThreadExecutionContext)
      }
      .toMat(Sink.foreach { batch =>
        if (batch.offsets().head._2 >= partitionTarget)
          promise.complete(Success(()))
      })(Keep.left)
      .run()

    println("Control is: " + control.getClass.getName)
    val now = System.currentTimeMillis()
    Await.result(promise.future, 30.seconds)
    val later = System.currentTimeMillis()
    println("TPS: " + (msgCount / ((later - now) / 1000.0)))
    control.shutdown()

    groupInfo(groupId)
  }

  private def partitionInfo(topic: String) =
    kafka.tools.GetOffsetShell.main(Array("--topic", topic, "--broker-list", kafkaHost, "--time", "-1"))
  private def groupInfo(group: String) =
    kafka.admin.ConsumerGroupCommand.main(Array("--describe", "--group", group, "--bootstrap-server", kafkaHost, "--new-consumer"))

}

Are these expected results or is there something wrong with my test code?
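One thing that may be worth ruling out in the 4-partition test: Kafka offsets are counted per partition, so if the 2M records are spread over 4 partitions of a fresh topic, each partition ends near offset 500K and a check of the form `offset >= msgCount - 1` would never fire, whatever the real throughput. The sketch below is plain Scala with no Kafka dependency. `CompletionTracker`, `endOffsets`, and the even 500K split are assumptions made for illustration, not part of the original test. It treats the run as finished only once every partition has reached its own last offset.

```scala
// Hypothetical helper: tracks the highest offset seen per partition and
// reports completion once each partition has reached its own last offset.
final class CompletionTracker(endOffsets: Map[Int, Long]) {
  // endOffsets: partition -> number of records in that partition
  private var highest = Map.empty[Int, Long]

  /** Record an observed offset; returns true once all partitions are done. */
  def observe(partition: Int, offset: Long): Boolean = {
    val prev = highest.getOrElse(partition, -1L)
    highest = highest.updated(partition, math.max(prev, offset))
    isComplete
  }

  def isComplete: Boolean =
    endOffsets.forall { case (p, count) =>
      highest.getOrElse(p, -1L) >= count - 1
    }
}

object CompletionTrackerDemo {
  def main(args: Array[String]): Unit = {
    val msgCount = 2000000L
    // Assumed even spread: 4 partitions with 500,000 records each
    val endOffsets = (0 until 4).map(p => p -> msgCount / 4).toMap
    val tracker = new CompletionTracker(endOffsets)

    // Would any single partition ever reach the single-partition target?
    val singleTargetCanFire = endOffsets.values.exists(_ - 1 >= msgCount - 1)
    println(s"Single-target check can fire: $singleTargetCanFire")

    // Feed the last offset of each partition, as a commit batch might report them
    endOffsets.foreach { case (p, count) => tracker.observe(p, count - 1) }
    println(s"All partitions complete: ${tracker.isComplete}")
  }
}
```

In the stream, each entry of `batch.offsets()` could be fed to something like `observe` instead of looking only at `head`, with the per-partition record counts taken from the offsets the test already prints before the run. Whether this explains the timeout depends on how the topic was populated and whether it held records from earlier runs.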

Tags: akka