You can use aio_pika with its RPC pattern and do the following:
Service 1 (consumes)
Consume in a loop:
# app/__init__.py
from fastapi import FastAPI
from app.rpc import consume
# Application instance; the startup hook below is registered on it.
app = FastAPI()
...
@app.on_event('startup')
def startup():
    """Schedule the RPC consumer on the application's event loop at startup.

    The consumer runs as a background task so startup itself is not
    blocked while the AMQP connection is being established.
    """
    loop = asyncio.get_event_loop()
    # Use the same loop to consume. Keep a strong reference to the task:
    # the event loop only holds weak references to tasks, so a
    # fire-and-forget task may be garbage-collected before it finishes.
    # Once the task is done, the connection returned by consume() is
    # available via app.state.rpc_consumer_task.result().
    app.state.rpc_consumer_task = asyncio.ensure_future(consume(loop))
...
Create the connection and channel, and register the remote methods to be called from another service:
# app/rpc.py
from aio_pika import connect_robust
from aio_pika.patterns import RPC
from app.config import config
# Public API of this module.
__all__ = ['consume']
def remote_method():
    """Example remote method that other services can call over RPC.

    Replace the body with real work. It lives in this module only for
    simplicity; move it, together with any other remote methods, to
    another place, e.g. app/rpc_methods.
    """
    reply = 'It works!'
    return reply
async def consume(loop):
    """Open the AMQP connection and expose the remote methods over RPC.

    Connects to ``config.AMQP_URI``, opens a channel, creates an ``RPC``
    helper on that channel and registers ``remote_method`` under the name
    ``'remote_method'``.

    Args:
        loop: Event loop passed through to ``connect_robust``.

    Returns:
        The open connection.

    Raises:
        Any exception raised while connecting or while setting up the
        channel, the RPC helper or the method registration. If setup fails
        after the connection was opened, the connection is closed before
        the exception is re-raised.
    """
    connection = await connect_robust(config.AMQP_URI, loop=loop)
    try:
        channel = await connection.channel()
        rpc = await RPC.create(channel)
        # Register your remote method
        await rpc.register('remote_method', remote_method, auto_delete=True)
    except BaseException:
        # Do not leak the freshly opened connection when setup fails
        # part-way (BaseException so that cancellation is covered too).
        await connection.close()
        raise
    return connection
That's all you need to consume and respond. Now let's look at the second service, which calls this remote method.
Service 2 (calls remote method)
Let's first create an RPC middleware, so that we can easily manage and access the RPC object and call our remote methods from API functions:
# app/utils/rpc_middleware.py
import asyncio
from fastapi import Request, Response
from aio_pika import connect_robust
from aio_pika.patterns import RPC
from app.config import config
# Public API of this module.
__all__ = ['get_rpc', 'rpc_middleware']
async def rpc_middleware(request: Request, call_next):
    """HTTP middleware that gives each request its own RPC object.

    Opens a connection and channel, creates an ``RPC`` helper, stores it on
    ``request.state.rpc`` and then hands the request to ``call_next``.
    The RPC helper and the connection are closed again once the request
    has been handled, whether or not it succeeded.

    Args:
        request: Incoming request; ``request.state.rpc`` is set on it.
        call_next: Callable that passes the request on and returns the
            response.

    Returns:
        The response returned by ``call_next``.

    Raises:
        Any exception raised while setting up RPC or by ``call_next``;
        errors are propagated unchanged after cleanup.
    """
    # UPD: we probably want to keep the queue instead of recreating it for
    # each request, so the connection, channel and rpc initialisation can be
    # moved out of the middleware and done once on app start.
    # Also, based on https://github.com/encode/starlette/issues/1029
    # it's better to create an ASGI middleware instead of an HTTP one.

    # You can also pass a loop as an argument. Keep it here now for simplicity
    loop = asyncio.get_event_loop()
    connection = await connect_robust(config.AMQP_URI, loop=loop)
    try:
        channel = await connection.channel()
        rpc = await RPC.create(channel)
        try:
            request.state.rpc = rpc
            return await call_next(request)
        finally:
            # Only reached once `rpc` exists, so cleanup can no longer fail
            # with AttributeError and hide the original setup error.
            await rpc.close()
    finally:
        # Closing only the RPC helper would leave one open connection
        # behind per request; close the connection as well.
        await connection.close()
def get_rpc(request: Request):
    """Dependency for using the RPC object inside route functions.

    Returns whatever is stored on ``request.state.rpc``.
    """
    return request.state.rpc
Apply RPC middleware:
# app/__init__.py
from app.utils import rpc_middleware
...
# Register rpc_middleware as an HTTP middleware on the application.
app.middleware('http')(rpc_middleware)
...
Use the RPC object via the dependency in an API function:
# app/api/whatever.py
from aio_pika.patterns import RPC
from app.utils import get_rpc
...
@router.get('/rpc')
async def rpc_test(rpc: RPC = Depends(get_rpc)):
    # `rpc` is provided by the get_rpc dependency. Calling
    # rpc.proxy.remote_method() invokes the method registered under the
    # name 'remote_method' in Service 1 and awaits its result.
    response = await rpc.proxy.remote_method()
...
Add some logging to track what's happening in both services. You can also combine the RPC logic from both services into one, so that a single service can both consume and call remote methods.
Hope this helps you get the basic idea.