What is the fastest way to pass a message between two processes in Python?
I'm looking for the fastest way (in terms of latency) to communicate between two processes that an event has occurred.

To be more precise: I have a numpy array in shared memory, where one process (the producer) writes updates to the array and the other (the consumer) reads them.

Using multiprocessing is required because we need to get around the GIL. The producer is a CPU/IO-heavy process that listens to a data stream and does some data processing.

The consumer is a very light, mostly idle process, but we need to wake it as fast as possible when the producer updates the array.

One more thing: triggering the consumer with minimal latency matters more than delivering every message. (E.g. if the producer sends three messages in a row without a delay and the consumer receives only the first one and loses the following two, that's fine.)
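
For context, the shared-memory array described above can be built with the stdlib multiprocessing.shared_memory module. This is a sketch of the assumed layout (the size, dtype, and variable names are illustrative, not from my actual code), shown in a single process for brevity; in practice the attach-by-name part runs in the consumer process:

```python
import numpy as np
from multiprocessing import shared_memory

# Producer side: create a 16-element float64 array backed by named shared memory.
shm = shared_memory.SharedMemory(create=True, size=16 * 8)
arr = np.ndarray((16,), dtype=np.float64, buffer=shm.buf)
arr[:] = 0.0
arr[0] = 42.0                    # producer writes an update in place

# Consumer side: attach by name and build an independent view of the same buffer.
peer = shared_memory.SharedMemory(name=shm.name)
view = np.ndarray((16,), dtype=np.float64, buffer=peer.buf)
result = float(view[0])          # consumer sees the producer's update

peer.close()
shm.close()
shm.unlink()
```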


I have tried the multiprocessing primitives Pipe, Queue, and Event for this purpose; they are almost the same in terms of latency. Pipe is the most stable.

multiprocessing.Pipe

import multiprocessing as mp
import numpy as np
import random
import time

ITER_COUNT = 1000


def get_mcs_diff(ts):
    return round((time.time() - ts) * 1e6, 0)


def main(v, input_pipe):
    for _ in range(ITER_COUNT):
        v.value = time.time()
        input_pipe.send(None)
        time.sleep((0.1 + random.random()) / 100)


if __name__ == "__main__":
    v = mp.Value('d', time.time())
    (ip, op) = mp.Pipe()
    p = mp.Process(target=main, args=(v, ip,))
    measurements = []

    p.start()
    i = 0
    while i < ITER_COUNT:
        op.recv()
        measurements.append(get_mcs_diff(v.value))
        i += 1

    print(np.percentile(measurements, [50, 90, 95, 99], axis=0))
    p.join()

# Output
# 50, 90, 95, 99 percentiles in microseconds
# > [138.   206.1   238.   383.21]
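
Since only the latest event matters, the consumer end of the Pipe can coalesce a burst of wake-ups: block on recv() for the first message, then drain any backlog with non-blocking poll(0) before reading the shared array. A minimal sketch (connection names are illustrative, not from the benchmark above):

```python
import multiprocessing as mp

a, b = mp.Pipe()
for i in range(3):      # producer bursts three events with no delay
    a.send(i)

b.recv()                # block until at least one event arrives
drained = 0
while b.poll(0):        # non-blocking: discard the stale backlog
    b.recv()
    drained += 1
# at this point, read the freshest state from the shared array
print(drained)          # 2 of the 3 queued events were coalesced
```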

multiprocessing.Queue

import multiprocessing as mp
import numpy as np
import random
import time

ITER_COUNT = 1000


def get_mcs_diff(ts):
    return round((time.time() - ts) * 1e6, 0)


def main(v, q):
    for _ in range(ITER_COUNT):
        v.value = time.time()
        q.put(None)
        time.sleep((0.1 + random.random()) / 100)


if __name__ == "__main__":
    v = mp.Value('d', time.time())
    q = mp.Queue()
    p = mp.Process(target=main, args=(v, q,))
    measurements = []

    p.start()
    i = 0
    while i < ITER_COUNT:
        q.get()
        measurements.append(get_mcs_diff(v.value))
        i += 1

    print(np.percentile(measurements, [50, 90, 95, 99], axis=0))
    p.join()

# Output
# 50, 90, 95, 99 percentiles in microseconds
# > [187.   266.   299.05  444.06]

multiprocessing.Event

import multiprocessing as mp
import numpy as np
import random
import time

ITER_COUNT = 1000


def get_mcs_diff(ts):
    return round((time.time() - ts) * 1e6, 0)


def main(v, e):
    for _ in range(ITER_COUNT):
        v.value = time.time()
        e.set()
        time.sleep((0.1 + random.random()) / 100)


if __name__ == "__main__":
    v = mp.Value('d', time.time())
    e = mp.Event()
    p = mp.Process(target=main, args=(v, e,))
    measurements = []

    p.start()
    i = 0
    while i < ITER_COUNT:
        e.wait()
        measurements.append(get_mcs_diff(v.value))
        i += 1
        e.clear()
    print(np.percentile(measurements, [50, 90, 95, 99], axis=0))
    p.join()
# Output
# 50, 90, 95, 99 percentiles in microseconds
# > [142.  222.1  256.05  1754.77]

while True, busy loop

import multiprocessing as mp
import numpy as np
import random
import time

ITER_COUNT = 1000


def get_mcs_diff(ts):
    return round((time.time() - ts) * 1e6, 0)


def main(v):
    time.sleep(1)
    for _ in range(ITER_COUNT):
        v.value = time.time()
        time.sleep((0.1 + random.random()) / 100)


if __name__ == "__main__":
    v = mp.Value('d', time.time())
    p = mp.Process(target=main, args=(v,))
    measurements = []

    p.start()
    i = 0
    v_prev = 0
    while i < ITER_COUNT:
        if v_prev < v.value:
            measurements.append(get_mcs_diff(v.value))
            v_prev = float(v.value)
            i += 1
    print(np.percentile(measurements, [50, 90, 95, 99], axis=0))
    p.join()
# Output
# 50, 90, 95, 99 percentiles in microseconds
# > [ 33.    65.    81.   128.05]

So far the busy loop is the fastest option, but I would like to avoid it for the obvious reason: it burns a full CPU core even while idle.
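
One possible compromise is to bound the spinning: after a wake-up (or speculatively), busy-poll the shared value for a short budget, then fall back to short sleeps. This keeps busy-loop latency on the hot path without pinning a core when the producer is quiet. The helper below is a hypothetical sketch, not code from my project; spin_us and poll_sleep would need tuning for a real workload:

```python
import multiprocessing as mp
import time

def wait_for_update(v, v_prev, spin_us=200, poll_sleep=0.0005):
    # Spin for up to spin_us microseconds (lowest latency), then fall
    # back to short sleeps so an idle consumer stops burning a core.
    deadline = time.perf_counter() + spin_us / 1e6
    while True:
        if v.value > v_prev:            # producer published a newer timestamp
            return v.value
        if time.perf_counter() > deadline:
            time.sleep(poll_sleep)      # yield the CPU between checks

if __name__ == "__main__":
    v = mp.Value('d', time.time())
    print(wait_for_update(v, 0.0) > 0)  # returns immediately: value is newer
```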

Nathanaelnathanial answered 27/9, 2021 at 9:15 Comment(18)
What kind of message do you need to send? The fastest signal is literally a signal.Kidder
Did you try to use Queue?Cottage
Out of curiosity, what is it you're doing that requires latency < 30us in Python?Rosy
@MisterMiyagi a blank message is ok, I need to say "event X has occurred", but this event can occur multiple times per second and we need to pass one message for each event.Nathanaelnathanial
You say you want to "improve throughput". Please confirm if this is indeed not about latency and what your latency requirements are. Thinking of grouping/aggregating the messages here.Salomesalomi
For the record: Using signals one can easily send about 50% faster. However, even the most trivial concurrency-safe processing means one can only receive / handle about 30%-50% of these signals. (2019 MacBook Pro, 10.14.6, Python 3.9.5)Kidder
Can you please clarify in the question what exactly your requirements for "messages" are? What's the maximum latency and minimum throughput required? Does the main process actually need to be informed promptly, or is it valid to, for example, send bunches of timestamped messages? Do you actually need multiprocessing, or could you use a different concurrency mechanism such as threads or async?Kidder
@MisterMiyagi I have updated the question, please take a look.Nathanaelnathanial
@EugeneRyabtsev I have updated the question, please take a look.Nathanaelnathanial
@Cottage I will check it out; as far as I know it's a layer on top of Pipe, so in the best case it will be the same.Nathanaelnathanial
Can you investigate threading.Event to see if it works for you? I tried it ( ideone.com/fKv02q) and it seems to sort of work, but I suspect this does not measure the actual latency.Salomesalomi
@EugeneRyabtsev I think Event can fire only once and there is no way to use it multiple times? Your example is equivalent to while p.is_alive(): time.sleep(0.001)Nathanaelnathanial
Oh, sorry, forgot this one is not autoreset. Should be are_you_coming.clear() last thing in the while loop.Salomesalomi
@EugeneRyabtsev I added your example to the question, it works but it looks like the same thing as Pipe, maybe a little bit less stable.Nathanaelnathanial
@Cottage I've added a Queue example; it's almost the same as Pipe, a bit slower.Nathanaelnathanial
Hm. We might have to resort to unconventional methods. Is your producer able to say when it will finish its next calculation "real soon now"? I'm thinking of signalling at that time and busy-looping the consumer until the data is actually ready.Salomesalomi
@EugeneRyabtsev nice idea, yep I think it's possible. In general I would like to avoid busy looping but this idea is the best so far. We will get comparable results to "while True" sectionNathanaelnathanial
I'm afraid if you want better than millisecond latency consistently (and reliably) you shouldn't be running a non-realtime OS. On my system, time.time is not even very accurate (counts up by 15 ms increments usually inside a tight loop, with occasional increments as low as 10μs). This is down to the OS scheduler deciding "it's time to wake up this thread, and let it process for a while..." You may get a little benefit out of careful signal handling, and setting a higher thread priority, but true low latency comes from machine code that can't be interrupted...Tullus
