Is non-blocking Redis pubsub possible?

I want to use Redis pub/sub to transmit some messages, but I don't want to be blocked by listen(), as in the code below:

import redis
rc = redis.Redis()

ps = rc.pubsub()
ps.subscribe(['foo', 'bar'])

rc.publish('foo', 'hello world')

for item in ps.listen():
    if item['type'] == 'message':
        print(item['channel'])
        print(item['data'])

The final for loop blocks. I just want to check whether a given channel has data. How can I accomplish this? Is there a check-like method?

Submit answered 24/10, 2011 at 5:13 Comment(3)
Is there a reason you don't want to be blocked using listen? Redis connections are pretty cheap and it's generally typical to spawn several of them.Crabbed
Asynchronous PubSub in Python using Redis, ZMQ, Tornado - github.com/abhinavsingh/async_pubsubSmokeless
use the pubsub object's .get_message() method instead of .listen() (there's an example below). [That method may not have been supported in the Python Redis driver when this question was posted].Issacissachar

I don't think that would be possible. A channel doesn't have any "current data": you subscribe to a channel and start receiving the messages that other clients publish on it, hence it is a blocking API. The pub/sub section of the Redis command documentation makes this clearer.

Illsorted answered 24/10, 2011 at 5:21 Comment(1)
I think this answer combined with the other one is pretty complete. He could put this into a thread. If he didn't want instant action taken when a channel has activity, he could store messages in a dict and have his own check method that looks into the dict, guarded by a lock (mutex).Lezlielg
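
A minimal sketch of the dict-plus-lock idea from the comment above, assuming redis-py 3.x or later and a Redis server on localhost:6379. A handler registered for each channel runs inside `run_in_thread()` and stores payloads in a dict guarded by a `threading.Lock`; the main thread calls the non-blocking `check()` whenever it likes. `ChannelBuffer` and `start_listener` are hypothetical names, not part of redis-py.

```python
import threading
from collections import defaultdict


class ChannelBuffer:
    """Hypothetical helper: collects pub/sub payloads per channel, thread-safely."""

    def __init__(self):
        self._lock = threading.Lock()
        self._messages = defaultdict(list)

    def add(self, message):
        # Called from the redis-py worker thread for every message received.
        with self._lock:
            self._messages[message["channel"]].append(message["data"])

    def check(self, channel):
        # Non-blocking: return and clear whatever has arrived on `channel` so far.
        with self._lock:
            return self._messages.pop(channel, [])


def start_listener(channels, buffer):
    """Subscribe to `channels` and feed every message into `buffer`."""
    import redis  # assumption: redis-py >= 3.0, server on localhost:6379

    # decode_responses=True makes channel names str, so check("foo") matches.
    pubsub = redis.Redis(decode_responses=True).pubsub()
    # run_in_thread() requires a handler for every subscribed channel.
    pubsub.subscribe(**{channel: buffer.add for channel in channels})
    return pubsub.run_in_thread(sleep_time=0.01)
```

Usage would be `thread = start_listener(['foo', 'bar'], buf)`, then `buf.check('foo')` whenever convenient (it returns a possibly empty list without blocking), and `thread.stop()` at shutdown.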

If you're thinking of non-blocking, asynchronous processing, you're probably using (or should be using) an asynchronous framework/server.

UPDATE: It's been 5 years since the original answer; in the meantime Python gained native async I/O support (asyncio). There is now AIORedis, an asyncio Redis client.
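
aioredis has since been merged into redis-py itself as `redis.asyncio` (redis-py 4.2 and later). A minimal sketch, assuming redis-py 5.0.1 or later (for `aclose()`) and a server on localhost: awaiting `get_message()` with a timeout hands control back to the event loop instead of blocking the thread. Subscribe confirmations are filtered by type rather than with `ignore_subscribe_messages=True`, because with that flag `get_message()` also returns `None` after reading a confirmation, which would end the loop early. `read_available` and `main` are hypothetical names.

```python
import asyncio

DATA_TYPES = ("message", "pmessage")


async def read_available(pubsub, timeout=0.0):
    """Return the data messages that arrive within `timeout` seconds per read.

    `pubsub` is assumed to be a redis.asyncio PubSub object; get_message()
    returns None once nothing more can be read within `timeout`.
    """
    messages = []
    while True:
        message = await pubsub.get_message(timeout=timeout)
        if message is None:
            return messages
        if message["type"] in DATA_TYPES:  # skip subscribe confirmations
            messages.append(message)


async def main():
    import redis.asyncio as aioredis  # assumption: redis-py >= 5.0.1

    r = aioredis.Redis(decode_responses=True)
    pubsub = r.pubsub()
    await pubsub.subscribe("foo", "bar")
    await r.publish("foo", "hello world")
    # Other coroutines keep running while this waits (at most 0.1 s per read).
    for message in await read_available(pubsub, timeout=0.1):
        print(message["channel"], message["data"])
    await pubsub.aclose()
    await r.aclose()
```

With a Redis server running, `asyncio.run(main())` should print the channel and payload of the published message.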

Lietuva answered 8/1, 2013 at 16:32 Comment(5)
This is the right answer and should be check-marked. I'm not sure why people would reinvent the wheel: there is already an async client for Redis, and spawning a new thread is not really needed when such a client exists.Sprayberry
@securecurve: to be fair, I added this answer more than a year after the accepted one. However, txRedis and brükva (from which Tornado-Redis forked) are both 3 years old, so it's not really an excuse.Lietuva
This answer has nothing to do with the question. As pointed out in the accepted answer, redis pushes messages to clients who are listening. Hence there is no way to ask for messages.Triumphal
@Glaslos: There is a way to not block while listening for new messages, which is exactly what the question asks.Lietuva
I disagree: the listen() call on a Redis pub/sub channel always blocks, even when you run it in a thread/greenlet and switch context while it blocks.Triumphal

The accepted answer is obsolete: redis-py now recommends the non-blocking get_message(), and it also provides an easy way to use threads.

https://pypi.python.org/pypi/redis

There are three different strategies for reading messages.

Behind the scenes, get_message() uses the system’s ‘select’ module to quickly poll the connection’s socket. If there’s data available to be read, get_message() will read it, format the message and return it or pass it to a message handler. If there’s no data to be read, get_message() will immediately return None. This makes it trivial to integrate into an existing event loop inside your application.

import time
import redis

p = redis.Redis().pubsub()
p.subscribe('my-channel')
while True:
    message = p.get_message()
    if message:
        print(message)  # do something with the message
    time.sleep(0.001)  # be nice to the system :)

Older versions of redis-py only read messages with pubsub.listen(). listen() is a generator that blocks until a message is available. If your application doesn’t need to do anything else but receive and act on messages received from redis, listen() is an easy way to get up and running.

for message in p.listen():
    # do something with the message
    print(message)

The third option runs an event loop in a separate thread. pubsub.run_in_thread() creates a new thread and starts the event loop. The thread object is returned to the caller of run_in_thread(). The caller can use the thread.stop() method to shut down the event loop and thread. Behind the scenes, this is simply a wrapper around get_message() that runs in a separate thread, essentially creating a tiny non-blocking event loop for you. run_in_thread() takes an optional sleep_time argument. If specified, the event loop will call time.sleep() with the value in each iteration of the loop.

Note: Since we’re running in a separate thread, there’s no way to handle messages that aren’t automatically handled with registered message handlers. Therefore, redis-py prevents you from calling run_in_thread() if you’re subscribed to patterns or channels that don’t have message handlers attached.

def my_handler(message):
    print(message['data'])

p.subscribe(**{'my-channel': my_handler})
thread = p.run_in_thread(sleep_time=0.001)
# the event loop is now running in the background processing messages
# when it's time to shut it down...
thread.stop()

So, to answer your question: just call get_message() whenever you want to know whether a message has arrived.
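
A minimal sketch of such a check, assuming redis-py 3.x or later and a local server. It reads everything already waiting on the subscription with `get_message(timeout=0)`, so it never waits for new messages, drops subscribe confirmations, and groups the payloads by channel so that "does channel X have data?" becomes a dict lookup. `pending_by_channel` and `demo` are hypothetical names.

```python
from collections import defaultdict


def pending_by_channel(pubsub):
    """Drain messages that are already waiting, without blocking.

    Returns {channel: [data, ...]}. `pubsub` is assumed to be a redis-py
    PubSub object whose get_message(timeout=0) returns None as soon as
    nothing more can be read.
    """
    pending = defaultdict(list)
    while True:
        message = pubsub.get_message(timeout=0)
        if message is None:
            return dict(pending)
        # Skip subscribe/unsubscribe confirmations; keep published data.
        if message["type"] in ("message", "pmessage"):
            pending[message["channel"]].append(message["data"])


def demo():
    import time
    import redis  # assumption: redis-py >= 3.0, server on localhost:6379

    r = redis.Redis(decode_responses=True)
    p = r.pubsub()
    p.subscribe("foo", "bar")
    r.publish("foo", "hello world")
    time.sleep(0.1)  # give the server a moment to deliver the message
    if "foo" in pending_by_channel(p):
        print("foo has data")
```

Messages for channels you don't ask about are returned too, so nothing is silently discarded.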

Lodmilla answered 24/2, 2016 at 15:50 Comment(1)
Here the message reading is inside a loop. Why won't the subscriber receive the same message multiple times? Does the publisher keep track of which subscriber has already received the message, or is it something else?Clarke

Newer versions of redis-py support non-blocking pub/sub reads via get_message(); see https://github.com/andymccurdy/redis-py for more details. Here's an example from the documentation itself:

import time
import redis

p = redis.Redis().pubsub()
p.subscribe('my-channel')
while True:
    message = p.get_message()
    if message:
        print(message)  # do something with the message
    time.sleep(0.001)  # be nice to the system :)

Papery answered 26/6, 2014 at 12:38 Comment(0)

This is a working example to thread the blocking listener.

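import sys
import cmd
import redis
import threading

# Placeholders: replace with your server's settings.
YOURHOST = 'localhost'
YOURPORT = 6379
YOURPASSWORD = None


def monitor():
    # Keyword arguments: the third positional parameter of redis.Redis is db, not password.
    r = redis.Redis(host=YOURHOST, port=YOURPORT, password=YOURPASSWORD, db=0)

    channel = sys.argv[1]
    p = r.pubsub()
    p.subscribe(channel)

    print('monitoring channel', channel)
    for m in p.listen():
        # listen() also yields the subscribe confirmation; print only data.
        if m['type'] == 'message':
            print(m['data'])


class my_cmd(cmd.Cmd):
    """Simple command processor example."""

    def do_start(self, line):
        my_thread.start()

    def do_EOF(self, line):
        return True


if __name__ == '__main__':
    if len(sys.argv) == 1:
        print("missing argument! please provide the channel name.")
    else:
        my_thread = threading.Thread(target=monitor, daemon=True)

        my_cmd().cmdloop()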
Lacerate answered 10/1, 2012 at 7:55 Comment(3)
Doesn't the GIL come into the picture now? We could perhaps use multiprocessing (docs.python.org/2/library/multiprocessing.html) instead? But that method incurs the overhead of creating a process.Guernsey
The GIL won't ruin the party because the thread isn't doing CPU-bound work; it is waiting on a network read, and blocking socket I/O releases the GIL.Mathre
You should not use listen(): if the connection drops, listen() never completes and your code hangs indefinitely #20485420Ruderal

Here is a nonblocking solution without threads:

import select

fd = ps.connection._sock.fileno()  # private attribute; may change between redis-py versions
rlist, _, _ = select.select([fd], [], [], 0)  # or replace 0 with None to block
if rlist:
    for rfd in rlist:
        if fd == rfd:
            message = ps.get_message()

ps.get_message() is enough on its own, but I use this method so that I can wait on multiple fds instead of just the redis connection.
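
A sketch of waiting on the Redis socket and other file descriptors at once (Python 3.8+ for the `:=` operator). It reuses the private `ps.connection._sock` attribute from the answer, which is an assumption that may not hold across redis-py versions; `connection` is also only set once you have subscribed. One caveat: redis-py may already have read several messages into its own buffer, where `select()` cannot see them, so the sketch drains with `get_message()` until it returns `None` instead of reading a single message. `wait_for_input` is a hypothetical name.

```python
import select


def wait_for_input(pubsub, other_fds, timeout=0.0):
    """Wait up to `timeout` seconds (None = forever) on Redis and other fds.

    Returns (messages, ready_fds): the pub/sub messages that could be read
    and the subset of `other_fds` that are readable.
    """
    # Private redis-py attribute, as in the answer above; may change.
    redis_fd = pubsub.connection._sock.fileno()
    readable, _, _ = select.select([redis_fd, *other_fds], [], [], timeout)
    messages = []
    if redis_fd in readable:
        # One wake-up can carry several messages, and redis-py may buffer
        # some of them itself, so drain until nothing is left.
        while (message := pubsub.get_message()) is not None:
            messages.append(message)
    ready_fds = [fd for fd in readable if fd != redis_fd]
    return messages, ready_fds
```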

Damnatory answered 6/12, 2014 at 2:7 Comment(0)

To get non-blocking code you need a different programming model. It's not hard: use a new thread to listen for all changes and leave the main thread free to do other things.

Also, you will need some mechanism to exchange data between the main thread and the Redis subscriber thread.
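
One common mechanism is the thread-safe `queue.Queue`. A minimal sketch, assuming redis-py 3.x or later: the subscriber thread polls with `get_message(timeout=...)` rather than `listen()`, so it wakes up regularly and can notice a stop request, and the main thread collects messages with the non-blocking `get_nowait()`. Only the worker touches the PubSub object. `subscriber_loop`, `drain` and `start` are hypothetical names.

```python
import queue
import threading


def subscriber_loop(pubsub, inbox, stop, poll_timeout=1.0):
    """Background thread: move data messages from `pubsub` into `inbox`.

    `pubsub` is assumed to be a subscribed redis-py PubSub object. Waiting at
    most `poll_timeout` seconds per read lets the loop check `stop` regularly.
    """
    while not stop.is_set():
        message = pubsub.get_message(timeout=poll_timeout)
        if message is not None and message["type"] in ("message", "pmessage"):
            inbox.put(message)


def drain(inbox):
    """Main thread: return every queued message without blocking."""
    messages = []
    while True:
        try:
            messages.append(inbox.get_nowait())
        except queue.Empty:
            return messages


def start(channels):
    import redis  # assumption: redis-py >= 3.0, server on localhost:6379

    pubsub = redis.Redis(decode_responses=True).pubsub()
    pubsub.subscribe(*channels)
    inbox, stop = queue.Queue(), threading.Event()
    worker = threading.Thread(
        target=subscriber_loop, args=(pubsub, inbox, stop), daemon=True
    )
    worker.start()
    return inbox, stop, worker
```

To shut down, call `stop.set()` and `worker.join()`; the worker exits within `poll_timeout` seconds.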

Autum answered 24/10, 2011 at 18:5 Comment(0)

The most efficient approach would be greenlet-based rather than thread-based. As a greenlet-based concurrency framework, gevent is already well established in the Python world, so a gevent integration with redis-py would be wonderful. That is exactly what's being discussed in this issue on GitHub:

https://github.com/andymccurdy/redis-py/issues/310

Mandelbaum answered 24/1, 2013 at 16:15 Comment(0)

You can use gevent and its monkey patching to build a non-blocking Redis pub/sub app.

Stream answered 28/11, 2012 at 12:44 Comment(0)

Redis pub/sub sends messages to the clients subscribed (listening) on a channel. If you are not listening, you miss the message (hence the blocking call). If you want it to be non-blocking, I recommend using a queue instead (Redis is pretty good at that too). If you have to use pub/sub, you can, as suggested, use gevent to run an asynchronous, blocking listener that pushes messages to a queue, and use a separate consumer to process messages from that queue in a non-blocking way.
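
A minimal sketch of the queue alternative, assuming redis-py 3.x or later (Python 3.8+ for `:=`). Producers `LPUSH` onto a Redis list and the consumer `RPOP`s, which returns `None` immediately when the list is empty, so it never blocks (`BRPOP` is the blocking variant). Unlike pub/sub, items pushed while no consumer is running stay in the list. `forward_to_list` shows the pub/sub-to-queue bridge described above as a handler. The key name `jobs` and all function names are hypothetical.

```python
def enqueue(r, key, item):
    """Producer side: add `item` to the head of the Redis list `key`."""
    r.lpush(key, item)


def try_dequeue(r, key):
    """Consumer side, non-blocking: oldest item in `key`, or None if empty.

    LPUSH adds at the head and RPOP removes from the tail, so items come
    out in the order they were pushed.
    """
    return r.rpop(key)


def forward_to_list(r, key):
    """Build a pub/sub handler that copies each message's data onto `key`."""
    def handler(message):
        r.lpush(key, message["data"])
    return handler


def demo():
    import redis  # assumption: redis-py >= 3.0, server on localhost:6379

    r = redis.Redis(decode_responses=True)
    enqueue(r, "jobs", "hello")
    enqueue(r, "jobs", "world")
    while (item := try_dequeue(r, "jobs")) is not None:
        print(item)
```

To bridge an existing channel, register the handler and run the listener in the background, e.g. `p.subscribe(**{'foo': forward_to_list(r, 'jobs')})` followed by `p.run_in_thread(sleep_time=0.01)`.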

Triumphal answered 24/3, 2014 at 14:0 Comment(0)

This is pretty straightforward. We check whether a message exists and keep going through the subscription until no more messages are waiting.

import redis

r = redis.Redis(decode_responses=True)
subscription = r.pubsub()
subscription.psubscribe('channel')

r.publish('channel', 'foo')
r.publish('channel', 'bar')
r.publish('channel', 'baz')

message = subscription.get_message()
while message is not None:
    # Skip the psubscribe confirmation; handle only published data.
    if message['type'] == 'pmessage':
        # Do something with message
        print(message)
    # Get next message
    message = subscription.get_message()
Mourant answered 1/11, 2021 at 11:23 Comment(0)
