Python iterable Queue

I need to know when a Queue is closed and won't have more items, so I can end the iteration.

I did it by putting a sentinel in the queue:

from Queue import Queue

class IterableQueue(Queue): 

    _sentinel = object()

    def __iter__(self):
        return self

    def close(self):
        self.put(self._sentinel)

    def next(self):
        item = self.get()
        if item is self._sentinel:
            raise StopIteration
        else:
            return item

Given that this is a very common use for a queue, isn't there a built-in implementation?

Jorgensen answered 2/7, 2012 at 5:26 Comment(1)
I either use the sentinel, or a flag within the thread to stop the iteration over the queue. For the latter, I usually wait with a timeout. Eulalia

A sentinel is a reasonable way for a producer to send a message that no more queue tasks are forthcoming.

FWIW, your code can be simplified quite a bit with the two-argument form of iter(), which calls self.get repeatedly until it returns the sentinel:

from Queue import Queue

class IterableQueue(Queue): 

    _sentinel = object()

    def __iter__(self):
        return iter(self.get, self._sentinel)

    def close(self):
        self.put(self._sentinel)
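
For readers on Python 3, here is a minimal sketch of the same idea, assuming the module is imported as queue (its Python 3 name) and a single consumer thread. The consume helper and the results list are illustrative names, not part of the answer above.

```python
import queue
import threading

class IterableQueue(queue.Queue):

    _sentinel = object()

    def __iter__(self):
        # iter(callable, sentinel) keeps calling self.get() and stops
        # as soon as the returned value equals the sentinel
        return iter(self.get, self._sentinel)

    def close(self):
        self.put(self._sentinel)

def consume(q, out):
    # hypothetical consumer: collect everything until the queue is closed
    for item in q:
        out.append(item)

q = IterableQueue()
results = []
t = threading.Thread(target=consume, args=(q, results))
t.start()
for i in range(3):
    q.put(i)
q.close()
t.join()
print(results)  # [0, 1, 2]
```

Note that a single sentinel stops only one consumer; with several consumers, each one needs its own sentinel or the sentinel has to be put back.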
Cordell answered 2/7, 2012 at 6:8 Comment(0)

The multiprocessing module has its own version of Queue that does include a close method. I am not sure how it behaves with threads, but it's worth a try. I don't see why it shouldn't work the same:

from multiprocessing import Queue

q = Queue()
q.put(1)
q.get_nowait()
# 1
q.close()
q.get_nowait()
# ...
# IOError: handle out of range in select()

You could just catch the IOError as the close signal.

TEST

from multiprocessing import Queue
from threading import Thread

def worker(q):
    while True:
        try:
            item = q.get(timeout=.5)
        except IOError:
            print "Queue closed. Exiting thread."
            return
        except:
            continue
        print "Got item:", item

q = Queue()
for i in xrange(3):
    q.put(i)
t = Thread(target=worker, args=(q,))
t.start()
# Got item: 0
# Got item: 1
# Got item: 2
q.close()
# Queue closed. Exiting thread.

Though to be honest, it's not much different from setting a flag on the Queue.Queue. The multiprocessing.Queue is just using a closed file descriptor as a flag:

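from Queue import Queue, Empty
from threading import Thread

def worker2(q):
    while True:
        if q.closed:
            print "Queue closed. Exiting thread."
            return
        try:
            item = q.get(timeout=.5)
        except Empty:
            # nothing arrived within the timeout; loop back and re-check the flag
            continue
        print "Got item:", item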

q = Queue()
q.closed = False
for i in xrange(3):
    q.put(i)
t = Thread(target=worker2, args=(q,))
t.start()
# Got item: 0
# Got item: 1
# Got item: 2
q.closed = True
# Queue closed. Exiting thread.
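
The flag approach can be sketched for Python 3 as well. This is only an illustration under a few assumptions: a threading.Event stands in for the ad hoc closed attribute, the worker checks the flag only after a timeout so that queued items are drained before it exits, and the names closed and got are made up for the example.

```python
import queue
import threading

def worker(q, closed, out):
    while True:
        try:
            item = q.get(timeout=0.05)
        except queue.Empty:
            # exit only when the producer is done AND nothing is left to read
            if closed.is_set() and q.empty():
                return
            continue
        out.append(item)

q = queue.Queue()
closed = threading.Event()   # plays the role of the "closed" flag
got = []
t = threading.Thread(target=worker, args=(q, closed, got))
t.start()
for i in range(3):
    q.put(i)
closed.set()                 # signal that no more items will arrive
t.join()
print(got)  # [0, 1, 2]
```

The cost of this design is the polling timeout: the worker notices the close up to one timeout interval late, whereas a sentinel wakes it immediately.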
Eulalia answered 2/7, 2012 at 5:41 Comment(0)

This is an old question, and variations of self._sentinel = object() will work. Revisiting this in 2021, I would instead suggest using concurrent.futures combined with None as the sentinel:

# Note: this is Python 3.8+ code

import queue
import time
import functools
import random
from concurrent.futures import ThreadPoolExecutor

def worker(tup):
    (q, i) = tup
    print(f"Starting thread {i}")
    partial_sum = 0
    numbers_added = 0
    while True:
        try:
            item = q.get()
            if item is None:
                # 'propagate' this 'sentinel' to anybody else
                q.put(None)
                break
            numbers_added += 1
            partial_sum += item
            # need to pretend that we're doing something asynchronous
            time.sleep(random.random()/100)

        except Exception as e:
            print(f"(warning) Thread {i} got an exception {e}, that shouldn't happen.")
            break

    print(f"Thread {i} is done, saw a total of {numbers_added} numbers to add up")
    return partial_sum

MAX_RANGE = 1024
MAX_THREADS = 12

# one pool thread per worker, so all workers block on the queue concurrently
with ThreadPoolExecutor(max_workers=MAX_THREADS) as executor:

    # create a queue with numbers to add up
    (q := queue.Queue()).queue = queue.deque(range(MAX_RANGE))

    # kick off the threads
    future_partials = executor.map(worker, [(q, i) for i in range(MAX_THREADS)])

    # they'll be done more or less instantly, but we'll make them wait
    print("Threads launched with first batch ... sleeping 2 seconds")
    time.sleep(2)

    # threads are still available for more work!
    for i in range(MAX_RANGE):
        q.put(i)

    print("Finished giving them another batch, this time we're not sleeping")

    # now we tell them all to wrap it up
    q.put(None)
    # this will nicely catch the outputs
    # (named 'total' to avoid shadowing the built-in sum)
    total = functools.reduce(lambda x, y: x+y, future_partials)
    print(f"Got total sum {total} (correct answer is {(MAX_RANGE-1)*MAX_RANGE})")

# Starting thread 0
# Starting thread 1
# Starting thread 2
# Starting thread 3
# Starting thread 4
# Starting thread 5
# Starting thread 6
# Starting thread 7
# Starting thread 8
# Starting thread 9
# Starting thread 10
# Starting thread 11
# Threads launched with first batch ... sleeping 2 seconds
# Finished giving them another batch, this time we're not sleeping
# Thread 0 is done, saw a total of 175 numbers to add up
# Thread 3 is done, saw a total of 178 numbers to add up
# Thread 11 is done, saw a total of 173 numbers to add up
# Thread 4 is done, saw a total of 177 numbers to add up
# Thread 9 is done, saw a total of 169 numbers to add up
# Thread 1 is done, saw a total of 172 numbers to add up
# Thread 7 is done, saw a total of 162 numbers to add up
# Thread 10 is done, saw a total of 161 numbers to add up
# Thread 5 is done, saw a total of 169 numbers to add up
# Thread 2 is done, saw a total of 157 numbers to add up
# Thread 6 is done, saw a total of 169 numbers to add up
# Thread 8 is done, saw a total of 186 numbers to add up
# Got total sum 1047552 (correct answer is 1047552)

                                                                                                                       

Note how the de facto 'master thread' just needs to push None into the queue, similar to a condition variable 'signal', which the threads all pick up (and propagate).
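
The propagation idea can also be folded into an iterable queue, so that several consumers can share one for loop protocol. The following is a rough sketch, not code from this answer: SharedIterableQueue, consume, and seen are hypothetical names, and it uses a private object() sentinel instead of None so that None remains a legal item.

```python
import queue
import threading

class SharedIterableQueue(queue.Queue):
    """Hypothetical iterable queue that several consumers can share."""

    _sentinel = object()

    def close(self):
        self.put(self._sentinel)

    def __iter__(self):
        while True:
            item = self.get()
            if item is self._sentinel:
                # put the sentinel back so the next consumer also stops
                self.put(self._sentinel)
                return
            yield item

def consume(q, out, lock):
    for item in q:
        with lock:
            out.append(item)

q = SharedIterableQueue()
seen = []
lock = threading.Lock()
workers = [threading.Thread(target=consume, args=(q, seen, lock)) for _ in range(4)]
for w in workers:
    w.start()
for i in range(100):
    q.put(i)
q.close()   # a single close() is enough for all four consumers
for w in workers:
    w.join()
print(sorted(seen) == list(range(100)))  # True
```

Because the queue is FIFO and the sentinel is enqueued last, every real item has already been handed to some consumer by the time the first consumer sees the sentinel. One sentinel is left in the queue after all consumers exit.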

Also, this does not use multiprocessing.Queue, which is heavier-weight than the standard (thread-safe) queue.Queue. The above code can also be easily modified to use ProcessPoolExecutor or a hybrid of both (in either case, you would need to switch to multiprocessing.Queue).

(Side note: generally speaking, if classes are needed to solve a "fundamental" issue in any given generation of Python, there are often new options in more modern versions.)

(Second side note: The only reason the code is Python 3.8+ is because I'm a fan of assignment expressions, which, in line with the above side note, resolves the historical issue of how to initialize a queue from a list without having to resort to non-functional solutions.)

Frictional answered 29/8, 2021 at 19:8 Comment(0)

I have implemented an asyncio.Queue-compatible queue that can be iterated using an async for loop. See queutils.IterableQueue.

Features

  • asyncio.Queue interface
  • AsyncIterable support: async for item in queue:
  • Automatic termination of the consumers with QueueDone exception when the queue has been emptied
  • Producers must be registered with add_producer() and they must notify the queue with finish() once they have finished adding items
  • Countable interface: the number of items marked with task_done() is available through the count property
  • Countable property can be disabled with count_items=False. This is useful when you want to sum the count of multiple IterableQueues.

Install

pip install queutils

Usage

Producers fill a queue

A producer is a "process" that adds items to the queue. A producer needs to be registered with the queue through the add_producer() coroutine. Once a producer has added all the items it intends to, it notifies the queue with finish().

from queutils import IterableQueue

async def producer(
    Q: IterableQueue[int], N: int
) -> None:

    # Add a producer to add items to the queue
    await Q.add_producer()
    
    for i in range(N):
        await Q.put(i)
    
    # notify the queue that this producer does not add more
    await Q.finish()
    
    return None

Consumers take items from the queue

A consumer is a "process" that takes items from a queue with the get() coroutine. Since IterableQueue is an AsyncIterable, it can be iterated over with async for.

from queutils.iterablequeue import IterableQueue

async def consumer(Q: IterableQueue[int]):
    """
    Consume the queue
    """
    async for i in Q:
        print(f"consumer: got {i} from the queue")        
    print(f"consumer: queue is done")
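
For comparison, the async for protocol itself can be sketched with nothing but the standard library. This is not how queutils is implemented: AsyncIterableQueue and its close() coroutine are hypothetical names, and the sketch assumes a single producer and a single consumer, using a sentinel instead of producer registration.

```python
import asyncio

class AsyncIterableQueue(asyncio.Queue):
    """Hypothetical stand-in built on asyncio.Queue, not the queutils class."""

    _sentinel = object()

    async def close(self):
        await self.put(self._sentinel)

    def __aiter__(self):
        return self

    async def __anext__(self):
        item = await self.get()
        if item is self._sentinel:
            # ends the surrounding "async for" loop
            raise StopAsyncIteration
        return item

async def producer(q, n):
    for i in range(n):
        await q.put(i)
    await q.close()

async def consumer(q):
    return [item async for item in q]

async def main():
    q = AsyncIterableQueue()
    _, items = await asyncio.gather(producer(q, 3), consumer(q))
    return items

items = asyncio.run(main())
print(items)  # [0, 1, 2]
```

With several producers or consumers a single sentinel is no longer enough, which is the case the producer registration described above is meant to handle.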
Brunner answered 25/3 at 20:15 Comment(0)
