Here is my use-case:
- I am streaming audio from a microphone using PyAudio. I receive chunks of audio data every x milliseconds.
- I open a websocket connection when some condition is met.
- Once the websocket is open, I stream messages in and out asynchronously. It follows a consumer/producer pattern (I must be able to receive and send messages independently). The messages sent correspond to the new chunks of data that I receive from the microphone.
The issue I have:
- The streaming from the microphone is synchronous and blocks the thread. I do not have a deep understanding of the event loop, but my basic understanding is that this synchronous wait blocks everything else. As a consequence, I cannot even open the websocket connection unless I add `asyncio.sleep` calls everywhere, which would not be robust at all.
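To see why a synchronous read stalls everything, here is a minimal sketch in which `blocking_read` (hypothetical) imitates a blocking microphone read with `time.sleep`. Called directly from a coroutine, it freezes the other task; wrapped in `asyncio.to_thread` (Python 3.9+), it runs in a worker thread and the event loop keeps serving other tasks:

```python
import asyncio
import time


def blocking_read(duration: float) -> bytes:
    # Hypothetical stand-in for a synchronous microphone read (not PyAudio's API).
    time.sleep(duration)
    return b"\x00" * 4


async def ticker(ticks: list, count: int) -> None:
    # Another task that needs the event loop regularly (e.g. a websocket handler).
    for _ in range(count):
        ticks.append(time.monotonic())
        await asyncio.sleep(0.01)


async def run(offload: bool) -> int:
    ticks: list = []
    task = asyncio.create_task(ticker(ticks, 1000))
    await asyncio.sleep(0)  # let the ticker take its first step
    start = len(ticks)
    for _ in range(3):
        if offload:
            # The blocking call runs in a worker thread; the loop stays free.
            await asyncio.to_thread(blocking_read, 0.1)
        else:
            # The blocking call runs on the loop thread and freezes every task.
            blocking_read(0.1)
    ran = len(ticks) - start  # ticker iterations that happened meanwhile
    task.cancel()
    return ran


def main() -> tuple:
    blocked = asyncio.run(run(offload=False))
    offloaded = asyncio.run(run(offload=True))
    return blocked, offloaded


if __name__ == "__main__":
    blocked, offloaded = main()
    print(f"ticker iterations while blocking: {blocked}")
    print(f"ticker iterations while offloaded: {offloaded}")
```

On Python versions before 3.9, `loop.run_in_executor(None, blocking_read, 0.1)` plays the same role as `asyncio.to_thread`.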
The way I tried to solve it:
- First idea: run the websockets connector in a second thread and send the chunks from the main thread. The problem is that asyncio queues are not thread-safe, so to make this work I need a synchronous queue... and then the problem is still there. I also tried this package, but it didn't help.
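A synchronous, thread-safe `queue.Queue` can still sit between a thread and asyncio code, as long as the coroutine never calls the blocking `get()`. A minimal sketch, assuming a background thread (`fake_microphone`, hypothetical) produces the chunks and `None` is used as an end-of-stream sentinel:

```python
import asyncio
import queue
import threading
import time


def fake_microphone(q: queue.Queue, n_chunks: int) -> None:
    # Hypothetical stand-in for a synchronous microphone loop in its own thread.
    for i in range(n_chunks):
        time.sleep(0.01)
        q.put(bytes([i]) * 4)
    q.put(None)  # sentinel: no more audio


async def drain(q: queue.Queue) -> list:
    received = []
    while True:
        try:
            chunk = q.get_nowait()  # never blocks the event loop
        except queue.Empty:
            await asyncio.sleep(0.005)  # nothing yet: give other tasks a turn
            continue
        if chunk is None:
            return received
        received.append(chunk)


async def main() -> list:
    q: queue.Queue = queue.Queue()
    thread = threading.Thread(target=fake_microphone, args=(q, 5), daemon=True)
    thread.start()
    chunks = await drain(q)
    thread.join()
    return chunks


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Polling with a short `asyncio.sleep` adds a few milliseconds of latency; `loop.call_soon_threadsafe` avoids polling by scheduling each put onto an `asyncio.Queue` from the other thread.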
- Second idea: run everything asynchronously. But the synchronous incoming data blocks everything.
- Third idea: implement this solution as follows (code below). The idea is to use PyAudio callbacks with an asynchronous stream.
```python
import asyncio
import msgpack
import os
import websockets
import pyaudio
from src.utils.constants import CHANNELS, CHUNK, FORMAT, RATE
from dotenv import load_dotenv
from .utils import websocket_data_packet  # just a utils function

load_dotenv()

QUEUE_MAX_SIZE = 10
MY_URL = os.environ.get("WEBSOCKETS_URL")


def make_iter():
    loop = asyncio.get_event_loop()
    queue = asyncio.Queue()

    def put(in_data, frame_count, time_info, status):
        # PyAudio calls this from its own thread: hand the chunk to the loop thread-safely
        loop.call_soon_threadsafe(queue.put_nowait, in_data)
        return None, pyaudio.paContinue

    async def get():
        # Async generator yielding each chunk as it arrives
        while True:
            yield await queue.get()

    return get(), put


class MicrophoneStreamer(object):
    chunk: int = CHUNK
    channels: int = CHANNELS
    format: int = FORMAT
    rate: int = RATE

    def __init__(self):
        self._pyaudio = pyaudio.PyAudio()
        self.is_stream_open: bool = True
        self.stream_get, stream_put = make_iter()
        self.stream = self._pyaudio.open(
            format=self.format,
            channels=self.channels,
            rate=self.rate,
            input=True,
            frames_per_buffer=self.chunk,
            stream_callback=stream_put,
        )
        self.stream.start_stream()

    def close(self):
        self.is_stream_open = False
        self.stream.close()
        self._pyaudio.terminate()


async def consumer(websocket):
    async for message in websocket:
        print(f"Received message: {msgpack.unpackb(message)}")
        await asyncio.sleep(0.2)


async def producer(websocket, audio_queue):
    while True:
        chunk = await audio_queue.get()
        print(f"Sending message with audio data of size: {len(chunk)}")
        await websocket.send(msgpack.packb(websocket_data_packet(chunk)))
        await asyncio.sleep(0.2)


async def handler(audio_queue):
    print("This is before creating a new websocket")
    async with websockets.connect(MY_URL) as websocket:
        print("Just created a new websocket")
        producer_task = asyncio.create_task(producer(websocket, audio_queue))
        consumer_task = asyncio.create_task(consumer(websocket))
        done, pending = await asyncio.wait(
            [consumer_task, producer_task],
            return_when=asyncio.FIRST_COMPLETED,
            timeout=60,
        )
        for task in pending:
            task.cancel()
        await websocket.close()


async def main():
    audio_queue = asyncio.Queue(maxsize=5)
    i = 0
    async for in_data in MicrophoneStreamer().stream_get:
        print(f"Processing item {len(in_data)}")
        # on trigger, create websockets connection
        if i == 2:
            asyncio.create_task(handler(audio_queue))
            await asyncio.sleep(0)  # starts the task
        # on each iteration, add element to the queue (dropping the oldest if full)
        if audio_queue.full():
            _ = await audio_queue.get()
        await audio_queue.put(in_data)
        i += 1


asyncio.run(main())
```
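The callback-to-async-iterator bridge in `make_iter` can be exercised on its own, without a microphone. A minimal sketch, assuming a plain thread (`fake_callback_thread`, hypothetical) stands in for PyAudio's callback thread and `PA_CONTINUE` (hypothetical) stands in for `pyaudio.paContinue`; it also swaps `asyncio.get_event_loop()` for `asyncio.get_running_loop()`, the call the asyncio docs recommend inside coroutines:

```python
import asyncio
import threading
import time

PA_CONTINUE = 0  # hypothetical stand-in for pyaudio.paContinue


def make_iter():
    # Must be called from inside a running event loop.
    loop = asyncio.get_running_loop()
    q: asyncio.Queue = asyncio.Queue()

    def put(in_data, frame_count, time_info, status):
        # Runs on a foreign thread: schedule the put on the loop's own thread.
        loop.call_soon_threadsafe(q.put_nowait, in_data)
        return None, PA_CONTINUE

    async def get():
        while True:
            yield await q.get()

    return get(), put


def fake_callback_thread(callback, n_chunks: int) -> None:
    # Imitates an audio backend calling the stream callback every few milliseconds.
    for i in range(n_chunks):
        time.sleep(0.01)
        callback(bytes([i]) * 4, 4, {}, 0)


async def main(n_chunks: int = 5) -> list:
    stream_get, stream_put = make_iter()
    threading.Thread(
        target=fake_callback_thread, args=(stream_put, n_chunks), daemon=True
    ).start()
    received = []
    async for in_data in stream_get:
        received.append(in_data)
        if len(received) == n_chunks:
            break
    return received


if __name__ == "__main__":
    print(asyncio.run(main()))
```

This only checks the thread-to-loop hand-off; it does not exercise PyAudio or the websocket tasks.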
When I run this code, here is what happens:
- Before `i == 2`, elements get added to the queue and everything works fine.
- Then, at `i == 2`, the websocket connection opens, and messages go out and come in without issue.
- But then the loop blocks: no new bytes come in from the microphone streamer, and nothing happens anymore.
Does anybody know how to solve this issue?
Thank you very much
… `queue.Queue()` to pass data back and forth. On the asynchronous end, all you have to do is avoid a blocking read of the queue; there are `get_nowait` and `timeout=0` approaches for that. – Bidet

`await asyncio.sleep(0)` in async code is something that is oddly needed when writing Python asyncio code (0.02 will also work, but needing ".02" or some other arbitrary value is less common). A zero-length sleep is handled in a separate code path and just yields control to the asyncio loop. If a loop in an async function contains no awaits, control never returns to the asyncio loop, and other tasks stall. – Bidet

`await asyncio.sleep(0)` does not work for me; the .02 seems necessary in this case. It is true that it is quite odd and not very good, but I guess I'll live with it. Thank you for your time and answer. – Original
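The point about `await asyncio.sleep(0)` can be checked with a small sketch (the `background` and `busy_loop` tasks are hypothetical): a loop that never suspends keeps the event loop busy, so the background task never runs, while a zero-length sleep on each iteration lets the two tasks alternate.

```python
import asyncio


async def background(log: list) -> None:
    # Makes progress only when the event loop gets control back.
    while True:
        log.append("background")
        await asyncio.sleep(0)


async def busy_loop(log: list, iterations: int, yield_each_time: bool) -> None:
    for _ in range(iterations):
        log.append("busy")
        if yield_each_time:
            await asyncio.sleep(0)  # hand control back to the event loop


async def run(yield_each_time: bool) -> list:
    log: list = []
    task = asyncio.create_task(background(log))
    await busy_loop(log, 3, yield_each_time)
    task.cancel()
    return log


if __name__ == "__main__":
    print(asyncio.run(run(yield_each_time=False)))  # only "busy" entries
    print(asyncio.run(run(yield_each_time=True)))   # "busy" and "background" alternate
```

This shows only the yielding mechanism; it does not explain why a non-zero delay was needed in the setup above.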