Python Multiprocessing: Handling Child Errors in Parent
Asked Answered
P

4

88

I am currently playing around with multiprocessing and queues. I have written a piece of code to export data from mongoDB, map it into a relational (flat) structure, convert all values to string and insert them into mysql.

Each of these steps is submitted as a process and given import/export queues, safe for the mongoDB export which is handled in the parent.

As you will see below, I use queues and child processes terminate themselves when they read "None" from the queue. The problem I currently have is that, if a child process runs into an unhandled Exception, this is not recognized by the parent and the rest just Keeps running. What I want to happen is that the whole shebang quits and at best reraise the child error.

I have two questions:

  1. How do I detect the child error in the parent?
  2. How do I kill my child processes after detecting the error (best practice)? I realize that putting "None" to the queue to kill the child is pretty dirty.

I am using python 2.7.

Here are the essential parts of my code:

# Establish communication queues
mongo_input_result_q = multiprocessing.Queue()
mapper_result_q = multiprocessing.Queue()
converter_result_q = multiprocessing.Queue()

[...]

    # create child processes
    # all processes generated here are subclasses of "multiprocessing.Process"

    # create mapper
    mappers = [mongo_relational_mapper.MongoRelationalMapper(mongo_input_result_q, mapper_result_q, columns, 1000)
               for i in range(10)]

    # create datatype converter, converts everything to str
    converters = [datatype_converter.DatatypeConverter(mapper_result_q, converter_result_q, 'str', 1000)
                  for i in range(10)]

    # create mysql writer
    # I create a list of writers. currently only one, 
    # but I have the option to parallellize it further
    writers = [mysql_inserter.MySqlWriter(mysql_host, mysql_user, mysql_passwd, mysql_schema, converter_result_q
               , columns, 'w_'+mysql_table, 1000) for i in range(1)]

    # starting mapper
    for mapper in mappers:
        mapper.start()
    time.sleep(1)

    # starting converter
    for converter in converters:
        converter.start()

    # starting writer
    for writer in writers:
        writer.start()

[... initializing mongo db connection ...]

    # put each dataset read to queue for the mapper
    for row in mongo_collection.find({inc_column: {"$gte": start}}):
        mongo_input_result_q.put(row)
        count += 1
        if count % log_counter == 0:
            print 'Mongo Reader' + " " + str(count)
    print "MongoReader done"

    # Processes are terminated when they read "None" object from queue
    # now that reading is finished, put None for each mapper in the queue so they terminate themselves
    # the same for all followup processes
    for mapper in mappers:
        mongo_input_result_q.put(None)
    for mapper in mappers:
        mapper.join()
    for converter in converters:
        mapper_result_q.put(None)
    for converter in converters:
        converter.join()
    for writer in writers:
        converter_result_q.put(None)
    for writer in writers:
        writer.join()
Philosophical answered 12/11, 2013 at 8:20 Comment(2)
It was suggested that I rewrite my 3 steps to be one single function and submit it to a process pool. But i want these steps split up, they should be interchangeable. In the end I will have several classes that all to one specific task and I can run them as processes with queues between them (as shown above). There could also be a file output instead of the mysql writer or an additional transformation step, where i split or merge columns. Think of them as steps in a Kettle transformation, if you know the tool.Philosophical
I put an answer to your specific questions but at a higher level, are your worker processes really going to be cpu-bound? The stuff you are talking about sounds like it would be I/O bound. If so, I don't think multiprocessing is going to help you. Have you looked at the many alternativesRiddick
R
58

I don't know standard practice but what I've found is that to have reliable multiprocessing I design the methods/class/etc. specifically to work with multiprocessing. Otherwise you never really know what's going on on the other side (unless I've missed some mechanism for this).

Specifically what I do is:

  • Subclass multiprocessing.Process or make functions that specifically support multiprocessing (wrapping functions that you don't have control over if necessary)
  • always provide a shared error multiprocessing.Queue from the main process to each worker process
  • enclose the entire run code in a try: ... except Exception as e. Then when something unexpected happens send an error package with:
    • the process id that died
    • the exception with it's original context (check here). The original context is really important if you want to log useful information in the main process.
  • of course handle expected issues as normal within the normal operation of the worker
  • (similar to what you said already) assuming a long-running process, wrap the running code (inside the try/catch-all) with a loop
    • define a stop token in the class or for functions.
    • When the main process wants the worker(s) to stop, just send the stop token. to stop everyone, send enough for all the processes.
    • the wrapping loop checks the input q for the token or whatever other input you want

The end result is worker processes that can survive for a long time and that can let you know what's happening when something goes wrong. They will die quietly since you can handle whatever you need to do after the catch-all exception and you will also know when you need to restart a worker.

Again, I've just come to this pattern through trial and error so I don't know how standard it is. Does that help with what you are asking for?

Riddick answered 12/11, 2013 at 12:54 Comment(7)
yes, this does help. I have been thinking about creating an Error queu to communicate between parent and child process but I was hoping there was a better (standard) solution provided by the multiprocessing module that I have not found yet. How would I tell the other child processes to terminate?Philosophical
It's as you mentioned. I send a stop token to the input Q. I updated the answer to reflect this.Riddick
I used your answer as starting point for my solution, thanks! I have added my solution as separate Answer to my question.Philosophical
You emphasize "sending an error package with the exception with its original context", which to me means the traceback object that is used as the 3rd argument to a raise statement. But a traceback object is not pickable, so it can't be sent through a multiprocessing.Queue object. How do you get the context back to the parent process?Motionless
@Motionless I haven't touched MP for a long time now so I can't remember. I think I had something that worked with passing exceptions based on the linked discussion to Ned Batchelder's blog but I can't say 100% I was passing the traceback object. Maybe I was just sending a string? Sorry I don't have time to go back and reinvestigate. If you have a good pattern, this question could definitely use some best practice answers with code.Riddick
@Motionless Probably too late already. I send traceback as string. There is no other way.Allveta
@ShipluMokaddim There are many other ways. Pickling and unpickling traceback objects with third-party serialization libraries (e.g., dill, cloudpickle) is one obvious alternative. This is why pathos is commonly recommended over the standard multiprocessing package.Orvalorvan
C
73

Why not to let the Process to take care of its own exceptions, like this:

from __future__ import print_function
import multiprocessing as mp
import traceback

class Process(mp.Process):
    def __init__(self, *args, **kwargs):
        mp.Process.__init__(self, *args, **kwargs)
        self._pconn, self._cconn = mp.Pipe()
        self._exception = None

    def run(self):
        try:
            mp.Process.run(self)
            self._cconn.send(None)
        except Exception as e:
            tb = traceback.format_exc()
            self._cconn.send((e, tb))
            # raise e  # You can still rise this exception if you need to

    @property
    def exception(self):
        if self._pconn.poll():
            self._exception = self._pconn.recv()
        return self._exception

Now you have, both error and traceback at your hands:

def target():
    raise ValueError('Something went wrong...')

p = Process(target = target)
p.start()
p.join()

if p.exception:
    error, traceback = p.exception
    print(traceback)

Regards, Marek

Counterblow answered 8/11, 2015 at 22:50 Comment(17)
I run into the following error on my linux server: File "/home/ec2-user/anaconda3/lib/python3.6/multiprocessing/connection.py", line 252, in recv return _ForkingPickler.loads(buf.getbuffer()) TypeError: init() takes 1 positional argument but 2 were given Does anyone know what to do?Synodic
This solutoion doesn't work with HTTPError for some reason.Teddie
From python docs: "Though being an exception (a subclass of URLError), an HTTPError can also function as a non-exceptional file-like return value (the same thing that urlopen() returns). This is useful when handling exotic HTTP errors, such as requests for authentication." Maybe you face such exotic situation?Counterblow
AFAICT (python 3.6) this doesn't work. Exceptions thrown in target() are not bubbled-up to Process.run(self) .Mccue
Just tested and it works as expected (python 3.7). Only print statement have to be modified...Counterblow
There is one disadvantage of this solution. If we have few processes and just 1 has error, we need to wait until all processes are finished in order to check if p.exception. Fixed here: https://mcmap.net/q/237075/-python-multiprocessing-handling-child-errors-in-parentParrotfish
Note that as @JensdeBruijn has noticed there appears to be an open python bug which affects pickling exceptions. Curious if anybody has a work-around for this.Jamestown
@JensdeBruijn A naive work-around to seems to be avoiding to send the error object over the pipe, e.g. instead of self._child_conn.send((e, tb)) use self._child_conn.send(tb) so send only the traceback.Jamestown
@Counterblow - why would I want to raise the exception from run() (i.e. raise e)?Packthread
@Counterblow - and why do you send None over the client pipe: self._cconn.send(None)?Packthread
The answer to the first question is: i don't know, i never used this. That's why this line is commented out. The second one: self._cconn.send(None) provides information that everything was OK. Isn't it, that self._pconn.recv() raises something when there's nothing on the other side?Counterblow
This code will deadlock if exception is too big (message and/or stack trace too long). The receiving end must call Pipe.recv() regularly otherwise Pipe.send() will block when the internal buffer becomes full. The join() will wait forever for the child to exit, while the child will wait forever for the parent to do recv() which only happens after join() finishes.Mcmasters
You're right. But the whole idea arises from the necessity of checking p.exceptionafter p.join(). Otherwise mp.Process class is just enough.Counterblow
Ahh, sorry @hamstergene, i misunderstood your comment. You say that even single exception can fill buffer. Well, i didn't expect that exception can really be such a big thing. There's usually 64kB of buffer space...Counterblow
For me it happened on subprocess.CalledProcessError with 5000 characters long message, so on my OS the buffer must be only 4096 bytes.Mcmasters
Why does it show error AssertionError: group argument must be None for now if I change mp.Process.__init__(self, *args, **kwargs) to super().__init__(self, *args, **kwargs)?Sande
Super() is confused with the class name, which is the same as the original one. Use another name, e.g. ProcessWithException. This seems like a bug in super() since new class is defined in another namespace...Counterblow
R
58

I don't know standard practice but what I've found is that to have reliable multiprocessing I design the methods/class/etc. specifically to work with multiprocessing. Otherwise you never really know what's going on on the other side (unless I've missed some mechanism for this).

Specifically what I do is:

  • Subclass multiprocessing.Process or make functions that specifically support multiprocessing (wrapping functions that you don't have control over if necessary)
  • always provide a shared error multiprocessing.Queue from the main process to each worker process
  • enclose the entire run code in a try: ... except Exception as e. Then when something unexpected happens send an error package with:
    • the process id that died
    • the exception with it's original context (check here). The original context is really important if you want to log useful information in the main process.
  • of course handle expected issues as normal within the normal operation of the worker
  • (similar to what you said already) assuming a long-running process, wrap the running code (inside the try/catch-all) with a loop
    • define a stop token in the class or for functions.
    • When the main process wants the worker(s) to stop, just send the stop token. to stop everyone, send enough for all the processes.
    • the wrapping loop checks the input q for the token or whatever other input you want

The end result is worker processes that can survive for a long time and that can let you know what's happening when something goes wrong. They will die quietly since you can handle whatever you need to do after the catch-all exception and you will also know when you need to restart a worker.

Again, I've just come to this pattern through trial and error so I don't know how standard it is. Does that help with what you are asking for?

Riddick answered 12/11, 2013 at 12:54 Comment(7)
yes, this does help. I have been thinking about creating an Error queu to communicate between parent and child process but I was hoping there was a better (standard) solution provided by the multiprocessing module that I have not found yet. How would I tell the other child processes to terminate?Philosophical
It's as you mentioned. I send a stop token to the input Q. I updated the answer to reflect this.Riddick
I used your answer as starting point for my solution, thanks! I have added my solution as separate Answer to my question.Philosophical
You emphasize "sending an error package with the exception with its original context", which to me means the traceback object that is used as the 3rd argument to a raise statement. But a traceback object is not pickable, so it can't be sent through a multiprocessing.Queue object. How do you get the context back to the parent process?Motionless
@Motionless I haven't touched MP for a long time now so I can't remember. I think I had something that worked with passing exceptions based on the linked discussion to Ned Batchelder's blog but I can't say 100% I was passing the traceback object. Maybe I was just sending a string? Sorry I don't have time to go back and reinvestigate. If you have a good pattern, this question could definitely use some best practice answers with code.Riddick
@Motionless Probably too late already. I send traceback as string. There is no other way.Allveta
@ShipluMokaddim There are many other ways. Pickling and unpickling traceback objects with third-party serialization libraries (e.g., dill, cloudpickle) is one obvious alternative. This is why pathos is commonly recommended over the standard multiprocessing package.Orvalorvan
P
17

@mrkwjc 's solution is simple, so easy to understand and implement, but there is one disadvantage of this solution. When we have few processes and we want to stop all processes if any single process has error, we need to wait until all processes are finished in order to check if p.exception. Below is the code which fixes this problem (ie when one child has error, we terminate also another child):

import multiprocessing
import traceback

from time import sleep


class Process(multiprocessing.Process):
    """
    Class which returns child Exceptions to Parent.
    https://mcmap.net/q/237075/-python-multiprocessing-handling-child-errors-in-parent
    """

    def __init__(self, *args, **kwargs):
        multiprocessing.Process.__init__(self, *args, **kwargs)
        self._parent_conn, self._child_conn = multiprocessing.Pipe()
        self._exception = None

    def run(self):
        try:
            multiprocessing.Process.run(self)
            self._child_conn.send(None)
        except Exception as e:
            tb = traceback.format_exc()
            self._child_conn.send((e, tb))
            # raise e  # You can still rise this exception if you need to

    @property
    def exception(self):
        if self._parent_conn.poll():
            self._exception = self._parent_conn.recv()
        return self._exception


class Task_1:
    def do_something(self, queue):
        queue.put(dict(users=2))


class Task_2:
    def do_something(self, queue):
        queue.put(dict(users=5))


def main():
    try:
        task_1 = Task_1()
        task_2 = Task_2()

        # Example of multiprocessing which is used:
        # https://eli.thegreenplace.net/2012/01/16/python-parallelizing-cpu-bound-tasks-with-multiprocessing/
        task_1_queue = multiprocessing.Queue()
        task_2_queue = multiprocessing.Queue()

        task_1_process = Process(
            target=task_1.do_something,
            kwargs=dict(queue=task_1_queue))

        task_2_process = Process(
            target=task_2.do_something,
            kwargs=dict(queue=task_2_queue))

        task_1_process.start()
        task_2_process.start()

        while task_1_process.is_alive() or task_2_process.is_alive():
            sleep(10)

            if task_1_process.exception:
                error, task_1_traceback = task_1_process.exception

                # Do not wait until task_2 is finished
                task_2_process.terminate()

                raise ChildProcessError(task_1_traceback)

            if task_2_process.exception:
                error, task_2_traceback = task_2_process.exception

                # Do not wait until task_1 is finished
                task_1_process.terminate()

                raise ChildProcessError(task_2_traceback)

        task_1_process.join()
        task_2_process.join()

        task_1_results = task_1_queue.get()
        task_2_results = task_2_queue.get()

        task_1_users = task_1_results['users']
        task_2_users = task_2_results['users']

    except Exception:
        # Here usually I send email notification with error.
        print('traceback:', traceback.format_exc())


if __name__ == "__main__":
    main()
Parrotfish answered 23/9, 2019 at 10:49 Comment(3)
@Parrotfish - why do you send None over the pipe: self._child_conn.send(None)?Packthread
@Packthread it is for the case in which the process has no exceptions. If process you instantiated (say process P) started and it exited without exceptions, you'd like P.exception to reflect that. Here, this is a return of None. To ensure that P.exception returns None upon a process run encountering no exceptions, we send None over the pipe when no exceptions are encountered by process.Spencerianism
I really appreciated this answer for showing me a neat way to architect this, both orchestrating the processes and organizing the tasks as classes. Thank youKoffman
P
9

Thanks to kobejohn i have found a solution which is nice and stable.

  1. I have created a subclass of multiprocessing.Process which implements some functions and overwrites the run() method to wrap a new saferun method into a try-catch block. This Class requires a feedback_queue to initialize which is used to report info, debug, error messages back to the parent. The log methods in the class are wrappers for the globally defined log functions of the package:

    class EtlStepProcess(multiprocessing.Process):
    
        def __init__(self, feedback_queue):
            multiprocessing.Process.__init__(self)
            self.feedback_queue = feedback_queue
    
        def log_info(self, message):
            log_info(self.feedback_queue, message, self.name)
    
        def log_debug(self, message):
            log_debug(self.feedback_queue, message, self.name)
    
        def log_error(self, err):
            log_error(self.feedback_queue, err, self.name)
    
        def saferun(self):
            """Method to be run in sub-process; can be overridden in sub-class"""
            if self._target:
                self._target(*self._args, **self._kwargs)
    
        def run(self):
            try:
                self.saferun()
            except Exception as e:
                self.log_error(e)
                raise e
            return
    
  2. I have subclassed all my other process steps from EtlStepProcess. The code to be run is implemented in the saferun() method rather than run. This ways i do not have to add a try catch block around it, since this is already done by the run() method. Example:

    class MySqlWriter(EtlStepProcess):
    
        def __init__(self, mysql_host, mysql_user, mysql_passwd, mysql_schema, mysql_table, columns, commit_count,
                     input_queue, feedback_queue):
            EtlStepProcess.__init__(self, feedback_queue)
            self.mysql_host = mysql_host
            self.mysql_user = mysql_user
            self.mysql_passwd = mysql_passwd
            self.mysql_schema = mysql_schema
            self.mysql_table = mysql_table
            self.columns = columns
            self.commit_count = commit_count
            self.input_queue = input_queue
    
        def saferun(self):
            self.log_info(self.name + " started")
            #create mysql connection
            engine = sqlalchemy.create_engine('mysql://' + self.mysql_user + ':' + self.mysql_passwd + '@' + self.mysql_host + '/' + self.mysql_schema)
            meta = sqlalchemy.MetaData()
            table = sqlalchemy.Table(self.mysql_table, meta, autoload=True, autoload_with=engine)
            connection = engine.connect()
            try:
                self.log_info("start MySQL insert")
                counter = 0
                row_list = []
                while True:
                    next_row = self.input_queue.get()
                    if isinstance(next_row, Terminator):
                        if counter % self.commit_count != 0:
                            connection.execute(table.insert(), row_list)
                        # Poison pill means we should exit
                        break
                    row_list.append(next_row)
                    counter += 1
                    if counter % self.commit_count == 0:
                        connection.execute(table.insert(), row_list)
                        del row_list[:]
                        self.log_debug(self.name + ' ' + str(counter))
    
            finally:
                connection.close()
            return
    
  3. In my main file, I submit a Process that does all the work and give it a feedback_queue. This process starts all the steps and thenreads from mongoDB and puts values to the initial queue. My main process listens to the feedback queue and prints all log messages. If it receives an error log, it print the error and terminate its child, which in return also terminates all its children before dying.

    if __name__ == '__main__':
    feedback_q = multiprocessing.Queue()
    p = multiprocessing.Process(target=mongo_python_export, args=(feedback_q,))
    p.start()
    
    while p.is_alive():
        fb = feedback_q.get()
        if fb["type"] == "error":
            p.terminate()
            print "ERROR in " + fb["process"] + "\n"
            for child in multiprocessing.active_children():
                child.terminate()
        else:
            print datetime.datetime.fromtimestamp(fb["timestamp"]).strftime('%Y-%m-%d %H:%M:%S') + " " + \
                                                  fb["process"] + ": " + fb["message"]
    
    p.join()
    

I think about making a module out of it and putting it up on github, but I have to do some cleaning up and commenting first.

Philosophical answered 13/11, 2013 at 10:21 Comment(7)
That's great to have actual code. Here is some feedback: 1) why do the log_* methods seem to call themselves? Are those top level functions you have defined elsewhere? 2) be careful with Q.get() it will block forever. You can use get(False) to spin really quickly or get(timeout=some_very_small_time) to spin without blasting your CPU. In either case you have to wrap it with try/except Queue.Empty 3) shouldn't need to terminate the processes when an unhandled error is received. the try/except handles that and lets them close out peacefully. terminate() is generally discouraged I believe anyway.Riddick
4) On the same topic, I recommend using the stop token rather than terminate(). I define the stop token within each class that subclasses Process. (or Thread... actually all of this stuff applies to threading.Thread as well.) 5) You'll really want to use the reraise technique to pass exception context back to the main process so that you retain debug information. The exception is pretty useless otherwise.Riddick
Thanks for the tips! Regarding your points: 1) yes, they are toplevel functions in my module that can be used outside of the module as well. 2) I will add this with a timeout and catch the timeout exception, thanks 3) since these processes are part of a whole loading structure from mongodb to mysql, i need to make sure that the whole thing shuts down when one process has an error so I do not miss any data or insert wrong data into mysql. It's an all or nothing thing and is meant to be not fault tolerant. 4)+5) will chek it out in the documentationPhilosophical
Have you made a module out of this? If yes, is it on github? I would love to contribute!Maurine
Hi Dschoni. I have indeed made a module out of it. But it is super rough and has some stuff specific to our environment. If I find the time, I will update it so that it is more usable for the public and put it on GitHub for everyone to contribute.Philosophical
Just me, asking againMaurine
Hi Dschoni, unfortunately i do not work at that company anymore and the code is not available to me anymore. Generally, i would recommend using Pentaho Data Integration instead if rebuilding it yourself in Python. Although it was fun, at the current state of Pentaho DI it is probably less pain to use that. It is Open Source, so you can contribute.Philosophical

© 2022 - 2024 — McMap. All rights reserved.