Kotlin Coroutines - How to block to await/join all

Posted 2019-07-09 23:25

I am new to Kotlin/Coroutines, so hopefully I am just missing something/don't fully understand how to structure my code for the problem I am trying to solve.

Essentially, I am taking a list of strings, and for each item in the list I want to send it to another method to do work (make a network call and return data based on the response). (Edit:) I want all calls to launch concurrently, and block until all calls are done/the response is acted on, and then return a new list with the info of each response.

I probably don't yet fully understand when to use launch/async, but I've tried the following with both launch (with joinAll) and async (with await).

fun processData(lstInputs: List<String>): List<response> {

    val lstOfReturnData = mutableListOf<response>()

    runBlocking {
        withContext(Dispatchers.IO) {
            val jobs = List(lstInputs.size) {
                launch {
                    lstOfReturnData.add(networkCallToGetData(lstInputs[it]))
                }
            }
            jobs.joinAll()
        }
    }

    return lstOfReturnData
}

What I expect is that if lstInputs has 120 items, then once all jobs are joined, lstOfReturnData should also have 120 items.

What actually happens is inconsistent: I run it once and get 118 items in the final list, run it again and get 120, run it again and get 117, and so on. In the networkCallToGetData() method I handle any exceptions, so that something is returned for every request even if the network call fails.

Can anybody help explain why I am getting inconsistent results, and what I need to do to ensure I am blocking appropriately and all jobs are being joined before moving on?

3 answers
倾城 Initia
#2 · 2019-07-09 23:58

runBlocking should mean you don't have to call join: launching a coroutine from inside a runBlocking scope should do this for you. Have you tried just:

fun processData(lstInputs: List<String>): List<response> {

    val lstOfReturnData = mutableListOf<response>()

    runBlocking {
        lstInputs.forEach {
            launch(Dispatchers.IO) {
                lstOfReturnData.add(networkCallToGetData(it))
            }
        }
    }

    return lstOfReturnData
}
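
The claim that runBlocking waits for coroutines launched in its scope can be checked in isolation. The following is only a minimal sketch, assuming kotlinx.coroutines 1.x is on the classpath; countCompletedChildren is a hypothetical helper, and delay(10) is a stand-in for the real network call. It uses an AtomicInteger so that the counter itself is safe to update from several threads.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical helper: launches n children and reports how many had
// finished by the time runBlocking returned.
fun countCompletedChildren(n: Int): Int {
    // Thread-safe counter, so increments from Dispatchers.IO threads are not lost.
    val completed = AtomicInteger(0)
    runBlocking {
        repeat(n) {
            launch(Dispatchers.IO) {
                delay(10) // assumption: stands in for the network call
                completed.incrementAndGet()
            }
        }
        // No joinAll() here: runBlocking does not return until its children complete.
    }
    return completed.get()
}

fun main() {
    println(countCompletedChildren(120))
}
```

If the count always equals n, the missing items in the question are not caused by jobs that were never joined.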
时光不老,我们不散
#3 · 2019-07-10 00:04

mutableListOf() creates an ArrayList, which is not thread-safe, so concurrent add() calls from several threads can lose elements.
Try using ConcurrentLinkedQueue instead.
Also, are you using the stable version of Kotlin/kotlinx.coroutines (not the old experimental one)? In the stable version, with the introduction of structured concurrency, there is no need to write jobs.joinAll() anymore. launch is an extension function on CoroutineScope, and the block passed to runBlocking has that scope as its receiver, so the new coroutines are launched as children of runBlocking, which automatically waits for all of them to finish. So the code above can be shortened to

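    val lstOfReturnData = ConcurrentLinkedQueue<response>()
    runBlocking {
        lstInputs.forEach {
            launch(Dispatchers.IO) {
                lstOfReturnData.add(networkCallToGetData(it))
            }
        }
    }
    // copy into a List to match the List<response> return type
    // (elements are in completion order, not input order)
    return lstOfReturnData.toList()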
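
For anyone who wants to try this outside the original project, here is a self-contained sketch of the same idea, assuming kotlinx.coroutines 1.x. The Response class and the body of networkCallToGetData are hypothetical stand-ins (the question does not show them); Thread.sleep simulates a blocking network call.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import java.util.concurrent.ConcurrentLinkedQueue

// Hypothetical response type; the original post does not show its definition.
data class Response(val input: String, val body: String)

// Hypothetical stand-in for the real, blocking network call.
fun networkCallToGetData(input: String): Response {
    Thread.sleep(10)
    return Response(input, "data for $input")
}

fun processData(lstInputs: List<String>): List<Response> {
    // Thread-safe collection: concurrent add() calls do not lose elements.
    val results = ConcurrentLinkedQueue<Response>()
    runBlocking {
        lstInputs.forEach { input ->
            launch(Dispatchers.IO) {
                results.add(networkCallToGetData(input))
            }
        }
    }
    // Elements are in completion order, which may differ from input order.
    return results.toList()
}

fun main() {
    val inputs = List(120) { "item-$it" }
    println(processData(inputs).size)
}
```

With the thread-safe queue the result size should match the input size on every run. The order of the results is not guaranteed, so match each result to its input by a field such as input rather than by position.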
甜甜的少女心
#4 · 2019-07-10 00:17

runBlocking blocks the current thread interruptibly until its completion. I guess that's not what you want. If I'm wrong and you do want to block the current thread, then you can drop coroutines and just make the network calls sequentially on the current thread:

val lstOfReturnData = mutableListOf<response>()
lstInputs.forEach {
    lstOfReturnData.add(networkCallToGetData(it))
} 

But if that is not your intent, you can do the following:

class Presenter(private val uiContext: CoroutineContext = Dispatchers.Main) 
    : CoroutineScope {

    // creating local scope for coroutines
    private var job: Job = Job()
    override val coroutineContext: CoroutineContext
        get() = uiContext + job

    // call this to cancel job when you don't need it anymore
    fun detach() {
        job.cancel()
    }

    fun processData(lstInputs: List<String>) {

        launch {
            val deferredList = lstInputs.map { 
                async(Dispatchers.IO) { networkCallToGetData(it) } // runs in parallel in background thread
            }
            val lstOfReturnData = deferredList.awaitAll() // waiting while all requests are finished without blocking the current thread

            // use lstOfReturnData in Main Thread, e.g. update UI
        }
    }
}
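
If blocking the caller is in fact what is wanted, as the question's edit suggests, the same async/awaitAll pattern can be wrapped in runBlocking so that no shared mutable list is needed at all. This is only a sketch under assumptions, not the asker's actual code: it assumes kotlinx.coroutines 1.x, and the Response class and the body of networkCallToGetData are hypothetical stand-ins.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.runBlocking

// Hypothetical response type; the original post does not show its definition.
data class Response(val input: String, val body: String)

// Hypothetical stand-in for the real, blocking network call.
fun networkCallToGetData(input: String): Response {
    Thread.sleep(10)
    return Response(input, "data for $input")
}

// Blocks the calling thread until every request has finished,
// then returns one result per input.
fun processData(lstInputs: List<String>): List<Response> = runBlocking {
    lstInputs
        .map { input -> async(Dispatchers.IO) { networkCallToGetData(input) } } // start all calls concurrently
        .awaitAll() // suspend until all are done; results keep the order of the Deferred list
}

fun main() {
    val inputs = List(120) { "item-$it" }
    println(processData(inputs).size)
}
```

Each coroutine returns its own value instead of writing into a shared collection, so there is nothing to synchronize, and the results line up with the inputs by index.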