Low-Latency Video Face Detection and Image Segmentation, an Application

The following summary relates to a computer vision project and its implementation for course CS583 at Drexel University. I worked alongside two fellow graduate students to design, implement, and test the solution; my duties centered on developing the code base to improve application performance and modularity.

Application Structure and Results

Program Control Flow

Our implementation is centered around reading frames from OpenCV's VideoCapture module. The main process thread initializes all necessary components, captures frames, creates a shared memory block for each frame, and finally passes a reference to that shared memory into an input queue. Based on the number of available CPU cores, each frame is picked up by a different worker process and, once processed, returned to a priority output queue. Setting up shared memory in this way allows efficient handling of frames: worker processes access the shared memory by reference rather than receiving all frame data across process boundaries, which is far more computationally expensive. As workers receive frames, they apply our deep learning models (image segmentation and face detection), along with blurring and other operations. We use a priority output queue to ensure that frames are returned to the main thread in the correct order for accurate video streaming. The main thread finishes by checking the output queue for processed frames and displaying them on the user's screen. The below chart illustrates our overall program control flow.
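The pipeline above can be sketched with a toy example, where doubling a number stands in for model inference. The worker function and queue names here are illustrative, not the project's actual classes, and the sketch uses the "fork" start method for simplicity:

```python
import multiprocessing as mp

def worker(inq, outq):
    # Pull items until the None sentinel, "process" them, and return
    # (frame_number, result) so the main process can restore order.
    while True:
        item = inq.get()
        if item is None:
            break
        frame_number, payload = item
        outq.put((frame_number, payload * 2))  # stand-in for model inference

# 'fork' keeps this sketch simple; a real application may need 'spawn'.
ctx = mp.get_context("fork")
inq, outq = ctx.Queue(), ctx.Queue()
workers = [ctx.Process(target=worker, args=(inq, outq)) for _ in range(2)]
for w in workers:
    w.start()
for n in range(5):
    inq.put((n, n))
for _ in workers:
    inq.put(None)  # one sentinel per worker
results = sorted(outq.get() for _ in range(5))  # restore frame order
for w in workers:
    w.join()
```

Because two workers race on the input queue, results arrive in nondeterministic order; sorting by frame number restores the original sequence, which is the role the priority output queue plays in the full application.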

Program Control Flow Diagram
Program Control Flow Highlighting Main and Worker Processes

CPU Multiprocessing

With our implementation written in Python, we utilized Python's built-in multiprocessing library to create worker processes. The standard FIFO Queue associated with this library proved to be insufficient: because processing times vary across CPU cores, worker processes cannot be assumed to return processed frames in the same order they were received. Given our program structure, priority queues were a better fit than the standard FIFO data structure. Priority queues in Python are implemented on top of a min-heap, which guarantees that each parent node is smaller than its child nodes. In our case, "smaller" means an earlier frame number.

It is important to note that we populate each queue with a (priority, data) tuple, where the priority is the frame number and the data includes the shared memory name of the input. Attaching the shared memory name, as stated previously, lets worker processes access frame data directly from the shared memory block.
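The ordering guarantee can be illustrated with the thread-level queue.PriorityQueue (the project uses a multiprocessing-safe equivalent; the block names below are made up):

```python
import queue

# PriorityQueue is backed by a min-heap, so the smallest tuple --
# here, the lowest frame number -- is always dequeued first.
pq = queue.PriorityQueue()
for frame_number, shm_name in [(3, "shm_c"), (1, "shm_a"), (2, "shm_b")]:
    pq.put((frame_number, shm_name))

order = [pq.get() for _ in range(3)]
# order == [(1, 'shm_a'), (2, 'shm_b'), (3, 'shm_c')]
```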

Shared Process Memory

Each frame captured by the main thread gets a corresponding shared memory block. Letting worker processes receive frame data by reference saves us from moving large memory segments through IPC (Inter-Process Communication), as would happen if each frame and all of its associated data were passed between processes.
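The hand-off can be sketched with the standard library's shared_memory module. Only the block's name, shape, and dtype would travel through a queue; the pixel data itself never crosses the process boundary:

```python
from multiprocessing import shared_memory
import numpy as np

# Producer side: copy the frame once into a named shared memory block.
frame = np.full((480, 640, 3), 128, dtype=np.uint8)
shm = shared_memory.SharedMemory(create=True, size=frame.nbytes)
np.ndarray(frame.shape, dtype=frame.dtype, buffer=shm.buf)[:] = frame

# Consumer side: attach by name -- shm.name is just a short string.
view = shared_memory.SharedMemory(name=shm.name)
shared_frame = np.ndarray(frame.shape, dtype=frame.dtype, buffer=view.buf)
same = bool(np.array_equal(shared_frame, frame))

del shared_frame  # release the exported buffer before closing
view.close()
shm.close()
shm.unlink()
```

Note the explicit close/unlink bookkeeping: named blocks outlive the objects that created them, which is why the application tracks every block until its frame is displayed.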

Results

Our models accurately detect faces and segment image data between foreground and background in video streams. These results are highlighted in the below images. Real-time processing capability can be measured by an application's display rate in frames per second (FPS). Computing on CPU cores alone, the application captured, processed, and displayed the results of two neural networks plus a Gaussian blur at an average of ~30 FPS. FPS is calculated within the application by a dedicated PerformanceMetrics class, which tracks both average FPS across the lifespan of the application instance and current FPS within a 1-second interval. This logic is implemented in the main thread and the PerformanceMetrics class using Python's time module. Before the main thread reads a frame, the current time is captured; the frame is then sent through the application pipeline. Once a frame is done processing, we increment totalProcessedFrames and record the current time again to capture elapsedTime. This ensures that we correctly capture the current FPS by calculating totalProcessedFrames / elapsedTime.
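A minimal sketch of that bookkeeping follows. The real PerformanceMetrics class also tracks dropped frames and a rolling 1-second window; the method names here mirror the description above, not necessarily the actual code:

```python
import time

class PerformanceMetrics:
    def __init__(self):
        self.start_time = time.time()
        self.total_processed_frames = 0

    def update(self, new_processed_frames=0):
        self.total_processed_frames += new_processed_frames

    def get_fps(self):
        # average FPS = frames processed / elapsed wall-clock time
        elapsed_time = time.time() - self.start_time
        return self.total_processed_frames / elapsed_time if elapsed_time > 0 else 0.0
```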

Program Demo
Program Demo: Real-time processing of faces and background segmentation.
Program Demo with Facial Regions
Program Demo Highlighting Facial Regions: Detected faces are emphasized, while the background may be blurred.

Methodology

Face Detection Model

One of the core problems that this project aims to solve is real-time face detection. We selected the YuNet face detection model for this task due to its efficiency and precision in real-time applications. The YuNet model, as described in its original work (Wu et al. 2023), is optimized for millisecond-level face detection, making it ideal for applications requiring low-latency processing (such as video streams).

Alternatives Considered

To evaluate the suitability of YuNet for this project, we compared it to two widely used alternatives: Faster R-CNN (Ren et al. 2015) and RetinaFace (Deng et al. 2020). These models are recognized for their accuracy and robustness in face detection tasks but differ in their computational requirements and real-time applicability.

Faster R-CNN

Faster R-CNN is a two-stage detection framework that excels in accuracy, particularly in complex scenarios involving occlusions and varied lighting. However, its two-stage architecture introduces significant computational overhead, taking multiple seconds per frame when run on a CPU (Mallick, LearnOpenCV). This latency makes it unsuitable for real-time applications under our resource constraints.

RetinaFace

RetinaFace is a single-stage face detector that includes features such as landmark detection and 3D pose estimation. Its accuracy rivals or surpasses that of Faster R-CNN for face detection tasks. RetinaFace can achieve real-time performance on a CPU by employing lightweight backbone networks, but experimental results (Wu et al. 2023) show an order of magnitude performance gain when using YuNet over RetinaFace.

Justification

While YuNet may not match the raw accuracy of RetinaFace or Faster R-CNN in challenging scenarios, the stable lighting and environment typical of video conferencing (VC) use cases justify prioritizing speed and reliability over peak accuracy.

Implementation and Usage

The YuNet model was integrated into the system using OpenCV's FaceDetectorYN module, which provides a streamlined interface for loading and running the pre-trained ONNX model available through the OpenCV Zoo. The detector's input size is set dynamically from each incoming frame, ensuring consistent detection regardless of resolution. Detected faces are returned as bounding boxes with associated confidence scores and landmark coordinates.

Further Enhancements: Image Segmentation

To enhance the VC applicability of our application, as well as expand the feature set, we opted to implement foreground-background image segmentation with Google's MediaPipe Selfie Segmentation model. This model was chosen for its ease of implementation as a CPU-based API and its well-supported documentation, which helped us rapidly integrate it into the existing project. Image segmentation provides the ability to obscure sensitive or private surroundings, allowing potential users to maintain professionalism or anonymity when video conferencing from varied environments. The model produces a segmentation mask from each original frame; the mask is then composited back onto the frame with any desired effects applied to the background (in our case, a Gaussian blur).

Key Technologies and Code Implementation

Shared Memory Management

To avoid copying large frame data between processes, we implemented a shared memory mechanism. Each captured frame is stored in a shared memory block accessible by worker processes. This design eliminates costly data transfers, reducing latency and overhead.



class SharedMemoryManager:
    @staticmethod
    def create_shared_memory(frame_image):
        # Copy a frame once into a new, named shared memory block.
        frame_shape = frame_image.shape
        frame_dtype = frame_image.dtype
        frame_size = frame_image.nbytes
        try:
            shared_mem = shared_memory.SharedMemory(create=True, size=frame_size)
            shared_frame = np.ndarray(frame_shape, dtype=frame_dtype, buffer=shared_mem.buf)
            shared_frame[:] = frame_image[:]
            return shared_mem, frame_shape, frame_dtype, True
        except Exception:
            return None, None, None, False

    @staticmethod
    def attach_shared_memory(sharedmem_name, frame_shape, frame_dtype_str):
        # Attach to an existing block by name and view it as an ndarray.
        try:
            sharedmem_buffer = shared_memory.SharedMemory(name=sharedmem_name)
            frame_image = np.ndarray(frame_shape, dtype=np.dtype(frame_dtype_str), buffer=sharedmem_buffer.buf)
            return sharedmem_buffer, frame_image
        except Exception:
            return None, None
        

Worker Processes and Priority Queues

Python’s multiprocessing library spawns worker processes, each pinned to a specific CPU core. Each worker attaches to a frame's shared memory block, applies the face detection, segmentation, blur, and drawing visitors, and places the frame's metadata on the output queue.

To maintain correct display order, we employ a priority queue keyed by frame number. This ensures frames are always displayed sequentially, even if some take longer to process than others.



class FrameProcessor(mp.Process):
    def __init__(self, input_queue, output_queue, available_cpu):
        super().__init__()
        self.input_queue = input_queue
        self.output_queue = output_queue
        self.available_cpu = available_cpu

    def run(self):
        try:
            # os.name reports 'nt'/'posix', never 'windows'/'linux', so use
            # platform.system() (requires 'import platform') for this check
            if platform.system().lower() in ['windows', 'linux']:
                process = psutil.Process(os.getpid())
                process.cpu_affinity(self.available_cpu)
            else:
                logger.warning('CPU affinity only supported on Windows and Linux')

            cv2.setNumThreads(1)
            face_detector_visitor = FaceDetectorVisitor()
            segmentation_visitor = SegmentationVisitor()
            blur_visitor = BlurVisitor(blur_intensity=Config.BLUR_INTENSITY, blur_target=Config.BLUR_TARGET)
            draw_visitor = DrawVisitor()

            while True:
                item = self.input_queue.get()
                if item is None:
                    break

                frame_number, sharedmem_name, frame_shape, frame_dtype_str = item

                if sharedmem_name is None:
                    break

                sharedmem_buffer, frame_image = SharedMemoryManager.attach_shared_memory(
                    sharedmem_name, frame_shape, frame_dtype_str
                )
                if frame_image is None:
                    continue

                working_image = frame_image.copy()
                frame_component = Frame(working_image)

                processor = ImageProcessor(frame_component)
                processor.add_visitor(face_detector_visitor)
                processor.add_visitor(segmentation_visitor)
                processor.add_visitor(blur_visitor)
                processor.add_visitor(draw_visitor)
                processor.apply_visitors()

                np.copyto(frame_image, frame_component.image)
                self.output_queue.put((frame_number, sharedmem_name, frame_shape, frame_dtype_str))

                sharedmem_buffer.close()

        except Exception as e:
            traceback.print_exc()
        

Main Function and Control Flow

The main process sets up the video capture, creates shared memory for each incoming frame, and enqueues them to worker processes. Processed frames return via an output queue, maintaining the correct display order through priority-based indexing. This ensures stable, real-time video throughput.




def signal_handler(sig, frame):
    sys.exit(0)

def main():
    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)

    input_queue = get_priority_queue()
    output_queue = get_priority_queue()
    manager = mp.Manager()
    sharedmem_dict = manager.dict()
    process_manager = None

    profiler = cProfile.Profile()
    profiler.enable()

    frame_number = 0
    performance = PerformanceMetrics()

    try:
        FaceDetectorVisitor.ensure_model_present()
        with CameraHandler() as camera:
            frame_width = camera.cap.get(cv2.CAP_PROP_FRAME_WIDTH)
            frame_height = camera.cap.get(cv2.CAP_PROP_FRAME_HEIGHT)

            total_cpus = psutil.cpu_count(logical=False)
            num_cpus_to_use = max(1, total_cpus - 1)
            available_cpus = list(range(num_cpus_to_use))

            num_processes = max(1, num_cpus_to_use - 1)
            process_manager = ProcessManager(input_queue, output_queue, num_processes, available_cpus)

            while True:
                performance.total_frames += 1

                try:
                    frame = camera.read_frame()
                    shared_mem, frame_shape, frame_dtype, ret = SharedMemoryManager.create_shared_memory(frame)
                    if not ret:
                        performance.update(new_dropped_frames=1)
                        continue

                    frame_number += 1

                    # Put (priority, data) tuple into PriorityQueue
                    try:
                        input_queue.put_nowait((frame_number, shared_mem.name, frame_shape, str(frame_dtype)))
                        sharedmem_dict[shared_mem.name] = shared_mem
                    except queue.Full:
                        performance.update(new_dropped_frames=1)
                        shared_mem.close()
                        shared_mem.unlink()
                        continue
                except CameraError as e:
                    performance.update(new_dropped_frames=1)
                    continue
                except SharedMemoryError as e:
                    performance.update(new_dropped_frames=1)
                    continue

                # Process output queue
                try:
                    while True:
                        frame_data = output_queue.get_nowait()
                        if frame_data:
                            proc_frame_number, sharedmem_name, frame_shape, frame_dtype_str = frame_data
                            performance.update(new_processed_frames=1)

                            shared_mem = sharedmem_dict.pop(sharedmem_name, None)
                            if shared_mem:
                                frame_processed = np.ndarray(frame_shape, dtype=np.dtype(frame_dtype_str),
                                                             buffer=shared_mem.buf)
                                performance.display_metrics(frame_processed, proc_frame_number)

                                cv2.imshow('Face Processing', frame_processed)

                                shared_mem.close()
                                shared_mem.unlink()

                except queue.Empty:
                    pass

                if cv2.waitKey(1) & 0xFF == ord('q'):
                    raise KeyboardInterrupt

    except KeyboardInterrupt:
        logger.info('\nShutting down...')
    except Exception as e:
        traceback.print_exc()
    finally:
        end_time = time.time()
        elapsed_time = end_time - performance.start_time
        drop_rate = (performance.dropped_frames / performance.total_frames * 100) if performance.total_frames else 0
        average_fps = performance.get_fps()

        if process_manager:
            process_manager.stop_workers()
        else:
            logger.warning('Process manager was not initialized. No workers to stop.')

        for shm in sharedmem_dict.values():
            try:
                shm.close()
                shm.unlink()
            except Exception as e:
                logger.error(f"Error cleaning up shared memory: {e}")

        cv2.destroyAllWindows()
    

Shared Memory and Priority Queues

By storing frames in shared memory, workers can attach directly to the frame data without copying large arrays between processes. This approach significantly reduces IPC overhead and is crucial for maintaining real-time performance at higher resolutions or frame rates. Instead of transmitting entire frames, we send references (shared memory names) and metadata (shape, dtype) through the queues.

Results

Our models accurately detect faces and segment backgrounds in live video streams. The system achieves ~30 FPS using only CPU cores, applying both face detection and segmentation simultaneously. We measure FPS and other metrics with a dedicated PerformanceMetrics class. By tracking frame timestamps and the number of processed frames, we precisely calculate both the instantaneous and average FPS.

As frames proceed through the pipeline, we continuously update counts of processed and dropped frames. The result is a stable, low-latency system capable of delivering a smooth user experience in VC scenarios.

Future Work

Future improvements target further performance gains. I initially wanted to experiment with GPU acceleration techniques to boost performance, but hit a roadblock: CUDA is specific to NVIDIA hardware, and I am currently working with an integrated graphics chip. OpenCL is an option, though it doesn't seem to have the same breadth of support that Vulkan does. Next steps include building a cross-platform Vulkan GPGPU library to leverage whatever parallel computing power is available across many types of systems.