Identify current thread in concurrent.futures.ThreadPoolExecutor
The following code uses 5 workers, each of which runs its own worker_task():

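import concurrent.futures

with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # map each submitted future back to the site it was created for
    future_to_url = {executor.submit(worker_task, command_, site_): site_ for site_ in URLS}

    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            print(f"{url} raised {exc!r}")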

However, inside worker_task() I cannot identify which of the 5 workers is currently running the task (a worker ID).

For example, I would like to print('worker 3 has finished') inside worker_task(), but I cannot, because executor.submit() does not seem to allow it.

Any solutions?

Antiparticle answered 4/8, 2021 at 8:32 Comment(3)
#65481312 --> this should be helpful. - Nesto
Thank you, but this does not assign a worker to the entries in my URLS list. I would need to create a done/pending variable for every URL, which cannot even fit into the executor.submit() statement. The link shows how to track thread IDs, but doing so is useless without linking a task URL to a worker. - Antiparticle
Any feedback please? - Jasmine

You can get the name of the worker thread with the threading.current_thread() function. Here is an example:

from concurrent.futures import ThreadPoolExecutor, Future
from threading import current_thread
from time import sleep
from random import randint

# imagine these are urls
URLS = list(range(100))


def do_some_work(url, a, b):
    """Simulates some work"""
    sleep(2)
    rand_num = randint(a, b)
    if rand_num == 5:
        raise ValueError("No! 5 found!")
    # .name replaces getName(), which is deprecated since Python 3.10
    r = f"{current_thread().name}||: {url}_{rand_num}\n"
    return r


def show_fut_results(fut: Future):
    """Callback for future shows results or shows error"""
    if not fut.exception():
        print(fut.result())
    else:
        print(f"{current_thread().name}|  Error: {fut.exception()}\n")


if __name__ == '__main__':
    with ThreadPoolExecutor(max_workers=10) as pool:
        for i in URLS:
            _fut = pool.submit(do_some_work, i, 1, 10)
            _fut.add_done_callback(show_fut_results)
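
The executor also accepts a thread_name_prefix argument, which makes the worker names predictable and lets each result be tied back to its URL. The following is only a minimal sketch: worker_task, run and the placeholder URLS are hypothetical names chosen for illustration, and the exact suffix the executor appends to the prefix (such as Worker_0 in CPython) is an implementation detail rather than a guarantee.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
from threading import current_thread
from time import sleep

# Placeholder inputs standing in for real URLs (assumption for illustration).
URLS = [f"URL-{i}" for i in range(20)]


def worker_task(url):
    """Hypothetical task: reports which pool thread handled the URL."""
    sleep(0.05)  # simulate some I/O
    # The thread name starts with the prefix given to the executor.
    return current_thread().name, url


def run(urls, max_workers=5):
    """Returns a dict mapping each URL to the name of the thread that ran it."""
    results = {}
    with ThreadPoolExecutor(max_workers=max_workers,
                            thread_name_prefix="Worker") as executor:
        future_to_url = {executor.submit(worker_task, u): u for u in urls}
        for future in as_completed(future_to_url):
            name, url = future.result()
            results[url] = name
    return results


if __name__ == "__main__":
    for url, name in sorted(run(URLS).items()):
        print(f"{name} finished {url}")
```

Because the name is read inside worker_task() itself, the task can also print or log it directly instead of returning it.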

If you want more control over the threads, use the threading module:

from threading import Thread
from queue import Queue
from time import sleep
from random import randint
import logging

# imagine these are urls
URLS = [f"URL-{i}" for i in range(100)]

# number of worker threads
WORKER_NUM = 10


def do_some_work(url: str, a: int, b: int) -> str:
    """Simulates some work"""
    sleep(2)
    rand_num = randint(a, b)
    if rand_num == 5:
        raise ValueError(f"No! 5 found in URL: {url}")
    r = f"{url} = {rand_num}"
    return r


def thread_worker_func(q: Queue, a: int, b: int) -> None:
    """Target function for Worker threads"""
    logging.info("Started working")
    while True:
        try:
            url = q.get()

            # if poison pill - stop worker thread
            if url is None:
                break

            r = do_some_work(url, a, b)
            logging.info(f"Result: {r}")
        except ValueError as ex:
            logging.error(ex)
        except Exception as ex:
            logging.error(f"Unexpected error: {ex}")

    logging.info("Finished working")


if __name__ == '__main__':
    logging.basicConfig(
        level=logging.INFO,
        format="%(levelname)s | %(threadName)s | %(asctime)s | %(message)s",
    )
    in_q = Queue(50)
    workers = [
        Thread(target=thread_worker_func, args=(in_q, 1, 10, ), name=f"MyWorkerThread-{i+1}")
        for i in range(WORKER_NUM)
    ]
    for w in workers:
        w.start()

    # start distributing tasks
    for _url in URLS:
        in_q.put(_url)

    # send poison pills to worker-threads
    for w in workers:
        in_q.put(None)

    # wait for the worker threads to finish
    logging.info("Main Thread waiting for Worker Threads")
    for w in workers:
        w.join()

    logging.info("Workers joined")
    sleep(10)
    logging.info("App finished")
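
If staying with ThreadPoolExecutor is preferable, a numeric worker ID like the one asked about can probably be approximated with the executor's initializer argument (Python 3.7+) and threading.local. This is a sketch under assumptions, not a definitive approach: _init_worker, worker_task and run are hypothetical names, and the number reflects the order in which the pool happened to start its threads, not a fixed slot.

```python
import itertools
import threading
from concurrent.futures import ThreadPoolExecutor

_local = threading.local()  # each pool thread sees its own attributes here


def _init_worker(counter, lock):
    """Runs once in every pool thread as it starts; takes the next free number."""
    with lock:
        _local.worker_id = next(counter)


def worker_task(url):
    """Hypothetical task that can report its own worker number."""
    return f"worker {_local.worker_id} has finished {url}"


def run(urls, max_workers=5):
    """Runs worker_task over urls and returns the messages in input order."""
    with ThreadPoolExecutor(
        max_workers=max_workers,
        initializer=_init_worker,
        # a fresh counter and lock per pool, so numbering restarts at 1
        initargs=(itertools.count(1), threading.Lock()),
    ) as executor:
        return list(executor.map(worker_task, urls))


if __name__ == "__main__":
    for line in run([f"URL-{i}" for i in range(10)]):
        print(line)
```

The pool creates threads lazily, so with short tasks it may start fewer than max_workers threads and some numbers may never appear.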
Jasmine answered 4/8, 2021 at 16:36 Comment(6)
I want to send the name of the thread (e.g. thread1) into do_some_work(). I am trying to learn what the name is and how to obtain it. - Antiparticle
However, I realize now that I could create a list of idle threads numbered 1 to 10, then randomly select an idle thread and assign it within the submit() call. - Antiparticle
Your use of a random number is perhaps the key to the answer. I would prefer to see an idle thread number being selected, such as thread1 [busy], thread2 [busy], thread3 [selected], instead of if rand_num == 5:, so that the thread assignment can be used to print [thread1 loaded texture] inside do_some_work(). - Antiparticle
@Antiparticle if you want full control over thread names, you have to use threading.Thread instead of concurrent.futures.ThreadPoolExecutor. If you take a look at the ThreadPoolExecutor source code, you will quickly see that it is a wrapper around threading.Thread: github.com/python/cpython/blob/3.9/Lib/concurrent/futures/… If you really need more control, e.g. over thread names, choose the threading module. I can provide an example if you wish. - Jasmine
Thanks Artiom. If you do provide the threading.Thread example, I will mark the answer as correct. Otherwise I do have examples available, as I have worked with threading.Thread. - Antiparticle
@Antiparticle I added an example to the answer, based on threading.Thread and the producer-consumer pattern with the poison pill technique. - Jasmine
