Here is a solution to your problem based on a different approach from that proposed in the other answers. It uses message passing with multiprocessing.Queue
objects (instead of shared memory with multiprocessing.Value
objects) and process-safe (atomic) built-in increment and decrement operators +=
and -=
(instead of introducing custom increment
and decrement
methods) since you asked for it.
First, we define a class Subject
for instantiating an object that will be local to the parent process and whose attributes are to be incremented or decremented:
import multiprocessing
class Subject:
def __init__(self):
self.x = 0
self.y = 0
Next, we define a class Proxy
for instantiating an object that will be the remote proxy through which the child processes will request the parent process to retrieve or update the attributes of the Subject
object. The interprocess communication will use two multiprocessing.Queue
attributes, one for exchanging requests and one for exchanging responses. Requests are of the form (sender, action, *args)
where sender
is the sender name, action
is the action name ('get'
, 'set'
, 'increment'
, or 'decrement'
the value of an attribute), and args
is the argument tuple. Responses are of the form value
(to 'get'
requests):
class Proxy(Subject):
def __init__(self, request_queue, response_queue):
self.__request_queue = request_queue
self.__response_queue = response_queue
def _getter(self, target):
sender = multiprocessing.current_process().name
self.__request_queue.put((sender, 'get', target))
return Decorator(self.__response_queue.get())
def _setter(self, target, value):
sender = multiprocessing.current_process().name
action = getattr(value, 'action', 'set')
self.__request_queue.put((sender, action, target, value))
@property
def x(self):
return self._getter('x')
@property
def y(self):
return self._getter('y')
@x.setter
def x(self, value):
self._setter('x', value)
@y.setter
def y(self, value):
self._setter('y', value)
Then, we define the class Decorator
to decorate the int
objects returned by the getters of a Proxy
object in order to inform its setters whether the increment or decrement operators +=
and -=
have been used by adding an action
attribute, in which case the setters request an 'increment'
or 'decrement'
operation instead of a 'set'
operation. The increment and decrement operators +=
and -=
call the corresponding augmented assignment special methods __iadd__
and __isub__
if they are defined, and fall back on the assignment special methods __add__
and __sub__
which are always defined for int
objects (e.g. proxy.x += value
is equivalent to proxy.x = proxy.x.__iadd__(value)
which is equivalent to proxy.x = type(proxy).x.__get__(proxy).__iadd__(value)
which is equivalent to type(proxy).x.__set__(proxy, type(proxy).x.__get__(proxy).__iadd__(value))
):
class Decorator(int):
def __iadd__(self, other):
value = Decorator(other)
value.action = 'increment'
return value
def __isub__(self, other):
value = Decorator(other)
value.action = 'decrement'
return value
Then, we define the function worker
that will be run in the child processes and request the increment and decrement operations:
def worker(proxy):
proxy.x += 1
proxy.y -= 1
Finally, we define a single request queue to send requests to the parent process, and multiple response queues to send responses to the child processes:
if __name__ == '__main__':
subject = Subject()
request_queue = multiprocessing.Queue()
response_queues = {}
processes = []
for index in range(4):
sender = 'child {}'.format(index)
response_queues[sender] = multiprocessing.Queue()
proxy = Proxy(request_queue, response_queues[sender])
process = multiprocessing.Process(
target=worker, args=(proxy,), name=sender)
processes.append(process)
running = len(processes)
for process in processes:
process.start()
while subject.x != 4 or subject.y != -4:
sender, action, *args = request_queue.get()
print(sender, 'requested', action, *args)
if action == 'get':
response_queues[sender].put(getattr(subject, args[0]))
elif action == 'set':
setattr(subject, args[0], args[1])
elif action == 'increment':
setattr(subject, args[0], getattr(subject, args[0]) + args[1])
elif action == 'decrement':
setattr(subject, args[0], getattr(subject, args[0]) - args[1])
for process in processes:
process.join()
The program is guaranteed to terminate when +=
and -=
are process-safe. If you remove process-safety by commenting the corresponding __iadd__
or __isub__
of Decorator
then the program will only terminate by chance (e.g. proxy.x += value
is equivalent to proxy.x = proxy.x.__iadd__(value)
but falls back to proxy.x = proxy.x.__add__(value)
if __iadd__
is not defined, which is equivalent to proxy.x = proxy.x + value
which is equivalent to proxy.x = type(proxy).x.__get__(proxy) + value
which is equivalent to type(proxy).x.__set__(proxy, type(proxy).x.__get__(proxy) + value)
, so the action
attribute is not added and the setter requests a 'set'
operation instead of an 'increment'
operation).
Example process-safe session (atomic +=
and -=
):
child 0 requested get x
child 0 requested increment x 1
child 0 requested get y
child 0 requested decrement y 1
child 3 requested get x
child 3 requested increment x 1
child 3 requested get y
child 2 requested get x
child 3 requested decrement y 1
child 1 requested get x
child 2 requested increment x 1
child 2 requested get y
child 2 requested decrement y 1
child 1 requested increment x 1
child 1 requested get y
child 1 requested decrement y 1
Example process-unsafe session (non-atomic +=
and -=
):
child 2 requested get x
child 1 requested get x
child 0 requested get x
child 2 requested set x 1
child 2 requested get y
child 1 requested set x 1
child 1 requested get y
child 2 requested set y -1
child 1 requested set y -1
child 0 requested set x 1
child 0 requested get y
child 0 requested set y -2
child 3 requested get x
child 3 requested set x 2
child 3 requested get y
child 3 requested set y -3 # the program stalls here