Deadlock with logging in a multiprocess/multithread Python script

I am facing a problem collecting logs from the following script. Once I set SLEEP_TIME to too small a value, the LoggingThread threads somehow block the logging module: the script freezes on a logging call in the action function. If SLEEP_TIME is around 0.1, the script collects all the log messages I expect.

I tried to follow this answer but it does not solve my problem.

import multiprocessing
import threading
import logging
import time

SLEEP_TIME = 0.000001

logger = logging.getLogger()

ch = logging.StreamHandler()
ch.setFormatter(logging.Formatter('%(asctime)s %(levelname)s %(funcName)s(): %(message)s'))
ch.setLevel(logging.DEBUG)

logger.setLevel(logging.DEBUG)
logger.addHandler(ch)


class LoggingThread(threading.Thread):

    def __init__(self):
        threading.Thread.__init__(self)

    def run(self):
        while True:
            logger.debug('LoggingThread: {}'.format(self))
            time.sleep(SLEEP_TIME)


def action(i):
    logger.debug('action: {}'.format(i))


def do_parallel_job():

    processes = multiprocessing.cpu_count()
    pool = multiprocessing.Pool(processes=processes)
    for i in range(20):
        pool.apply_async(action, args=(i,))
    pool.close()
    pool.join()



if __name__ == '__main__':

    logger.debug('START')

    #
    # multithread part
    #
    for _ in range(10):
        lt = LoggingThread()
        lt.setDaemon(True)
        lt.start()

    #
    # multiprocess part
    #
    do_parallel_job()

    logger.debug('FINISH')

How can I use the logging module correctly in multiprocess and multithread scripts?

Nuremberg asked 1/7, 2014 at 11:54 Comment(3)
I can't seem to reproduce your problem. Can you provide the thread creation/launching code? – Peonage
Just to be sure: the execution of action() freezes (the log message starting with 'action:' is never produced), while the LoggingThreads keep doing their job forever. – Parasynthesis
The probability of the deadlock depends on the SLEEP_TIME value. – Parasynthesis

This is probably CPython bug 6721, "Locks in the standard library should be sanitized on fork".

The problem is common in any situation where you mix locks, threads, and fork(). If thread 1 holds a lock while thread 2 calls fork(), the forked child process contains only thread 2, and the lock stays held forever. In your case, that lock is logging.StreamHandler.lock.

A fix can be found here (permalink) for the logging module. Note that you need to take care of any other locks, too.
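
On Python 3.7+ (POSIX) the same idea can be applied with os.register_at_fork, which lets you reinitialize the handler locks in every forked child; newer CPython releases do something similar inside the logging module itself. A minimal sketch (the hook function name is mine, not from the linked patch):

import logging
import os


def _reset_logging_locks():
    # Recreate each handler's lock in the forked child, so a lock that was
    # held by a now-nonexistent parent thread cannot deadlock the child.
    for handler in logging.getLogger().handlers:
        handler.createLock()


# Runs in every child this interpreter later creates via fork().
os.register_at_fork(after_in_child=_reset_logging_locks)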

Malnourished answered 26/3, 2015 at 11:25 Comment(0)

I ran into a similar issue recently while using the logging module together with the Pathos multiprocessing library. I'm still not 100% sure, but it seems that in my case the problem was caused by the logging handler trying to reuse a lock object from within different processes.

I was able to fix it with a simple wrapper around the default logging handler:

import threading
from collections import defaultdict
from multiprocessing import current_process

import colorlog


class ProcessSafeHandler(colorlog.StreamHandler):
    def __init__(self):
        super().__init__()

        # One lock per process: a lock inherited across fork() may still
        # be "held" by a parent thread that doesn't exist in the child.
        self._locks = defaultdict(threading.RLock)

    def acquire(self):
        # Override Handler.acquire()/release() so that each process
        # serializes emit() on its own private lock.
        current_process_id = current_process().pid
        self._locks[current_process_id].acquire()

    def release(self):
        current_process_id = current_process().pid
        self._locks[current_process_id].release()
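
For completeness, wiring the handler up is the usual colorlog setup; the format string below is just an example:

import logging

handler = ProcessSafeHandler()
handler.setFormatter(colorlog.ColoredFormatter(
    '%(log_color)s%(asctime)s %(levelname)s %(processName)s: %(message)s'))

root = logging.getLogger()
root.addHandler(handler)
root.setLevel(logging.DEBUG)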
Faun answered 4/9, 2019 at 5:16 Comment(0)

By default, multiprocessing will fork() the process to create the pool workers when running on Linux. The resulting subprocess loses all running threads except the one that called fork() (here, the main thread). So if you're on Linux, that's the problem.

First action item: You shouldn't ever use the fork()-based pool; see https://pythonspeed.com/articles/python-multiprocessing/ and https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods.

On Windows, and (I think) on newer versions of Python on macOS, the "spawn"-based pool is used by default. This is also what you ought to use on Linux. In this setup, a new Python process is started, and, as you would expect, it doesn't have any of the threads from the parent process, because it's a brand-new process.

Second action item: do the logging setup in each subprocess in the pool; the parent process's logging setup isn't sufficient to get logs from the worker processes. You do this with the initializer keyword argument to Pool: write a function such as setup_logging() and pass it as pool = multiprocessing.Pool(initializer=setup_logging) (https://docs.python.org/3/library/multiprocessing.html#module-multiprocessing.pool), as sketched below.
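
A minimal sketch putting both action items together; the handler configuration and format string are illustrative choices, not the only correct ones:

import logging
import multiprocessing


def setup_logging():
    # Runs once inside each freshly spawned worker process.
    handler = logging.StreamHandler()
    handler.setFormatter(logging.Formatter(
        '%(asctime)s %(levelname)s %(processName)s %(funcName)s(): %(message)s'))
    root = logging.getLogger()
    root.addHandler(handler)
    root.setLevel(logging.DEBUG)


def action(i):
    logging.getLogger().debug('action: %s', i)


if __name__ == '__main__':
    # Request the spawn start method explicitly so that no locks or
    # threads are inherited from the parent, even on Linux.
    ctx = multiprocessing.get_context('spawn')
    with ctx.Pool(initializer=setup_logging) as pool:
        pool.map(action, range(20))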

Beguile answered 5/7, 2022 at 14:16 Comment(0)
