Celery as networked pub/sub events
I want to set up a networked pub/sub event system that also needs to be able to run tasks asynchronously. I have tried getting Celery to do the heavy lifting, but I feel like I am shimming a whole bunch of things together just to get it working.

I have two machines (input and output), and both have access to RabbitMQ. I would like a main program to kick off a loop that waits for input (movement detected by a webcam). I have it set up so that input_machine starts main.py, which starts a Celery task that is monitored by a worker on input_machine subscribed to the "input" queue. This task just runs a `while True` loop until some input is detected, at which point it sends another named task ('project.entered_room', which does nothing on this side) to the "output" queue.

Meanwhile, on output_machine, I have a Celery instance watching the "output" queue, with a task named 'project.entered_room' that responds to someone entering the room.
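The hand-off I am describing looks roughly like this (a sketch, not my exact code; `movement_detected` is a stand-in for the real webcam check, and the broker URL is made up):

```python
def movement_detected():
    # Stand-in for the real webcam movement check.
    return False

def detect_and_notify():
    # Long-running body of the task that the "input" worker executes.
    from celery import Celery  # imported here so the sketch stands alone
    app = Celery("project", broker="amqp://guest@rabbitmq//")
    while True:
        if movement_detected():
            # Route the named task to the "output" queue, where
            # output_machine's worker handles 'project.entered_room'.
            app.send_task("project.entered_room", queue="output")
```

It works, but wiring it all up feels heavier than it should be.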

So when input is detected on input_machine, a task runs on output_machine. I can get this to work, but I run into lots of import issues and other headaches. Is there an easier way to accomplish this? Am I going about it all wrong? Am I using the wrong tools?

I have looked into a number of different frameworks, including circuits and Twisted. Twisted is very complex, and I feel like I would be hitting a nail with a jackhammer.

Ostensory answered 26/6, 2015 at 1:39 Comment(0)
I would suggest skipping Celery and using Redis directly with its pub/sub functionality. You can spin up Redis, for example, by running the official Docker image. Then, on your input machine, when something is detected, you publish a message to a channel; on your output machine, you subscribe to that channel and act on the events.
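For instance, one way to start a local Redis instance with Docker (a configuration sketch; the container name is arbitrary, and 6379 is the Redis default port):

```shell
docker run -d --name redis -p 6379:6379 redis
```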

For example your input machine could use something like this:

import redis

def publish(message):
    # Connect to the Redis server (host name "redis" here; adjust as needed)
    r = redis.Redis(host="redis")
    # Fire-and-forget: deliver the message to all current subscribers
    r.publish("test-channel", message)

And then on the output side:

import time
import redis

def main():
    r = redis.Redis(host="redis", decode_responses=True)
    p = r.pubsub(ignore_subscribe_messages=True)
    p.subscribe("test-channel")

    # Poll for new messages; get_message() returns None when nothing is queued
    while True:
        message = p.get_message()
        if message:
            print(message.get("data", ""))
            # Do more things...
        time.sleep(0.001)

if __name__ == "__main__":
    main()

In this way you can send plain text or JSON data between the input and output machines.
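To pass structured data rather than plain strings, you can serialize to JSON before publishing and parse it on the subscriber side. A minimal sketch (the redis calls are shown as comments so the serialization logic stands alone; the event fields are assumptions):

```python
import json

def encode_event(event, **payload):
    # Serialize an event name plus arbitrary payload fields to a JSON string.
    return json.dumps({"event": event, **payload})

def decode_event(data):
    # Parse a message body received from the channel back into a dict.
    return json.loads(data)

# On the input machine you would publish the encoded event, e.g.:
#   r.publish("test-channel", encode_event("entered_room", camera=1))
# and on the output machine decode message["data"] with decode_event().
msg = encode_event("entered_room", camera=1)
print(decode_event(msg)["event"])  # → entered_room
```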

Find a sample implementation here: https://github.com/moritz-biersack/simple-async-pub-sub

Isogamete answered 1/11, 2020 at 15:4 Comment(1)
Since RabbitMQ can do pub/sub (OP is using RabbitMQ), what are the advantages of Redis over RabbitMQ? – Whelp
Celery is just a task manager.

RabbitMQ is your message broker. I would implement a RabbitMQ channel between your two machines and use publish/subscribe to manage your input.
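As a sketch of that publish/subscribe setup using the `pika` client (a plain-Python RabbitMQ library; the exchange name, host, and `make_event` helper below are assumptions, not from the original answer):

```python
import json

def make_event(event, **payload):
    # Serialize an event name plus payload to a JSON bytes body.
    return json.dumps({"event": event, **payload}).encode()

def publish(body):
    # Requires a RabbitMQ server reachable at "localhost" and the
    # `pika` package installed; both are assumptions for this sketch.
    import pika
    connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
    channel = connection.channel()
    # A fanout exchange copies every message to all bound queues,
    # which gives you pub/sub semantics.
    channel.exchange_declare(exchange="events", exchange_type="fanout")
    channel.basic_publish(exchange="events", routing_key="", body=body)
    connection.close()

if __name__ == "__main__":
    publish(make_event("entered_room", source="input_machine"))
```

A subscriber would declare the same fanout exchange, bind its own queue to it, and consume from that queue.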

Maybe this link can help you.

Drop answered 10/11, 2017 at 0:16 Comment(0)
I was asking myself a similar question and found that there is a Python package, celery-pubsub, that brings pub/sub capabilities to Celery.

Here is an example usage from the package description:

    import celery
    import celery_pubsub
    
    @celery.task
    def my_task_1(*args, **kwargs):
        return "task 1 done"
    
    
    @celery.task
    def my_task_2(*args, **kwargs):
        return "task 2 done"
    
    
    # First, let's subscribe
    celery_pubsub.subscribe('some.topic', my_task_1)
    celery_pubsub.subscribe('some.topic', my_task_2)
    
    # Now, let's publish something
    res = celery_pubsub.publish('some.topic', data='something', value=42)
    
    # We can get the results if we want to (and if the tasks returned something)
    # But in pub/sub, usually, there's no result.
    print(res.get())
    
    # This will get nowhere, as no task subscribed to this topic
    res = celery_pubsub.publish('nowhere', data='something else', value=23)
Uppercut answered 17/3, 2022 at 10:8 Comment(0)
