Interprocess communication in Python

What is a good way to communicate between two separate Python runtimes? Things tried:

  • reading/writing on named pipes e.g. os.mkfifo (feels hacky)
  • dbus services (worked on desktop, but too heavyweight for headless)
  • sockets (seems too low-level; surely there's a higher level module to use?)

My basic requirement is to be able to run python listen.py like a daemon, able to receive messages from python client.py. The client should just send a message to the existing process and terminate, with return code 0 for success and nonzero for failure (i.e. two-way communication will be required).

Sterne answered 3/8, 2011 at 1:57 Comment(0)

The multiprocessing library provides listeners and clients that wrap sockets and allow you to pass arbitrary Python objects.

Your server could listen to receive Python objects:

from multiprocessing.connection import Listener

address = ('localhost', 6000)     # family is deduced to be 'AF_INET'
listener = Listener(address, authkey=b'secret password')
conn = listener.accept()
print('connection accepted from', listener.last_accepted)
while True:
    msg = conn.recv()
    # do something with msg
    if msg == 'close':
        conn.close()
        break
listener.close()

Your client could send commands as objects:

from multiprocessing.connection import Client

address = ('localhost', 6000)
conn = Client(address, authkey=b'secret password')
conn.send('close')
# can also send arbitrary objects:
# conn.send(['a', 2.5, None, int, sum])
conn.close()
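
To get the exit-code behaviour asked for in the question, the client can block on a reply and exit accordingly. A minimal sketch, assuming the server above is extended to conn.send() back a True/False result for each message:

import sys
from multiprocessing.connection import Client

try:
    conn = Client(('localhost', 6000), authkey=b'secret password')
    conn.send('close')
    ok = conn.recv()           # hypothetical True/False reply from the server
    conn.close()
    sys.exit(0 if ok else 1)   # 0 on success, nonzero on failure
except Exception:
    sys.exit(1)                # server not running, bad authkey, etc.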
Hendrix answered 3/8, 2011 at 3:44 Comment(6)
Worked well for me! And easy to run in the background using threading.Thread – Andeee
Great solution. After reading the docs, specifically the section on Authentication Keys, it seems like the data transferred between client and server is not encrypted. Anyone know the best way to encrypt data transmitted between client and server? – Elbertine
Are there any rules for which port number I should be using? And how should I detect if a port is already being used or not? – Mauceri
It's a great day when you get what you come in for and it just works like you expect it to! – Ringworm
@Hendrix when you say it "wraps sockets and allows you to pass arbitrary Python objects", does that mean it serializes the objects? I'm assuming it would have to, to pass them over the wire, is that right? – Krefeld
@sorbet slightly changing the code above: you can use address = ('localhost', 0), then a random free port is used and can be read by accessing listener.address – Averment
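
A minimal sketch of the ephemeral-port trick from the last comment (the rest of the setup is unchanged):

from multiprocessing.connection import Listener

# bind to port 0 so the OS assigns a free port, then read it back
listener = Listener(('localhost', 0), authkey=b'secret password')
print(listener.address)   # e.g. ('localhost', 52431)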

Nah, zeromq is the way to go. Delicious, isn't it?

import argparse
import zmq

parser = argparse.ArgumentParser(description='zeromq server/client')
parser.add_argument('--bar')
args = parser.parse_args()

if args.bar:
    # client
    context = zmq.Context()
    socket = context.socket(zmq.REQ)
    socket.connect('tcp://127.0.0.1:5555')
    socket.send_string(args.bar)
    msg = socket.recv_string()
    print(msg)
else:
    # server
    context = zmq.Context()
    socket = context.socket(zmq.REP)
    socket.bind('tcp://127.0.0.1:5555')
    while True:
        msg = socket.recv_string()
        if msg == 'zeromq':
            socket.send_string('ah ha!')
        else:
            socket.send_string('...nah')
Sadden answered 3/8, 2011 at 3:31 Comment(7)
Can this be modified to work in a multi-threaded environment? I have several worker threads connecting now, and it seems to be causing assertion failures in the zmq C code – Sterne
You can also wrap zmq with pizco: pizco.readthedocs.io/en/latest – Garble
After staring at the top 2 answers, I have to ask: does every solution for this require using an arbitrary port number? And if that port is already being used for something else, might this randomly screw up unrelated stuff which just happens to be running on the same computer? – Reiff
@AlanSE, yes, all require a unique arbitrary port number (more exactly, two processes can share a port number if they use different transports, like TCP versus UDP). AFAIK you have to have a mechanism to fall back to a free port, or else you could run into problems. PS: the port number should be >= 1024 unless you have a good reason. – Eyesore
You may use zmq's IPC transport, which is explicitly designed for this – Proletariat
I use zmq for Python-Python communication as well as Python-C communication. Highly recommend. – Zion
Example needs to be updated (for Python 3) to use .send_string() and .recv_string() instead of .send() and .recv() respectively, or change the test to if msg == b'zeromq': – Wisla
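
For completeness, a minimal sketch of the ipc:// transport mentioned in the comments, which uses Unix-domain sockets and avoids port numbers entirely (Unix only; the socket path is arbitrary):

import zmq

# same REQ/REP pattern as above, but over a filesystem socket
context = zmq.Context()
socket = context.socket(zmq.REP)
socket.bind('ipc:///tmp/example.sock')
# clients use socket.connect('ipc:///tmp/example.sock')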

Based on @vsekhar's answer, here is a Python 3 version with more details and multiple connections:

Server

from multiprocessing.connection import Listener

listener = Listener(('localhost', 6000), authkey=b'secret password')
running = True
while running:
    conn = listener.accept()
    print('connection accepted from', listener.last_accepted)
    while True:
        msg = conn.recv()
        print(msg)
        if msg == 'close connection':
            conn.close()
            break
        if msg == 'close server':
            conn.close()
            running = False
            break
listener.close()

Client

from multiprocessing.connection import Client
import time

# Client 1
conn = Client(('localhost', 6000), authkey=b'secret password')
conn.send('foo')
time.sleep(1)
conn.send('close connection')
conn.close()

time.sleep(1)

# Client 2
conn = Client(('localhost', 6000), authkey=b'secret password')
conn.send('bar')
conn.send('close server')
conn.close()
Selfinduction answered 13/5, 2020 at 9:57 Comment(1)
What is the maximum size for queueing in this case? Suppose you add waiting time in the server code (to simulate processing delay): what is the maximum number of client requests that can still be sent via this connection before some sort of error occurs? From my testing, 100000 doesn't give any error. Or is there any way to check the number of queued requests, so that the sender can terminate if there are, say, more than 100 requests queued? – Borowski

From my experience, rpyc is by far the simplest and most elegant way to go about it.
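
A minimal sketch of what an rpyc service can look like (the service, method, and port are illustrative; see the rpyc docs for the real quickstart):

# server.py
import rpyc
from rpyc.utils.server import ThreadedServer

class EchoService(rpyc.Service):
    # methods prefixed with exposed_ are callable by clients
    def exposed_echo(self, msg):
        return msg

ThreadedServer(EchoService, port=18861).start()

# client.py
import rpyc

conn = rpyc.connect('localhost', 18861)
print(conn.root.echo('hello'))   # prints 'hello'
conn.close()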

Sakhuja answered 4/3, 2013 at 17:24 Comment(2)
This is genius. Thank you for introducing me to this library. What I need now is a second upvote button. – Mauceri
@Sakhuja you could have at least provided a piece of code in your answer to show how this looks. For anyone looking for a quickstart example, see this SO post. – Inconsistent

I would use sockets; local communication is heavily optimized, so you shouldn't have performance problems, and it gives you the ability to distribute your application to different physical nodes if the need arises.

With regard to the "low-level" approach, you're right. But you can always use a higher-level wrapper depending on your needs. XMLRPC could be a good candidate, but it may be overkill for the task you're trying to perform.

Twisted offers some good, simple protocol implementations, such as LineReceiver (for simple line-based messages) or the more elegant AMP (which was, by the way, standardized and implemented in different languages).
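
For example, the XML-RPC route needs nothing but the standard library; a minimal sketch (function name and port are arbitrary):

# server.py
from xmlrpc.server import SimpleXMLRPCServer

def handle(msg):
    # return value tells the client whether the message was accepted
    return msg == 'ping'

server = SimpleXMLRPCServer(('localhost', 8000))
server.register_function(handle, 'handle')
server.serve_forever()

# client.py
import sys
import xmlrpc.client

proxy = xmlrpc.client.ServerProxy('http://localhost:8000/')
sys.exit(0 if proxy.handle('ping') else 1)   # exit code as the question requires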

Howard answered 3/8, 2011 at 2:01 Comment(3)
Aren't sockets relatively slow for local use on Windows? (Or am I thinking of all local IPC?) So it might depend on the OP's deployment environment. And if you're using Twisted, they have ProcessProtocols too, which might be worth looking at. – Scandian
ProcessProtocol solves a completely different problem and can't be used to communicate with an already running process. – Howard
Regarding Windows, you may be right; I have only very limited experience on Windows. Regarding "all local IPC", if I find my references for what's stated above, I'll add the link. – Howard

Check out the cross-platform library/server RabbitMQ. It might be too heavyweight for two-process communication, but if you need multi-process or multi-codebase communication (with various messaging patterns, e.g. one-to-many, queues, etc.), it is a good option.

Requirements:

$ pip install pika
$ pip install bson # for sending binary content
$ sudo apt-get install rabbitmq-server # ubuntu, see rabbitmq installation instructions for other platforms

Publisher (sends data):

import pika, time, bson, os

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='logs', exchange_type='fanout')  # 'type' was renamed to 'exchange_type' in newer pika

i = 0
while True:
    data = {'msg': 'Hello %s' % i, b'data': os.urandom(2), 'some': bytes(bytearray(b'\x00\x0F\x98\x24'))}
    channel.basic_publish(exchange='logs', routing_key='', body=bson.dumps(data))
    print("Sent", data)
    i = i + 1
    time.sleep(1)

connection.close()

Subscriber (receives data, can be multiple):

import pika, bson

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs', exchange_type='fanout')

result = channel.queue_declare(queue='', exclusive=True)  # empty name: broker assigns a unique queue
queue_name = result.method.queue

channel.queue_bind(exchange='logs', queue=queue_name)

def callback(ch, method, properties, body):
    data = bson.loads(body)
    print("Received", data)

channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)  # pika >= 1.0 signature
channel.start_consuming()

Examples based on https://www.rabbitmq.com/tutorials/tutorial-two-python.html

Ninon answered 17/10, 2016 at 18:40 Comment(0)

I would use sockets, but use Twisted to give you some abstraction, and to make things easy. Their Simple Echo Client / Server example is a good place to start.

You would just have to combine the files and instantiate and run either the client or server depending on the passed argument(s).
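
For reference, the server half of that echo example looks roughly like this (the port is arbitrary):

from twisted.internet import protocol, reactor

class Echo(protocol.Protocol):
    def dataReceived(self, data):
        # echo whatever the client sends straight back
        self.transport.write(data)

class EchoFactory(protocol.Factory):
    def buildProtocol(self, addr):
        return Echo()

reactor.listenTCP(8000, EchoFactory())
reactor.run()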

Beatification answered 3/8, 2011 at 3:24 Comment(0)

I found this thread as one of the first results for Python IPC, but I was looking for something that could run with AsyncIO. I eventually found IPyC, which provides nice async capabilities, so I came back here to share that gem. IPyC also supports synchronous implementations.

It is possible to use the IPyC library from two different processes, but here is a small example with two asyncio tasks in the same file. It uses TCP port 9999 as the default port.

NOTE: This example crashes with an "unexpected keyword argument 'loop'" error on Python >= 3.10; that is due to an interface change in asyncio. I have tested with v3.9.

import asyncio
import ipyc
import json
import logging
logging.basicConfig(level=logging.INFO)  # Set to DEBUG to see inner workings

# IPyC has support for custom (de)serialization; using json as example here
ipyc.IPyCSerialization.add_custom_serialization(list, json.dumps)
ipyc.IPyCSerialization.add_custom_deserialization(list, json.loads)


## Host stuff
host = ipyc.AsyncIPyCHost()

@host.on_connect
async def on_client_connect(connection: ipyc.AsyncIPyCLink):
    logging.info("Got a connection")
    while connection.is_active():
        message = await connection.receive()
        if message:
            logging.info(f"Received: {message}")
    logging.info("Connection closed")

## Client stuff
async def client_task():
    client = ipyc.AsyncIPyCClient()

    link = await client.connect()

    for i in range(3):
        await link.send(["Hello World!", i, 3.14])
        await asyncio.sleep(1)

    await client.close()  # Close the connection
    await asyncio.sleep(1)

## AsyncIO stuff
loop = asyncio.get_event_loop()
loop.create_task(host.start())
loop.run_until_complete(client_task())
Aenneea answered 13/3, 2023 at 17:09 Comment(0)
