Swift 5.6 how to put async Task into a queue
Asked Answered
C

5

18

Let say I have this code

class Duck{
    
    func walk() async {
        //do something
        print("walk start")
        try? await Task.sleep(nanoseconds: UInt64(2e9))
        print("walk end")
    }
    
    func quack() async {
        //do something...
        print("quack start")
        try? await Task.sleep(nanoseconds: UInt64(2e9))
        print("quack end")
    }
    
    func fly() async{
        //do something
        print("fly start")
        try? await Task.sleep(nanoseconds: UInt64(2e9))
        print("fly end")
    }
    
}

let duck = Duck()

Task{
    await duck.walk()
}

Task{
    await duck.quack()
}

Task{
    await duck.fly()
}

This would print

walk start
quack start
fly start
walk end
quack end
fly end

which I understand and expected. But what if I want those 3 Tasks run sequentially? Let say each Task is created by user pressing a button. I want the tasks to queue up in the background and run one by one. Is there any thing like you can queue up DispatchWorkItem in a DispatchQueue, but a Task version?


Edit:

I came up with a solution, but I am not sure if this is a good way to implement it. As this implementation potentially create many layer of cascaded Task, I wonder if there would be risk of stack overflow or memory leaks?

class TaskQueue{
    private var currentTask : Task<Void,Never> = Task{}
    
    func dispatch(block:@escaping () async ->Void){
        
        let oldTask = currentTask
        currentTask = Task{
            _ = await oldTask.value
            await block()
        }
    }
}

taskQueue.dispatch {
    await duck.walk()
}
taskQueue.dispatch {
    await duck.quack()
}
taskQueue.dispatch {
    await duck.fly()
}
Choe answered 23/3, 2022 at 11:43 Comment(6)
Just put the 3 await in the same taskMarcos
@PtitXav This is a simplified example. Please assume those 3 tasks are created separately in different part of the program, e.g. By user pressing buttons.Choe
If you put tasks in same dispatch queue these task should process in order (cf appleMarcos
@PtitXav Tasks do not serialize on DispatchQueues the same way blocks do. They can be interrupted any time they await and other Tasks may be scheduled on the same queue.Lauralee
@RobNapier - I get why we need reentrancy in our actors, but I really hope they proceed with the non-reentrancy “future direction” (or some other control over the degree of concurrency other than just the cooperative thread pool ceiling) at some point.Cardona
@Cardona Absolutely. I'm just saying they don't. I'm currently contemplating whether the new asyncsequence algorithms will help here. We really do need a something like TaskQueue below (a traditional "actor mailbox" which is what most people who are used to actors expect; Swift's take on actors is very interesting and I think really innovative, but it's also deeply unlike every other use of the word "actor"). I think a data structure like TaskQueue is better than more magical annotations, though. It's hard to reason about whether code is correct when it could be annotated far away.Lauralee
C
21

I once was a proponent of the unstructured task approach, where each would await the prior one. In retrospect, this feels a bit brittle to me. Increasingly (with credit to Rob Napier for nudging me in this direction), I now use asynchronous sequences, specifically AsyncChannel from Apple’s swift-async-algorithms. I think it is a more robust behavior and is more consistent with the asynchronous sequences of modern Swift concurrency.

Before we come to your example, consider this serial downloader, where we have one process (the user button clicking) send URL objects to another process monitoring the channel for URLs in a for-await-in loop:

struct DownloadView: View {
    @StateObject var viewModel = DownloadViewModel()

    var body: some View {
        VStack {
            Button("1") { Task { await viewModel.appendDownload(1) } }
            Button("2") { Task { await viewModel.appendDownload(2) } }
            Button("3") { Task { await viewModel.appendDownload(3) } }
        }
        .task {
            await viewModel.monitorDownloadRequests()
        }
    }
}

@MainActor
class DownloadViewModel: ObservableObject {
    private let session: URLSession = …
    private let baseUrl: URL = …
    private let folder: URL = …
    private let channel = AsyncChannel<URL>()   // note, we're sending URLs on this channel

    func monitorDownloadRequests() async {
        for await url in channel {
            await download(url)
        }
    }

    func appendDownload(_ index: Int) async {
        let url = baseUrl.appending(component: "\(index).jpg")
        await channel.send(url)
    }

    func download(_ url: URL) async {
        do {
            let (location, _) = try await session.download(from: url)
            let fileUrl = folder.appending(component: url.lastPathComponent)
            try? FileManager.default.removeItem(at: fileUrl)
            try FileManager.default.moveItem(at: location, to: fileUrl)
        } catch {
            print(error)
        }
    }
}

We start monitorDownloadRequests and then append download requests to the channel.

This performs the requests serially (because monitorDownloadRequests has a for-await loop). E.g., in Instruments’ “Points of Interest” tool, I have added some Ⓢ signposts where I clicked these buttons, and show intervals where the requests happen, and you can see that these three requests happen sequentially.

enter image description here

But the wonderful thing about channels is that they offer serial behaviors without introducing the problems of unstructured concurrency. They also handle cancelation automatically (if you want that behavior). If you cancel the for-await-in loop (which the .task {…} view modifier does for us automatically in SwiftUI when the view is dismissed). If you have a bunch of unstructured concurrency, with one Task awaiting the prior one, handling cancelation gets messy quickly.


Now, in your case, you are asking about a more general queue, where you can await tasks. Well, you can have an AsyncChannel of closures:

typealias AsyncClosure = () async -> Void

let channel = AsyncChannel<AsyncClosure>()

E.g.:

typealias AsyncClosure = () async -> Void

struct ExperimentView: View {
    @StateObject var viewModel = ExperimentViewModel()

    var body: some View {
        VStack {
            Button("Red")   { Task { await viewModel.addRed() } }
            Button("Green") { Task { await viewModel.addGreen() } }
            Button("Blue")  { Task { await viewModel.addBlue() } }
        }
        .task {
            await viewModel.monitorChannel()
        }
    }
}

@MainActor
class ExperimentViewModel: ObservableObject {
    let channel = AsyncChannel<AsyncClosure>()

    func monitorChannel() async {
        for await block in channel {
            await block()
        }
    }

    func addRed() async {
        await channel.send { await self.red() }
    }

    func addGreen() async {
        await channel.send { await self.green() }
    }

    func addBlue() async {
        await channel.send { await self.blue() }
    }

    func red() async { … }

    func green() async { … }

    func blue() async { … }
}

That yields:

enter image description here

Here again, I am using Instruments to visualize what is going on. I clicked the “red”, “green”, and “blue” buttons quickly, in succession, twice. I then watched the six corresponding intervals for these three second tasks. I then repeated that six-click process a second time, but this time I dismissed the view in question before they finished, mid-way through the green task of the second series of button taps, illustrating the seamless cancelation capabilities of AsyncChannel (and asynchronous sequences in general).

Now, I hope you forgive me, as I omitted the code to create all of these “Points of Interest” signposts and intervals, as it adds a lot of kruft that really is not relevant to the question at hand (but see this if you are interested). But hopefully these visualizations help illustrate what is going on.

The take-home message is that AsyncChannel (and its sibling AsyncThrowingChannel) is a great way to remain within structured concurrency, but get serial (or constrained behavior, like shown at the end of this answer) that we used to get with queues, but with asynchronous tasks.

I must confess that this latter AsyncClosure example, while it hopefully answers your question, feels a little forced to my eye. I have been using AsyncChannel for a few months now, and I personally always have a more concrete object being handled by the channel (e.g., URLs, GPS locations, image identifiers, etc.). This example with closures feels like it is trying just a little too hard to reproduce old fashioned dispatch/operation queue behaviors.

Cardona answered 14/3, 2023 at 8:9 Comment(6)
Thank you for suggesting the swift-async-alogrithm pacakge! I have been doing similar implementation with kotlin coroutine when working on android. I have also noticed that swift got the AsyncSequence and AsyncStream API and started translating my kotlin code to swift. But swift lack the very convenient Channel API like kotlin does so I wrote it with AsyncStream. AsyncChannel seems like the missing piece I have been looking for! I felt like I was almost going to write my own AsyncChannel from scratch. No idea that it resides in a package that need to be manually added. Thank you.Choe
What if you had to return something from the closure? (for example let result = await viewModel.addRed()?) Would AsyncChannel still work in this case?Costumer
Sadly, no. As the documentation says, “Each value sent by the channel will await the consumption of that value by iteration” (emphasis added). So awaits the “consumption” of that value, but not whatever the routine subsequently does with that consumed value. In short, send cannot (and does not) return anything. If you're returning stuff, you would need some other mechanism for that, e.g., another asynchronous sequence (!) of emitted values.Cardona
I confess that when I first stumbled across this, I had a knee-jerk “what were they thinking” reaction, but after noodling on what the implementation of a bidirectional channel might look like (esp on the consumer side), I can see why they did it that way. It’s not immediately obvious how a bidirectional channel would fit nicely into the AsyncSequence pattern. Sure, we could make something with some completion handler closure rendition of next, but that gets pretty ugly, pretty quickly.Cardona
Can you test it with Task.sleep before await viewModel.addRed() like in the OP's code? Just wondering if your implementation causes overlapping tasks.Waldowaldon
In the OP’s question, he has Task.sleep inside his walk, quack, and fly methods, just like I did in my red, green, and blue methods. I assume you ask because you’re concerned about some actor reentrancy, but since the channel does an await of each closure/block in the channel, that’s not a concern. These 3 methods can even be nonisolated (i.e., run on different threads) and you still enjoy sequential behavior. Anyway, I created a MRE here, so you can play with it, if interested.Cardona
C
6

Update:

For future people who find this post useful, I have created a swift package with better implementation and added support for queuing up AsyncThrowingStream too.

https://github.com/rickymohk/SwiftTaskQueue


Here is my updated implementation which I think is safer than the one I posted in the question. The TaskQueueActor part does all of the job, I wrap it with an outer class just to make it cleaner when calling from a non-async context.

class TaskQueue{
    
    private actor TaskQueueActor{
        private var blocks : [() async -> Void] = []
        private var currentTask : Task<Void,Never>? = nil
        
        func addBlock(block:@escaping () async -> Void){
            blocks.append(block)
            next()
        }
        
        func next()
        {
            if(currentTask != nil) {
                return
            }
            if(!blocks.isEmpty)
            {
                let block = blocks.removeFirst()
                currentTask = Task{
                    await block()
                    currentTask = nil
                    next()
                }
            }
        }
    }
    private let taskQueueActor = TaskQueueActor()
    
    func dispatch(block:@escaping () async ->Void){
        Task{
            await taskQueueActor.addBlock(block: block)
        }
    }
}
Choe answered 23/3, 2022 at 13:47 Comment(1)
Tasks don't execute in the same order they are created. So "dispatch" method you added actually breaks queue functionalityOperon
W
4

I found this one on Github: https://github.com/gshahbazian/playgrounds/blob/main/AsyncAwait.playground/Sources/TaskQueue.swift

via

https://forums.swift.org/t/how-do-you-use-asyncstream-to-make-task-execution-deterministic/57968/18

import Foundation

public actor TaskQueue {
    private let concurrency: Int
    private var running: Int = 0
    private var queue = [CheckedContinuation<Void, Error>]()

    public init(concurrency: Int) {
        self.concurrency = concurrency
    }

    deinit {
        for continuation in queue {
            continuation.resume(throwing: CancellationError())
        }
    }

    public func enqueue<T>(operation: @escaping @Sendable () async throws -> T) async throws -> T {
        try Task.checkCancellation()

        try await withCheckedThrowingContinuation { (continuation: CheckedContinuation<Void, Error>) in
            queue.append(continuation)
            tryRunEnqueued()
        }

        defer {
            running -= 1
            tryRunEnqueued()
        }
        try Task.checkCancellation()
        return try await operation()
    }

    private func tryRunEnqueued() {
        guard !queue.isEmpty else { return }
        guard running < concurrency else { return }

        running += 1
        let continuation = queue.removeFirst()
        continuation.resume()
    }
}

Seems to work

@StateObject var taskQueue = TaskQueue(concurrency: 1)

            .task {
                try? await taskQueue.enqueue {
                    try? await Task.sleep(for: .seconds(1))
                    print("Done 1")
                }
                try? await taskQueue.enqueue {
                    try? await Task.sleep(for: .seconds(1))
                    print("Done 2")
                }
                try? await taskQueue.enqueue {
                    try? await Task.sleep(for: .seconds(1))
                    print("Done 3")
                }
Waldowaldon answered 15/12, 2022 at 15:50 Comment(0)
B
0

one more way is to use SwiftPM package AsyncFifo https://github.com/sugarbaron/async-fifo . it allows you to enqueue your tasks like with DispatchQueue, but in async/await way:

let background: AsyncFifo = .init()

background.enqueue { await someFunc1() }
background.enqueue { someSynchFunc() }
background.enqueue { await whatever() }

// executes sequentally
// someFunc1() -> then someSycnhFunc() -> then whatever()

or you can just add AsyncFifo source code in your project: (wall of code incoming)



import Foundation

// MARK: constructor
public extension Async {
    
    final class Fifo {
        
        private var queue: [Scheduled]
        private let access: NSRecursiveLock
        private var executing: Bool
        
        public init() {
            self.queue = [ ]
            self.access = NSRecursiveLock()
            self.executing = false
        }
        
    }
    
}

// MARK: interface
public extension Async.Fifo {
    
    func enqueue(_ coroutine: @Sendable @escaping () async throws -> Void,
                 catch: @escaping (Error) -> Void = { print("[x][Async.Fifo] coroutine throws: \($0)") }) {
        schedule(coroutine, `catch`)
        inBackground { [weak self] in await self?.executeSequentally() }
    }
    
    var isBusy: Bool {
        access.lock()
        let isBusy: Bool = executing || !(queue.isEmpty)
        access.unlock()
        return isBusy
    }
    
    var queueSize: Int {
        access.lock()
        let size: Int = queue.count + (executing ? 1 : 0)
        access.unlock()
        return size
    }
    
    func cancelAll() {
        access.lock()
        queue = [ ]
        access.unlock()
    }
    
}

// MARK: tools
private extension Async.Fifo {
    
    func schedule(_ coroutine: @Sendable @escaping () async throws -> Void, _ catch: @escaping (Error) -> Void) {
        access.lock()
        queue.append((coroutine, `catch`))
        access.unlock()
    }
    
    func executeSequentally() async {
        if alreadyExecuting { return }
        while let next: Scheduled {
            do    { try await next.coroutine() }
            catch { next.catch(error) }
        }
    }
    
    var next: Scheduled? {
        access.lock()
        if queue.isEmpty { executing = false; access.unlock(); return nil }
        let next: Scheduled = queue.removeFirst()
        access.unlock()
        return next
    }
    
    var alreadyExecuting: Bool {
        access.lock()
        let executing = self.executing
        if executing == false { self.executing = true }
        access.unlock()
        return executing
    }
    
    typealias Scheduled = (coroutine: () async throws -> Void, catch: (Error) -> Void)
    
}

/// namespace class
public final class Async { }

public extension Async {  typealias Task = _Concurrency.Task }

@inlinable public func concurrent<T>(function: String = #function, _ callback: (CheckedContinuation<T, Error>) -> Void)
async throws -> T {
    try await withCheckedThrowingContinuation(function: function, callback)
}

@discardableResult
public func inBackground<T:Sendable>(_ coroutine: @Sendable @escaping () async throws -> T) -> Async.Task<T, Error> {
    Async.Task.detached(priority: .low, operation: coroutine)
}

@discardableResult
public func onMain<T:Sendable>(_ coroutine: @MainActor @Sendable @escaping () throws -> T) -> Async.Task<T, Error> {
    Async.Task.detached { try await MainActor.run { try coroutine() } }
}

@discardableResult
public func onMain<T:Sendable>(after delay: TimeInterval, _ coroutine: @MainActor @Sendable @escaping () throws -> T)
-> Async.Task<T, Error> {
    Async.Task.detached { await idle(delay); return try await MainActor.run { try coroutine() } }
}

public func idle(_ duration: TimeInterval) async {
    do    { try await Task.sleep(nanoseconds: UInt64(duration * 1e9)) }
    catch { print("[x][Async] sleep interrupted: \(error)") }
}
Bakst answered 22/7, 2023 at 15:29 Comment(0)
A
0

My own version of a task queue, similar to @malhal's answer but shorter and in my opinion easier to understand. I didn't include any check for cancellation as I think this should be left to the tasks themselves, just like OperationQueue does.

actor TaskQueue {
    
    private let maxConcurrentTasks: Int
    private var runningTasks = 0
    private var queue = [CheckedContinuation<Void, Never>]()
    
    init(maxConcurrentTasks: Int) {
        self.maxConcurrentTasks = maxConcurrentTasks
    }
    
    func add<T>(_ task: @escaping () async throws -> T) async throws -> T {
        if runningTasks >= maxConcurrentTasks {
            await withCheckedContinuation { continuation in
                queue.append(continuation)
            }
        }
        
        runningTasks += 1
        defer {
            runningTasks -= 1
            if runningTasks < maxConcurrentTasks && !queue.isEmpty {
                queue.removeFirst().resume()
            }
        }
        return try await task()
    }
    
}
Amphitropous answered 7/8, 2024 at 14:53 Comment(0)

© 2022 - 2025 — McMap. All rights reserved.