I am using the io.vertx.reactivex.kafka.client.producer.KafkaProducer client. The client has an rxWrite function which returns Single<RecordMetadata>. However, I need to log the error, if any, during the write operation. The logging callback apparently is not getting executed.
I have written the following working example.
test(): Function to test the chaining and logging
fun test(): Single<Int> {
    val data = Single.just(ArrayList<String>().apply {
        add("Hello")
        add("World")
    })
    data.flattenAsObservable<String> { list -> list }
        .flatMap { advertiser ->
            // does not work with writeKafka
            writeError(advertiser).toObservable().doOnError({ println("Error $data") })
        }
        .subscribe({ record -> println(record) }, { e -> println("Error2 $e") })
    return data.map { it.size }
}
writeKafka: Writes the given string into Kafka and returns a Single
fun writeKafka(param: String): Single<RecordMetadata> {
    // null topic to produce IllegalArgumentException()
    val record = KafkaProducerRecord.create(null, UUID.randomUUID().toString(), param)
    return kafkaProducer.rxWrite(record)
}
writeError: Always returns a Single with an error of the same type
fun writeError(param: String): Single<RecordMetadata> {
    return Single.error<RecordMetadata>(IllegalArgumentException())
}
So when I call writeKafka, it only prints Error2, but if I use writeError, it prints both Error and Error2. It looks like the Single returned by writeKafka is still waiting for a result, but then why is Error2 printed at all?
I am pretty new to RxJava2; could somebody point out any error in that?
It is important to read and post the stacktrace of errors so that the problem can be isolated.
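In this case, it looks like you get the IllegalArgumentException from create, and you don't get any Single because the relevant Kafka class throws it. The line return kafkaProducer.rxWrite(record) never executes at all, and you practically crash the flatMap. Since no Single was ever returned, there is nothing for doOnError to attach to, so it never comes into play. The flatMap operator catches the exception thrown by its mapper and forwards it downstream, hence only "Error2" is printed.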
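One possible way to get the per-write logging back is to defer the whole body of the write function, so that a synchronous throw happens inside the Single and is delivered as an onError signal. The following is a minimal sketch that uses only RxJava 2 and no Kafka. createRecord, writeThrowing, writeDeferred and runPipeline are hypothetical stand-ins invented for this illustration: createRecord imitates the behaviour described above (an IllegalArgumentException for a null topic), and it is not the real Vert.x API.

```kotlin
import io.reactivex.Single

// Hypothetical stand-in for KafkaProducerRecord.create: throws synchronously
// when the topic is null, as described in the answer above.
fun createRecord(topic: String?, value: String): String {
    if (topic == null) throw IllegalArgumentException("topic must not be null")
    return "$topic:$value"
}

// Same shape as writeKafka: the exception escapes before any Single exists.
fun writeThrowing(param: String): Single<String> {
    val record = createRecord(null, param)
    return Single.just(record)
}

// Deferred variant: the body runs at subscription time, inside the Single,
// so the exception is turned into an onError signal.
fun writeDeferred(param: String): Single<String> =
    Single.defer<String> { writeThrowing(param) }

// Runs the same chain as test() and collects what each callback logged.
fun runPipeline(write: (String) -> Single<String>): List<String> {
    val log = mutableListOf<String>()
    Single.just(listOf("Hello", "World"))
        .flattenAsObservable<String> { list -> list }
        .flatMap { item ->
            write(item).toObservable().doOnError { log.add("Error") }
        }
        .subscribe({ record -> log.add(record) }, { log.add("Error2") })
    return log
}

fun main() {
    println(runPipeline(::writeThrowing)) // mapper throws, doOnError is never attached
    println(runPipeline(::writeDeferred)) // error travels through doOnError first
}
```

Applied to the code in the question, the same idea would be to wrap the body of writeKafka in Single.defer { ... }, assuming the exception really does come from create as the stack trace should confirm. A try/catch that returns Single.error(e) would be an alternative with the same effect.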