Python ThreadPoolExecutor terminate all threads

4

10

I am running a piece of Python code in which multiple threads are run through a ThreadPoolExecutor. Each thread is supposed to perform a task (fetching a web page, for example). What I want is to terminate all threads if even one of them fails. For instance:

from concurrent import futures
from concurrent.futures import ThreadPoolExecutor

def start(*args, **kwargs):
    # fetch the page and set `success` accordingly
    if success:
        return True
    else:
        # Signal all threads to stop
        pass

with ThreadPoolExecutor(self._num_threads) as executor:
    jobs = []
    for path in paths:
        kw = {"path": path}
        jobs.append(executor.submit(start, **kw))
    for job in futures.as_completed(jobs):
        result = job.result()
        print(result)

Is it possible to do so? The results returned by threads are useless to me unless all of them are successful, so if even one of them fails, I would like to save some execution time of the rest of the threads and terminate them immediately. The actual code obviously is doing relatively lengthy tasks with a couple of failure points.

Disenthral answered 19/6, 2020 at 15:53 Comment(7)
Does this answer your question? asyncio: Is it possible to cancel a future been run by an Executor? - Interior
Answers my question, but doesn't solve my problem. Thanks though. - Disenthral
Don't conflate "threads" with "tasks." Threads are agents in the operating system that do things (i.e., they run code). Tasks are things that need to be done. A thread pool executor creates and manages its own threads (you should _not_ mess with them) to perform the tasks that you submit(...) to it. - Hyperbaric
I am not that familiar with ThreadPoolExecutor, but maybe I can give you a hint based on the way I usually end threads in Python 2.7. - Dahabeah
If possible, use a global variable or a class attribute that all worker threads can see, and have each worker set it when its job succeeds or fails, e.g. PASS = True/False. Then add a check, either in a polling loop or as a condition inside the loop that does the work, that reads this variable. Whenever a thread reads PASS = False, it should return, which ends that worker thread. - Dahabeah
I was thinking of a similar solution, but what I really want to do is kill the threads as soon as one of them fails. The problem with this solution is that a thread would start the next step of the lengthy job if no thread had failed at the moment it checked the variable. I guess that's the best that can be achieved with threads. I might switch to processes to see if they can be terminated easily. Thanks for the suggestion though, really appreciate it. - Disenthral
I seem a little late to the party here, but there seems to be a method in concurrent.futures for this: set_running_or_notify_cancel. - Nashoma
6

If you are done with threads and want to look into processes, this piece of code looks promising and simple. The syntax is almost the same as for threads, but it uses the multiprocessing module.

When the timeout expires, the process is terminated, which is very convenient.

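import multiprocessing

def get_page(*args, **kwargs):
    # your web page downloading code goes here
    pass  # placeholder so the function body is valid

def start_get_page(timeout, *args, **kwargs):
    p = multiprocessing.Process(target=get_page, args=args, kwargs=kwargs)
    p.start()
    # wait at most `timeout` seconds for the download to finish
    p.join(timeout)
    if p.is_alive():
        # stop the downloading process
        p.terminate()
        p.join()  # reap the terminated process
        # and then do any post-error processing here

if __name__ == "__main__":
    # example values only; substitute your own timeout (seconds) and arguments
    start_get_page(5, path="some/path")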
Dahabeah answered 21/6, 2020 at 7:18 Comment(2)
Thank you for this, this might actually do the job. - Disenthral
Thanks, this helped a lot! - Artistry
0

In my code I used multiprocessing:

import multiprocessing as mp
pool = mp.Pool()
for i in range(threadNumber):
    pool.apply_async(publishMessage, args=(map_metrics, connection_parameters...,))

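pool.close()      # stop accepting new tasks
pool.terminate()  # stop the worker processes immediately, without completing outstanding work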
Polypropylene answered 15/11, 2021 at 15:48 Comment(0)
-1

You can try StoppableThread from func-timeout, but terminating threads is strongly discouraged. If you need to kill a thread, you probably have a design problem. Consider the alternatives: asyncio coroutines and multiprocessing, both of which support cancellation or termination.
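
To illustrate the asyncio alternative, here is a minimal sketch rather than a drop-in solution. The coroutine `fetch`, the helper `fetch_all`, and the path `"bad"` that triggers a failure are all hypothetical names invented for this example. It waits until the first task raises, then cancels everything still pending.

```python
import asyncio


async def fetch(path):
    """Hypothetical coroutine standing in for a page download."""
    if path == "bad":
        await asyncio.sleep(0.01)
        raise RuntimeError(f"failed to fetch {path}")
    await asyncio.sleep(0.2)  # stand-in for a slow download
    return path


async def fetch_all(paths):
    tasks = [asyncio.create_task(fetch(p)) for p in paths]
    # Returns as soon as one task raises, or when all tasks finish
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION)
    for task in pending:
        task.cancel()
    # Give the cancelled tasks a chance to unwind
    await asyncio.gather(*pending, return_exceptions=True)
    if any(t.exception() is not None for t in done):
        return None  # at least one download failed
    return [t.result() for t in tasks]


if __name__ == "__main__":
    print(asyncio.run(fetch_all(["a", "bad", "b"])))
```

Cancellation takes effect at the next `await` inside each task, so this only helps when the work itself is written as coroutines; blocking calls would still run to completion.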

Nolasco answered 21/6, 2020 at 11:37 Comment(5)
Could you explain why killing a thread is bad design? Is it simply because threads are not designed to be killed? - Apish
Not only. Explained here. - Nolasco
I guess I have a "design problem" in my tests... Thank you. - Quarterback
Damned geezers, I am using threads to test my app by spawning a couple of device emulators that run in an infinite loop and never finish. I need to kill the thread, and I don't care about all the "good and bad" reasons. Stop telling others what to do when you don't know the use case. Of course this is not encouraged in a server app that should gracefully recover from a failure; who on earth would want that? - Sacroiliac
People could just answer that you need to make the task aware of an Event and observe it. When the Event is set, gracefully end the task, end of topic. docs.python.org/3/library/threading.html#event-objects - Sacroiliac
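
The Event-based approach from the last comment could look roughly like the sketch below. It assumes the work can be split into steps; `fetch`, `run_all`, the `steps` parameter, and the failing path `"bad"` are hypothetical and not taken from the question. Each task checks a shared `threading.Event` between steps and returns early once any task has failed.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def fetch(path, stop_event, steps=5):
    """Hypothetical worker: a multi-step job that checks the flag between steps."""
    for _ in range(steps):
        if stop_event.is_set():
            return None  # another task failed, so give up early
        time.sleep(0.01)  # stand-in for one step of real work
    if path == "bad":
        stop_event.set()  # tell every other task to stop
        return False
    return True


def run_all(paths, num_threads=4):
    stop_event = threading.Event()
    with ThreadPoolExecutor(num_threads) as executor:
        jobs = [executor.submit(fetch, p, stop_event) for p in paths]
        results = [job.result() for job in as_completed(jobs)]
    # Successful only if every task ran to completion and succeeded
    return all(r is True for r in results)


if __name__ == "__main__":
    print(run_all(["a", "b", "c"]))
    print(run_all(["a", "bad", "c"]))
```

A task only notices the flag between steps, so a step that is already in progress still runs to its end. That is the limitation raised in the comments on the question.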
-1

I have created an answer for a similar question I had, which I think will work for this question.

from concurrent.futures import ThreadPoolExecutor, as_completed
from time import sleep

NUM_REQUESTS = 100


def long_request(id):
    sleep(1)

    # Simulate bad response
    if id == 10:
        return {"data": {"valid": False}}
    else:
        return {"data": {"valid": True}}


def check_results(results):
    # Valid only if every response is valid (an empty list counts as valid)
    return all(result["data"]["valid"] for result in results)


def main():
    futures = []
    responses = []
    num_requests = 0

    with ThreadPoolExecutor(max_workers=10) as executor:
        for request_index in range(NUM_REQUESTS):
            future = executor.submit(long_request, request_index)

            # Future list
            futures.append(future)

        for future in as_completed(futures):
            result = future.result()

            # Cancel all pending requests as soon as one response is invalid
            if not check_results([result]):
                # cancel_futures requires Python 3.9+; tasks already running still finish
                executor.shutdown(wait=False, cancel_futures=True)
                break

            # Count and keep valid responses
            num_requests += 1
            responses.append(result)

    return num_requests


if __name__ == "__main__":
    requests = main()
    print("Num Requests: ", requests)
Drennen answered 10/8, 2021 at 14:14 Comment(1)
executor.shutdown(wait=False) can at most keep futures that have not started from running (and only when cancel_futures=True is passed, on Python 3.9+). A future that is already running keeps running. If a future contains an infinite loop, it will never end. - Venesection
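
The difference between a pending and a running future can be seen in a small sketch. The function `demo_cancel` and its inner `blocker` are hypothetical names for this illustration. With a single worker, the second task stays queued behind the first, so `Future.cancel()` succeeds for it but not for the one that is already executing.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


def demo_cancel():
    started = threading.Event()
    release = threading.Event()

    def blocker():
        started.set()
        release.wait(timeout=5)  # block until the main thread lets us go
        return "finished"

    with ThreadPoolExecutor(max_workers=1) as executor:
        running = executor.submit(blocker)
        started.wait(timeout=5)  # make sure the first task is executing
        pending = executor.submit(blocker)  # queued behind it (single worker)

        cancelled_running = running.cancel()  # a running future cannot be cancelled
        cancelled_pending = pending.cancel()  # a queued future can
        release.set()
        result = running.result()
    return cancelled_running, cancelled_pending, result


if __name__ == "__main__":
    print(demo_cancel())  # (False, True, 'finished')
```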
