Vert.x Reactive Kafka client: chaining not working

Posted 2019-09-01 01:50

I am using the io.vertx.reactivex.kafka.client.producer.KafkaProducer client. The client has an rxWrite function that returns Single<RecordMetadata>. I need to log any error that occurs during the write operation, but the logging code apparently is not executed.

I have written the following working example.

test(): Function to test the chaining and logging

fun test(): Single<Int> {
    val data = Single.just(ArrayList<String>().apply {
        add("Hello")
        add("World")
    })

    data.flattenAsObservable<String> { list -> list }
        .flatMap { advertiser ->
            // does not work with writeKafka
            writeError(advertiser).toObservable().doOnError({ println("Error $data") })
        }
        .subscribe({ record -> println(record) }, { e -> println("Error2 $e") })

    return data.map { it.size }
}

writeKafka: Writes the given string into Kafka and returns a Single

fun writeKafka(param: String): Single<RecordMetadata> {
    // null topic to produce an IllegalArgumentException
    val record = KafkaProducerRecord.create(null, UUID.randomUUID().toString(), param)
    return kafkaProducer.rxWrite(record)
}

writeError: Always returns a Single with an error of the same type

fun writeError(param: String): Single<RecordMetadata> {
    return Single.error<RecordMetadata>(IllegalArgumentException())
}

So when I call writeKafka, it prints only Error2, but if I use writeError, it prints both Error and Error2. It looks like the Single returned by writeKafka is still waiting for a result, but then why is Error2 printed at all?

I am new to RxJava2; could somebody point out what is wrong here?

1 answer
神经病院院长
#2 · 2019-09-01 02:31

It is important to read and post the stack trace of errors so that the problem can be isolated.

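In this case, it looks like the IllegalArgumentException comes from create: the relevant Kafka class throws it, so you never get a Single. return kafkaProducer.rxWrite(record) never executes at all, and the exception effectively crashes the flatMap function itself. Because doOnError is attached to a Single that was never created, it never comes into play. flatMap catches the exception and forwards it to the subscriber's error handler, hence only "Error2" is printed.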
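To make the difference concrete, here is a minimal sketch that needs no Kafka at all, assuming plain RxJava 2 (io.reactivex) on the classpath. writeThrowing and collectLog are hypothetical helpers invented for this illustration: writeThrowing stands in for writeKafka by throwing before any Single exists, and collectLog rebuilds the flatMap/doOnError/subscribe chain from the question while recording what fires.

```kotlin
import io.reactivex.Observable
import io.reactivex.Single

// Hypothetical stand-in for writeKafka: it throws while the Single is
// being built, so the caller never receives a Single at all.
fun writeThrowing(param: String): Single<String> {
    throw IllegalArgumentException("thrown before a Single exists: $param")
}

// Same chain shape as in the question, but it records which handlers
// fire instead of printing. Everything here runs synchronously.
fun collectLog(source: (String) -> Single<String>): List<String> {
    val log = mutableListOf<String>()
    Observable.just("Hello")
        .flatMap { item ->
            source(item).toObservable().doOnError { log.add("Error") }
        }
        .subscribe({ record -> log.add("Record $record") }, { log.add("Error2") })
    return log
}

fun main() {
    // The mapper itself throws: doOnError is never attached to anything.
    println(collectLog(::writeThrowing))
    // prints [Error2]

    // The error travels inside the Single: both handlers see it.
    println(collectLog { Single.error<String>(IllegalArgumentException()) })
    // prints [Error, Error2]

    // Deferring the call moves the throw into the Single's subscription.
    println(collectLog { item -> Single.defer<String> { writeThrowing(item) } })
    // prints [Error, Error2]
}
```

If the exception in the question really is thrown while the record is created, one possible fix along the same lines would be to wrap the body of writeKafka in Single.defer { ... }, so that the failure is delivered as the Single's error and doOnError can log it.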
