RxSwift. Execute separate Observables sequentially

3

7

I want each of my Observables to execute only after the previous one has completed. I can't use flatMap, because the subscriptions can be made from different places and these Observables are not connected with each other. To be specific: my CollectionView is loading more content from the server, and 2 seconds later the user taps the "Send comment" button while the CollectionView is still loading its batch. I want to wait until the CollectionView update completes and only then execute the request that posts the comment. I created a class named ObservableQueue and it works just fine, but I need to know whether it has issues such as memory leaks or deadlocks, or whether I'm just missing something. Here it is:

extension CompositeDisposable {

    @discardableResult
    func insert(disposeAction: @escaping () -> ()) -> DisposeKey? {
        return insert(Disposables.create(with: disposeAction))
    }

}

class ObservableQueue {

    private let lock = NSRecursiveLock()
    private let relay = BehaviorRelay(value: 0)
    private let scheduler = SerialDispatchQueueScheduler(internalSerialQueueName: "ObservableQueue.scheduler")

    func enqueue<T>(_ observable: Observable<T>) -> Observable<T> {
        return Observable.create({ observer -> Disposable in
            let disposable = CompositeDisposable()

            let relayDisposable = self
                .relay
                .observeOn(self.scheduler)
                .filter({ value -> Bool in
                    if value > 0 {
                        return false
                    }

                    self.lock.lock(); defer { self.lock.unlock() }

                    if self.relay.value > 0 {
                        return false
                    }

                    self.relay.accept(self.relay.value + 1)

                    disposable.insert {
                        self.lock.lock(); defer { self.lock.unlock() }
                        self.relay.accept(self.relay.value - 1)
                    }

                    return true
                })
                .take(1)
                .flatMapLatest { _ in observable }
                .subscribe { observer.on($0) }

            _ = disposable.insert(relayDisposable)

            return disposable
        })
    }

}

And then I can use it like this:

let queue = ObservableQueue()

...

// first observable
let observable1 = Observable
    .just(0)
    .delay(5, scheduler: MainScheduler.instance)

queue
    .enqueue(observable1)
    .subscribe(onNext: { _ in
        print("here1")
    })
    .disposed(by: rx.disposeBag)

// second observable
let observable2 = Observable
    .just(0)
    .delay(5, scheduler: MainScheduler.instance)

queue
    .enqueue(observable2)
    .subscribe(onNext: { _ in
        print("here2")
    })
    .disposed(by: rx.disposeBag)

// third observable
let observable3 = Observable
    .just(0)
    .delay(5, scheduler: MainScheduler.instance)

queue
    .enqueue(observable3)
    .subscribe(onNext: { _ in
        print("here3")
    })
    .disposed(by: rx.disposeBag)
Lucarne answered 28/9, 2018 at 10:29 Comment(0)
C
9

CLGeocoder has the same issue. According to the documentation, you can't call one of the geocoder methods while it's working on a previous request, which is very much like what you are trying to do. In this gist (https://gist.github.com/danielt1263/64bda2a32c18b8c28e1e22085a05df5a), you will find that I make the observable calls on a background thread and protect the job with a semaphore. That's the key: you need a semaphore, not a lock.

Something like this should work for you:

class ObservableQueue {

    private let semaphore = DispatchSemaphore(value: 1)
    private let scheduler = ConcurrentDispatchQueueScheduler(qos: .userInitiated)

    func enqueue<T>(_ observable: Observable<T>) -> Observable<T> {
        let _semaphore = semaphore // To avoid the use of self in the block below
        return Observable.create { observer in
            _semaphore.wait()
            let disposable = observable.subscribe { event in
                switch event {
                case .next:
                    observer.on(event)
                case .error, .completed:
                    observer.on(event)
                }
            }
            return Disposables.create {
                disposable.dispose()
                _semaphore.signal()
            }
        }
        .subscribeOn(scheduler)
    }
}
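
One reason a semaphore fits here where a lock does not: a lock such as NSLock is meant to be unlocked by the thread that locked it, while a semaphore may be signalled from any thread, and an asynchronous job usually finishes on a different thread than the one that started it. Below is a minimal sketch of that property using only Foundation. The names Tracker, maxOverlap and the queue label are hypothetical and exist only for this illustration; they are not part of the class above.

```swift
import Foundation

// Hypothetical helper: tracks how many jobs are inside the guarded section.
final class Tracker: @unchecked Sendable {
    private let lock = NSLock()   // protects the counters below
    private var running = 0
    private var peak = 0

    func enter() {
        lock.lock(); defer { lock.unlock() }
        running += 1
        peak = max(peak, running)
    }

    func leave() {
        lock.lock(); defer { lock.unlock() }
        running -= 1
    }

    var maxRunning: Int {
        lock.lock(); defer { lock.unlock() }
        return peak
    }
}

// Starts `jobCount` jobs at once and returns the largest number of jobs
// that were ever inside the guarded section at the same time.
func maxOverlap(jobCount: Int) -> Int {
    let gate = DispatchSemaphore(value: 1)   // binary semaphore: one job at a time
    let tracker = Tracker()
    let group = DispatchGroup()
    let workers = DispatchQueue(label: "demo.workers", attributes: .concurrent)

    for _ in 0..<jobCount {
        group.enter()
        workers.async {
            gate.wait()                      // blocks this worker thread until the gate is free
            tracker.enter()
            // Finish the job later on another queue, the way an async request would.
            DispatchQueue.global().asyncAfter(deadline: .now() + 0.01) {
                tracker.leave()
                gate.signal()                // signalled from a different thread than wait()
                group.leave()
            }
        }
    }
    group.wait()
    return tracker.maxRunning
}
```

Note that every waiting job still parks one worker thread in wait(), which is the cost of this approach when many jobs are queued at once.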
Centerboard answered 29/9, 2018 at 0:47 Comment(3)
I will not use many instances of ObservableQueue at a time. Generally speaking, I will use only one ObservableQueue. But blocking the whole thread on which the SerialDispatchQueueScheduler is executing just to wait on a DispatchSemaphore doesn't look like a good solution to me. Anyway, I appreciate your help. Can you tell whether everything is fine with my solution besides capturing self? - Lucarne
Your solution is "fine" in the same way that var count = 0; for _ in myString { count += 1 } is a "fine" way to count the characters in a string. Your solution performs side effects inside a filter and modifies the very thing it is observing, both of which are very much against best practices. Just as important, and something I don't think you realize, is that in your solution you are implementing a semaphore. Your manual implementation of a semaphore is likely less efficient because you can't take advantage of knowledge of the hardware like the language-provided semaphore does. - Centerboard
The gist is 404, could someone update the link please? - Innings
D
0

I will give you some suggestions that I think will help you in the future.

  1. Avoid Observable.create as much as possible. It is the "brute force" way of creating an observable and it doesn't handle back pressure at all; you'll have to implement that yourself, and it's not easy.

  2. For HTTP API calls you usually don't need an Observable. Use Single or Completable, since you expect only one response from your server, not a stream of responses.

  3. Be careful with a strong self inside onNext/on... closures. As a rule of thumb, if the class that subscribes to the observable owns the dispose bag, you should use a weak self.
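
Points 2 and 3 can be sketched together as follows. This is only an illustration under assumptions: CommentAPI, postComment, CommentsViewModel and the returned status string are hypothetical names, and the Single succeeds immediately instead of performing a real network request.

```swift
import Foundation
import RxSwift

// Hypothetical API client; a real one would start a URLSession task.
struct CommentAPI {
    func postComment(_ text: String) -> Single<String> {
        return Single<String>.create { single in
            // Exactly one result is expected, so Single models the call well.
            single(.success("posted: \(text)"))
            // A real implementation would cancel the request here on dispose.
            return Disposables.create()
        }
    }
}

// Hypothetical owner of the subscription and of the dispose bag.
final class CommentsViewModel {
    private let api = CommentAPI()
    private let disposeBag = DisposeBag()
    private(set) var lastStatus: String?

    func send(_ text: String) {
        api.postComment(text)
            .subscribe(onSuccess: { [weak self] status in
                // self owns the bag and the bag owns this subscription, so a
                // strong capture would keep self alive while the subscription
                // is active. weak avoids that.
                self?.lastStatus = status
            })
            .disposed(by: disposeBag)
    }
}
```

With this stub, calling send("hi") sets lastStatus synchronously because the Single emits during subscription; a real request would deliver its result later.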

Now for your particular case: if you only need this pair of observables (fetch & send comment), I think the queue is a bit of an overkill. You can simply start the post-comment observable (if available) in the do(onNext:) method of your "fetch" observable. do(onNext:) is called every time an "onNext" event is emitted.

If you still need a queue, I would go with an OperationQueue that enqueues only operations and has a method like observeOperationchanges() -> Observable<Operation> that is triggered every time an operation completes. This way you subscribe once and enqueue multiple times, but this might not fit your needs.
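
A rough sketch of the OperationQueue idea, using only Foundation. Everything named here (SerialWorkQueue, OrderLog, runDemo) is hypothetical, and a plain completion callback stands in for the Observable of finished operations. It also assumes each unit of work is synchronous; wrapping an asynchronous request would need a custom asynchronous Operation subclass, which is not shown.

```swift
import Foundation

// Hypothetical wrapper around a serial OperationQueue.
final class SerialWorkQueue: @unchecked Sendable {
    private let queue = OperationQueue()

    init() {
        queue.maxConcurrentOperationCount = 1   // never run two operations at once
    }

    /// Enqueues synchronous work; `onFinished` is called after the work returns.
    func enqueue(_ work: @escaping @Sendable () -> Void,
                 onFinished: (@Sendable () -> Void)? = nil) {
        let operation = BlockOperation(block: work)
        operation.completionBlock = onFinished
        queue.addOperation(operation)
    }

    /// Blocks the caller until every enqueued operation has run.
    func waitUntilIdle() {
        queue.waitUntilAllOperationsAreFinished()
    }
}

// Hypothetical thread-safe log used only by the demo below.
final class OrderLog: @unchecked Sendable {
    private let lock = NSLock()
    private var values: [Int] = []

    func append(_ value: Int) {
        lock.lock(); defer { lock.unlock() }
        values.append(value)
    }

    var snapshot: [Int] {
        lock.lock(); defer { lock.unlock() }
        return values
    }
}

// Enqueues three jobs from one place and returns the order in which they ran.
func runDemo() -> [Int] {
    let log = OrderLog()
    let queue = SerialWorkQueue()
    for i in 1...3 {
        queue.enqueue({
            Thread.sleep(forTimeInterval: 0.01)  // simulate some work
            log.append(i)
        })
    }
    queue.waitUntilIdle()
    return log.snapshot
}
```

Because all operations share the same priority and are ready when added, the queue runs them in submission order, so callers from different places can enqueue without holding references to each other's work.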

Deodorant answered 28/9, 2018 at 10:47 Comment(3)
Thanks for the answer, but it's not clear enough to me why I should avoid Observable.create and what "doesn't handle back pressure at all" means. I mean, the official RxSwift docs contain a lot of examples of creating Observables with the create method. Also, when the class that subscribes to the observable and owns the dispose bag deinits, the DisposeBag will be deinited too, and that will release the self captured inside the filter operator of the enqueue method. Am I correct? - Lucarne
I can't use do(onNext:) because, yes, the user can press the "Send comment" button while the CollectionView is fetching something, but they can also do it when nothing is being fetched, in which case I want to execute the Observable immediately. - Lucarne
RxSwift itself doesn't handle back pressure at all anyway. - Spaceship
M
0

I would use .combineLatest() to produce an event once both observables have emitted something. See http://rxmarbles.com/#combineLatest
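
A minimal sketch of how combineLatest behaves, assuming both sequences are reachable from one place. The two subjects and the message format are hypothetical stand-ins for the fetch and the "Send comment" tap.

```swift
import RxSwift

// Hypothetical sources: one emits the size of the loaded batch,
// the other emits the comment text when the button is tapped.
let fetchFinished = PublishSubject<Int>()
let commentText = PublishSubject<String>()

var received: [String] = []

let subscription = Observable
    .combineLatest(fetchFinished, commentText) { count, text in
        "\(text) after \(count) items"
    }
    .subscribe(onNext: { received.append($0) })

commentText.onNext("Nice post")   // nothing is emitted yet: the fetch has no value
fetchFinished.onNext(20)          // both sources have a value now, so one element is emitted

subscription.dispose()
```

The operator only emits once every source has produced at least one element, and it needs a reference to both sources at the point where they are combined.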

Monospermous answered 28/9, 2018 at 11:41 Comment(2)
Thanks for the answer, but as I mentioned in my question, I don't have a reference to the currently executing Observables, so I can use neither flatMap nor combineLatest. - Lucarne
But both have access to 'queue', right? You will need this reference one way or another. I personally use github.com/maxvol/RaspSwift to keep a global state in order to synchronise between different async activities. - Monospermous
