"select" on multiple Python multiprocessing Queues?

What's the best way to wait (without spinning) until something is available in either one of two (multiprocessing) Queues, where both reside on the same system?

Optometrist answered 14/7, 2009 at 6:58 Comment(0)

It doesn't look like there's an official way to handle this yet. Or at least, not based on this:

You could try something like what this post is doing -- accessing the underlying pipe filehandles:

and then use select.

Mausoleum answered 14/7, 2009 at 7:27 Comment(1)
@AndreHolzner there is a working version: web.archive.org/web/20141124021104/http://haltcondition.net:80/…Leukas

Actually, you can use multiprocessing.Queue objects in select.select, e.g.

que = multiprocessing.Queue()
readable, _, _ = select.select([que._reader], [], [])

would report que as readable only when it is ready to be read from.

There is no documentation about it, though. I found it by reading the source code of the multiprocessing queues module (on Linux it's usually something like /usr/lib/python2.6/multiprocessing/queues.py).

With Queue.Queue I haven't found any smart way to do this (and I would really love to).
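As a fuller illustration, here is a sketch that multiplexes two queues with select via the undocumented `_reader` attribute (Unix only, since select on Windows accepts only sockets; the `drain_ready` helper name is just for this example):

```python
# Sketch built on the undocumented Queue._reader attribute (Unix only).
import multiprocessing
import select

def drain_ready(queues):
    # Map each queue's read-end pipe object back to its queue.
    readers = {q._reader: q for q in queues}
    # Blocks until at least one underlying pipe is readable.
    ready, _, _ = select.select(list(readers), [], [])
    return [readers[r].get() for r in ready]

if __name__ == "__main__":
    q1 = multiprocessing.Queue()
    q2 = multiprocessing.Queue()
    q1.put("hello")
    print(drain_ready([q1, q2]))
```

Because `_reader` is a Connection object with a fileno(), select can wait on it directly; the readiness of the pipe stands in for the readiness of the queue.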

Liebowitz answered 29/1, 2010 at 13:29 Comment(3)
This works great on Unix, but on Windows the select.select implementation can only deal with sockets, not file descriptors and therefore this fails.Waistcloth
What's the main difference between Queue.Queue and multiprocessing.Queue, and can multiprocessing.Queue be used for multithreading and not just multiprocessing?Vidda
@Vidda I think queue.Queue is a blocking-queue data structure; the synchronization around queue.Queue relies on a mutex. There is no file descriptor or anything similar beneath queue.Queue, so we cannot use OS system calls like select, epoll, or kqueue to wait on it.Stoichiometry

I'm not sure how well select on a multiprocessing queue works on Windows. As select on Windows listens for sockets and not file handles, I suspect there could be problems.

My answer is to make a thread listen to each queue in a blocking fashion, and to put the results all into a single queue listened to by the main thread, essentially multiplexing the individual queues into a single one.

My code for doing this is:

"""
Allow multiple queues to be waited upon.

queue,value = multiq.select(list_of_queues)
"""
import queue
import threading

class queue_reader(threading.Thread):
    """Forward every item from one input queue onto the shared queue."""
    def __init__(self, inq, sharedq):
        threading.Thread.__init__(self)
        self.daemon = True      # don't keep the process alive on exit
        self.inq = inq
        self.sharedq = sharedq
    def run(self):
        while True:
            data = self.inq.get()
            result = (self.inq, data)
            self.sharedq.put(result)

class multi_queue(queue.Queue):
    def __init__(self, list_of_queues):
        queue.Queue.__init__(self)
        for q in list_of_queues:
            qr = queue_reader(q, self)
            qr.start()

def select(list_of_queues):
    outq = queue.Queue()
    for q in list_of_queues:
        qr = queue_reader(q, outq)
        qr.start()
    return outq.get()

The following test routine shows how to use it:

import multiq
import queue

q1 = queue.Queue()
q2 = queue.Queue()

q3 = multiq.multi_queue([q1,q2])

q1.put(1)
q2.put(2)
q1.put(3)
q1.put(4)

res = 0
while res != 4:
    res = q3.get()[1]   # blocks until one of the queues has data
    print("returning result =", res)

Hope this helps.

Tony Wallace

Erkan answered 26/4, 2011 at 11:10 Comment(0)

Using threads which forward incoming items to a single Queue which you then wait on seems to be a practical choice when using multiprocessing in a platform-independent manner.

Avoiding the threads requires handling low-level pipes/FDs, which is both platform-specific and not easy to handle consistently with the higher-level API.

Alternatively you would need Queues with the ability to set callbacks, which I think is the proper higher-level interface to go for. I.e. you would write something like:

  singlequeue = Queue()
  incoming_queue1.setcallback(singlequeue.put)
  incoming_queue2.setcallback(singlequeue.put)
  ...
  singlequeue.get()

Maybe the multiprocessing package could grow this API, but it's not there yet. The concept works well with py.execnet, which uses the term "channel" instead of "queue"; see here http://tinyurl.com/nmtr4w
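For illustration, the proposed API can be emulated on top of queue.Queue in the threaded case (setcallback is hypothetical, not part of the stdlib; CallbackQueue is a name invented for this sketch):

```python
# Hypothetical sketch: queue.Queue has no setcallback(); this subclass
# emulates the proposed API by forwarding every put() to registered callbacks.
import queue

class CallbackQueue(queue.Queue):
    def __init__(self):
        super().__init__()
        self._callbacks = []

    def setcallback(self, cb):
        self._callbacks.append(cb)

    def put(self, item, block=True, timeout=None):
        super().put(item, block, timeout)
        for cb in self._callbacks:
            cb(item)

if __name__ == "__main__":
    singlequeue = queue.Queue()
    incoming_queue1 = CallbackQueue()
    incoming_queue2 = CallbackQueue()
    incoming_queue1.setcallback(singlequeue.put)
    incoming_queue2.setcallback(singlequeue.put)
    incoming_queue1.put("a")
    print(singlequeue.get())
```

Note this toy leaves the item in both the incoming queue and the single queue; a real implementation would have to decide which side owns the data.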

Latvia answered 14/7, 2009 at 8:24 Comment(3)
That would be a very nice interface! (Though clearly there's benefit to keeping the stdlib interfaces tight, as Jesse mentions in the @ars' referenced bug report.)Optometrist
true but the current Queue public API doesn't handle your use case which i think is a common one.Latvia
If it's "common" - file a bug report + patch (with tests for the love of pete) on bugs.python.org and I can evaluate it for 2.7/3.xBromate

You could use something like the Observer pattern, wherein Queue subscribers are notified of state changes.

In this case, you could have your worker thread designated as a listener on each queue, and whenever it receives a ready signal, it can work on the new item, otherwise sleep.

Asaasabi answered 14/7, 2009 at 7:5 Comment(2)
Well, the get is destructive, so you can't really do observation on the queue itself as GoF describe it. The dequeue-ing thread would have to be the "observed" -- I was hoping for less overhead than two additional threads.Optometrist
Also, if I wanted a single point of access for the calling process (like in select) I would need a thread-safe queue on top of those two threads.Optometrist

New version of the above code...

As before, a thread listens to each queue in a blocking fashion and the results are multiplexed into a single queue; this version adds an end-of-queue marker so the consumer knows when all input queues are exhausted.

My code for doing this is:

My code for doing this is:

"""
Allow multiple queues to be waited upon.

An EndOfQueueMarker marks a queue as
    "all data sent on this queue".
When this marker has been accessed on
all input threads, this marker is returned
by the multi_queue.

"""
import queue
import threading

class EndOfQueueMarker:
    def __str___(self):
        return "End of data marker"
    pass

class queue_reader(threading.Thread):
    def __init__(self,inq,sharedq):
        threading.Thread.__init__(self)
        self.inq = inq
        self.sharedq = sharedq
    def run(self):
        q_run = True
        while q_run:
            data = self.inq.get()
            result = (self.inq,data)
            self.sharedq.put(result)
            if data is EndOfQueueMarker:
                q_run = False

class multi_queue(queue.Queue):
    def __init__(self,list_of_queues):
        queue.Queue.__init__(self)
        self.qList = list_of_queues
        self.qrList = []
        for q in list_of_queues:
            qr = queue_reader(q,self)
            qr.start()
            self.qrList.append(qr)
    def get(self, block=True, timeout=None):
        res = []
        while len(res) == 0:
            if len(self.qList) == 0:
                # every input queue has delivered its marker
                res = (self, EndOfQueueMarker)
            else:
                res = queue.Queue.get(self, block, timeout)
                if res[1] is EndOfQueueMarker:
                    self.qList.remove(res[0])
                    res = []
        return res

    def join(self):
        for qr in self.qrList:
            qr.join()

def select(list_of_queues):
    outq = queue.Queue()
    for q in list_of_queues:
        qr = queue_reader(q,outq)
        qr.start()
    return outq.get()

The following code is my test routine showing how it works:

import multiq
import queue

q1 = queue.Queue()
q2 = queue.Queue()

q3 = multiq.multi_queue([q1,q2])

q1.put(1)
q2.put(2)
q1.put(3)
q1.put(4)
q1.put(multiq.EndOfQueueMarker)
q2.put(multiq.EndOfQueueMarker)
res = 0
have_data = True
while have_data:
    res = q3.get()[1]
    print("returning result =", res)
    have_data = res is not multiq.EndOfQueueMarker

Erkan answered 26/4, 2011 at 11:14 Comment(1)
speaking of engineered solutions...Terryterrye

As of Python 3.3 you can use multiprocessing.connection.wait to wait on multiple Queue._reader objects at once.
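A minimal sketch of this approach (the `drain_two` helper name is illustrative, and it still reaches into the undocumented `_reader` attribute):

```python
# Sketch: multiprocessing.connection.wait() accepts the Connection
# objects behind each queue's undocumented _reader attribute.
import multiprocessing
from multiprocessing.connection import wait

def worker(q, value):
    q.put(value)

def drain_two(q1, q2):
    # Map each read-end Connection back to its queue.
    readers = {q1._reader: q1, q2._reader: q2}
    results = []
    while len(results) < 2:
        # Blocks until at least one queue has data available.
        for conn in wait(list(readers)):
            results.append(readers[conn].get())
    return results

if __name__ == "__main__":
    q1 = multiprocessing.Queue()
    q2 = multiprocessing.Queue()
    multiprocessing.Process(target=worker, args=(q1, "from q1")).start()
    multiprocessing.Process(target=worker, args=(q2, "from q2")).start()
    print(sorted(drain_two(q1, q2)))
```

Unlike the select-based approach, multiprocessing.connection.wait also works on Windows.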

Geordie answered 11/2, 2019 at 13:16 Comment(1)
This is not part of the official API and can only be done by accessing the _reader part of the implementation, which is fragile.Wivina

The one situation where I'm usually tempted to multiplex multiple queues is when each queue corresponds to a different type of message that requires a different handler. You can't just pull from one queue because if it isn't the type of message you want, you need to put it back.

However, in this case, each handler is essentially a separate consumer, which makes it a multi-producer, multi-consumer problem. Fortunately, even in this case you still don't need to block on multiple queues. You can create a different thread/process for each handler, with each handler having its own queue. Basically, you can just break it into multiple instances of a multi-producer, single-consumer problem.

The only situation I can think of where you would have to wait on multiple queues is if you were forced to put multiple handlers in the same thread/process. In that case, I would restructure it by creating a queue for my main thread, spawning a thread for each handler, and have the handlers communicate with the main thread using the main queue. Each handler could then have a separate queue for its unique type of message.
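The restructuring described above might be sketched like this (the handler names and the shutdown sentinel are illustrative choices, shown here with threads and queue.Queue):

```python
# Sketch of the restructuring described above: each handler gets its own
# thread and queue, and reports back to the main thread over one queue.
import queue
import threading

def handler(name, inq, mainq):
    while True:
        msg = inq.get()
        if msg is None:            # shutdown sentinel
            break
        mainq.put((name, msg))     # hand the result to the main thread

if __name__ == "__main__":
    mainq = queue.Queue()
    qa, qb = queue.Queue(), queue.Queue()
    threading.Thread(target=handler, args=("a", qa, mainq)).start()
    threading.Thread(target=handler, args=("b", qb, mainq)).start()
    qa.put("task1")
    qb.put("task2")
    for _ in range(2):
        print(mainq.get())         # main thread blocks on one queue only
    qa.put(None)
    qb.put(None)
```

The main thread only ever blocks on mainq, so the multi-queue wait disappears.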

Transmontane answered 11/7, 2021 at 14:45 Comment(0)

I'm trying something like this with queue.Queue shared between threads; I imagine there is a multiprocessing analog:

event = threading.Event()

# in data producer
queue_a.put(data)
event.set()

# in data consumer
while True:
    event.clear()
    if not queue_a.empty():
        data = queue_a.get_nowait()
    elif not queue_b.empty():
        data = queue_b.get_nowait()
    else:
        # The timeout bounds the race where a producer sets the
        # event between our clear() and the empty() checks.
        event.wait(timeout=0.5)
        continue
    process(data)
Grannie answered 5/1 at 20:47 Comment(0)

Don't do it.

Put a header on the messages and send them to a common queue. This simplifies the code and will be cleaner overall.
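A sketch of the single-queue approach with message headers (the message kinds and handlers are illustrative):

```python
# Sketch: one queue, with a type header on each message so the consumer
# can dispatch to the right handler.
import queue

def dispatch(kind, payload):
    handlers = {
        "log": lambda p: "LOG: %s" % p,
        "metric": lambda p: "METRIC: %s" % p,
    }
    return handlers[kind](payload)

if __name__ == "__main__":
    q = queue.Queue()
    q.put(("log", "disk almost full"))
    q.put(("metric", 42))
    while not q.empty():
        kind, payload = q.get()
        print(dispatch(kind, payload))
```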

Amman answered 17/1, 2014 at 20:42 Comment(2)
Imagine we have different handlers for different message types in a system. If they are all reading from the same queue, then they need to put back any messages that aren't for them. By splitting the queues we don't have this concurrency issue.Bruell
In my situation, stepping back and reconsidering helped me make the code cleaner and more robust. I had two producers fighting over one consumer. Having only one queue for them was the right thing to do.Roughhouse

© 2022 - 2024 — McMap. All rights reserved.