How to use FastAPI as consumer for RabbitMQ (RPC)
Asked Answered
O

1

5

The example here shows how to create both client & server in python using Remote procedure call (RPC).

But I can't imagine how the FastAPI service might be a server to consume requests from RCP client using pika for RabbitMQ.

Any web service will be requested by calling them explicitly, however, I can't imagine how to integrate RabbitMQ consumer inside web service.

On other hand, for the client it could be easy to do that, by calling the web service explicitly you can publish a request for the queue, see this example

Any help please? or a good start for that?

Obidiah answered 5/1, 2021 at 21:31 Comment(1)
I'm not sure where you're heading with your application. IMHO API endpoints are "passive" applications that wait for a request and provide an answer to the client, while a queue/stream consumer usually is a different usecase, always listening for message to then do something with the data without "replying". I'm not saying the two things can't go together, but without additional details on your goal I find it hard to fully understand how to answer.Banish
V
10

You can use aio_pika with RPC pattern and do the following:

Service 1 (consumes)

Consume in a loop:

# app/__init__.py

from fastapi import FastAPI
from app.rpc import consume

app = FastAPI()

...

@app.on_event('startup')
def startup():
    loop = asyncio.get_event_loop()
    # use the same loop to consume
    asyncio.ensure_future(consume(loop))

...

Create connection, channel and register remote methods to be called from another service:

# app/rpc.py

from aio_pika import connect_robust
from aio_pika.patterns import RPC

from app.config import config

__all__ = [
    'consume'
]


def remote_method():
    # DO SOMETHING
    # Move this method along with others to another place e.g. app/rpc_methods
    # I put it here for simplicity
    return 'It works!'

async def consume(loop):
    connection = await connect_robust(config.AMQP_URI, loop=loop)
    channel = await connection.channel()
    rpc = await RPC.create(channel)

    # Register your remote method
    await rpc.register('remote_method', remote_method, auto_delete=True)
    return connection

That's all you need to consume and respond now let's see the second service that calls this remote method.

Service 2 (calls remote method)

Let's create RPC middleware first to easily manage and access RPC object to call our remote methods from API functions:

# app/utils/rpc_middleware.py

import asyncio

from fastapi import Request, Response

from aio_pika import connect_robust
from aio_pika.patterns import RPC

from app.config import config

__all__ = [
    'get_rpc',
    'rpc_middleware'
]


async def rpc_middleware(request: Request, call_next):
    response = Response("Internal server error", status_code=500)
    try:
        # You can also pass a loop as an argument. Keep it here now for simplicity
        loop = asyncio.get_event_loop()
        connection = await connect_robust(config.AMQP_URI, loop=loop)
        channel = await connection.channel()
        request.state.rpc = await RPC.create(channel)
        response = await call_next(request)
    finally:

        # UPD: just thought that we probably want to keep queue and don't
        # recreate it for each request so we can remove this line and move
        # connection, channel and rpc initialisation out from middleware 
        # and do it once on app start

        # Also based of this: https://github.com/encode/starlette/issues/1029
        # it's better to create ASGI middleware instead of HTTP
        await request.state.rpc.close()
    return response


# Dependency to use rpc inside routes functions
def get_rpc(request: Request):
    rpc = request.state.rpc
    return rpc

Apply RPC middleware:

# app/__init__.py

from app.utils import rpc_middleware

...

app.middleware('http')(rpc_middleware)

...

Use RPC object via dependency in an API function:

# app/api/whatever.py

from aio_pika.patterns import RPC

from app.utils import get_rpc

...

@router.get('/rpc')
async def rpc_test(rpc: RPC = Depends(get_rpc)):
    response = await rpc.proxy.remote_method()
    ...

Add some logging to track what's happening in both services. Also you can combine RPC logic from both services into one to be able to consume and call remote methods from whithin the same service.

Hope it helps to get basic idea.

Vue answered 3/5, 2021 at 23:22 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.