No, this is not thread safe, for two reasons:
The shared
instance is not concurrency-safe because it represents an unsynchronized mutable state. If you change the “Strict Concurrency Checking” build setting to “Complete”, you will receive a warning this this effect:
Static property 'shared' is not concurrency-safe because it is not either conforming to 'Sendable' or isolated to a global actor; this is an error in Swift 6
Furthermore, the cancellables
is a Set
which is not thread-safe. You need to synchronize your access to it.
In a variation of Cy-4AH’s answer (subsequently deleted), I would suggest using an actor
for synchronization. I would also add an onTermination
handler to remove the associated continuation if the asynchronous sequence was canceled. E.g.:
actor Notifier<Element: Sendable> {
typealias CancellableIdentifier = UUID
private let valuesPublisher = PassthroughSubject<Element, Never>()
private var cancellables: [CancellableIdentifier: AnyCancellable] = [:]
func values() -> AsyncStream<Element> {
.init { [valuesPublisher] continuation in
let id = CancellableIdentifier()
cancellables[id] = valuesPublisher.sink { completion in
continuation.finish()
} receiveValue: { value in
continuation.yield(value)
}
continuation.onTermination = { _ in
Task { [weak self] in
await self?.removeCancellable(id: id)
}
}
}
}
func send(_ element: Element) {
valuesPublisher.send(element)
}
}
private extension Notifier {
func removeCancellable(id: UUID) {
cancellables.removeValue(forKey: id)
}
}
There are tons of variations on the theme, but the details of the implementation matter less than the general observations of (a) the use of an actor
; and (b) the use of the onTermination
handler to clean up in case the notifier object might outlive the individual sequences.
FWIW, if I really wanted to create a singleton for String
notifications:
actor StringNotifier {
static let shared = Notifier<String>()
private init() {}
}
As an aside, the Swift concurrency alternative to a Combine Subject
is an AsyncChannel
. But, a channel does not allow multiple observers, so you might have a collection of these channels:
actor Notifier<Element: Sendable> {
typealias ChannelIdentifier = UUID
private var channels: [ChannelIdentifier: AsyncChannel<Element>] = [:]
func values() -> AsyncStream<Element> {
.init { continuation in
let channel = AsyncChannel<Element>()
let id = ChannelIdentifier()
channels[id] = channel
let task = Task {
for await value in channel {
continuation.yield(value)
}
continuation.finish()
}
continuation.onTermination = { state in
if case .cancelled = state { task.cancel() }
Task { [weak self] in
await self?.removeChannel(id: id)
}
}
}
}
func send(_ element: Element) async {
await withDiscardingTaskGroup { group in
for channel in channels.values {
group.addTask { await channel.send(element) }
}
}
}
}
private extension Notifier {
func removeChannel(id: ChannelIdentifier) {
channels(forKey: id)
}
}
value.values
is an async stream already? – Swansonreceive(on)
? Another wheel that doesn't have to be reinvented. – SwansongetStream
, because it's callingstore(in:)
, butSet
is not thread safe. – Lightface