asyncio: How to re-queue an object when an exception occurs

Hi, I have to process several objects, queued 5 at a time. I have a queue of 5 items. Sometimes processing fails and an exception occurs:

async def worker(nam):
    while True:
        queue_item = await queue.get()

The worker starts its processing loop and tries to process an item:

        try:
            loop = asyncio.get_event_loop()
            task = loop.create_task(download(queue_item, path))
            download_result = await asyncio.wait_for(task, timeout=timeout)
        except asyncio.TimeoutError:

Unfortunately, the process timed out. Can I put the item back on the queue like this?

        except asyncio.TimeoutError:
            await queue.put(queue_item)

I want to process that item again on the next round. Thanks.

Hanshaw asked 6/1, 2021 at 21:43

Yes, you can put the object back at the end of the queue so that it is processed again later. A simple example based on your code:

import asyncio
from random import randrange


async def download(item):
    print("Process item", item)

    if randrange(4) == 1:  # simulate occasional event
        await asyncio.sleep(100)  # trigger timeout error


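async def worker(queue):
    while True:
        queue_item = await queue.get()
        try:
            result = await asyncio.wait_for(download(queue_item), timeout=1)
        except asyncio.TimeoutError:
            print("Timeout for ", queue_item)
            # Put the item back before task_done() so q.join() keeps waiting for it
            await queue.put(queue_item)
        # One task_done() per get(); the re-queued copy is counted by its own put()
        queue.task_done()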


async def main():
    q = asyncio.Queue()
    asyncio.create_task(worker(q))
    for i in range(5):  # put 5 items to process
        await q.put(i)
    await q.join()


asyncio.run(main())

Sample output (the timeouts are random, so it varies between runs):
Process item 0
Timeout for  0
Process item 1
Process item 2
Process item 3
Timeout for  3
Process item 4
Process item 0
Process item 3
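
One caveat with the example above: an item that times out every time would be re-queued forever, and q.join() would never return. Below is a minimal sketch of one way to cap the retries, not the only way to do it. It assumes each queue entry is an (item, attempt) tuple and runs 5 workers to match the "5 at a time" in the question. MAX_ATTEMPTS, the results dict, the failed list and the stand-in download() (which hangs on purpose for items 2 and 4) are all hypothetical names made up for illustration.

```python
import asyncio

MAX_ATTEMPTS = 3  # hypothetical cap on tries per item


async def download(item, attempt):
    # Stand-in for the real download (assumption for this sketch):
    # item 2 hangs on its first attempt only, item 4 hangs on every attempt.
    if item == 4 or (item == 2 and attempt == 1):
        await asyncio.sleep(10)
    return f"result-{item}"


async def worker(queue, results, failed):
    while True:
        item, attempt = await queue.get()
        try:
            results[item] = await asyncio.wait_for(
                download(item, attempt), timeout=0.05
            )
        except asyncio.TimeoutError:
            if attempt < MAX_ATTEMPTS:
                # Re-queue before task_done() so join() cannot return early.
                await queue.put((item, attempt + 1))
            else:
                failed.append(item)  # give up on this item
        finally:
            queue.task_done()  # exactly one task_done() per get()


async def main():
    queue = asyncio.Queue()
    results, failed = {}, []
    for i in range(5):
        queue.put_nowait((i, 1))  # every item starts at attempt 1
    workers = [
        asyncio.create_task(worker(queue, results, failed)) for _ in range(5)
    ]
    await queue.join()  # returns once every entry was handled or given up on
    for w in workers:
        w.cancel()  # workers loop forever, so stop them explicitly
    await asyncio.gather(*workers, return_exceptions=True)
    return results, failed


if __name__ == "__main__":
    results, failed = asyncio.run(main())
    print("done:", sorted(results), "gave up on:", failed)
```

With this stand-in download(), items 0 to 3 end up in results (item 2 on its second attempt) and item 4 lands in failed after three timeouts. Carrying the attempt count inside the queue entry keeps the workers stateless; a dict keyed by item would work as well if the entries have to stay plain items.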
Yogini answered 7/1, 2021 at 10:27
Comment from Hanshaw: Wonderful, thanks Alex, have a nice day.
