Swift: Map AsyncStream into another AsyncStream

Update

The accepted answer did not directly answer the original question, but it helped resolve the underlying issue I was trying to solve: I wanted to map an AsyncStream (which is an AsyncSequence) into another AsyncSequence with element type T2. I added some details in this comment.

Original question

I would like to map an AsyncStream into another AsyncStream. I wonder if there is a .map that can be used just like the one for arrays.

Quoting from Apple documentation:

Creates an asynchronous sequence that maps the given closure over the asynchronous sequence’s elements.

The code below fails to compile with this error:

Cannot convert value of type 'AsyncMapSequence<AsyncStream<Int>, Int>' to specified type 'AsyncStream<Int>'

As I understand it, this is because the return type of .map here is AsyncMapSequence<...> rather than AsyncStream<Int>.

Is there a way to just map an AsyncStream<T1> into an AsyncStream<T2> with a transform function T1 → T2, as it works for mapping Array<T1> into Array<T2>?

Thank you in advance!

import SwiftUI

@main
struct MacosPlaygroundApp: App {
    var body: some Scene {
        WindowGroup("Playground") {
            Text("Hello World")
                .padding(100)
                .onAppear {
                    Task {
                        let numStream: AsyncStream<Int> = AsyncStream { continuation in
                            Task {
                                try await Task.sleep(nanoseconds: 1_000_000_000)
                                continuation.yield(0)
                                try await Task.sleep(nanoseconds: 1_000_000_000)
                                continuation.yield(1)
                                try await Task.sleep(nanoseconds: 1_000_000_000)
                                continuation.yield(2)
                                continuation.finish()
                            }
                        }

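                        // error: Cannot convert value of type 'AsyncMapSequence<AsyncStream<Int>, Int>' to specified type 'AsyncStream<Int>'
                        let doubleNumStream: AsyncStream<Int> = numStream.map { num in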
                            return 2 * num
                        }

                        for await doubleNum in doubleNumStream {
                            print("Next num is \(doubleNum)")
                        }

                    }
                }
        }
    }
}

Tager answered 20/8, 2022 at 9:58 Comment(3)
Why not just remove the ': AsyncStream<Int>' annotation and let the type of doubleNumStream be AsyncMapSequence<AsyncStream<Int>, Int>? See also: https://mcmap.net/q/1469211/-how-to-implement-a-function-returning-an-asyncsequence-whose-results-relies-on-another-asyncsequence/5133585 - Tawnytawnya
Let's say I have a function, input is some async sequence of data of a certain type T, and for each such T item, it does something with it (e.g. stores something about it in UserDefaults). For this function, it doesn't matter how that async sequence was calculated, e.g. whether it was mapped from another sequence or not. Ideally I would type it as AsyncSequence<T> (T being a specific type in my actual code), but AsyncSequence doesn't take type parameters. So I thought the next most generic observable type is AsyncStream<T>. - Tager
@Tawnytawnya What do you think about this solution? - Tager

You said:

Let's say I have a function, input is some async sequence of data of a certain type T, and for each such T item, it does something with it... For this function, it doesn't matter how that async sequence was calculated, e.g. whether it was mapped from another sequence or not. Ideally I would type it as AsyncSequence<T> (T being a specific type in my actual code), but AsyncSequence doesn't take type parameters.

I would suggest that you define this function to use AsyncSequence, e.g., here is a method that prints the values of the sequence:

func printSequence<S: AsyncSequence>(_ sequence: S) async throws where S.Element == Int {
    for try await value in sequence {
        print("Next num is \(value)")
    }
    print("done")
}

This will work with any AsyncSequence of Int, either the original numStream or the mapped doubleNumStream.
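A minimal sketch of the same constraint, collecting values instead of printing them so the result is easy to check. `collect` and `makeNumbers` are hypothetical helpers (not standard library APIs), and the stream yields 0, 1, 2 without the delays used in the question.

```swift
/// Hypothetical helper: gathers every element of any async sequence of Int,
/// whatever its concrete type (AsyncStream, AsyncMapSequence, ...).
func collect<S: AsyncSequence>(_ sequence: S) async throws -> [Int] where S.Element == Int {
    var values: [Int] = []
    for try await value in sequence {
        values.append(value)
    }
    return values
}

/// Hypothetical helper: a finite stream of 0, 1, 2 that finishes immediately.
func makeNumbers() -> AsyncStream<Int> {
    AsyncStream<Int> { continuation in
        for n in 0...2 {
            continuation.yield(n)
        }
        continuation.finish()
    }
}
```

The same `collect` call accepts both `makeNumbers()` and `makeNumbers().map { 2 * $0 }`, even though their concrete types differ.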

Then, as Sweeper said, you can just use the existing map of AsyncSequence:

Task {
    let numStream = AsyncStream<Int> { continuation in
        Task {
            try await Task.sleep(nanoseconds: 1_000_000_000)
            continuation.yield(0)
            try await Task.sleep(nanoseconds: 1_000_000_000)
            continuation.yield(1)
            try await Task.sleep(nanoseconds: 1_000_000_000)
            continuation.yield(2)
            continuation.finish()
        }
    }

    let doubleNumStream = numStream.map { num in             // let it just infer the type for you
        return 2 * num
    }

    try await printSequence(doubleNumStream)
}
Kitten answered 20/8, 2022 at 22:17 Comment(1)
Thank you for showing me how to constrain the associated type Element of AsyncSequence; it expresses the function's input type more precisely than AsyncStream<Int>. Also, thank you for reminding me of continuation.finish(). I had added it in my sandbox but forgot to update this post; I just did. - Tager

How about extending AsyncStream?

extension AsyncStream {
    public func map<Transformed>(_ transform: @escaping (Self.Element) -> Transformed) -> AsyncStream<Transformed> {
        return AsyncStream<Transformed> { continuation in
            Task {
                for await element in self {
                    continuation.yield(transform(element))
                }
                continuation.finish()
            }
        }
    }

    public func map<Transformed>(_ transform: @escaping (Self.Element) async -> Transformed) -> AsyncStream<Transformed> {
        return AsyncStream<Transformed> { continuation in
            Task {
                for await element in self {
                    continuation.yield(await transform(element))
                }
                continuation.finish()
            }
        }
    }
}
Tager answered 20/8, 2022 at 12:5 Comment(4)
Any chance you managed to improve that? Works fine for me, though. - Vagrancy
I think you lose cancellation support if you use Task { like that. - Maziemazlack
@Maziemazlack is right, although the fix is pretty simple: `public func map<T>(transform: @escaping (Self.Element) -> T) -> AsyncStream<T> { AsyncStream<T> { continuation in let task = Task<Void, Never> { for await element in self { continuation.yield(transform(element)) } continuation.finish() } continuation.onTermination = { _ in task.cancel() } } }` - Palaestra
Does not work on Swift 6: "Task-isolated value of type '() async -> ()' passed as a strongly transferred parameter; later accesses could race" - Streetlight
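A sketch of the cancellation fix suggested in the comments, plus a small check that cancelling the consumer also cancels the forwarding task and, through it, terminates the source stream. `mapToStream` and `sourceIsReleasedOnCancel` are hypothetical names (the former avoids clashing with `AsyncSequence.map`); the `Sendable` constraints are an assumption so that it also compiles under Swift 6 strict concurrency, and `AsyncStream.makeStream` needs Swift 5.9 or later.

```swift
extension AsyncStream where Element: Sendable {
    /// Hypothetical name: maps into a new AsyncStream via a forwarding task.
    func mapToStream<T: Sendable>(_ transform: @escaping @Sendable (Element) -> T) -> AsyncStream<T> {
        AsyncStream<T> { continuation in
            let task = Task {
                for await element in self {
                    continuation.yield(transform(element))
                }
                continuation.finish()
            }
            // When the consumer cancels or drops the stream, stop the forwarding task too.
            continuation.onTermination = { _ in task.cancel() }
        }
    }
}

/// Hypothetical check: returns true once cancelling the consumer has
/// terminated a source stream that never finishes on its own.
func sourceIsReleasedOnCancel() async -> Bool {
    let (sourceTerminated, signal) = AsyncStream<Void>.makeStream()
    let source = AsyncStream<Int> { continuation in
        continuation.yield(1)
        // finish() is never called, so only cancellation can end this stream.
        continuation.onTermination = { _ in
            signal.yield()
            signal.finish()
        }
    }
    let consumer = Task {
        for await _ in source.mapToStream({ $0 * 2 }) {}
    }
    consumer.cancel()
    for await _ in sourceTerminated {
        return true
    }
    return false
}
```

Without the `onTermination` line, the unstructured `Task` is not cancelled when the consumer goes away (unstructured tasks do not inherit cancellation), so it would stay suspended on the source indefinitely.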

You can add:

extension AsyncStream {
    init<Sequence: AsyncSequence>(_ sequence: Sequence) where Sequence.Element == Element {
        self.init {
            var iterator: Sequence.AsyncIterator?
            if iterator == nil {
                iterator = sequence.makeAsyncIterator()
            }
            return try? await iterator?.next()
        }
    }

    func eraseToStream() -> AsyncStream<Element> {
        AsyncStream(self)
    }
}

And then do

let doubleNumStream: AsyncStream<Int> = numStream
    .map { num in
        return 2 * num
    }
    .eraseToStream()
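Note that the unfolding closure above declares `iterator` inside the closure, so `makeAsyncIterator()` is called again on every pull. One alternative sketch keeps a single iterator alive inside one forwarding task and cancels that task on termination, the same pattern used by the map extensions in this thread. It is written as an extension on AsyncSequence; the `Sendable` constraints are assumptions for Swift 6, `doubledStream` is a hypothetical usage helper, and an error thrown by the base sequence simply ends the stream.

```swift
extension AsyncSequence where Self: Sendable, Element: Sendable {
    /// Sketch: erases a Sendable async sequence to AsyncStream<Element>,
    /// iterating the base sequence exactly once inside a single task.
    func eraseToStream() -> AsyncStream<Element> {
        AsyncStream<Element> { continuation in
            let task = Task {
                do {
                    for try await element in self {
                        continuation.yield(element)
                    }
                } catch {
                    // Assumption: a failure in the base sequence just ends the stream.
                }
                continuation.finish()
            }
            // Stop forwarding when the consumer cancels or drops the stream.
            continuation.onTermination = { _ in task.cancel() }
        }
    }
}

/// Hypothetical usage: map, then erase back to AsyncStream<Int>.
func doubledStream() -> AsyncStream<Int> {
    let numStream = AsyncStream<Int> { continuation in
        for n in 0...2 {
            continuation.yield(n)
        }
        continuation.finish()
    }
    return numStream.map { 2 * $0 }.eraseToStream()
}
```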
Speckle answered 2/3, 2023 at 15:16 Comment(1)
This didn't work for me; elements were lost. This one has a lock that perhaps fixes the problem: github.com/pointfreeco/swift-concurrency-extras/blob/… - Maziemazlack

This suggestion from bzyr is a good one. One thing it is missing is proper cleanup of the task when the continuation is terminated.

Posting this as a top-level answer, since Stack Overflow doesn't format code blocks in comments:

extension AsyncStream {
    public func map<Transformed>(_ transform: @escaping (Self.Element) -> Transformed) -> AsyncStream<Transformed> {
        return AsyncStream<Transformed> { continuation in
            let task = Task {
                for await element in self {
                    continuation.yield(transform(element))
                }
                continuation.finish()
            }
            continuation.onTermination = { _ in
                task.cancel()
            }
        }
    }

    public func map<Transformed>(_ transform: @escaping (Self.Element) async -> Transformed) -> AsyncStream<Transformed> {
        return AsyncStream<Transformed> { continuation in
            let task = Task {
                for await element in self {
                    continuation.yield(await transform(element))
                }
                continuation.finish()
            }
            continuation.onTermination = { _ in
                task.cancel()
            }
        }
    }
}

Palaestra answered 9/5, 2024 at 21:14 Comment(1)
Does not work on Swift 6: "Task-isolated value of type '() async -> ()' passed as a strongly transferred parameter; later accesses could race" - Streetlight

I also find this intriguing, for a few reasons:

  1. Initially I used map, as that is how we intuitively deal with synchronous sequences. As you mentioned in the description, that gives AsyncMapSequence<AsyncStream<Int>, Int>, which to me seems like leaking an implementation detail, since we don't need to know the exact type of the input sequence. Donny Wals shares similar thoughts in his post here.
  2. I tried your solution with the Task block and it works fine. With an unmanaged task, though, I always worry that it will outlive its context and consume system resources for longer than intended.
  3. Again, the concrete types of AsyncMapSequence and its counterparts (for compactMap, filter, the throwing variants, etc.) make them hard to pass around. The community has discussed this, but there is no clear lead on a fix. Using such a type locally isn't a big deal, but their combinatorial complexity makes them impractical to build into APIs that process many kinds of generated async sequences.

In our use case, we ended up designing the API by loosening the concrete type requirement to a protocol. Because the Element of AsyncMapSequence is its second generic parameter, Transformed, our protocol can access it without a problem.
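A rough sketch of that approach, with a hypothetical protocol `NumberConsumer` and conforming type `Recorder` (neither comes from the answer): the requirement is generic over any AsyncSequence whose Element is Int, so callers can pass AsyncStream, AsyncMapSequence, AsyncFilterSequence, and so on without naming the concrete type.

```swift
/// Hypothetical protocol: accepts any async sequence of Int.
protocol NumberConsumer {
    mutating func consume<S: AsyncSequence>(_ numbers: S) async throws where S.Element == Int
}

/// Hypothetical conforming type that records what it receives.
struct Recorder: NumberConsumer {
    private(set) var received: [Int] = []

    mutating func consume<S: AsyncSequence>(_ numbers: S) async throws where S.Element == Int {
        for try await n in numbers {
            received.append(n)
        }
    }
}

/// Passes a map + filter chain without spelling out its nested generic type.
func recordFilteredDoubles() async throws -> [Int] {
    let numbers = AsyncStream<Int> { continuation in
        for n in 0..<6 {
            continuation.yield(n)
        }
        continuation.finish()
    }
    var recorder = Recorder()
    try await recorder.consume(numbers.map { 2 * $0 }.filter { $0 % 4 == 0 })
    return recorder.received
}
```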

Swartz answered 15/8, 2023 at 1:15 Comment(0)

Updating @bzyr's and @julo's answers for Swift 6:

extension AsyncStream {
    public func map<Transformed: Sendable>(
        _ transform: @escaping @Sendable (Element) -> Transformed
    ) -> AsyncStream<Transformed> where Element: Sendable {
        return AsyncStream<Transformed> { continuation in
            let task = Task {
                for await element in self {
                    continuation.yield(transform(element))
                }
                continuation.finish()
            }
            continuation.onTermination = { _ in
                task.cancel()
            }
        }
    }
    
    public func map<Transformed: Sendable>(
        _ transform: @escaping @Sendable (Element) async -> Transformed
    ) -> AsyncStream<Transformed> where Element: Sendable {
        return AsyncStream<Transformed> { continuation in
            let task = Task {
                for await element in self {
                    continuation.yield(await transform(element))
                }
                continuation.finish()
            }
            continuation.onTermination = { _ in
                task.cancel()
            }
        }
    }
}
Streetlight answered 24/9, 2024 at 17:21 Comment(0)

© 2022 - 2025 — McMap. All rights reserved.