Creating and updating nested dictionaries and lists inside a multiprocessing.Manager object

I have created a nested dictionary inside a multiprocessing.Manager.dict object. Dictionary methods like update, clear, etc. do not work when I apply them to the nested dictionary. This is an example:

from multiprocessing import Manager, Process


def worker(shared_object):
    print(f'Before defining the nested dictionary: {shared_object}')
    shared_object['nested'] = {
        'first_key': 'first_value'
    }
    print(f'After defining the nested dictionary: {shared_object}')
    shared_object['nested'].update(
        {
            'second_key': 'second_value'
        }
    )
    print(f'After updating the nested dictionary: {shared_object}')


if __name__ == '__main__':
    with Manager() as manager:
        shared_dict = manager.dict()
        worker_one = Process(target=worker, args=(shared_dict,))
        worker_one.start()
        worker_one.join()

And this is the result:

Before defining the nested dictionary: {}
After defining the nested dictionary: {'nested': {'first_key': 'first_value'}}
After updating the nested dictionary: {'nested': {'first_key': 'first_value'}}

What can I do?

Actable asked 18/8, 2022 at 20:37. Comments (2):
The nested dict would itself have to be created by manager.dict() in order to be shared. (Damar)
@Damar How can I create that nested dictionary inside the worker function, since I don't have access to the manager there? I passed the manager as an argument, but it didn't work. (Actable)

Your usage of the shared dictionary is equivalent to, and can be rewritten as, the following:

temp = shared_object['nested']
temp.update({'second_key': 'second_value'})

When you read the value of the nested key from the shared dictionary, the manager sends back a copy of it, and that copy is stored in the current process's memory space. This means that any changes you make to this dictionary will not be reflected in the shared dictionary stored inside the manager, simply because the manager has no way of knowing what you did to the dictionary after handing out a copy of it. Therefore, you need to inform the manager that you have made changes to the nested dictionary. There are two ways to do this:
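A minimal sketch of this behaviour, using only the standard library (the function name show_copy_semantics is hypothetical): the value fetched through the proxy is a plain dict, and modifying it leaves the value stored in the manager untouched.

```python
from multiprocessing import Manager


def show_copy_semantics():
    """Return (type of the fetched value, value stored in the manager afterwards)."""
    with Manager() as manager:
        shared = manager.dict()
        shared['nested'] = {'first_key': 'first_value'}

        # Indexing the proxy asks the manager process for the value; the
        # manager pickles it and this process receives an independent copy.
        local_copy = shared['nested']
        local_copy['second_key'] = 'second_value'  # modifies the copy only

        # Reading the key again fetches a fresh copy of the stored value.
        return type(local_copy), shared['nested']


if __name__ == '__main__':
    fetched_type, stored = show_copy_semantics()
    print(fetched_type)  # <class 'dict'>
    print(stored)        # {'first_key': 'first_value'}
```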

Explicitly call the manager object with the updated value

This is probably the simplest (and, performance-wise, the fastest) fix here: after making any changes to the nested dictionary, explicitly assign the altered dictionary back to its key:

temp = shared_object['nested']
temp.update({'second_key': 'second_value'})
shared_object['nested'] = temp

Doing so sends the updated dictionary back to the manager, which replaces the stored value with it, so the manager knows that the managed object has changed.
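For reference, here is the question's worker rewritten with this fix, as a minimal runnable sketch. The run helper is a hypothetical addition so the result can be inspected after the worker exits; shared_dict.copy() returns a plain dict snapshot of the managed dictionary.

```python
from multiprocessing import Manager, Process


def worker(shared_object):
    shared_object['nested'] = {'first_key': 'first_value'}

    # Fetch a local copy, modify it, then assign it back so that the manager
    # process replaces its stored value with the updated dictionary.
    temp = shared_object['nested']
    temp.update({'second_key': 'second_value'})
    shared_object['nested'] = temp
    print(f'After updating the nested dictionary: {shared_object}')


def run():
    with Manager() as manager:
        shared_dict = manager.dict()
        worker_one = Process(target=worker, args=(shared_dict,))
        worker_one.start()
        worker_one.join()
        # copy() returns a plain dict snapshot that outlives the manager.
        return shared_dict.copy()


if __name__ == '__main__':
    print(run())
    # {'nested': {'first_key': 'first_value', 'second_key': 'second_value'}}
```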

Create another managed object for the nested dictionary (manually vs automatically)

Another way to achieve this is to create a managed object for the nested dictionary and store that instead. However, keep in mind that this can quickly become inconvenient if your objects are nested many levels deep, because you'll need to create another managed object for each nested dictionary. Moreover, this will not work if the child processes you create are daemonic (as the worker processes of multiprocessing.Pool are) and need to add their own nested objects to the shared dictionary. This is because creating a managed object requires starting a manager process, and daemonic processes cannot spawn processes of their own.
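A minimal sketch of the manual variant, assuming Python 3.6 or newer (the version in which managed objects became nestable) and a parent process that owns the manager. The parent creates the inner managed dict and stores its proxy, so the worker only ever calls methods on proxies. The run helper is hypothetical.

```python
from multiprocessing import Manager, Process


def worker(shared_object):
    # Indexing the outer proxy returns a proxy to the inner managed dict, so
    # update() runs inside the manager process and the change is kept.
    shared_object['nested'].update({'second_key': 'second_value'})


def run():
    with Manager() as manager:
        shared_dict = manager.dict()
        # The parent owns the manager, so it creates the nested managed dict
        # up front; holding a local proxy also keeps the inner dict alive
        # on the manager side while the worker runs.
        nested = manager.dict({'first_key': 'first_value'})
        shared_dict['nested'] = nested

        worker_one = Process(target=worker, args=(shared_dict,))
        worker_one.start()
        worker_one.join()

        # copy() on the inner proxy returns a plain dict snapshot.
        return nested.copy()


if __name__ == '__main__':
    print(run())  # {'first_key': 'first_value', 'second_key': 'second_value'}
```

Note that printing shared_dict itself would show a proxy object for the 'nested' key rather than its contents, which is why the sketch copies the inner dict instead.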

However, both of these issues can be fixed by making the shared dictionary itself responsible for automatically creating managed objects whenever someone attempts to store a nested dictionary in it. This removes the burden of doing so manually. An example of how to make this possible is given below (it works for both lists and dictionaries):

from multiprocessing.managers import SyncManager, MakeProxyType, ListProxy, State
from multiprocessing import Process, Lock
from collections import UserDict
from collections import UserList


class ManagerList(UserList):

    def __check_state(self):
        global manager
        global lock

        # Managers are not thread-safe, protect starting one from within another with a lock
        with lock:
            if manager._state.value != State.STARTED:
                manager.start(initializer=init)

    def __setitem__(self, key, value):
        global manager
        self.__check_state()

        if isinstance(value, list):
            value = manager.list(value)
        elif isinstance(value, dict):
            value = manager.dict(value)
        return super().__setitem__(key, value)


class ManagerDict(UserDict):

    def __check_state(self):
        global manager
        global lock

        # Managers are not thread-safe, protect starting one from within another with a lock
        with lock:
            if manager._state.value != State.STARTED:
                manager.start(initializer=init)

    def __setitem__(self, key, value):
        global manager
        self.__check_state()

        if isinstance(value, list):
            value = manager.list(value)
        elif isinstance(value, dict):
            value = manager.dict(value)
        return super().__setitem__(key, value)


ManagerDictProxy = MakeProxyType('DictProxy', (
    '__contains__', '__delitem__', '__getitem__', '__iter__', '__len__',
    '__setitem__', 'clear', 'copy', 'get', 'items',
    'keys', 'pop', 'popitem', 'setdefault', 'update', 'values'
    ))
ManagerDictProxy._method_to_typeid_ = {
    '__iter__': 'Iterator',
    }

SyncManager.register('list', ManagerList, ListProxy)
SyncManager.register('dict', ManagerDict, ManagerDictProxy)


def init():
    global manager
    global lock

    lock = Lock()
    manager = SyncManager()

def worker(shared_object):
    print(f'Before defining the nested dictionary: {shared_object}')
    shared_object['nested'] = {
        'first_key': 'first_value',
    }
    print(f'After defining the nested dictionary: {shared_object["nested"]}')
    shared_object['nested'].update(
        {
            'second_key': 'second_value'
        }
    )
    print(f'After updating the nested dictionary: {shared_object["nested"]}')


if __name__ == "__main__":
    manager = SyncManager()
    manager.start(initializer=init)

    d = manager.dict()  # It works with manager.list() too
    p = Process(target=worker, args=(d, ))
    p.start()
    p.join()

This uses the __setitem__ dunder method to check whether the value being assigned to the dictionary is a list or a dictionary; if it is, a managed object is created for it and stored instead. Any changes made to these nested dictionaries/lists are then reflected in the original dictionary as well. This works for any number of nesting levels.

However, keep in mind that this only creates managed objects if the datatype is list or dict (or a subclass of either). Additionally, it starts a separate manager process for each nesting level. Therefore, the first method will be faster than this one, even though this one is more convenient.
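The conversion works because UserDict routes both its constructor and update() through __setitem__, so values stored via shared_object['nested'].update(...) or passed to ManagerDict(value) are intercepted too. The sketch below shows this without any multiprocessing; WrappingDict is a hypothetical stand-in for ManagerDict that wraps nested dicts in itself instead of calling manager.dict().

```python
from collections import UserDict


class WrappingDict(UserDict):
    """Hypothetical stand-in for ManagerDict: nested dicts are wrapped in
    another WrappingDict, and converted keys are recorded for inspection."""

    def __init__(self, *args, **kwargs):
        # Must exist before UserDict.__init__ calls update() -> __setitem__.
        self.converted = []
        super().__init__(*args, **kwargs)

    def __setitem__(self, key, value):
        if isinstance(value, dict):
            self.converted.append(key)
            value = WrappingDict(value)  # recursion handles deeper levels
        super().__setitem__(key, value)


d = WrappingDict({'nested': {'deeper': {'leaf': 1}}})
print(d.converted)            # ['nested']  (constructor went through __setitem__)
print(d['nested'].converted)  # ['deeper']
d.update({'other': {}})       # update() also goes through __setitem__
print(d.converted)            # ['nested', 'other']
```

One caveat follows from the same reasoning: UserList's constructor, append, extend and insert write to self.data directly rather than calling __setitem__, so ManagerList only converts values that are assigned by index (lst[i] = value).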

Output

Before defining the nested dictionary: {}
After defining the nested dictionary: {'first_key': 'first_value'}
After updating the nested dictionary: {'first_key': 'first_value', 'second_key': 'second_value'}
Cur answered 19/8, 2022 at 14:35. Comments (3):
I appreciate your clear and helpful explanation. This issue can also be resolved by using the json module for nested dictionaries. The solution that initially came to my mind was to save the nested dictionary as a string, convert that string to a dictionary using json.loads, modify it, and then save it again as a string with json.dumps. Now I can use your excellent solutions. Thanks again. (Actable)
A question: what happens with the method "Explicitly call the manager object with the updated value" if it's called at the same time from 2 different processes? Can it lead to data loss? Or is there some protection against simultaneous access? (Dicky)
@DanieleRugginenti Yes, that would lead to data loss, since managers are not technically process- or thread-safe. Changing mutable managed objects from multiple processes or threads should be done with locks. Some more context on that: https://mcmap.net/q/130967/-unexpected-multiprocessing-behavior-with-omitted-lock (Cur)
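Following that advice, here is a minimal sketch of the reassignment fix guarded by a lock. It assumes a lock created with manager.Lock() and passed to every worker; the 'counter' key and the increment/run helpers are hypothetical. Holding the lock across the read, the modification and the write-back prevents one process from overwriting another's update.

```python
from multiprocessing import Manager, Process


def increment(shared_object, lock, times):
    for _ in range(times):
        # Without the lock, two processes could read the same old copy, and
        # whichever writes back last would discard the other's change.
        with lock:
            temp = shared_object['nested']
            temp['counter'] += 1
            shared_object['nested'] = temp


def run(n_procs=4, times=100):
    with Manager() as manager:
        shared_dict = manager.dict()
        shared_dict['nested'] = {'counter': 0}
        lock = manager.Lock()  # proxy to a lock living in the manager process

        procs = [Process(target=increment, args=(shared_dict, lock, times))
                 for _ in range(n_procs)]
        for p in procs:
            p.start()
        for p in procs:
            p.join()
        return shared_dict['nested']['counter']


if __name__ == '__main__':
    print(run())  # 400
```

Removing the with lock: line makes the final count unreliable: it can come out lower than n_procs * times.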

© 2022 - 2024 — McMap. All rights reserved.