Recommented way to implement observable collections in Python?
Asked Answered
S

2

6

I would like to have some observable collections/sequences in Python that allow me to listen on change events, like for adding new items or updating items:

list = ObservableList(['a','b','c'])
list.addChangeListener(lambda new_value: print(new_value))
list.append('a') # => should trigger the attached change listener

data_frame = ObservableDataFrame({'x': [1,2,3], 'y':[10,20,30]})
data_frame.addChangeListener(update_dependent_table_cells) # => allows to only update dependent cells instead of a whole table

A. I found following project that provides implementations of observable collections and looks quite promising:

https://github.com/dimsf/Python-observable-collections

It does what I want:

from observablelist import ObservableList

def listHandler(event):
    if event.action == 'itemsUpdated':
        print event.action + ', old items: ' + str(event.oldItems) + ' new items: ' + str(event.newItems) + ' at index: ' + str(event.index)
    elif event.action == 'itemsAdded' or event.action == 'itemsRemoved':
        print(event.action + ', items: ' + str(event.items) + ' at index: ' + str(event.index))

myList = ObservableList()
myList.attach(listHandler)

#Do some mutation actions, just like normal lists.
myList.append(10)
myList.insert(3, 0)

However, the last change is 6 years ago and I am wondering if there are some more up to date or build in Python alternatives?

B. I also found RxPy: https://github.com/ReactiveX/RxPY

import rx
list = ["Alpha", "Beta", "Gamma"]
source = rx.from_(list)
source.subscribe(
   lambda value: print(value),
   on_error = lambda e: print("Error : {0}".format(e)),
   on_completed = lambda: print("Job Done!")
) 

Is it somehow possible, to keep the subscription open, so that I am able to append new values to the list after the subscription? Dummy code:

source.subscribe(..., keep_open = True)
source.append("Delta")  # <= does not work; there is no append method
source.close()

With other words: can/should I use RxPy sources as observable collections?

C. There seem to exist many different possibilities in Python to handle events and implement observer patterns:

Event system in Python

Python Observer Pattern: Examples, Tips?

alternate ways to implement observer pattern in python

Using decorators to implement Observer Pattern in Python3

=> What is the recommented/pythonic way to implement observable collections in Python? Should I use (outdated?) A. or an adapted form of B. (which seems to serve a different purpose?) or even another strategy from C.?

=> Are there plans to somehow standardize that possibilities and include default observable collections directly in Python?

Related question, specific to DataFrames:

How to make tables/spreadsheets (e.g. pandas DataFrame) observable, use triggers or change events?

Sisak answered 28/1, 2021 at 15:19 Comment(0)
S
3

I've never used RxPy but it seems to be an implementation of the rx pattern very close to the js/ts one.

First you want to have an observable that you both use to push data into it and observer. That's a subject, potentially a behaviour subject or replay subject. Create the subject, then push new values in it using the on_next() operator.

For you second question, it seems that you want to "combine" multiple observables into one observable. There's multiple way to do this, but most likely, what you're looking for is CombineLatest or Concat. Loot at the operators.

If I take your second example the code will look like that:

from rx.subject.subject import Subject

list = ["Alpha", "Beta", "Gamma"]
# assuming that you want each item to be emitted one after the other
subject = Subject()
subject.subscribe(
    lambda value: print(value),
    on_error = lambda e: print("Error : {0}".format(e)),
    on_completed = lambda: print("Job Done!")
)
subject.on_next('Alpha')
subject.on_next('Beta')
subject.on_next('Gamma')
subject.on_next('Delta')

If you use a BehaviourSubject, you will be able to provide an initial value, and when a new observer subscribe, it will receive the last emitted value. If you use a ReplaySubject you can provide values, then subscribe, the observer will receive all the values that the subject emitted up to this point.

Samoyed answered 4/2, 2021 at 16:33 Comment(0)
S
0

Just found an implementation that is based on RxPy. The last change is from 2018 and it does not seem to be ready for RxPY 3.x, yet.

https://github.com/shyam-s00/ObservableCollections

https://github.com/shyam-s00/ObservableCollections/issues/1

from reactive.ObservableList import ObservableList

ol = ObservableList([1, 2, 3, 4])
ol.when_collection_changes() \
    .map(lambda x: x.Items) \
    .subscribe(print, print)

ol.append(5)

It provides

  • ObservableList
  • ObservableDict
  • ObservableSet

Also see https://github.com/ReactiveX/RxPY/issues/553

Sisak answered 29/1, 2021 at 9:36 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.