Object detection, reading from RTSP stream with no buffer

I’m trying to run object detection using a remote camera, and I’m accessing the video through a RTSP stream.
The object detection routine clearly works at a lower FPS than the camera, i.e. frames arrive from the camera to the detection faster than the detection can actually analyze them. I don’t need to analyze all the frames…my idea would be to get the current frame, analyze it and drop the frames arriving while the detection is going on, and then get the next current frame.
I attach a picture, I hope it’s clearer this way.

So, basically the program should get the first frame and analyze it. In the meanwhile, the camera gets other 4 frames, from 02 to 05, which should be ignored. After the elaboration of frame 01 is completed the detection jumps to the frame currently streamed, which is #06.
This happens automatically when using a camera connected through USB, but with a RTSP stream the frames are buffered, so after completing the detection on frame 01 the program proceeds to analyze frame 02, which I want to avoid.
I first tried setting the buffer length by:

cap = cv2.VideoCapture("rtsp://camera_IP_address:port_number")
cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)

but it didn’t really change anything.

So, I decided to use multiprocessing. The idea is that I have a process (say, camera_bufferless), parallel to object detection, constantly grabbing frames from the camera. Whenever the object detection finishes analyzing a frame, it requests the frame currently being grabbed by the camera_bufferless process.
The code I have is something like this:

import time
import numpy as np
import cv2
import gluoncv as gcv
import mxnet as mx
import multiprocessing as mp

from multiprocessing import Queue


def main():

    ctx = mx.cpu(0)

    # -------------------------
    # Load a pretrained model
    # -------------------------
    net = gcv.model_zoo.get_model('faster_rcnn_resnet50_v1b_coco', pretrained=True, ctx=ctx)
    net.hybridize()

    ## remote camera
    cam = bufferless_camera("rtsp://camera_IP_address:port_number", 640, 480)

    #  detection loop
    while(True):
        print(f"Frame: {count_frame}")

        # here, I'm sending fetching the frame currently being handled by bufferless_camera
        frame_np_orig = cam.get_frame()

        key = cv2.waitKey(1)
        if (key == ord('q')):
            break

        #pre-processing data
        frame_nd_orig = mx.nd.array(cv2.cvtColor(frame_np_orig, cv2.COLOR_BGR2RGB)).astype('uint8')
        frame_nd_new, frame_np_new = gcv.data.transforms.presets.rcnn.transform_test(frame_nd_orig, short=600)
        # Run frame through network
        frame_nd_new = frame_nd_new.as_in_context(ctx)
        class_IDs, scores, bboxes = net(frame_nd_new)
        ## Display the result with cv
        frame_np_new = gcv.utils.viz.cv_plot_bbox(frame_np_new, bboxes[0], scores[0], class_IDs[0], thresh=0.8, class_names=net.classes)
        gcv.utils.viz.cv_plot_image(frame_np_new)

        count_frame += 1

        cv2.destroyAllWindows()
        cam.end()
        print("Done!!!")


class bufferless_camera():

    def __init__(self, rtsp_url, width, height):
        #load pipe for data transmittion to the process
        self.parent_conn, child_conn = mp.Pipe()
        #load process
        self.p = mp.Process(target=self.grab_frames, args=(child_conn,rtsp_url))
        #start process
        self.p.daemon = True
        self.p.start()
        # frame size
        self.width = width
        self.height = height

    def end(self):
        #send closure request to process
        self.parent_conn.send(2)

    def grab_frames(self,conn,rtsp_url):
        #load cam into seperate process
        print("Cam Loading...")
        cap = cv2.VideoCapture(rtsp_url,cv2.CAP_FFMPEG)
        cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
        print("Cam Loaded...")
        run = True

        while run:
            #grab frames from the buffer
            cap.grab()

            #receive input data
            rec_dat = conn.recv()

            #if frame requested
            if rec_dat == 1:
                #read current frame, send it to method that will return it to object detection
                ret,frame = cap.read()
                conn.send(frame)

            elif rec_dat ==2:
                #if close requested
                cap.release()
                run = False

        print("Camera Connection Closed")
        conn.close()

    def get_frame(self,resize=None):
        #send request
        self.parent_conn.send(1)
        # retrieve frame
        frame = self.parent_conn.recv()

        #reset request
        self.parent_conn.send(0)

        #return frame to object detection
        return cv2.resize(frame,(self.width, self.height))



if __name__ == "__main__":
    main()

I tried this code, but it’s still not working, i.e. it seems that the object detection is still getting all the frames, in the order in which they arrive from the camera.

I assume mxnet and gluon are their own way to deal with multiprocessing, is that messing up with my program? Or am I simply doing something wrong here?

i tried a solution long before , basically i set up a kafka server and built a whole ecosystem of training,detection and reporting around it with different programs on a single machine but simple could be scaled by using VMs and multiple machines .
this don’t have to wait for frames and also you can choose which frame should be sent to which cycle( like training or detection).

This Q&A site might help you.

Thank you, I actually found and used that solution already, it fixed the problem.
I was mostly curious to understand whether my approach was intrinsically wrong, or if it is correct but my attempt at using multiprocessing is somehow “overruled” by mxnet.

Is getting all the frames because conn.recv() is a blocking call, so is waiting for you to request a frame.
You can change that line by:
if conn.poll()
rec_dat = conn.recv()

        while run:
            #grab frames from the buffer
            cap.grab()

            #receive input data

           if conn.poll():
               rec_dat = conn.recv()