I’m currently migrating my app to Swift's concurrency model. I want to serialize Tasks so that they are executed one after the other (no parallelism). In my use case, I want to listen to notifications posted by NotificationCenter and execute a Task every time a new notification is posted, but only once no previous task is still running. It's the equivalent of using an OperationQueue with maxConcurrentOperationCount = 1.
For example, I’m using CloudKit with Core Data in my app, and I use persistent history tracking to determine what changes have occurred in the store. In the Synchronizing a Local Store to the Cloud sample code, Apple uses an operation queue for handling history processing tasks (in CoreDataStack). This OperationQueue has its maximum number of concurrent operations set to 1.
private lazy var historyQueue: OperationQueue = {
    let queue = OperationQueue()
    queue.maxConcurrentOperationCount = 1
    return queue
}()
When a Core Data notification is received, a new operation is added to this serial operation queue. So if many notifications are received, they are all processed serially, one after the other.
@objc
func storeRemoteChange(_ notification: Notification) {
    // Process persistent history to merge changes from other coordinators.
    historyQueue.addOperation {
        self.processPersistentHistory()
    }
}
In the Loading and Displaying a Large Data Feed sample code, Apple uses Tasks to handle history changes (in QuakesProvider).
// Observe Core Data remote change notifications on the queue where the changes were made.
notificationToken = NotificationCenter.default.addObserver(forName: .NSPersistentStoreRemoteChange, object: nil, queue: nil) { note in
    Task {
        await self.fetchPersistentHistory()
    }
}
I feel something is wrong in the second project, as the Tasks could run in any order and not necessarily serially (contrary to the first project, where the OperationQueue has a maxConcurrentOperationCount = 1).
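To make the concern concrete, here is a minimal sketch of one way to get OperationQueue-like ordering out of unstructured Tasks: each new Task first awaits the previously enqueued one. SerialTaskQueue, EventLog and runChainedDemo are hypothetical names used only for illustration, not part of either Apple sample, and the sketch assumes that enqueue is always called from the main actor.

```swift
import Foundation

// Hypothetical helper that records the order in which work runs.
actor EventLog {
    private(set) var entries: [String] = []
    func append(_ entry: String) { entries.append(entry) }
}

// Hypothetical serial queue: every new Task awaits the previously enqueued one.
// Isolated to the main actor so that `last` is never mutated concurrently.
@MainActor
final class SerialTaskQueue {
    private var last: Task<Void, Never>?

    func enqueue(_ operation: @escaping @Sendable () async -> Void) {
        let previous = last
        last = Task {
            // Wait for the previously enqueued operation (if any) to finish.
            _ = await previous?.value
            await operation()
        }
    }

    // Waits for the most recently enqueued operation, and therefore all earlier ones.
    func waitUntilIdle() async {
        _ = await last?.value
    }
}

@MainActor
func runChainedDemo() async -> [String] {
    let queue = SerialTaskQueue()
    let log = EventLog()
    for id in 1...3 {
        queue.enqueue {
            await log.append("start \(id)")
            // Simulated asynchronous work.
            try? await Task.sleep(nanoseconds: 5_000_000)
            await log.append("end \(id)")
        }
    }
    await queue.waitUntilIdle()
    return await log.entries
}
```

Because every Task awaits its predecessor before doing any work, the operations run in enqueue order even though each one suspends in the middle.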
Should we use an actor somewhere to make sure the methods are serially called?
I thought about an implementation like this but I’m not yet really comfortable with that:
actor PersistenceStoreListener {
    let historyTokenManager: PersistenceHistoryTokenManager = .init()
    private let persistentContainer: NSPersistentContainer

    init(persistentContainer: NSPersistentContainer) {
        self.persistentContainer = persistentContainer
    }

    func processRemoteStoreChange() async {
        print("\(#function) called on \(Date.now.formatted(date: .abbreviated, time: .standard)).")
    }
}
where the processRemoteStoreChange method would be called each time a new notification is received (via an AsyncSequence):
notificationListenerTask = Task {
    let notifications = NotificationCenter.default.notifications(named: .NSPersistentStoreRemoteChange, object: container.persistentStoreCoordinator)
    for await _ in notifications {
        print("notificationListenerTask called on \(Date.now.formatted(date: .abbreviated, time: .standard)).")
        await self.storeListener?.processRemoteStoreChange()
    }
}
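The behavior of that loop can be checked in isolation with a small sketch. It assumes an AsyncStream of integers as a stand-in for the notification sequence, and HistoryProcessor and runSerialDemo are hypothetical names, not the real PersistenceStoreListener. The point it illustrates is that the for await loop does not pull the next element until the awaited call in its body has returned.

```swift
import Foundation

// Hypothetical stand-in for the actor in the question.
actor HistoryProcessor {
    private(set) var log: [String] = []

    func process(_ id: Int) async {
        log.append("start \(id)")
        // Simulated asynchronous work; the method suspends here.
        try? await Task.sleep(nanoseconds: 10_000_000)
        log.append("end \(id)")
    }
}

func runSerialDemo() async -> [String] {
    let processor = HistoryProcessor()

    // Stand-in for NotificationCenter.default.notifications(named:object:).
    // All three events are buffered before the loop starts consuming them.
    let events = AsyncStream<Int> { continuation in
        for id in 1...3 { continuation.yield(id) }
        continuation.finish()
    }

    // The next element is requested only after the awaited call returns,
    // so the events are processed strictly one at a time.
    for await id in events {
        await processor.process(id)
    }
    return await processor.log
}
```

In this sketch the serialization comes from the single consuming loop rather than from the actor itself: as long as only this loop calls process, no two calls overlap, even though the method suspends.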
async let or a detached task. In a task group the iterations can be performed concurrently, but the result is awaited in order. In your case an AsyncSequence is the right way. – Thermion
AsyncSequence implies it's a sequence and a sequence is executed sequentially. But feel free to try it out. – Thermion
processRemoteStoreChange() really does. But if the goal is to serialize operations but not on the main thread, that's an actor. – Wry
async? Your operation queue example is actually calling processPersistentHistory, which obviously is not an asynchronous method. I mention it because the desired behavior is easy to achieve with actor if the method in question is synchronous. (FWIW, the same is true for operations, too, which get far more complicated when you attempt to wrap an inherently asynchronous process within an Operation subclass.) – Kimmie
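The point about synchronous actor methods in the last comment can be sketched as follows. SerialHistoryProcessor and runActorDemo are hypothetical names, and the body of process is a placeholder for real synchronous work. Since the method contains no suspension point, the actor runs each call to completion before starting another, even when the calls come from concurrently running child tasks.

```swift
import Foundation

// Hypothetical actor whose method is synchronous (no await inside).
actor SerialHistoryProcessor {
    private(set) var log: [String] = []

    // With no suspension point in the body, calls cannot interleave.
    func process(_ id: Int) {
        log.append("start \(id)")
        // Placeholder for synchronous history processing.
        log.append("end \(id)")
    }
}

func runActorDemo() async -> [String] {
    let processor = SerialHistoryProcessor()

    // Five child tasks call the actor concurrently.
    await withTaskGroup(of: Void.self) { group in
        for id in 1...5 {
            group.addTask { await processor.process(id) }
        }
    }
    return await processor.log
}
```

Every "start n" entry is immediately followed by the matching "end n", but the order of the ids is not guaranteed, because the child tasks reach the actor in no particular order. If process were async and suspended partway through, actor reentrancy would allow another call to begin during the suspension.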