Python web service subscribed to a reactive source produces strange behavior in a shared object
I have implemented a web service using Falcon. This service stores a state machine (pytransitions) that is passed to the service's resources in their constructors. The service runs with gunicorn.

On start, the web service launches a process using RxPY. Each event received in on_next(event) is used to trigger a transition in the state machine.

THE BUG

I expect the state machine to have a consistent state both in the service and in the resources, but in the resources the state never seems to change.

We have a test that tries to reproduce this behavior, but surprisingly the test passes:

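# Imports added for completeness; they assume the Falcon 1.x, pytransitions and RxPY 1.x APIs.
from time import sleep, time

import falcon
from falcon import testing
from rx import Observable
from rx.concurrency import ThreadPoolScheduler
from transitions.extensions import MachineFactory


class TochoLevel(object):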

    def __init__(self, tochine):
        self.tochine = tochine

    def on_get(self, req, res):
        res.status = falcon.HTTP_200
        res.body = self.tochine.state


def get_machine():
    states = ["low", "medium", "high"]

    transitions = [
        {'trigger': 'to_medium', 'source': ['low', 'medium', 'high'], 'dest': 'medium'},
        {'trigger': 'to_high', 'source': ['low', 'medium', 'high'], 'dest': 'high'},
        {'trigger': 'to_low', 'source': ['low', 'medium', 'high'], 'dest': 'low'}
    ]

    locked_factory = MachineFactory.get_predefined(locked=True)

    return locked_factory(
        states=states,
        transitions=transitions,
        initial='low',
        auto_transitions=False,
        queued=False
    )

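def _level_observable(observer):
    # Emits one trigger every 0.1 s for 20 steps: 'to_low' for steps 1-7,
    # 'to_medium' for steps 8-15 and 'to_high' for steps 16-20.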
    for i in range(1, 21):
        sleep(0.1)
        next_val = 'to_low'

        if 8 <= i <= 15:
            next_val = 'to_medium'
        elif i > 15:
            next_val = 'to_high'
        observer.on_next(next_val)

    observer.on_completed()

def get_level_observable():
    return Observable.create(_level_observable)

class NotBlockingService(falcon.API):
    def __init__(self):
        super(NotBlockingService, self).__init__()

        self.tochine = get_machine()
        self.add_route('/tochez', TochoLevel(self.tochine))

    def _run_machine(self, val):
        self.tochine.trigger(val)
        print('machine exec: {}, state: {}'.format(val, self.tochine.state))
        return self.tochine.state

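    def start(self):
        # Subscribe on a thread pool so the observable runs in the background
        # and the service can keep answering requests in the meantime.
        source = get_level_observable()
        (source.subscribe_on(ThreadPoolScheduler(2))
            .subscribe(self._run_machine))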


def test_can_query_falcon_service_while_being_susbcribed_as_observer():

    svc = NotBlockingService()
    client = testing.TestClient(svc)

    assert client.simulate_get('/tochez').text == 'low'

    start = time()
    svc.start()
    sleep(1.2)

    assert client.simulate_get('/tochez').text == 'medium'
    end = time()

    sleep(1.2)

    assert client.simulate_get('/tochez').text == 'high'
    assert (end - start) < 2

THE QUESTION

Why does the state machine not change its state in the TochoLevel resource when I launch the service with gunicorn and propagate the transitions from RxPY's on_next method?

Unscrupulous asked on 14/5/2018 at 6:59
Minelayer commented: Can you provide a minimal git repo? I may have an idea what's wrong, but I need to try a few things.

Most likely, when you run your service in development mode, you are using only one fork (a single execution process). When you use software like Gunicorn, you are using a preforking strategy to get a reliable service in a production environment.
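To illustrate the single-process case, here is a minimal sketch using plain Python threads instead of RxPY and Falcon (`Tochine`, `feed_events` and `run_single_process_demo` are hypothetical stand-ins). When the background subscriber and the resource live in the same process, they mutate and read the very same object, which is consistent with the test passing.

```python
import threading
import time


class Tochine:
    """Hypothetical stand-in for the pytransitions machine; it only tracks `state`."""

    def __init__(self):
        self.state = "low"


def feed_events(machine, destinations, delay=0.01):
    """Plays the role of the RxPY subscriber: applies each event on a background thread."""
    for dest in destinations:
        time.sleep(delay)
        machine.state = dest


def run_single_process_demo():
    machine = Tochine()  # one object, shared by the "service" and the "resource"
    worker = threading.Thread(target=feed_events, args=(machine, ["medium", "high"]))
    worker.start()
    worker.join()
    # The "resource" reads the same object, so it sees the last transition.
    return machine.state


print(run_single_process_demo())  # prints "high"
```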

A preforking strategy spawns several subprocesses to handle requests, and their logic is independent: each fork works standalone, so requests handled by different forks do not share in-memory state.
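A minimal sketch of that isolation, assuming a POSIX system (Gunicorn itself targets Unix) and using `multiprocessing` with the `fork` start method as a stand-in for Gunicorn's workers; `Tochine`, `worker_transition` and `run_fork_demo` are hypothetical names. The child process changes its own copy of the object, and the parent never sees the change.

```python
import multiprocessing as mp


class Tochine:
    """Hypothetical stand-in for the state machine; it only tracks `state`."""

    def __init__(self):
        self.state = "low"


machine = Tochine()  # created once, before forking (like an app loaded by a parent process)


def worker_transition(queue):
    # Runs in the child process: it mutates the child's *copy* of `machine`.
    machine.state = "high"
    queue.put(machine.state)


def run_fork_demo():
    ctx = mp.get_context("fork")  # POSIX only
    queue = ctx.Queue()
    proc = ctx.Process(target=worker_transition, args=(queue,))
    proc.start()
    child_state = queue.get()  # read before join() so the child can flush the queue
    proc.join()
    return child_state, machine.state


if __name__ == "__main__":
    child, parent = run_fork_demo()
    print(child, parent)  # prints "high low"
```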

Thanks to the standardized WSGI application interface in Python (PEP 333 for Python 2, PEP 3333 for Python 3), Gunicorn receives an application object. Gunicorn launches as many instances (preforks) as its configuration indicates; it calls these forks workers, and by default it uses 1 worker. Each worker works with its own state, and perhaps Gunicorn also creates a new App object instance for each request...
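For reference, a minimal sketch of the kind of application object PEP 3333 describes: a callable taking `environ` and `start_response` (a Falcon `API` instance is such a callable). All names here are hypothetical; the module-level `STATE` dict stands in for the state machine, and every worker process that loads the module holds its own copy of it. Gunicorn would be pointed at it with something like `gunicorn your_module:app`.

```python
from wsgiref.util import setup_testing_defaults

# Module-level state: each worker process that loads this module owns a private copy.
STATE = {"level": "low"}


def app(environ, start_response):
    """Minimal PEP 3333 application: the kind of callable Gunicorn receives."""
    body = STATE["level"].encode("utf-8")
    start_response("200 OK", [
        ("Content-Type", "text/plain; charset=utf-8"),
        ("Content-Length", str(len(body))),
    ])
    return [body]


def call_app():
    """Invoke the app the way a WSGI server would, without starting a server."""
    environ = {}
    setup_testing_defaults(environ)
    captured = {}

    def start_response(status, headers, exc_info=None):
        captured["status"] = status
        captured["headers"] = headers

    body = b"".join(app(environ, start_response))
    return captured["status"], body
```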

This is why your state machine does not persist its state.

💡 Tip: First, try launching Gunicorn with 1 worker and check whether the state machine's state persists. If it does, the second problem to solve will be synchronizing the state machine across all the workers.
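Following the tip, a minimal Gunicorn configuration file that pins the worker count might look like this. Gunicorn config files are plain Python, and `workers` and `bind` are real Gunicorn settings; the file name, the bind address and the `your_module:svc` target are placeholders for this sketch.

```python
# gunicorn.conf.py: hypothetical config file for this sketch. Load it with
#   gunicorn -c gunicorn.conf.py your_module:svc
workers = 1              # one worker process, so only one copy of the state machine exists
bind = "127.0.0.1:8000"  # placeholder address
```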

Promotion answered on 14/5/2018 at 8:11
Unscrupulous commented: I am using workers = 1 in the configuration. I also tried the option preload_app = True, which is supposed to load the app before forking. The thing is that if I launch the service without rx-py, the state machine works properly.
Promotion commented: Maybe it makes no sense to use Gunicorn, which is focused on preforking, with only one worker... Be careful with state machine consistency, because Gunicorn only ensures that your app runs in the number of workers you want and that they respond to requests. During execution, Gunicorn can kill a worker and relaunch it if necessary. Maybe you need something focused on pooling requests rather than a preforking strategy. As for rx-py, I'm not familiar with the library.
Unscrupulous commented: We finally used an in-memory cache to store the state of the state machine, which allows us to run multiple workers.
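The comment doesn't say which cache was used, so this is only a hedged sketch of the idea: a `multiprocessing.Manager` dictionary stands in for the shared in-memory cache (a real multi-worker deployment might use an external cache server instead; that is an assumption). `STATE_KEY`, `apply_transition`, `read_level` and `run_cache_demo` are hypothetical names. The point is that the resource reads the state from the shared store rather than from a worker-local machine object, so a transition applied in another process becomes visible.

```python
import multiprocessing as mp

STATE_KEY = "tochine_state"  # hypothetical cache key


def apply_transition(cache, new_state):
    # Runs in another process, like an RxPY subscriber living in a different worker.
    cache[STATE_KEY] = new_state


def read_level(cache):
    # What a resource such as TochoLevel.on_get would return: it reads the shared
    # cache instead of a worker-local machine object.
    return cache.get(STATE_KEY, "low")


def run_cache_demo():
    ctx = mp.get_context("fork")  # POSIX only
    with ctx.Manager() as manager:
        cache = manager.dict()  # stand-in for the shared in-memory cache
        before = read_level(cache)
        proc = ctx.Process(target=apply_transition, args=(cache, "medium"))
        proc.start()
        proc.join()
        after = read_level(cache)
    return before, after


if __name__ == "__main__":
    print(run_cache_demo())  # prints ('low', 'medium')
```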
