Parallel request with Retrofit, Coroutines and Suspend functions

I'm using Retrofit to make some network requests, together with coroutines and suspend functions.

My question is: is there a way to improve the following code? The idea is to launch multiple requests in parallel and wait for all of them to finish before the function continues.

lifecycleScope.launch {
    try {
        itemIds.forEach { itemId ->
            withContext(Dispatchers.IO) { itemById[itemId] = MyService.getItem(itemId) }
        }
    } catch (exception: Exception) {
        exception.printStackTrace()
    }

    Log.i(TAG, "All requests have been executed")
}

(Note that "MyService.getItem()" is a 'suspend' function.)

I suspect there is something nicer than a forEach for this.

Anyone with an idea?

Selectman answered 1/11, 2019 at 11:38 Comment(2)
Try this: kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/… Coroutines can be put in an array, like JS Promise.all. - Penelopa
awaitAll is less convenient than coroutineScope, where you don't have to explicitly enumerate what to await. - Mesomorphic
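For comparison, here is a minimal sketch of the awaitAll idiom the first comment alludes to (the Kotlin analogue of JS Promise.all). Item and fakeGetItem are hypothetical stand-ins for the real model class and MyService.getItem(); the network call is simulated with delay.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking

// Hypothetical model and service call, standing in for the real Retrofit API.
data class Item(val id: Long)

suspend fun fakeGetItem(id: Long): Item {
    delay(100) // simulate network latency without blocking a thread
    return Item(id)
}

// Start one async request per id, then wait for all of them, like Promise.all.
suspend fun fetchAllWithAwaitAll(ids: List<Long>): List<Item> = coroutineScope {
    ids.map { id -> async { fakeGetItem(id) } }.awaitAll()
}

fun main() = runBlocking {
    val items = fetchAllWithAwaitAll(listOf(1L, 2L, 3L))
    println(items) // [Item(id=1), Item(id=2), Item(id=3)]
}
```

awaitAll returns the results in the same order as the Deferred list. The trade-off, as the second comment says, is that you hold the Deferred handles explicitly instead of letting coroutineScope track the children for you.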

I've prepared three approaches to solving this, from the simplest to the most correct one. To simplify the presentation of the approaches, I have extracted this common code:

lifecycleScope.launch {
    val itemById = try {
        fetchItems(itemIds)
    } catch (exception: Exception) {
        exception.printStackTrace()
        emptyMap<Long, Item>() // fall back to an empty result so itemById stays a Map
    }
    Log.i(TAG, "Fetched these items: $itemById")
}

Before I go on, a general note: your getItem() function is a suspend function that doesn't block the calling thread, so there is no need to submit it to the IO dispatcher. All your coroutines can run on the main thread.
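To illustrate the note above, here is a small sketch that assumes getItem() suspends rather than blocks; it is simulated by a hypothetical fakeGetItem that calls delay. runBlocking runs everything on one thread, yet ten 100 ms "requests" finish in roughly 100 ms rather than one second, because a suspended coroutine releases the thread for the others.

```kotlin
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlin.system.measureTimeMillis

// Hypothetical non-blocking request: delay() suspends instead of blocking the thread.
suspend fun fakeGetItem(id: Long): String {
    delay(100)
    return "item-$id"
}

// Launches all requests on runBlocking's single thread and measures wall-clock time.
fun timeTenRequestsOnOneThread(): Long = runBlocking {
    measureTimeMillis {
        coroutineScope {
            repeat(10) { i -> launch { fakeGetItem(i.toLong()) } }
        }
    }
}

fun main() {
    // Roughly 100 ms, not 1000 ms: the ten delays overlap on one thread.
    println("10 requests took ${timeTenRequestsOnOneThread()} ms")
}
```

If getItem() were a blocking call instead, the ten calls would run one after another on that thread, which is the case Dispatchers.IO exists for.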

Now let's see how we can implement fetchItems(itemIds).

1. Simple forEach

Here we take advantage of the fact that all the coroutine code can run on the main thread:

suspend fun fetchItems(itemIds: Iterable<Long>): Map<Long, Item> {
    val itemById = mutableMapOf<Long, Item>()
    coroutineScope {
        itemIds.forEach { itemId ->
            launch { itemById[itemId] = MyService.getItem(itemId) }
        }
    }
    return itemById
}

coroutineScope waits for all the coroutines you launch inside it. Even though they run concurrently with each other, the launched coroutines inherit the caller's dispatcher, which here dispatches to the single main thread, so there is no concurrency issue with updating the (non-thread-safe) map from each of them.
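Here is a runnable sketch of approach 1, with a hypothetical fakeGetItem in place of MyService.getItem() and a hypothetical Item class. It also records the name of the thread each coroutine writes to the map from, to show that they all share the single thread runBlocking provides, which is what makes the plain mutableMapOf safe here.

```kotlin
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

data class Item(val id: Long) // hypothetical model

// Hypothetical stand-in for MyService.getItem().
suspend fun fakeGetItem(id: Long): Item {
    delay(50)
    return Item(id)
}

// Approach 1: launch one child per id; coroutineScope returns only when all children are done.
suspend fun fetchItemsForEach(itemIds: Iterable<Long>, threadNames: MutableSet<String>): Map<Long, Item> {
    val itemById = mutableMapOf<Long, Item>()
    coroutineScope {
        itemIds.forEach { itemId ->
            launch {
                val item = fakeGetItem(itemId)
                threadNames += Thread.currentThread().name // record which thread does the write
                itemById[itemId] = item
            }
        }
    }
    return itemById
}

fun main() = runBlocking {
    val threads = mutableSetOf<String>()
    val items = fetchItemsForEach(1L..5L, threads)
    println("${items.size} items, written from threads: $threads") // a single thread name
}
```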

2. Thread-Safe Variant

The fact that it relies on the properties of a single-threaded context can be seen as a limitation of the first approach: it doesn't generalize to thread-pool-based contexts such as Dispatchers.Default. We can avoid this limitation by relying on the async-await mechanism:

suspend fun fetchItems(itemIds: Iterable<Long>): Map<Long, Item> = coroutineScope {
    itemIds.map { itemId -> async { itemId to MyService.getItem(itemId) } }
            .map { it.await() }
            .toMap()
}

Here we rely on two non-obvious properties of Iterable.map():

  1. It performs all the transformations eagerly, so the first stage, which turns the IDs into a collection of Deferred<Pair<Long, Item>>, is completely done before entering the second stage, where we await each of them.
  2. It is an inline function, which allows us to write suspendable code in it even though the function itself is not a suspend fun and gets a non-suspendable lambda (Deferred<T>) -> T.

This means that all the fetching is done concurrently, but the map gets assembled in a single coroutine.
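To check that approach 2 doesn't depend on a single thread, the sketch below runs it on Dispatchers.Default, a multi-threaded pool. fakeGetItem and Item are again hypothetical stand-ins. The result map is only built by toMap(), inside one coroutine, so no synchronization is needed.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

data class Item(val id: Long) // hypothetical model

// Hypothetical stand-in for MyService.getItem().
suspend fun fakeGetItem(id: Long): Item {
    delay(10)
    return Item(id)
}

// Approach 2: start every request first (first map), then await each in turn (second map).
suspend fun fetchItemsAsync(itemIds: Iterable<Long>): Map<Long, Item> = coroutineScope {
    itemIds.map { itemId -> async { itemId to fakeGetItem(itemId) } }
        .map { it.await() }
        .toMap()
}

fun main() = runBlocking {
    // Run on a thread pool to show the approach does not rely on a single-threaded context.
    val items = withContext(Dispatchers.Default) { fetchItemsAsync(1L..1000L) }
    println(items.size) // 1000
}
```

An equivalent second stage is .awaitAll().toMap(), since awaitAll() on a list of Deferred<Pair<Long, Item>> returns the list of pairs.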

3. Flow-Based Approach with Improved Concurrency Control

The approaches above give us concurrency, but they lack any backpressure: nothing limits how many requests run at once. If your input list is very large, you'll want to cap the number of simultaneous network requests.

You can do this with a Flow-based idiom:

@OptIn(FlowPreview::class) // flatMapMerge is marked as a preview API
suspend fun fetchItems(itemIds: Iterable<Long>): Map<Long, Item> = itemIds
        .asFlow()
        .flatMapMerge(concurrency = MAX_CONCURRENT_REQUESTS) { itemId ->
            flow { emit(itemId to MyService.getItem(itemId)) }
        }
        .toMap()

Here the magic is in the .flatMapMerge operation. You give it a function (T) -> Flow<R>, and it applies that function sequentially to every input element, but then collects the resulting flows concurrently, at most concurrency of them at a time. Note that I couldn't simplify flow { emit(getItem()) } to just flowOf(getItem()), because getItem() must be called lazily, while the flow is being collected.
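The sketch below checks the concurrency limit empirically: a hypothetical fakeGetItem counts how many calls are in flight, and the maximum observed never exceeds the value passed to flatMapMerge. MAX_CONCURRENT_REQUESTS = 4 is an arbitrary value chosen for the example, and the results are collected with toList() to keep the snippet self-contained.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.flatMapMerge
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext
import java.util.concurrent.atomic.AtomicInteger

const val MAX_CONCURRENT_REQUESTS = 4 // arbitrary limit for this example

val inFlight = AtomicInteger(0)
val maxInFlight = AtomicInteger(0)

// Hypothetical request that records how many calls overlap.
suspend fun fakeGetItem(id: Long): String {
    val now = inFlight.incrementAndGet()
    maxInFlight.updateAndGet { maxOf(it, now) }
    try {
        delay(20)
        return "item-$id"
    } finally {
        inFlight.decrementAndGet()
    }
}

@OptIn(FlowPreview::class)
suspend fun fetchWithLimit(itemIds: Iterable<Long>): List<Pair<Long, String>> = itemIds
    .asFlow()
    .flatMapMerge(concurrency = MAX_CONCURRENT_REQUESTS) { itemId ->
        // flow { } defers the call until flatMapMerge collects this inner flow
        flow { emit(itemId to fakeGetItem(itemId)) }
    }
    .toList()

fun main() = runBlocking {
    val results = withContext(Dispatchers.Default) { fetchWithLimit(1L..40L) }
    println("${results.size} results, max in flight: ${maxInFlight.get()}")
}
```

Note that the results arrive in completion order, not input order. Had the inner flow been written as flowOf(itemId to fakeGetItem(itemId)), the call would run inside the transform itself, which flatMapMerge applies to one element at a time, so the requests would effectively run sequentially.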

Flow.toMap() is not currently provided by the kotlinx.coroutines library, so here it is:

suspend fun <K, V> Flow<Pair<K, V>>.toMap(): Map<K, V> {
    val result = mutableMapOf<K, V>()
    collect { (k, v) -> result[k] = v }
    return result
}
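A quick usage check of the toMap() helper above, repeated here so the snippet is self-contained (it assumes kotlinx.coroutines 1.6 or later, where collect accepts a lambda without an extra import). As with Iterable<Pair>.toMap(), a later pair with the same key overwrites an earlier one.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.runBlocking

// Same helper as above: collects a Flow of pairs into a map.
suspend fun <K, V> Flow<Pair<K, V>>.toMap(): Map<K, V> {
    val result = mutableMapOf<K, V>()
    collect { (k, v) -> result[k] = v }
    return result
}

fun main() = runBlocking {
    val map = flowOf(1L to "a", 2L to "b", 1L to "c").toMap()
    println(map) // {1=c, 2=b}
}
```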
Mesomorphic answered 1/11, 2019 at 12:41 Comment(0)

If you are just looking for a nicer way to write it and eliminate the forEach:

lifecycleScope.launch {
    try {
        itemIds.asFlow()
            .flowOn(Dispatchers.IO)
            .collect { itemId -> itemById[itemId] = MyService.getItem(itemId) }
    } catch (exception: Exception) {
        exception.printStackTrace()
    }

    Log.i(TAG, "All requests have been executed")
}

Also, check which dispatcher lifecycleScope uses; I suspect it is Dispatchers.Main. If that is the case, you can remove the extra .flowOn(Dispatchers.IO) dispatcher declaration.
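To see what the first comment below means by "doesn't achieve concurrency", here is a minimal sketch of this pattern with a hypothetical 100 ms fakeGetItem. flowOn only changes the context of the upstream (asFlow()), while the collect lambda runs in the collector's coroutine one element at a time, so five requests take at least 500 ms in total.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.runBlocking
import kotlin.system.measureTimeMillis

// Hypothetical 100 ms network call.
suspend fun fakeGetItem(id: Long): String {
    delay(100)
    return "item-$id"
}

// Mirrors the answer: asFlow + flowOn + collect. Each request starts only after the previous one finishes.
fun timeSequentialCollect(itemIds: List<Long>, itemById: MutableMap<Long, String>): Long = runBlocking {
    measureTimeMillis {
        itemIds.asFlow()
            .flowOn(Dispatchers.IO) // affects only the upstream emissions, not the collect block
            .collect { itemId -> itemById[itemId] = fakeGetItem(itemId) }
    }
}

fun main() {
    val itemById = mutableMapOf<Long, String>()
    val ms = timeSequentialCollect(listOf(1L, 2L, 3L, 4L, 5L), itemById)
    println("${itemById.size} items in $ms ms") // at least 500 ms: the calls do not overlap
}
```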

For more info: Kotlin Asynchronous Flow

Strohl answered 1/11, 2019 at 21:28 Comment(2)
Unfortunately, this doesn't achieve concurrency. Also, you don't need Dispatchers.IO here under any circumstances. The IO dispatcher is a crutch for calling legacy blocking functions in a large thread pool. - Mesomorphic
I used the dispatcher because the original code did; that's why I added the comment about the extra dispatcher :) - Strohl
