`DummyExecutor` for Python's `futures`
Python's concurrent.futures package gives us ThreadPoolExecutor and ProcessPoolExecutor for running tasks in parallel.

However, for debugging it is sometimes useful to temporarily replace the true parallelism with a dummy one, which carries out the tasks in a serial way in the main thread, without spawning any threads or processes.

Is there an existing implementation of such a DummyExecutor anywhere?

Savour answered 3/5, 2012 at 15:31 Comment(3)
@mata I don't think so, that would create one thread which would still be separate from the main thread. – Savour
Of course you're right. But then it shouldn't be too complicated to implement an Executor which on submit directly calls the callable and returns a Future object. A look at the ThreadPoolExecutor might help. – Sallie
It always looks simple before you do it, but not always after you do it. If someone has already implemented this, I'd much prefer to use their ready implementation. – Savour
Something like this should do it:

from concurrent.futures import Future, Executor
from threading import Lock


class DummyExecutor(Executor):

    def __init__(self):
        self._shutdown = False
        self._shutdownLock = Lock()

    def submit(self, fn, *args, **kwargs):
        with self._shutdownLock:
            if self._shutdown:
                raise RuntimeError('cannot schedule new futures after shutdown')

            f = Future()
            try:
                result = fn(*args, **kwargs)
            except BaseException as e:
                f.set_exception(e)
            else:
                f.set_result(result)

            return f

    def shutdown(self, wait=True):
        with self._shutdownLock:
            self._shutdown = True


if __name__ == '__main__':

    def fnc(err):
        if err:
            raise Exception("test")
        else:
            return "ok"

    ex = DummyExecutor()
    print(ex.submit(fnc, True))
    print(ex.submit(fnc, False))
    ex.shutdown()
    ex.submit(fnc, True)  # raises RuntimeError: cannot schedule new futures after shutdown

Locking is probably not needed in this case, but it can't hurt to have it.
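Since the point is to switch between serial and parallel execution while debugging, it can help to choose the executor behind a single flag so the surrounding code stays identical. A minimal sketch, where the `DEBUG` flag and `get_executor()` helper are illustrative (not from any library) and the `DummyExecutor` is a condensed version of the class above:

```python
from concurrent.futures import Executor, Future, ThreadPoolExecutor


class DummyExecutor(Executor):
    """Serial stand-in; condensed version of the class above."""

    def submit(self, fn, *args, **kwargs):
        f = Future()
        try:
            f.set_result(fn(*args, **kwargs))  # run in the calling thread
        except BaseException as e:
            f.set_exception(e)
        return f


DEBUG = True  # illustrative flag: flip to False to restore real threads


def get_executor(max_workers=4):
    # Hypothetical helper: the one place that decides which pool to use.
    return DummyExecutor() if DEBUG else ThreadPoolExecutor(max_workers)


with get_executor() as ex:
    fut = ex.submit(pow, 2, 10)

print(fut.result())  # → 1024
```

This keeps call sites unchanged: only `get_executor()` (or the flag) needs editing when switching modes, and the `with` block works because `Executor` provides the context-manager protocol.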

Sallie answered 3/5, 2012 at 17:57 Comment(0)
Use this to mock your ThreadPoolExecutor:

class MockThreadPoolExecutor:
    def __init__(self, **kwargs):
        pass

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, exc_traceback):
        pass

    def submit(self, fn, *args, **kwargs):
        # execute functions in series without creating threads
        # for easier unit testing
        result = fn(*args, **kwargs)
        return result

    def shutdown(self, wait=True):
        pass

if __name__ == "__main__":
    def add(a, b):  # avoid shadowing the built-in sum()
        return a + b

    with MockThreadPoolExecutor(max_workers=3) as executor:
        future_result = list()
        for i in range(5):
            future_result.append(executor.submit(add, i + 1, i + 2))
Eloign answered 7/2, 2020 at 8:6 Comment(2)
This is a good solution, but I'm using this to test code that does something with the Future objects returned by ThreadPoolExecutor.submit. My solution was to also add a small MockFuture class with a result method, and have submit return an instance of that rather than the result directly. – Feil
Coming back to this almost two years later to say that rather than creating a MockFuture class, you can have MockThreadPoolExecutor.submit create a real futures.Future object, call new_future.set_result, and then return it, as suggested here: bugs.python.org/issue36395. Some library functions, such as concurrent.futures.as_completed, look at the internals of the futures, so it can be easier to use the real class than to try to mock it. – Feil
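The suggestion in the last comment, having submit() build a real concurrent.futures.Future, can be sketched as a small variant of the mock above (one possible reading of the comment, not code from the linked issue):

```python
from concurrent.futures import Future, as_completed


class MockThreadPoolExecutor:
    def __init__(self, **kwargs):
        pass

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, exc_traceback):
        pass

    def submit(self, fn, *args, **kwargs):
        # Run serially, but wrap the outcome in a real Future so that
        # callers (and helpers like as_completed) work unchanged.
        future = Future()
        try:
            future.set_result(fn(*args, **kwargs))
        except BaseException as exc:
            future.set_exception(exc)
        return future

    def shutdown(self, wait=True):
        pass


with MockThreadPoolExecutor(max_workers=3) as executor:
    futures = [executor.submit(lambda x: x * x, i) for i in range(3)]
    results = sorted(f.result() for f in as_completed(futures))
    print(results)  # → [0, 1, 4]
```

Because the futures are already finished when as_completed sees them, it yields them immediately, which is exactly what serial test code wants.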
Pools from the concurrent.futures package are eager (which you of course want): they pick up calculations as soon as possible, at some point between the pool.submit() call and the moment the associated future.result() call returns.

From the perspective of synchronous code you have two choices: either compute the task's result on the pool.submit() call, or on future.result() retrieval.

I find the second approach more natural, since it better imitates the "non-blocking" nature of pool.map() from the perspective of the main thread: results can be obtained one after another as soon as they are done calculating (rather than when they are all ready).

Here is my code (an additional DummyFuture implementation is needed):

from concurrent.futures import Executor
from threading import Lock
from functools import partial


class DummyFuture:
    def __init__(self, calculation) -> None:
        self.calculation = calculation
        self._done = False
    def result(self, timeout=None):
        # compute lazily, but only once: repeated result() calls
        # return the same value, as with a real Future
        if not self._done:
            self._value = self.calculation()
            self._done = True
        return self._value
    def cancel(self):
        pass


class DummyExecutor(Executor):

    def __init__(self):
        self._shutdown = False
        self._shutdown_lock = Lock()

    def submit(self, fn, /, *args, **kwargs):
        with self._shutdown_lock:
            if self._shutdown:
                raise RuntimeError('cannot schedule new futures after shutdown')

            return DummyFuture(partial(fn, *args, **kwargs))

    def shutdown(self, wait=True, *, cancel_futures=False):
        with self._shutdown_lock:
            self._shutdown = True

(I have to give credit to mata's answer, since it was the starting point of my implementation.)
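A quick demonstration of the lazy behaviour, using an illustrative `task` helper and a `calls` list to record execution (the classes are condensed from the code above): nothing runs at submit() time, and an exception raised by the task only surfaces when result() is called.

```python
from concurrent.futures import Executor
from functools import partial


class DummyFuture:
    """Lazy future, condensed from the answer above."""
    def __init__(self, calculation):
        self.calculation = calculation
        self._done = False
    def result(self, timeout=None):
        if not self._done:          # run the deferred call on first access
            self._value = self.calculation()
            self._done = True
        return self._value


class DummyExecutor(Executor):
    def submit(self, fn, /, *args, **kwargs):
        return DummyFuture(partial(fn, *args, **kwargs))


calls = []

def task(x):
    calls.append(x)                 # records when the task actually runs
    if x < 0:
        raise ValueError("negative input")
    return x * 2

ex = DummyExecutor()
fut = ex.submit(task, 21)
print(calls)               # → [] — nothing has run yet
print(fut.result())        # → 42 — the call happens here
bad = ex.submit(task, -1)  # still no exception...
try:
    bad.result()           # ...it surfaces only now
except ValueError as e:
    print("caught:", e)
```

This deferred-exception behaviour matches real pools, where an exception raised in a worker is stored in the future and re-raised from result().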

Nacred answered 9/10, 2023 at 17:28 Comment(0)
