Python multiprocessing Queue NotImplementedError macOS

System information

To reproduce on Big Sur and most probably older versions:

import multiprocessing as mp


if __name__ == '__main__':
    exp_queue = mp.Queue()
    print(exp_queue.qsize())

Results in:

  File "/Users/username/Library/Application Support/JetBrains/PyCharm2020.3/scratches/scratch.py", line 5, in <module>
    print(exp_queue.qsize())
  File "/usr/local/Cellar/python@3.8/3.8.7/Frameworks/Python.framework/Versions/3.8/lib/python3.8/multiprocessing/queues.py", line 120, in qsize
    return self._maxsize - self._sem._semlock._get_value()
NotImplementedError

It looks like whoever wrote line 120 of multiprocessing/queues.py is aware of the issue, but I can't find a solution anywhere:

def qsize(self):
    # Raises NotImplementedError on Mac OSX because of broken sem_getvalue()
    return self._maxsize - self._sem._semlock._get_value()
Ticket answered 7/1, 2021 at 9:13 Comment(3)
What is your question? - Spagyric
What do you think? - Ticket
What is wrong with Python devs? They just said "eh screw mac users". It's not a platform limitation. There are plenty of viable ways to implement it. Lazy gits... - Bedtime

Simplest solution: use multiprocessing.Manager. Its Queue() method returns a proxy to a queue that has qsize() implemented, so you don't have to copy-paste GPL-licensed code. Quick example:

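import multiprocessing
import time


def worker(x, que):
    # Each task pushes its result onto the shared queue.
    que.put(x**2)


if __name__ == '__main__':
    inputs = list(range(1000))

    pool = multiprocessing.Pool(processes=5)
    m = multiprocessing.Manager()
    # m.Queue() returns a proxy to a queue held by the manager process;
    # its qsize() works on macOS.
    q = m.Queue()
    workers = [pool.apply_async(worker, (i, q)) for i in inputs]
    # Poll until every task has put its result on the queue.
    while q.qsize() < len(inputs):
        time.sleep(1)

    results = [q.get() for _ in range(q.qsize())]
    assert len(results) == len(inputs)

    # Release the worker processes.
    pool.close()
    pool.join()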

I'm running macOS Ventura 13.0.1 and it works fine.

Stunning answered 14/12, 2022 at 15:26 Comment(0)

As Víctor Terrón has suggested in a GitHub discussion, you can use his implementation:

https://github.com/vterron/lemon/blob/d60576bec2ad5d1d5043bcb3111dff1fcb58a8d6/methods.py#L536-L573

According to the doc:

A portable implementation of multiprocessing.Queue. Because of multithreading / multiprocessing semantics, Queue.qsize() may raise the NotImplementedError exception on Unix platforms like Mac OS X where sem_getvalue() is not implemented. This subclass addresses this problem by using a synchronized shared counter (initialized to zero) and increasing / decreasing its value every time the put() and get() methods are called, respectively. This not only prevents NotImplementedError from being raised, but also allows us to implement a reliable version of both qsize() and empty().
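
If the GPL license is a concern, the shared-counter idea described above is small enough to sketch independently. The following is only a rough illustration, not Víctor Terrón's code: it wraps a queue instead of subclassing it (to avoid relying on multiprocessing internals), and the class name CountingQueue is hypothetical.

```python
import multiprocessing as mp


class CountingQueue:
    """Hypothetical wrapper: an mp.Queue paired with a shared item counter."""

    def __init__(self, maxsize=0):
        self._queue = mp.Queue(maxsize)
        # Synchronized shared integer, initialized to zero.
        self._count = mp.Value("i", 0)

    def put(self, item, block=True, timeout=None):
        # Count first so a fast consumer can never drive the counter negative.
        with self._count.get_lock():
            self._count.value += 1
        try:
            self._queue.put(item, block, timeout)
        except Exception:
            # The item never made it into the queue (e.g. queue.Full): undo.
            with self._count.get_lock():
                self._count.value -= 1
            raise

    def get(self, block=True, timeout=None):
        item = self._queue.get(block, timeout)
        with self._count.get_lock():
            self._count.value -= 1
        return item

    def qsize(self):
        # Reads the counter, so sem_getvalue() is never called.
        return self._count.value

    def empty(self):
        return self.qsize() == 0


if __name__ == "__main__":
    q = CountingQueue()
    q.put("a")
    q.put("b")
    print(q.qsize())
    print(q.get(), q.get(), q.empty())
```

Because the counter is incremented before the item is actually enqueued, qsize() can briefly overcount, so as with any multiprocessing queue the value should be treated as approximate while other processes are still producing or consuming.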

Melidamelilot answered 4/4, 2021 at 10:26 Comment(2)
To anyone finding this, be aware that Víctor Terrón's Queue implementation is licensed under the GPL, meaning that you can't use it in your code without also making your code GPL licensed. - Gasometer
And python devs couldn't implement this themselves because...? They're really so lazy that they'll screw over an entire OS instead of implementing qsize another way? - Bedtime
