Thread safe combine publisher to AsyncStream
Asked Answered
K

1

5

I try to figure out is this approach thread safe if getStream() and update(value: ...) will be called on difference thread simultaneously?

final class SomeNotifier {

static let shared = SomeNotifier()

private let value = PassthroughSubject<String, Never>()
private var cancellables: Set<AnyCancellable> = []

private init() {}

func getStream() -> AsyncStream<String> {
    return AsyncStream { [weak self] continuation in
        guard let self = self else { return }

        self.value.sink { completion in
            switch completion {
            case .finished:
                continuation.finish()
            case .failure:
                continuation.finish()
            }
        } receiveValue: { value in
            continuation.yield(value)
        }
        .store(in: &cancellables)
    }
}

func update(value: String) {
    self.value.send(value)
}

I want to have some repository that can notify different observers about change of internal state

Kappa answered 27/3, 2023 at 11:28 Comment(6)
Is it really necessary to reinvent the wheel? Given that value.values is an async stream already?Swanson
And if there's a thread doubt why not insert receive(on)? Another wheel that doesn't have to be reinvented.Swanson
So what happens if the stream ins cancelled? This seems very unnecessary and unsafeLyrebird
I think cancellation is ok. Not ok is calling getStream, because it's calling store(in:), but Set is not thread safe.Lightface
about values - unfortunately have to support iOS 13Kappa
about cancellation - in my case this object will be alive all time when app aliveKappa
C
7

No, this is not thread safe, for two reasons:

  1. The shared instance is not concurrency-safe because it represents an unsynchronized mutable state. If you change the “Strict Concurrency Checking” build setting to “Complete”, you will receive a warning this this effect:

    Static property 'shared' is not concurrency-safe because it is not either conforming to 'Sendable' or isolated to a global actor; this is an error in Swift 6

  2. Furthermore, the cancellables is a Set which is not thread-safe. You need to synchronize your access to it.


In a variation of Cy-4AH’s answer (subsequently deleted), I would suggest using an actor for synchronization. I would also add an onTermination handler to remove the associated continuation if the asynchronous sequence was canceled. E.g.:

actor Notifier<Element: Sendable> {
    typealias CancellableIdentifier = UUID

    private let valuesPublisher = PassthroughSubject<Element, Never>()
    private var cancellables: [CancellableIdentifier: AnyCancellable] = [:]

    func values() -> AsyncStream<Element> {
        .init { [valuesPublisher] continuation in
            let id = CancellableIdentifier()

            cancellables[id] = valuesPublisher.sink { completion in
                continuation.finish()
            } receiveValue: { value in
                continuation.yield(value)
            }

            continuation.onTermination = { _ in
                Task { [weak self] in
                    await self?.removeCancellable(id: id)
                }
            }
        }
    }

    func send(_ element: Element) {
        valuesPublisher.send(element)
    }
}

private extension Notifier {
    func removeCancellable(id: UUID) {
        cancellables.removeValue(forKey: id)
    }
}

There are tons of variations on the theme, but the details of the implementation matter less than the general observations of (a) the use of an actor; and (b) the use of the onTermination handler to clean up in case the notifier object might outlive the individual sequences.


FWIW, if I really wanted to create a singleton for String notifications:

actor StringNotifier {
    static let shared = Notifier<String>()
    private init() {}
}

As an aside, the Swift concurrency alternative to a Combine Subject is an AsyncChannel. But, a channel does not allow multiple observers, so you might have a collection of these channels:

actor Notifier<Element: Sendable> {
    typealias ChannelIdentifier = UUID

    private var channels: [ChannelIdentifier: AsyncChannel<Element>] = [:]

    func values() -> AsyncStream<Element> {
        .init { continuation in
            let channel = AsyncChannel<Element>()
            let id = ChannelIdentifier()
            channels[id] = channel

            let task = Task {
                for await value in channel {
                    continuation.yield(value)
                }
                continuation.finish()
            }

            continuation.onTermination = { state in
                if case .cancelled = state { task.cancel() }

                Task { [weak self] in
                    await self?.removeChannel(id: id)
                }
            }
        }
    }

    func send(_ element: Element) async {
        await withDiscardingTaskGroup { group in
            for channel in channels.values {
                group.addTask { await channel.send(element) }
            }
        }
    }
}

private extension Notifier {
    func removeChannel(id: ChannelIdentifier) {
        channels(forKey: id)
    }
}
Corruption answered 27/3, 2023 at 18:31 Comment(0)

© 2022 - 2025 — McMap. All rights reserved.