I am surprised this has not really been asked in detail, but I could not find this question or a solution anywhere. A lot of people seem to have the same problem: a FastAPI application that also needs to communicate with some other microservice over something more efficient than HTTP messages. I have read all of the zmq documentation on asyncio integration, but so far I have not found anything about how to add zmq to the event loop alongside FastAPI (or even Starlette, for that matter). Here is an example of the code from the zmq website:
import asyncio
import zmq
from zmq.asyncio import Context

ctx = Context.instance()

async def recv():
    s = ctx.socket(zmq.SUB)
    s.connect('tcp://127.0.0.1:5555')
    s.subscribe(b'')
    while True:
        msg = await s.recv_multipart()
        print('received', msg)
    s.close()
This shows us an async function, which is great, but it still needs to run in the event loop alongside the FastAPI coroutine. How is this supposed to be done? The FastAPI documentation does not really provide any interface for running a separate coroutine other than a background task. I am not sure whether there is any other magic happening in a background task, but for something that needs to communicate with another microservice, I would want it to be scheduled the same way as the FastAPI coroutine. Furthermore, you can't start a background task at startup, so you have to make some hokey call just to get it running (it's hacky, but technically works). It would be even better if we could just register a handler with something like:
@app.set("zmq_recv")  # hypothetical decorator, not an existing FastAPI API
async def recv():
    s = ctx.socket(zmq.SUB)
    s.connect('tcp://127.0.0.1:5555')
    s.subscribe(b'')....
Based on config somewhere, this would route all messages on the zmq context to this function automatically. It might even let us run zmq INSIDE the FastAPI coroutine, just binding another port and making sure that all traffic on that port goes to this special app.set method. I would be more than fine with something like this:
ctx = Context.instance()

@app.on_event("startup")
async def startup_event():
    s = ctx.socket(zmq.PULL)
    # this setHandler is something magic that tells FastAPI to have all traffic on port 5555 go to this handler function
    s.bind('tcp://127.0.0.1:5555').setHandler("zmq_recv")

@app.on_special_handler("zmq_recv")
async def zmq_recv(socket):
    msg = await socket.recv_multipart()
    print('received', msg)
This is ideally what I would like: using the FastAPI coroutine, but having zmq sockets invoke a specific function. Is either the separate-coroutine example OR this sort of templated example possible? If not, how are people claiming you can use zmq with FastAPI efficiently? (I guess background tasks could do the trick, but it seems really hokey.)
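To make the shape of what I am after more concrete, here is a minimal sketch using only asyncio, with no claim that this is how FastAPI or zmq intend it to be done. Everything named here is hypothetical: `MessageHandlers`, `make_lifespan` and `FakeSocket` are made up for illustration, and `FakeSocket` only imitates the `recv_multipart()` / `close()` part of a `zmq.asyncio` socket so the example runs without a peer. The assumption is that the receive loop can live in an ordinary `asyncio` task on the same event loop, started and cancelled from an async context manager of the kind FastAPI accepts as its `lifespan` argument.

```python
import asyncio
from contextlib import asynccontextmanager


class MessageHandlers:
    """Hypothetical registry: maps a name to a coroutine that handles messages."""

    def __init__(self):
        self._handlers = {}
        self._tasks = []

    def on(self, name):
        # Decorator that registers `func` under `name` and returns it unchanged.
        def register(func):
            self._handlers[name] = func
            return func
        return register

    async def _pump(self, name, sock):
        # Forward every multipart message from `sock` to the named handler.
        handler = self._handlers[name]
        try:
            while True:
                msg = await sock.recv_multipart()
                await handler(msg)
        finally:
            sock.close()  # also runs when the task is cancelled

    def start(self, name, sock):
        # Schedule the receive loop on the currently running event loop.
        self._tasks.append(asyncio.create_task(self._pump(name, sock)))

    async def stop(self):
        # Cancel all receive loops and wait until they have finished.
        for task in self._tasks:
            task.cancel()
        await asyncio.gather(*self._tasks, return_exceptions=True)
        self._tasks.clear()


def make_lifespan(registry, name, socket_factory):
    """Hypothetical helper: build a lifespan-style async context manager."""

    @asynccontextmanager
    async def lifespan(app):
        registry.start(name, socket_factory())
        try:
            yield
        finally:
            await registry.stop()

    return lifespan


class FakeSocket:
    """Stand-in for a zmq.asyncio socket (assumption, for this demo only)."""

    def __init__(self):
        self._queue = asyncio.Queue()
        self.closed = False

    def feed(self, *frames):
        self._queue.put_nowait(list(frames))

    async def recv_multipart(self):
        return await self._queue.get()

    def close(self):
        self.closed = True


async def demo():
    registry = MessageHandlers()
    received = []

    @registry.on("zmq_recv")
    async def zmq_recv(msg):
        received.append(msg)

    sock = FakeSocket()
    lifespan = make_lifespan(registry, "zmq_recv", lambda: sock)
    async with lifespan(None):  # a real app object would be passed here
        sock.feed(b"hello")
        await asyncio.sleep(0.01)  # give the receive loop a chance to run
    return received, sock.closed


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

In a real application the socket factory would presumably create and bind a `ctx.socket(zmq.PULL)` instead of a `FakeSocket`, and the context manager would be passed as `FastAPI(lifespan=...)`. I have not verified that combination, so treat it as an assumption rather than a working recipe.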