FastAPI how to add ZMQ to eventloop [duplicate]
I am surprised this has not been asked in detail, but I could not find this question or a solution anywhere. A lot of people seem to hit the same problem: you have a FastAPI application that also needs to communicate with some other microservice, over something more efficient than HTTP messages. I have read the ZMQ documentation on asyncio integration, but so far I have not found anything about how to add a ZMQ socket to the event loop alongside FastAPI (or even Starlette, for that matter). Here is the example code from the ZMQ website:

import asyncio
import zmq
from zmq.asyncio import Context

ctx = Context.instance()

async def recv():
    s = ctx.socket(zmq.SUB)
    s.connect('tcp://127.0.0.1:5555')
    s.subscribe(b'')  # empty prefix = subscribe to everything
    try:
        while True:
            msg = await s.recv_multipart()
            print('received', msg)
    finally:
        s.close()  # only reached if the loop is cancelled or fails

This shows an async function, which is great, but it still needs to run in the event loop alongside the FastAPI coroutines. How is this supposed to be done? The FastAPI documentation does not provide any interface for running a separate coroutine besides background tasks. I am not sure whether there is any other magic happening in a background task, but for something that communicates with another microservice, I would want it scheduled just like the FastAPI coroutines. Furthermore, you can't start a background task at startup, so you have to make a hacky call just to get one running (it technically works, but it's ugly). It would be even better if we could just register a handler with something like:

@app.set("zmq_recv")
async def recv():
    s = ctx.socket(zmq.SUB)
    s.connect('tcp://127.0.0.1:5555')
    s.subscribe(b'')....

This would, based on configuration somewhere, allow all messages on the ZMQ context to go automatically to this function. It could even allow us to run ZMQ inside the FastAPI coroutine: just bind another port and make sure all traffic from that port goes to this special `app.set` handler. I would be more than fine with something like this:

ctx = Context.instance()

@app.on_event("startup")
async def startup_event():
    s = ctx.socket(zmq.PULL)
    # setHandler is imaginary: it would tell FastAPI to route all traffic
    # arriving on port 5555 to the "zmq_recv" handler function
    s.bind('tcp://127.0.0.1:5555').setHandler("zmq_recv")

@app.on_special_handler("zmq_recv")
async def zmq_recv(socket):
    msg = await socket.recv_multipart()
    print('received', msg)
    

This is ideally what I would like: staying on the FastAPI event loop, but having ZMQ sockets invoke a specific function when messages arrive. Is either the separate-coroutine approach OR this sort of templated handler possible? If not, how are people claiming you can use ZMQ with FastAPI efficiently? (I guess background tasks could do the trick, but it seems really hacky.)

Freshet answered 20/5, 2020 at 12:9 Comment(2)
I'm not quite sure what you want in your question. Do you want to receive messages from zmq inside your ASGI application worker? – Toluidine
I want to receive messages inside the event loop without blocking the FastAPI coroutines, or inside the FastAPI coroutine itself. – Freshet

I'm actually looking to do something similar, specifically listening for events coming from a message queue via a non-HTTP handler. My current solution is hooking into the event loop and adding the listener there. It could look something like this (taken from this issue in FastAPI -> issue):

# `serve` here is hypercorn.asyncio.serve; uvicorn's Server.serve() works similarly
loop = asyncio.get_event_loop()

loop.create_task(serve(app, config))  # run fastapi
loop.create_task(your_tcp_app())      # run your listener
loop.run_forever()                    # start the event loop

You could also grab the event loop the app is using after initialization, more similarly to what you originally did:

@app.on_event("startup")
async def startup_event():
    loop = asyncio.get_running_loop()  # the loop fastapi is already using
    loop.create_task(your_tcp_app())   # run your listener alongside it

I'm still weighing this solution, though, as I'm wary of thread management in Python; this will live in a production service, and I might just end up separating the two processes completely to keep things simple... I'd like to know if that worked for you, or if anyone else has ideas on this sort of implementation :)

Roice answered 16/2, 2021 at 12:35 Comment(0)
