I'm trying to get my head around scheduling in Reactive Extensions for Python. I would like to use `subscribe_on` to process multiple observables in parallel. This works fine if the observable is created with `just`, but not if, for example, `range` or `from_` are used.

`just` defaults to `Scheduler.immediate`, while other generators default to `Scheduler.current_thread`. That explains the difference, but it feels inconsistent to me, probably because I don't grasp the full problem.
Consider the following example:

    import rx
    from rx.concurrency.scheduler import Scheduler
    import time
    import threading

    def work(x):
        print "processing %s on thread %s" % (x, threading.currentThread().name)
        time.sleep(1)

    def finish(x):
        print "finished %s on thread %s" % (x, threading.currentThread().name)

    # Creates new thread (I like)
    rx.Observable.just(3) \
        .do_action(work) \
        .subscribe_on(Scheduler.new_thread) \
        .subscribe(finish)

    # Runs on MainThread (I don't like)
    rx.Observable.range(1, 3) \
        .do_action(work) \
        .subscribe_on(Scheduler.new_thread) \
        .subscribe(finish)

It works with `observe_on`, or if the scheduler is passed directly to the generator, but I would like to decouple observable creation from processing and achieve something like this:

    import rx
    from rx.concurrency.scheduler import Scheduler
    import time
    import threading

    def work(x):
        print "processing %s on thread %s" % (x, threading.currentThread().name)
        time.sleep(1)

    def finish(x):
        print "finished %s on thread %s" % (x, threading.currentThread().name)

    def factory_single():
        return rx.Observable.just(1).do_action(work)

    def factory_multiple():
        return rx.Observable.range(2, 4).do_action(work)

    def process(factory):
        factory().subscribe_on(Scheduler.new_thread).subscribe(finish)

    # Creates a new thread (I like)
    process(factory_single)

    # Runs on MainThread (I don't like)
    process(factory_multiple)

Am I misunderstanding `subscribe_on`? Is my approach wrong?
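
To make explicit what I expect `subscribe_on` to do, here is a toy model that uses only the standard library. `ToyObservable`, `from_items` and `subscribe_on_new_thread` are made-up names for illustration, not RxPY API, and this is only a sketch of the behaviour I'm after, not a claim about how RxPY implements it. The assumption is that moving the subscription onto a new thread should also move a synchronous source's emissions, and everything downstream of them, onto that thread.

```python
import threading


class ToyObservable(object):
    """Hypothetical stand-in for an Rx observable (not part of RxPY)."""

    def __init__(self, subscribe_fn):
        # subscribe_fn(on_next) pushes every item to on_next synchronously.
        self._subscribe_fn = subscribe_fn

    @classmethod
    def from_items(cls, items):
        def subscribe_fn(on_next):
            for item in items:
                on_next(item)
        return cls(subscribe_fn)

    def do_action(self, action):
        source = self

        def subscribe_fn(on_next):
            def forward(item):
                action(item)   # side effect first
                on_next(item)  # then pass the item downstream
            return source._subscribe_fn(forward)
        return ToyObservable(subscribe_fn)

    def subscribe_on_new_thread(self):
        source = self

        def subscribe_fn(on_next):
            # Subscribe to the upstream source on a fresh thread, so the
            # source emits its items there instead of on the caller's thread.
            worker = threading.Thread(
                target=source._subscribe_fn, args=(on_next,), name="toy-worker")
            worker.start()
            return worker
        return ToyObservable(subscribe_fn)

    def subscribe(self, on_next):
        return self._subscribe_fn(on_next)


seen = []


def work(x):
    seen.append(("work", x, threading.current_thread().name))


def finish(x):
    seen.append(("finish", x, threading.current_thread().name))


worker = (ToyObservable.from_items(range(1, 4))
          .do_action(work)
          .subscribe_on_new_thread()
          .subscribe(finish))
worker.join()  # wait so the recorded events can be inspected

for kind, x, thread_name in seen:
    print("%s %s on thread %s" % (kind, x, thread_name))
```

In this model every `work` and `finish` call for a multi-item source is recorded on `toy-worker` rather than on the main thread, which is what I was hoping to see from `range` combined with `subscribe_on(Scheduler.new_thread)`.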

`Scheduler.new_thread` creates a new thread for every event. Thanks! – Idiocrasy