What is the 'twisted' way of consuming messages from rabbitmq and forwarding them through its client connections?

I am writing a websocket server in twisted to learn the framework. It will receive messages from a rabbitmq broker and send out updates to connected clients. If I want to broadcast/multicast many messages at a time through many client connections, is calling (just as an example) deferToThread(channel.basic_consume, queue), or callInThread(" ") a good option for doing so?

If not, what would be the twisted way of consuming messages from rabbitmq and forwarding them to connected clients?

My strategy so far is:

reactor_thread: listen on port(x) to setup and maintain client connections

other_thread: subscribe to a rabbitmq queue and consume messages if any (goes on forever)

Sedlik answered 6/8, 2016 at 0:19 Comment(1)
You should add the tags websocket, autobahn, crossbar so that the devs working on async websockets from Tavendo can help you too. They may be able to provide a better solution. Commines

is calling (just as an example) deferToThread(channel.basic_consume, queue), or callInThread(" ") a very good option for doing so?

Using threads won't provide much benefit in this situation, since messages are already queued in RabbitMQ. I've been in similar situations in the past and can give you a high-level overview of how I solved the problem without threads. Disclaimer: I haven't worked with RabbitMQ or Websockets for a year or two, so my knowledge may be a bit fuzzy.

List Connected Clients

Assuming you're using autobahn for websockets, you can add a variable to the factory class (autobahn.twisted.websocket.WebSocketServerFactory) that keeps track of connected clients. Either a list or a dict will work fine.

factory = WebSocketServerFactory()
factory.connection_list = []

The connection_list variable will store protocol objects (autobahn.twisted.websocket.WebSocketServerProtocol) after a connection is made. In the protocol, you would need to override the connectionMade function to append the protocol (self in this case) to self.factory.connection_list.

def connectionMade(self):
    super(WSProtocol, self).connectionMade()
    self.factory.connection_list.append(self)

It's probably best to create something like a "onConnect deferred" for flexibility but this is the gist of it. Maybe autobahn provides an interface to do so.
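The tracking described above also needs a removal step, otherwise closed connections stay in the list forever. Here is a minimal sketch of that bookkeeping, written as a plain Python class so it runs without autobahn installed. `ClientRegistry`, `register`, `unregister`, and `snapshot` are hypothetical names for illustration, not autobahn API.

```python
class ClientRegistry:
    """Tracks live client protocols (hypothetical helper, not part of autobahn)."""

    def __init__(self):
        # A set gives cheap removal and ignores duplicate registrations.
        self._clients = set()

    def register(self, client):
        self._clients.add(client)

    def unregister(self, client):
        # discard() does not raise if the client was never registered.
        self._clients.discard(client)

    def __len__(self):
        return len(self._clients)

    def snapshot(self):
        # Return a copy so callers can iterate while clients disconnect.
        return list(self._clients)
```

In an autobahn protocol, the likely call sites would be `onOpen` (register) and `onClose` (unregister), with the registry stored on the factory under an attribute name of your choosing.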

RabbitMQ

Using pika, you can consume messages asynchronously by using this example. Change the channel and exchange names as necessary to make it work with your setup. Then we're going to make two changes. First, we'll pass factory.connection_list to the callbacks; then, when a message is consumed, we'll write it to each connected client's protocol.

@defer.inlineCallbacks
def run(connection, proto_list):
    #...
    l = task.LoopingCall(read, queue_object, proto_list)
    l.start(0.01)

@defer.inlineCallbacks
def read(queue_object, proto_list):
    #...
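    if body:
        print(body)
        # Iterate over a copy; sorted() raises TypeError on Python 3 for protocol objects.
        for client in list(proto_list):
            # sendMessage is autobahn's framed send and returns None, so no yield.
            client.sendMessage(body)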

    yield ch.basic_ack(delivery_tag=method.delivery_tag)

#...
d.addCallback(run, factory.connection_list)
reactor.run()

In the read callback function, every time a message is consumed, the loop iterates over the list of connected clients and sends each of them the message.
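The fan-out step in `read` can be sketched on its own. This is a minimal illustration under stated assumptions: `broadcast` is a hypothetical helper, and `FakeClient` only stands in for an autobahn protocol by exposing a `sendMessage(payload, isBinary)` method. It iterates over a copy of the list and isolates per-client failures, so one broken connection does not stop delivery to the rest.

```python
def broadcast(clients, payload, is_binary=False):
    """Send payload to every client; return the clients whose send failed."""
    failed = []
    # Copy first: a send may trigger a disconnect that mutates the original list.
    for client in list(clients):
        try:
            client.sendMessage(payload, is_binary)
        except Exception:
            # Collect the failure and keep going with the remaining clients.
            failed.append(client)
    return failed


class FakeClient:
    """Stand-in for a WebSocket protocol, used only for this demo."""

    def __init__(self, broken=False):
        self.broken = broken
        self.received = []

    def sendMessage(self, payload, isBinary=False):
        if self.broken:
            raise RuntimeError("connection lost")
        self.received.append(payload)
```

The returned list of failed clients could then be removed from connection_list by the caller. Whether to drop or retry them is a design choice this sketch leaves open.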

Commines answered 11/8, 2016 at 12:58 Comment(4)
Thanks; can you add the part where the server is listening on a port for incoming client connections and how that will all work together? The reason I thought to use another thread was because I was going to use "consume" instead of "get" to receive messages from rabbitmq. Their documentation recommended it because apparently more resources are used when executing a get. Sedlik
Thank you for accepting my answer (and the bounty :)). Sorry for the delayed response; I hadn't noticed you made a comment. Do you still need the connection code? I figured you already had that part down. As for threads, I recommend you learn to do it without them, as there will be a shared variable (connection_list) and you then inherit the issues that come with shared state. Threads also delay learning Twisted's vast async functionality (in my personal opinion). I'd recommend learning the async model first, then using something like crochet once you're comfortable. Commines
Sure, no problem. It was an informative answer, but I'm still new to this and I wanted to really understand how all of it fits together. Would I simply add in "reactor.listenTCP(8989, wsfactory)" where "wsfactory" is the websocket protocol factory? And as for the LoopingCall, how about creating a rabbitmq consumer instead? Sedlik
Yep, all you need to do is reactor.listenTCP(8989, wsfactory). I haven't seen any examples using rabbit's consumer with Twisted, so unfortunately I'm not sure how this can be done. Commines
