How to increment a shared counter from multiple processes?
C

8

102

I am having trouble with the multiprocessing module. I am using a Pool of workers with its map method to analyze lots of files concurrently. Each time a file has been processed, I would like a counter to be updated so that I can keep track of how many files remain to be processed. Here is sample code:

import os
import multiprocessing

counter = 0


def analyze(file):
    # Analyze the file.
    global counter
    counter += 1
    print(counter)


if __name__ == '__main__':
    files = os.listdir('/some/directory')
    pool = multiprocessing.Pool(4)
    pool.map(analyze, files)

I cannot find a solution for this.

Cousins answered 17/1, 2010 at 10:26 Comment(0)
F
93

The problem is that the counter variable is not shared between your processes: each separate process is creating its own local instance and incrementing that.
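The effect described above can be observed directly. The following is a minimal sketch, not part of the original answer: the names bump and run_demo are made up for illustration. Each worker reports its process id and the value of its own copy of the counter, while the parent's copy is never modified.

```python
import multiprocessing
import os

counter = 0  # plain module-level int: every process works on its own copy


def bump(_):
    """Increment this process's private copy and report what it sees."""
    global counter
    counter += 1
    return os.getpid(), counter


def run_demo(n_tasks=8, n_workers=2):
    """Run bump in a pool and return (worker views, parent's counter)."""
    with multiprocessing.Pool(n_workers) as pool:
        seen = pool.map(bump, range(n_tasks))
    # bump only ever ran inside the workers, so the parent's copy is untouched.
    return seen, counter


if __name__ == "__main__":
    seen, parent_value = run_demo()
    print("per-worker views:", seen)
    print("parent still sees:", parent_value)
```

The per-worker values depend on how the tasks are distributed, but the parent's counter stays at its initial value because no increment runs in the parent.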

See this section of the documentation for some techniques you can employ to share state between your processes. In your case you might want to share a Value instance between your workers.

Here's a working version of your example (with some dummy input data). Note it uses global values which I would really try to avoid in practice:

from multiprocessing import Pool, Value
from time import sleep

counter = None

def init(args):
    ''' store the counter for later use '''
    global counter
    counter = args

def analyze_data(args):
    ''' increment the global counter, do something with the input '''
    global counter
    # += operation is not atomic, so we need to get a lock:
    with counter.get_lock():
        counter.value += 1
    print(counter.value)
    return args * 10

if __name__ == '__main__':
    #inputs = os.listdir(some_directory)

    #
    # initialize a cross-process counter and the input lists
    #
    counter = Value('i', 0)
    inputs = [1, 2, 3, 4]

    #
    # create the pool of workers, ensuring each one receives the counter 
    # as it starts. 
    #
    p = Pool(initializer = init, initargs = (counter, ))
    i = p.map_async(analyze_data, inputs, chunksize = 1)
    i.wait()
    print(i.get())
Flexuous answered 17/1, 2010 at 10:29 Comment(7)
Great answer! I had the same problem in IronPython, and while multiprocessing.Value is not available you can do something similar with clr.Reference and System.Threading.Interlocked: #2255961 - Meaning
@jkp, how would you do it without the global variable? I'm trying to use a class, but it is not as easy as it seems. See #1817458 - Rhesus
Unfortunately, this example appears to be flawed, since counter.value += 1 is not atomic between processes, so the value will be wrong if run long enough with a few processes. - Cacology
In line with what Eli said, a Lock must surround the counter value += 1 statement. See #1233722 - Tillion
Note that it should be with counter.get_lock(), not with counter.value.get_lock(): - Tanyatanzania
@jkp, as @Jinghao-shi said, counter.value.get_lock() will produce AttributeError: 'int' object has no attribute 'get_lock' - Cumulous
It's easier to just output a list of returns from each worker and then count the list after the fact. - Schriever
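The last comment, and the question about avoiding the global variable, suggest keeping the count in the parent process instead of sharing state. The following is a minimal sketch of that idea, assuming the work can be expressed as a function that returns a result; analyze and run are hypothetical names, and analyze is only a stand-in for the real per-file work.

```python
import multiprocessing


def analyze(item):
    """Stand-in for the real per-file work (hypothetical)."""
    return item * 10


def run(inputs, n_workers=4):
    """Process inputs in a pool and count completions in the parent."""
    results = []
    with multiprocessing.Pool(n_workers) as pool:
        # imap_unordered yields each result as soon as a worker finishes it,
        # so the parent can track progress without any shared state.
        for done, result in enumerate(pool.imap_unordered(analyze, inputs), start=1):
            print("{} of {} done".format(done, len(inputs)))
            results.append(result)
    return results


if __name__ == "__main__":
    print(sorted(run([1, 2, 3, 4])))
```

Because only the parent touches the count, no lock is needed. The trade-off is that results arrive in completion order, not input order.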
C
50

Counter class without the race-condition bug:

import multiprocessing


class Counter(object):
    def __init__(self):
        self.val = multiprocessing.Value('i', 0)

    def increment(self, n=1):
        with self.val.get_lock():
            self.val.value += n

    @property
    def value(self):
        return self.val.value
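A possible way to use such a class is sketched below. This is an illustration and not part of the original answer: the helper names work and run are assumptions, and the Counter is handed to each multiprocessing.Process through args, which is the usual way to pass a shared Value to a child process.

```python
import multiprocessing


class Counter(object):
    def __init__(self):
        self.val = multiprocessing.Value('i', 0)

    def increment(self, n=1):
        # Hold the Value's own lock for the whole read-modify-write.
        with self.val.get_lock():
            self.val.value += n

    @property
    def value(self):
        return self.val.value


def work(counter, times):
    """Hypothetical worker: increment the shared counter `times` times."""
    for _ in range(times):
        counter.increment()


def run(n_procs=4, times=1000):
    """Start n_procs workers and return the final counter value."""
    counter = Counter()
    procs = [multiprocessing.Process(target=work, args=(counter, times))
             for _ in range(n_procs)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    return counter.value


if __name__ == "__main__":
    print(run())
```

With the lock in place, the final value equals n_procs * times. Note that a shared Value cannot be sent as an ordinary Pool.map argument; with a Pool it is typically handed over through the initializer instead.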
Cerelia answered 10/2, 2014 at 15:42 Comment(3)
For similar code that works with joblib's Parallel (the code in this answer does not work with joblib), see github.com/davidheryanto/etc/blob/master/python-recipes/… - Cappadocia
I'd also add return self to the increment function to enable chaining. - Kindliness
A similar answer includes the lock also in the value function and uses RawValue instead of Value. Do you think that is needed? https://mcmap.net/q/212203/-how-to-make-a-thread-safe-global-counter-in-python - Cower
S
18

An extremely simple example, changed from jkp's answer:

from multiprocessing import Pool, Value
from time import sleep

counter = Value('i', 0)
def f(x):
    global counter
    with counter.get_lock():
        counter.value += 1
    print("counter.value:", counter.value)
    sleep(1)
    return x

with Pool(4) as p:
    r = p.map(f, range(1000*1000))
Synaeresis answered 6/10, 2019 at 11:7 Comment(1)
I tested your code. Counter is not shared. Each process has its own counter. - Takao
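One likely explanation for the comment above is the process start method. With the spawn start method (the only one on Windows, and the default on macOS since Python 3.8), each worker re-imports the module and so creates its own Value at module level. The sketch below, which is an assumption about the cause and not a confirmed diagnosis, hands the Value to the workers through the pool initializer as in jkp's answer, and keeps pool creation behind the `__main__` guard. The names init_worker and run are made up for illustration.

```python
import multiprocessing

counter = None  # set in each worker by init_worker


def init_worker(shared):
    """Store the shared Value in a module-level name inside each worker."""
    global counter
    counter = shared


def f(x):
    # += on a Value is a read-modify-write, so hold its lock.
    with counter.get_lock():
        counter.value += 1
    return x


def run(n=100, n_workers=4):
    """Map f over range(n) and return the final shared count."""
    shared = multiprocessing.Value('i', 0)
    with multiprocessing.Pool(n_workers, initializer=init_worker,
                              initargs=(shared,)) as pool:
        pool.map(f, range(n))
    return shared.value


if __name__ == "__main__":
    print(run())
```

Because the same Value object is passed to every worker at startup, the count is shared regardless of whether the workers are forked or spawned.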
H
7

Faster Counter class without using the built-in lock of Value twice

import multiprocessing


class Counter(object):
    def __init__(self, initval=0):
        self.val = multiprocessing.RawValue('i', initval)
        self.lock = multiprocessing.Lock()

    def increment(self):
        with self.lock:
            self.val.value += 1

    @property
    def value(self):
        return self.val.value

https://eli.thegreenplace.net/2012/01/04/shared-counter-with-pythons-multiprocessing
https://docs.python.org/2/library/multiprocessing.html#multiprocessing.sharedctypes.Value
https://docs.python.org/2/library/multiprocessing.html#multiprocessing.sharedctypes.RawValue

Hypothec answered 29/11, 2017 at 22:0 Comment(2)
Basically the same as Value with lock=True, but this code is clearer. - Camillecamilo
A similar answer includes the lock also in the value function: https://mcmap.net/q/212203/-how-to-make-a-thread-safe-global-counter-in-python - Cower
S
0

Here is a solution to your problem based on a different approach from that proposed in the other answers. It uses message passing with multiprocessing.Queue objects (instead of shared memory with multiprocessing.Value objects) and process-safe (atomic) built-in increment and decrement operators += and -= (instead of introducing custom increment and decrement methods) since you asked for it.

First, we define a class Subject for instantiating an object that will be local to the parent process and whose attributes are to be incremented or decremented:

import multiprocessing


class Subject:

    def __init__(self):
        self.x = 0
        self.y = 0

Next, we define a class Proxy for instantiating an object that will be the remote proxy through which the child processes will request the parent process to retrieve or update the attributes of the Subject object. The interprocess communication will use two multiprocessing.Queue attributes, one for exchanging requests and one for exchanging responses. Requests are of the form (sender, action, *args) where sender is the sender name, action is the action name ('get', 'set', 'increment', or 'decrement' the value of an attribute), and args is the argument tuple. Responses are of the form value (to 'get' requests):

class Proxy(Subject):

    def __init__(self, request_queue, response_queue):
        self.__request_queue = request_queue
        self.__response_queue = response_queue

    def _getter(self, target):
        sender = multiprocessing.current_process().name
        self.__request_queue.put((sender, 'get', target))
        return Decorator(self.__response_queue.get())

    def _setter(self, target, value):
        sender = multiprocessing.current_process().name
        action = getattr(value, 'action', 'set')
        self.__request_queue.put((sender, action, target, value))

    @property
    def x(self):
        return self._getter('x')

    @property
    def y(self):
        return self._getter('y')

    @x.setter
    def x(self, value):
        self._setter('x', value)

    @y.setter
    def y(self, value):
        self._setter('y', value)

Then, we define the class Decorator to decorate the int objects returned by the getters of a Proxy object. By adding an action attribute, it informs the setters whether the increment or decrement operators += and -= have been used, in which case the setters request an 'increment' or 'decrement' operation instead of a 'set' operation. The operators += and -= call the augmented assignment special methods __iadd__ and __isub__ if they are defined, and otherwise fall back on the binary arithmetic special methods __add__ and __sub__, which are always defined for int objects (e.g. proxy.x += value is equivalent to proxy.x = proxy.x.__iadd__(value) which is equivalent to proxy.x = type(proxy).x.__get__(proxy).__iadd__(value) which is equivalent to type(proxy).x.__set__(proxy, type(proxy).x.__get__(proxy).__iadd__(value))):

class Decorator(int):

    def __iadd__(self, other):
        value = Decorator(other)
        value.action = 'increment'
        return value

    def __isub__(self, other):
        value = Decorator(other)
        value.action = 'decrement'
        return value

Then, we define the function worker that will be run in the child processes and request the increment and decrement operations:

def worker(proxy):
    proxy.x += 1
    proxy.y -= 1

Finally, we define a single request queue to send requests to the parent process, and multiple response queues to send responses to the child processes:

if __name__ == '__main__':
    subject = Subject()
    request_queue = multiprocessing.Queue()
    response_queues = {}
    processes = []
    for index in range(4):
        sender = 'child {}'.format(index)
        response_queues[sender] = multiprocessing.Queue()
        proxy = Proxy(request_queue, response_queues[sender])
        process = multiprocessing.Process(
            target=worker, args=(proxy,), name=sender)
        processes.append(process)
    running = len(processes)
    for process in processes:
        process.start()
    while subject.x != 4 or subject.y != -4:
        sender, action, *args = request_queue.get()
        print(sender, 'requested', action, *args)
        if action == 'get':
            response_queues[sender].put(getattr(subject, args[0]))
        elif action == 'set':
            setattr(subject, args[0], args[1])
        elif action == 'increment':
            setattr(subject, args[0], getattr(subject, args[0]) + args[1])
        elif action == 'decrement':
            setattr(subject, args[0], getattr(subject, args[0]) - args[1])
    for process in processes:
        process.join()

The program is guaranteed to terminate when += and -= are process-safe. If you remove process-safety by commenting the corresponding __iadd__ or __isub__ of Decorator then the program will only terminate by chance (e.g. proxy.x += value is equivalent to proxy.x = proxy.x.__iadd__(value) but falls back to proxy.x = proxy.x.__add__(value) if __iadd__ is not defined, which is equivalent to proxy.x = proxy.x + value which is equivalent to proxy.x = type(proxy).x.__get__(proxy) + value which is equivalent to type(proxy).x.__set__(proxy, type(proxy).x.__get__(proxy) + value), so the action attribute is not added and the setter requests a 'set' operation instead of an 'increment' operation).

Example process-safe session (atomic += and -=):

child 0 requested get x
child 0 requested increment x 1
child 0 requested get y
child 0 requested decrement y 1
child 3 requested get x
child 3 requested increment x 1
child 3 requested get y
child 2 requested get x
child 3 requested decrement y 1
child 1 requested get x
child 2 requested increment x 1
child 2 requested get y
child 2 requested decrement y 1
child 1 requested increment x 1
child 1 requested get y
child 1 requested decrement y 1

Example process-unsafe session (non-atomic += and -=):

child 2 requested get x
child 1 requested get x
child 0 requested get x
child 2 requested set x 1
child 2 requested get y
child 1 requested set x 1
child 1 requested get y
child 2 requested set y -1
child 1 requested set y -1
child 0 requested set x 1
child 0 requested get y
child 0 requested set y -2
child 3 requested get x
child 3 requested set x 2
child 3 requested get y
child 3 requested set y -3  # the program stalls here
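The core of the message-passing idea can also be sketched without the Proxy and Decorator machinery. The following is a simplified illustration, not the answer's implementation: workers send ('increment', amount) messages over a single multiprocessing.Queue and the parent, which owns the counter, applies them. The 'done' message and the names worker and run are assumptions introduced for this sketch.

```python
import multiprocessing


def worker(requests, n):
    """Ask the parent for n increments, then announce completion."""
    for _ in range(n):
        requests.put(("increment", 1))
    requests.put(("done", 0))


def run(n_workers=4, n=5):
    """Apply increment requests in the parent until every worker is done."""
    requests = multiprocessing.Queue()
    procs = [multiprocessing.Process(target=worker, args=(requests, n))
             for _ in range(n_workers)]
    for p in procs:
        p.start()
    total = 0
    finished = 0
    # Only the parent mutates `total`, so no lock is needed.
    while finished < n_workers:
        action, amount = requests.get()
        if action == "increment":
            total += amount
        elif action == "done":
            finished += 1
    # Join only after the queue has been drained.
    for p in procs:
        p.join()
    return total


if __name__ == "__main__":
    print(run())
```

Each request carries the delta and not the new value, which is what makes the update safe: the parent never overwrites one worker's increment with another worker's stale read.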
Syntactics answered 13/2, 2022 at 17:40 Comment(0)
A
0

A more sophisticated solution based on lock-free atomic operations, taken from the example in the atomics library README:

from multiprocessing import Process, shared_memory

import atomics


def fn(shmem_name: str, width: int, n: int) -> None:
    shmem = shared_memory.SharedMemory(name=shmem_name)
    buf = shmem.buf[:width]
    with atomics.atomicview(buffer=buf, atype=atomics.INT) as a:
        for _ in range(n):
            a.inc()
    del buf
    shmem.close()


if __name__ == "__main__":
    # setup
    width = 4
    shmem = shared_memory.SharedMemory(create=True, size=width)
    buf = shmem.buf[:width]
    total = 10_000
    # run processes to completion
    p1 = Process(target=fn, args=(shmem.name, width, total // 2))
    p2 = Process(target=fn, args=(shmem.name, width, total // 2))
    p1.start(), p2.start()
    p1.join(), p2.join()
    # print results and cleanup
    with atomics.atomicview(buffer=buf, atype=atomics.INT) as a:
        print(f"a[{a.load()}] == total[{total}]")
    del buf
    shmem.close()
    shmem.unlink()

(atomics can be installed via pip install atomics on most major platforms)

Acadian answered 14/9, 2022 at 19:51 Comment(0)
S
-1

I'm working on a progress bar in PyQt5, so I use a thread and a pool together:

import threading
import multiprocessing as mp
from queue import Queue

def multi(x):
    return x*x

def pooler(q):
    with mp.Pool() as pool:
        count = 0
        # imap_unordered yields results as the workers finish them
        for i in pool.imap_unordered(multi, range(100)):
            print(count, i)
            count += 1
            q.put(count)

def main():
    q = Queue()
    # run the pool in a background thread so this thread can report progress
    t = threading.Thread(target=pooler, args=(q,))
    t.start()
    print('start')
    process = 0
    while process < 100:
        process = q.get()
        print('p', process)
    t.join()

if __name__ == '__main__':
    main()

I put this in a QThread worker, and it works with acceptable latency.

Salicin answered 24/10, 2019 at 7:28 Comment(0)
M
-1

This is a different solution and the simplest to my taste. The idea is to create an empty list, append to it each time your function executes, then print len(list) to check progress.

Here is an example based on your code:

import os
import multiprocessing
counter = []


def analyze(file):
    # Analyze the file.
    counter.append(' ')
    print(len(counter))


if __name__ == '__main__':
    files = os.listdir('/some/directory')
    pool = multiprocessing.Pool(4)
    pool.map(analyze, files)

Add the counter to multiprocessing as follows:

    from multiprocessing.pool import ThreadPool

    counter = []

    def your_function(url):
       # function/process
       counter.append(' ') # you can append anything
       return len(counter)

    pool = ThreadPool()
    result = pool.map(your_function, urls)  # urls: your list of inputs
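The two snippets above behave differently, which the following sketch tries to make concrete. With multiprocessing.Pool, each worker process appends to its own copy of the list, so the lengths it prints are per-process. With ThreadPool, the workers are threads in one process and share the same list. The sketch covers only the ThreadPool case; the names work and run are made up for illustration, and it relies on list.append being safe to call from several threads in CPython.

```python
from multiprocessing.pool import ThreadPool

counter = []  # shared by all threads, since they live in one process


def work(item):
    """Hypothetical task: record one completion and report the count so far."""
    counter.append(item)
    return len(counter)


def run(items, n_threads=4):
    """Process items in a thread pool and return how many were counted."""
    del counter[:]  # start from an empty list on every run
    with ThreadPool(n_threads) as pool:
        pool.map(work, items)
    return len(counter)


if __name__ == "__main__":
    print(run(range(50)))
```

Threads suit I/O-bound work such as downloads; for CPU-bound work in separate processes, the list would need to be replaced by a shared Value or by counting in the parent.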
Manhole answered 31/1, 2023 at 17:17 Comment(2)
Python lists are not process safe and they consume too much memory if we want to count a really big number. - Acquittal
You're assuming that OP has an issue with memory consumption on hardware rather than finding a solution? - Manhole
