Make tasks in Swift concurrency run serially

I have a document-based application that uses a struct for its main data model. As the model is a property of (a subclass of) NSDocument, it needs to be accessed from the main thread. So far, all good.

But some operations on the data can take quite a long time, and I want to show the user a progress bar. This is where the problems start, especially when the user starts two operations from the GUI in quick succession.

If I run the operation on the model synchronously (or in a 'normal' Task {}), I get the correct serial behaviour, but the main thread is blocked, so I can't show a progress bar (Option A).

If I run the operation on the model in a Task.detached {} closure, I can update the progress bar, but depending on how long the operations on the model take, the user's second action might complete before the first one, leaving the model in an invalid or unexpected state. I think this is due to the await statements needed in the detached task (Option B).

So I want (a) to free up the main thread to update the GUI, and (b) to make sure each task runs to completion before the next (queued) task starts. This would be straightforward with a background serial dispatch queue, but I'm trying to switch to the new Swift concurrency system, which is also used to perform any preparations before the model is accessed.

I tried using a global actor, as that seems to be a sort of serial background queue, but it also needs await statements. Although this reduces the likelihood of an unexpected model state, it's still possible.

I've written a small example to demonstrate the problem:

The model:

struct Model {
    var doneA = false
    var doneB = false

    mutating func updateA() {
        Thread.sleep(forTimeInterval: 5)
        doneA = true
    }

    mutating func updateB() {
        Thread.sleep(forTimeInterval: 1)
        doneB = true
    }
}

And the document (leaving out standard NSDocument overrides):

@globalActor
struct ModelActor {
    actor ActorType { }

    static let shared: ActorType = ActorType()
}

class Document: NSDocument {
    var model = Model() {
        didSet {
            Swift.print(model)
        }
    }

    func update(model: Model) {
        self.model = model
    }

    @ModelActor
    func updateModel(with operation: (Model) -> Model) async {
        var model = await self.model
        model = operation(model)
        await update(model: model)
    }

    @IBAction func operationA(_ sender: Any?) {
        //Option A
//        Task {
//            Swift.print("Performing some A work...")
//            self.model.updateA()
//        }

        //Option B
//        Task.detached {
//            Swift.print("Performing some A work...")
//            var model = await self.model
//            model.updateA()
//            await self.update(model: model)
//        }

        //Option C
        Task.detached {
            Swift.print("Performing some A work...")
            await self.updateModel { model in
                var model = model
                model.updateA()
                return model
            }
        }
    }

    @IBAction func operationB(_ sender: Any?) {
        //Option A
//        Task {
//            Swift.print("Performing some B work...")
//            self.model.updateB()
//        }

        //Option B
//        Task.detached {
//            Swift.print("Performing some B work...")
//            var model = await self.model
//            model.updateB()
//            await self.update(model: model)
//        }

        //Option C
        Task.detached {
            Swift.print("Performing some B work...")
            await self.updateModel { model in
                var model = model
                model.updateB()
                return model
            }
        }
    }
}

Clicking 'Operation A' and then 'Operation B' should result in a model with both doneA and doneB set to true. But it doesn't always.
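To see why Options B and C can lose an update, here is a minimal, self-contained sketch. Assumptions: a hypothetical `ModelStore` actor stands in for the main-actor-bound document, and `Task.sleep` stands in for the long-running work. Each operation reads a copy of the model, suspends, and then writes its copy back, so whichever operation finishes last overwrites the other one's change:

```swift
struct Model {
    var doneA = false
    var doneB = false
}

// Hypothetical stand-in for the main-actor-bound document that owns the model.
actor ModelStore {
    var model = Model()
    func set(_ newModel: Model) { model = newModel }
}

// Read-modify-write with a suspension point in the middle, like Options B and C.
func runOperation(on store: ModelStore,
                  workNanoseconds: UInt64,
                  mutate: @Sendable (inout Model) -> Void) async throws {
    var copy = await store.model                        // read a snapshot
    try await Task.sleep(nanoseconds: workNanoseconds)  // "long" work; the other operation runs meanwhile
    mutate(&copy)
    await store.set(copy)                               // write back a possibly stale snapshot
}

// Simulate clicking A (slow) and then B (fast) in quick succession.
func simulateTwoClicks() async throws -> Model {
    let store = ModelStore()
    let taskA = Task { try await runOperation(on: store, workNanoseconds: 300_000_000) { $0.doneA = true } }
    let taskB = Task { try await runOperation(on: store, workNanoseconds: 50_000_000) { $0.doneB = true } }
    try await taskA.value
    try await taskB.value
    return await store.model
}

let result = try await simulateTwoClicks()
print(result.doneA, result.doneB) // B's change is normally lost: A wrote back its stale copy last
```

The suspension between the read and the write is what lets the second operation interleave with the first.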

Is there a way to make sure that operation A completes before I get to operation B and have the Main thread available for GUI updates?

EDIT: Based on Rob's answer, I came up with the following. I modified it this way so that I can wait on the created operation and report any error to the original caller. I found it easier to follow what's happening with all the code inside a single update function, so I chose a detached task instead of an actor. I also return the intermediate model from the task, as otherwise an old model might be used.

class Document: NSDocument {
    //var model = Model() and the NSDocument overrides as in the question

    //The most recently queued update; the next update waits on it
    private var previousTask: Task<Model, Error>?

    @MainActor
    func updateModel(operation: @escaping @Sendable (Model) throws -> Model) async throws {
        //Update the model in the background
        let modelTask = Task.detached { [previousTask, model] () async throws -> Model in
            var model = model

            //Check whether we're cancelled
            try Task.checkCancellation()

            //Check whether we need to wait on earlier task(s)
            if let previousTask = previousTask {
                //If the preceding task succeeds we use its model
                do {
                    model = try await previousTask.value
                } catch {
                    throw CancellationError()
                }
            }

            return try operation(model)
        }

        previousTask = modelTask
        //Reset only if no later update has replaced us; otherwise that later task
        //would be forgotten and the next update would not wait for it
        defer { if previousTask == modelTask { previousTask = nil } }

        //Wait for the operation to finish and store the model
        do {
            self.model = try await modelTask.value
        } catch {
            if error is CancellationError { return }
            else { throw error }
        }
    }
}

Call side:

@IBAction func operationA(_ sender: Any?) {
    //Option D
    Task {
        do {
            try await updateModel { model in
                var model = model
                model.updateA()
                return model
            }
        } catch {
            presentError(error)
        }
    }
}

It seems to do everything I need: it queues updates to a property on a document, and each update can be awaited and can return errors, much as if everything happened on the main thread. The only drawback seems to be that on the call side the closure is very verbose, because the model has to be made a var and returned explicitly.
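One way to trim that verbosity, sketched here as an assumption rather than part of the original code, is a small hypothetical adapter, `inPlace(_:)`, that turns an `inout` mutation into the `(Model) throws -> Model` closure that `updateModel(operation:)` takes:

```swift
struct Model {
    var doneA = false
    var doneB = false
}

// Hypothetical adapter: wraps an in-place (inout) mutation in the
// `(Model) throws -> Model` shape that updateModel(operation:) expects.
func inPlace(_ mutate: @escaping @Sendable (inout Model) throws -> Void)
    -> @Sendable (Model) throws -> Model {
    return { model in
        var copy = model   // Model is a value type, so this is an independent copy
        try mutate(&copy)
        return copy
    }
}

let markA = inPlace { $0.doneA = true }
let updated = try markA(Model())
print(updated.doneA, updated.doneB) // true false
```

With it, the call site could read `try await updateModel(operation: inPlace { $0.updateA() })`. Alternatively, `updateModel` itself could accept an `(inout Model) throws -> Void` closure and do the copy-and-return internally; the adapter simply leaves the existing signature untouched.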

Asked by Neologize on 18 July 2022 at 17:45. Comments:
Isn't this exactly what an actor is for? If actor code is running (operation A), then as long as that code doesn't say await anywhere, no other actor code can run (operation B) until the first code (operation A) has finished. – Folkways
Also, never use Thread.sleep as part of async/await code. Use Task.sleep. If you call Thread.sleep on the main thread, yes, you will freeze the GUI. But not with Task.sleep! – Folkways
I specifically use Thread.sleep here to simulate a long-running (CPU-consuming) workload, not an asynchronous wait. The problem with the actor here is that it needs the await call to fetch and restore the model on the document (which is MainActor-bound), and that's exactly where it all goes wrong. So I'm wondering how to properly fix this. – Neologize
If it is a bunch of synchronous functions, then put them inside an actor A; that way it will serialise the execution and won't be on the main thread. If it involves asynchronous functions, await on them. – Pithecanthropus

Obviously, if your tasks do not have any await or other suspension points, you would just use an actor, not make the method async, and it would automatically perform them sequentially.
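A minimal sketch of that simple case, assuming the model's operations can run synchronously inside the actor (the `DocumentModel` actor is a hypothetical name, and the long-running work is omitted). Because `performA()` and `performB()` contain no `await`, each runs to completion before the other can start, even when both are requested concurrently:

```swift
struct Model {
    var doneA = false
    var doneB = false

    // Imagine long-running, CPU-bound work in these two methods.
    mutating func updateA() { doneA = true }
    mutating func updateB() { doneB = true }
}

// Hypothetical actor that owns the model. Its methods are synchronous (no `await`),
// so once one starts, it runs to completion before any other call on the actor begins.
actor DocumentModel {
    private(set) var model = Model()

    func performA() { model.updateA() }
    func performB() { model.updateB() }
}

// Request both operations concurrently, as two quick clicks would.
func runBothOperations() async -> Model {
    let documentModel = DocumentModel()
    await withTaskGroup(of: Void.self) { group in
        group.addTask { await documentModel.performA() }
        group.addTask { await documentModel.performB() }
    }
    return await documentModel.model
}

let model = await runBothOperations()
print(model.doneA, model.doneB) // true true
```

Callers still `await` to get into the actor, and the order of A and B is not guaranteed here, but neither operation can observe or overwrite a half-finished update from the other.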

But, when dealing with asynchronous actor methods, one must appreciate that actors are reentrant (see SE-0306: Actors - Actor Reentrancy). If you really are trying to make a series of asynchronous tasks run serially, you will want to manually have each subsequent task await the prior one. E.g.,

actor Foo {
    private var previousTask: Task<Void, Error>?

    func add(block: @Sendable @escaping () async throws -> Void) {
        previousTask = Task { [previousTask] in
            let _ = await previousTask?.result

            return try await block()
        }
    }
}

There are two subtle aspects to the above:

  1. I use the capture list [previousTask] to make sure I get a copy of the prior task.

  2. I perform await previousTask?.result inside the new task, not before it.

    If you await before creating the new task, you have a race: if you launch three tasks, both the second and the third will await the first task, i.e., the third task is not awaiting the second one.

And, perhaps needless to say, because this is within an actor, it avoids the need for a detached task, while keeping the main thread free.
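The effect of the chaining can be checked with a small, self-contained sketch. `SerialTaskQueue` is a hypothetical stripped-down copy of `Foo` with an added `waitForAll()` helper (not part of the answer's code), and `Log` is a hypothetical actor that only records completion order. Earlier blocks sleep longer, so they would finish last if the tasks ran concurrently; with each task awaiting its predecessor, they finish in submission order:

```swift
// Hypothetical, stripped-down copy of the `Foo` pattern above.
actor SerialTaskQueue {
    private var previousTask: Task<Void, Error>?

    func add(block: @Sendable @escaping () async throws -> Void) {
        previousTask = Task { [previousTask] in
            _ = await previousTask?.result   // wait for the prior task, ignoring its outcome
            try await block()
        }
    }

    // Hypothetical helper (not in the answer): wait for the last queued task,
    // which by construction finishes after every earlier one.
    func waitForAll() async {
        _ = await previousTask?.result
    }
}

// Hypothetical actor that records the order in which blocks finish.
actor Log {
    private(set) var entries: [Int] = []
    func append(_ value: Int) { entries.append(value) }
}

func runDemo() async -> [Int] {
    let queue = SerialTaskQueue()
    let log = Log()
    for i in 0 ..< 3 {
        // Block 0 sleeps longest, so if run concurrently it would finish last.
        await queue.add {
            try await Task.sleep(nanoseconds: UInt64(3 - i) * 100_000_000)
            await log.append(i)
        }
    }
    await queue.waitForAll()
    return await log.entries
}

let entries = await runDemo()
print(entries) // [0, 1, 2]
```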



Note, when using unstructured concurrency (i.e., Task {…} or Task.detached {…}), you bear responsibility for handling cancelation, e.g. using withTaskCancellationHandler:

actor Foo<Value: Sendable> {
    private var previousTask: Task<Value, Error>?

    func add(block: @Sendable @escaping () async throws -> Value) async throws -> Value {
        let task = Task { [previousTask] in
            try await withTaskCancellationHandler {
                let _ = try await previousTask?.value
            } onCancel: {
                previousTask?.cancel()
            }

            return try await block()
        }

        previousTask = task

        return try await withTaskCancellationHandler {
            try await task.value
        } onCancel: {
            task.cancel()
        }
    }
}

I also extended this for blocks that might return values.

So, for example, here I added four tasks (that just Task.sleep for two seconds and then return a random value):

[Screenshot not available]

Or if you cancel the fourth task mid-way through the third task:

[Screenshot not available]

(Needless to say, this assumes that the tasks you have added support cancelation, throw CancellationError if canceled, etc. Standard Apple APIs, like URLSession, do all of this, but as you can see, some care needs to be taken if you introduce unstructured concurrency.)
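For CPU-bound work like the question's `updateA()`, supporting cancelation means checking for it periodically. A sketch under assumptions: `longRunningWork(chunks:)` is a hypothetical function that simulates work in small `Thread.sleep` slices (standing in for real CPU work, as in the question) and calls `Task.checkCancellation()` between them, so a canceled task throws `CancellationError` promptly instead of running to the end:

```swift
import Foundation

// Hypothetical CPU-bound operation (think of the question's updateA) that cooperates
// with cancelation by checking between slices of work.
func longRunningWork(chunks: Int) throws -> Int {
    var completed = 0
    for _ in 0 ..< chunks {
        try Task.checkCancellation()          // throws CancellationError once the task is canceled
        Thread.sleep(forTimeInterval: 0.01)   // stand-in for one slice of real work
        completed += 1
    }
    return completed
}

// About 10 seconds of work if never canceled.
let task = Task.detached { try longRunningWork(chunks: 1_000) }
task.cancel()

var outcome = "unknown"
do {
    let completed = try await task.value
    outcome = "finished \(completed) chunks"
} catch is CancellationError {
    outcome = "canceled"
} catch {
    outcome = "failed: \(error)"
}
print(outcome) // canceled
```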


The above is a tad brittle, so I might suggest asynchronous sequences (e.g., anything that conforms to the AsyncSequence protocol, such as AsyncStream or your own custom asynchronous sequence), which can also give you serial behavior.
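As a sketch of the AsyncStream route, under stated assumptions (Swift 5.9 or later for `AsyncStream.makeStream(of:bufferingPolicy:)`; the `SerialJobQueue` and `Log` types are hypothetical), a single consumer loop pulls one job at a time from the stream and awaits it before pulling the next, so jobs run in the order they were enqueued:

```swift
// Hypothetical serial job queue built on AsyncStream (needs Swift 5.9+ for makeStream).
final class SerialJobQueue: Sendable {
    typealias Job = @Sendable () async -> Void

    private let continuation: AsyncStream<Job>.Continuation
    private let consumer: Task<Void, Never>

    init() {
        let (stream, continuation) = AsyncStream<Job>.makeStream()
        self.continuation = continuation
        // A single consumer awaits each job before pulling the next one off the stream.
        self.consumer = Task {
            for await job in stream {
                await job()
            }
        }
    }

    // Non-blocking: just buffers the job in the stream.
    func enqueue(_ job: @escaping Job) {
        continuation.yield(job)
    }

    // Stop accepting jobs, then wait until every buffered job has run.
    func finishAndWait() async {
        continuation.finish()
        await consumer.value
    }
}

// Hypothetical actor that records the order in which jobs finish.
actor Log {
    private(set) var entries: [Int] = []
    func append(_ value: Int) { entries.append(value) }
}

func runJobs() async -> [Int] {
    let jobs = SerialJobQueue()
    let log = Log()
    for i in 0 ..< 3 {
        jobs.enqueue {
            // Earlier jobs sleep longer, yet still finish first.
            try? await Task.sleep(nanoseconds: UInt64(3 - i) * 100_000_000)
            await log.append(i)
        }
    }
    await jobs.finishAndWait()
    return await log.entries
}

let entries = await runJobs()
print(entries) // [0, 1, 2]
```

Because `enqueue(_:)` is synchronous, it can be called straight from a button action; the ordering comes from the single `for await` loop rather than from chaining one task onto the previous one.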

Or, AsyncChannel from Swift Async Algorithms is another great way to deal with a pipeline of requests triggering a serial execution of some block of code.

E.g., here is a serial download manager using AsyncChannel and a simple for-await-in loop to achieve serial behavior:

actor SerialDownloadManager {
    static let shared = SerialDownloadManager()

    private let session: URLSession = …
    private let urls = AsyncChannel<URL>()

    private init() {
        Task { try await startDownloader() }
    }

    // this sends URLs on the channel

    func append(_ url: URL) async {
        await urls.send(url)
    }
}

private extension SerialDownloadManager {
    func startDownloader() async throws {
        let folder = try FileManager.default
            .url(for: .applicationSupportDirectory, in: .userDomainMask, appropriateFor: nil, create: true)
            .appending(component: "downloads")

        try? FileManager.default.createDirectory(at: folder, withIntermediateDirectories: true)

        // this consumes the URLs on the channel

        for await url in urls {
            // if you want to observe in "points of interest"
            //
            // let id = OSSignpostID(log: poi)
            // os_signpost(.begin, log: poi, name: "Download", signpostID: id, "%{public}@", url.lastPathComponent)
            // defer { os_signpost(.end, log: poi, name: "Download", signpostID: id) }

            // download

            let (location, response) = try await self.session.download(from: url, delegate: nil)

            if let response = response as? HTTPURLResponse, 200 ..< 300 ~= response.statusCode {
                let destination = folder.appending(component: url.lastPathComponent)
                try? FileManager.default.removeItem(at: destination)
                try FileManager.default.moveItem(at: location, to: destination)
            }
        }
    }
}

Then you can do things like:

func appendUrls() async {
    for i in 0 ..< 10 {
        await SerialDownloadManager.shared.append(baseUrl.appending(component: "\(i).jpg"))
    }
}

Yielding:

[Screenshot not available]

Or, if you want, you can allow for constrained concurrency with a task group, e.g., doing 4 at a time here:

actor DownloadManager {
    static let shared = DownloadManager()

    private let session: URLSession = …
    private let urls = AsyncChannel<URL>()
    private var count = 0
    private let maxConcurrency = 4       // set to 1 for serial downloads; 4-6 balances the benefits of concurrency against overtaxing the server

    private init() {
        Task {
            do {
                try await startDownloader()
            } catch {
                logger.error("\(error, privacy: .public)")
            }
        }
    }

    func append(_ url: URL) async {
        await urls.send(url)
    }
}

private extension DownloadManager {
    func startDownloader() async throws {
        let folder = try FileManager.default
            .url(for: .applicationSupportDirectory, in: .userDomainMask, appropriateFor: nil, create: true)
            .appending(component: "downloads")

        try? FileManager.default.createDirectory(at: folder, withIntermediateDirectories: true)

        try await withThrowingTaskGroup(of: Void.self) { group in
            for await url in urls {
                count += 1
                if count > maxConcurrency { try await group.next() }

                group.addTask {
                    // if you want to observe in "points of interest"
                    //
                    // let id = OSSignpostID(log: poi)
                    // os_signpost(.begin, log: poi, name: "Download", signpostID: id, "%{public}@", url.lastPathComponent)
                    // defer { os_signpost(.end, log: poi, name: "Download", signpostID: id) }

                    // download

                    let (location, response) = try await self.session.download(from: url, delegate: nil)

                    if let response = response as? HTTPURLResponse, 200 ..< 300 ~= response.statusCode {
                        let destination = folder.appending(component: url.lastPathComponent)
                        try? FileManager.default.removeItem(at: destination)
                        try FileManager.default.moveItem(at: location, to: destination)
                    }
                }
            }

            try await group.waitForAll()
        }
    }
}

Yielding:

[Screenshot not available]
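The same task-group throttling can be tried without a network. In this sketch (assumptions: the `InFlightTracker` actor and `runThrottled(jobCount:maxConcurrency:)` are hypothetical, and a short `Task.sleep` stands in for each download), once `maxConcurrency` child tasks have been added, each new one is added only after `group.next()` returns, so at most `maxConcurrency` jobs are ever in flight:

```swift
// Hypothetical actor that tracks how many jobs are running at the same time.
actor InFlightTracker {
    private var current = 0
    private(set) var maximum = 0
    private(set) var finished = 0

    func begin() {
        current += 1
        maximum = max(maximum, current)
    }

    func end() {
        current -= 1
        finished += 1
    }
}

func runThrottled(jobCount: Int, maxConcurrency: Int) async -> (maximum: Int, finished: Int) {
    let tracker = InFlightTracker()

    await withTaskGroup(of: Void.self) { group in
        for job in 0 ..< jobCount {
            // Once `maxConcurrency` jobs have been added, wait for one to finish first.
            if job >= maxConcurrency { _ = await group.next() }

            group.addTask {
                await tracker.begin()
                try? await Task.sleep(nanoseconds: 50_000_000) // stand-in for a download
                await tracker.end()
            }
        }
        // Returning from the closure waits for the remaining child tasks.
    }

    return (await tracker.maximum, await tracker.finished)
}

let stats = await runThrottled(jobCount: 6, maxConcurrency: 2)
print(stats.maximum, stats.finished) // maximum never exceeds 2; finished is 6
```

Setting `maxConcurrency` to 1 turns the same loop into strictly serial execution, as the comment in `DownloadManager` notes.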

For more information on asynchronous sequences, in general, see WWDC 2021 video Meet AsyncSequence.

Answered by Artema on 21 July 2022 at 21:22. Comments:
Just an observation (this might not be what the original poster requested): this doesn't ensure sequential execution. I noticed that something that started earlier ended later. – Pithecanthropus
func updateA() { print("updateA started"); for _ in 1 ..< 8_000_000 {}; print("updateA ended") }. Similarly add updateB and updateC. Then execute the following: let foo = Foo(); await foo.add { await updateA() }; await foo.add { await updateA() }; await foo.add { await updateB() }; await foo.add { await updateC() }; await foo.add { await updateA() }. The output is: updateA started, updateA ended, updateA started, updateA ended, updateB started, updateB ended, updateC started, updateA started, updateC ended, updateA ended. – Pithecanthropus
I can't reproduce your behavior. I've tweaked your example, as it was strange to await A, B, and C when they weren't async methods ... if they're not async, then none of this silliness is needed and you'd just use an actor and be done with it. I also added some signposts for Instruments. Anyway, I have uploaded my example to GitHub. Either fork it and show me the changes you've made, or just upload your own repo somewhere. – Artema
Agreed. I am unable to reproduce it myself; I will check again. Thanks. – Pithecanthropus
Remco, as an aside, if you'll forgive me for saying so: if you are having trouble reporting progress on concurrent tasks, it is much better to solve that reporting problem than to slow down the whole process by making the tasks run sequentially. Progress handles this aggregate progress problem extremely well (and progress bars can observe the Progress and update automatically). – Artema
In the first example, which uses an actor, waits for the previous task, and then calls return try await block(): what will happen if a task throws an error? The signature is not marked async throws like the second example that uses withTaskCancellationHandler. How will that error bubble up to the caller of add? – Breechcloth
@Parth – Yep, that first one is just a simple straw man to illustrate the basic idea of awaiting the prior task (without distracting the reader with these more subtle issues). But I am not advocating that first approach. As you note, the second example addresses your question (and supports cancelation). But it also raises the question of what precisely one wants to do (a) if any error occurs and (b) upon cancelation. See gist.github.com/robertmryan/2cbfeaa369502411d6dbccfc12565736 for a few alternatives. Personally, I now tend to lean towards the AsyncChannel approach, anyway. – Artema
Thanks Rob! The first approach is simple, quick, and especially useful when you don't have access to AsyncChannel (in interviews, for example). I believe AsyncChannel needs SPM, right? – Breechcloth
I’ve always used SPM, but that repo provides the full source, so I guess you theoretically could just copy the relevant code into your workspace (though that’s obviously brittle and generally inadvisable). Playing with it, it’s a bit of a pain because AsyncChannel depends on OrderedCollections, so you’d be pulling in a lot. And I tried importing the package in an iPad Playgrounds app (which theoretically supports packages), but it failed with a parsing error. – Artema
