Combining threading and asyncio to process audio stream through a WebSocket connection

Overview

I have a server with an open WebSocket connection to one client application. This client application (an Android app) can send live microphone audio data. In response to receiving this data, the server has to reply with partial transcriptions so that the user can see what they are saying transcribed in real time. I use the Google Speech-to-Text API for this.

I am well aware that Android has a built-in speech recogniser which achieves exactly this.

The server is launched using asyncio.run and incoming data is passed to handlers which all use asynchronous methods. These are the methods that are given the responsibility of handling the reception of an audio frame:

elif action == util.ActionMessages.AUDIO_FRAME:
    audio_id, audio = content["id"], content["audio"]
    await self._audio_handler.receive_audio(audio, audio_id)


# Audio handler method
class AudioHandler:
    def __init__(self, client_handler: ClientHandler):
        self._client_handler = client_handler


        self._audio_finished = dict()

        self._is_streaming = False
        self._audio_queue = queue.Queue()
        self._languages = "en-US"

        self._speech_client = speech.SpeechClient()
        config = speech.RecognitionConfig(...)
        self._streaming_config = speech.StreamingRecognitionConfig(...)

        self._executor = ThreadPoolExecutor(max_workers=1)
        self._request_built = False


    async def receive_audio(self, content: str | None, audio_id: str):

        is_audio_complete = self._audio_finished.setdefault(audio_id, False)
        if content and not is_audio_complete:
            self._is_streaming = True
            content = base64.b64decode(content)
            self._audio_queue.put(content)

            if not self._request_built:
                future = self._executor.submit(self._build_requests)
                future.add_done_callback(lambda f: self._on_audio_processing_complete(f, audio_id))
                self._request_built = True

        elif is_audio_complete:
            # TODO: Implement audio processing complete like clean up dictionary
            pass

        else:
            self._request_built = False
            self._is_streaming = False
            self._audio_queue.put(None)



    def _on_audio_processing_complete(self, future, audio_id):
        self._audio_finished[audio_id] = True
        self._request_built = False

    def _read_audio(self):
        while self._is_streaming:
            chunk = self._audio_queue.get()
            if chunk is None:
                return
            data = [chunk]

            while True:
                try:
                    chunk = self._audio_queue.get_nowait()
                    if chunk is None:
                        return
                    data.append(chunk)
                except queue.Empty:
                    break

            yield b"".join(data)

    def _listen_print_loop(self, responses):
        num_chars_printed = 0
        for response in responses:
            if not response.results:
                continue

            result = response.results[0]
            if not result.alternatives:
                continue

            transcript = result.alternatives[0].transcript

            overwrite_chars = " " * (num_chars_printed - len(transcript))

            # Send transcript to clients
            print(transcript + overwrite_chars)

            if not result.is_final:
                num_chars_printed = len(transcript)

            else:
                self._is_streaming = False
                return

    def _build_requests(self):
        audio_generator = self._read_audio()
        requests = (
            speech.StreamingRecognizeRequest(audio_content=content)
            for content in audio_generator
        )

        responses = self._speech_client.streaming_recognize(self._streaming_config, requests)
        self._listen_print_loop(responses)

When audio comes in with the utterance id (audio id), it fills up a queue. On first arrival a new thread is launched, which instantiates a generator that reads audio chunks from the queue and joins them into bytes. The Google speech client uses this generator to perform the transcription. The speech client returns a responses generator, which is then used by the _listen_print_loop method to (for now) print the responses/transcriptions.

The logic for using the Google speech api is largely based on their docs.

The issue

As you can imagine, printing the transcription server side is not what I want. I would like to send these partial transcriptions to my client application. However, the method I use to send messages through the socket is async, so in this implementation it cannot be awaited from the _listen_print_loop method, which is not itself async. Here is what I mean:

def _listen_print_loop(self, responses):
    num_chars_printed = 0
    for response in responses:
        if not response.results:
            continue

        result = response.results[0]
        if not result.alternatives:
            continue

        transcript = result.alternatives[0].transcript

        overwrite_chars = " " * (num_chars_printed - len(transcript))

        # Cannot do this!
        await send_to_client(transcript + overwrite_chars)

        if not result.is_final:
            num_chars_printed = len(transcript)

        else:
            self._is_streaming = False
            return

I would like to know what the best solution for this is. Is it switching from using threading to only asyncio? If so, would that not mean I would have to implement an async generator function? Would that not cause issues with the speech client?
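For illustration, one common way to hand results from a blocking thread to async code is to push them onto an asyncio.Queue with loop.call_soon_threadsafe and let a coroutine on the event loop do the awaiting. The sketch below is a minimal stand-in under stated assumptions, not the code from this server: blocking_transcriber, forward_results, and this send_to_client are hypothetical names, and the list of strings replaces the real responses generator.

```python
import asyncio
import threading


def blocking_transcriber(loop, results):
    """Stand-in for the thread that iterates the blocking responses generator."""
    for transcript in ["hel", "hello", "hello world"]:
        # asyncio.Queue is not thread-safe, so schedule the put on the loop.
        loop.call_soon_threadsafe(results.put_nowait, transcript)
    # Sentinel: tells the forwarding coroutine that no more results will come.
    loop.call_soon_threadsafe(results.put_nowait, None)


async def send_to_client(message, sent):
    """Hypothetical async send; a real server would await the WebSocket here."""
    await asyncio.sleep(0)
    sent.append(message)


async def forward_results(results, sent):
    """Runs on the event loop, so it is free to await the async send."""
    while True:
        transcript = await results.get()
        if transcript is None:
            break
        await send_to_client(transcript, sent)


async def main():
    loop = asyncio.get_running_loop()
    results = asyncio.Queue()
    sent = []
    worker = threading.Thread(target=blocking_transcriber, args=(loop, results))
    worker.start()
    await forward_results(results, sent)
    worker.join()  # the worker has already finished once the sentinel arrived
    return sent


if __name__ == "__main__":
    print(asyncio.run(main()))
```

With this shape the thread never touches the WebSocket; it only enqueues results, and the send happens on the loop that owns the connection.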

I am relatively new to asyncio, any pointers would be greatly appreciated!

Edit: Using asyncio.run_coroutine_threadsafe()

I have tried the following to no avail:

def _listen_print_loop(self, responses):
    num_chars_printed = 0
    for response in responses:
        if not response.results:
            continue

        result = response.results[0]
        if not result.alternatives:
            continue

        transcript = result.alternatives[0].transcript

        overwrite_chars = " " * (num_chars_printed - len(transcript))

        # Send on the current event loop passed to the client handler
        print(transcript + overwrite_chars)
        asyncio.run_coroutine_threadsafe(
            send_to_clients(transcript + overwrite_chars),
            self._client_handler.loop,
        )

        if not result.is_final:
            num_chars_printed = len(transcript)

        else:
            self._is_streaming = False
            return

In this change I use asyncio.run_coroutine_threadsafe() to run the coroutine send_to_clients on the server's event loop. The loop variable is set on launch as follows:

async def launch_server():
    # For threads
    client_handler.loop = asyncio.get_running_loop()

    ip_address = "0.0.0.0"
    port = int(os.getenv("PORT"))
    server = await websockets.serve(
        websocket_server,
        ip_address,
        port,
        process_request=health_check
    )
   
    await asyncio.shield(server.wait_closed())


if __name__ == "__main__":
    asyncio.run(launch_server())

This solution does not work: when execution reaches the line that sends the transcript, the WebSocket closes with the following exception:

WebSocket connection closed: no close frame received or sent.
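One detail of the attempt above that may be worth checking: the concurrent.futures.Future returned by asyncio.run_coroutine_threadsafe() is discarded, so any exception raised inside the coroutine never reaches the calling thread. The sketch below does not diagnose the close itself; it only shows, with a hypothetical send_to_clients that fails on purpose, how calling result() on that future surfaces the error in the worker thread.

```python
import asyncio
import threading


async def send_to_clients(message, sent):
    """Hypothetical async send; raises to mimic a connection that has closed."""
    if message == "boom":
        raise ConnectionError("connection closed")
    sent.append(message)


def worker(loop, sent, errors):
    """Runs in a plain thread, like the loop over the blocking responses."""
    for message in ["partial", "boom", "final"]:
        future = asyncio.run_coroutine_threadsafe(
            send_to_clients(message, sent), loop
        )
        try:
            # Blocks this thread (not the event loop) until the send finishes,
            # and re-raises whatever the coroutine raised.
            future.result(timeout=5)
        except Exception as exc:
            errors.append(repr(exc))


async def main():
    loop = asyncio.get_running_loop()
    sent, errors = [], []
    thread = threading.Thread(target=worker, args=(loop, sent, errors))
    thread.start()
    # Joining directly would block the loop the worker is waiting on,
    # so wait for the thread from another thread instead.
    await asyncio.to_thread(thread.join)
    return sent, errors


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Logging the captured exception (or its traceback) in the real handler would show whether the send itself fails or whether the connection is already closed by the time it runs.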

Edit: Using speech.SpeechAsyncClient

I have discovered that the speech module offers speech.SpeechAsyncClient, which I have used as follows:

async def receive_audio(self, content: str | None, audio_id: str):
    is_audio_complete = self._audio_finished.setdefault(audio_id, False)
    if content and not is_audio_complete:
        self._is_streaming = True
        content = base64.b64decode(content)
        await self._audio_queue.put(content)

        if not self._request_built:
            self._request_built = True
            await self._build_requests()

    elif is_audio_complete:
        pass

    else:
        self._request_built = False
        self._is_streaming = False
        await self._audio_queue.put(None)

async def _read_audio(self):
    print("Reading audio")

    config_request = speech.StreamingRecognizeRequest()
    config_request.streaming_config = self._streaming_config
    yield config_request

    while self._is_streaming:
        chunk = await self._audio_queue.get()
        if chunk is None:
            return
        data = [chunk]

        while True:
            try:
                chunk = await self._audio_queue.get_nowait()
                if chunk is None:
                    return
                data.append(chunk)
            except queue.Empty:
                break

        request = speech.StreamingRecognizeRequest()
        request.audio_content = b"".join(data)
        yield request

async def _build_requests(self):
    audio_generator = self._read_audio()
    responses = await self._speech_client.streaming_recognize(
        requests=audio_generator,
    )
    print("Listening for audio")
    await self._listen_print_loop(responses)

This does not cause any error; however, for some reason the program hangs when awaiting the streaming_recognize(...) method. More specifically, the generator _read_audio() is never called, meaning no audio is ever processed.

Edit 2.1: Forgot to mention that asyncio.Queue is being used here

Edit 2.2: I have implemented the feature in a scratch file using this method (with my microphone directly) and it works. The issue here is still that the generator is never called (print("Reading audio") is never reached). This leads me to believe the problem is in the way I am handling asyncio.
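Two asyncio details in the async version above may be relevant, though neither is confirmed as the cause of the hang. First, asyncio.Queue.get_nowait() is a plain method that raises asyncio.QueueEmpty, so it is neither awaited nor caught with queue.Empty. Second, awaiting _build_requests() inside receive_audio keeps that handler call from returning until the stream ends. The sketch below uses hypothetical names (read_audio, consume) and a stand-in consumer instead of the speech client, and it runs the consumer as a background task created with asyncio.create_task.

```python
import asyncio


async def read_audio(audio_queue):
    """Async generator that batches whatever chunks are already queued."""
    while True:
        chunk = await audio_queue.get()
        if chunk is None:
            return
        data = [chunk]
        while True:
            try:
                # Plain call: no await, and it raises asyncio.QueueEmpty.
                chunk = audio_queue.get_nowait()
            except asyncio.QueueEmpty:
                break
            if chunk is None:
                # Sketch-only choice: flush collected audio before stopping.
                yield b"".join(data)
                return
            data.append(chunk)
        yield b"".join(data)


async def consume(audio_queue, batches):
    """Stand-in for the streaming call that iterates the request generator."""
    async for batch in read_audio(audio_queue):
        batches.append(batch)


async def main():
    audio_queue = asyncio.Queue()
    batches = []
    # Start the consumer as a task so the caller can keep queueing frames.
    consumer = asyncio.create_task(consume(audio_queue, batches))
    for frame in [b"ab", b"cd", b"ef"]:
        await audio_queue.put(frame)
    await audio_queue.put(None)  # sentinel: end of the utterance
    await consumer
    return batches


if __name__ == "__main__":
    print(asyncio.run(main()))
```

In the real handler, the task reference would need to be kept (for example on the AudioHandler instance) so it is not garbage collected and so its exceptions can be inspected.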

Nightdress answered 31/8, 2024 at 15:27 Comment(0)
