How to wait for RxPy parallel threads to complete
Asked Answered
C

3

7

Based on this excellent SO answer, I can get multiple tasks running in parallel in RxPY. My problem is: how do I wait for all of them to complete? With threading I can call .join(), but there doesn't seem to be an equivalent option with Rx schedulers. .to_blocking() doesn't help either: the MainThread finishes before all notifications have fired and the on_completed handler has been called. Here's an example:

from __future__ import print_function
import os, sys
import time
import random
from rx import Observable
from rx.core import Scheduler
from threading import current_thread

def printthread(val):
    """Print ``val`` followed by the name of the thread that is printing it."""
    thread_name = current_thread().name
    print("{}, thread: {}".format(val, thread_name))

def intense_calculation(value):
    """Stand-in for slow work: log, block for 0.5-2.0 s, then return *value*.

    The random pause means results from parallel calls can arrive out of
    order.
    """
    printthread("calc {}".format(value))
    pause = random.randint(5, 20) * .1
    time.sleep(pause)
    return value

if __name__ == "__main__":
    # Fan each value out to Observable.start() on Scheduler.timeout so the
    # calculations run on separate threads, then hop the results onto the
    # event-loop scheduler before they reach the observer.
    source = Observable.range(1, 3)
    fanned_out = source.select_many(
        lambda i: Observable.start(lambda: intense_calculation(i), scheduler=Scheduler.timeout))
    delivered = fanned_out.observe_on(Scheduler.event_loop)
    delivered.subscribe(
        on_next=lambda x: printthread("on_next: {}".format(x)),
        on_completed=lambda: printthread("on_completed"),
        on_error=lambda err: printthread("on_error: {}".format(err)))

    # Nothing here waits for the stream, so the main thread can reach this
    # line (and exit) before the observer callbacks above have run -- the
    # problem this question is asking about.
    printthread("\nAll done")
    # time.sleep(2)

Expected output

calc 1, thread: Thread-1
calc 2, thread: Thread-2
calc 3, thread: Thread-3

on_next: 2, thread: Thread-4
on_next: 3, thread: Thread-4
on_next: 1, thread: Thread-4
on_completed, thread: Thread-4
All done, thread: MainThread

Actual output

calc 1, thread: Thread-1
calc 2, thread: Thread-2
calc 3, thread: Thread-3

All done, thread: MainThread

Actual output if I uncomment the sleep call

calc 1, thread: Thread-1
calc 2, thread: Thread-2
calc 3, thread: Thread-3

All done, thread: MainThread
on_next: 2, thread: Thread-4
on_next: 3, thread: Thread-4
on_next: 1, thread: Thread-4
on_completed, thread: Thread-4
Colemancolemanite answered 15/5, 2017 at 21:33 Comment(0)
D
3

For ThreadPoolScheduler, you can:

  1. scheduler = ThreadPoolScheduler(pool_size)
  2. Make your parallel calls on that scheduler.
  3. scheduler.executor.shutdown()

Then you can get all the results once they are all done.

Drongo answered 17/7, 2017 at 8:32 Comment(1)
Perfect! Thank you, Kaka — scheduler.executor.shutdown() did it. – Colemancolemanite
C
7

Posting the complete solution here:

from __future__ import print_function
import os, sys
import time
import random
from rx import Observable
from rx.core import Scheduler
from threading import current_thread
from rx.concurrency import ThreadPoolScheduler

def printthread(val):
    """Echo *val* to stdout, tagged with the current thread's name."""
    message = "{}, thread: {}".format(val, current_thread().name)
    print(message)

def intense_calculation(value):
    """Pretend to do expensive work on *value* and hand it back unchanged.

    Logs the calling thread, then sleeps for a random 0.5-2.0 seconds.
    """
    label = "calc {}".format(value)
    printthread(label)
    time.sleep(.1 * random.randint(5, 20))
    return value

if __name__ == "__main__":
    from threading import Event

    scheduler = ThreadPoolScheduler(4)

    # executor.shutdown() only waits for the pool's worker tasks (the
    # intense_calculation calls). The observer callbacks are re-scheduled by
    # observe_on() onto Scheduler.event_loop's own thread, so the main thread
    # must also wait for the stream to terminate. This event is set by
    # whichever terminal callback (on_completed / on_error) fires.
    finished = Event()

    def on_completed():
        printthread("on_completed")
        finished.set()

    def on_error(err):
        printthread("on_error: {}".format(err))
        finished.set()

    Observable.range(1, 3) \
        .select_many(lambda i: Observable.start(lambda: intense_calculation(i), scheduler=scheduler)) \
        .observe_on(Scheduler.event_loop) \
        .subscribe(
            on_next=lambda x: printthread("on_next: {}".format(x)),
            on_completed=on_completed,
            on_error=on_error)

    # Block until every notification has been delivered, then release the
    # (now idle) pool threads, and only then report completion.
    finished.wait()
    scheduler.executor.shutdown()
    printthread("\nAll done")
Colemancolemanite answered 19/7, 2017 at 10:59 Comment(1)
Some operators don't work in RxPY version 3, and select_many doesn't seem to exist anymore. – Guillotine
C
4

Use run() to wait for RxPY parallel threads to complete.

BlockingObservables have been removed from RxPY v3.

from threading import current_thread
import rx, random, multiprocessing, time
from rx import operators as ops

def intense_calculation(value, delay_step=0.2):
    """Simulate a slow job on a ``(key, text)`` pair.

    Sleeps for ``randint(5, 20) * delay_step`` seconds (1.0-4.0 s with the
    default step), prints which thread did the work, and returns
    ``(value[0], value[1] + " processed")``.

    Args:
        value: A sequence whose first two items are read, e.g. one
            ``(key, url)`` item from ``dict.items()``; ``value[1]`` must be a
            ``str`` so it can be concatenated.
        delay_step: Seconds per random step; pass ``0`` to skip the wait.
    """
    delay = random.randint(5, 20) * delay_step
    time.sleep(delay)
    # The label names this function so the log line can be traced back to it.
    print("From intense_calculation: {0} Value : {1} {2}".format(current_thread(), value, delay))
    return (value[0], value[1] + " processed")

# A bounded thread pool sized to the CPU count, as the variable name and the
# multiprocessing import intend. NewThreadScheduler would instead create a
# new thread for each scheduled unit of work.
thread_pool_scheduler = rx.scheduler.ThreadPoolScheduler(multiprocessing.cpu_count())

my_dict = {'A': 'url1', 'B': 'url2', 'C': 'url3'}

# Fan each (key, url) pair out to its own inner observable subscribed on the
# pool so the calculations can run concurrently, then gather the results
# back into a dict. run() blocks until the pipeline completes and returns
# its last emitted value -- here, the dict built by to_dict().
new_dict = rx.from_iterable(my_dict.items()).pipe(
    ops.flat_map(lambda item: rx.of(item).pipe(
        ops.map(intense_calculation),
        ops.subscribe_on(thread_pool_scheduler),
    )),
    ops.to_dict(lambda pair: pair[0], lambda pair: pair[1]),
).run()

print("From main: {0}".format(current_thread()))
print(str(new_dict))
Cutlery answered 18/8, 2021 at 23:40 Comment(1)
This should be the accepted answer. scheduler.executor.shutdown() is a blocking method that returns only once all threads are free, but what we actually need is to wait for the observable to complete. – Papillote
D
3

For ThreadPoolScheduler, you can:

  1. scheduler = ThreadPoolScheduler(pool_size)
  2. Make your parallel calls on that scheduler.
  3. scheduler.executor.shutdown()

Then you can get all the results once they are all done.

Drongo answered 17/7, 2017 at 8:32 Comment(1)
Perfect! Thank you, Kaka — scheduler.executor.shutdown() did it. – Colemancolemanite

© 2022 - 2024 — McMap. All rights reserved.