I wanted to know how can I send/emit items to a Kotlin.Flow
, so my use case is:
In the consumer/ViewModel/Presenter I can subscribe with the collect
function:
fun observe() {
coroutineScope.launch {
// 1. Send event
reopsitory.observe().collect {
println(it)
}
}
}
But the issue is in the Repository
side, with RxJava we could use a Behaviorsubject expose it as an Observable/Flowable
and emit new items like this:
behaviourSubject.onNext(true)
But whenever I build a new flow:
flow {
}
I can only collect. How can I send values to a flow?
If you want to get the latest value on subscription/collection you should use a ConflatedBroadcastChannel:
private val channel = ConflatedBroadcastChannel<Boolean>()
This will replicate BehaviourSubject
, to expose the channel as a Flow:
// Repository
fun observe() {
return channel.asFlow()
}
Now to send an event/value to that exposed Flow
simple send to this channel.
// Repository
fun someLogicalOp() {
channel.send(false) // This gets sent to the ViewModel/Presenter and printed.
}
Console:
false
If you wish to only receive values after you start collecting you should use a BroadcastChannel
instead.
To make it clear:
Behaves as an Rx's PublishedSubject
private val channel = BroadcastChannel<Boolean>(1)
fun broadcastChannelTest() {
// 1. Send event
channel.send(true)
// 2. Start collecting
channel
.asFlow()
.collect {
println(it)
}
// 3. Send another event
channel.send(false)
}
false
Only false
gets printed as the first event was sent before collect { }
.
Behaves as an Rx's BehaviourSubject
private val confChannel = ConflatedBroadcastChannel<Boolean>()
fun conflatedBroadcastChannelTest() {
// 1. Send event
confChannel.send(true)
// 2. Start collecting
confChannel
.asFlow()
.collect {
println(it)
}
// 3. Send another event
confChannel.send(false)
}
true
false
Both events are printed, you always get the latest value (if present).
Also, want to mention Kotlin's team development on DataFlow
(name pending):
- https://github.com/Kotlin/kotlinx.coroutines/pull/1354
Which seems better suited to this use case (as it will be a cold stream).