ERROR:root:can't pickle fasttext_pybind.fasttext objects

I am using gunicorn with multiple workers for my machine learning project. The problem is that when I send a train request, only the worker that handles it gets updated with the latest model after training is done. It is worth mentioning that, to make inference faster, I load the model once after each training instead of on every prediction. As a result, only the worker that handled the training operation loads the latest model, while the other workers keep the previously loaded one. Right now the model file (binary format) is loaded once after each training into a global dictionary, where the key is the model name and the value is the model object. Obviously, this problem would not occur if I reloaded the model from disk for every prediction, but I cannot do that, as it would make prediction slower.
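
To illustrate the setup described above, here is a minimal sketch of the per-worker cache (names such as MODEL_CACHE, load_model_into_cache and predict are illustrative, not my exact code):

import fasttext

# Per-process global cache: every gunicorn worker gets its own copy of this dict.
MODEL_CACHE = {}

def load_model_into_cache(model_name, model_path):
    # Called once after each training run, but only in the worker
    # that happened to handle the training request.
    MODEL_CACHE[model_name] = fasttext.load_model(model_path)

def predict(model_name, text):
    # Fast inference: reuse the already-loaded model instead of
    # reading the .bin file from disk on every request.
    return MODEL_CACHE[model_name].predict(text)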

I read up on global variables, and further investigation shows that in a multi-processing environment each worker (process) creates its own copy of every global variable. Apart from the binary model file, I also have some other global variables (dictionaries) that need to be kept in sync across all processes. So, how do I handle this situation?

TL;DR: I need an approach that lets me store variables that are shared across all the processes (workers). Is there any way to do this? With multiprocessing.Manager, dill, etc.?

Update 1: I have multiple machine learning algorithms in my project, each with its own model file, and they are loaded into memory in a dictionary where the key is the model name and the value is the corresponding model object. I need to share all of them (in other words, I need to share the dictionary). But some of the models, such as FastText, are not pickle-serializable. So when I try to use a proxy variable (in my case a dictionary to hold the models) with multiprocessing.Manager, I get an error for those non-pickle-serializable objects while assigning the loaded model to the dictionary: can't pickle fasttext_pybind.fasttext objects. More information on multiprocessing.Manager can be found here: Proxy Objects

Following is a summary of what I have done:

import multiprocessing
import fasttext

mgr = multiprocessing.Manager()
model_dict = mgr.dict()
model_file = fasttext.load_model("path/to/model/file/which/is/in/.bin/format")
model_dict["fasttext"] = model_file # This line throws this error

Error:

can't pickle fasttext_pybind.fasttext objects

I printed the model_file that I am trying to assign; it is:

<fasttext.FastText._FastText object at 0x7f86e2b682e8>

Update 2: According to this answer I modified my code a little bit:

import fasttext
from multiprocessing.managers import SyncManager

def Manager():
    m = SyncManager()
    m.start()
    return m

# The model object is of type "fasttext.FastText._FastText", so that class is registered with the manager
SyncManager.register("fast", fasttext.FastText._FastText)
# This Manager replaces the old one.
mgr = Manager()
ft = mgr.fast() # This line gives error.

This gives me an EOFError.

Update 3: I tried using dill with both multiprocessing and multiprocess. A summary of the changes is as follows:

import multiprocessing
import multiprocess
import dill

# Any one of the following two lines
mgr = multiprocessing.Manager() # Or,
mgr = multiprocess.Manager()

model_dict = mgr.dict()
... ... ...
... ... ...

model_file = dill.dumps(model_file) # This line throws the error
model_dict["fasttext"] = model_file
... ... ...
... ... ...
# During loading
model_file = dill.loads(model_dict["fasttext"])

But I am still getting the error: can't pickle fasttext_pybind.fasttext objects.

Update 4: This time I am using another library called jsonpickle. Serialization and de-serialization seem to work properly (no issues are reported while running). But surprisingly, whenever I make a prediction after de-serialization, it hits a segmentation fault. More details and the steps to reproduce it can be found here: Segmentation fault (core dumped)
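
For reference, the jsonpickle attempt looked roughly like this (a sketch; the model path is the same placeholder as in the earlier snippets):

import fasttext
import jsonpickle

model_file = fasttext.load_model("path/to/model/file/which/is/in/.bin/format")

# Encoding and decoding run without raising any exception...
serialized = jsonpickle.encode(model_file)
restored = jsonpickle.decode(serialized)

# ...but calling predict() on the restored object crashes with a
# segmentation fault, presumably because the underlying
# fasttext_pybind state was never really captured.
restored.predict("some text")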

Update 5: I tried cloudpickle and srsly, but couldn't make the program work.
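
The cloudpickle attempt followed the same pattern as the dill one above (a sketch):

import cloudpickle
import fasttext

model_file = fasttext.load_model("path/to/model/file/which/is/in/.bin/format")

# As with the earlier attempts, this did not get the model into a form
# that could be shared across the workers in my case.
serialized = cloudpickle.dumps(model_file)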

Leger answered 4/10, 2021 at 3:55 Comment(10)
pubsub to workers with new value? – Exergue
multiprocessing.Manager is probably your best bet, but there are plenty of other options, as described in the docs. – Jacob
@Jacob Yes, it probably is. I also got to know about this from here: medium.com/@jgleeee/… – Leger
@Jacob [Update] For my particular case it is quite complicated. I want to do it for a FastText supervised model (fastText from Facebook), which is not pickle-serializable. But to be shared across processes the object needs to be serializable (docs.python.org/3/library/multiprocessing.html#proxy-objects), so I ended up getting the error: can't pickle fasttext_pybind.fasttext objects. – Leger
I'm the author of dill, multiprocess, klepto, and ppft. One of these may help. If you can serialize the object with dill, then you can pass it between processes. If it's a big object, then you can try using a multiprocess.Manager to share one object across multiple cores, or if the computation is lighter then you might try multiprocess.dummy for threading. ppft is like multiprocess, but uses dill.source to extract source code instead of serialization. klepto can share objects between processes through database-like objects. First try dill.dumps to see if it pickles. – Diglot
@MikeMcKerns No, model_file = dill.dumps(model_file) also gives the same error: can't pickle fasttext_pybind.fasttext objects. The model size is typically around 250 MB. – Leger
...Alternatively, I also tried using multiprocess instead of multiprocessing along with it, but I get the same error. I am totally clueless. – Leger
multiprocess won't work if dill can't pickle the object. So klepto is probably also not going to work. I think the answer is that the object is not serializable, and you can't do what you want to do. – Diglot
You can, however, make a class that derives from FastText which provides a __reduce__ method so the state can be stored. Or, with that knowledge, you can register a new method to the pickle registry in dill, thus teaching dill how to serialize a FastText object. (A sketch of this idea follows the comments.) – Diglot
@MedetTleukabiluly Yes, I finally solved it using Redis pub/sub. I didn't share the model file itself via pub/sub, though; I only shared a message telling the other workers to load the model from disk. I had to accept that each worker loads a separate copy of the model, and this strategy consumes more memory. – Leger
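
For completeness, a minimal sketch of the subclass-with-__reduce__ idea @MikeMcKerns describes (the class name PicklableFastText is hypothetical; note that unpickling simply re-loads the .bin file from disk, so only the path is stored, not the trained weights themselves):

import fasttext

class PicklableFastText(fasttext.FastText._FastText):
    """A fastText model whose pickled form is just the path to its .bin file."""

    def __init__(self, model_path):
        self.model_path = model_path
        super().__init__(model_path=model_path)

    def __reduce__(self):
        # On unpickling, re-create the object by loading the model from disk
        # instead of trying to serialize the fasttext_pybind object.
        return (PicklableFastText, (self.model_path,))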

For the sake of completeness, I am providing the solution that worked for me. All of my attempts to serialize FastText were in vain. Finally, as @MedetTleukabiluly mentioned in the comments, I managed to tell the other workers to load the model from disk using Redis pub/sub. Obviously, this does not share the model from the same memory space; it only sends a message to the other workers informing them that they should re-load the model from disk (because a new training just happened). Following is the general solution:

# redis_pubsub.py

import logging
import os
import fasttext
import socket
import threading
import time

"""The whole purpose of GLOBAL_NAMESPACE is to keep the whole pubsub mechanism separate.
As this might be a case another service also publishing in the same channel.
"""
GLOBAL_NAMESPACE = "SERVICE_0"

def get_ip():
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        # doesn't even have to be reachable
        s.connect(('10.255.255.255', 1))
        IP = s.getsockname()[0]
    except Exception:
        IP = '127.0.0.1'
    finally:
        s.close()
    return IP


class RedisPubSub:
    def __init__(self):
        self.redis_client = get_redis_client()  # TODO: a sample method that returns your Redis client (you have to implement it)
        # The unique ID identifies which worker on which server is the publisher,
        # so that a worker can ignore messages it has sent itself.
        self.unique_id = "IP_" + get_ip() + "__" + str(GLOBAL_NAMESPACE) + "__" + "PID_" + str(os.getpid())


    def listen_to_channel_and_update_models(self, channel):
        try:
            pubsub = self.redis_client.pubsub()
            pubsub.subscribe(channel)
        except Exception as exception:
            logging.error(f"REDIS_ERROR: Model Update Listening: {exception}")

        while True:
            try:
                message = pubsub.get_message()

                # Successful operations give 1 and unsuccessful ones give 0;
                # we are not interested in these flags, only in real messages.
                if message and message["data"] != 1 and message["data"] != 0: 
                    message = message["data"].decode("utf-8")
                    message = str(message)
                    splitted_msg = message.split("__SEPERATOR__")


                    # Make sure the message comes from another worker, and that
                    # the sender and receiver (i.e. both workers) are under the same namespace.
                    if (splitted_msg[0] != self.unique_id) and (splitted_msg[0].split('__')[1] == GLOBAL_NAMESPACE):
                        algo_name = splitted_msg[1]
                        model_path = splitted_msg[2]

                        # Fasttext
                        if "fasttext" in algo_name:
                            try:
                                # TODO: the newly loaded model is in model_file; use it to update the old one.
                                model_file = fasttext.load_model(model_path + '.bin')
                            except Exception as exception:
                                logging.error(exception)
                            else:
                                logging.info(f"{algo_name} model is updated for process with unique_id: {self.unique_id} by process with unique_id: {splitted_msg[0]}")


                time.sleep(1) # sleeping for 1 second to avoid hammering the CPU too much

            except Exception as exception:
                time.sleep(1)
                logging.error(f"PUBSUB_ERROR: Model or component update: {exception}")


    def publish_to_channel(self, channel, algo_name, model_path):
        def _publish_to_channel():
            try:
                message = self.unique_id + '__SEPERATOR__' + str(algo_name) + '__SEPERATOR__' + str(model_path)
                time.sleep(3)
                self.redis_client.publish(channel, message)
            except Exception as exception:
                logging.error(f"PUBSUB_ERROR: Model or component publishing: {exception}")

        # The delay before publishing would block unrelated work, so the publish is done in a separate thread.
        thread = threading.Thread(target = _publish_to_channel)
        thread.start()

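The get_redis_client() call above is just a placeholder. A minimal sketch of such a helper, assuming redis-py and a Redis instance on localhost, could look like this:

import redis

def get_redis_client():
    # Hypothetical helper: adjust host/port/db to match your deployment.
    return redis.Redis(host="localhost", port=6379, db=0)
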
You also have to start the listener in each worker:

import threading

from redis_pubsub import RedisPubSub
pubsub = RedisPubSub()


# start the listener:
thread = threading.Thread(target = pubsub.listen_to_channel_and_update_models, args = ("sync-ml-models", ))
thread.start()

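If you run under gunicorn, one way to make sure every worker starts this listener is a post_fork hook in the gunicorn config (a sketch, assuming a gunicorn.conf.py file is used):

# gunicorn.conf.py
import threading

from redis_pubsub import RedisPubSub

def post_fork(server, worker):
    # Called in each worker process right after it is forked.
    pubsub = RedisPubSub()
    thread = threading.Thread(target = pubsub.listen_to_channel_and_update_models,
                              args = ("sync-ml-models", ),
                              daemon = True)
    thread.start()
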
From the fastText training module, publish this message to the other workers when training finishes, so that they get a chance to re-load the model from disk:

# fasttext_api.py

from redis_pubsub import RedisPubSub
pubsub = RedisPubSub()

pubsub.publish_to_channel(channel = "sync-ml-models", # a sample name for the channel
                          algo_name = "fasttext",
                          model_path = "path/to/fasttext/model")

Leger answered 4/12, 2022 at 3:36 Comment(0)
