The right way to limit maximum number of threads running at once?

I'd like to create a program that runs multiple light threads, but limits itself to a constant, predefined number of concurrently running tasks, like this (but with no risk of a race condition):

import threading

def f(arg):
    global running
    running += 1
    print("Spawned a thread. running=%s, arg=%s" % (running, arg))
    for i in range(100000):
        pass
    running -= 1
    print("Done")

running = 0
while True:
    if running < 8:
        arg = get_task()
        threading.Thread(target=f, args=[arg]).start()

What's the safest/fastest way to implement this?

Mohun answered 14/10, 2013 at 21:37 Comment(9)
It sounds like you are describing a thread pool.Regeneracy
Does thread pool imply storing the references to threads? I'd like to keep it as light as possible.Mohun
docs.python.org/2/library/…Lenes
Just edited the question - replaced "CPU intensive" to "light" as it fits my problem more precisely.Mohun
It doesn't really matter if a reference to the thread is stored or not, does it? Eight object references are not going to make or break your program.Regeneracy
I had the thought that replacing them rapidly could cause a big performance penalty.Mohun
BTW, I updated the code to show that I need to keep pulling the thread arguments.Mohun
Replacing an object reference, compared to the overhead of starting a new thread, is extremely fast. Have you done any benchmarks, or are you simply trying to guess what is "light" and what is not?Regeneracy
potential duplicate of #1787897Interfere
R
68

It sounds like you want to implement the producer/consumer pattern with eight workers. Python has a Queue class for this purpose, and it is thread-safe.

Each worker should call get() on the queue to retrieve a task. This call will block if no tasks are available, causing the worker to go idle until one becomes available. Then the worker should execute the task and finally call task_done() on the queue.

You would put tasks in the queue by calling put() on the queue.

From the main thread, you can call join() on the queue to wait until all pending tasks have been completed.

This approach has the benefit that you are not creating and destroying threads, which is expensive. The worker threads will run continuously, but will be asleep when no tasks are in the queue, using zero CPU time.

(The linked documentation page has an example of this very pattern.)
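
A minimal sketch of the pattern described above, not the example from the linked documentation. The names process, run_all and STOP are hypothetical, and the sketch assumes one stop sentinel per worker is an acceptable way to shut the pool down:

```python
import queue
import threading

NUM_WORKERS = 8          # the limit from the question
STOP = object()          # hypothetical sentinel meaning "no more tasks"

def process(task):
    """Stand-in for the real work; assumed to be a light task."""
    return task * task

def worker(tasks, results):
    while True:
        task = tasks.get()            # blocks while the queue is empty
        try:
            if task is STOP:
                return                # the finally clause still runs
            results.put(process(task))
        finally:
            tasks.task_done()

def run_all(task_source):
    tasks = queue.Queue(maxsize=2 * NUM_WORKERS)  # put() blocks when full
    results = queue.Queue()
    threads = [threading.Thread(target=worker, args=(tasks, results))
               for _ in range(NUM_WORKERS)]
    for t in threads:
        t.start()
    for task in task_source:
        tasks.put(task)
    tasks.join()                      # wait until every task is done
    for _ in threads:
        tasks.put(STOP)               # one sentinel per worker
    for t in threads:
        t.join()
    return [results.get() for _ in range(results.qsize())]

print(sorted(run_all(range(10))))
```

Only NUM_WORKERS threads are ever created, and the bounded task queue keeps the producer from running far ahead of the workers.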

Regeneracy answered 14/10, 2013 at 21:50 Comment(10)
Works great, but I'm wondering how to signal the threads that I finished sending the tasks?Mohun
Send an 'I am finished' task, instructing the pool threads to terminate. Any thread that gets such a task requeues it and then commits suicide.Airspace
A Queue definitely helps you use threads wisely, but it does not limit the number of threads being created (and started simultaneously). A limit defined in a pool will just wait for the queue to finish that number of tasks and then move on to the remaining ones. If the data is large, the queue still holds too much, consuming memory and slowing down system performance.Depict
@san: A queue doesn't directly limit the threads, but it allows you to limit the threads easily by using a pool, and the linked example shows exactly how to do that. And storing more data in the queue doesn't slow down system performance at all, or at least no more than storing it in a list would; it's just some locks around a deque, which takes no more storage than a list.Baudelaire
@Baudelaire You only have to send the suicide task once if you do what Martin said: "Any thread that gets such a task requeues it and then commits suicide."Regeneracy
@MartinJames The suggestion about sending an 'I am finished' task, such that any thread that gets it requeues it and then commits suicide, does not work very well for me. I have a follow-up question about it here: https://mcmap.net/q/102881/-how-to-make-worker-threads-quit-after-work-is-finished-in-a-multithreaded-producer-consumer-pattern/1175080 .Happening
hi @cdhowie, I am a little late to the party but how will this approach explicitly limit the max number of threads to only 8?Stelmach
This approach combined with Paul's answer is the best answer.Salpingitis
If anyone looking for the Python3 version, you can find it here. docs.python.org/3/library/queue.htmlEfrem
This leads to UnboundLocalError:, while manipulating data in the queue. Is there any way you can make sure such errors don't happen??Wholewheat
F
34

A semaphore is a variable or abstract data type that is used to control access to a common resource by multiple processes in a concurrent system such as a multiprogramming operating system; this can help you here.

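import threading

maximumNumberOfThreads = 8  # example value; choose the limit you need
threadLimiter = threading.BoundedSemaphore(maximumNumberOfThreads)

class MyThread(threading.Thread):

    def run(self):
        # Blocks here while maximumNumberOfThreads threads are already running.
        threadLimiter.acquire()
        try:
            self.Executemycode()
        finally:
            threadLimiter.release()

    def Executemycode(self):
        print(" Hello World!")
        # <your code here>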

This way you can easily limit the number of threads that execute concurrently. The variable maximumNumberOfThreads sets the upper limit. Note that every MyThread is still created and started; the semaphore only limits how many of them are inside Executemycode() at the same time.
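
To check that the limit holds, here is a small sketch that starts more threads than the limit and records the peak number running the guarded section. The counters (running, peak) and the value 3 are illustrative assumptions, not part of the answer above:

```python
import threading
import time

MAX_RUNNING = 3                      # illustrative limit
limiter = threading.BoundedSemaphore(MAX_RUNNING)
state_lock = threading.Lock()        # protects the two counters below
running = 0
peak = 0

def task():
    global running, peak
    with limiter:                    # same as acquire() ... finally release()
        with state_lock:
            running += 1
            peak = max(peak, running)
        time.sleep(0.05)             # stand-in for the real work
        with state_lock:
            running -= 1

threads = [threading.Thread(target=task) for _ in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print("threads created:", len(threads))
print("peak inside the guarded section:", peak)   # never exceeds MAX_RUNNING
```

All ten threads exist at once, but at most MAX_RUNNING of them are past the semaphore at any moment.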

credits

Footlight answered 7/5, 2014 at 17:34 Comment(2)
Thanks! This is exactly what I wanted!Louden
Does not work with Python 3.7.6 on macOS. Does not limit the number of threads to 1.Coelom
H
22

I ran into this same problem and spent days (2 days to be precise) getting to the correct solution using a queue. I wasted a day going down the ThreadPoolExecutor path because there is no way to limit the number of threads that thing launches! I fed it a list of 5000 files to copy and the code went non-responsive once it got up to about 1500 concurrent file copies running all at once. The max_workers parameter on the ThreadPoolExecutor only controls how many workers are spinning up threads not how many threads get spun up.

Ok, anyway, here is a very simple example of using a Queue for this:

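import threading, time, random
from queue import Queue, Empty

jobs = Queue()

def do_stuff(q):
    while True:
        try:
            # get_nowait() avoids the race between q.empty() and q.get():
            # another worker could take the last item in between.
            value = q.get_nowait()
        except Empty:
            break
        time.sleep(random.randint(1, 10))
        print(value)
        q.task_done()

for i in range(10):
    jobs.put(i)

# Only three worker threads are ever created; they share the ten jobs.
for i in range(3):
    worker = threading.Thread(target=do_stuff, args=(jobs,))
    worker.start()

print("waiting for queue to complete", jobs.qsize(), "tasks")
jobs.join()
print("all done")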
Hyperborean answered 12/2, 2019 at 20:38 Comment(8)
I don't think you need the time.sleep(random.randint(1, 10)) since the q.get() should block until there's something in the queueObservable
Thanks Vasilis, that time.sleep is only there so that when you run the example you can see the queue tasks executing out of order to illustrate that three separate threads are executing tasks each of which has an unknown time to complete. This is the exact sample I built to make sure I had the logic right before applying it to a queue of thousands of file copy tasks.Hyperborean
You Can limit the number of threads it launches at once as follows: ThreadPoolExecutor(max_workers=10) or 20 or 30 etcRedhanded
Divij, The max_workers parameter on the ThreadPoolExecutor only controls how many workers are spinning up threads not how many threads get spun up. If you set it to 1 then you get single threaded performance. If you set it to 2 and you have a queue of several thousand long running tasks those two workers start spinning up threads and don't stop until they have spun up a thread for every item. If those tasks are competing for the same resource like memory, storage, or network, you will have a big problem on your hands.Hyperborean
question: where is the number of concurrent threads being limited? when you do the second for loop, it spins up three threads that continue working until all of jobs is consumed?Softpedal
@PaulJacobs, look at the ThreadPoolExecutor source code, no more than max_workers threads are createdCaseose
The issue you are hitting is based on the queue you are building when using ThreadPoolExecutor. @PaulJacobs is correct in the explanation on max number of threads, but the code which executes and adds jobs to the queue will happen very quickly and until the list is exhausted, and put a great deal of pressure on the memory for the process - the large queue filled with the large number of jobs, which are grabbed and executed by available "workers". One solution which uses ThreadPoolExecutor would be to check the length of the jobs queued, and if it is over an acceptable number, await completion.Michelemichelina
Ahh, as per the implementation here, I don't think it can spin more threads than max_workers, correct?Parotic
B
11

It would be much easier to implement this as a thread pool or executor, using either multiprocessing.dummy.Pool, or concurrent.futures.ThreadPoolExecutor (or, if using Python 2.x, the backport futures). For example:

import concurrent.futures

def f(arg):
    print("Started a task. arg=%s" % (arg,))
    for i in range(100000):
        pass
    print("Done")

# get_task() is the task source from the question.
with concurrent.futures.ThreadPoolExecutor(8) as executor:
    while True:
        arg = get_task()
        executor.submit(f, arg)

Of course if you can change the pull-model get_task to a push-model get_tasks that, e.g., yields tasks one at a time, this is even simpler:

with concurrent.futures.ThreadPoolExecutor(8) as executor:
    for arg in get_tasks():
        executor.submit(f, arg)

When you run out of tasks (e.g., get_task raises an exception, or get_tasks runs dry), this will automatically tell the executor to stop after it drains the queue, wait for it to stop, and clean up everything.
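
A runnable sketch of the push-model version, assuming a hypothetical get_tasks() generator and a work function that can fail. It also shows one way to see exceptions raised in a worker, by calling result() on each future:

```python
import concurrent.futures

def get_tasks():
    """Hypothetical task source: yields arguments one at a time."""
    yield from range(-2, 3)

def f(arg):
    return 10 / arg          # raises ZeroDivisionError for arg == 0

results, errors = {}, {}
with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor:
    # Map each future back to the argument it was submitted with.
    futures = {executor.submit(f, arg): arg for arg in get_tasks()}
    for future in concurrent.futures.as_completed(futures):
        arg = futures[future]
        try:
            results[arg] = future.result()   # re-raises the worker's exception
        except ZeroDivisionError as exc:
            errors[arg] = exc

print(sorted(results.items()))
print(errors)
```

Without the result() call, an exception raised inside f stays stored in the future and is never reported.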

Baudelaire answered 14/10, 2013 at 22:25 Comment(5)
@san: If you need to understand how to build a thread pool properly, you can look at the multiprocessing and concurrent.futures modules in the source code; they're not that complicated. Or any number of tutorials. But why would you build your own pool implementation when you already have a perfectly good one in the stdlib?Baudelaire
Utmost thanks for the link. I haven't spent much time with multiprocessing and concurrent.futures, which is why the question came up. I had a look at it, so threading is the base of both these modules...Depict
@san: No, multiprocessing simulates threads with child processes, then builds extra features (like pools, explicit shared data, etc.) on top of it, and also (in multiprocessing.dummy) provides those same extra features for threads. (Not ideal stdlib design, but historical reasons…) futures runs on top of either threading or multiprocessing (depending on which executor you use), providing the same interface either way.Baudelaire
is the ThreadPoolExecutor not working with instance method like self.xxxx ?Machree
This interface is the nicest, but is there a nice way to detect exceptions on the worker function? #33448829Grumble
S
8

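The simplest way to limit the maximum number of threads is to poll threading.active_count() (spelled activeCount() in older code; that alias is deprecated since Python 3.10):

import threading, time

maxthreads = 10

def do_stuff(i):
    print(i)
    print("Total Active threads are {0}".format(threading.active_count()))
    time.sleep(20)

count = 0
while True:
    # active_count() includes the main thread, so this allows maxthreads workers.
    if threading.active_count() <= maxthreads:
        worker = threading.Thread(target=do_stuff, args=(count,))
        worker.start()
        count += 1
    else:
        time.sleep(0.01)  # avoid a busy loop while all slots are in use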
Suppose answered 18/6, 2021 at 1:10 Comment(0)
W
7

I've seen that most commonly written like:

threads = [threading.Thread(target=f) for _ in range(8)]
for thread in threads:
    thread.start()
...
for thread in threads:
    thread.join()

If you want to maintain a fixed-size pool of running threads that process short-lived tasks and then ask for new work, consider a solution built around Queues, like "How to wait until only the first thread is finished in Python".

Warnke answered 14/10, 2013 at 21:41 Comment(1)
I updated the question to show that these are rather light short-living tasks and I need to keep grabbing them.Mohun
G
5

concurrent.futures.ThreadPoolExecutor.map

concurrent.futures.ThreadPoolExecutor was mentioned at https://mcmap.net/q/101689/-the-right-way-to-limit-maximum-number-of-threads-running-at-once and here is an example of the map method which is often the most convenient method.

.map() is a parallel version of map(): it reads all the input immediately, then runs tasks in parallel, and returns in the same order as the input.

Usage:

./concurrent_map_exception.py [nproc [min [max]]]

concurrent_map_exception.py

import concurrent.futures
import sys
import time

def my_func(i):
    time.sleep((abs(i) % 4) / 10.0)
    return 10.0 / i

def my_get_work(min_, max_):
    for i in range(min_, max_):
        print('my_get_work: {}'.format(i))
        yield i

# CLI.
argv_len = len(sys.argv)
if argv_len > 1:
    nthreads = int(sys.argv[1])
    if nthreads == 0:
        nthreads = None
else:
    nthreads = None
if argv_len > 2:
    min_ = int(sys.argv[2])
else:
    min_ = 1
if argv_len > 3:
    max_ = int(sys.argv[3])
else:
    max_ = 100

# Action.
with concurrent.futures.ProcessPoolExecutor(max_workers=nthreads) as executor:
    for input, output in zip(
        my_get_work(min_, max_),
        executor.map(my_func, my_get_work(min_, max_))
    ):
        print('result: {} {}'.format(input, output))

GitHub upstream.

So for example:

./concurrent_map_exception.py 1 1 5

gives:

my_get_work: 1
my_get_work: 2
my_get_work: 3
my_get_work: 4
my_get_work: 1
result: 1 10.0
my_get_work: 2
result: 2 5.0
my_get_work: 3
result: 3 3.3333333333333335
my_get_work: 4
result: 4 2.5

and:

./concurrent_map_exception.py 2 1 5

gives the same output but runs faster because we now have 2 processes, and:

./concurrent_map_exception.py 1 -5 5

gives:

my_get_work: -5
my_get_work: -4
my_get_work: -3
my_get_work: -2
my_get_work: -1
my_get_work: 0
my_get_work: 1
my_get_work: 2
my_get_work: 3
my_get_work: 4
my_get_work: -5
result: -5 -2.0
my_get_work: -4
result: -4 -2.5
my_get_work: -3
result: -3 -3.3333333333333335
my_get_work: -2
result: -2 -5.0
my_get_work: -1
result: -1 -10.0
my_get_work: 0
concurrent.futures.process._RemoteTraceback:
"""
Traceback (most recent call last):
  File "/usr/lib/python3.6/concurrent/futures/process.py", line 175, in _process_worker
    r = call_item.fn(*call_item.args, **call_item.kwargs)
  File "/usr/lib/python3.6/concurrent/futures/process.py", line 153, in _process_chunk
    return [fn(*args) for args in chunk]
  File "/usr/lib/python3.6/concurrent/futures/process.py", line 153, in <listcomp>
    return [fn(*args) for args in chunk]
  File "./concurrent_map_exception.py", line 24, in my_func
    return 10.0 / i
ZeroDivisionError: float division by zero
"""

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "./concurrent_map_exception.py", line 52, in <module>
    executor.map(my_func, my_get_work(min_, max_))
  File "/usr/lib/python3.6/concurrent/futures/process.py", line 366, in _chain_from_iterable_of_lists
    for element in iterable:
  File "/usr/lib/python3.6/concurrent/futures/_base.py", line 586, in result_iterator
    yield fs.pop().result()
  File "/usr/lib/python3.6/concurrent/futures/_base.py", line 432, in result
    return self.__get_result()
  File "/usr/lib/python3.6/concurrent/futures/_base.py", line 384, in __get_result
    raise self._exception
ZeroDivisionError: float division by zero

So notice how it stops immediately on an exception.

Queue example with error handling

Queue was mentioned at https://mcmap.net/q/101689/-the-right-way-to-limit-maximum-number-of-threads-running-at-once but here is a full example.

Design goals:

  • the input function does not need to be modified
  • limits the number of threads
  • queue sizes follow number of threads closely
  • fetch input only as needed, not everything upfront
  • if an error happens, optionally stop soon afterwards
  • if an exception is raised in the worker function, show the stack trace clearly

concurrent.futures.ThreadPoolExecutor is the nicest interface currently available in the stdlib that I've seen. However I could not find how to do all of the following:

  • make it perfectly feed input little by little
  • fail immediately on error
  • accept functions with multiple arguments

because:

  • .map(): reads all inputs at once and func can only take one argument
  • .submit(): .shutdown() executes until all futures finish, and there is no blocking .submit() on max current work items. So how to avoid an ugly .cancel() loop over all futures after first failure?
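
For comparison, one commonly used workaround for the missing blocking .submit() is to wrap the executor with a semaphore that is released from a done callback. This is a sketch under that assumption, not the implementation that follows; BoundedExecutor and max_pending are hypothetical names:

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

class BoundedExecutor:
    """Hypothetical wrapper: submit() blocks while max_pending tasks are in flight."""

    def __init__(self, max_workers, max_pending):
        self._executor = ThreadPoolExecutor(max_workers=max_workers)
        self._slots = threading.BoundedSemaphore(max_pending)

    def submit(self, fn, *args, **kwargs):
        self._slots.acquire()                 # wait for a free slot
        try:
            future = self._executor.submit(fn, *args, **kwargs)
        except BaseException:
            self._slots.release()             # submission failed: free the slot
            raise
        # Free the slot when the task finishes, whether it succeeded or raised.
        future.add_done_callback(lambda _: self._slots.release())
        return future

    def shutdown(self, wait=True):
        self._executor.shutdown(wait=wait)

def work(i):
    time.sleep(0.01)                          # stand-in for an IO operation
    return i * 2

pool = BoundedExecutor(max_workers=2, max_pending=4)
futures = [pool.submit(work, i) for i in range(20)]   # input is consumed gradually
pool.shutdown()
results = [f.result() for f in futures]
print(results[:5])
```

This keeps the number of queued work items close to the number of threads, but it does not by itself stop soon after the first error.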

Without further ado, here is my implementation. Test cases follow at the end of the script under __name__ == '__main__':

thread_pool.py

#!/usr/bin/env python3

'''
This file is MIT Licensed because I'm posting it on Stack Overflow:
https://mcmap.net/q/101689/-the-right-way-to-limit-maximum-number-of-threads-running-at-once/55263676#55263676
'''

from typing import Any, Callable, Dict, Iterable, Union
import os
import queue
import sys
import threading
import time
import traceback

class ThreadPoolExitException(Exception):
    '''
    An object of this class may be raised by output_handler_function to
    request early termination.

    It is also raised by submit() if submit_raise_exit=True.
    '''
    pass

class ThreadPool:
    '''
    Start a pool of a limited number of threads to do some work.

    This is similar to the stdlib concurrent, but I could not find
    how to reach all my design goals with that implementation:

    * the input function does not need to be modified
    * limit the number of threads
    * queue sizes closely follow number of threads
    * if an exception happens, optionally stop soon afterwards

    This class form allows you to use your own while loops with submit().

    Exit soon after the first failure happens:

    ....
    python3 thread_pool.py 2 -10 20 handle_output_print
    ....

    Sample output:

    ....
    {'i': -9} -1.1111111111111112 None
    {'i': -8} -1.25 None
    {'i': -10} -1.0 None
    {'i': -6} -1.6666666666666667 None
    {'i': -7} -1.4285714285714286 None
    {'i': -4} -2.5 None
    {'i': -5} -2.0 None
    {'i': -2} -5.0 None
    {'i': -3} -3.3333333333333335 None
    {'i': 0} None ZeroDivisionError('float division by zero')
    {'i': -1} -10.0 None
    {'i': 1} 10.0 None
    {'i': 2} 5.0 None
    work_function or handle_output raised:
    Traceback (most recent call last):
    File "thread_pool.py", line 181, in _func_runner
        work_function_return = self.work_function(**work_function_input)
    File "thread_pool.py", line 281, in work_function_maybe_raise
        return 10.0 / i
    ZeroDivisionError: float division by zero
    work_function_input: {'i': 0}
    work_function_return: None
    ....

    Don't exit after first failure, run until end:

    ....
    python3 thread_pool.py 2 -10 20 handle_output_print_no_exit
    ....

    Store results in a queue for later inspection instead of printing immediately,
    then print everything at the end:

    ....
    python3 thread_pool.py 2 -10 20 handle_output_queue
    ....

    Exit soon after the handle_output raise.

    ....
    python3 thread_pool.py 2 -10 20 handle_output_raise
    ....

    Relying on this interface to abort execution is discouraged, this should
    usually only happen due to a programming error in the handler.

    Test that the argument called "thread_id" is passed to work_function and printed:

    ....
    python3 thread_pool.py 2 -10 20 handle_output_print thread_id
    ....

    Test with ThreadPoolExitException and submit_raise_exit=True. The behaviour is the
    same as handle_output_print except for the different exit cause report:

    ....
    python3 thread_pool.py 2 -10 20 handle_output_raise_exit_exception
    ....
    '''
    def __init__(
        self,
        work_function: Callable,
        handle_output: Union[Callable[[Any,Any,Exception],Any],None] = None,
        nthreads: Union[int,None] = None,
        thread_id_arg: Union[str,None] = None,
        submit_raise_exit: bool = False
    ):
        '''
        Start in a thread pool immediately.

        join() must be called afterwards at some point.

        :param work_function: main work function to be evaluated.
        :param handle_output: called on work_function return values as they
            are returned.

            The function signature is:

            ....
            handle_output(
                work_function_input: Union[Dict,None],
                work_function_return,
                work_function_exception: Exception
            ) -> Union[Exception,None]
            ....

            where work_function_exception is the exception that work_function raised,
            or None otherwise

            The first non-None return value of a call to this function is returned by
            submit(), get_handle_output_result() and join().

            The intended semantic for this, is to return:

            *   on success:
            ** None to continue execution
            ** ThreadPoolExitException() to request stop execution
            * if work_function_input or work_function_exception raise:
            ** the exception raised

            The ThreadPool user can then optionally terminate execution early on error
            or request with either:

            * an explicit submit() return value check + break if a submit loop is used
            * `with` + submit_raise_exit=True

            Default: a handler that just returns `exception`, which can normally be used
            by the submit loop to detect an error and exit immediately.
        :param nthreads: number of threads to use. Default: nproc.
        :param thread_id_arg: if not None, set the argument of work_function with this name
            to a 0-indexed thread ID. This allows function calls to coordinate
            usage of external resources such as files or ports.
        :param submit_raise_exit: if True, submit() raises ThreadPoolExitException() if
            get_handle_output_result() is not None.
        '''
        self.work_function = work_function
        if handle_output is None:
            handle_output = lambda input, output, exception: exception
        self.handle_output = handle_output
        if nthreads is None:
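            if hasattr(os, 'sched_getaffinity'):
                nthreads = len(os.sched_getaffinity(0))
            else:
                # os.sched_getaffinity() is not available on every platform.
                nthreads = os.cpu_count() or 1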
        self.thread_id_arg = thread_id_arg
        self.submit_raise_exit = submit_raise_exit
        self.nthreads = nthreads
        self.handle_output_result = None
        self.handle_output_result_lock = threading.Lock()
        self.in_queue = queue.Queue(maxsize=nthreads)
        self.threads = []
        for i in range(self.nthreads):
            thread = threading.Thread(
                target=self._func_runner,
                args=(i,)
            )
            self.threads.append(thread)
            thread.start()

    def __enter__(self):
        '''
        __exit__ automatically calls join() for you.

        This is cool because it automatically ends the loop if an exception occurs.

        But don't forget that errors may happen after the last submit was called, so you
        likely want to check for that with get_handle_output_result() after the with.
        '''
        return self

    def __exit__(self, exception_type, exception_value, exception_traceback):
        self.join()
        return exception_type is ThreadPoolExitException

    def _func_runner(self, thread_id):
        while True:
            work_function_input = self.in_queue.get(block=True)
            if work_function_input is None:
                break
            if self.thread_id_arg is not None:
                work_function_input[self.thread_id_arg] = thread_id
            try:
                work_function_exception = None
                work_function_return = self.work_function(**work_function_input)
            except Exception as e:
                work_function_exception = e
                work_function_return = None
            handle_output_exception = None
            try:
                handle_output_return = self.handle_output(
                    work_function_input,
                    work_function_return,
                    work_function_exception
                )
            except Exception as e:
                handle_output_exception = e
            handle_output_result = None
            if handle_output_exception is not None:
                handle_output_result = handle_output_exception
            elif handle_output_return is not None:
                handle_output_result = handle_output_return
            if handle_output_result is not None:
                with self.handle_output_result_lock:
                    # Check inside the lock so that only the first result is kept.
                    if self.handle_output_result is None:
                        self.handle_output_result = (
                            work_function_input,
                            work_function_return,
                            handle_output_result
                        )
            self.in_queue.task_done()

    @staticmethod
    def exception_traceback_string(exception):
        '''
        Helper to get the traceback from an exception object.
        This is usually what you want to print if an error happens in a thread:
        https://mcmap.net/q/45247/-catch-and-print-full-python-exception-traceback-without-halting-exiting-the-program/56199295#56199295
        '''
        return ''.join(traceback.format_exception(
            None, exception, exception.__traceback__)
        )

    def get_handle_output_result(self):
        '''
        :return: if a handle_output call has raised previously, return a tuple:

            ....
            (work_function_input, work_function_return, exception_raised)
            ....

            corresponding to the first such raise.

            Otherwise, if a handle_output returned non-None, a tuple:

            (work_function_input, work_function_return, handle_output_return)

            Otherwise, None.
        '''
        return self.handle_output_result

    def join(self):
        '''
        Request all threads to stop after they finish currently submitted work.

        :return: same as get_handle_output_result()
        '''
        for thread in range(self.nthreads):
            self.in_queue.put(None)
        for thread in self.threads:
            thread.join()
        return self.get_handle_output_result()

    def submit(
        self,
        work_function_input: Union[Dict,None] =None
    ):
        '''
        Submit work. Block if there is already enough work scheduled (~nthreads).

        :return: the same as get_handle_output_result
        '''
        handle_output_result = self.get_handle_output_result()
        if handle_output_result is not None and self.submit_raise_exit:
            raise ThreadPoolExitException()
        if work_function_input is None:
            work_function_input = {}
        self.in_queue.put(work_function_input)
        return handle_output_result

if __name__ == '__main__':
    def get_work(min_, max_):
        '''
        Generate simple range work for work_function.
        '''
        for i in range(min_, max_):
            yield {'i': i}

    def work_function_maybe_raise(i):
        '''
        The main function that will be evaluated.

        It sleeps to simulate an IO operation.
        '''
        time.sleep((abs(i) % 4) / 10.0)
        return 10.0 / i

    def work_function_get_thread(i, thread_id):
        time.sleep((abs(i) % 4) / 10.0)
        return thread_id

    def handle_output_print(input, output, exception):
        '''
        Print outputs and exit immediately on failure.
        '''
        print('{!r} {!r} {!r}'.format(input, output, exception))
        return exception

    def handle_output_print_no_exit(input, output, exception):
        '''
        Print outputs, don't exit on failure.
        '''
        print('{!r} {!r} {!r}'.format(input, output, exception))

    out_queue = queue.Queue()
    def handle_output_queue(input, output, exception):
        '''
        Store outputs in a queue for later usage.
        '''
        global out_queue
        out_queue.put((input, output, exception))
        return exception

    def handle_output_raise(input, output, exception):
        '''
        Raise if input == 0, to test that execution
        stops nicely if this raises.
        '''
        print('{!r} {!r} {!r}'.format(input, output, exception))
        if input['i'] == 0:
            raise Exception

    def handle_output_raise_exit_exception(input, output, exception):
        '''
        Return a ThreadPoolExitException() if input == -5.
        Return the work_function exception if it raised.
        '''
        print('{!r} {!r} {!r}'.format(input, output, exception))
        if exception:
            return exception
        if output == 10.0 / -5:
            return ThreadPoolExitException()

    # CLI arguments.
    argv_len = len(sys.argv)
    if argv_len > 1:
        nthreads = int(sys.argv[1])
        if nthreads == 0:
            nthreads = None
    else:
        nthreads = None
    if argv_len > 2:
        min_ = int(sys.argv[2])
    else:
        min_ = 1
    if argv_len > 3:
        max_ = int(sys.argv[3])
    else:
        max_ = 100
    if argv_len > 4:
        handle_output_funtion_string = sys.argv[4]
    else:
        handle_output_funtion_string = 'handle_output_print'
    handle_output = eval(handle_output_funtion_string)
    if argv_len > 5:
        work_function = work_function_get_thread
        thread_id_arg = sys.argv[5]
    else:
        work_function = work_function_maybe_raise
        thread_id_arg = None

    # Action.
    if handle_output is handle_output_raise_exit_exception:
        # `with` version with implicit join and submit raise
        # immediately when desired with ThreadPoolExitException.
        #
        # This is the more safe and convenient and DRY usage if
        # you can use `with`, so prefer it generally.
        with ThreadPool(
            work_function,
            handle_output,
            nthreads,
            thread_id_arg,
            submit_raise_exit=True
        ) as my_thread_pool:
            for work in get_work(min_, max_):
                my_thread_pool.submit(work)
        handle_output_result = my_thread_pool.get_handle_output_result()
    else:
        # Explicit error checking in submit loop to exit immediately
        # on error.
        my_thread_pool = ThreadPool(
            work_function,
            handle_output,
            nthreads,
            thread_id_arg,
        )
        for work_function_input in get_work(min_, max_):
            handle_output_result = my_thread_pool.submit(work_function_input)
            if handle_output_result is not None:
                break
        handle_output_result = my_thread_pool.join()
    if handle_output_result is not None:
        work_function_input, work_function_return, exception = handle_output_result
        if type(exception) is ThreadPoolExitException:
            print('Early exit requested by handle_output with ThreadPoolExitException:')
        else:
            print('work_function or handle_output raised:')
            print(ThreadPool.exception_traceback_string(exception), end='')
        print('work_function_input: {!r}'.format(work_function_input))
        print('work_function_return: {!r}'.format(work_function_return))
    if handle_output == handle_output_queue:
        while not out_queue.empty():
            print(out_queue.get())

GitHub upstream.

Tested in Python 3.7.3.

Grumble answered 20/3, 2019 at 14:50 Comment(0)
B
3

It can be achieved easily using ThreadPoolExecutor. Change the limit by using the max_workers argument.

from concurrent.futures import ThreadPoolExecutor
import time

pool = ThreadPoolExecutor(max_workers=10)


def thread(num):
    print(num)
    time.sleep(3)


for n in range(0, 1000):
    pool.submit(thread, n)

pool.shutdown(wait=True)
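
If the results or exceptions of the submitted tasks matter, the same pool can be used as a context manager together with as_completed. This is only a minimal sketch: work and run_all are hypothetical names, and the short sleep stands in for real work.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
import time


def work(num):
    # Hypothetical task: sleep briefly, then return a value derived from the input.
    time.sleep(0.01)
    return num * 2


def run_all(inputs, max_workers=10):
    results = {}
    # Leaving the `with` block calls shutdown(wait=True), even if an exception is raised.
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # Map each future back to the input it was created for.
        futures = {pool.submit(work, n): n for n in inputs}
        for future in as_completed(futures):
            # result() re-raises any exception that was raised inside work().
            results[futures[future]] = future.result()
    return results


if __name__ == "__main__":
    print(run_all(range(5)))
```

At most max_workers tasks run at any moment; the remaining submissions wait in the executor's internal queue.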
Bunsen answered 27/11, 2021 at 20:32 Comment(0)
B
2

This could be done with a Semaphore Object. A semaphore manages an internal counter which is decremented by each acquire() call and incremented by each release() call. The counter can never go below zero; when acquire() finds that it is zero, it blocks, waiting until some other thread calls release().
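
To see the counter behaviour in isolation, here is a small single-threaded sketch. It uses acquire(blocking=False), which returns False instead of blocking when the counter is zero; the variable names are only for illustration.

```python
import threading

sema = threading.Semaphore(value=2)

first = sema.acquire(blocking=False)   # counter 2 -> 1
second = sema.acquire(blocking=False)  # counter 1 -> 0
third = sema.acquire(blocking=False)   # counter is 0: returns False instead of blocking

sema.release()                         # counter 0 -> 1
fourth = sema.acquire(blocking=False)  # succeeds again, counter 1 -> 0

print(first, second, third, fourth)  # True True False True
```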

A short example with a maximum of 5 concurrent threads: of the 10 threads started, the first five run immediately, while the other five block in acquire() until a slot is released:

import threading
import time

maxthreads = 5
pool_sema = threading.Semaphore(value=maxthreads)
threads = list()

def task(i):
    pool_sema.acquire()
    try:
        print("executed {}. thread".format(i))
        time.sleep(2)
    except Exception as e:
        print("Error: problem with {0}. thread.\nMessage:{1}".format(i, e))
    finally:
        pool_sema.release()

def create_threads(number_of_threads):
    try:
        for i in range(number_of_threads):
            thread = threading.Thread(target=task, args=(i,))  # args must be a tuple; (str(i)) is just a string
            threads.append(thread)
            thread.start()
    except Exception as e:
        print("Error: unable to start thread {}".format(e))

if __name__ == '__main__':
    create_threads(10)

output

executed 0. thread
executed 1. thread
executed 2. thread
executed 3. thread
executed 4. thread
executed 5. thread
executed 6. thread
executed 7. thread
executed 8. thread
executed 9. thread

For those who prefer to use list comprehension based on an input list:

import threading
import time

maxthreads = 5
pool_sema = threading.Semaphore(value=maxthreads)

def task(i):
    pool_sema.acquire()
    try:
        print("executed {}. thread".format(i))
        time.sleep(2)
    except Exception as e:
        print("Error: problem with {0}. thread.\nMessage:{1}".format(i, e))
    finally:
        pool_sema.release()

def create_threads(number_of_threads):
    try:
        threads = [threading.Thread(target=task, args=(i,)) for i in range(number_of_threads)]
        [t.start() for t in threads]
    except Exception as e:
        print("Error: unable to start thread {}".format(e))
    finally:
        [t.join() for t in threads]

if __name__ == '__main__':
    create_threads(10)
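
Both versions above create every thread up front and only limit how many are inside the guarded section. A possible variation, sketched here under the assumption that the spawning loop is allowed to block, acquires the semaphore before creating each thread, so only about MAX_THREADS workers exist at a time. The names slots, running, peak and run are hypothetical, and the counters are there only to make the limit observable.

```python
import threading
import time

MAX_THREADS = 5
slots = threading.BoundedSemaphore(MAX_THREADS)
lock = threading.Lock()
running = 0  # number of tasks currently executing
peak = 0     # highest value `running` has reached


def task(i):
    global running, peak
    with lock:
        running += 1
        peak = max(peak, running)
    try:
        time.sleep(0.01)  # placeholder for real work
    finally:
        with lock:
            running -= 1
        slots.release()  # free the slot so the spawner can start another thread


def run(number_of_tasks):
    threads = []
    for i in range(number_of_tasks):
        slots.acquire()  # blocks the spawning loop while MAX_THREADS tasks are running
        t = threading.Thread(target=task, args=(i,))
        threads.append(t)
        t.start()
    for t in threads:
        t.join()
    return peak


if __name__ == "__main__":
    print("peak concurrent tasks:", run(20))
```

The slot is released in the worker's finally block, so a task that raises still frees its slot.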
Boatman answered 28/2, 2021 at 19:30 Comment(2)
Creating all the threads and then running them one by one is not a good approach for big data.Hubbell
Ahh, I was under the impression that thread.start makes the thread start doing its computation wrt the targeted function.Parotic
M
1

To limit the number of threads created, follow this example:

import threading
import time


def some_process(thread_num):
    count = 0
    while count < 5:
        time.sleep(0.5)
        count += 1
        print("%s: %s" % (thread_num, time.ctime(time.time())))
        print('number of alive threads:{}'.format(threading.active_count()))


def create_thread():
    try:
        for i in range(1, 555):  # spawns 554 threads (i = 1..554).
            thread = threading.Thread(target=some_process, args=(i,))
            thread.start()

            # At the cap (main thread included), wait for the newest thread to finish.
            if threading.active_count() >= 100:  # set maximum threads.
                thread.join()

            print(threading.active_count())  # number of alive threads.

    except Exception as e:
        print("Error: unable to start thread {}".format(e))


if __name__ == '__main__':
    create_thread()

Or:

Alternatively, poll the thread count in a loop and create the next thread only once the count has dropped below the limit, as in the example below:

import threading
import time


def some_process(thread_num):
    count = 0
    while count < 5:
        time.sleep(0.5)
        count += 1
        # print("%s: %s" % (thread_num, time.ctime(time.time())))
        print('number of alive threads:{}'.format(threading.active_count()))


def create_thread2(number_of_desire_thread):
    try:
        for i in range(1, 555):
            # start() returns None, so its result is not stored.
            threading.Thread(target=some_process, args=(i,)).start()

            # Poll until the thread count drops below the limit. This is a
            # wait loop rather than a real mutex; the short sleep keeps it
            # from pinning a CPU core.
            while number_of_desire_thread <= threading.active_count():
                time.sleep(0.01)

            print('unlock')
            print(threading.active_count())  # number of alive threads.

    except Exception as e:
        print("Error: unable to start thread {}".format(e))


if __name__ == '__main__':
    create_thread2(100)
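
Polling threading.active_count() works, but the spawning loop has to keep waking up to check. One alternative is to keep an explicit counter guarded by a threading.Condition, so the spawner sleeps until a worker signals that it has finished. The ThreadLimiter class below is a hypothetical helper written for illustration, not part of the standard library.

```python
import threading
import time


class ThreadLimiter:
    """Hypothetical helper: counts running workers and blocks at the limit."""

    def __init__(self, limit):
        self.limit = limit
        self.active = 0  # workers currently running
        self.peak = 0    # highest value `active` has reached
        self.cond = threading.Condition()

    def start(self, target, *args):
        with self.cond:
            # wait_for() sleeps until notified instead of spinning in a loop.
            self.cond.wait_for(lambda: self.active < self.limit)
            self.active += 1
            self.peak = max(self.peak, self.active)
        thread = threading.Thread(target=self._run, args=(target, args))
        thread.start()
        return thread

    def _run(self, target, args):
        try:
            target(*args)
        finally:
            with self.cond:
                self.active -= 1
                self.cond.notify()  # wake the spawner if it is waiting for a slot


def some_process(thread_num):
    time.sleep(0.01)  # placeholder for real work


if __name__ == "__main__":
    limiter = ThreadLimiter(limit=4)
    threads = [limiter.start(some_process, i) for i in range(20)]
    for t in threads:
        t.join()
    print("peak concurrent workers:", limiter.peak)
```

Because the counter is owned by the limiter, the limit does not depend on whatever other threads happen to be alive in the process.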
Marmara answered 23/1, 2018 at 8:46 Comment(0)
S
-1

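Check the active thread count before starting each new thread:

while threading.active_count() > 10:
    sleep(1)

Example (df and main_program are assumed to be defined elsewhere):

import logging
import threading
from time import sleep

if __name__ == "__main__":
    log_format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=log_format, level=logging.INFO, datefmt="%Y-%m-%d %H:%M:%S")

    threads = []
    logging.info("Start The Program!!!")

    for i in range(0, len(df)):
        # Limit the number of threads: wait here until the count drops.
        # (Using `continue` at this point would skip row i entirely.)
        while threading.active_count() > 10:
            sleep(1)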

        email = (df['Email'][i])
        nama_lengkap = (df['Nama Lengkap'][i])
        posisi = (df['Posisi'][i])
        x = threading.Thread(target=main_program, args=(email, nama_lengkap, posisi,))
        print(f"Thread {i} started")
        threads.append(x)
        x.start()
        # break

    for index, thread in enumerate(threads):
        thread.join()

Systematism answered 4/1, 2023 at 11:59 Comment(0)
R
-2

Use the following Python code:

import threading
import time

# Maximum number of threads allowed at once. threading.enumerate() also
# counts the main thread, so at most MAX_THREAD_COUNT - 1 workers run.
MAX_THREAD_COUNT = 30

# Placeholder for the work done in each thread; replace the body with your own code.
def sampleThreadProgram():
    time.sleep(1)

# Main program
def main():
    # Sample loop; replace it with your own. As written, it runs the
    # placeholder function 1000 times while keeping the total thread
    # count at or below MAX_THREAD_COUNT.

    for x in range(1000):
        print('x is ' + str(x))

        # While the limit is reached, sleep for 1 second at a time until
        # some of the running threads have finished.
        while len(threading.enumerate()) >= MAX_THREAD_COUNT:
            print('running threads are ' + str(len(threading.enumerate())))
            time.sleep(1)

        # The thread count is now below MAX_THREAD_COUNT, so start a new thread.
        t1 = threading.Thread(target=sampleThreadProgram, args=())
        t1.start()

main()
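
One detail worth checking when a limit is based on threading.enumerate() or threading.active_count(): both include the main thread (and any other thread alive in the process), so the number of workers is lower than the raw count. A small sketch to confirm this; worker and the variable names are made up for the illustration.

```python
import threading
import time


def worker():
    time.sleep(0.2)  # stay alive long enough to be counted


before = threading.active_count()  # already counts the main thread
t = threading.Thread(target=worker)
t.start()
during = threading.active_count()  # one higher while the worker is alive
t.join()
after = threading.active_count()   # back to the starting value

print(before, during, after)
```

In a plain script the first value is typically 1, but some environments (IDEs, notebooks) run extra threads of their own, which also count toward the limit.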
Rosetterosewall answered 7/10, 2022 at 4:46 Comment(2)
Your answer could be improved by adding more information on what the code does and how it helps the OP.Hippie
Thanks @Tyler2P. I have added comments to the program. Please have a look again and let me know, if it makes sense.Rosetterosewall
