For sniffing, we need some intermediate part.
zmq offers couple of options
- write your own program, accepting request on one side, sending them out, getting response, sending to original requester, and reporting this traffic to you
- use zmq.proxy - however, this requires latest version of
libzmq
(zmq.zmq_version_info() >= 3) which is currently not even available on my Ubuntu 14.04, so I skip this.
- use MonitoredQueue - this is what you probably want. This provides a loop exchanging messages between frontend and backend, while publishing/pushing/sending them to another socket.
The plan
This solution is based on MonitoredQueue example from pyzmq doc
Server bound to port 5555
Server will be bound to port 5555. Unlike other examples, I will keep your server as fixed part and not change it connecting to the MontitoredQueue. However, such a swap is not a problem and will not make any problems (as long as you adjust MonitoredQueue properly).
MonitoredQueue bound to port 4444, connected to port 5555, publishing traffic on port 7777
MonitoredQueue sits in between client and server. It listens on port 4444, sends requests to server and responses back to the client. At the same time, any message passing by will be published with corresponding prefix "in" or "out" on PUB socket. We will later see, these will contain not only prefix and request/response, but also identity of client.
Client connecting to port 4444
Client could connect directly to the server on port 5555, but this would not allow us to sniff the traffic. For this reason, we will connect client to port 4444, where is MonitoredQueue waiting to server and sniff.
You shall see, that the client and server will not have to change a line of code to participate in this exchange.
Real code
server.py
In our case, the server expects a string which can be converted to an integer and returns back a string with doubled value.
import zmq
def double_server(server_url="tcp://*:5555"):
context = zmq.Context()
socket = context.socket(zmq.REP)
socket.bind(server_url)
print "server started..."
while True:
req = socket.recv()
print "server received request", req
result = str(2*int(req))
socket.send(result)
print "server replied with", result
if __name__ == "__main__":
double_server()
client.py
Our client will try 5 times asking for some result on port 4444 on localhost.
import zmq
def client(server_url="tcp://localhost:4444"):
context = zmq.Context()
socket = context.socket(zmq.REQ)
# socket.setsockopt(zmq.IDENTITY, "client_id_abc") # see Conclusions
socket.connect(server_url)
for i in range(5):
print "request", i
socket.send(str(i))
res = socket.recv()
print i, "result: ", res
if __name__ == "__main__":
client()
You might try it now to connect directly to port 5555 to see it works, but for our sniffing it must talk to MonitoredQueue.
monitor.py
Here comes all the magic. pyzmq
already provides device MonitoredQueue
, so we may simply take it and use.
import zmq
from zmq.devices.monitoredqueuedevice import MonitoredQueue
from zmq.utils.strtypes import asbytes
def monitoredqueue(frontend_url="tcp://*:4444", server_url="tcp://localhost:5555", capture_url="tcp://*:7777"):
mondev = MonitoredQueue(zmq.ROUTER, zmq.DEALER, zmq.PUB, asbytes("in"), asbytes("out"))
mondev.bind_in(frontend_url)
mondev.connect_out(server_url)
mondev.bind_mon(capture_url)
mondev.setsockopt_in(zmq.HWM, 1)
mondev.start()
print "monitored queue started"
if __name__ == "__main__":
monitoredqueue()
Note about socket types and aliases:
- zmq.ROUTER used to be called zmq.XREP
- zmq.DEALER used to be called zmq.XREQ
- these aliases are still working.
The MonitoredQueue will publish each message passing by on zmq.PUB socket on port 7777. These messages will be prefixed by "in" and "out" and will also contain one frame with identity string. This identity string is assigned by ROUTER socket and during the exchange it is unique for all connected clients. This identity is part of so called envelope and is from request / reply message delimited by empty frame (as will be seen soon).
monitorclient.py
This monitor client is here just to show, how to reach sniffed information.
It subscribes to port 7777, served by monitor (MonitoredQueue) and prints it out. It is important to consume multipart message, otherwise we would miss some information.
import zmq
def monitorclient(server_url="tcp://localhost:7777"):
context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.connect(server_url)
socket.setsockopt(zmq.SUBSCRIBE, "")
print "started monitoring client"
while True:
res = socket.recv_multipart()
print res
if __name__ == "__main__":
monitorclient()
Run it
We will need 4 consoles open, in each we will start one python script
Start the server first:
$ python server.py
Start MonitoredQueue
$ python monitor.py
Start client, reading sniffed messages
$ python monitorclient.py
Finally, start the client trying to get some response from the server proxied by MonitoredQueue
$ python client.py
request 0
0 result: 0
request 1
1 result: 2
request 2
2 result: 4
request 3
3 result: 6
request 4
4 result: 8
Results are as expected.
Now check the server.py output:
$ python server.py
server received request 0
server replied with 0
server received request 1
server replied with 2
server received request 2
server replied with 4
server received request 3
server replied with 6
server received request 4
server replied with 8
No surprise, all goes well.
Our monitor.py
does not print out anything, we will have to check output from monitorclient.py
$ python monitorclient.py
started monitoring client
['in', '\x00\xc4\x84\x1c\xf2\xc2.@\xd3\x86cN\x0e\x06\x7f\xaf\x0b', '', '0']
['out', '\x00\xc4\x84\x1c\xf2\xc2.@\xd3\x86cN\x0e\x06\x7f\xaf\x0b', '', '0']
['in', '\x00\xc4\x84\x1c\xf2\xc2.@\xd3\x86cN\x0e\x06\x7f\xaf\x0b', '', '1']
['out', '\x00\xc4\x84\x1c\xf2\xc2.@\xd3\x86cN\x0e\x06\x7f\xaf\x0b', '', '2']
['in', '\x00\xc4\x84\x1c\xf2\xc2.@\xd3\x86cN\x0e\x06\x7f\xaf\x0b', '', '2']
['out', '\x00\xc4\x84\x1c\xf2\xc2.@\xd3\x86cN\x0e\x06\x7f\xaf\x0b', '', '4']
['in', '\x00\xc4\x84\x1c\xf2\xc2.@\xd3\x86cN\x0e\x06\x7f\xaf\x0b', '', '3']
['out', '\x00\xc4\x84\x1c\xf2\xc2.@\xd3\x86cN\x0e\x06\x7f\xaf\x0b', '', '6']
['in', '\x00\xc4\x84\x1c\xf2\xc2.@\xd3\x86cN\x0e\x06\x7f\xaf\x0b', '', '4']
['out', '\x00\xc4\x84\x1c\xf2\xc2.@\xd3\x86cN\x0e\x06\x7f\xaf\x0b', '', '8']
Here you see printout of all 10 messages, 5 requests, 5 responses.
Each is having a structure [prefix, identity, emptyframe, message]
where
prefix
is either "in" or "out"
identity
is a string assigned to particular client by MonitoredQueues. Each time the client connects, this identity might change. As bonus, we might connect multiple clients and still have a chance to distinguish between different clients. If you need specific client identities, see commented line in client.py
with socket.setsockopt(zmq.IDENTITY, "client_id_abc")
. If you uncomment it, you will see "client_id_abc"
as identity of your client.
emptyframe
is seen as ''
and is delimiting envelop from message data.
message
is what client asked or what server replied.
Conclusions
- sniffing works, and PyZMQ already offers device MonitoredQueue for this purpose
- with
zmq.PUB
the sniffing will not block any communication, you may simply ignored sniffed data and all will work.
- for production, it would be practical making MonitoredQueue fixed part of the system, thus being bound to known IP address and port. This would require a change on server, which would have to connect (instead of current binding). Such a change is trivial and does not affect rest of the code and behaviour. If you have only one endpoint to monitor, you could also embed the monitor into server (this would require 2 threads, one for server, another for monitor).
zmq
is great "Lego" for this sort of tasks.