In Rx, is it a responsibility of the consumer (IObserver) to deal with thread safety?
Asked Answered
S

1

2

In ReactiveX paradigm, Is it a responsibility of the consumer (IObserver) to deal with thread safety?

E.g., if OnCompleted call comes along when OnNext is still executing on another thread?

It looks like it from Rx .NET sources but the docs are somewhat vague.

Spunk answered 31/5, 2022 at 1:24 Comment(2)
A related question: Reactive Extensions OnNext thread-safetyPalinode
Thanks, I've overlooked that one, and while it nails it down.Spunk
S
2

Since I initially asked this question in a tweet, I believe I've now found an authoritative answer.

It appears I was wrong in my assumption that thread-safe serialization is the consumer's responsibility (IObserver).

According to the original Rx Design Guidelines document (a best-kept secret as it seems :)

4.2. Assume observer instances are called in a serialized fashion

As Rx uses a push model and .NET supports multithreading, it is possible for different messages to arrive different execution contexts at the same time. If consumers of observable sequences would have to deal with this in every place, their code would need to perform a lot of housekeeping to avoid common concurrency problems. Code written in this fashion would be harder to maintain and potentially suffer from performance issues.

Further:

6.7. Serialize calls to IObserver methods within observable sequence implementations Rx is a composable API, many operators can play together. If all operators had to deal with concurrency the individual operators would become very complex. Next to this, concurrency is best controlled at the place it first occurs. Finally, Consuming the Rx API would become harder if each usage of Rx would have to deal with concurrency.

And finally:

6.8. Avoid serializing operators As all Rx operators are bound to guideline 6.7, operators can safely assume that their inputs are serialized. Adding too much synchronization would clutter the code and can lead to performance degradation. If an observable sequence is not following the Rx contract (see chapter 0), it is up to the developer writing the end-user application to fix the observable sequence by calling the Synchronize operator at the first place the developer gets a hold of the observable sequence. This way the scope of additional synchronization is limited to where it is needed.

My personal take from this: if an original sequence producing IObservable can introduce parallelism when it calls OnNext, OnError, OnComplete (or when Dispose is called on its subscription), it should take care to serialize these calls properly.

Spunk answered 31/5, 2022 at 1:24 Comment(6)
Updated my list of Rx resources with the link to the Rx Design Guidelines doc: gist.github.com/noseratio/97901e4ee5b5f8acb01e2a7edf9727e0Spunk
Yes, but not in the case of the dispose. The observable is not in charge of when that is called so it can't manage the context. The observer must always assume that the disposable could be called in the middle of processing an event. (I remember reading this somewhere, but I can't find the reference now...)Uther
Vice versa. If I understand the code correctly, consider using the using operator instead of create.Uther
In your last paragraph there's no distinction between a hot nor a cold observable. There's no need to specify "hot".Adam
@Adam what I meant by that, only a source observable (that is actively emitting items) should be synchronizing, rather than an Rx operator connecting to such upstream observable. Or is this not correct?Spunk
Yes, but it is irrelevant as to the source being hot or cold.Adam

© 2022 - 2024 — McMap. All rights reserved.