With the Rx Subject
, is it thread-safe to call OnNext()
from multiple threads?
So the sequence can be generated from multiple sources.
Will merge do the same thing?
With the Rx Subject
, is it thread-safe to call OnNext()
from multiple threads?
So the sequence can be generated from multiple sources.
Will merge do the same thing?
The Rx contract requires that notifications be sequential, and is a logical necessity for several operators. That said, you can use the available Synchronize
methods to get this behaviour.
var subject = new Subject<int>();
var syncedSubject = Subject.Synchronize(subject);
You can now make concurrent calls to syncedSubject
.
For an observer which must be synchronized, you can also use:
var observer = Observer.Create<Unit>(...);
var syncedObserver = Observer.Synchronize(observer);
Test:
Func<int, Action> onNext = i => () => syncedSubject.OnNext(i);
Parallel.Invoke
(
onNext(1),
onNext(2),
onNext(3),
onNext(4)
);
No, sequences are meant to be sequential, hence no overlapping notifications are allowed. You can use Synchronize extension methods to enforce proper synchronization. Operators like Merge take a lock to call the downstream observer in order to ensure proper serial invocation on On* callbacks.
Calling someSubject.OnNext()
is as thread-safe as someList.Add()
- you can call it from > 1 thread, but not concurrently. Wrap your OnNext
in a lock
statement and it'll be safe.
© 2022 - 2024 — McMap. All rights reserved.