How to put an item back to a queue.Queue
Asked Answered
M

3

8

How do you return an item to a queue.Queue? This would be useful in threading or multiprocessing if the task fails, so that the task can not be lost.

The docs for queue.Queue.get() say that the function can "Remove and return an item from the queue," but I believe the use of the word "return" here refers to the function returning the item to the calling thread, not placing it back into the item queue. This is demonstrated by the below sample code just blocks infinitely on the main thread's second queue.Queue.get() call, instead of making it to the print() call in the thread.

import time
import threading
import queue


def threaded_func():
    thread_task = myqueue.get()
    print('thread_task: ' + thread_task)

myqueue = queue.Queue()
myqueue.put('some kind of task')
main_task = myqueue.get()
print('main_task: ' + main_task)

t = threading.Thread(target=threaded_func)
t.daemon = True
t.start()

time.sleep(5)
myqueue.get()   # This blocks indefinitely

I have to believe that there is a simple way to put the task back, so what is it? Calling task_done() and then put() with the task to put it back into the queue in two operations is not atomic and so could result in a lost item.

One possible, but clunky, solution would be to just try to execute the task again, but then you'd have to add a few extra lines to handle that complexity and I'm not even sure that all failed tasks could necessarily recover in that way.

Metabolize answered 15/4, 2017 at 4:2 Comment(2)
You're explaining the problem, but not really explaining the overall goal. So what are you even trying to accomplish here?Nailbiting
It's to not lose a work item that failed, to be able to process it later. My specific use is in calling REST API URLs, when some problem not innate to the work item URL may cause it to fail.Metabolize
F
4

Not all failed tasks can recover. You shouldn't retry them unless there is some reason to think they will pass at a later date. For instance, if your work item is a URL and connection failed count, you could implement some sort of a max-retries thing.

Your biggest problem is that you haven't implemented a viable worker model yet. You need 2 queues to have a bidirectional conversation with a worker. One to post work items and one to receive status. Once you have that, the receiver can always decide to cram that message back on the work queue. Here is an example with a lazy worker that just passes what its told.

import threading
import queue

def worker(in_q, out_q):
    while True:
        try:
            task, data = in_q.get()
            print('worker', task, data)
            if task == "done":
                return
            elif task == "pass this":
                out_q.put(("pass", data))
            else:
                out_q.put(("fail", data))
        except Exception as e:
            print('worker exception', e)
            out_q.put("exception", data)

in_que = queue.Queue()
out_que = queue.Queue()

work_thread = threading.Thread(target=worker, args=(in_que, out_que))
work_thread.start()

# lets make every other task a fail
in_que.put(('pass this', 0))
in_que.put(('fail this', 1))
in_que.put(('pass this', 2))
in_que.put(('fail this', 3))
in_que.put(('pass this', 4))
in_que.put(('fail this', 5))

pending_tasks = 6

while pending_tasks:
    status, data = out_que.get()
    if status == "pass":
        pending_tasks -= 1
    else:
        # make failing tast pass
        in_que.put(('pass this', data))

in_que.put(("done", None))
work_thread.join()
print('done')
Farah answered 15/4, 2017 at 5:11 Comment(3)
My demo code is just to illustrate the inability to put an item back on the queue, it's not my actual worker code.Metabolize
whoops, I didn't know keyboard enter would create the post instead of newline. I also wanted to say that I can see your code is using a second queue and counting down the items from the first queue and requeuing them if they failed. I see this extra queue and handling logic as an unnecessary complexity when one could just return an item back into the main queue, which is what my original question is about. So are you saying that one cannot return an item to the queue?Metabolize
Well, sure. All I did was put the message back on the main queue. The other queue was there to handle returns from the thread. Your code can't possibly work... that get on the main thread is supposed to do what, exactly? So I wrote code that did. "Can't work" to "works" is perhaps the minimum complexity.Farah
C
0

You can use a PriorityQueue where entries are returned in priority order.

from tornado.queues import PriorityQueue

q = PriorityQueue()
q.put((2, 'item 1'))
q.put((2, 'item 2'))

q.get()  # item 1
q.put((1, 'item 1'))  # put item 1 back on the queue

q.get()  # item 1
q.get()  # item 2

Note that the return from get() is also a tuple of priority number and item.

Cherlycherlyn answered 16/10, 2020 at 7:24 Comment(0)
D
0

bear with me, if i unnecessarily go into more detail...

myqueue.get() blocks because there are no more open tasks which are not assigned to a thread.

i don't see why putting back failed tasks should have to be 'atomic':

mark as done with myqueue.task_done() and put it back with a new myqueue.put(main_task)

because of the sleep(5) the threadef_func should now be fast enough to get the thread_task. you might be missing a myqueue.task_done() in there as well.

the block on your last line will unfortunately still be there since the one task in the queue is now assigned to the threaded_func and there are no more tasks available.

you can readd that one task inside the threadef_func or add more tasks in general.

have a look at https://docs.python.org/3/library/queue.html in my opinion you are missing loops (in the threads working the queue) and perhaps a myqueue.join() at the end to make sure, the queue is done before the program ends. perhaps setting block and timeout in your get() will work out in some cases, i use it to end threads if the queue is empty for some seconds. while others block and run forever to take care of incoming tasks.

Dulse answered 27/8, 2024 at 21:24 Comment(0)

© 2022 - 2025 — McMap. All rights reserved.