How should I log while using multiprocessing in Python?
L

22

326

Right now I have a central module in a framework that spawns multiple processes using the Python 2.6 multiprocessing module. Because it uses multiprocessing, there is a module-level, multiprocessing-aware log, LOG = multiprocessing.get_logger(). Per the docs, this logger (EDIT) does not have process-shared locks, so output to sys.stderr (or whatever filehandle) can get garbled when multiple processes write to it simultaneously.

The issue I have now is that the other modules in the framework are not multiprocessing-aware. The way I see it, I need to make all dependencies on this central module use multiprocessing-aware logging. That's annoying within the framework, let alone for all clients of the framework. Are there alternatives I'm not thinking of?

Luzern answered 13/3, 2009 at 4:2 Comment(5)
The docs you link to, state the exact opposite of what you say, the logger has no process shared locks and things get mixed up - a problem I had as well.Elite
See the examples in the stdlib docs: Logging to a single file from multiple processes. The recipes don't require other modules to be multiprocessing-aware.Futrell
So, what is the use case for multiprocessing.get_logger()? Given these other ways of doing logging, the logging functionality in multiprocessing seems to be of little value.Typewriting
get_logger() is the logger used by multiprocessing module itself. It is useful if you want to debug a multiprocessing issue.Futrell
Well, the child process can take a parameter logger: Optional[Logger] = None that is initialized with logging.getLogger if it's None. That way, you can call the child, giving it the multiprocessing logger as a parameter, and it will work. The child process will never see or import multiprocessing. Does this make sense?Hamlani
M
94

The only way to deal with this non-intrusively is to:

  1. Spawn each worker process such that its log goes to a different file descriptor (to disk or to pipe.) Ideally, all log entries should be timestamped.
  2. Your controller process can then do one of the following:
    • If using disk files: Coalesce the log files at the end of the run, sorted by timestamp
    • If using pipes (recommended): Coalesce log entries on-the-fly from all pipes, into a central log file. (E.g., Periodically select from the pipes' file descriptors, perform merge-sort on the available log entries, and flush to centralized log. Repeat.)
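The merge step in either option can be sketched with the standard library. This is a minimal sketch, assuming each worker's log is already in chronological order and every line starts with a sortable timestamp such as ISO 8601; the function name merge_timestamped_logs is hypothetical.

```python
import heapq


def merge_timestamped_logs(streams):
    """Lazily merge several per-worker log streams into one ordered stream.

    Assumption: each stream is already sorted, and each line begins with a
    timestamp (e.g. ISO 8601) followed by a space, so comparing the first
    field as a string orders lines chronologically.
    """
    # heapq.merge only keeps one pending line per stream in memory.
    return heapq.merge(*streams, key=lambda line: line.split(" ", 1)[0])
```

Any iterable of lines works as a stream: open file objects for the disk-file variant, or the batches of complete lines read from each pipe for the on-the-fly variant.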
Mou answered 13/3, 2009 at 4:39 Comment(16)
Nice, that was 35s before I thought of that (thought I'd use atexit :-). Problem is that it won't give you a realtime readout. This may be part of the price of multiprocessing as opposed to multithreading.Luzern
@cdleary, using the piped approach it would be as near-realtime as one can get (especially if stderr is not buffered in the spawned processes.)Mou
+1 I had this general thought too. I especially like your on-the-fly idea.Sharkskin
Okay, but then wouldn't you need the coalescer process to be a central dispatcher that gave each child process a new shared stderr pipe? That would mean that people couldn't use the libraries traditionally, but would have to hand a callback over to the coalescer/dispatcher.Luzern
And by "shared stderr pipe" I don't mean shared among child processes, but shared between the coalescer and child process, as you're describing.Luzern
Do you have control between forks? If so you just dup new per-child fd's over stderr (2) just before forking a new child; the child's stderr (2) output will automatically be picked up by the spawner process' coalescer through the corresponding per-child fd.Mou
Incidentally, big assumption here: not Windows. Are you on Windows?Mou
I'm using POSIX (but os.dup is on both platforms). I don't see how you can get around the fact you need the select in a centralized event loop, which would presumably be where the coalescer lives. Am I missing something?Luzern
So anyway, if you are on *nix (i.e. multiprocess is using fork) you can dup a new fd over stderr (2) in Process.start (just before multiprocess calls self._popen = Popen(self), where Popen will do the actual fork) - check out the source code in lib/process.py, lib/forking.pyMou
Using Pool? If so then you'd have to use the async variants of map or apply and do the select loop until you get all results. Or spawn a thread (har-har) to do the select polling. :)Mou
Wow... that's crazy but would work. Spawn a coalescer thread so that it shares the main process' stderr locks, (hide the real sys.stderr in the coalescer, give sys a fake one for the coalescer to select on), have the coalescer terminate after join on subprocesses, and join on the coalescer atexit.Luzern
Yes it will. :) Used this approach a while ago (but in perl, not python) to coalesce real-time log output from multiple remote ssh sessions. have fun!Mou
@BrandonRhodes - As I said, non-intrusively. Using multiprocessing.Queue will not be simpler if there is a lot of code to rewire to use multiprocessing.Queue, and/or if performance is an issueMou
@Mou The benchmark is inconclusive in this scenario. You would have to compare multiple pipes against one queue.Felixfeliza
@Mou Plus, you are blindly assuming that everyone wants a log stream. But what if you want to use advanced handlers (SocketHandler, Sentry, ...) and still collect the entries in the main process before further processing? And you would have to configure loggers (in every subprocess) to send the entries to a stream, so how exactly do you define non-intrusively?Felixfeliza
@Felixfeliza you may want to re-read the OP before commenting; I do not assume a log stream, rather the OP clearly states that the legacy code already writes to a stream (stderr) and that he still expects aggregate logging to go to a stream, albeit with some degree of line-level atomicity (non-garbled). Do you now see why this method is non-intrusive for the OP? As for the benchmark, the number of pipes is not relevant; the gains come from buffering reducing the actual number of system calls (and impact on client performance) in exchange for extra latency in the aggregator proc.Mou
F
151

I just now wrote a log handler of my own that just feeds everything to the parent process via a pipe. I've only been testing it for ten minutes but it seems to work pretty well.

(Note: This is hardcoded to RotatingFileHandler, which is my own use case.)


Update: @javier now maintains this approach as a package available on PyPI - see multiprocessing-logging on PyPI, GitHub at https://github.com/jruere/multiprocessing-logging


Update: Implementation!

This now uses a queue for correct handling of concurrency, and also recovers from errors correctly. I've now been using this in production for several months, and the current version below works without issue.

from logging.handlers import RotatingFileHandler
import multiprocessing, threading, logging, sys, traceback

class MultiProcessingLog(logging.Handler):
    def __init__(self, name, mode, maxsize, rotate):
        logging.Handler.__init__(self)

        self._handler = RotatingFileHandler(name, mode, maxsize, rotate)
        self.queue = multiprocessing.Queue(-1)

        t = threading.Thread(target=self.receive)
        t.daemon = True
        t.start()

    def setFormatter(self, fmt):
        logging.Handler.setFormatter(self, fmt)
        self._handler.setFormatter(fmt)

    def receive(self):
        while True:
            try:
                record = self.queue.get()
                self._handler.emit(record)
            except (KeyboardInterrupt, SystemExit):
                raise
            except EOFError:
                break
            except:
                traceback.print_exc(file=sys.stderr)

    def send(self, s):
        self.queue.put_nowait(s)

    def _format_record(self, record):
        # ensure that exc_info and args
        # have been stringified.  Removes any chance of
        # unpickleable things inside and possibly reduces
        # message size sent over the pipe
        if record.args:
            record.msg = record.msg % record.args
            record.args = None
        if record.exc_info:
            dummy = self.format(record)
            record.exc_info = None

        return record

    def emit(self, record):
        try:
            s = self._format_record(record)
            self.send(s)
        except (KeyboardInterrupt, SystemExit):
            raise
        except:
            self.handleError(record)

    def close(self):
        self._handler.close()
        logging.Handler.close(self)
Findlay answered 21/5, 2009 at 18:10 Comment(24)
One nit: you need to import traceback as well.Decolorant
Is this code actually handling the problem? When you spawn off several processes using this handler, each process will have its own thread to check the queue and they write concurrently to the log file just like RotatingFileHandler would? Not only can log entries get mixed up, I also got stuff from different processes end up in different files as the rotation didn't seem to work well. Inspired by your solution though, I split the code, so I check on the queue in a thread that I start after spawning the processes.Elite
The above handler does all the file writing from the parent process and uses just one thread to receive messages passed from child processes. If you invoke the handler itself from a spawned child process then that's using it incorrectly, and you'll get all the same issues as RotatingFileHandler. I've used the above code for years with no issue.Findlay
Unfortunately this approach doesn't work on Windows. From docs.python.org/library/multiprocessing.html 16.6.2.12 "Note that on Windows child processes will only inherit the level of the parent process’s logger – any other customization of the logger will not be inherited." Subprocesses won't inherit the handler, and you can't pass it explicitly because it's not pickleable.Imperfection
I think that only refers to the logger that's hardwired into the multiprocessing module. This recipe isn't making any usage of that nor should it care about propagation of loglevels, it just shuttles data from child to parent using normal multiprocessing channels. Have you tried it ?Findlay
I realize this is a year later, but I think the last comment is wrong. It relies on the effects of logging.getLogger().addHandler(…) being propagated to the children, which is exactly what doesn't work. Sure, it doesn't rely on propagation of loglevels, but that's the part that does work, so who cares?Avantgarde
I see no documentation which says this (but point me to what you're seeing). When they talk about "multiprocessing.get_logger()" in that linked document, it is a hardwired logger in the multiprocessing module. it says nothing about the addHandler() method of the generic logging module. Again, have you tried it ?Findlay
Somehow not all messages get to file and some lines are garbled. Could you expand on how to properly instantiate the logger on Windows? I refer to your class from config read by logging.config.fileConfig. I read config and get a logger right after all import statements.Ophite
It's worth noting that multiprocessing.Queue uses a thread in put(), so do not invoke put (i.e. log a message using the MultiProcessingLog handler) before creating all subprocesses; otherwise the thread will be dead in the child process. One solution is to call Queue._after_fork() at the start of each child process, or to use multiprocessing.queues.SimpleQueue instead, which does not involve a thread but is blocking.Aerialist
Could you add a simple example that shows initialization, as well as usage from a hypothetical child process? I'm not quite sure how the child process is supposed to get access to the queue without instantiating another instance of your class.Knew
@Knew The handler above is assigned using addHandler() or setting it up in fileConfig() like any other handler. It's established at application startup time before any child process is created. The child process (on a POSIX system) will inherit the handler instance.Findlay
@Findlay Thanks for your quick response. After toying with this a bit more, it appears this currently only works on Linux (possibly Mac, I haven't tried). I've created a gist of what I ran on both platforms and the resulting output here. Would you happen to have an idea of what possible modifications would need to be made to get this working on both Linux and Windows? This is my first foray into multiprocessing with Python, which is why I'm having a rough time of this.Knew
ive no experience with multiprocessing on windows or even how it could possibly work as i thought windows has no fork().Findlay
@zzzeek, this solution is good but I couldn't find a package with it or something similar so I created one called multiprocessing-logging.Bourdon
Thanks for this post, and for Javier`s package to help understand the usage!Dingus
Just added this into my process based pipeline -- works great: ` logger = logging.getLogger('my_logger') logger.setLevel(logging.DEBUG) handler = MultiProcessingLog("pipeline/logs/job-1.csv", "a", 0, 0) logger.addHandler(handler) `Farika
@javier someone just pointed me here that you made a legit package so I've added it to the top, thanks!Findlay
Thanks! It has some tweaks that you might want to check!Bourdon
@Findlay does this work on a cluster with multiple machinesBituminize
@Bourdon I have tried to use multiprocessing-logging and multiprocessing_logging.install_mp_handler(logger) but it seems that once a non-multiprocess line is written to the log it 'loses' its multiprocessing logging abilities. So using this code I can either log multiprocessing functions or non-multiprocessing but not both... is there a way?Goosefish
@ShaniShalgi please open a ticket in GitHub. This is not a good place for this discussion.Bourdon
I opened a ticket but it also relates to the code written above.Goosefish
This won't work when the multiprocessing context is set to spawn, right? With the spawn context, new processes are fresh Python interpreters, not forks of the existing one, so they don't inherit anything from the main process.Standup
@EricFrechette I just checked and this solution does not work if the context is set to spawnNonfulfillment
F
61

QueueHandler is native in Python 3.2+, and does exactly this. It is easily replicated in previous versions.

Python docs have two complete examples: Logging to a single file from multiple processes

Each process (including the parent process) puts its logging on the Queue, and then a listener thread or process (one example is provided for each) picks those up and writes them all to a file - no risk of corruption or garbling.

For those using Python < 3.2, import logutils (which is the same as the python 3.2 native code).
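The division of labour described above can be sketched as follows. This is a minimal sketch, not the recipe from the docs: the helper names start_listener and route_to_queue are hypothetical, and respect_handler_level assumes Python 3.5 or later.

```python
import logging
import logging.handlers


def start_listener(log_queue, *handlers):
    """Start the single consumer that owns the real handlers (file, stream, ...)."""
    listener = logging.handlers.QueueListener(
        log_queue, *handlers, respect_handler_level=True
    )
    listener.start()  # runs in a background thread of the calling process
    return listener


def route_to_queue(logger, log_queue, level=logging.DEBUG):
    """Make `logger` send every record to `log_queue` and nowhere else."""
    for handler in list(logger.handlers):  # copy: we mutate the list
        logger.removeHandler(handler)
    logger.addHandler(logging.handlers.QueueHandler(log_queue))
    logger.setLevel(level)
    return logger
```

With a multiprocessing.Queue, the parent would call start_listener once, and each process (the parent included) would call route_to_queue on its root logger, for example from a Pool initializer. Call listener.stop() at shutdown so queued records are flushed.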

Footwork answered 18/8, 2015 at 6:47 Comment(2)
this should be the accepted answer at least from the advent of QueueHandler. It is not intrusive, transparent, and works regardless what logger configuration(s) the main process is using. The workers are always logging to the their configured QueueHandler Also does not expect any kind of logger configuration from the parent to the spawn child processExclude
I don't like it that much, though: the process must accept the parameters queue and work_configuration and call work_configuration(queue)? It's a horrible solution from my point of view.Hamlani
G
43

Below is another solution with a focus on simplicity for anyone else (like me) who gets here from Google. Logging should be easy! Only for 3.2 or higher.

import multiprocessing
import logging
from logging.handlers import QueueHandler, QueueListener
import time
import random


def f(i):
    time.sleep(random.uniform(.01, .05))
    logging.info('function called with {} in worker process.'.format(i))
    time.sleep(random.uniform(.01, .05))
    return i


def worker_init(q):
    # all records from worker processes go to qh and then into q
    qh = QueueHandler(q)
    logger = logging.getLogger()
    logger.setLevel(logging.DEBUG)
    logger.addHandler(qh)


def logger_init():
    q = multiprocessing.Queue()
    # this is the handler for all log records
    handler = logging.StreamHandler()
    handler.setFormatter(logging.Formatter("%(levelname)s: %(asctime)s - %(process)s - %(message)s"))

    # ql gets records from the queue and sends them to the handler
    ql = QueueListener(q, handler)
    ql.start()

    logger = logging.getLogger()
    logger.setLevel(logging.DEBUG)
    # add the handler to the logger so records from this process are handled
    logger.addHandler(handler)

    return ql, q


def main():
    q_listener, q = logger_init()

    logging.info('hello from main process')
    pool = multiprocessing.Pool(4, worker_init, [q])
    for result in pool.map(f, range(10)):
        pass
    pool.close()
    pool.join()
    q_listener.stop()

if __name__ == '__main__':
    main()
Gawain answered 23/1, 2016 at 13:59 Comment(4)
The QueueHandler and QueueListener classes can be used on Python 2.7 as well, available in the logutils package.Strengthen
The logger of the main process should also use a QueueHandler. In your current code, the main process is bypassing the queue so there can be race conditions between the main process and workers ones. Everyone should log to the queue (via a QueueHandler) and only the QueueListener should be allowed to log to the StreamHandler.Morganne
Also, you don't have to initialize the logger in each child. Just initialize the logger in the parent process, and get the logger in each child process.Pacifism
awesome answer! thanks for sharing! In case you're dealing with multiple handlers that use different levels, make sure to initialize the listener with QueueListener(q, *handlers, respect_handler_level=True)Hoy
H
30

As of 2020 it seems there is a simpler way of logging with multiprocessing.

This function will create the logger. You can set the format here and where you want your output to go (file, stdout):

def create_logger():
    import multiprocessing, logging
    logger = multiprocessing.get_logger()
    logger.setLevel(logging.INFO)
    formatter = logging.Formatter(
        '[%(asctime)s| %(levelname)s| %(processName)s] %(message)s')
    handler = logging.FileHandler('logs/your_file_name.log')
    handler.setFormatter(formatter)

    # this bit will make sure you won't have 
    # duplicated messages in the output
    if not len(logger.handlers): 
        logger.addHandler(handler)
    return logger

In the init you instantiate the logger:

if __name__ == '__main__': 
    from multiprocessing import Pool
    logger = create_logger()
    logger.info('Starting pooling')
    p = Pool()
    # rest of the code

Now, you only need to add this reference in each function where you need logging:

logger = create_logger()

And output messages:

logger.info(f'My message from {something}')

Hope this helps.

Herzig answered 6/7, 2020 at 6:50 Comment(6)
This seems like the most straightforward solution now. Note that the "if not len(logger.handlers)" part is assuming you will use a single handler. If you want to have more than one handler to, e.g., send all messages to a file but only INFO and above to stdout, then you'll need to adjust that part.Soulier
Normally you have vast amounts of code that just does import logging and then uses things like 'logging.info("whatever")" - there's no place you can pass a logger object to anything, and there's no chance you can retrofit that code.Distributive
This works but it's not very flexible. For example, once you put create_logger() into all your functions, there's no way to turn off logging in case someone else wants to use your library with their own application. Best practice for libraries is to never force anyone to see the log messages.Euphonious
@JamesMoore I haven't tried using logging.info(..) with multiprocessing. If this works, I'm happy to update the answer.Herzig
@Euphonious Agree. Perhaps, this code isn't intended to work in a library. I used it when I needed to debug a multiprocessing scrapper for an ad-hoc task.Herzig
In order to avoid inserting create_logger() in every function, you can set globals()['logger'] = multiprocessing.get_logger() once inside the subprocess, assuming your functions use logger.info(...)Toadeater
E
22

Yet another alternative might be the various non-file-based logging handlers in the logging package:

  • SocketHandler
  • DatagramHandler
  • SysLogHandler

(and others)

This way, you could easily have a logging daemon somewhere that you could write to safely and would handle the results correctly. (E.g., a simple socket server that just unpickles the message and emits it to its own rotating file handler.)

The SysLogHandler would take care of this for you, too. Of course, you could use your own instance of syslog, not the system one.
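For the socket-server idea, the receiving side mostly has to undo what SocketHandler does on the wire: a 4-byte big-endian length followed by a pickled dict of record attributes. The following is a minimal sketch of that decoding step only; the function name read_record is hypothetical, and the server loop around it is left out.

```python
import logging
import pickle
import struct


def read_record(stream):
    """Read one LogRecord sent by logging.handlers.SocketHandler.

    `stream` is any binary file-like object, e.g. sock.makefile("rb").
    Returns None when the stream is exhausted.
    """
    header = stream.read(4)
    if len(header) < 4:
        return None
    (length,) = struct.unpack(">L", header)  # big-endian payload size
    payload = stream.read(length)
    # Unpickling is only safe when the sender is trusted.
    attributes = pickle.loads(payload)
    return logging.makeLogRecord(attributes)
```

A daemon built on this would pass each decoded record to its own logger, for example logging.getLogger(record.name).handle(record), with a RotatingFileHandler attached on the daemon side.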

Endarch answered 13/3, 2009 at 11:19 Comment(0)
D
14

A variant of the others that keeps the logging and queue thread separate.

"""sample code for logging in subprocesses using multiprocessing

* Little handler magic - The main process uses loggers and handlers as normal.
* Only a simple handler is needed in the subprocess that feeds the queue.
* Original logger name from subprocess is preserved when logged in main
  process.
* As in the other implementations, a thread reads the queue and calls the
  handlers. Except in this implementation, the thread is defined outside of a
  handler, which makes the logger definitions simpler.
* Works with multiple handlers.  If the logger in the main process defines
  multiple handlers, they will all be fed records generated by the
  subprocesses loggers.

tested with Python 2.5 and 2.6 on Linux and Windows

"""

import sys
import time
import traceback
import multiprocessing, threading, logging

DEFAULT_LEVEL = logging.DEBUG

formatter = logging.Formatter("%(levelname)s: %(asctime)s - %(name)s - %(process)s - %(message)s")

class SubProcessLogHandler(logging.Handler):
    """handler used by subprocesses

    It simply puts items on a Queue for the main process to log.

    """

    def __init__(self, queue):
        logging.Handler.__init__(self)
        self.queue = queue

    def emit(self, record):
        self.queue.put(record)

class LogQueueReader(threading.Thread):
    """thread to write subprocesses log records to main process log

    This thread reads the records written by subprocesses and writes them to
    the handlers defined in the main process's handlers.

    """

    def __init__(self, queue):
        threading.Thread.__init__(self)
        self.queue = queue
        self.daemon = True

    def run(self):
        """read from the queue and write to the log handlers

        The logging documentation says logging is thread safe, so there
        shouldn't be contention between normal logging (from the main
        process) and this thread.

        Note that we're using the name of the original logger.

        """
        # Thanks Mike for the error checking code.
        while True:
            try:
                record = self.queue.get()
                # get the logger for this record
                logger = logging.getLogger(record.name)
                logger.callHandlers(record)
            except (KeyboardInterrupt, SystemExit):
                raise
            except EOFError:
                break
            except:
                traceback.print_exc(file=sys.stderr)

class LoggingProcess(multiprocessing.Process):

    def __init__(self, queue):
        multiprocessing.Process.__init__(self)
        self.queue = queue

    def _setupLogger(self):
        # create the logger to use.
        logger = logging.getLogger('test.subprocess')
        # The only handler desired is the SubProcessLogHandler.  If any others
        # exist, remove them. In this case, on Unix and Linux the StreamHandler
        # will be inherited.

        # iterate over a copy: removing from the list while looping over it
        # would skip handlers
        for handler in logger.handlers[:]:
            # just a check for my sanity
            assert not isinstance(handler, SubProcessLogHandler)
            logger.removeHandler(handler)
        # add the handler
        handler = SubProcessLogHandler(self.queue)
        handler.setFormatter(formatter)
        logger.addHandler(handler)

        # On Windows, the level will not be inherited.  Also, we could just
        # set the level to log everything here and filter it in the main
        # process handlers.  For now, just set it from the global default.
        logger.setLevel(DEFAULT_LEVEL)
        self.logger = logger

    def run(self):
        self._setupLogger()
        logger = self.logger
        # and here goes the logging
        p = multiprocessing.current_process()
        logger.info('hello from process %s with pid %s' % (p.name, p.pid))


if __name__ == '__main__':
    # queue used by the subprocess loggers
    queue = multiprocessing.Queue()
    # Just a normal logger
    logger = logging.getLogger('test')
    handler = logging.StreamHandler()
    handler.setFormatter(formatter)
    logger.addHandler(handler)
    logger.setLevel(DEFAULT_LEVEL)
    logger.info('hello from the main process')
    # This thread will read from the subprocesses and write to the main log's
    # handlers.
    log_queue_reader = LogQueueReader(queue)
    log_queue_reader.start()
    # create the processes.
    for i in range(10):
        p = LoggingProcess(queue)
        p.start()
    # The way I read the multiprocessing warning about Queue, joining a
    # process before it has finished feeding the Queue can cause a deadlock.
    # Also, Queue.empty() is not reliable, so just make sure all processes
    # are finished.
    # active_children joins subprocesses when they're finished.
    while multiprocessing.active_children():
        time.sleep(.1)
Dishrag answered 15/7, 2010 at 7:41 Comment(4)
I like the idea of fetching the logger name from the queue record. This allows using the conventional fileConfig() in MainProcess and a barely configured logger in PoolWorkers (with only setLevel(logging.NOTSET)). As I mentioned in another comment, I'm using Pool so I had to obtain my Queue (proxy) from Manager instead of multiprocessing so it can be pickled. This allows me to pass the queue to a worker inside of a dictionary (most of which is derived from an argparse object using vars()). I feel like in the end this is the best approach for MS Windows, which lacks fork() and breaks @zzzeak's solution.Ophite
@Ophite I think you could also put a multiprocessing Queue in the init instead of using a Manager (see answer to #25558186 - it's about Locks but I believe it works for Queues as well)Footwork
@Footwork That won't work on MS Windows or any other platform that lacks fork. That way each process will have its own independent useless queue. The second approach in the linked Q/A won't work on such platforms. It is a way to non-portable code.Ophite
@Ophite Interesting. I'm using Windows and it seems to work ok for me - not long after I last commented I set up a pool of processes sharing a multiprocessing.Queue with the main process and I've been using it constantly since. Won't claim to understand why it works though.Footwork
F
11

All current solutions are too coupled to the logging configuration by using a handler. My solution has the following architecture and features:

  • You can use any logging configuration you want
  • Logging is done in a daemon thread
  • Safe shutdown of the daemon by using a context manager
  • Communication to the logging thread is done by multiprocessing.Queue
  • In subprocesses, logging.Logger (and already defined instances) are patched to send all records to the queue
  • New: format traceback and message before sending to queue to prevent pickling errors

Code with usage example and output can be found at the following Gist: https://gist.github.com/schlamar/7003737
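
The architecture listed above can be sketched roughly as follows. This is not the code from the Gist, only a minimal illustration under stated assumptions: the names ForwardingHandler, _drain and logging_thread are hypothetical, the queue is expected to be a multiprocessing.Queue shared with the subprocesses, and the thread is joined on exit rather than marked as a daemon so that queued records are not lost.

```python
import contextlib
import logging
import threading


class ForwardingHandler(logging.Handler):
    """Child-side handler (hypothetical name): format early, then enqueue."""

    def __init__(self, queue):
        super().__init__()
        self.queue = queue

    def emit(self, record):
        # Merge args and traceback into plain strings so the record pickles.
        record.msg = record.getMessage()
        record.args = None
        if record.exc_info:
            record.exc_text = logging.Formatter().formatException(record.exc_info)
            record.exc_info = None
        self.queue.put(record)


def _drain(queue):
    """Dispatch queued records to the normally configured loggers."""
    while True:
        record = queue.get()
        if record is None:  # sentinel placed by the context manager
            break
        logging.getLogger(record.name).handle(record)


@contextlib.contextmanager
def logging_thread(queue):
    """Run the listener thread for the duration of the with-block."""
    thread = threading.Thread(target=_drain, args=(queue,))
    thread.start()
    try:
        yield queue
    finally:
        queue.put(None)  # records queued before the sentinel are handled first
        thread.join()    # wait for the backlog before returning
```

In the main process you would wrap the code that starts the workers in `with logging_thread(queue):`, and in each subprocess attach `ForwardingHandler(queue)` to the root logger. The standard library's logging.handlers.QueueHandler and QueueListener (Python 3.2+) cover similar ground.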

Felixfeliza answered 16/10, 2013 at 7:31 Comment(3)
Unless I'm missing something, this isn't actually a daemon thread, since you never set daemon_thread.daemon to True. I needed to do that in order to get my Python program to exit properly when an exception occurs within the context manager.Perry
I also needed to catch, log and swallow exceptions thrown by the target func in logged_call, otherwise the exception would get garbled with other logged output. Here's my modified version of this: gist.github.com/blah238/8ab79c4fe9cdb254f5c37abfc5dc85bfPerry
@Perry If you set @Felixfeliza 's daemon (let's call it the QueueListener, for a better naming) as an actual daemon thread, you risk that being abruptly stopped when the main program exits. Imagine the queue has buffered quite a lot of messages, the main program comes to the end, exits the context manager, the None sentinel is added on top of the full queue, and then the main process terminates before the listener (daemon) is able to dequeue and handle all log messages. You would lose those messages. How are you handling this situation in your code?Checked
D
11

Since we can represent multiprocess logging as many publishers and one subscriber (listener), using ZeroMQ to implement PUB-SUB messaging is indeed an option.

Moreover, the PyZMQ module (the Python bindings for ZMQ) implements PUBHandler, an object for publishing logging messages over a zmq.PUB socket.

There's a solution on the web for centralized logging from a distributed application using PyZMQ and PUBHandler, which can easily be adapted to work locally with multiple publishing processes.

formatters = {
    logging.DEBUG: logging.Formatter("[%(name)s] %(message)s"),
    logging.INFO: logging.Formatter("[%(name)s] %(message)s"),
    logging.WARN: logging.Formatter("[%(name)s] %(message)s"),
    logging.ERROR: logging.Formatter("[%(name)s] %(message)s"),
    logging.CRITICAL: logging.Formatter("[%(name)s] %(message)s")
}

# This one will be used by publishing processes
class PUBLogger:
    def __init__(self, host, port=config.PUBSUB_LOGGER_PORT):
        self._logger = logging.getLogger(__name__)
        self._logger.setLevel(logging.DEBUG)
        self.ctx = zmq.Context()
        self.pub = self.ctx.socket(zmq.PUB)
        self.pub.connect('tcp://{0}:{1}'.format(socket.gethostbyname(host), port))
        self._handler = PUBHandler(self.pub)
        self._handler.formatters = formatters
        self._logger.addHandler(self._handler)

    @property
    def logger(self):
        return self._logger

# This one will be used by listener process
class SUBLogger:
    def __init__(self, ip, output_dir="", port=config.PUBSUB_LOGGER_PORT):
        self.output_dir = output_dir
        self._logger = logging.getLogger()
        self._logger.setLevel(logging.DEBUG)

        self.ctx = zmq.Context()
        self._sub = self.ctx.socket(zmq.SUB)
        self._sub.bind('tcp://*:{1}'.format(ip, port))
        self._sub.setsockopt(zmq.SUBSCRIBE, "")

        handler = handlers.RotatingFileHandler(os.path.join(output_dir, "client_debug.log"), "w", 100 * 1024 * 1024, 10)
        handler.setLevel(logging.DEBUG)
        formatter = logging.Formatter("%(asctime)s;%(levelname)s - %(message)s")
        handler.setFormatter(formatter)
        self._logger.addHandler(handler)

    @property
    def sub(self):
        return self._sub

    @property
    def logger(self):
        return self._logger

#  And that's the way we actually run things:

# Listener process will forever listen on SUB socket for incoming messages
def run_sub_logger(ip, event):
    sub_logger = SUBLogger(ip)
    while not event.is_set():
        try:
            topic, message = sub_logger.sub.recv_multipart(flags=zmq.NOBLOCK)
            log_msg = getattr(logging, topic.lower())
            log_msg(message)
        except zmq.ZMQError as zmq_error:
            if zmq_error.errno == zmq.EAGAIN:
                pass


# Publisher processes loggers should be initialized as follows:

class Publisher:
    def __init__(self, stop_event, proc_id):
        self.stop_event = stop_event
        self.proc_id = proc_id
        self._logger = pub_logger.PUBLogger('127.0.0.1').logger

    def run(self):
        self._logger.info("{0} - Sending message".format(self.proc_id))

def run_worker(event, proc_id):
    worker = Publisher(event, proc_id)
    worker.run()

# Starting the subscriber process first so we won't lose publishers' messages
sub_logger_process = Process(target=run_sub_logger,
                             args=('127.0.0.1', stop_event))
sub_logger_process.start()

#Starting publisher processes
for i in range(MAX_WORKERS_PER_CLIENT):
    processes.append(Process(target=run_worker,
                                 args=(stop_event, i,)))
for p in processes:
    p.start()
Delarosa answered 19/10, 2016 at 9:11 Comment(1)
A stdlib version using sockets is provided here: docs.python.org/3/howto/…, though it may need some slight adjustments as per use-caseToadeater
A
6

I also like zzzeek's answer but Andre is correct that a queue is required to prevent garbling. I had some luck with the pipe, but did see garbling which is somewhat expected. Implementing it turned out to be harder than I thought, particularly due to running on Windows, where there are some additional restrictions about global variables and stuff (see: How's Python Multiprocessing Implemented on Windows?)

But, I finally got it working. This example probably isn't perfect, so comments and suggestions are welcome. It also does not support setting the formatter or anything other than the root logger. Basically, you have to reinit the logger in each of the pool processes with the queue and set up the other attributes on the logger.

Again, any suggestions on how to make the code better are welcome. I certainly don't know all the Python tricks yet :-)

import multiprocessing, logging, sys, re, os, StringIO, threading, time, Queue

class MultiProcessingLogHandler(logging.Handler):
    def __init__(self, handler, queue, child=False):
        logging.Handler.__init__(self)

        self._handler = handler
        self.queue = queue

        # we only want one of the loggers to be pulling from the queue.
        # If there is a way to do this without needing to be passed this
        # information, that would be great!
        if child == False:
            self.shutdown = False
            self.polltime = 1
            t = threading.Thread(target=self.receive)
            t.daemon = True
            t.start()

    def setFormatter(self, fmt):
        logging.Handler.setFormatter(self, fmt)
        self._handler.setFormatter(fmt)

    def receive(self):
        #print "receive on"
        while (self.shutdown == False) or (self.queue.empty() == False):
            # so we block for a short period of time so that we can
            # check for the shutdown cases.
            try:
                record = self.queue.get(True, self.polltime)
                self._handler.emit(record)
            except Queue.Empty, e:
                pass

    def send(self, s):
        # send just puts it in the queue for the server to retrieve
        self.queue.put(s)

    def _format_record(self, record):
        ei = record.exc_info
        if ei:
            dummy = self.format(record) # just to get traceback text into record.exc_text
            record.exc_info = None  # to avoid Unpickleable error

        return record

    def emit(self, record):
        try:
            s = self._format_record(record)
            self.send(s)
        except (KeyboardInterrupt, SystemExit):
            raise
        except:
            self.handleError(record)

    def close(self):
        time.sleep(self.polltime+1) # give some time for messages to enter the queue.
        self.shutdown = True
        time.sleep(self.polltime+1) # give some time for the server to time out and see the shutdown

    def __del__(self):
        self.close() # hopefully this aids in orderly shutdown when things are going poorly.

def f(x):
    # just a logging command...
    logging.critical('function number: ' + str(x))
    # to make some calls take longer than others, so the output is "jumbled" as real MP programs are.
    time.sleep(x % 3)

def initPool(queue, level):
    """
    This causes the logging module to be initialized with the necessary info
    in pool threads to work correctly.
    """
    logging.getLogger('').addHandler(MultiProcessingLogHandler(logging.StreamHandler(), queue, child=True))
    logging.getLogger('').setLevel(level)

if __name__ == '__main__':
    stream = StringIO.StringIO()
    logQueue = multiprocessing.Queue(100)
    handler= MultiProcessingLogHandler(logging.StreamHandler(stream), logQueue)
    logging.getLogger('').addHandler(handler)
    logging.getLogger('').setLevel(logging.DEBUG)

    logging.debug('starting main')

    # when building the pool on a Windows machine we also have to init the logger in all the instances with the queue and the level of logging.
    pool = multiprocessing.Pool(processes=10, initializer=initPool, initargs=[logQueue, logging.getLogger('').getEffectiveLevel()] ) # start worker processes
    pool.map(f, range(0,50))
    pool.close()

    logging.debug('done')
    logging.shutdown()
    print "stream output is:"
    print stream.getvalue()
Aemia answered 17/6, 2009 at 21:15 Comment(3)
I wonder if if 'MainProcess' == multiprocessing.current_process().name: can be used in place of passing child?Ophite
In case someone else is trying to use process pool instead of separate process objects on Windows, it worth mentioning that Manager shall be used to pass queue to subprocesses as it is not picklable directly.Ophite
This implementation worked well for me. I modified it to work with an arbitrary number of handlers. This way you can configure you root handler in a non-multiprocessing fashion, then where it is safe to make the queue, pass the root handlers to this, delete them, and make this the only handler.Haemophiliac
R
5

I'd suggest using the logger_tt library: https://github.com/Dragon2fly/logger_tt

The multiprocessing_logging library does not work on my macOS machine, while logger_tt does.

Roller answered 6/3, 2021 at 2:53 Comment(2)
I don't know why there is a disagree with my answer. logger_tt library is certainly most friendly logging library for multiprocessing.Roller
This worked for me when testing. But for Linux users, I added this configuration as mentioned in the docs: from logger_tt import setup_logging and then setup_logging(full_context=1, use_multiprocessing="fork"). Warning: Don't perform unless you know the effect of fork in multiprocessing!Kelsy
P
4

The concurrent-log-handler package seems to do the job perfectly. Tested on Windows. It also supports POSIX systems.

Main idea

  • Create a separate file with a function that returns a logger. The logger must have fresh instance of ConcurrentRotatingFileHandler for each process. Example function get_logger() given below.
  • Creating loggers is done at the initialization of the process. For a multiprocessing.Process subclass it would mean the beginning of the run() method.

Detailed instructions

In this example, I will use the following file structure:

.
│-- child.py        <-- For a child process
│-- logs.py         <-- For setting up the logs for the app
│-- main.py         <-- For a main process
│-- myapp.py        <-- For starting the app
│-- somemodule.py   <-- For an example, a "3rd party module using standard logging"

Code

Child process

# child.py 

import multiprocessing as mp
import time
from somemodule import do_something


class ChildProcess(mp.Process):
    def __init__(self):
        self.logger = None
        super().__init__()

    def run(self):
        from logs import get_logger
        self.logger = get_logger()


        while True:
            time.sleep(1)
            self.logger.info("Child process")
            do_something()

  • Simple child process that inherits multiprocessing.Process and simply logs to file text "Child process"
  • Important: The get_logger() is called inside the run(), or elsewhere inside the child process (not module level or in __init__().) This is required as get_logger() creates ConcurrentRotatingFileHandler instance, and new instance is needed for each process.
  • The do_something is used just to demonstrate that this works with 3rd party library code which does not have any clue that you are using concurrent-log-handler.

Main Process

# main.py

import logging
import multiprocessing as mp
import time

from child import ChildProcess
from somemodule import do_something


class MainProcess(mp.Process):
    def __init__(self):
        self.logger = logging.getLogger()
        super().__init__()

    def run(self):
        from logs import get_logger

        self.logger = get_logger()
        self.child = ChildProcess()
        self.child.daemon = True
        self.child.start()

        while True:
            time.sleep(0.5)
            self.logger.critical("Main process")
            do_something()


  • The main process that logs into file two times a second "Main process". Also inheriting from multiprocessing.Process.
  • Same comments for get_logger() and do_something() apply as for the child process.

Logger setup

# logs.py

import logging
import os

from concurrent_log_handler import ConcurrentRotatingFileHandler

LOGLEVEL = logging.DEBUG


def get_logger():
    logger = logging.getLogger()

    if logger.handlers:
        return logger

    # Use an absolute path to prevent file rotation trouble.
    logfile = os.path.abspath("mylog.log")

    logger.setLevel(LOGLEVEL)

    # Rotate log after reaching 512K, keep 5 old copies.
    filehandler = ConcurrentRotatingFileHandler(
        logfile, mode="a", maxBytes=512 * 1024, backupCount=5, encoding="utf-8"
    )
    filehandler.setLevel(LOGLEVEL)

    # create also handler for displaying output in the stdout
    ch = logging.StreamHandler()
    ch.setLevel(LOGLEVEL)

    formatter = logging.Formatter(
        "%(asctime)s - %(module)s - %(levelname)s - %(message)s [Process: %(process)d, %(filename)s:%(funcName)s(%(lineno)d)]"
    )

    # add formatter to ch
    ch.setFormatter(formatter)
    filehandler.setFormatter(formatter)

    logger.addHandler(ch)
    logger.addHandler(filehandler)

    return logger
  • This uses the ConcurrentRotatingFileHandler from the concurrent-log-handler package. Each process needs a fresh ConcurrentRotatingFileHandler instance.
  • Note that all the arguments for the ConcurrentRotatingFileHandler should be the same in every process.

Example app

# myapp.py 

if __name__ == "__main__":
    from main import MainProcess

    p = MainProcess()
    p.start()
  • Just a simple example on how to start the multiprocess application

Example of 3rd party module using standard logging

# somemodule.py 

import logging

logger = logging.getLogger("somemodule")

def do_something():
    logger.info("doing something")

  • Just a simple example to test if loggers from 3rd party code will work normally.

Example output

2021-04-19 19:02:29,425 - main - CRITICAL - Main process [Process: 103348, main.py:run(23)]
2021-04-19 19:02:29,427 - somemodule - INFO - doing something [Process: 103348, somemodule.py:do_something(7)]
2021-04-19 19:02:29,929 - main - CRITICAL - Main process [Process: 103348, main.py:run(23)]
2021-04-19 19:02:29,931 - somemodule - INFO - doing something [Process: 103348, somemodule.py:do_something(7)]
2021-04-19 19:02:30,133 - child - INFO - Child process [Process: 76700, child.py:run(18)]
2021-04-19 19:02:30,137 - somemodule - INFO - doing something [Process: 76700, somemodule.py:do_something(7)]
2021-04-19 19:02:30,436 - main - CRITICAL - Main process [Process: 103348, main.py:run(23)]
2021-04-19 19:02:30,439 - somemodule - INFO - doing something [Process: 103348, somemodule.py:do_something(7)]
2021-04-19 19:02:30,944 - main - CRITICAL - Main process [Process: 103348, main.py:run(23)]
2021-04-19 19:02:30,946 - somemodule - INFO - doing something [Process: 103348, somemodule.py:do_something(7)]
2021-04-19 19:02:31,142 - child - INFO - Child process [Process: 76700, child.py:run(18)]
2021-04-19 19:02:31,145 - somemodule - INFO - doing something [Process: 76700, somemodule.py:do_something(7)]
2021-04-19 19:02:31,449 - main - CRITICAL - Main process [Process: 103348, main.py:run(23)]
2021-04-19 19:02:31,451 - somemodule - INFO - doing something [Process: 103348, somemodule.py:do_something(7)]

Prittleprattle answered 19/4, 2021 at 16:5 Comment(0)
B
3

Just publish your logger instance somewhere. That way, the other modules and clients can use your API to get the logger without having to import multiprocessing.
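
A minimal sketch of such an accessor, assuming it lives in a module of the framework (the function name get_logger and the idea of naming child loggers per caller are illustrative, not part of the original answer):

```python
import logging
import multiprocessing


def get_logger(name=None):
    """Return the multiprocessing logger, or a named child of it.

    Callers import only this function; they never touch multiprocessing.
    """
    base = multiprocessing.get_logger()
    # getChild("client") yields a logger named "multiprocessing.client",
    # which keeps the message stream attributable to its source.
    return base.getChild(name) if name else base
```

A client module would then call `log = get_logger("client")` and use `log.info(...)` as usual.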

Bourdon answered 13/3, 2009 at 4:40 Comment(4)
The problem with this is that the multiprocessing loggers appear unnamed, so you won't be able to decipher the message stream easily. Maybe it would be possible to name them after creation, which would make it more reasonable to look at.Luzern
Well, publish one logger for each module, or better, export different closures that use the logger with the module name. The point is to let other modules use your APIBourdon
Definitely reasonable (and +1 from me!), but I would miss being able to just import logging; logging.basicConfig(level=logging.DEBUG); logging.debug('spam!') from anywhere and have it work properly.Luzern
It's an interesting phenomenon that I see when I use Python, that we get so used to being able to do what we want in 1 or 2 simple lines that the simple and logical approach in other languages (eg. to publish the multiprocessing logger or wrap it in an accessor) still feels like a burden. :)Danialdaniala
D
2

How about delegating all the logging to another process that reads all log entries from a Queue?

LOG_QUEUE = multiprocessing.JoinableQueue()

class CentralLogger(multiprocessing.Process):
    def __init__(self, queue):
        multiprocessing.Process.__init__(self)
        self.queue = queue
        self.log = logging.getLogger('some_config')
        self.log.info("Started Central Logging process")

    def run(self):
        while True:
            log_level, message = self.queue.get()
            if log_level is None:
                self.log.info("Shutting down Central Logging process")
                break
            else:
                self.log.log(log_level, message)

central_logger_process = CentralLogger(LOG_QUEUE)
central_logger_process.start()

Simply share LOG_QUEUE via any of the multiprocess mechanisms or even inheritance and it all works out fine!
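
As a rough sketch of the producer side, assuming the queue is handed to each worker as a Process argument (the functions worker, drain and main are hypothetical, and a thread stands in for the CentralLogger process to keep the example short):

```python
import logging
import multiprocessing
import threading


def worker(queue, worker_id):
    """Producer: enqueue (level, message) tuples instead of logging directly."""
    queue.put((logging.INFO, "worker %d started" % worker_id))
    queue.put((logging.INFO, "worker %d finished" % worker_id))


def drain(queue, log):
    """Consumer: same loop as CentralLogger.run, stopping on a None level."""
    while True:
        level, message = queue.get()
        if level is None:
            break
        log.log(level, message)


def main():
    logging.basicConfig(level=logging.INFO)
    queue = multiprocessing.Queue()
    consumer = threading.Thread(
        target=drain, args=(queue, logging.getLogger("central")))
    consumer.start()  # start consuming before the workers fill the queue
    workers = [multiprocessing.Process(target=worker, args=(queue, i))
               for i in range(3)]
    for process in workers:
        process.start()
    for process in workers:
        process.join()
    queue.put((None, None))  # sentinel, sent once every producer has exited
    consumer.join()
```

Call `main()` under an `if __name__ == '__main__':` guard so the sketch also works on platforms that spawn rather than fork.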

Dick answered 12/3, 2014 at 23:13 Comment(0)
F
2

Below is a class that can be used in a Windows environment; it requires ActivePython. You can also apply the same approach to other logging handlers (StreamHandler etc.)

class SyncronizedFileHandler(logging.FileHandler):
    MUTEX_NAME = 'logging_mutex'

    def __init__(self , *args , **kwargs):

        self.mutex = win32event.CreateMutex(None , False , self.MUTEX_NAME)
        return super(SyncronizedFileHandler , self ).__init__(*args , **kwargs)

    def emit(self, *args , **kwargs):
        try:
            win32event.WaitForSingleObject(self.mutex , win32event.INFINITE)
            ret = super(SyncronizedFileHandler , self ).emit(*args , **kwargs)
        finally:
            win32event.ReleaseMutex(self.mutex)
        return ret

And here is an example that demonstrates usage:

import logging
import random , time , os , sys , datetime
from string import letters
import win32api , win32event
from multiprocessing import Pool

def f(i):
    time.sleep(random.randint(0,10) * 0.1)
    ch = random.choice(letters)
    logging.info( ch * 30)


def init_logging():
    '''
    initialize the loggers
    '''
    formatter = logging.Formatter("%(levelname)s - %(process)d - %(asctime)s - %(filename)s - %(lineno)d - %(message)s")
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)

    file_handler = SyncronizedFileHandler(sys.argv[1])
    file_handler.setLevel(logging.INFO)
    file_handler.setFormatter(formatter)
    logger.addHandler(file_handler)

#must be called in the parent and in every worker process
init_logging() 

if __name__ == '__main__':
    #multiprocessing stuff
    pool = Pool(processes=10)
    imap_result = pool.imap(f , range(30))
    for i , _ in enumerate(imap_result):
        pass
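
For platforms without win32event, the same idea might be sketched with a lock object passed in by the caller. This is an untested-in-production variant, not the author's code: LockedFileHandler, init_worker and make_pool are hypothetical names, and the lock is assumed to be a multiprocessing.Lock created in the parent and handed to each worker through the Pool initializer.

```python
import logging
import multiprocessing


class LockedFileHandler(logging.FileHandler):
    """FileHandler that holds a caller-supplied lock while writing."""

    def __init__(self, filename, lock, **kwargs):
        self._process_lock = lock
        super().__init__(filename, **kwargs)

    def emit(self, record):
        # FileHandler.emit writes and flushes, so the whole line is on disk
        # before another process can acquire the lock.
        with self._process_lock:
            super().emit(record)


def init_worker(filename, lock):
    """Pool initializer: configure the root logger inside each worker."""
    root = logging.getLogger()
    root.setLevel(logging.INFO)
    root.addHandler(LockedFileHandler(filename, lock))


def make_pool(filename, processes=4):
    """Create a pool whose workers all share one lock for the log file."""
    lock = multiprocessing.Lock()
    return multiprocessing.Pool(
        processes, initializer=init_worker, initargs=(filename, lock))
```

Passing the lock through initargs means it is shared at process creation, which is the documented way to hand synchronization primitives to children.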
Fafnir answered 1/6, 2016 at 6:57 Comment(1)
Probably using multiprocessing.Lock() instead of Windows Mutex would make the solution portable.Halfpenny
A
1

I have a solution that's similar to ironhacker's except that I use logging.exception in some of my code and found that I needed to format the exception before passing it back over the Queue since tracebacks aren't pickle'able:

class QueueHandler(logging.Handler):
    def __init__(self, queue):
        logging.Handler.__init__(self)
        self.queue = queue
    def emit(self, record):
        if record.exc_info:
            # can't pass exc_info across processes so just format now
            record.exc_text = self.formatException(record.exc_info)
            record.exc_info = None
        self.queue.put(record)
    def formatException(self, ei):
        sio = cStringIO.StringIO()
        traceback.print_exception(ei[0], ei[1], ei[2], None, sio)
        s = sio.getvalue()
        sio.close()
        if s[-1] == "\n":
            s = s[:-1]
        return s
Autograph answered 5/8, 2010 at 6:11 Comment(1)
I found a complete example along these lines here.Villar
G
1

If you have deadlocks occurring in a combination of locks, threads and forks in the logging module, that is reported in bug report 6721 (see also related SO question).

There is a small fixup solution posted here.

However, that only fixes potential deadlocks in logging. It does not prevent output from being garbled. See the other answers presented here.

Grime answered 26/3, 2015 at 12:4 Comment(0)
A
1

Here's my simple hack/workaround... not the most comprehensive, but easily modifiable and simpler to read and understand I think than any other answers I found before writing this:

import logging
import multiprocessing

class FakeLogger(object):
    def __init__(self, q):
        self.q = q
    def info(self, item):
        self.q.put('INFO - {}'.format(item))
    def debug(self, item):
        self.q.put('DEBUG - {}'.format(item))
    def critical(self, item):
        self.q.put('CRITICAL - {}'.format(item))
    def warning(self, item):
        self.q.put('WARNING - {}'.format(item))

def some_other_func_that_gets_logger_and_logs(num):
    # notice the name gets discarded
    # of course you can easily add this to your FakeLogger class
    local_logger = logging.getLogger('local')
    local_logger.info('Hey I am logging this: {} and working on it to make this {}!'.format(num, num*2))
    local_logger.debug('hmm, something may need debugging here')
    return num*2

def func_to_parallelize(data_chunk):
    # unpack our args
    the_num, logger_q = data_chunk
    # since we're now in a new process, let's monkeypatch the logging module
    logging.getLogger = lambda name=None: FakeLogger(logger_q)
    # now do the actual work that happens to log stuff too
    new_num = some_other_func_that_gets_logger_and_logs(the_num)
    return (the_num, new_num)

if __name__ == '__main__':
    multiprocessing.freeze_support()
    m = multiprocessing.Manager()
    logger_q = m.Queue()
    # we have to pass our data to be parallel-processed
    # we also need to pass the Queue object so we can retrieve the logs
    parallelable_data = [(1, logger_q), (2, logger_q)]
    # set up a pool of processes so we can take advantage of multiple CPU cores
    pool_size = multiprocessing.cpu_count() * 2
    pool = multiprocessing.Pool(processes=pool_size, maxtasksperchild=4)
    worker_output = pool.map(func_to_parallelize, parallelable_data)
    pool.close() # no more tasks
    pool.join()  # wrap up current tasks
    # get the contents of our FakeLogger object
    while not logger_q.empty():
        print logger_q.get()
    print 'worker output contained: {}'.format(worker_output)
Apiece answered 13/9, 2016 at 16:55 Comment(0)
D
1

There is this great package

Package: https://pypi.python.org/pypi/multiprocessing-logging/

code: https://github.com/jruere/multiprocessing-logging

Install:

pip install multiprocessing-logging

Then add:

import multiprocessing_logging

# This enables logs inside process
multiprocessing_logging.install_mp_handler()
Diazotize answered 7/4, 2018 at 2:55 Comment(3)
This library is literally based off of another comment on the current SO post: https://mcmap.net/q/98604/-how-should-i-log-while-using-multiprocessing-in-python.Chamberlin
Origins: https://mcmap.net/q/98604/-how-should-i-log-while-using-multiprocessing-in-python I appreciate the example usage of the module, in addition to the documentation on the homepage.Stu
This module does not work if the multiprocessing context is spawn (default on >3.8 with MacOS)Nonfulfillment
J
1

For whoever might need this, I wrote a decorator for multiprocessing_logging package that adds the current process name to logs, so it becomes clear who logs what.

It also runs install_mp_handler() itself, so there is no need to call it before creating a pool.

This lets me see which worker produces which log messages.

Here's the blueprint with an example:

import sys
import logging
from functools import wraps
import multiprocessing
import multiprocessing_logging

# Setup basic console logger as 'logger'
logger = logging.getLogger()
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setFormatter(logging.Formatter(u'%(asctime)s :: %(levelname)s :: %(message)s'))
logger.setLevel(logging.DEBUG)
logger.addHandler(console_handler)


# Create a decorator for functions that are called via multiprocessing pools
def logs_mp_process_names(fn):
    class MultiProcessLogFilter(logging.Filter):
        def filter(self, record):
            try:
                process_name = multiprocessing.current_process().name
            except BaseException:
                process_name = __name__
            record.msg = f'{process_name} :: {record.msg}'
            return True

    multiprocessing_logging.install_mp_handler()
    f = MultiProcessLogFilter()

    # Wraps is needed here so apply / apply_async know the function name
    @wraps(fn)
    def wrapper(*args, **kwargs):
        logger.removeFilter(f)
        logger.addFilter(f)
        return fn(*args, **kwargs)

    return wrapper


# Create a test function and decorate it
@logs_mp_process_names
def test(argument):
    logger.info(f'test function called via: {argument}')


# You can also redefine undecorated functions
def undecorated_function():
    logger.info('I am not decorated')


@logs_mp_process_names
def redecorated(*args, **kwargs):
    return undecorated_function(*args, **kwargs)


# Enjoy
if __name__ == '__main__':
    with multiprocessing.Pool() as mp_pool:
        # Also works with apply_async
        mp_pool.apply(test, ('mp pool',))
        mp_pool.apply(redecorated)
        logger.info('some main logs')
        test('main program')
Jett answered 15/1, 2020 at 22:51 Comment(0)
L
0

One of the alternatives is to write the multiprocessing logging to a known file and register an atexit handler that joins on those processes and reads the file back to stderr; however, you won't get a real-time flow of output messages on stderr that way.
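
A minimal sketch of that idea, with hypothetical function names, assuming the child processes terminate on their own and each one logs to the same path (inherited through fork, or configured again in the child when processes are spawned):

```python
import atexit
import logging
import multiprocessing
import sys


def replay_log(path, stream=None):
    """Wait for remaining children, then copy the log file to the stream."""
    for child in multiprocessing.active_children():
        child.join()
    if stream is None:
        stream = sys.stderr
    with open(path) as log_file:
        for line in log_file:
            stream.write(line)


def log_to_file_then_replay(path):
    """Send root-logger output to a file and replay it when the program exits."""
    handler = logging.FileHandler(path)
    logging.getLogger().addHandler(handler)
    atexit.register(replay_log, path)
    return handler
```

Call `log_to_file_then_replay("/tmp/mp.log")` once in the parent; the path here is only an example.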

Luzern answered 13/3, 2009 at 4:40 Comment(1)
is the approach you are proposing below identical to the one from your comment here #641920Absenteeism
S
0

Simplest idea as mentioned:

  • Grab the filename and the process id of the current process.
  • Set up a WatchedFileHandler. The reasons for this handler are discussed in detail here, but in short there are certain worse race conditions with the other logging handlers. This one has the shortest window for the race condition.
    • Choose a path to save the logs to such as /var/log/...
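
One possible reading of these steps, as a sketch only: the script name and process id are combined into a per-process file name inside the chosen directory. The function name per_process_handler and the naming scheme are assumptions; note that the logging documentation describes WatchedFileHandler as not appropriate for use under Windows.

```python
import logging
import logging.handlers
import os
import sys


def per_process_handler(log_dir):
    """Return a WatchedFileHandler writing to <log_dir>/<script>.<pid>.log."""
    script = os.path.splitext(os.path.basename(sys.argv[0] or "python"))[0]
    path = os.path.join(log_dir, "%s.%d.log" % (script, os.getpid()))
    return logging.handlers.WatchedFileHandler(path)
```

Each process would call `logging.getLogger().addHandler(per_process_handler("/var/log/myapp"))` after it starts; the directory shown is a placeholder.
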
Swansea answered 5/10, 2019 at 6:38 Comment(0)
