Reactive Extensions OnNext thread-safety
Asked Answered
S

3

25

With the Rx Subject, is it thread-safe to call OnNext() from multiple threads?

So the sequence can be generated from multiple sources.

Will merge do the same thing?

Stiegler answered 4/9, 2012 at 20:7 Comment(1)
very interesting question !Milburr
E
28

The Rx contract requires that notifications be sequential, and is a logical necessity for several operators. That said, you can use the available Synchronize methods to get this behaviour.

var subject = new Subject<int>();
var syncedSubject = Subject.Synchronize(subject);            

You can now make concurrent calls to syncedSubject. For an observer which must be synchronized, you can also use:

var observer = Observer.Create<Unit>(...);
var syncedObserver = Observer.Synchronize(observer);

Test:

Func<int, Action> onNext = i => () => syncedSubject.OnNext(i);
Parallel.Invoke
(
    onNext(1),
    onNext(2),
    onNext(3),
    onNext(4)
);
Encyst answered 5/9, 2012 at 6:50 Comment(0)
M
8

No, sequences are meant to be sequential, hence no overlapping notifications are allowed. You can use Synchronize extension methods to enforce proper synchronization. Operators like Merge take a lock to call the downstream observer in order to ensure proper serial invocation on On* callbacks.

Markova answered 4/9, 2012 at 20:44 Comment(1)
It is possible to provide a example using the Synchronize extension method?Stiegler
N
8

Calling someSubject.OnNext() is as thread-safe as someList.Add() - you can call it from > 1 thread, but not concurrently. Wrap your OnNext in a lock statement and it'll be safe.

Nest answered 5/9, 2012 at 6:13 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.