InterruptedException in RxJava cache thread when debugging on Android
Asked Answered
B

1

7

Sometimes when I am debugging my app, I encounter InterruptedException in RxCachedThreadScheduler-1. Here's the trace:

Fatal Exception: java.lang.InterruptedException
       at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:1991)
       at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2025)
       at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1048)
       at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:776)
       at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1035)

I have a custom view in which I subscribe to my observable like this:

@Override
protected void onAttachedToWindow() {
    super.onAttachedToWindow();

    sub = FollowHandler.getInstance().getObservable()
            .filter(new Func1<FollowEvent, Boolean>() {
                @Override
                public Boolean call(FollowEvent followEvent) {
                    if(followEvent == null || followEvent.user == null
                            || user == null)
                        return false;

                    return followEvent.user.id == user.id;
                }
            })
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Observer<FollowEvent>() {
                @Override
                public void onCompleted() {}

                @Override
                public void onError(Throwable e) {}

                @Override
                public void onNext(FollowEvent followEvent) {
                    reactToThisNiceEvent(followEvent);
                }
            });
}

@Override
protected void onDetachedFromWindow() {
    super.onDetachedFromWindow();

    if(sub != null)
        sub.unsubscribe();
}

Here's the observable:

eventSubject.asObservable()
        .observeOn(Schedulers.io())
        .doOnNext(new Action1<FollowEvent>() {
            @Override
            public void call(FollowEvent followEvent) {
                if(followEvent != null)
                    doSomethingNice(followEvent);
            }
        })
        .share();

in which eventSubject is a simple PublishSubject. I am using RxAndroid 1.1.0 alongside RxJava 1.1.0.

Does anyone know why this is happening?

Bianka answered 19/1, 2016 at 14:57 Comment(0)
H
0

I'm not sure, why it happens, but try to do this:

sub = FollowHandler.getInstance().getObservable()
            .filter(...)
            .subscribeOn(Schedulers.io())    //  <<<<<<<<<<
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(...);

Also, I think you don't need share():

eventSubject.asObservable()
        .doOnNext(...)
        .subscribeOn(Schedulers.io())   // <<<<< subscribeOn instead of observeOn, but actually, you don't need it here...
        .share();     // <<<<< remove it

I use Subject as eventbus very often, as I described above. And I never had such issues.

P.S. In onDetachedFromWindow() would be better if you will check is subscription unsubscribed or not. I know that this method called in main thread, and concurrent access to this Subscription is impossible, but I think it's good style:

if(sub != null && !sub.isUnsubscribed())
        sub.unsubscribe();
Hogtie answered 20/1, 2016 at 11:30 Comment(1)
Thanks for the answer. I will try and do the subscribeOn and observeOn as you suggested. However it may take a while for me to test it. About the share, I do need it. Because the tasks before share must be shared between all subscribers (and result is the same for all subscribers). If I remove share, the doOnNext will be called for all of the subscribers (which is pointless). And thanks for the correction for isUnsubscribed() check. I totally missed that.Bianka

© 2022 - 2024 — McMap. All rights reserved.