Kotlin Process Collection In Parallel?
Asked Answered
T

5

5

I have a collection of objects, which I need to perform some transformation on. Currently I am using:

var myObjects: List<MyObject> = getMyObjects()

myObjects.forEach{ myObj ->
    someMethod(myObj)
}

It works fine, but I was hoping to speed it up by running someMethod() in parallel, instead of waiting for each object to finish, before starting on the next one.

Is there any way to do this in Kotlin? Maybe with doAsyncTask or something?

I know when this was asked over a year ago it was not possible, but now that Kotlin has coroutines like doAsyncTask I am curious if any of the coroutines can help

Thug answered 8/8, 2017 at 18:25 Comment(4)
Possible duplicate of Parallel operations on Kotlin collections?Slater
That question is a year and a half old, and was asked before Kotlin Coroutines were introducedThug
the launch() coroutine may do the jobxMoxa
also, you can always use the Java parallel streamsCirri
T
16

Yes, this can be done using coroutines. The following function applies an operation in parallel on all elements of a collection:

fun <A>Collection<A>.forEachParallel(f: suspend (A) -> Unit): Unit = runBlocking {
    map { async(CommonPool) { f(it) } }.forEach { it.await() }
}

While the definition itself is a little cryptic, you can then easily apply it as you would expect:

myObjects.forEachParallel { myObj ->
    someMethod(myObj)
}

Parallel map can be implemented in a similar way, see https://mcmap.net/q/319847/-parallel-operations-on-kotlin-collections.

Triumphal answered 21/8, 2017 at 10:7 Comment(1)
Currently 'CommonPool' cannot be accessed - it is internal in 'kotlinx.coroutines'!Weighted
S
6

Java Stream is simple to use in Kotlin:

tasks.stream().parallel().forEach { computeNotSuspend(it) }

If you are using Android however, you cannot use Java 8 if you want an app compatible with an API lower than 24.

You can also use coroutines as you suggested. But it's not really part of the language as of now (August 2017) and you need to install an external library. There is very good guide with examples.

    runBlocking<Unit> {
        val deferreds = tasks.map { async(CommonPool) { compute(it) } }
        deferreds.forEach { it.await() }
    }

Note that coroutines are implemented with non-blocking multi-threading, which mean they can be faster than traditional multi-threading. I have code below benchmarking the Stream parallel versus coroutine and in that case the coroutine approach is 7 times faster on my machine. However you have to do some work yourself to make sure your code is "suspending" (non-locking) which can be quite tricky. In my example I'm just calling delay which is a suspend function provided by the library. Non-blocking multi-threading is not always faster than traditional multi-threading. It can be faster if you have many threads doing nothing but waiting on IO, which is kind of what my benchmark is doing.

My benchmarking code:

import kotlinx.coroutines.experimental.CommonPool
import kotlinx.coroutines.experimental.async
import kotlinx.coroutines.experimental.delay
import kotlinx.coroutines.experimental.launch
import kotlinx.coroutines.experimental.runBlocking
import java.util.*
import kotlin.system.measureNanoTime
import kotlin.system.measureTimeMillis

class SomeTask() {
    val durationMS = random.nextInt(1000).toLong()

    companion object {
        val random = Random()
    }
}

suspend fun compute(task: SomeTask): Unit {
    delay(task.durationMS)
    //println("done ${task.durationMS}")
    return
}

fun computeNotSuspend(task: SomeTask): Unit {
    Thread.sleep(task.durationMS)
    //println("done ${task.durationMS}")
    return
}

fun main(args: Array<String>) {
    val n = 100
    val tasks = List(n) { SomeTask() }

    val timeCoroutine = measureNanoTime {
        runBlocking<Unit> {
            val deferreds = tasks.map { async(CommonPool) { compute(it) } }
            deferreds.forEach { it.await() }
        }
    }

    println("Coroutine ${timeCoroutine / 1_000_000} ms")

    val timePar = measureNanoTime {
        tasks.stream().parallel().forEach { computeNotSuspend(it) }
    }
    println("Stream parallel ${timePar / 1_000_000} ms")
}

Output on my 4 cores computer:

Coroutine: 1037 ms
Stream parallel: 7150 ms

If you uncomment out the println in the two compute functions you will see that in the non-blocking coroutine code the tasks are processed in the right order, but not with Streams.

Skysail answered 17/8, 2017 at 21:45 Comment(0)
S
1

To process items of a collection in parallel you can use Kotlin Coroutines. For example the following extension function processes items in parallel and waits for them to be processed:

suspend fun <T, R> Iterable<T>.processInParallel(
    dispatcher: CoroutineDispatcher = Dispatchers.IO,
    processBlock: suspend (v: T) -> R,
    ): List<R> = coroutineScope { // or supervisorScope
          map {
              async(dispatcher) { processBlock(it) }
          }.awaitAll()
    }

This is suspend extension function on Iterable<T> type, which does a parallel processing of items and returns some result of processing each item. By default it uses Dispatchers.IO dispatcher to offload blocking tasks to a shared pool of threads. Must be called from a coroutine (including a coroutine with Dispatchers.Main dispatcher) or another suspend function.

Example of calling from a coroutine:

val myObjects: List<MyObject> = getMyObjects()

someCoroutineScope.launch {
    val results = myObjects.processInParallel {
        someMethod(it)
    }
    // use processing results
}

where someCoroutineScope is an instance of CoroutineScope.


Or if you want to just launch and forget you can use this function:

fun <T> CoroutineScope.processInParallelAndForget(
    iterable: Iterable<T>,
    dispatcher: CoroutineDispatcher = Dispatchers.IO,
    processBlock: suspend (v: T) -> Unit
) = iterable.forEach {
    launch(dispatcher) { processBlock(it) }
}

This is an extension function on CoroutineScope, which doesn't return any result. It also uses Dispatchers.IO dispatcher by default. Can be called using CoroutineScope or from another coroutine. Calling example:

someoroutineScope.processInParallelAndForget(myObjects) {
    someMethod(it)
}

// OR from another coroutine:

someCoroutineScope.launch {
    processInParallelAndForget(myObjects) {
        someMethod(it)
    }
}

where someCoroutineScope is an instance of CoroutineScope.

Sommelier answered 25/1, 2022 at 9:55 Comment(0)
C
1

As per Kotlin language official guide, OP's code would be done something like this:

val myObjects: List<MyObject> = getMyObjects()

runBlocking(Dispatchers.Default) { // or Dispatchers.IO as you like
    myObjects.map {
        async { someMethod(it) }
    }.awaitAll()
}
Clino answered 17/10, 2023 at 6:37 Comment(0)
P
0

You can use RxJava to solve this.

List<MyObjects> items = getList()

Observable.from(items).flatMap(object : Func1<MyObjects, Observable<String>>() {
    fun call(item: MyObjects): Observable<String> {
        return someMethod(item)
    }
}).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(object : Subscriber<String>() {
    fun onCompleted() {

    }

    fun onError(e: Throwable) {

    }

    fun onNext(s: String) {
        // do on output of each string
    }
})

By subscribing on Schedulers.io(), some method is scheduled on background thread.

Practical answered 8/8, 2017 at 18:47 Comment(1)
Gluing pipeline observer to "background thread" doesn't enable executing parallelismDragster

© 2022 - 2025 — McMap. All rights reserved.