How can I make an rx.py Observable from a stream such as stdin?

I'm trying to get my head around the rxpy library for functional reactive programming (FRP) and I've already hit a roadblock. I'm writing a small program that expects data to be streamed in via standard input (sys.stdin).

My question is therefore simple: how can I create an rx.Observable instance that will asynchronously read from stdin? Are there built-in mechanisms to create Observable instances from streams?

Geller answered 28/7, 2014 at 11:54

I've never used RxPy, but I have a bit of familiarity with RxJS.

RxPy has a number of built-in methods that you could likely use for this purpose, but I'm inclined to create an Observable factory. Taking ObservableCreation.from_array as our guide, let's try that now. (Note: I haven't run this code, but it should get you most of the way there.)

from rx.observable import Observable, ObservableMeta
from rx.anonymousobservable import AnonymousObservable
from rx.concurrency import current_thread_scheduler

class ObservableFile(Observable, metaclass=ObservableMeta):

    @classmethod
    def from_file(cls, readableFile, scheduler=None):
        scheduler = scheduler or current_thread_scheduler

        def subscribe(observer):
            def action(action1, state=None):
                try:
                    # Python 3 file objects have no .next() method; use the next() builtin
                    observer.on_next(next(readableFile))
                    action1(action)

                except StopIteration: # EOF
                    observer.on_completed()

            return scheduler.schedule_recursive(action)
        return AnonymousObservable(subscribe)

Then just use it like this:

res = rx.Observable.from_file(sys.stdin)

This will create an observable over each line of stdin until EOF. It's blocking, but there are ways around that. It can also be tuned with a different scheduler.
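One way around the blocking read, sketched here with only the standard library rather than RxPy itself, is to iterate the stream on a background thread and push each line to the observer's callbacks. The helper name observe_lines and its callback parameters are hypothetical and only mimic the on_next / on_error / on_completed contract. io.StringIO stands in for sys.stdin so the example is self-contained.

```python
import io
import threading


def observe_lines(stream, on_next, on_completed=lambda: None, on_error=lambda exc: None):
    """Push each line of `stream` to `on_next` from a background thread.

    Hypothetical helper, not part of RxPy: it only mimics the
    on_next / on_error / on_completed contract with plain callables.
    """
    def worker():
        try:
            for line in stream:  # blocks while waiting, but only on this thread
                on_next(line)
        except Exception as exc:
            on_error(exc)
        else:
            on_completed()  # reached once the stream hits EOF

    thread = threading.Thread(target=worker, daemon=True)
    thread.start()
    return thread


# Usage: io.StringIO stands in for sys.stdin.
received = []
done = threading.Event()
fake_stdin = io.StringIO("first\nsecond\n")
reader = observe_lines(fake_stdin, received.append, on_completed=done.set)
done.wait(timeout=5)
reader.join(timeout=5)
```

The caller's thread stays free while the worker waits on input. Whether a given RxPy scheduler gives the same effect depends on the library version, so treat this as an illustration of the idea only.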

Taboret answered 31/7, 2014 at 6:37

I've just been playing with this today and

 d = rx.Observable.from_(sys.stdin).subscribe(print)

appears to work (it echoes lines to stdout). from_ is an alias for from_iterable, and d is a Disposable that can be used to unsubscribe.
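One detail about the one-liner above: iterating a text stream yields each line with its trailing newline still attached, so using print directly as the observer will likely produce double-spaced output. A minimal sketch of stripping the newline first is below. stripped_lines is a hypothetical helper name, not part of RxPy, and io.StringIO stands in for sys.stdin.

```python
import io


def stripped_lines(stream):
    """Yield each line of a text stream without its trailing newline."""
    for line in stream:
        yield line.rstrip("\n")


# io.StringIO stands in for sys.stdin here.
raw = list(io.StringIO("alpha\nbeta\n"))
clean = list(stripped_lines(io.StringIO("alpha\nbeta\n")))
```

Since a generator is itself an iterable, something like stripped_lines(sys.stdin) could presumably be passed to from_ in place of sys.stdin, though I have not checked that against every RxPy version.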

Pacian answered 22/3, 2016 at 21:11