I'm looking for the fastest way (in terms of latency) to communicate between two processes the fact that an event has occurred.
To be more precise, I have a numpy array in shared memory; one process (the producer) writes updates to the array and the other one (the consumer) reads them.
Using multiprocessing is required because we need to overcome GIL. Producer is CPU/IO heavy process which listens to data stream and does some data processing.
Consumer is very light and mostly idle process but we need to awaken it as fast as possible when Producer updates an array.
One more thing: it's more important to trigger the consumer with minimal latency than to transfer every message. (E.g. if the producer sends three messages in a row without a delay and the consumer receives only the first one and loses the following two — that's ok.)
I have tried the multiprocessing primitives Pipe, Queue and Event for this purpose; it looks like they are almost the same in terms of latency. Pipe is the most stable.
multiprocessing.Pipe
import multiprocessing as mp
import numpy as np
import random
import time
ITER_COUNT = 1000  # number of producer -> consumer notifications per run


def get_mcs_diff(ts):
    """Return microseconds elapsed between timestamp *ts* and now, rounded to a whole number (float)."""
    elapsed = time.time() - ts
    return round(elapsed * 1e6, 0)
def main(v, input_pipe):
    """Producer loop: publish a send-timestamp into the shared value, then wake the consumer.

    The pipe payload is irrelevant (None); only the wake-up matters — the
    consumer reads the actual timestamp from the shared ``v``.
    """
    for _ in range(ITER_COUNT):
        v.value = time.time()       # publish the timestamp first ...
        input_pipe.send(None)       # ... then notify the consumer
        # randomized inter-message gap of roughly 1-11 ms
        pause = (0.1 + random.random()) / 100
        time.sleep(pause)
if __name__ == "__main__":
    # shared double holding the producer's send timestamp
    v = mp.Value('d', time.time())
    (ip, op) = mp.Pipe()
    p = mp.Process(target=main, args=(v, ip,))
    measurements = []
    p.start()
    # Consumer side: block on the pipe, then measure wake-up latency against
    # the timestamp the producer stored just before sending.
    for _ in range(ITER_COUNT):
        op.recv()
        measurements.append(get_mcs_diff(v.value))
    print(np.percentile(measurements, [50, 90, 95, 99], axis=0))
    p.join()
# Output
# 50, 90, 95, 99 percentiles in microseconds
# > [138. 206.1 238. 383.21]
multiprocessing.Queue
import multiprocessing as mp
import numpy as np
import random
import time
ITER_COUNT = 1000  # notifications per benchmark run


def get_mcs_diff(ts):
    """Microseconds between timestamp *ts* and now, rounded to 0 decimals (a float)."""
    return round((time.time() - ts) * 1_000_000, 0)
def main(v, q):
    """Producer loop: stamp the shared value with the send time, then enqueue a wake-up token.

    The queued item is a throwaway None; the consumer reads the real payload
    (the timestamp) from the shared ``v``.
    """
    for _ in range(ITER_COUNT):
        v.value = time.time()   # publish timestamp before notifying
        q.put(None)             # wake the consumer
        # sleep a random 1-11 ms between notifications
        time.sleep((0.1 + random.random()) / 100)
if __name__ == "__main__":
    # shared double holding the producer's send timestamp
    v = mp.Value('d', time.time())
    q = mp.Queue()
    p = mp.Process(target=main, args=(v, q,))
    measurements = []  # fixed typo: was "measurments", inconsistent with the Pipe variant
    p.start()
    # Consumer side: block on the queue, then measure wake-up latency against
    # the timestamp the producer stored just before enqueueing.
    i = 0
    while i < ITER_COUNT:
        q.get()
        measurements.append(get_mcs_diff(v.value))
        i += 1
    # Removed leftover debug print of all 1000 raw samples — the other
    # benchmark variants only report percentiles.
    print(np.percentile(measurements, [50, 90, 95, 99], axis=0))
    p.join()
# Output
# 50, 90, 95, 99 percentiles in microseconds
# > [187. 266. 299.05 444.06]
multiprocessing.Event
import multiprocessing as mp
import numpy as np
import random
import time
ITER_COUNT = 1000  # notifications per benchmark run


def get_mcs_diff(ts):
    """Return the elapsed time since *ts* in microseconds, rounded to a whole number (float)."""
    delta_seconds = time.time() - ts
    return round(delta_seconds * 1e6, 0)
def main(v, e):
    """Producer loop: record the send time in the shared value, then set the event.

    Note the producer only ever sets the event; clearing it is the
    consumer's responsibility.
    """
    for _ in range(ITER_COUNT):
        v.value = time.time()   # publish timestamp before signalling
        e.set()                 # wake the consumer
        # random 1-11 ms gap between signals
        time.sleep((0.1 + random.random()) / 100)
if __name__ == "__main__":
    # shared double holding the producer's send timestamp
    v = mp.Value('d', time.time())
    e = mp.Event()
    p = mp.Process(target=main, args=(v, e,))
    measurements = []  # fixed typo: was "measurments"
    p.start()
    i = 0
    while i < ITER_COUNT:
        # Timeout guards against a hang: events are lossy here — if the
        # producer finishes while we missed its last set(), wait() would
        # otherwise block forever.
        if not e.wait(timeout=5.0):
            break
        measurements.append(get_mcs_diff(v.value))
        # Bug fix: clear() must happen INSIDE the loop. Originally it ran
        # once after the loop, so after the first set() the event stayed set
        # and every subsequent wait() returned immediately — the loop
        # free-ran re-measuring stale timestamps instead of waiting for the
        # producer.
        e.clear()
        i += 1
    print(np.percentile(measurements, [50, 90, 95, 99], axis=0))
    p.join()
# Output
# 50, 90, 95, 99 percentiles in microseconds
# > [142. 222.1 256.05 1754.77]
while True, busy loop
import multiprocessing as mp
import numpy as np
import random
import time
ITER_COUNT = 1000  # number of shared-value updates per run


def get_mcs_diff(ts):
    """Microseconds elapsed since timestamp *ts*, rounded to a whole number (float)."""
    return round((time.time() - ts) * 1e6, 0)
def main(v):
    """Producer loop for the busy-wait benchmark: periodically stamp the shared value.

    There is no explicit notification at all — the consumer detects updates
    by polling ``v`` in a tight loop. The initial sleep gives the consumer
    time to enter its polling loop before the first update.
    """
    time.sleep(1)
    for _ in range(ITER_COUNT):
        v.value = time.time()  # publishing the timestamp IS the notification
        # random 1-11 ms gap between updates
        time.sleep((0.1 + random.random()) / 100)
if __name__ == "__main__":
    # shared double holding the producer's send timestamp
    v = mp.Value('d', time.time())
    p = mp.Process(target=main, args=(v,))
    measurements = []  # fixed typo: was "measurments"
    p.start()
    i = 0
    v_prev = 0.0
    while i < ITER_COUNT:
        # Bug fix: snapshot v.value ONCE per iteration. The original read it
        # up to three times (compare, measure, store), so the producer could
        # update it in between and the compared timestamp would differ from
        # the measured/stored one, skewing the latency sample.
        cur = v.value
        if v_prev < cur:
            measurements.append(get_mcs_diff(cur))
            v_prev = cur
            i += 1
    print(np.percentile(measurements, [50, 90, 95, 99], axis=0))
    p.join()
# Output
# 50, 90, 95, 99 percentiles in microseconds
# > [ 33. 65. 81. 128.05]
So far the busy loop is the fastest option, but I would like to avoid it for obvious reasons.
signal
. – KidderQueue
? – Cottagesignals
one can easily send about 50% faster. However, even the most trivial concurrency-safe processing means one can only receive / handle about 30%-50% of these signals. (2019 MacBook Pro, 10.14.6, Python 3.9.5) – Kidderasync
? – Kidderwhile p.is_alive(): time.sleep(0.001)
– Nathanaelnathanialare_you_coming.clear()
last thing in the while loop. – Salomesalomitime.time
is not even very accurate (counts up by 15 ms increments usually inside a tight loop, with occasional increments as low as 10μs). This is down to the OS scheduler deciding "it's time to wake up this thread, and let it process for a while..." You may get a little benefit out of careful signal handling, and setting a higher thread priority, but true low latency comes from machine code that can't be interrupted... – Tullus