This is an old question, and variations of self._sentinel = object()
will work. Revisiting this in 2021, I would instead suggest concurrent.futures, with None
as your sentinel:
# Note: this is Python 3.8+ code
import queue
import time
import functools
import random
from concurrent.futures import ThreadPoolExecutor
def worker(tup):
    q, i = tup
    print(f"Starting thread {i}")
    partial_sum = 0
    numbers_added = 0
    while True:
        try:
            item = q.get()
            if item is None:
                # 'propagate' this 'sentinel' to anybody else
                q.put(None)
                break
            numbers_added += 1
            partial_sum += item
            # need to pretend that we're doing something asynchronous
            time.sleep(random.random()/100)
        except Exception as e:
            print(f"(warning) Thread {i} got an exception {e}, that shouldn't happen.")
            break
    print(f"Thread {i} is done, saw a total of {numbers_added} numbers to add up")
    return partial_sum
MAX_RANGE = 1024
MAX_THREADS = 12
# size the pool explicitly so all MAX_THREADS workers really run concurrently
with ThreadPoolExecutor(max_workers=MAX_THREADS) as executor:
    # create a queue with numbers to add up
    (q := queue.Queue()).queue = queue.deque(range(MAX_RANGE))
    # kick off the threads
    future_partials = executor.map(worker, [(q, i) for i in range(MAX_THREADS)])
    # they'll be done more or less instantly, but we'll make them wait
    print("Threads launched with first batch ... sleeping 2 seconds")
    time.sleep(2)
    # threads are still available for more work!
    for i in range(MAX_RANGE):
        q.put(i)
    print("Finished giving them another batch, this time we're not sleeping")
    # now we tell them all to wrap it up
    # (must happen inside the with block, or the executor would wait forever)
    q.put(None)
# this will nicely catch the outputs
# (named 'total' so the built-in sum() is not shadowed)
total = functools.reduce(lambda x, y: x+y, future_partials)
print(f"Got total sum {total} (correct answer is {(MAX_RANGE-1)*MAX_RANGE})")
# Starting thread 0
# Starting thread 1
# Starting thread 2
# Starting thread 3
# Starting thread 4
# Starting thread 5
# Starting thread 6
# Starting thread 7
# Starting thread 8
# Starting thread 9
# Starting thread 10
# Starting thread 11
# Threads launched with first batch ... sleeping 2 seconds
# Finished giving them another batch, this time we're not sleeping
# Thread 0 is done, saw a total of 175 numbers to add up
# Thread 3 is done, saw a total of 178 numbers to add up
# Thread 11 is done, saw a total of 173 numbers to add up
# Thread 4 is done, saw a total of 177 numbers to add up
# Thread 9 is done, saw a total of 169 numbers to add up
# Thread 1 is done, saw a total of 172 numbers to add up
# Thread 7 is done, saw a total of 162 numbers to add up
# Thread 10 is done, saw a total of 161 numbers to add up
# Thread 5 is done, saw a total of 169 numbers to add up
# Thread 2 is done, saw a total of 157 numbers to add up
# Thread 6 is done, saw a total of 169 numbers to add up
# Thread 8 is done, saw a total of 186 numbers to add up
# Got total sum 1047552 (correct answer is 1047552)
Note how the de facto 'master thread' just needs to push None
into the queue, similar to a condition variable 'signal', which the threads all pick up (and propagate).
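If None can be a legitimate work item, the same propagate-the-sentinel pattern works with a unique object() sentinel, as mentioned at the top. Here is a minimal sketch of that variation; the names STOP, worker and run are hypothetical and chosen only for this illustration:

```python
import queue
from concurrent.futures import ThreadPoolExecutor

# Hypothetical module-level sentinel; it is compared by identity, so it
# can never collide with a real work item (including None).
STOP = object()

def worker(q):
    """Sum the numeric items pulled from q until STOP is seen."""
    partial_sum = 0
    while True:
        item = q.get()
        if item is STOP:
            # Put the sentinel back so the next worker also sees it.
            q.put(STOP)
            return partial_sum
        if item is not None:  # None is an ordinary (ignored) item here
            partial_sum += item

def run(items, n_workers=4):
    """Feed items to n_workers threads and return the combined sum."""
    q = queue.Queue()
    with ThreadPoolExecutor(max_workers=n_workers) as executor:
        futures = [executor.submit(worker, q) for _ in range(n_workers)]
        for item in items:
            q.put(item)
        q.put(STOP)  # one sentinel is enough, since workers propagate it
        return sum(f.result() for f in futures)
```

Calling run([1, 2, None, 3]) should add up the three numbers and skip the None. One sentinel is left in the queue at the end, which is harmless as long as the queue is not reused.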
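Also, this does not use multiprocessing.Queue, which is heavier-weight than the standard (thread-safe) queue.Queue. The above code also has the benefit of being easy to modify to use ProcessPoolExecutor, or a hybrid of both (in either case, yes, you would need a process-safe queue such as multiprocessing.Queue; when the queue is passed to the workers as a task argument, as it is here, a multiprocessing.Manager().Queue() proxy is the variant that can be pickled).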
(Side note: generally speaking, if classes are needed to solve a "fundamental" issue in any given generation of Python, there are often new options in more modern versions.)
(Second side note: The only reason the code requires Python 3.8+ is that I'm a fan of assignment expressions, which, in line with the above side note, resolve the historical issue of how to initialize a queue from a list without having to resort to non-functional solutions.)
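For readers who would rather not assign to the queue's .queue attribute (which, as far as I know, is an implementation detail rather than documented API), a small helper built on put() does the same job on any Python 3 version. A minimal sketch; queue_from is a hypothetical name, not something from the standard library:

```python
import queue

def queue_from(iterable):
    """Return a new queue.Queue pre-filled with the items of iterable."""
    q = queue.Queue()
    for item in iterable:
        # put() is the public API, so the queue's own bookkeeping
        # (e.g. the unfinished-task count used by join()) stays consistent.
        q.put(item)
    return q
```

With this, the initialization in the example would become q = queue_from(range(MAX_RANGE)), at the cost of one extra function definition.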