How to send image and data string using serialization in ZMQ?
My goal is to send images and a data string from an RPi (server) to a client. I use send_json(data), where data is the dict {'img': img_ls, 'telemetry': '0.01, 320, -10'} and img_ls is the image converted to a list. The problem is that I get len(img_ls) = 57556, whereas the original image has 320 x 240 = 76800 pixels. I don't understand the discrepancy. Here is the code:

SERVER-side

import io
import time

import numpy as np
import picamera
import zmq

context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind("tcp://0.0.0.0:5557")


def outputs():
    stream = io.BytesIO()
    while True:
        yield stream
        stream.seek(0)
        sensors = '0.01, 320, -10'
        # np.fromstring is deprecated; frombuffer reads the same bytes
        img_ls = np.frombuffer(stream.getvalue(), dtype=np.uint8).tolist()
        data = {'telemetry': sensors, 'img': img_ls}
        socket.send_json(data)
        stream.seek(0)
        stream.truncate()

with picamera.PiCamera() as camera:
    camera.resolution = (320, 240)
    camera.framerate = 80
    time.sleep(2)
    camera.capture_sequence(outputs(), 'jpeg', use_video_port=True)

CLIENT-side

import time

import numpy as np
import zmq
from PIL import Image

ip_server = "192.168.42.1"
context = zmq.Context()
zmq_socket = context.socket(zmq.SUB)
zmq_socket.setsockopt(zmq.SUBSCRIBE, b'')
zmq_socket.setsockopt(zmq.CONFLATE, 1)
zmq_socket.connect("tcp://{}:5557".format(ip_server))

try:
    img_nbr = 1
    while True:
        start = time.time()
        frames = zmq_socket.recv_json()
        img_ls = frames['img']
        telemetry = frames['telemetry']
        #convert img list to array
        img_arr = np.asarray(img_ls)
        #reshape gives error because 320*240 != len(img_ls)
        image = np.reshape(img_ls, (320, 240))
        #save image file locally 
        image = Image.fromarray(image)
        #timestamp in ms
        timestamp = int(time.time() * 1000 )
        image.save('img_'+str(timestamp)+'.jpg')
        print('Frame number: ', str(img_nbr))
        img_nbr += 1
finally:
    pass

Final note: this is my attempt to stream images and sensor data synchronously from the RPi to a client. I am afraid that the array and list conversions (done on the RPi side) might slow down the streaming. If there is a better way to do this while still using ZMQ, please let me know.

Whereat answered 11/7, 2017 at 17:27 Comment(0)

Image processing is CPU-expensive, so let's address performance first:

ZeroMQ allows a zero-copy modus operandi, so avoid any operations that spoil that.
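As a minimal sketch of what zero-copy means in pyzmq (the address and the frame geometry here are illustrative assumptions, not taken from the question): passing copy=False lets ZeroMQ use the array's buffer directly instead of duplicating it before sending.

```python
import numpy as np
import zmq

ctx = zmq.Context.instance()
pub = ctx.socket(zmq.PUB)
pub.bind("tcp://127.0.0.1:5557")                   # assumed address

frame = np.zeros((240, 320, 3), dtype=np.uint8)    # stand-in for a camera frame

# copy=False hands ZeroMQ the frame's buffer directly instead of copying it;
# the array must stay unmodified until the message has actually been sent.
pub.send(frame, copy=False)
```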

Having used just a generic OpenCV camera rather than the RPi / PiCamera, I always prefer to take individual camera frames (not a sequence) on the acquisition side, under a controlled event loop.

The camera delivers a known, fixed-geometry picture (in OpenCV, a numpy.ndarray 3D structure [X, Y, [B, G, R]]), so the fastest and most straightforward serialisation is struct.pack( CONST_FRAME_STRUCT_MASK, aFrame ) on the sender side and struct.unpack( CONST_FRAME_STRUCT_MASK, aMessage ) on the receiver side(s).

Yes, struct.pack() has so far been the fastest way, even though the documentation offers other means (their flexibility comes at an additional cost, which is not justified here):

import numpy
import zmq

def send_array( socket, A, flags = 0, copy = True, track = False ):
    """send a numpy array with metadata"""
    md = dict( dtype = str( A.dtype ),
               shape =      A.shape,
               )
    socket.send_json( md, flags | zmq.SNDMORE )
    return socket.send( A, flags, copy = copy, track = track )

def recv_array( socket, flags = 0, copy = True, track = False ):
    """recv a numpy array"""
    md  = socket.recv_json( flags = flags )
    msg = socket.recv(      flags = flags, copy = copy, track = track )
    # buffer() is Python 2 only; memoryview() works on both 2 and 3
    A = numpy.frombuffer( memoryview( msg ), dtype = md['dtype'] )
    return A.reshape( md['shape'] )

Any colour conversion or similar source-side transformation may cost an additional 150 ~ 180 [ms], so avoid all unnecessary colour-space, reshape or similar non-core conversions, as they adversely increase the accumulated pipeline latency envelope.

Using struct.pack() also avoids any kind of size mismatch: what you load onto the binary payload landing pad is exactly what you receive on the receiver side(s).
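A minimal sketch of that fixed-geometry packing (the mask format and the 240 x 320 x 3 geometry are assumptions chosen for illustration):

```python
import struct

import numpy as np

ROWS, COLS, CH = 240, 320, 3                                 # assumed fixed frame geometry
CONST_FRAME_STRUCT_MASK = '{:d}s'.format(ROWS * COLS * CH)   # one fixed-size byte field

frame = np.random.randint(0, 256, (ROWS, COLS, CH), dtype=np.uint8)

# sender side: the payload length is frozen by the mask, so no size mismatch can occur
payload = struct.pack(CONST_FRAME_STRUCT_MASK, frame.tobytes())

# receiver side: unpack the raw bytes and restore the known geometry
(raw,) = struct.unpack(CONST_FRAME_STRUCT_MASK, payload)
restored = np.frombuffer(raw, dtype=np.uint8).reshape(ROWS, COLS, CH)
```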

If you are indeed keen to keep the JSON-related overhead around the message core data, then rather set up a two-socket paradigm, both sockets having ZMQ_CONFLATE == 1, where the first moves the struct payloads and the second the JSON-decorated telemetry.

If the RPi permits, zmq.Context( nIOthreads ) with nIOthreads >= 2 may further increase the data-pumping throughput on both sides, and an additional JSON_socket.setsockopt( zmq.AFFINITY, 1 ); VIDEO_socket.setsockopt( zmq.AFFINITY, 2 ) mapping ( the value is a bitmask of I/O threads ) can separate / distribute the workload so that each socket rides on its own I/O thread.
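A sketch of that two-socket, two-I/O-thread setup on the sender side (socket names, ports, and the affinity bitmasks are illustrative assumptions):

```python
import zmq

ctx = zmq.Context(2)                       # two I/O threads, if the RPi permits

VIDEO_socket = ctx.socket(zmq.PUB)
VIDEO_socket.setsockopt(zmq.AFFINITY, 2)   # bitmask: handled by I/O thread 2
VIDEO_socket.setsockopt(zmq.CONFLATE, 1)   # keep only the latest frame
VIDEO_socket.bind("tcp://0.0.0.0:5557")    # struct payloads

JSON_socket = ctx.socket(zmq.PUB)
JSON_socket.setsockopt(zmq.AFFINITY, 1)    # bitmask: handled by I/O thread 1
JSON_socket.setsockopt(zmq.CONFLATE, 1)
JSON_socket.bind("tcp://0.0.0.0:5558")     # JSON-decorated telemetry
```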

Babylonia answered 11/7, 2017 at 18:39 Comment(1)
Thank you for the answer. I don't know how to implement the serialization with struct.pack (struct.unpack) and where it goes in the code. I modified the initial "server-side" code to generate an np array: replaced stream = io.BytesIO() with stream = PiRGBArray(camera, size=resolution); def outputs() with def outputs(camera, resolution); and camera.capture_sequence(outputs(), 'jpeg', use_video_port=True) with camera.capture_sequence(outputs(camera, resolution), 'rgb', use_video_port=True). BTW, I use capture_sequence rather than the single-shot capture because of the higher fps. Whereat

Check out the code below. I've used nlohmann json and some minor tweaks (from various sources) to send images, vectors, strings, etc.

Client code

#include <zmq.hpp> 
#include <string>
#include <iostream>
#include <sstream>

#include <nlohmann/json.hpp> 
#include <opencv2/opencv.hpp>
#include "opencv2/imgproc/imgproc_c.h"
#include "opencv2/imgproc/imgproc.hpp"
#include <typeinfo>
using json = nlohmann::json;


class image_test
{
  public:
    void client1(){
      zmq::context_t context (1);
      zmq::socket_t socket (context, ZMQ_REQ);
      socket.connect ("tcp://localhost:5555");

      while (true){
        // create an empty structure (null)
        json j;
        std::string data;
        float f = 3.12;
        cv::Mat mat = cv::imread("cat.jpg",CV_LOAD_IMAGE_COLOR);

        // std::cout<<Imgdata;

        std::vector<uchar> array;
        if (mat.isContinuous()) 
          {
            array.assign(mat.datastart, mat.dataend);
          } 

        else 
          {
            for (int i = 0; i < mat.rows; ++i) 
              {
                  // each row holds cols * channels bytes, not just cols
                  array.insert(array.end(), mat.ptr<uchar>(i), mat.ptr<uchar>(i) + mat.cols * mat.channels());
              }
          }

        std::vector<uint> v = {1,5,9};

        j["Type"] = f;
        j["vec"] = v;
        j["Image"]["rows"] = mat.rows;
        j["Image"]["cols"] = mat.cols;
        j["Image"]["channels"] = mat.channels();
        j["Image"]["data"] = array;

        // add a Boolean that is stored as bool
        j["Parameter"] = "Frequency";

        // add a string that is stored as std::string
        j["Value"] = "5.17e9";


        // explicit conversion to string
        std::string s = j.dump();  


        zmq::message_t request (s.size());
        memcpy (request.data (), (s.c_str()), (s.size()));
        socket.send(request);

        zmq::message_t reply;
        socket.recv (&reply);
        std::string rpl = std::string(static_cast<char*>(reply.data()), reply.size());

        json second = json::parse(rpl);

        std::cout << second["num"] << std::endl;


      }
    }         
};


int main (void)
{

  image_test caller;
  caller.client1();
}

Server code

import zmq
import json
import numpy as np
import matplotlib.pyplot as plt
import cv2

context = zmq.Context()
socket = context.socket(zmq.REP)
socket.bind("tcp://*:5555")

while True:

    json_str = socket.recv()

    data_print = json.loads(json_str)

    img = np.array(data_print["Image"]["data"])
    img = img.reshape(data_print["Image"]["rows"],data_print["Image"]["cols"], data_print["Image"]["channels"])

    b,g,r = cv2.split(img)
    img = cv2.merge((r,g,b))

    print(img.shape)
    # plt.imshow(img)
    # plt.show()

    Type = data_print['Type']
    Parameter = data_print['Parameter']
    Value = data_print['Value']

    # the reply must be sent inside the loop (REP sockets alternate recv/send)
    # and as bytes/str via send_string, not a plain str through send()
    a = {"info": "hello", "num": 1}
    socket.send_string(json.dumps(a))

The include folder from the nlohmann git repo should be added, or you can directly download the source code from: https://github.com/zsfVishnu/zmq.git. Also, if you're using g++ or any other compiler that doesn't pick up nlohmann's include folder automatically, just specify it on the CLI, i.e. add -I/path-to-the-include-folder/.

Ogawa answered 1/2, 2019 at 13:19 Comment(0)
