How can I notify RxPY observers on separate threads using asyncio?
(Note: The background for this problem is fairly verbose, but there's an SSCCE at the bottom that you can skip to.)

Background

I'm trying to develop a Python-based CLI to interact with a web service. In my codebase I have a CommunicationService class that handles all direct communication with the web service. It exposes a received_response property that returns an Observable (from RxPY) that other objects can subscribe to in order to be notified when responses are received back from the web service.

I've based my CLI logic on the click library, where one of my subcommands is implemented as below:

async def enabled(self, request: str, response_handler: Callable[[str], Tuple[bool, str]]) -> Optional[str]:
    self._generate_request(request)
    if response_handler is None:
        return None

    while True:
        response = await self.on_response
        success, value = response_handler(response)
        print(success, value)
        if success:
            return value

What's happening here (in the case that response_handler is not None) is that the subcommand is behaving as a coroutine that awaits responses from the web service (self.on_response == CommunicationService.received_response) and returns some processed value from the first response it can handle.

I'm trying to test the behaviour of my CLI by creating test cases in which CommunicationService is completely mocked; a fake Subject is created (which can act as an Observable) and CommunicationService.received_response is mocked to return it. As part of the test, the subject's on_next method is invoked to pass mock web service responses back to the production code:

@when('the communications service receives a response from TestCube Web Service')
def step_impl(context):
    context.mock_received_response_subject.on_next(context.text)

I use a click 'result callback' function that gets invoked at the end of the CLI invocation and blocks until the coroutine (the subcommand) is done:

@cli.resultcallback()
def _handle_command_task(task: Coroutine, **_) -> None:
    if task:
        loop = asyncio.get_event_loop()
        result = loop.run_until_complete(task)
        loop.close()
        print('RESULT:', result) 

Problem

At the start of the test, I run CliRunner.invoke to fire off the whole shebang. The problem is that this is a blocking call and will block the thread until the CLI has finished and returned a result, which isn't helpful if I need my test thread to carry on so it can produce mock web service responses concurrently with it.

What I guess I need to do is run CliRunner.invoke on a new thread using ThreadPoolExecutor. This allows the test logic to continue on the original thread and execute the @when step posted above. However, notifications published with mock_received_response_subject.on_next do not seem to trigger execution to continue within the subcommand.

I believe the solution would involve making use of RxPY's AsyncIOScheduler, but I'm finding the documentation on this a little sparse and unhelpful.

SSCCE

The snippet below captures what I hope is the essence of the problem. If it can be modified to work, I should be able to apply the same solution to my actual code to get it to behave as I want.

import asyncio
import logging
import sys
import time

import click
from click.testing import CliRunner
from rx.subjects import Subject

web_response_subject = Subject()
web_response_observable = web_response_subject.as_observable()

thread_loop = asyncio.new_event_loop()


@click.group()
def cli():
    asyncio.set_event_loop(thread_loop)


@cli.resultcallback()
def result_handler(task, **_):
    loop = asyncio.get_event_loop()
    result = loop.run_until_complete(task) # Should block until subject publishes value
    loop.close()

    print(result)


@cli.command()
async def get_web_response():
    return await web_response_observable


def test():
    runner = CliRunner()
    future = thread_loop.run_in_executor(None, runner.invoke, cli, ['get_web_response'])
    time.sleep(1)
    web_response_subject.on_next('foo') # Simulate reception of web response.
    time.sleep(1)
    result = future.result()
    print(result.output)

logging.basicConfig(
    level=logging.DEBUG,
    format='%(threadName)10s %(name)18s: %(message)s',
    stream=sys.stderr,
)

test()

Current Behaviour

The program hangs when run, blocking at result = loop.run_until_complete(task).

Acceptance Criteria

The program terminates and prints foo on stdout.

Update 1

Based on Vincent's help I've made some changes to my code.

Relay.enabled (the subcommand that awaits responses from the web service in order to process them) is now implemented like this:

async def enabled(self, request: str, response_handler: Callable[[str], Tuple[bool, str]]) -> Optional[str]:
    self._generate_request(request)

    if response_handler is None:
        return None

    return await self.on_response \
        .select(response_handler) \
        .where(lambda result, i: result[0]) \
        .select(lambda result, index: result[1]) \
        .first()

I wasn't quite sure how await would behave with RxPY observables: would it return execution to the caller on each element generated, or only once the observable has completed (or errored)? I now know it's the latter, which honestly feels like the more natural choice and has allowed me to make the implementation of this function feel a lot more elegant and reactive.

I've also modified the test step that generates mock web service responses:

@when('the communications service receives a response from TestCube Web Service')
def step_impl(context):
    loop = asyncio.get_event_loop()
    loop.call_soon_threadsafe(context.mock_received_response_subject.on_next, context.text)

Unfortunately, this will not work as it stands, since the CLI is being invoked in its own thread...

@when('the CLI is run with "{arguments}"')
def step_impl(context, arguments):
    loop = asyncio.get_event_loop()
    if 'async.cli' in context.tags:
        context.async_result = loop.run_in_executor(None, context.cli_runner.invoke, testcube.cli, arguments.split())
    else:
        ...

And the CLI creates its own thread-private event loop when invoked...

def cli(context, hostname, port):
    _initialize_logging(context.meta['click_log.core.logger']['level'])

    # Create a new event loop for processing commands asynchronously on.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    ...

What I think I need is a way to allow my test steps to invoke the CLI on a new thread and then fetch the event loop it's using:

@when('the communications service receives a response from TestCube Web Service')
def step_impl(context):
    loop = _get_cli_event_loop() # Needs to be implemented.
    loop.call_soon_threadsafe(context.mock_received_response_subject.on_next, context.text)
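For illustration only, here is a minimal standard-library sketch of what "fetching the loop a thread is using" could look like if the thread cooperates. It assumes the worker is willing to hand its loop back, which the real CLI is not; run_worker, loop_holder and result_holder are hypothetical names, and a plain asyncio future stands in for the RxPY subject.

```python
import asyncio
import threading
from concurrent.futures import Future


def run_worker(loop_holder: Future, result_holder: Future) -> None:
    """Hypothetical stand-in for the CLI thread: owns a private event loop."""
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    response = loop.create_future()  # resolved later from the test thread
    # Publish the loop (and the future to resolve) to whoever started us.
    loop_holder.set_result((loop, response))
    try:
        # Blocks this thread until the test thread resolves the future.
        result_holder.set_result(loop.run_until_complete(response))
    finally:
        loop.close()


loop_holder = Future()
result_holder = Future()
worker = threading.Thread(target=run_worker, args=(loop_holder, result_holder))
worker.start()

# Test thread: wait for the worker's loop, then schedule the callback on it.
loop, response = loop_holder.result(timeout=5)
loop.call_soon_threadsafe(response.set_result, 'foo')

received = result_holder.result(timeout=5)
worker.join(timeout=5)
print(received)
```

The important part is that the test thread never touches the future directly; it only asks the worker's loop to do so through call_soon_threadsafe.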

Update 2

There doesn't seem to be an easy way to get the event loop that a particular thread creates and uses for itself, so instead I took Vincent's advice and mocked asyncio.new_event_loop to return an event loop that my test code creates and stores:

def _apply_mock_event_loop_patch(context):
    # Close any already-existing exit stacks.
    if hasattr(context, 'mock_event_loop_exit_stack'):
        context.mock_event_loop_exit_stack.close()

    context.test_loop = asyncio.new_event_loop()
    print(context.test_loop)
    context.mock_event_loop_exit_stack = ExitStack()
    context.mock_event_loop_exit_stack.enter_context(
        patch.object(asyncio, 'new_event_loop', spec=True, return_value=context.test_loop))

I change my 'mock web response received' test step to do the following:

@when('the communications service receives a response from TestCube Web Service')
def step_impl(context):
    loop = context.test_loop
    loop.call_soon_threadsafe(context.mock_received_response_subject.on_next, context.text)

The great news is that I'm actually getting the Relay.enabled coroutine to trigger when this step gets executed!
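Stripped of the behave context, the patching idea can be sketched as follows. This is a simplified illustration, not the project's code: production_entry_point is a hypothetical stand-in for the CLI group callback, and the patch omits spec=True for brevity.

```python
import asyncio
from unittest.mock import patch


def production_entry_point():
    """Hypothetical stand-in for code that creates its own private loop."""
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    return loop


# Create the loop the test wants to control *before* applying the patch.
test_loop = asyncio.new_event_loop()

with patch.object(asyncio, 'new_event_loop', return_value=test_loop):
    # Inside the patch, the "new" loop is really the one the test created.
    loop_seen_by_cli = production_entry_point()

same_loop = loop_seen_by_cli is test_loop
print(same_loop)

# Tidy up so the closed loop is not left installed as the current loop.
asyncio.set_event_loop(None)
test_loop.close()
```

Because the production code looks up asyncio.new_event_loop at call time, it needs no knowledge of the test loop.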

The only problem now is the final test step, in which I await the future I got from executing the CLI in its own thread and validate that the CLI prints the expected output on stdout:

@then('the CLI should print "{output}"')
def step_impl(context, output):
    if 'async.cli' in context.tags:
        loop = asyncio.get_event_loop() # main loop, not test loop
        result = loop.run_until_complete(context.async_result)
    else:
        result = context.result
    assert_that(result.output, equal_to(output))

I've tried playing around with this but I can't seem to get context.async_result (which stores the future from loop.run_in_executor) to transition nicely to done and return the result. With the current implementation, I get an error for the first test (1.1) and indefinite hanging for the second (1.2):

 @mock.comms @async.cli @wip
  Scenario Outline: Querying relay enable state -- @1.1                           # testcube/tests/features/relay.feature:45
    When the user queries the enable state of relay 0                             # testcube/tests/features/steps/relay.py:17 0.003s
    Then the CLI should query the web service about the enable state of relay 0   # testcube/tests/features/steps/relay.py:48 0.000s
    When the communications service receives a response from TestCube Web Service # testcube/tests/features/steps/core.py:58 0.000s
      """
      {'module':'relays','path':'relays[0].enabled','data':[True]}'
      """
    Then the CLI should print "True"                                              # testcube/tests/features/steps/core.py:94 0.003s
      Traceback (most recent call last):
        File "/Users/davidfallah/testcube_env/lib/python3.5/site-packages/behave/model.py", line 1456, in run
          match.run(runner.context)
        File "/Users/davidfallah/testcube_env/lib/python3.5/site-packages/behave/model.py", line 1903, in run
          self.func(context, *args, **kwargs)
        File "testcube/tests/features/steps/core.py", line 99, in step_impl
          result = loop.run_until_complete(context.async_result)
        File "/usr/local/Cellar/python3/3.5.2_1/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/base_events.py", line 387, in run_until_complete
          return future.result()
        File "/usr/local/Cellar/python3/3.5.2_1/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/futures.py", line 274, in result
          raise self._exception
        File "/usr/local/Cellar/python3/3.5.2_1/Frameworks/Python.framework/Versions/3.5/lib/python3.5/concurrent/futures/thread.py", line 55, in run
          result = self.fn(*self.args, **self.kwargs)
        File "/Users/davidfallah/testcube_env/lib/python3.5/site-packages/click/testing.py", line 299, in invoke
          output = out.getvalue()
      ValueError: I/O operation on closed file.

      Captured stdout:
      RECEIVED WEB RESPONSE: {'module':'relays','path':'relays[0].enabled','data':[True]}'
      <Future pending cb=[_chain_future.<locals>._call_check_cancel() at /usr/local/Cellar/python3/3.5.2_1/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/futures.py:431]>

  @mock.comms @async.cli @wip
  Scenario Outline: Querying relay enable state -- @1.2                           # testcube/tests/features/relay.feature:46
    When the user queries the enable state of relay 1                             # testcube/tests/features/steps/relay.py:17 0.005s
    Then the CLI should query the web service about the enable state of relay 1   # testcube/tests/features/steps/relay.py:48 0.001s
    When the communications service receives a response from TestCube Web Service # testcube/tests/features/steps/core.py:58 0.000s
      """
      {'module':'relays','path':'relays[1].enabled','data':[False]}'
      """
RECEIVED WEB RESPONSE: {'module':'relays','path':'relays[1].enabled','data':[False]}'
    Then the CLI should print "False"                                             # testcube/tests/features/steps/core.py:94

Chapter 3: Finale

Screw all this asynchronous multi-threaded stuff, I'm too dumb for it.

First off, instead of describing the scenario like this...

When the user queries the enable state of relay <relay_id>
Then the CLI should query the web service about the enable state of relay <relay_id>
When the communications service receives a response from TestCube Web Service:
  """
  {"module":"relays","path":"relays[<relay_id>].enabled","data":[<relay_enabled>]}
  """
Then the CLI should print "<relay_enabled>"

We describe it like this:

Given the communications service will respond to requests:
  """
  {"module":"relays","path":"relays[<relay_id>].enabled","data":[<relay_enabled>]}
  """
When the user queries the enable state of relay <relay_id>
Then the CLI should query the web service about the enable state of relay <relay_id>
And the CLI should print "<relay_enabled>"

Implement the new given step:

@given('the communications service will respond to requests')
def step_impl(context):
    response = context.text

    def publish_mock_response(_):
        loop = context.test_loop
        loop.call_soon_threadsafe(context.mock_received_response_subject.on_next, response)

    # Configure the mock comms service to publish a mock response when a request is made.
    instance = context.mock_comms.return_value
    instance.send_request.on_next.side_effect = publish_mock_response

BOOM

2 features passed, 0 failed, 0 skipped
22 scenarios passed, 0 failed, 0 skipped
58 steps passed, 0 failed, 0 skipped, 0 undefined
Took 0m0.111s
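The given step above boils down to one pattern: let the mocked request method schedule the canned response itself, so no second thread is needed. A minimal single-threaded sketch, assuming a plain asyncio future in place of the RxPY subject; query and comms are hypothetical stand-ins for the subcommand and the mocked communications service.

```python
import asyncio
from unittest.mock import Mock


async def query(comms, response_future):
    """Hypothetical stand-in for the subcommand: send a request, then wait."""
    comms.send_request('relays[0].enabled')
    return await response_future


async def main():
    loop = asyncio.get_running_loop()
    response_future = loop.create_future()

    def publish_mock_response(_request):
        # Schedule the canned response on the loop rather than resolving it
        # inline; call_soon_threadsafe also works from the loop's own thread.
        loop.call_soon_threadsafe(response_future.set_result, 'True')

    comms = Mock()
    # Every call to send_request now triggers the canned response.
    comms.send_request.side_effect = publish_mock_response

    result = await query(comms, response_future)
    return result, comms.send_request.call_count


result, calls = asyncio.run(main())
print(result, calls)
```

Since the response is queued as a side effect of the request, the ordering between "request sent" and "response received" is deterministic.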
Greed answered 4/9, 2016 at 15:48 Comment(1)
This question made my day :) - Gypsy

I can see two problems with your code:

  • asyncio is not thread-safe unless you use call_soon_threadsafe or run_coroutine_threadsafe. RxPY doesn't use either of those in Observable.to_future, so you have to access RxPY objects in the same thread that runs the asyncio event loop.
  • RxPY sets the result of the future when on_completed is called, so awaiting an observable returns the last object emitted. This means you have to call both on_next and on_completed to get await to return.

Here is a working example:

import click
import asyncio
from rx.subjects import Subject
from click.testing import CliRunner

web_response_subject = Subject()
web_response_observable = web_response_subject.as_observable()
main_loop = asyncio.get_event_loop()

@click.group()
def cli():
    pass

@cli.resultcallback()
def result_handler(task, **_):
    future = asyncio.run_coroutine_threadsafe(task, main_loop)
    print(future.result())

@cli.command()
async def get_web_response():
    return await web_response_observable

def test():
    runner = CliRunner()
    future = main_loop.run_in_executor(
        None, runner.invoke, cli, ['get_web_response'])
    main_loop.call_later(1, web_response_subject.on_next, 'foo')
    main_loop.call_later(2, web_response_subject.on_completed)
    result = main_loop.run_until_complete(future)
    print(result.output, end='')

if __name__ == '__main__':
    test()
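Both points can also be seen without RxPY or click installed. The sketch below uses only the standard library; LastValueSubject is a hypothetical stand-in that merely imitates the "resolve on completion with the last value" behaviour described above, and is not RxPY's implementation.

```python
import asyncio
import threading


class LastValueSubject:
    """Hypothetical stand-in (not RxPY): awaiting it resolves on completion."""

    def __init__(self, loop):
        self._future = loop.create_future()
        self._last = None

    def on_next(self, value):
        self._last = value  # remembered, but the awaiter is not woken yet

    def on_completed(self):
        self._future.set_result(self._last)  # only now does await return

    def __await__(self):
        return self._future.__await__()


async def main():
    loop = asyncio.get_running_loop()
    subject = LastValueSubject(loop)

    def producer():
        # Runs on another thread, so each call is marshalled onto the loop.
        loop.call_soon_threadsafe(subject.on_next, 'foo')
        loop.call_soon_threadsafe(subject.on_completed)

    thread = threading.Thread(target=producer)
    thread.start()
    value = await subject
    thread.join()
    return value


value = asyncio.run(main())
print(value)
```

Dropping the on_completed call, or calling on_next directly from the producer thread instead of through call_soon_threadsafe, would leave the await waiting or make its wake-up unreliable, which matches the hang seen in the question.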
Boulevard answered 5/9, 2016 at 11:0 Comment(12)
Thanks, I really appreciate the help. This works great and I think I've managed to make a bit of headway (I've updated my post). I don't suppose there's a way to inspect the event loop set within a thread? Some way to query asyncio's global context maybe? My production code shouldn't know or care about my test code and so it won't be able to use an event loop that the test code creates (unless maybe I do something crazy with unittest.mock). - Greed
@Greed I'm not sure I understand all your constraints, but my advice would be to keep it simple, and use a mock whenever you need to inject anything into your production code (e.g. patch new_event_loop to return a test loop). Also, try not to mix event loops: use a ThreadPoolExecutor if you really need to run a task in a thread. - Boulevard
Keep it simple? I'm keeping it really simple. I was plumbing right into the heart of asyncio to create an AbstractEventLoopPolicy decorator subclass that intercepted calls to new_event_loop. :) Your idea of just mocking it is a lot simpler, though, and I've had better success with that. The only thing left, I think, is to have the Future created by loop.run_in_executor neatly finish and return its results in the final test step. I've updated my post. - Greed
@Greed I would use a concurrent.futures future instead: context.async_result = context.executor.submit(context.cli_runner.invoke, testcube.cli, arguments) then later result = context.async_result.result() - Boulevard
Hm, there is no executor stored within context, so I assumed I should just instantiate one in the prior step. Also the first argument to submit should just be the function. I tried this: context.executor = ThreadPoolExecutor(); context.async_result = context.executor.submit(context.cli_runner.invoke, testcube.cli, arguments.split()). I get exactly the same result as before (ValueError: I/O operation on closed file.) - Greed
@Greed Yes that's what I meant. It might not have solved your problem, but at least you have only one event loop to care about :) The error you get has to do with the BytesIO object mocking stderr and stdout. It's probably closed before invoke terminates. Are you sure click and click.testing can run in a child thread? - Boulevard
"Are you sure click and click.testing can run in a child thread?" I'll be very sad if we've come all this way and the answer to that is 'no'. I've been stepping through all the stuff in the click module but I can't find anything that might close the BytesIO object. I might have to raise it as an issue on their GitHub page. - Greed
@Greed Why do you have to run cli_runner.invoke in a thread by the way? - Boulevard
context.result = context.cli_runner.invoke(...) will block the test (main) thread until the invocation completes. The invocation won't complete unless the test thread is able to run and produce a mock web service response in a later test step. - Greed
But since you have access to the loop, you should be able to schedule the callback to produce the response beforehand (using loop.call_later for instance). - Boulevard
I did it! The way you phrased "you should be able to schedule the callback to produce the response beforehand" just gave me a whole new angle on the problem. I didn't do what you suggested, I did even better. The test solution is now considerably simpler. I've updated my post with the solution if you're interested. You've more than earned the upvote and accepted answer. Thank you for all your help. What an epic adventure. - Greed
@Greed Good thing you made it single-threaded. A motto I heard recently: adding a thread to a program introduces at least one bug ;) - Boulevard
