Using OpenCV and multiprocessing, how can I feed information from a variable in one Python script to another?
I am trying to have two scripts running in parallel, with one feeding the other.

First I trained a model to decode different gestures. I followed the tutorial right here: https://www.youtube.com/watch?v=yqkISICHH-U

That script opens the webcam and decodes the gestures I am doing, and creates a new variable (called mvt_ok) when the same movement is decoded 3 consecutive times. At that point I wish to send the information to another script that will be an experimental task developed in PsychoPy (a Python tool for making psychology experiments). Basically, as soon as the first script (gesture detection with the webcam) feeds the second one, I want the second one (the PsychoPy task) to present another stimulus.

To summarise: I wish to open the video, then start the PsychoPy script and present the first stimulus; a movement is then expected to be detected from the video, and this information should be fed to the PsychoPy script to change the stimulus.

So far I am really far from doing that; I have just been able to send mvt_ok to another script with a function such as the following:

def f(child_conn, mvt_ok):
    print(mvt_ok)

Actually, I am not sure how I could reuse the mvt_ok variable to feed it to my PsychoPy script.

I won't include all the lines for the part handling the gesture recognition because it would be too long, but the most crucial ones are here:

if __name__ == '__main__':
    parent_conn,child_conn = Pipe()
    sentence = []

    while cap.isOpened(): 
        ret, frame = cap.read()
        image_np = np.array(frame)
        
        input_tensor = tf.convert_to_tensor(np.expand_dims(image_np, 0), dtype=tf.float32)
        detections = detect_fn(input_tensor)
        
        num_detections = int(detections.pop('num_detections'))
        detections = {key: value[0, :num_detections].numpy()
                      for key, value in detections.items()}
        detections['num_detections'] = num_detections
    
        # detection_classes should be ints.
        detections['detection_classes'] = detections['detection_classes'].astype(np.int64)
    
        label_id_offset = 1
        image_np_with_detections = image_np.copy()
    
        viz_utils.visualize_boxes_and_labels_on_image_array(
                    image_np_with_detections,
                    detections['detection_boxes'],
                    detections['detection_classes']+label_id_offset,
                    detections['detection_scores'],
                    category_index,
                    use_normalized_coordinates=True,
                    max_boxes_to_draw=5,
                    min_score_thresh=.8,
                    agnostic_mode=False)
        
        cv2.imshow('object detection',  cv2.resize(image_np_with_detections, (800, 600)))
        
        if np.max(detections['detection_scores'])>0.95:
            word = category_index[detections['detection_classes'][np.argmax(detections['detection_scores'])]+1]['name']
            sentence.append(word)
            
            if len(sentence)>=3:
                if sentence[-1]==sentence[-2] and sentence[-1]==sentence[-3]:
                    print('ok')
                    mvt_ok=1
                    p = Process(target=f, args=(child_conn,mvt_ok))
                    p.start()
                    p.join()
            
        
        
        if cv2.waitKey(10) & 0xFF == ord('q'):
            cap.release()
            cv2.destroyAllWindows()
            break
Badinage answered 31/5, 2023 at 20:45 Comment(3)
Sockets? docs.python.org/3/library/socket.html – Oleic
Do the scripts each require the other to be running, or should it be possible to run one without the other needing to be running? Would it be advantageous to allow more than one webcam to run at a time? Or more than one stimulus script? Or to allow other programs to be notified of detections and stimuli? – Clave
They may run without the other; my issue is especially how to bring them together. For your question about the webcam, my intuition is that it won't be advantageous, and the same goes for the stimulus script. If there is a program that allows the notifications to be sent easily, my answer would be yes, that may be more advantageous. – Badinage

One way to do this with pure Python is to:

  • set up a multiprocessing manager as a separate, always running process
  • let the webcam and stimulus scripts connect to it and use the Queue() it provides, through which Python objects can be sent much more simply than via sockets (see the sketch below)

There is a very good example here.
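
To make the manager idea concrete, here is a minimal sketch (not the linked example itself), assuming the manager runs on localhost:50000 with authkey b'secret'; the file name, address and key are only illustrative:

# manager_server.py - run this first and keep it running
from multiprocessing.managers import BaseManager
from queue import Queue

queue = Queue()

class QueueManager(BaseManager):
   pass

# Expose the shared queue to any process that connects
QueueManager.register('get_queue', callable=lambda: queue)

if __name__ == '__main__':
   manager = QueueManager(address=('localhost', 50000), authkey=b'secret')
   print('Manager serving on localhost:50000')
   manager.get_server().serve_forever()

Both the webcam and the stimulus scripts would then connect to it like this:

# client side - used by both the webcam and the stimulus scripts
from multiprocessing.managers import BaseManager

class QueueManager(BaseManager):
   pass

QueueManager.register('get_queue')

if __name__ == '__main__':
   manager = QueueManager(address=('localhost', 50000), authkey=b'secret')
   manager.connect()
   queue = manager.get_queue()
   queue.put('mvt_ok')       # webcam side: send a detection
   # event = queue.get()     # stimulus side: block until a detection arrives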


Another, possibly easier, option is MQTT. You could install mosquitto as an MQTT broker. Then your webcam script can "publish" detection events and your stimulus script can "subscribe" to detections and get notified, and vice versa. MQTT allows multi-megabyte messages, so I would recommend it if big messages are needed.

The code for the video acquisition end might look like this:

#!/usr/bin/env python3

# Requires:
# mosquitto broker to be running somewhere
# pip install paho-mqtt

from time import sleep
import paho.mqtt.client as mqtt
import json

if __name__ == "__main__":
   # Get settings
   with open('settings.json') as f:
      settings = json.load(f)
      host = settings['MQTThost']
      port = settings['MQTTport']
      topic= settings['MQTTtopic']

   print(f'DEBUG: Connecting to MQTT on {host}:{port}')
   client = mqtt.Client()
   client.connect(host,port,60)
   print(f'DEBUG: Connected')

   # Grab some video frames and publish a dummy detection every 5 frames
   for i in range(100):
      print(f'DEBUG: Grabbing frame {i}')
    
      if i%5 == 0:
         # Publish dummy detection
         client.publish(topic, "Detection event")
         print(f'DEBUG: Detection event')

      sleep(1)

And for the stimulus end, it might look like this:

#!/usr/bin/env python3

# Requires:
# mosquitto broker to be running somewhere
# pip install paho-mqtt

from time import sleep
import paho.mqtt.client as mqtt
import json

def on_connect(client, userdata, flags, rc):
   """
   This function is called on successful connection to the broker.
   """
   print(f'DEBUG: Connected with result code {rc}')
   client.subscribe(topic)

def on_message(client, userdata, msg):
   """
   This function is called every time a message is published on
   the specified topic.
   """
   message = msg.payload.decode()
   print(f'Message received: {message}')

if __name__ == "__main__":
   # Get settings
   with open('settings.json') as f:
      settings = json.load(f)
      host = settings['MQTThost']
      port = settings['MQTTport']
      topic= settings['MQTTtopic']

   print(f'DEBUG: Connecting to MQTT on {host}:{port}')
   client = mqtt.Client()
   client.on_connect = on_connect
   client.on_message = on_message
   client.connect(host,port,60)

   # Wait forever for messages
   client.loop_forever()

And I used a settings.json containing this:

{
  "MQTThost" : "localhost",
  "MQTTport" : 1883,
  "MQTTtopic": "/detections"
}

Redis also supports pub/sub and is simple, fast and lightweight. The code would be structured very similarly to the MQTT code above. You can also just share a variable, a list, an atomic integer, or a set with Redis.
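
As a rough illustration only (assuming a Redis server on localhost:6379, pip install redis, and an illustrative channel name "detections"), the pub/sub calls might look like this:

#!/usr/bin/env python3
# Minimal Redis pub/sub sketch - not a drop-in script; the broker address
# and channel name are assumptions.
import redis

r = redis.Redis(host='localhost', port=6379)

# Webcam side: publish a detection event
r.publish('detections', 'mvt_ok')

# Stimulus side: subscribe and wait for detections
pubsub = r.pubsub()
pubsub.subscribe('detections')
for message in pubsub.listen():        # blocks, yielding each message
   if message['type'] == 'message':
      print('Detection:', message['data'].decode())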


You could also use a simple UDP message between the processes if you don't want to pass big, complicated Python objects. It should be very reliable if both processes are on the same host, and will probably allow up to 1 kB or so of data per message. This is pure Python, with no extra packages, modules or servers needed.

The video acquisition might look like this:

#!/usr/bin/env python3

from time import sleep
import json
import socket

if __name__ == "__main__":
   # Get settings
   with open('settings2.json') as f:
      settings = json.load(f)
      host = settings['host']
      port = settings['port']

   print(f'DEBUG: Creating UDP socket, connecting to {host}:{port}')
   sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
   address = (host, port)

   # Grab some video frames and emit a dummy detection every 5 frames
   for i in range(100):
      print(f'DEBUG: Grabbing frame {i}')
    
      if i%5 == 0:
         # Emit dummy detection
         sock.sendto(b'Event detected', address)
         print(f'DEBUG: Detection event')

      sleep(1)

And the stimulus code might look like this:

#!/usr/bin/env python3

import json
import socket

if __name__ == "__main__":
   # Get settings
   with open('settings2.json') as f:
      settings = json.load(f)
      host = settings['host']
      port = settings['port']

   print(f'DEBUG: Establishing listener on {host}:{port}')
   # Create a UDP socket
   sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
   # Bind the socket to the port
   sock.bind((host, port))
   print(f'DEBUG: Listening')

   while True:
      # Wait for message
      message, _ = sock.recvfrom(4096)
      print(f'DEBUG: Received {message}')

And I used a settings2.json containing this:

{
  "host" : "localhost",
  "port" : 50000
}
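
Since the PsychoPy script will have its own drawing loop, the blocking recvfrom() above could instead be polled once per frame. Here is a rough sketch of that variant; the sleep() is just a stand-in for whatever the real stimulus loop does each frame (e.g. win.flip()):

#!/usr/bin/env python3
# Non-blocking variant of the UDP listener, meant to be polled from inside
# a stimulus/drawing loop rather than blocking on recvfrom().

import json
import socket
from time import sleep

if __name__ == "__main__":
   # Get settings
   with open('settings2.json') as f:
      settings = json.load(f)

   sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
   sock.bind((settings['host'], settings['port']))
   sock.setblocking(False)           # recvfrom() now returns immediately

   while True:
      try:
         message, _ = sock.recvfrom(4096)
         print(f'DEBUG: Received {message} - switch to the next stimulus here')
      except BlockingIOError:
         pass                        # no detection yet, keep drawing
      sleep(1/60)                    # stand-in for the per-frame draw/flip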

Another, pure Python way of doing this is with a multiprocessing connection. You would need to start the stimulus process first if using this method - or at least the process in which you put the listener. Note that you can send Python objects with this technique; just pass the object (a dict, list, array, etc.) to conn.send().

The video acquisition might look like this:

#!/usr/bin/env python3

import time
import json
from multiprocessing.connection import Client

if __name__ == "__main__":
   # Get settings
   with open('settings2.json') as f:
      settings = json.load(f)
      host = settings['host']
      port = settings['port']
      auth = settings['auth']

   print(f'DEBUG: Creating client, connecting to {host}:{port}')
   with Client((host, port), authkey=auth.encode()) as conn:

      # Grab some video frames and emit a dummy detection every 5 frames
      for i in range(100):
         print(f'DEBUG: Grabbing frame {i}')
    
         if i%5 == 0:
            # Emit dummy detection with current time
            conn.send('Detection event')
            print(f'DEBUG: Detection event')

         time.sleep(1)

And the stimulus end might look like this:

#!/usr/bin/env python3

import json
import time
from multiprocessing.connection import Listener

if __name__ == "__main__":
   # Get settings
   with open('settings2.json') as f:
      settings = json.load(f)
      host = settings['host']
      port = settings['port']
      auth = settings['auth']

   print(f'DEBUG: Establishing listener on {host}:{port}')
   with Listener((host, port), authkey=auth.encode()) as listener:
      with listener.accept() as conn:
         print('DEBUG: Connection accepted from', listener.last_accepted)
         while True:
            # Wait for message
            message = conn.recv()
            print(f'DEBUG: Event received {message}')

And your settings2.json would need to look like this:

{
  "host" : "localhost",
  "port" : 50000,
  "auth" : "secret"
}

Some of the above ideas are very similar to the examples I gave in this answer, although the purpose is slightly different.


None of these methods are locking/blocking, so you can happily run one program without the other needing to be running.

Clave answered 31/5, 2023 at 22:23 Comment(1)
Thanks so much! It took me some time to figure out how to use these lines of code in my scripts, but it is working right now. I used the simple UDP message. I have to admit that asking ChatGPT a few questions about your code was helpful as well (following some of its minor edits also helped get everything working in the end). – Badinage
