How can I send items to a Kotlin.Flow (like a Beha

Posted 2020-02-26 09:00

Question:

I want to know how I can send/emit items to a Kotlin Flow. My use case is as follows.

In the consumer/ViewModel/Presenter I can subscribe with the collect function:

fun observe() {
  coroutineScope.launch {
    // Collect every value the repository emits
    repository.observe().collect {
      println(it)
    }
  }
}

But the issue is on the Repository side. With RxJava we could use a BehaviorSubject, expose it as an Observable/Flowable, and emit new items like this:

behaviourSubject.onNext(true)

But whenever I build a new flow:

flow {

}

I can only collect. How can I send values to a flow?

Answer 1:

If you want to get the latest value on subscription/collection you should use a ConflatedBroadcastChannel:

private val channel = ConflatedBroadcastChannel<Boolean>()

This replicates Rx's BehaviorSubject. To expose the channel as a Flow:

// Repository
fun observe(): Flow<Boolean> {
  return channel.asFlow()
}

Now, to send an event/value to that exposed Flow, simply send it to this channel.

// Repository
// send is a suspending function, so the caller must be one too
suspend fun someLogicalOp() {
  channel.send(false) // This gets sent to the ViewModel/Presenter and printed.
}

Console:

false
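In later kotlinx.coroutines releases, ConflatedBroadcastChannel was deprecated in favor of StateFlow, so a present-day version of the same repository might look roughly like the sketch below. This swaps in MutableStateFlow for the channel; it is not the code from the answer above. The Repository class, its initial parameter, and the demo helper are hypothetical names chosen for illustration. Note that a StateFlow always needs an initial value.

```kotlin
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.asStateFlow
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical repository; names are illustrative, not from the original post.
class Repository(initial: Boolean) {
    // StateFlow always holds a value, so an initial one is required.
    private val state = MutableStateFlow(initial)

    // Expose a read-only view so that only the repository can publish values.
    fun observe(): StateFlow<Boolean> = state.asStateFlow()

    fun someLogicalOp() {
        state.value = false // publishing does not need a suspending call
    }
}

// Collects the current value plus the next update and returns both.
fun demo(): List<Boolean> = runBlocking {
    val repository = Repository(initial = true)
    val received = mutableListOf<Boolean>()

    // UNDISPATCHED runs the collector up to its first suspension point,
    // so it sees the current value before someLogicalOp() is called.
    val job = launch(start = CoroutineStart.UNDISPATCHED) {
        repository.observe().take(2).collect { received += it }
    }

    repository.someLogicalOp()
    job.join()
    received
}

fun main() {
    println(demo()) // [true, false]
}
```

The take(2) call is only there so the demo terminates; a real ViewModel/Presenter would normally keep collecting until its scope is cancelled.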

If you wish to only receive values after you start collecting you should use a BroadcastChannel instead.

To make it clear:

Behaves like Rx's PublishSubject

private val channel = BroadcastChannel<Boolean>(1)

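suspend fun broadcastChannelTest() = coroutineScope {
  // 1. Send event: nobody is collecting yet, so it is lost
  channel.send(true)

  // 2. Start collecting in a child coroutine, because collect { } suspends
  //    until the channel is closed
  launch(start = CoroutineStart.UNDISPATCHED) {
    channel
      .asFlow()
      .collect {
        println(it)
      }
  }

  // 3. Send another event
  channel.send(false)

  // Close the channel so the collector can finish
  channel.close()
}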

false

Only false gets printed as the first event was sent before collect { }.


Behaves like Rx's BehaviorSubject

private val confChannel = ConflatedBroadcastChannel<Boolean>()

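suspend fun conflatedBroadcastChannelTest() = coroutineScope {
  // 1. Send event: the channel keeps it as the latest value
  confChannel.send(true)

  // 2. Start collecting in a child coroutine, because collect { } suspends
  //    until the channel is closed
  launch(start = CoroutineStart.UNDISPATCHED) {
    confChannel
      .asFlow()
      .collect {
        println(it)
      }
  }

  // 3. Send another event
  confChannel.send(false)

  // Close the channel so the collector can finish
  confChannel.close()
}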

true

false

Both events are printed: you always get the latest value (if present).
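Both channel types were later deprecated in favor of SharedFlow and StateFlow. As a hedged sketch of how the two tests above could be reproduced with MutableSharedFlow, the replay parameter decides whether a new collector sees the value sent before it subscribed. The helper collectWithReplay is a hypothetical name, and take(...) is used only so that the demo terminates.

```kotlin
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper mirroring the two tests above:
// emit true, start collecting, then emit false.
fun collectWithReplay(replay: Int): List<Boolean> = runBlocking {
    val events = MutableSharedFlow<Boolean>(replay = replay)
    val received = mutableListOf<Boolean>()

    // 1. Send event before anyone collects
    events.emit(true)

    // 2. Start collecting; UNDISPATCHED makes the collector subscribe right away.
    //    It stops after the replayed values (if any) plus one new value.
    val job = launch(start = CoroutineStart.UNDISPATCHED) {
        events.take(replay + 1).collect { received += it }
    }

    // 3. Send another event
    events.emit(false)
    job.join()
    received
}

fun main() {
    println(collectWithReplay(replay = 0)) // [false], like PublishSubject
    println(collectWithReplay(replay = 1)) // [true, false], like BehaviorSubject
}
```

Unlike StateFlow, a MutableSharedFlow with replay = 1 does not need an initial value, which matches the "if present" behavior described above.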

I also want to mention the Kotlin team's work on DataFlow (name pending):

  • https://github.com/Kotlin/kotlinx.coroutines/pull/1354

It seems better suited to this use case (as it will be a cold stream).