kotlin coroutines - use main thread in run blockin

2019-06-23 04:15发布

问题:

I am trying to execute following code:

 val jobs = listOf(...)
 return runBlocking(CommonPool) {
    val executed = jobs.map {
        async { it.execute() }
    }.toTypedArray()
    awaitAll(*executed)
 }

where jobs is the list of some Suppliers - in synchronus world this should just create, for example, list of ints. Everything works fine, but the problem is the main thread is not utilized. Bellow screenshot from YourKit:

So, the question is - how can I utilize main thread also?

I suppose runBlocking is the problem here, but is there other way to receive the same result? With Java parallel stream it looks far more better, but the main thread is still not utilized entirely (tasks are totally independent).

UPDATE

Ok, maybe I have told you too few things. My questions came some time after watching Vankant Subramaniam presentation : https://youtu.be/0hQvWIdwnw4. I need maximum performance, there is no IO, no Ui etc. Only computations. There is only request and I need to use all my available resources.

One think which I have is to set paralleizm to thread count + 1, but I think it is rather silly.

回答1:

I tested the solution with Java 8 parallel streams:

jobs.parallelStream().forEach { it.execute() }

I found the CPU utilization to be reliably on 100%. For reference, I used this computation job:

class MyJob {
    fun execute(): Double {
        val rnd = ThreadLocalRandom.current()
        var d = 1.0
        (1..rnd.nextInt(1_000_000)).forEach { _ ->
            d *= 1 + rnd.nextDouble(0.0000001)
        }
        return d
    }
}

Note that its duration varies randomly from zero up to the time it takes to perform 100,000,000 FP multiplications.

Out of curiosity I also studied the code you added to your question as the solution that works for you. I found a number of issues with it, such as:

  • accumulating all the results into a list instead of processing them as they become available
  • closing the result channel immediately after submitting the last job instead of waiting for all the results

I wrote some code of my own and added code to benchmark the Stream API one-liner against it. Here it is:

const val NUM_JOBS = 1000
val jobs = (0 until NUM_JOBS).map { MyJob() }


fun parallelStream(): Double =
        jobs.parallelStream().map { it.execute() }.collect(summingDouble { it })

fun channels(): Double {
    val resultChannel = Channel<Double>(UNLIMITED)

    val mainComputeChannel = Channel<MyJob>()
    val poolComputeChannels = (1..commonPool().parallelism).map { _ ->
        GlobalScope.actor<MyJob>(Dispatchers.Default) {
            for (job in channel) {
                job.execute().also { resultChannel.send(it) }
            }
        }
    }
    val allComputeChannels = poolComputeChannels + mainComputeChannel

    // Launch a coroutine that submits the jobs
    GlobalScope.launch {
        jobs.forEach { job ->
            select {
                allComputeChannels.forEach { chan ->
                    chan.onSend(job) {}
                }
            }
        }
    }

    // Run the main loop which takes turns between running a job
    // submitted to the main thread channel and receiving a result
    return runBlocking {
        var completedCount = 0
        var sum = 0.0
        while (completedCount < NUM_JOBS) {
            select<Unit> {
                mainComputeChannel.onReceive { job ->
                    job.execute().also { resultChannel.send(it) }
                }
                resultChannel.onReceive { result ->
                    sum += result
                    completedCount++
                }
            }
        }
        sum
    }
}

fun main(args: Array<String>) {
    measure("Parallel Stream", ::parallelStream)
    measure("Channels", ::channels)
    measure("Parallel Stream", ::parallelStream)
    measure("Channels", ::channels)
}

fun measure(task: String, measuredCode: () -> Double) {
    val block = { print(measuredCode().toString().substringBefore('.')) }
    println("Warming up $task")
    (1..20).forEach { _ -> block() }
    println("\nMeasuring $task")
    val average = (1..20).map { measureTimeMillis(block) }.average()
    println("\n$task took $average ms")
}

Here's my typical result:

Parallel Stream took 396.85 ms
Channels took 398.1 ms

The results are similar, but one line of code still beats 50 lines of code :)



回答2:

Just because there is no work being run on this explicit thread doesn't mean that device is not running other threads on the same core.

It's actually better to have your MainThread idle, that will make your UI more responsive.



回答3:

First, I'd like to empathize that utilizing main thread usually doesn't serve any practical purpose.

If your application is fully asynchronous, then you'll have only single (main) thread blocked. This thread does consume some memory and adds a bit of extra pressure on scheduler, but the added impact on performance would be negligible and even impossible to measure.

In practical java world it's almost impossible to have fixed number of threads in the JVM. There are system threads (gc), there are nio threads, etc.

One thread doesn't make a difference. You're fine as long as the number of threads in your application doesn't grow unconstrained with increased load.


Back to the original question.

I don't think there is a concise way to utilize main thread in this kind of parallel processing tasks.

For example, you can do the following:

data class Job(val res: Int) {
    fun execute(): Int {
        Thread.sleep(100)
        println("execute $res in ${Thread.currentThread().name}")
        return res
    }
}

fun main() {
    val jobs = (1..100).map { Job(it) }
    val resultChannel = Channel<Int>(Channel.UNLIMITED)
    val mainInputChannel = Channel<Job>()

    val workers = (1..10).map {
        actor<Job>(CommonPool) {
            for (j in channel) {
                resultChannel.send(j.execute())
            }
        }
    }

    val res: Deferred<List<Int>> = async(CommonPool) {
        val allChannels = (listOf(mainInputChannel) + workers)

        jobs.forEach { job ->
            select {
                allChannels.forEach {
                    it.onSend(job) {}
                }
            }
        }

        allChannels.forEach { it.close() }
        (1..jobs.size).map { resultChannel.receive() }
    }

    runBlocking {
        for (j in mainInputChannel) {
            resultChannel.send(j.execute())
        }
    }

    runBlocking {
        res.await().forEach { println(it) }
    }
}

Basically, is a simple producer/consumer implementation where main thread serves as one of the consumers. But that leads to lots of boilerplate though.

Output:

execute 1 in main @coroutine#12
execute 5 in ForkJoinPool.commonPool-worker-1 @coroutine#4
execute 6 in ForkJoinPool.commonPool-worker-2 @coroutine#5
execute 7 in ForkJoinPool.commonPool-worker-7 @coroutine#6
execute 2 in ForkJoinPool.commonPool-worker-6 @coroutine#1
execute 8 in ForkJoinPool.commonPool-worker-4 @coroutine#7
execute 4 in ForkJoinPool.commonPool-worker-5 @coroutine#3
execute 3 in ForkJoinPool.commonPool-worker-3 @coroutine#2
execute 12 in main @coroutine#12
execute 10 in ForkJoinPool.commonPool-worker-7 @coroutine#9
execute 15 in ForkJoinPool.commonPool-worker-5 @coroutine#6
execute 11 in ForkJoinPool.commonPool-worker-3 @coroutine#10
execute 16 in ForkJoinPool.commonPool-worker-6 @coroutine#1
execute 9 in ForkJoinPool.commonPool-worker-1 @coroutine#8
execute 14 in ForkJoinPool.commonPool-worker-4 @coroutine#5
execute 13 in ForkJoinPool.commonPool-worker-2 @coroutine#4
execute 20 in main @coroutine#12
execute 17 in ForkJoinPool.commonPool-worker-5 @coroutine#2
execute 18 in ForkJoinPool.commonPool-worker-3 @coroutine#3
execute 24 in ForkJoinPool.commonPool-worker-1 @coroutine#6
execute 23 in ForkJoinPool.commonPool-worker-4 @coroutine#5
execute 22 in ForkJoinPool.commonPool-worker-2 @coroutine#4
execute 19 in ForkJoinPool.commonPool-worker-7 @coroutine#7
execute 21 in ForkJoinPool.commonPool-worker-6 @coroutine#1
execute 25 in ForkJoinPool.commonPool-worker-5 @coroutine#8
execute 28 in main @coroutine#12
execute 29 in ForkJoinPool.commonPool-worker-2 @coroutine#2
execute 30 in ForkJoinPool.commonPool-worker-7 @coroutine#3
execute 27 in ForkJoinPool.commonPool-worker-4 @coroutine#10
execute 26 in ForkJoinPool.commonPool-worker-1 @coroutine#9
execute 32 in ForkJoinPool.commonPool-worker-3 @coroutine#4
execute 31 in ForkJoinPool.commonPool-worker-6 @coroutine#1
execute 36 in ForkJoinPool.commonPool-worker-5 @coroutine#8
execute 35 in ForkJoinPool.commonPool-worker-4 @coroutine#7
execute 33 in ForkJoinPool.commonPool-worker-2 @coroutine#5
execute 38 in ForkJoinPool.commonPool-worker-3 @coroutine#2
execute 37 in main @coroutine#12
execute 34 in ForkJoinPool.commonPool-worker-7 @coroutine#6
execute 39 in ForkJoinPool.commonPool-worker-6 @coroutine#3
execute 40 in ForkJoinPool.commonPool-worker-1 @coroutine#1
execute 44 in ForkJoinPool.commonPool-worker-5 @coroutine#8
execute 41 in ForkJoinPool.commonPool-worker-4 @coroutine#4
execute 46 in ForkJoinPool.commonPool-worker-1 @coroutine#2
execute 47 in ForkJoinPool.commonPool-worker-6 @coroutine#1
execute 45 in main @coroutine#12
execute 42 in ForkJoinPool.commonPool-worker-2 @coroutine#9
execute 43 in ForkJoinPool.commonPool-worker-7 @coroutine#10
execute 48 in ForkJoinPool.commonPool-worker-3 @coroutine#3
execute 52 in ForkJoinPool.commonPool-worker-5 @coroutine#8
execute 49 in ForkJoinPool.commonPool-worker-1 @coroutine#5
execute 54 in ForkJoinPool.commonPool-worker-2 @coroutine#1
execute 53 in main @coroutine#12
execute 50 in ForkJoinPool.commonPool-worker-4 @coroutine#6
execute 51 in ForkJoinPool.commonPool-worker-6 @coroutine#7
execute 56 in ForkJoinPool.commonPool-worker-3 @coroutine#3
execute 55 in ForkJoinPool.commonPool-worker-7 @coroutine#2
execute 60 in ForkJoinPool.commonPool-worker-5 @coroutine#8
execute 61 in ForkJoinPool.commonPool-worker-1 @coroutine#5
execute 57 in ForkJoinPool.commonPool-worker-4 @coroutine#4
execute 59 in ForkJoinPool.commonPool-worker-3 @coroutine#10
execute 64 in ForkJoinPool.commonPool-worker-7 @coroutine#2
execute 58 in ForkJoinPool.commonPool-worker-6 @coroutine#9
execute 62 in ForkJoinPool.commonPool-worker-2 @coroutine#1
execute 63 in main @coroutine#12
execute 68 in ForkJoinPool.commonPool-worker-5 @coroutine#8
execute 65 in ForkJoinPool.commonPool-worker-1 @coroutine#3
execute 66 in ForkJoinPool.commonPool-worker-4 @coroutine#6
execute 67 in ForkJoinPool.commonPool-worker-7 @coroutine#7
execute 69 in ForkJoinPool.commonPool-worker-6 @coroutine#4
execute 70 in ForkJoinPool.commonPool-worker-3 @coroutine#2
execute 74 in ForkJoinPool.commonPool-worker-2 @coroutine#1
execute 75 in main @coroutine#12
execute 71 in ForkJoinPool.commonPool-worker-5 @coroutine#5
execute 76 in ForkJoinPool.commonPool-worker-7 @coroutine#3
execute 73 in ForkJoinPool.commonPool-worker-6 @coroutine#10
execute 78 in ForkJoinPool.commonPool-worker-4 @coroutine#6
execute 72 in ForkJoinPool.commonPool-worker-1 @coroutine#9
execute 77 in ForkJoinPool.commonPool-worker-3 @coroutine#8
execute 79 in ForkJoinPool.commonPool-worker-2 @coroutine#1
execute 83 in main @coroutine#12
execute 84 in ForkJoinPool.commonPool-worker-4 @coroutine#3
execute 85 in ForkJoinPool.commonPool-worker-5 @coroutine#5
execute 82 in ForkJoinPool.commonPool-worker-1 @coroutine#7
execute 81 in ForkJoinPool.commonPool-worker-6 @coroutine#4
execute 80 in ForkJoinPool.commonPool-worker-7 @coroutine#2
execute 89 in ForkJoinPool.commonPool-worker-3 @coroutine#8
execute 90 in ForkJoinPool.commonPool-worker-2 @coroutine#1
execute 91 in main @coroutine#12
execute 86 in ForkJoinPool.commonPool-worker-5 @coroutine#6
execute 88 in ForkJoinPool.commonPool-worker-6 @coroutine#10
execute 87 in ForkJoinPool.commonPool-worker-1 @coroutine#9
execute 92 in ForkJoinPool.commonPool-worker-7 @coroutine#2
execute 93 in ForkJoinPool.commonPool-worker-4 @coroutine#3
execute 99 in main @coroutine#12
execute 97 in ForkJoinPool.commonPool-worker-3 @coroutine#8
execute 98 in ForkJoinPool.commonPool-worker-2 @coroutine#1
execute 95 in ForkJoinPool.commonPool-worker-1 @coroutine#5
execute 100 in ForkJoinPool.commonPool-worker-4 @coroutine#6
execute 94 in ForkJoinPool.commonPool-worker-5 @coroutine#4
execute 96 in ForkJoinPool.commonPool-worker-7 @coroutine#7
1
5
6
7
2
8
4
3
12
10
15
11
16
9
14
13
20
17
18
24
23
22
19
21
25
28
29
30
27
26
32
31
36
35
33
38
37
34
39
40
44
41
46
47
45
42
43
48
52
49
54
53
50
51
56
55
60
61
57
59
64
58
62
63
68
65
66
67
69
70
74
75
71
76
73
78
72
77
79
83
84
85
82
81
80
89
90
91
86
88
87
92
93
99
97
98
95
100
94
96


回答4:

async() without any parameters using DefaultDispatcher and will take pool from parent, so all the async calls executed in CommonPool. If you want different set of threads to run your code, create your own pool. While it's usually good practice to not utilize main thread with computation, but depends on your usecase.