To process items of a collection in parallel you can use Kotlin Coroutines. For example the following extension function processes items in parallel and waits for them to be processed:
suspend fun <T, R> Iterable<T>.processInParallel(
dispatcher: CoroutineDispatcher = Dispatchers.IO,
processBlock: suspend (v: T) -> R,
): List<R> = coroutineScope { // or supervisorScope
map {
async(dispatcher) { processBlock(it) }
}.awaitAll()
}
This is suspend
extension function on Iterable<T>
type, which does a parallel processing of items and returns some result of processing each item. By default it uses Dispatchers.IO
dispatcher to offload blocking tasks to a shared pool of threads. Must be called from a coroutine (including a coroutine with Dispatchers.Main
dispatcher) or another suspend
function.
Example of calling from a coroutine:
val myObjects: List<MyObject> = getMyObjects()
someCoroutineScope.launch {
val results = myObjects.processInParallel {
someMethod(it)
}
// use processing results
}
where someCoroutineScope
is an instance of CoroutineScope
.
Or if you want to just launch and forget you can use this function:
fun <T> CoroutineScope.processInParallelAndForget(
iterable: Iterable<T>,
dispatcher: CoroutineDispatcher = Dispatchers.IO,
processBlock: suspend (v: T) -> Unit
) = iterable.forEach {
launch(dispatcher) { processBlock(it) }
}
This is an extension function on CoroutineScope
, which doesn't return any result. It also uses Dispatchers.IO
dispatcher by default. Can be called using CoroutineScope
or from another coroutine.
Calling example:
someoroutineScope.processInParallelAndForget(myObjects) {
someMethod(it)
}
// OR from another coroutine:
someCoroutineScope.launch {
processInParallelAndForget(myObjects) {
someMethod(it)
}
}
where someCoroutineScope
is an instance of CoroutineScope
.