Threading pool similar to the multiprocessing Pool?
Asked Answered
E

11

421

Is there a Pool class for worker threads, similar to the multiprocessing module's Pool class?

I like for example the easy way to parallelize a map function

def long_running_func(p):
    c_func_no_gil(p)

p = multiprocessing.Pool(4)
xs = p.map(long_running_func, range(100))

however I would like to do it without the overhead of creating new processes.

I know about the GIL. However, in my usecase, the function will be an IO-bound C function for which the python wrapper will release the GIL before the actual function call.

Do I have to write my own threading pool?

Examine answered 13/6, 2010 at 21:17 Comment(4)
Here's something that looks promising over in the Python Cookbook: Recipe 576519: Thread pool with same API as (multi)processing.Pool (Python)Evolutionist
Nowadays it's built-in: from multiprocessing.pool import ThreadPool.Cochin
Can you elaborate on this I know about the GIL. However, in my usecase, the function will be an IO-bound C function for which the python wrapper will release the GIL before the actual function call. ?Cheshire
@Cheshire stackoverflow.com/questions/1294382Dys
E
546

I just found out that there actually is a thread-based Pool interface in the multiprocessing module, however it is hidden somewhat and not properly documented.

It can be imported via

from multiprocessing.pool import ThreadPool

It is implemented using a dummy Process class wrapping a python thread. This thread-based Process class can be found in multiprocessing.dummy which is mentioned briefly in the docs. This dummy module supposedly provides the whole multiprocessing interface based on threads.

Examine answered 2/8, 2010 at 9:52 Comment(16)
That's awesome. I had a problem creating ThreadPools outside the main thread, you can use them from a child thread once created though. I put an issue in for it: bugs.python.org/issue10015Merla
I don't get it why this class has no documentation. Such helper classes are so important nowadays.Reciprocation
@Wernight: it isn't public primarily because nobody has offered a patch that provides it (or something similar) as threading.ThreadPool, including documentation and tests. It would indeed be a good battery to include in the standard library, but it won't happen if nobody writes it. One nice advantage of this existing implementation in multiprocessing, is that it should make any such threading patch much easier to write (docs.python.org/devguide)Playmate
@Playmate — I am not sure what you mean by “include in the standard library” since ThreadPool is already part of the standard library, and it does not sound like @Reciprocation is asking for it to me moved from its current home to threading or anywhere else. I think he just wants it mentioned in the Standard Library documentation?Calamint
@brandon-rhodes I'd missed that ThreadPool was actually listed in multiprocessing.pool.__all__. So my comment can be adjusted to "because nobody has written docs for it".Playmate
while this seems to work as API, I don't understand why I don't see nearly as much CPU utilisation as with the normal Pool :(. I am not using any shared memory other and an int returned by the threadPlotkin
@CiprianTomoiaga: You're being bitten by the GIL. CPython (the reference interpreter) can only execute bytecode in a single thread at a time. The GIL is released for blocking operations (lock acquisition, I/O), and in some third party extension modules that do heavy CPU work on non-Python types internally, but aside from that, CPU bound code doesn't benefit from threading at all; on Python 2, it often gets significantly slower, because the majority of the time is spent on lock contention. Py3 wastes less, but still can't benefit from more than one core.Sworn
why is it called dummy?Saphead
@Saphead The implementation uses a DummyProcess, which is a subclass of threading.Thread and not really a separate process. I think the idea is to provide the interface of multiprocessing without actually using multiple processes.Federalism
The best practice, if I understand correctly, is to use from multiprocessing.dummy import Pool. This is actually the ThreadingPool. This way you can easily switch between the actual multi-process pool and the threading pool. @CiprianTomoiaga, @Sworn gave a good answer - bottom line, simply use a multiprocessing pool for your case. If you are not passing heavy objects back and forth this is a good choice.Nitramine
This is NOT a thread pool. It's a process pool, with an interface "similar to a thread pool", as the docs say.Bellinzona
@daniel.gindi: multiprocessing.dummy.Pool/multiprocessing.pool.ThreadPool are the same thing, and are both thread pools. They mimic the interface of a process pool, but they are implemented entirely in terms of threading. Reread the docs, you got it backwards.Sworn
@Sworn have you read the source code? Or the docs? "multiprocessing is a package that supports spawning processes using an API similar to the threading module". It's the same way in the source code.Bellinzona
@daniel.gindi: Read further: "multiprocessing.dummy replicates the API of multiprocessing but is no more than a wrapper around the threading module." multiprocessing in general is about processes, but to allow switching between processes and threads, they (mostly) replicated the multiprocessing API in multiprocessing.dummy, but backed with threads, not processes. The goal is to allow you to do import multiprocessing.dummy as multiprocessing to change process-based code to thread-based.Sworn
this works fine in my local machine however on google cloud it raises an exceptionGraphology
Note that this pool is leaky: each time a ThreadPool(n) is created, 2n threads are added to the process forever. (maybe that's the reason it is hidden?)Dependency
W
281

In Python 3 you can use concurrent.futures.ThreadPoolExecutor, i.e.:

executor = ThreadPoolExecutor(max_workers=10)
a = executor.submit(my_function)

See the docs for more info and examples.

Wun answered 17/7, 2012 at 19:42 Comment(6)
in order to use the backported futures module, run sudo pip install futuresDoll
it's the most efficient and fastest way for multi processingBadge
What is the difference between using ThreadPoolExecutor and multiprocessing.dummy.Pool?Galitea
concurrent.futures is as of the time of Python 3.9 / beginning of 3.10 is a very problematic library. It looks like it's overrun by bugs that aren't getting proper fixes. Perhaps, the whole premise of this library was bad. I'm more familiar with the process-based part of this library, where there's no end to reasons why the pool would hang up forever, swallow errors and misbehave in other ways. I would stay away from this library as much as possible.Fortyfive
Some gotchas and limitations of concurrent.futures are described here: github.com/yeraydiazdiaz/futureproof#readmeAurlie
For those interested in the difference between ThreadPoolExecutor and ThreadPool, a recent SO answer had a great explanation: https://mcmap.net/q/87226/-what-is-the-difference-between-threadpoolexecutor-and-threadpoolChengteh
B
74

Yes, and it seems to have (more or less) the same API.

import multiprocessing

def worker(lnk):
    ....    
def start_process():
    .....
....

if(PROCESS):
    pool = multiprocessing.Pool(processes=POOL_SIZE, initializer=start_process)
else:
    pool = multiprocessing.pool.ThreadPool(processes=POOL_SIZE, 
                                           initializer=start_process)

pool.map(worker, inputs)
....
Brinkley answered 30/11, 2012 at 19:42 Comment(3)
Import path for ThreadPool is different from Pool. Correct import is from multiprocessing.pool import ThreadPool.Crusted
Strangely this is not a documented API, and multiprocessing.pool is only briefly mentioned as providing AsyncResult. But it is available in 2.x and 3.x.Supersaturated
This is what I was looking for. It's just a single import line and a small change to my existing pool line and it works perfectly.Succinylsulfathiazole
P
51

For something very simple and lightweight (slightly modified from here):

from Queue import Queue
from threading import Thread


class Worker(Thread):
    """Thread executing tasks from a given tasks queue"""
    def __init__(self, tasks):
        Thread.__init__(self)
        self.tasks = tasks
        self.daemon = True
        self.start()

    def run(self):
        while True:
            func, args, kargs = self.tasks.get()
            try:
                func(*args, **kargs)
            except Exception, e:
                print e
            finally:
                self.tasks.task_done()


class ThreadPool:
    """Pool of threads consuming tasks from a queue"""
    def __init__(self, num_threads):
        self.tasks = Queue(num_threads)
        for _ in range(num_threads):
            Worker(self.tasks)

    def add_task(self, func, *args, **kargs):
        """Add a task to the queue"""
        self.tasks.put((func, args, kargs))

    def wait_completion(self):
        """Wait for completion of all the tasks in the queue"""
        self.tasks.join()

if __name__ == '__main__':
    from random import randrange
    from time import sleep

    delays = [randrange(1, 10) for i in range(100)]

    def wait_delay(d):
        print 'sleeping for (%d)sec' % d
        sleep(d)

    pool = ThreadPool(20)

    for i, d in enumerate(delays):
        pool.add_task(wait_delay, d)

    pool.wait_completion()

To support callbacks on task completion you can just add the callback to the task tuple.

Principle answered 31/8, 2011 at 13:23 Comment(4)
how can the threads ever join if they unconditionally infinite loop?Negress
@JosephGarvin I've tested it, and the threads keep blocking on an empty queue(since the call to Queue.get() is blocking) till the program ends, after which they are terminated automatically.Bernettabernette
@JosephGarvin, good question. Queue.join() will actually join the task queue, not worker threads. So, when queue is empty, wait_completion returns, program ends, and threads are reaped by the OS.Rockbottom
If all of this code is wrapped up into a neat function it doesn't seem to be stopping threads even when the queue is empty and pool.wait_completion() returns. The result is that threads just keep building.Madelyn
S
26

Hi to use the thread pool in Python you can use this library :

from multiprocessing.dummy import Pool as ThreadPool

and then for use, this library do like that :

pool = ThreadPool(threads)
results = pool.map(service, tasks)
pool.close()
pool.join()
return results

The threads are the number of threads that you want and tasks are a list of task that most map to the service.

Stapler answered 28/7, 2017 at 4:58 Comment(3)
Thanks, that is a great suggestion! From the docs: multiprocessing.dummy replicates the API of multiprocessing but is no more than a wrapper around the threading module. One correction - I think you want to say that the pool api is (function,iterable)Inpour
We missed the .close() and .join() calls and that causes .map() to finish before all the threads are finished. Just a warning.Irritation
Great solution ,super elegant!Discomposure
E
17

Yes, there is a threading pool similar to the multiprocessing Pool, however, it is hidden somewhat and not properly documented. You can import it by following way:-

from multiprocessing.pool import ThreadPool

Just I show you simple example

def test_multithread_stringio_read_csv(self):
        # see gh-11786
        max_row_range = 10000
        num_files = 100

        bytes_to_df = [
            '\n'.join(
                ['%d,%d,%d' % (i, i, i) for i in range(max_row_range)]
            ).encode() for j in range(num_files)]
        files = [BytesIO(b) for b in bytes_to_df]

        # read all files in many threads
        pool = ThreadPool(8)
        results = pool.map(self.read_csv, files)
        first_result = results[0]

        for result in results:
            tm.assert_frame_equal(first_result, result) 
Earldom answered 15/10, 2020 at 14:33 Comment(1)
imo this should be the accepted answerSporades
B
14

Here's the result I finally ended up using. It's a modified version of the classes by dgorissen above.

File: threadpool.py

from queue import Queue, Empty
import threading
from threading import Thread


class Worker(Thread):
    _TIMEOUT = 2
    """ Thread executing tasks from a given tasks queue. Thread is signalable, 
        to exit
    """
    def __init__(self, tasks, th_num):
        Thread.__init__(self)
        self.tasks = tasks
        self.daemon, self.th_num = True, th_num
        self.done = threading.Event()
        self.start()

    def run(self):       
        while not self.done.is_set():
            try:
                func, args, kwargs = self.tasks.get(block=True,
                                                   timeout=self._TIMEOUT)
                try:
                    func(*args, **kwargs)
                except Exception as e:
                    print(e)
                finally:
                    self.tasks.task_done()
            except Empty as e:
                pass
        return

    def signal_exit(self):
        """ Signal to thread to exit """
        self.done.set()


class ThreadPool:
    """Pool of threads consuming tasks from a queue"""
    def __init__(self, num_threads, tasks=[]):
        self.tasks = Queue(num_threads)
        self.workers = []
        self.done = False
        self._init_workers(num_threads)
        for task in tasks:
            self.tasks.put(task)

    def _init_workers(self, num_threads):
        for i in range(num_threads):
            self.workers.append(Worker(self.tasks, i))

    def add_task(self, func, *args, **kwargs):
        """Add a task to the queue"""
        self.tasks.put((func, args, kwargs))

    def _close_all_threads(self):
        """ Signal all threads to exit and lose the references to them """
        for workr in self.workers:
            workr.signal_exit()
        self.workers = []

    def wait_completion(self):
        """Wait for completion of all the tasks in the queue"""
        self.tasks.join()

    def __del__(self):
        self._close_all_threads()


def create_task(func, *args, **kwargs):
    return (func, args, kwargs)

To use the pool

from random import randrange
from time import sleep

delays = [randrange(1, 10) for i in range(30)]

def wait_delay(d):
    print('sleeping for (%d)sec' % d)
    sleep(d)

pool = ThreadPool(20)
for i, d in enumerate(delays):
    pool.add_task(wait_delay, d)
pool.wait_completion()
Bernettabernette answered 10/5, 2018 at 5:4 Comment(5)
Annotion for other readers: This code is Python 3 (shebang #!/usr/bin/python3)Drippy
Why do you use for i, d in enumerate(delays): and then ignore the i value?Cochin
@Cochin - probably just a relic from development where they probably wanted to print i during a run.Ulberto
Why is create_task there? What is it for?Meninges
I can't believe and answer with 4 votes on SO is the way to do ThreadPooling in Python. The Threadpool in the official python distribution is still broken? What am I missing?Meninges
C
7

another way can be adding the process to thethread queue pool

import concurrent.futures
with concurrent.futures.ThreadPoolExecutor(max_workers=cpus) as executor:
    for i in range(10):
        a = executor.submit(arg1, arg2,....)
Covarrubias answered 15/6, 2020 at 20:28 Comment(0)
E
4

The overhead of creating the new processes is minimal, especially when it's just 4 of them. I doubt this is a performance hot spot of your application. Keep it simple, optimize where you have to and where profiling results point to.

Electronic answered 13/6, 2010 at 22:24 Comment(1)
If the questioner is under Windows (which I do not believe he specified), then I think that process spinup can be a significant expense. At least it is on the projects that I have been recently doing. :-)Calamint
H
3

There is no built in thread based pool. However, it can be very quick to implement a producer/consumer queue with the Queue class.

From: https://docs.python.org/2/library/queue.html

from threading import Thread
from Queue import Queue
def worker():
    while True:
        item = q.get()
        do_work(item)
        q.task_done()

q = Queue()
for i in range(num_worker_threads):
     t = Thread(target=worker)
     t.daemon = True
     t.start()

for item in source():
    q.put(item)

q.join()       # block until all tasks are done
Hadwyn answered 13/6, 2010 at 21:30 Comment(3)
This is no longer the case with the concurrent.futures module.Miramirabeau
I don't think this is true at all anymore. from multiprocessing.pool import ThreadPoolBlen
The multiprocessing.pool.ThreadPool is not documented as its implementation has never been completed. It lacks tests and documentation.Meninges
C
0

If you don't mind executing other's code, here's mine:

Note: There is lot of extra code you may want to remove [added for better clarificaiton and demonstration how it works]

Note: Python naming conventions were used for method names and variable names instead of camelCase.

Working procedure:

  1. MultiThread class will initiate with no of instances of threads by sharing lock, work queue, exit flag and results.
  2. SingleThread will be started by MultiThread once it creates all instances.
  3. We can add works using MultiThread (It will take care of locking).
  4. SingleThreads will process work queue using a lock in middle.
  5. Once your work is done, you can destroy all threads with shared boolean value.
  6. Here, work can be anything. It can automatically import (uncomment import line) and process module using given arguments.
  7. Results will be added to results and we can get using get_results

Code:

import threading
import queue


class SingleThread(threading.Thread):
    def __init__(self, name, work_queue, lock, exit_flag, results):
        threading.Thread.__init__(self)
        self.name = name
        self.work_queue = work_queue
        self.lock = lock
        self.exit_flag = exit_flag
        self.results = results

    def run(self):
        # print("Coming %s with parameters %s", self.name, self.exit_flag)
        while not self.exit_flag:
            # print(self.exit_flag)
            self.lock.acquire()
            if not self.work_queue.empty():
                work = self.work_queue.get()
                module, operation, args, kwargs = work.module, work.operation, work.args, work.kwargs
                self.lock.release()
                print("Processing : " + operation + " with parameters " + str(args) + " and " + str(kwargs) + " by " + self.name + "\n")
                # module = __import__(module_name)
                result = str(getattr(module, operation)(*args, **kwargs))
                print("Result : " + result + " for operation " + operation + " and input " + str(args) + " " + str(kwargs))
                self.results.append(result)
            else:
                self.lock.release()
        # process_work_queue(self.work_queue)

class MultiThread:
    def __init__(self, no_of_threads):
        self.exit_flag = bool_instance()
        self.queue_lock = threading.Lock()
        self.threads = []
        self.work_queue = queue.Queue()
        self.results = []
        for index in range(0, no_of_threads):
            thread = SingleThread("Thread" + str(index+1), self.work_queue, self.queue_lock, self.exit_flag, self.results)
            thread.start()
            self.threads.append(thread)

    def add_work(self, work):
        self.queue_lock.acquire()
        self.work_queue._put(work)
        self.queue_lock.release()

    def destroy(self):
        self.exit_flag.value = True
        for thread in self.threads:
            thread.join()

    def get_results(self):
        return self.results


class Work:
    def __init__(self, module, operation, args, kwargs={}):
        self.module = module
        self.operation = operation
        self.args = args
        self.kwargs = kwargs


class SimpleOperations:
    def sum(self, *args):
        return sum([int(arg) for arg in args])

    @staticmethod
    def mul(a, b, c=0):
        return int(a) * int(b) + int(c)


class bool_instance:
    def __init__(self, value=False):
        self.value = value

    def __setattr__(self, key, value):
        if key != "value":
            raise AttributeError("Only value can be set!")
        if not isinstance(value, bool):
            raise AttributeError("Only True/False can be set!")
        self.__dict__[key] = value
        # super.__setattr__(key, bool(value))

    def __bool__(self):
        return self.value

if __name__ == "__main__":
    multi_thread = MultiThread(5)
    multi_thread.add_work(Work(SimpleOperations(), "mul", [2, 3], {"c":4}))
    while True:
        data_input = input()
        if data_input == "":
            pass
        elif data_input == "break":
            break
        else:
            work = data_input.split()
            multi_thread.add_work(Work(SimpleOperations(), work[0], work[1:], {}))
    multi_thread.destroy()
    print(multi_thread.get_results())
Cutlery answered 3/12, 2020 at 13:19 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.