I'm currently having problems closing asyncio coroutines during the shutdown CTRL-C of an application. The following code is a stripped down version of what I have right now:
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
import asyncio
import time
import functools
import signal
class DummyProtocol(asyncio.Protocol):
def __init__(self, *args, **kwargs):
self._shutdown = asyncio.Event()
self._response = asyncio.Queue(maxsize=1)
super().__init__(*args, **kwargs)
def connection_made(self, transport):
self.transport = transport
def close(self):
print("Closing protocol")
self._shutdown.set()
def data_received(self, data):
#data = b'OK MPD '
# Start listening for commands after a successful handshake
if data.startswith(b'OK MPD '):
print("Ready for sending commands")
self._proxy_task = asyncio.ensure_future(self._send_commands())
return
# saving response for later consumption in self._send_commands
self._response.put_nowait(data)
async def _send_commands(self):
while not self._shutdown.is_set():
print("Waiting for commands coming in ...")
command = None
# listen for commands coming in from the global command queue. Only blocking 1sec.
try:
command = await asyncio.wait_for(cmd_queue.get(), timeout=1)
except asyncio.TimeoutError:
continue
# sending the command over the pipe
self.transport.write(command)
# waiting for the response. Blocking until response is complete.
res = await self._response.get()
# put it into the global response queue
res_queue.put_nowait(res)
async def connect(loop):
c = lambda: DummyProtocol()
t = asyncio.Task(loop.create_connection(c, '192.168.1.143', '6600'))
try:
# Wait for 3 seconds, then raise TimeoutError
trans, proto = await asyncio.wait_for(t, timeout=3)
print("Connected to <192.168.1.143:6600>.")
return proto
except (asyncio.TimeoutError, OSError) as e:
print("Could not connect to <192.168.1.143:6600>. Trying again ...")
if isinstance(e, OSError):
log.exception(e)
def shutdown(proto, loop):
# https://mcmap.net/q/279650/-what-39-s-the-correct-way-to-clean-up-after-an-interrupted-event-loop
print("Shutdown of DummyProtocol initialized ...")
proto.close()
# give the coros time to finish
time.sleep(2)
# cancel all other tasks
# for task in asyncio.Task.all_tasks():
# task.cancel()
# stopping the event loop
if loop:
print("Stopping event loop ...")
loop.stop()
print("Shutdown complete ...")
if __name__ == "__main__":
loop = asyncio.get_event_loop()
cmd_queue = asyncio.Queue()
res_queue = asyncio.Queue()
dummy_proto = loop.run_until_complete(connect(loop))
for signame in ('SIGINT','SIGTERM'):
loop.add_signal_handler(getattr(signal, signame), functools.partial(shutdown, dummy_proto, loop))
try:
loop.run_forever()
except KeyboardInterrupt:
pass
finally:
loop.close()
what gives me the following output if CTRL-C is pressed:
Connected to <192.168.1.143:6600>.
Ready for sending commands
Waiting for commands coming in ...
Waiting for commands coming in ...
Waiting for commands coming in ...
Waiting for commands coming in ...
^CShutdown of DummyProtocol initialized ...
Closing protocol
Stopping event loop ...
Shutdown complete ...
Task was destroyed but it is pending!
task: <Task pending coro=<DummyProtocol._send_commands() running at ./dummy.py:45> wait_for=<Future pending cb=[Task._wakeup()]>>
Task was destroyed but it is pending!
task: <Task pending coro=<Queue.get() running at /usr/local/Cellar/python3/3.5.1/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/queues.py:168> wait_for=<Future pending cb=[Task._wakeup()]> cb=[_release_waiter(<Future pendi...sk._wakeup()]>)() at /usr/local/Cellar/python3/3.5.1/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/tasks.py:344]>
Exception ignored in: <generator object Queue.get at 0x10594b468>
Traceback (most recent call last):
File "/usr/local/Cellar/python3/3.5.1/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/queues.py", line 170, in get
File "/usr/local/Cellar/python3/3.5.1/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/futures.py", line 227, in cancel
File "/usr/local/Cellar/python3/3.5.1/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/futures.py", line 242, in _schedule_callbacks
File "/usr/local/Cellar/python3/3.5.1/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/base_events.py", line 447, in call_soon
File "/usr/local/Cellar/python3/3.5.1/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/base_events.py", line 456, in _call_soon
File "/usr/local/Cellar/python3/3.5.1/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/base_events.py", line 284, in _check_closed
RuntimeError: Event loop is closed
I'm not very experienced with asyncio, so I'm pretty sure that I'm missing something important here. What really gives me headaches is the part of the output after Shutdown complete ...
. Beginning with Task was destroyed but it is pending!
, I have to admit that I have no idea what's going on. I had a look on other questions but couldn't get it to work. So, why is this code outputting stuff like Task was destroyed but it is pending! aso.
and how can cleany close the coroutines?
Thank's for your help!