Calling coroutines in asyncio.Protocol.data_received

I am having a problem doing asynchronous stuff in the asyncio.Protocol.data_received callback of the new Python asyncio module.

Consider the following server:

class MathServer(asyncio.Protocol):

   @asyncio.coroutine
   def slow_sqrt(self, x):
      yield from asyncio.sleep(1)
      return math.sqrt(x)

   def fast_sqrt(self, x):
      return math.sqrt(x)

   def connection_made(self, transport):
      self.transport = transport

   #@asyncio.coroutine
   def data_received(self, data):
      print('data received: {}'.format(data.decode()))
      x = json.loads(data.decode())
      #res = self.fast_sqrt(x)
      res = yield from self.slow_sqrt(x)
      self.transport.write(json.dumps(res).encode('utf8'))
      self.transport.close()

used with the following client:

class MathClient(asyncio.Protocol):

   def connection_made(self, transport):
      transport.write(json.dumps(2.).encode('utf8'))

   def data_received(self, data):
      print('data received: {}'.format(data.decode()))

   def connection_lost(self, exc):
      asyncio.get_event_loop().stop()

With self.fast_sqrt being called, everything works as expected.

With self.slow_sqrt, it does not work.

It also does not work with self.fast_sqrt and the @asyncio.coroutine decorator on data_received.

I feel I am missing something fundamental here.

Complete code is here:

Tested with:

  • Python 3.4.0b1 (Windows)
  • Python 3.3.3 + asyncio-0.2.1 (FreeBSD)

The issue is the same on both: with slow_sqrt, the client/server will just hang doing nothing.
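
The hang is consistent with how Python treats any function whose body contains yield from: calling it only creates a generator object, and the body does not run until something iterates it. The sketch below shows this outside asyncio. The names sub_step and log are made up for the illustration, and the bare yield is only a stand-in for a pause such as asyncio.sleep.

```python
import inspect

log = []


def sub_step():
    # Hypothetical stand-in for a pausing call such as asyncio.sleep().
    yield
    return 1.4


def data_received(data):
    # The "yield from" below turns this into a generator function.
    log.append("started")
    res = yield from sub_step()
    log.append(res)


# This is roughly what a transport does: call the callback and ignore the result.
returned = data_received(b"2.0")
is_generator = inspect.isgenerator(returned)
log_after_call = list(log)  # still empty: the body has not run

# Only when something drives the generator does the body start executing.
next(returned)
log_after_next = list(log)
```

Since nothing in the transport drives the returned generator, the body of data_received never starts, so no reply is written and both sides wait.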

Leonor answered 23/12, 2013 at 15:37

It seems this needs to be decoupled via a Future, though I am still not sure whether this is the right way.

import asyncio
import inspect
import json
import math
from collections import deque

class MathServer(asyncio.Protocol):

   @asyncio.coroutine
   def slow_sqrt(self, x):
      yield from asyncio.sleep(2)
      return math.sqrt(x)

   def fast_sqrt(self, x):
      return math.sqrt(x)

   def consume(self):
      while True:
         self.waiter = asyncio.Future()
         yield from self.waiter
         while len(self.receive_queue):
            data = self.receive_queue.popleft()
            if self.transport:
               try:
                  res = self.process(data)
                  if isinstance(res, asyncio.Future) or \
                     inspect.isgenerator(res):
                     res = yield from res
               except Exception as e:
                  print(e)

   def connection_made(self, transport):
      self.transport = transport
      self.receive_queue = deque()
      asyncio.Task(self.consume())

   def data_received(self, data):
      self.receive_queue.append(data)
      if not self.waiter.done():
         self.waiter.set_result(None)
      print("data_received {} {}".format(len(data), len(self.receive_queue)))

   def process(self, data):
      x = json.loads(data.decode())
      #res = self.fast_sqrt(x)
      res = yield from self.slow_sqrt(x)
      self.transport.write(json.dumps(res).encode('utf8'))
      #self.transport.close()

   def connection_lost(self, exc):
      self.transport = None
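
For comparison, here is a minimal sketch of the same decoupling idea using asyncio.Queue and async/await instead of a hand-rolled Future and deque. This is a swapped-in variant, not the code from this answer: QueueingProtocol and FakeTransport are hypothetical names, the fake transport exists only so the sketch runs without a network, and it assumes Python 3.7+.

```python
import asyncio
import json
import math


class QueueingProtocol(asyncio.Protocol):
    def connection_made(self, transport):
        self.transport = transport
        self.queue = asyncio.Queue()
        # One long-lived consumer task per connection.
        self.consumer = asyncio.ensure_future(self.consume())

    def data_received(self, data):
        # Plain callback: hand the data over and return immediately.
        self.queue.put_nowait(data)

    async def consume(self):
        while True:
            data = await self.queue.get()
            x = json.loads(data.decode())
            await asyncio.sleep(0.01)  # stand-in for slow work
            self.transport.write(json.dumps(math.sqrt(x)).encode("utf8"))
            self.queue.task_done()

    def connection_lost(self, exc):
        self.consumer.cancel()


class FakeTransport:
    """Hypothetical transport that just records what was written."""

    def __init__(self):
        self.written = []

    def write(self, data):
        self.written.append(data)


async def main():
    proto = QueueingProtocol()
    transport = FakeTransport()
    proto.connection_made(transport)
    proto.data_received(b"4.0")
    proto.data_received(b"9.0")
    await proto.queue.join()  # wait until both items are processed
    proto.connection_lost(None)
    return transport.written


written = asyncio.run(main())
```

Because a single consumer drains the queue, replies are written in the order the messages arrived, which starting one task per message does not guarantee.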

Here is an answer by Guido van Rossum:

The solution is simple: write that logic as a separate method marked with @coroutine, and fire it off in data_received() using async() (== Task(), in this case). The reason why this isn't built into the protocol is that if it was, it would require alternate event loop implementations to deal with coroutines.

Note: asyncio.async() was deprecated in Python 3.4.4 and renamed to asyncio.ensure_future(), which is what the snippet below uses.

def data_received(self, data):
    asyncio.ensure_future(self.process_data(data))

@asyncio.coroutine
def process_data(self, data):
    # ...stuff using yield from...
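
As a rough end-to-end illustration of this pattern on current Python, here is a sketch of the math server and client from the question written with async/await (the @asyncio.coroutine decorator was removed in Python 3.11). Several details are assumptions made for the sketch rather than part of the original code: the loopback address, the OS-assigned port 0, the done future used to hand the reply back, the shortened sleep, and the 5-second timeout. It assumes Python 3.7+.

```python
import asyncio
import json
import math


class MathServer(asyncio.Protocol):
    def connection_made(self, transport):
        self.transport = transport

    def data_received(self, data):
        # Plain callback: schedule the coroutine and return immediately.
        # Keeping a reference avoids the task being garbage collected early.
        self.task = asyncio.ensure_future(self.process_data(data))

    async def process_data(self, data):
        x = json.loads(data.decode())
        await asyncio.sleep(0.1)  # stand-in for slow_sqrt's delay
        self.transport.write(json.dumps(math.sqrt(x)).encode("utf8"))
        self.transport.close()


class MathClient(asyncio.Protocol):
    def __init__(self, done):
        self.done = done  # future resolved with the raw reply bytes
        self.reply = b""

    def connection_made(self, transport):
        transport.write(json.dumps(2.0).encode("utf8"))

    def data_received(self, data):
        self.reply += data

    def connection_lost(self, exc):
        if not self.done.done():
            self.done.set_result(self.reply)


async def main():
    loop = asyncio.get_running_loop()
    # Port 0 asks the OS for any free port (an assumption of this sketch).
    server = await loop.create_server(MathServer, "127.0.0.1", 0)
    port = server.sockets[0].getsockname()[1]
    done = loop.create_future()
    await loop.create_connection(lambda: MathClient(done), "127.0.0.1", port)
    raw = await asyncio.wait_for(done, timeout=5)
    server.close()
    return json.loads(raw.decode())


result = asyncio.run(main())
```

Like the code in the question, this sketch treats each data_received call as one complete JSON message, which TCP does not guarantee for larger payloads.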

Full code is here: - Client - Server

Leonor answered 23/12, 2013 at 17:45

I had a similar problem where I wanted to run a coroutine when my MyProtocol.connection_made was called. My solution was quite similar, except that my Protocol had access to the loop. For those using a more recent version of Python, the following worked for me (I'm using Python 3.6.8):

class MyProtocol(asyncio.Protocol):
    def __init__(self, loop):
        self.loop = loop

    async def do_async_thing(self):
        await asyncio.sleep(1)

    def connection_made(self, transport):
        self.transport = transport
        self.loop.create_task(self.do_async_thing())

    # Other member functions left out for brevity.

It makes sense that this works: the loop needs to schedule a task with its own independent context, i.e. one that can run independently of any other call stack. This is why you give the loop a coroutine that it can run (here do_async_thing(), bound to a class instance), which it will run when it can. When it actually runs no longer has anything to do with the connection_made member function.
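
A small sketch of that independence, assuming Python 3.7+ and default (non-eager) task creation: the callback returns before the scheduled coroutine has executed a single line. The events list and the function name connection_made_like_callback are hypothetical, added only to record the ordering.

```python
import asyncio

events = []


async def do_async_thing():
    events.append("coroutine started")
    await asyncio.sleep(0)
    events.append("coroutine finished")


def connection_made_like_callback(loop):
    # Synchronous callback: it can schedule work but cannot await it.
    task = loop.create_task(do_async_thing())
    events.append("callback returning")
    return task


async def main():
    loop = asyncio.get_running_loop()
    task = connection_made_like_callback(loop)
    await task  # only here, to let the sketch finish cleanly


asyncio.run(main())
```

After the run, events holds "callback returning" first, followed by the two coroutine entries.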

It's worth noting that this can also be achieved by using asyncio.ensure_future(coro, loop=None) instead of self.loop.create_task(coro), but the former would presumably use the default loop. In fact, it does - I just checked the source code.
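
A quick way to check this claim, sketched for Python 3.7+ (Task.get_loop() does not exist on 3.6): inside a running loop, both calls should return a Task attached to that same loop. The coroutine work is a made-up placeholder.

```python
import asyncio


async def work():
    return "done"


async def main():
    loop = asyncio.get_running_loop()
    t1 = loop.create_task(work())
    t2 = asyncio.ensure_future(work())  # no loop argument given
    same_loop = t1.get_loop() is loop and t2.get_loop() is loop
    both_tasks = isinstance(t1, asyncio.Task) and isinstance(t2, asyncio.Task)
    results = await asyncio.gather(t1, t2)
    return same_loop, both_tasks, results


same_loop, both_tasks, results = asyncio.run(main())
```

Passing the loop explicitly, as the protocol above does, mainly matters when the code may be called from outside the running loop or when several loops are in play.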

Expressway answered 1/5, 2019 at 8:32
