I need to create an Observable stream which emits the result of an async coroutine at regular intervals.
intervalRead is a function that returns an Observable. It takes as parameters the interval rate and an async coroutine function fun, which needs to be called at the defined interval.
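To make the expected behaviour concrete, here is a minimal plain-asyncio sketch (no RxPY) of what I am after. The names interval_read and collect, and the count parameter that bounds the stream so the demo terminates, are hypothetical and only for illustration.

```python
import asyncio


async def foo():
    # Stand-in for the real coroutine: sleeps briefly, then returns a value.
    await asyncio.sleep(0.01)
    return 42


async def interval_read(rate, fun, count):
    # Hypothetical helper: wait `rate` seconds, await `fun()`, yield its result.
    # `count` is an assumption added so the demo stops after a few ticks.
    for _ in range(count):
        await asyncio.sleep(rate)
        yield await fun()


async def collect():
    # Gather every emitted value into a list.
    return [value async for value in interval_read(0.01, foo, 3)]


results = asyncio.run(collect())
print(results)  # [42, 42, 42]
```

Note that this sketch waits for each coroutine to finish before starting the next wait, so the effective period is the rate plus the coroutine's duration. What I want is the same stream of values, but exposed as an Observable.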
My first approach was to create an observable with the interval factory method, then use map to call the coroutine, wrapping it in an Observable with from_future, and finally get the value returned by the coroutine.
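# Imports assumed for this snippet (RxPY 3 layout)
import asyncio
import rx
from rx import Observable
from rx.operators import map

async def foo():
    await asyncio.sleep(1)
    return 42

def intervalRead(rate, fun) -> Observable:
    loop = asyncio.get_event_loop()
    return rx.interval(rate).pipe(
        # Each tick is mapped to the Observable returned by from_future
        map(lambda i: rx.from_future(loop.create_task(fun()))),
    )

async def main():
    obs = intervalRead(5, foo)
    obs.subscribe(
        on_next=lambda item: print(item)
    )

loop = asyncio.get_event_loop()
loop.create_task(main())
loop.run_forever()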
Yet the output I get is not the result of the coroutine, but the Observable returned by from_future, emitted at the specified interval:
output: <rx.core.observable.observable.Observable object at 0x033B5650>
How could I get the actual value returned by that Observable? I would expect 42.
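As far as I can tell, the symptom is analogous to the following plain-asyncio situation. This is only an analogy under my own assumptions, not RxPY code: mapping each tick to a task gives me the task objects themselves, and a separate step is needed to wait on them and obtain the values. The function name demo is hypothetical.

```python
import asyncio


async def foo():
    await asyncio.sleep(0.01)
    return 42


async def demo():
    loop = asyncio.get_running_loop()
    # "Mapping" each tick to a task produces Task objects, not their results.
    tasks = [loop.create_task(foo()) for _ in range(3)]
    mapped_are_tasks = all(isinstance(t, asyncio.Task) for t in tasks)
    # Awaiting each task is the extra "flattening" step that yields the values.
    values = [await t for t in tasks]
    return mapped_are_tasks, values


mapped_are_tasks, values = asyncio.run(demo())
print(mapped_are_tasks, values)  # True [42, 42, 42]
```

In my RxPY code I seem to be missing the equivalent of that second step.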
My second approach was to create a custom observable:
def intervalRead(rate, fun) -> rx.Observable:
    interval = rx.interval(rate)

    def subs(observer: Observer, scheduler=None):
        loop = asyncio.get_event_loop()

        def on_timer(i):
            task = loop.create_task(fun())
            from_future(task).subscribe(
                on_next=lambda i: observer.on_next(i),
                on_error=lambda e: observer.on_error(e),
                on_completed=lambda: print('coro completed')
            )

        interval.subscribe(on_next=on_timer, on_error=lambda e: print(e))

    return rx.create(subs)
However, on subscription, from_future(task) never emits a value. Why does this happen?
Yet if I write intervalRead like this:
def intervalRead(rate, fun):
    loop = asyncio.get_event_loop()
    task = loop.create_task(fun())
    return from_future(task)
I get the expected result: 42. Obviously this doesn't solve my issue, but it confuses me: why does it work here and not in my second approach?
Finally, I experimented with a third approach, using the rx.concurrency CurrentThreadScheduler to schedule an action periodically with the schedule_periodic method. Yet I'm facing the same issue as with the second approach.
def funWithScheduler(rate, fun):
    loop = asyncio.get_event_loop()
    scheduler = CurrentThreadScheduler()
    subject = rx.subjects.Subject()

    def action(param):
        obs = rx.from_future(loop.create_task(fun())).subscribe(
            on_next=lambda item: subject.on_next(item),
            on_error=lambda e: print(f'error in action {e}'),
            on_completed=lambda: print('action completed')
        )
        obs.dispose()

    scheduler.schedule_periodic(rate, action)
    return subject
I would appreciate any insight into what I am missing, or any other suggestions for accomplishing what I need. This is my first project with asyncio and RxPY; I have only used RxJS in the context of an Angular project, so any help is welcome.