Calling a coroutine from asyncio.Protocol.data_received
Asked Answered
I

2

12

This is similar to Calling coroutines in asyncio.Protocol.data_received but I think it warrants a new question.

I have a simple server set up like this

loop.create_unix_server(lambda: protocol, path=serverSocket)

It works fine, if I do this

 def data_received(self, data):
    data = b'data reply'
    self.send(data)

my client gets the reply. But I can't get it to work with any sort of asyncio call. I tried all of the following and none of them worked.

@asyncio.coroutine
def go(self):
    yield from asyncio.sleep(1, result = b'data reply')

def data_received(self, data):
    print('Data Received', flush=True)

    task = asyncio.get_event_loop().create_task(self.go())
    data = yield from asyncio.wait_for(task,10)
    self.send(data)

that one hung and printed nothing (if I decorated data_received with @asyncio.coroutine I get that that is not yielded from) OK, I get that using yield in data_received isn't right.

If I try a new event loop, as below, that hangs in run_until_complete

    loop = asyncio.new_event_loop()
    task = loop.create_task(self.go())
    loop.run_until_complete(task)
    data = task.result()
    self.send(data)

If I use a Future, that also hangs in run_until_complete

@asyncio.coroutine
def go(self, future):
    yield from asyncio.sleep(1)
    future.set_result(b'data reply')

def data_received(self, data):
    print('Data Received', flush=True)

    loop = asyncio.new_event_loop()
    future = asyncio.Future(loop=loop)
    asyncio.async(self.go(future))
    loop.run_until_complete(future)
    data = future.result()
    self.send(data)

The following gets close, but it returns immediately and the result is of type asyncio.coroutines.CoroWrapper, implying that the wait_for line returned immediately with the unfinished task?

@asyncio.coroutine
def go(self):
    return(yield from asyncio.sleep(3, result = b'data reply'))

@asyncio.coroutine
def go2(self):
    task = asyncio.get_event_loop().create_task(self.go())
    res = yield from asyncio.wait_for(task, 10)
    return result

def data_received(self, data):
    print('Data Received', flush=True)

    data = self.go2()
    self.send(data)

I'm a bit stuck really, and would appreciate some pointers about what to look at.

Immiscible answered 18/3, 2015 at 15:39 Comment(0)
U
14

You need to add your coroutine to the event loop, and then use Future.add_done_callback to handle the result when the coroutine completes:

@asyncio.coroutine
def go(self):
    return(yield from asyncio.sleep(3, result = b'data reply'))

def data_received(self, data):
    print('Data Received', flush=True)

    task = asyncio.async(self.go()) # or asyncio.get_event_loop().create_task()
    task.add_done_callback(self.handle_go_result)

def handle_go_result(self, task):
    data = task.result()
    self.send(data)

Calling a coroutine directly in data_received just simply isn't allowed, since the caller isn't going to try to yield from it, and creating/running a new event loop inside of data_received will always end up blocking the main event loop until the inner event loop finishes its work.

You just want to schedule some work with your main event loop (asyncio.async/loop.create_task()), and schedule a callback to run when the work is done (add_done_callback).

Urbanus answered 18/3, 2015 at 16:0 Comment(5)
Hi, thanks for that reply - that does work in my code. I'm just integrating it in to my non-trivial code (that has a calling stack etc) I had thought about using callbacks, but I thought that yielding/tasks/futures were the more recent approach, and had the benefit of not worrying about your calling stack or how to get data into your callback. Do you know why the inner loop wouldn't finish its work? Or why the future approach wouldn't work with the main loop?Immiscible
Oh, I guess the inner loop is waiting on the blocked outer one to do some IO etc.Immiscible
@Immiscible asyncio definitely prefers a coroutine-style approach for higher-level usage, but low-level features like Protocols still use the callback style. If you want to use coroutines instead, the docs recommend using stream objects instead of protocols.Urbanus
What's the proper Python 3.8 equivalent of this?Flowerer
@Flowerer Nearly the same. Just upgrade the @asyncio.coroutine to an async def and use loop.create_task() instead of asyncio.async().Constabulary
C
0

@dano's answer for Python 3.7+:

async def go(self):
    message = await asyncio.sleep(3, result=b'data reply')
    return message

def data_received(self, data):
    print('Data Received', flush=True)
    loop = asyncio.get_running_loop()
    task = loop.create_task(self.go())
    task.add_done_callback(self.handle_go_result)

def handle_go_result(self, task):
    data = task.result()
    self.send(data)
Constabulary answered 28/8 at 0:44 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.