Python synchronous pyaudio data in asynchronous code

Here is my use-case:

  • I am streaming audio from a microphone using PyAudio, receiving a chunk of audio data every x milliseconds.
  • I open a WebSocket connection when some condition is met.
  • Once the WebSocket is open, I stream messages in and out asynchronously, in a consumer/producer fashion (I must be able to receive and send messages independently). The messages I send correspond to the new chunks of data received from the microphone.

The issue I have:

  • The microphone streaming is synchronous and blocks the thread. I do not have a deep understanding of the event loop, but my basic understanding is that this synchronous waiting blocks everything else. As a result, I cannot even open the WebSocket connection unless I sprinkle asyncio.sleep calls everywhere, which would not be robust at all.
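One common way to keep a blocking read from stalling the event loop is to run it in a worker thread with `asyncio.to_thread` (Python 3.9+), so the loop can keep servicing other tasks (such as opening the WebSocket) while the read waits. This is a minimal sketch, assuming a hypothetical `blocking_read()` that stands in for PyAudio's blocking `stream.read(CHUNK)`; it sleeps instead of touching a real microphone, and `heartbeat()` is a hypothetical placeholder for any other async work.

```python
import asyncio
import time

CHUNK_SECONDS = 0.05  # hypothetical: how long one "microphone read" blocks


def blocking_read() -> bytes:
    """Hypothetical stand-in for a blocking call such as stream.read(CHUNK)."""
    time.sleep(CHUNK_SECONDS)
    return b"\x00" * 1024


async def read_chunks(n: int, out: list) -> None:
    for _ in range(n):
        # The blocking call runs in a worker thread; this coroutine is
        # suspended meanwhile, so other tasks on the loop keep running.
        chunk = await asyncio.to_thread(blocking_read)
        out.append(chunk)


async def heartbeat(ticks: list, stop: asyncio.Event) -> None:
    # Placeholder for other work, e.g. the WebSocket producer/consumer.
    while not stop.is_set():
        ticks.append(time.monotonic())
        await asyncio.sleep(0.01)


async def main() -> tuple:
    chunks: list = []
    ticks: list = []
    stop = asyncio.Event()
    beat = asyncio.create_task(heartbeat(ticks, stop))
    await read_chunks(5, chunks)
    stop.set()
    await beat
    return chunks, ticks


chunks, ticks = asyncio.run(main())
print(len(chunks), "chunks read;", len(ticks), "heartbeat ticks meanwhile")
```

If `blocking_read()` were called directly inside `read_chunks` instead of through `asyncio.to_thread`, the heartbeat would barely get a chance to run until all reads had finished.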

The way I tried to solve it:

  • First idea: run the WebSocket connection in a second thread and send the chunks from the main thread. The problem is that asyncio queues are not thread-safe, so to make this work I need a synchronous queue... and the problem is still there. I also tried this package, but it didn't help.
  • Second idea: run everything asynchronously. But the synchronous incoming data blocks everything.
  • Third idea: implement this solution as follows (code below). The idea is to use PyAudio callbacks with an asynchronous stream.
```python
import asyncio
import os

import msgpack
import pyaudio
import websockets
from dotenv import load_dotenv

from src.utils.constants import CHANNELS, CHUNK, FORMAT, RATE
from .utils import websocket_data_packet  # just a utils function

load_dotenv()

QUEUE_MAX_SIZE = 10
MY_URL = os.environ.get("WEBSOCKETS_URL")


def make_iter():
    # Called from inside a coroutine, so the running loop is available.
    loop = asyncio.get_running_loop()
    queue = asyncio.Queue()

    def put(in_data, frame_count, time_info, status):
        # PyAudio calls this from its own thread; asyncio.Queue is not
        # thread-safe, so schedule the put on the event loop's thread.
        loop.call_soon_threadsafe(queue.put_nowait, in_data)
        return None, pyaudio.paContinue

    async def get():
        while True:
            yield await queue.get()

    return get(), put


class MicrophoneStreamer(object):
    chunk: int = CHUNK
    channels: int = CHANNELS
    format: int = FORMAT
    rate: int = RATE

    def __init__(self):
        self._pyaudio = pyaudio.PyAudio()
        self.is_stream_open: bool = True
        self.stream_get, stream_put = make_iter()
        self.stream = self._pyaudio.open(
            format=self.format,
            channels=self.channels,
            rate=self.rate,
            input=True,
            frames_per_buffer=self.chunk,
            stream_callback=stream_put,
        )
        self.stream.start_stream()

    def close(self):
        self.is_stream_open = False
        self.stream.close()
        self._pyaudio.terminate()


async def consumer(websocket):
    async for message in websocket:
        print(f"Received message: {msgpack.unpackb(message)}")
        await asyncio.sleep(0.2)


async def producer(websocket, audio_queue):
    while True:
        chunk = await audio_queue.get()
        print(f"Sending message with audio data of size: {len(chunk)}")
        await websocket.send(msgpack.packb(websocket_data_packet(chunk)))
        await asyncio.sleep(0.2)


async def handler(audio_queue):
    print("This is before creating a new websocket")
    async with websockets.connect(MY_URL) as websocket:
        print("Just created a new websocket")
        producer_task = asyncio.create_task(producer(websocket, audio_queue))
        consumer_task = asyncio.create_task(consumer(websocket))
        done, pending = await asyncio.wait(
            [consumer_task, producer_task],
            return_when=asyncio.FIRST_COMPLETED,
            timeout=60,
        )
        for task in pending:
            task.cancel()
    # Leaving the `async with` block closes the connection.


async def main():
    audio_queue = asyncio.Queue(maxsize=5)
    i = 0

    async for in_data in MicrophoneStreamer().stream_get:
        print(f"Processing item {len(in_data)}")
        # on trigger, create websockets connection
        if i == 2:
            asyncio.create_task(handler(audio_queue))
            await asyncio.sleep(0)  # starts the task
        # on each iteration, add element to the queue
        if audio_queue.full():
            _ = await audio_queue.get()
        await audio_queue.put(in_data)
        i += 1


asyncio.run(main())
```

When I run this code, here is what happens:

  • Before i == 2, elements get added to the queue and everything works fine.
  • Then, at i == 2, the WebSocket connection opens and messages are sent and received without issue.
  • But then the loop blocks: no new bytes arrive from the microphone streamer, and nothing else happens.
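The `make_iter` approach (hand data from a foreign thread to asyncio via `loop.call_soon_threadsafe`, because `asyncio.Queue` is not thread-safe) can be exercised without PyAudio. In this sketch a hypothetical `FakeCallbackSource` calls the callback from its own thread, the way PyAudio invokes `stream_callback`. The sketch also keeps the source object in a local variable for as long as it is iterated. In `main()` above, only `stream_get` is kept, so the `MicrophoneStreamer` instance itself becomes unreachable as soon as the attribute is read, which may allow its PyAudio stream to be garbage-collected later; whether that explains the stall is an assumption worth checking, not a confirmed diagnosis.

```python
import asyncio
import threading
import time


class FakeCallbackSource:
    """Hypothetical stand-in for a PyAudio stream opened with stream_callback.

    It calls callback(in_data, frame_count, time_info, status) from a
    background thread, mimicking how PyAudio delivers audio chunks.
    """

    def __init__(self, callback, n_chunks: int, interval: float = 0.01):
        self._thread = threading.Thread(
            target=self._run, args=(callback, n_chunks, interval), daemon=True
        )

    def _run(self, callback, n_chunks, interval):
        for i in range(n_chunks):
            time.sleep(interval)
            callback(bytes([i]) * 4, 4, {}, 0)

    def start(self):
        self._thread.start()


def make_bridge():
    # Must be called from inside a coroutine so the running loop is captured.
    loop = asyncio.get_running_loop()
    queue: asyncio.Queue = asyncio.Queue()

    def put(in_data, frame_count, time_info, status):
        # Runs on the foreign thread: schedule the put on the loop's thread.
        # (A real PyAudio callback must also return (None, pyaudio.paContinue).)
        loop.call_soon_threadsafe(queue.put_nowait, in_data)

    async def get():
        while True:
            yield await queue.get()

    return get(), put


async def main(n_chunks: int = 5) -> list:
    chunks, put = make_bridge()
    source = FakeCallbackSource(put, n_chunks)  # keep a reference while in use
    source.start()
    received = []
    async for data in chunks:
        received.append(data)
        if len(received) == n_chunks:
            break
    return received


received = asyncio.run(main())
print(received)
```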

Does anyone know how to solve this issue?

Thank you very much

Posted by Original, 16/8, 2023 at 9:27. Comments (4):
Bidet: Yes. Run the blocking part in a second thread, and use a "plain" queue.Queue() to pass data back and forth. On the asynchronous end, all you have to do is avoid a blocking read of the queue: get_nowait() or a zero timeout will do that.
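The suggestion above (blocking work in a thread, a plain `queue.Queue`, non-blocking reads on the async side) might look like the sketch below. `blocking_producer()` is a hypothetical stand-in for the microphone loop, the `None` sentinel is an assumption added so the example terminates, and `poll_interval` is an arbitrary value: it is the same knob as the `asyncio.sleep(0.02)` mentioned in the reply.

```python
import asyncio
import queue
import threading
import time


def blocking_producer(q: queue.Queue, n_chunks: int) -> None:
    """Hypothetical stand-in for the microphone loop (e.g. stream.read)."""
    for i in range(n_chunks):
        time.sleep(0.01)  # simulate a blocking read
        q.put(bytes([i]) * 4)
    q.put(None)  # sentinel: no more data


async def drain(q: queue.Queue, poll_interval: float = 0.005) -> list:
    received = []
    while True:
        try:
            item = q.get_nowait()  # never blocks the event loop
        except queue.Empty:
            # Nothing ready: yield to the loop so other tasks (e.g. the
            # WebSocket consumer) can run, then check again.
            await asyncio.sleep(poll_interval)
            continue
        if item is None:
            return received
        # In real code, `await websocket.send(...)` here would also yield.
        received.append(item)


async def main() -> list:
    q: queue.Queue = queue.Queue()
    threading.Thread(target=blocking_producer, args=(q, 5), daemon=True).start()
    return await drain(q)


received = asyncio.run(main())
print(len(received))
```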
Original: Thanks a lot, this is a great idea. I just tried it and it almost works. The consumer and producer are then kind of "racing" (if one of them runs too fast, the other can't run), and I have to add an ugly asyncio.sleep(0.02), as pointed out in this post (https://mcmap.net/q/670736/-python-websockets-sends-messages-but-cannot-receive-messages-asynchronously). I had a similar version working before, but thought that switching to an asynchronous queue would solve this issue, since awaiting a new result would give the other task a chance to run. Or maybe using asyncio.sleep is not the way to go? I can post the code if needed. Thank you!
Bidet: That will vary. Adding await asyncio.sleep(0) is oddly often needed when writing Python asyncio code. (0.02 will also work, but needing .02 or another arbitrary value is less common.) A zero delay in sleep is handled by a separate code path that just yields control to the asyncio loop. If a loop inside an async function contains no awaits, control never returns to the event loop, and other tasks stall.
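The effect described in this comment can be seen in a few lines: a coroutine that loops without awaiting keeps the event loop to itself, while `await asyncio.sleep(0)` lets any other ready task run in between. The function names below are illustrative only.

```python
import asyncio


async def busy(n: int, log: list, yield_each_step: bool) -> None:
    for i in range(n):
        log.append(("busy", i))
        if yield_each_step:
            await asyncio.sleep(0)  # hand control back to the event loop


async def other(log: list) -> None:
    log.append(("other", None))


async def run(yield_each_step: bool) -> list:
    log: list = []
    other_task = asyncio.create_task(other(log))
    await busy(3, log, yield_each_step)
    await other_task
    return log


no_yield = asyncio.run(run(False))
with_yield = asyncio.run(run(True))
print(no_yield)    # [('busy', 0), ('busy', 1), ('busy', 2), ('other', None)]
print(with_yield)  # [('busy', 0), ('other', None), ('busy', 1), ('busy', 2)]
```

Without the `await`, the other task only runs once the busy loop has finished entirely.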
Original: await asyncio.sleep(0) does not work for me; the .02 seems necessary in this case. It is quite odd and not very clean, but I guess I'll live with it. Thank you for your time and answer.
