Swift 5.5 Concurrency: how to serialize async Tasks to replace an OperationQueue with maxConcurrentOperationCount = 1?

I’m currently migrating my app to use the concurrency model in Swift. I want to serialize Tasks to make sure they are executed one after the other (no parallelism). In my use case, I want to listen to notifications posted by the NotificationCenter and execute a Task every time a new notification is posted. But I want to make sure no previous task is still running. It's the equivalent of using an OperationQueue with maxConcurrentOperationCount = 1.

For example, I’m using CloudKit with Core Data in my app and I use persistent history tracking to determine what changes have occurred in the store. In the Synchronizing a Local Store to the Cloud sample code, Apple uses an operation queue for handling history processing tasks (in CoreDataStack). This OperationQueue has its maxConcurrentOperationCount set to 1.

private lazy var historyQueue: OperationQueue = {
    let queue = OperationQueue()
    queue.maxConcurrentOperationCount = 1
    return queue
}()

When a Core Data notification is received, a new operation is added to this serial operation queue. So if many notifications are received, they will all be processed serially, one after the other.

@objc
func storeRemoteChange(_ notification: Notification) {
    // Process persistent history to merge changes from other coordinators.
    historyQueue.addOperation {
        self.processPersistentHistory()
    }
}

In the Loading and Displaying a Large Data Feed sample code, Apple uses Tasks to handle history changes (in QuakesProvider).

// Observe Core Data remote change notifications on the queue where the changes were made.
notificationToken = NotificationCenter.default.addObserver(forName: .NSPersistentStoreRemoteChange, object: nil, queue: nil) { note in
    Task {
        await self.fetchPersistentHistory()
    }
}

I feel something is wrong in the second project, as the Tasks could run in any order, and not necessarily serially (contrary to the first project, where the OperationQueue has a maxConcurrentOperationCount = 1).

Should we use an actor somewhere to make sure the methods are serially called?

I thought about an implementation like this but I’m not yet really comfortable with that:

actor PersistenceStoreListener {
    let historyTokenManager: PersistenceHistoryTokenManager = .init()
    private let persistentContainer: NSPersistentContainer

    init(persistentContainer: NSPersistentContainer) {
        self.persistentContainer = persistentContainer
    }

    func processRemoteStoreChange() async {
        print("\(#function) called on \(Date.now.formatted(date: .abbreviated, time: .standard)).")
    }
}

where the processRemoteStoreChange method would be called when a new notification is received (AsyncSequence):

notificationListenerTask = Task {
    let notifications = NotificationCenter.default.notifications(named: .NSPersistentStoreRemoteChange, object: container.persistentStoreCoordinator)

    for await _ in notifications {
        print("notificationListenerTask called on \(Date.now.formatted(date: .abbreviated, time: .standard)).")
        await self.storeListener?.processRemoteStoreChange()
    }
}
Row answered 31/12, 2021 at 8:48 Comment(13)
In Swift Concurrency a sequential execution is the normal case unless you use async let or a detached task. In a task group the iterations can be performed concurrently, but the result is awaited in order. In your case an AsyncSequence is the right way.Thermion
@Thermion So you mean that the code in the second project from Apple using Task (developer.apple.com/documentation/coredata/…) is executing Tasks serially? Is there no risk of a task scheduled later being executed before another task that was already scheduled?Row
Yes, as the name AsyncSequence implies it's a sequence and a sequence is executed sequentially. But feel free to try it out.Thermion
Hmm, Apple is not using an AsyncSequence in their code (that's my proposal). But even if they were, it would only mean the notifications are received sequentially; when I call the async methods in the Task, will they be executed sequentially too? I'm a bit confused about how the different Tasks will actually be handled.Row
"In a task group the iterations can be performed concurrently, but the result is awaited in order" False. The results can arrive in any order.Wry
"Should we use an actor somewhere to make sure the methods are serially called?" That is certainly one of the main purposes of actors, yes.Wry
Thanks @Wry for your clarifications! Do you have any thoughts regarding my code proposal for this actor?Row
I have no idea because I don't know what processRemoteStoreChange() really does. But if the goal is to serialize operations but not on the main thread, that's an actor.Wry
@Wry You're right, I apologize. TaskGroup returns the items in order of completion. But AsyncSequence returns the items in sequential order.Thermion
@Rob No they don't. That's why you can't simply append the results to an array as they arrive. You lose all association with the array you were looping through to start with.Wry
Correct. We need to do the same silliness to reassemble the results in the original order.Kimmie
So how can I make sure that only one async method executes at a time when the calls come from an AsyncSequence, like notifications, without missing any item from the AsyncSequence? In my case, it’s possible to have a queue of upcoming async method calls started from the AsyncSequence, a bit like a buffer. E.g., 10 notifications are received in 1 second and they all call the async method, but only one executes. When the first completes, the second starts, and so on. New calls may arrive before all earlier calls have executed, in which case they are added to the queue.Row
“So how to make sure that only one async method executes at the same time?” ... But is it really async? Your operation queue example is actually calling processPersistentHistory, which obviously is not an asynchronous method. I mention it because the desired behavior is easy to achieve with an actor if the method in question is synchronous. (FWIW, the same is true for operations, too, which get far more complicated when you attempt to wrap an inherently asynchronous process within an Operation subclass.)Kimmie

Below, in my original answer, I answer the general question of how to achieve sequential behavior from independent tasks within Swift concurrency.

But you are asking a more specific question, namely, how to get serial behavior from an asynchronous sequence of events. If you have an AsyncSequence, such as notifications, then the for-await-in approach you contemplate at the end of your question is a great solution:

notificationListenerTask = Task {
    let notifications = NotificationCenter.default.notifications(named: .NSPersistentStoreRemoteChange, object: container.persistentStoreCoordinator)
   
    for await _ in notifications {
        await self.storeListener?.processRemoteStoreChange()
    }
}

Because you await within the loop, it will not get to the next iteration of the notifications AsyncSequence until the prior processRemoteStoreChange returns and execution of the loop continues.
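
As a rough, self-contained sketch of that behavior, the following swaps the notifications sequence for an AsyncStream so it can run without Core Data. The names EventProcessor and runSerialLoopDemo are hypothetical, and the sleep merely simulates asynchronous work:

```swift
// Hypothetical stand-in for the store listener in the question.
// It records when each event starts and finishes being processed.
actor EventProcessor {
    private(set) var log: [String] = []

    func process(_ event: Int) async {
        log.append("start \(event)")
        // Simulated asynchronous work (assumption: 50 ms is arbitrary).
        try? await Task.sleep(nanoseconds: 50_000_000)
        log.append("end \(event)")
    }
}

func runSerialLoopDemo() async -> [String] {
    let processor = EventProcessor()

    // Stand-in for NotificationCenter.default.notifications(named:object:).
    // All three events are buffered before the loop starts consuming them.
    let events = AsyncStream<Int> { continuation in
        for event in 1...3 { continuation.yield(event) }
        continuation.finish()
    }

    for await event in events {
        // The loop does not ask for the next element until this call returns.
        await processor.process(event)
    }
    return await processor.log
}
```

Even though all three events are available immediately, the log comes back as start 1, end 1, start 2, end 2, start 3, end 3, because a single loop is doing the awaiting.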

Bottom line, an AsyncSequence (whether notifications or your own AsyncStream or AsyncChannel) is an excellent way to get serial behavior from an asynchronous series of events. The WWDC 2021 video Meet AsyncSequence is a great primer on asynchronous sequences for those unfamiliar with the AsyncSequence protocol.


In my original answer, below, I tackle the more general question of getting serial behavior from a series of independent Swift concurrency tasks:


If you want to get the behavior of an OperationQueue with a maxConcurrentOperationCount of 1 (a “serial” operation queue), one can achieve that with an actor.

There are two patterns that you will see with a serial OperationQueue:

  1. The operations in the queue are, themselves, synchronous.

    If you are using the standard OperationQueue (i.e., you have not written an Operation subclass that does manual KVO for isFinished, etc.), a simple actor achieves what we want. An actor will prevent concurrent execution.

    The key here, though, is that this only works with synchronous methods (i.e., those methods that do not have await suspension points).

  2. The operations in the queue are asynchronous.

    One of the more advanced use-cases of operation queues is to handle dependencies between tasks that are, themselves, asynchronous. This is a more complicated scenario in operation queues, requiring a custom Operation subclass in which you manually handle the KVO of isFinished, etc. (See this answer for an example of that pattern.)

    The challenge in doing this with Swift concurrency is that actors are reentrant (see the reentrancy discussion in SE-0306). If the actor’s method is asynchronous (i.e., it uses async-await), it introduces suspension points: an await in one call will allow another async method to run on that actor.

    To achieve serial execution between separate async methods, you have a couple of options:


Consider the following (which uses OS signposts so that I can graphically illustrate the behavior in Instruments):

import UIKit
import os.signpost

private let pointsOfInterest = OSLog(subsystem: "log", category: .pointsOfInterest)

class ViewController: UIViewController {

    let example = Example()
    let taskSerializer = SerialTasks<Void>()

    @IBAction func didTapSync(_ sender: Any) {
        os_signpost(.event, log: pointsOfInterest, name: #function)
        startSynchronous()
    }

    @IBAction func didTapAsync(_ sender: Any) {
        os_signpost(.event, log: pointsOfInterest, name: #function)
        Task { try await startAsynchronous() }
    }

    @IBAction func didTapSerializedAsync(_ sender: Any) {
        os_signpost(.event, log: pointsOfInterest, name: #function)
        Task { try await startSerializedAsynchronous() }
    }

    func startSynchronous() {
        Task {
            await example.synchronousExample("1. synchronous")
        }
    }

    func startAsynchronous() async throws {
        try await example.asynchronousExample("2. asynchronous")
    }

    func startSerializedAsynchronous() async throws {
        try await taskSerializer.add {
            try await self.example.asynchronousExample("3. serial async")
        }
    }
}

actor Example {
    func asynchronousExample(_ name: StaticString) async throws {
        let id = OSSignpostID(log: pointsOfInterest)
        os_signpost(.begin, log: pointsOfInterest, name: name, signpostID: id)
        defer { os_signpost(.end, log: pointsOfInterest, name: name, signpostID: id) }

        try await Task.sleep(for: .seconds(2))
    }

    func synchronousExample(_ name: StaticString) {
        let id = OSSignpostID(log: pointsOfInterest)
        os_signpost(.begin, log: pointsOfInterest, name: name, signpostID: id)
        defer { os_signpost(.end, log: pointsOfInterest, name: name, signpostID: id) }

        Thread.sleep(forTimeInterval: 2)
    }
}

actor SerialTasks<Success: Sendable> {
    private var previousTask: Task<Success, Error>?

    func add(block: @Sendable @escaping () async throws -> Success) async throws -> Success {
        let task = Task { [previousTask] in
            let _ = await previousTask?.result
            return try await block()
        }
        previousTask = task
        return try await task.value
    }
}

The synchronous case (scenario 1), startSynchronous, is the simplest. Just call the synchronous method of the actor and you get serial execution.
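
A minimal sketch of why that works, independent of the signpost code: the names SerialCounter and runSynchronousActorDemo are hypothetical. Because increment contains no await, each call runs to completion before the actor accepts the next one, so the read-modify-write below never overlaps with another call:

```swift
actor SerialCounter {
    private(set) var value = 0

    // Synchronous: no suspension points, so calls cannot interleave.
    func increment() {
        let current = value
        value = current + 1
    }
}

func runSynchronousActorDemo() async -> Int {
    let counter = SerialCounter()

    // Launch many child tasks that all hit the same actor concurrently.
    await withTaskGroup(of: Void.self) { group in
        for _ in 0..<1_000 {
            group.addTask { await counter.increment() }
        }
    }
    return await counter.value
}
```

The result is 1000 every time. Note that this only shows that calls never overlap; it says nothing about the order in which independent tasks reach the actor.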

With asynchronous tasks (scenario 2), startAsynchronous, if you have await suspension points, you lose sequential behaviors due to actor reentrancy.
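
To see the reentrancy without Instruments, here is a small sketch that logs to an array instead of signposts. ReentrantWorker and runReentrancyDemo are hypothetical names, and the 200 ms sleep is an arbitrary stand-in for real asynchronous work:

```swift
actor ReentrantWorker {
    private(set) var log: [String] = []

    func work(_ name: String) async {
        log.append("start \(name)")
        // Suspension point: while this call sleeps, the actor is free
        // to begin another call to work(_:).
        try? await Task.sleep(nanoseconds: 200_000_000)
        log.append("end \(name)")
    }
}

func runReentrancyDemo() async -> [String] {
    let worker = ReentrantWorker()

    await withTaskGroup(of: Void.self) { group in
        for name in ["A", "B", "C"] {
            group.addTask { await worker.work(name) }
        }
    }
    return await worker.log
}
```

On a typical run, the log shows all three "start" entries before any "end" entry (the exact order of the names can vary), which is the interleaving that a serial queue would have prevented.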

But you can refine that asynchronous task pattern (scenario 3) by having an actor, SerialTasks in the above code, that keeps track of the previous task, awaiting it before starting the next task. A subtle point is that the add method creates the new task and stores it in previousTask before it reaches its own await (although the closure it takes is asynchronous), so there is no suspension point between those two steps. This avoids subtle races if you add multiple tasks.

Running the ViewController example above in Instruments, we can graphically see the execution, with signposts where tasks were initiated and intervals showing when the tasks execute:

[Image: Instruments timeline of the three scenarios]

In short, if your actor is performing only synchronous tasks (which is your case), then the actor yields maxConcurrentOperationCount = 1 sort of behavior automatically. If the tasks are asynchronous, you simply need to await the prior tasks before starting the next one.
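
For the asynchronous case with a result type other than Void, a sketch along these lines may help. It repeats the SerialTasks actor so that it is self-contained (with a Sendable constraint on Success, which Task requires); Recorder and runSerialTasksDemo are hypothetical names used only for illustration:

```swift
actor SerialTasks<Success: Sendable> {
    private var previousTask: Task<Success, Error>?

    func add(block: @Sendable @escaping () async throws -> Success) async throws -> Success {
        // Chain the new task onto the previous one; there is no await
        // between reading and updating previousTask.
        let task = Task { [previousTask] in
            let _ = await previousTask?.result
            return try await block()
        }
        previousTask = task
        return try await task.value
    }
}

// Hypothetical helper that collects the order of events.
actor Recorder {
    private(set) var events: [String] = []
    func record(_ event: String) { events.append(event) }
}

func runSerialTasksDemo() async throws -> (results: [String], events: [String]) {
    let serializer = SerialTasks<String>()
    let recorder = Recorder()

    let results = try await withThrowingTaskGroup(of: String.self) { group -> [String] in
        for name in ["A", "B", "C"] {
            group.addTask {
                // Each job is asynchronous and returns a String.
                try await serializer.add {
                    await recorder.record("start \(name)")
                    try await Task.sleep(nanoseconds: 50_000_000)
                    await recorder.record("end \(name)")
                    return "\(name) done"
                }
            }
        }
        var collected: [String] = []
        for try await value in group { collected.append(value) }
        return collected.sorted()
    }
    let events = await recorder.events
    return (results, events)
}
```

The order in which the three jobs reach add is not fixed, but each "start" entry is immediately followed by the matching "end" entry, i.e., no job begins until the previous one has finished.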

Kimmie answered 5/1, 2022 at 1:31 Comment(7)
Thanks @Kimmie for this nice explanation. It consolidates my knowledge about actors. The method ‘processPersistentHistory’ is asynchronous because it needs to run on an NSManagedObjectContext thread. So we need to ‘await context.perform { }’ to make sure the perform body is executed on the correct thread. That is, of course, if we want to use the new async/await, because I can still use the performAndWait method to run the body synchronously.Row
Do not conflate the fact that you run it on another thread (e.g., on your operation queue) with the question of whether processPersistentHistory is, itself, asynchronous. The fact that it doesn’t have a completion handler and that you did a simple addOperation, suggests that it is a synchronous method that you’re simply running asynchronously on a background thread. But if the method, itself, was asynchronous, the addOperation example you shared would not have worked.Kimmie
If you want more details on the processPersistentHistory method, it's more or less what can be found in this article: avanderlee.com/swift/persistent-history-tracking-core-data. You'll see that it creates a new Core Data backgroundContext and performs some operations, but synchronously (.performAndWait). With the new Swift concurrency model & Core Data, this performAndWait is now await context.perform {. But if we use Task { await context.perform { ... sync work ... } }, it's still not clear to me how to serialize these, as a Task can run on an arbitrary thread.Row
I have revised the above answer, adding scenario three, showing how you can get serial behavior between asynchronous Tasks on the actor.Kimmie
Hi @Rob, could you please provide an example of using the SerialTasks.add() method for adding async methods that actually return a value, please? Something where the declaration is like SerialTasks<String>Bracing
@SagarD - I've modified SerialTasks to asynchronously return the closure’s result. See github.com/robertmryan/SerialTaskDemo.git for example.Kimmie
FWIW, I increasingly use asynchronous sequences (esp AsyncChannel) to get both serial and constrained actor reentrancy behaviors. E.g., see the download managers in the latter part of this answer.Kimmie
