Low-Latency Video Face Detection and Image Segmentation, an Application
The following summary describes a computer vision project and its implementation for course CS583 at Drexel University. I worked alongside two fellow graduate students to design, implement, and test the solution. My work focused on developing the codebase to improve application performance and modularity.
Application Structure and Results
Program Control Flow
Our implementation is centered around reading frames from OpenCV's
VideoCapture module. The main process thread is responsible
for initializing all necessary components, capturing frames, creating a
shared memory block for each frame, and then finally passing the frame
in shared memory to an input queue. Depending on the number of CPU
cores available, each frame is handed in turn to a different core
(worker process) and returned to a priority output queue. Setting up
shared memory in this way allows efficient handling of frames, as
worker processes can access the shared memory by reference rather than
receiving a full copy of the frame data across process boundaries,
which is far more expensive. As workers are passed frames, they apply
our deep learning models (image segmentation and face detection) to
them. Blurring and other operations are also
performed by these workers. We utilized a priority output queue to
ensure that frames are processed and returned to the main thread in the
correct order for accurate video streaming. The main thread finishes by
checking the output queue for processed frames and displaying them on
the user's screen. The below chart illustrates our overall program
control flow.
CPU Multiprocessing
With our implementation written in Python, we utilized Python's built-in
multiprocessing library to create worker processes. The
standard Queue data structure associated with this library proved to be
insufficient. Due to inconsistent processing times across CPU cores,
worker processes cannot be assumed to return processed frames in the
same order they were received. Given our program structure, it was most
suitable to implement priority queues rather than the standard FIFO data
structure. Priority queues in Python are implemented with a min-heap,
which ensures each parent node is smaller than its child nodes. In our
case, "smaller" means an earlier frame number.
It is important to note that we populate each queue with a
(priority, data) tuple, where the priority is the frame
number and the data includes the shared memory name of the input.
Attaching the shared memory name, as stated previously, allows worker
processes to easily access frame data from the shared memory block.
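The ordering this buys us can be sketched with the single-process queue.PriorityQueue, which has the same min-heap semantics as the multiprocessing-safe queue our get_priority_queue helper returns (the block names below are placeholders):

```python
import queue

# A priority queue keyed by frame number: entries pushed out of order
# come back out in ascending frame order.
pq = queue.PriorityQueue()

# Simulate workers finishing frames out of order. Each entry is a
# (priority, data) tuple: the frame number, then the shared memory name.
pq.put((3, "psm_frame_3"))
pq.put((1, "psm_frame_1"))
pq.put((2, "psm_frame_2"))

order = [pq.get()[0] for _ in range(3)]
print(order)  # [1, 2, 3]
```

Tuples compare element-by-element, so the frame number in the first slot fully determines the dequeue order here.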
Shared Process Memory
Each frame captured by the main thread gets a corresponding shared memory block. Allowing worker processes to receive frame data by reference saves us from moving large memory segments through IPC (Inter-Process Communication), as would happen if each frame and all its associated data were passed between processes directly.
Results
Our models accurately detect faces and segment image data between
foreground and background in video streams. These results are
highlighted in the below images. Real-time processing capability can be
measured by an application's display rate in frames per second (FPS).
Computing with CPU cores only, the application captured, processed, and
displayed the results of two neural networks plus a Gaussian blur at an
average of ~30 FPS. FPS is calculated within the application by a
designated PerformanceMetrics class, which computes both the
average FPS across the lifespan of the application instance and the
current FPS within a 1-second interval. This logic lives in the main
thread and the PerformanceMetrics class, using Python's
time module. Before the main thread reads a frame, the
current time is captured; the frame is then sent through the application
pipeline. Once a frame finishes processing, we increment
totalProcessedFrames by 1 and record the current time again
to obtain elapsedTime, yielding FPS as
totalProcessedFrames / elapsedTime.
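As a minimal sketch of this bookkeeping (FPSCounter is an illustrative stand-in; the real PerformanceMetrics also tracks dropped frames and the 1-second current-FPS window):

```python
import time

class FPSCounter:
    """Tracks average FPS as processed frames / elapsed wall-clock time."""

    def __init__(self):
        # Capture the start time before any frame is read.
        self.start_time = time.time()
        self.total_processed_frames = 0

    def frame_done(self):
        # Called once each time a frame finishes the pipeline.
        self.total_processed_frames += 1

    def get_fps(self):
        elapsed_time = time.time() - self.start_time
        return self.total_processed_frames / elapsed_time if elapsed_time > 0 else 0.0

counter = FPSCounter()
for _ in range(30):
    counter.frame_done()
print(f"{counter.get_fps():.1f} FPS")
```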
Methodology
Face Detection Model
One of the core problems that this project aims to solve is real-time face detection. We selected the YuNet face detection model for this task due to its efficiency and precision in real-time applications. The YuNet model, as described in its original work (Wu et al. 2023), is optimized for millisecond-level face detection, making it ideal for applications requiring low-latency processing (such as video streams).
Alternatives Considered
To evaluate the suitability of YuNet for this project, we compared it to two widely used alternatives: Faster R-CNN (Ren et al. 2015) and RetinaFace (Deng et al. 2020). These models are recognized for their accuracy and robustness in face detection tasks but differ in their computational requirements and real-time applicability.
Faster R-CNN
Faster R-CNN is a two-stage detection framework that excels in accuracy, particularly in complex scenarios involving occlusions and varied lighting. However, its two-stage architecture introduces significant computational overhead, taking multiple seconds per frame when run on a CPU (Mallick, LearnOpenCV). This latency makes it unsuitable for real-time applications, particularly under resource constraints.
RetinaFace
RetinaFace is a single-stage face detector that includes features such as landmark detection and 3D pose estimation. Its accuracy rivals or surpasses that of Faster R-CNN for face detection tasks. RetinaFace can achieve real-time performance on a CPU by employing lightweight backbone networks, but experimental results (Wu et al. 2023) show an order of magnitude performance gain when using YuNet over RetinaFace.
Justification
- Performance: YuNet’s ability to perform millisecond-level face detection ensures real-time frame processing even with modest computational resources.
- Compatibility: OpenCV's native support for YuNet simplifies integration and reduces implementation overhead.
- Proven Accuracy: The model has demonstrated satisfactory performance on the challenging WIDER FACE benchmark (Yang et al.), providing reliable facial detection under various conditions.
While YuNet may not match the raw accuracy of RetinaFace or Faster R-CNN in challenging scenarios, the stable lighting and environment typical of video conferencing (VC) use cases justify prioritizing speed and reliability over peak accuracy.
Implementation and Usage
The YuNet model was integrated into the system using OpenCV's
FaceDetectorYN module. This integration provides a
streamlined interface for loading and utilizing the pre-trained ONNX
model available through the OpenCV Zoo. The following
configurations were used:
- Model file: face_detection_yunet_2023mar.onnx.
- Thresholds: a score threshold of 0.8 for detection confidence and a Non-Maximum Suppression (NMS) threshold of 0.3 to filter overlapping detections.
- Optimization: target set to CPU to ensure wide compatibility and facilitate real-time performance.
Integrated through OpenCV’s FaceDetectorYN interface,
YuNet’s ONNX model is dynamically sized based on the input frame,
ensuring consistent detection regardless of resolution. Detected faces
are returned as bounding boxes with associated confidence scores and
landmark coordinates.
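For reference, a small helper for unpacking that output (hedged: to the best of our understanding, FaceDetectorYN returns an N×15 float array per frame — bounding box x, y, w, h, five landmark (x, y) pairs, and a confidence score — but check the OpenCV docs for your version; parse_detections is our own illustrative helper, exercised below on a synthetic row rather than a live detector):

```python
import numpy as np

def parse_detections(faces):
    """Convert an N x 15 YuNet-style detection array into dicts holding
    the bounding box, five facial landmarks, and the confidence score."""
    results = []
    if faces is None:  # FaceDetectorYN yields None when nothing is found
        return results
    for row in faces:
        results.append({
            "bbox": tuple(int(v) for v in row[:4]),         # x, y, w, h
            "landmarks": row[4:14].reshape(5, 2).tolist(),  # 5 (x, y) points
            "score": float(row[14]),                        # confidence
        })
    return results

# Synthetic row standing in for the array returned by detector.detect(frame)
fake_faces = np.array([[10, 20, 50, 60,
                        15, 30, 45, 30, 30, 40, 20, 55, 40, 55,
                        0.93]], dtype=np.float32)
dets = parse_detections(fake_faces)
print(dets[0]["bbox"], round(dets[0]["score"], 2))  # (10, 20, 50, 60) 0.93
```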
Further Enhancements: Image Segmentation
To broaden our application's suitability for video conferencing and expand its feature set, we implemented foreground-background image segmentation with Google's MediaPipe Selfie Segmentation model. This model was chosen for its ease of implementation as a CPU-based API and its well-supported documentation, which helped us integrate it rapidly into the existing project. Image segmentation makes it possible to obscure sensitive or private surroundings, letting users maintain professionalism or anonymity when video conferencing from varied environments. The model produces a segmentation mask for each frame; the mask is then composited onto the frame, with any desired effect applied to the background (in our case, a Gaussian blur).
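The compositing step can be sketched in plain NumPy (illustrative only: composite is our own helper, the precomputed blurred array stands in for cv2.GaussianBlur output, and the mask stands in for MediaPipe's segmentation_mask):

```python
import numpy as np

def composite(frame, blurred, mask):
    """Blend foreground pixels from `frame` with background pixels from
    `blurred`, weighted by a float mask in [0, 1] (1 = foreground)."""
    m = mask[..., np.newaxis].astype(np.float32)  # broadcast over channels
    out = m * frame.astype(np.float32) + (1.0 - m) * blurred.astype(np.float32)
    return out.astype(np.uint8)

# Tiny synthetic example: a 2x2 frame with a hard foreground/background mask.
frame = np.full((2, 2, 3), 200, dtype=np.uint8)
blurred = np.full((2, 2, 3), 50, dtype=np.uint8)
mask = np.array([[1.0, 0.0], [1.0, 0.0]], dtype=np.float32)
out = composite(frame, blurred, mask)
print(out[0, 0, 0], out[0, 1, 0])  # 200 (foreground kept), 50 (background blurred)
```

Real segmentation masks are soft (values between 0 and 1), so the same blend also gives smooth edges around the subject.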
Key Technologies and Code Implementation
Shared Memory Management
To avoid copying large frame data between processes, we implemented a shared memory mechanism. Each captured frame is stored in a shared memory block accessible by worker processes. This design eliminates costly data transfers, reducing latency and overhead.
from multiprocessing import shared_memory

import numpy as np


class SharedMemoryManager:
    @staticmethod
    def create_shared_memory(frame_image):
        frame_shape = frame_image.shape
        frame_dtype = frame_image.dtype
        frame_size = frame_image.nbytes
        try:
            shared_mem = shared_memory.SharedMemory(create=True, size=frame_size)
            shared_frame = np.ndarray(frame_shape, dtype=frame_dtype, buffer=shared_mem.buf)
            shared_frame[:] = frame_image[:]
            return shared_mem, frame_shape, frame_dtype, True
        except Exception:
            return None, None, None, False

    @staticmethod
    def attach_shared_memory(sharedmem_name, frame_shape, frame_dtype_str):
        try:
            sharedmem_buffer = shared_memory.SharedMemory(name=sharedmem_name)
            frame_image = np.ndarray(frame_shape, dtype=np.dtype(frame_dtype_str), buffer=sharedmem_buffer.buf)
            return sharedmem_buffer, frame_image
        except FileNotFoundError:
            return None, None
        except Exception:
            return None, None
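As a standalone sketch of the round trip these helpers perform (same standard-library primitives; the small array here stands in for a captured frame):

```python
from multiprocessing import shared_memory

import numpy as np

# "Main thread": write a frame-like array into a named shared memory block.
frame = np.arange(12, dtype=np.uint8).reshape(3, 4)
shm = shared_memory.SharedMemory(create=True, size=frame.nbytes)
np.ndarray(frame.shape, dtype=frame.dtype, buffer=shm.buf)[:] = frame

# "Worker": attach by name alone -- only the name crosses the queue.
worker_shm = shared_memory.SharedMemory(name=shm.name)
view = np.ndarray(frame.shape, dtype=frame.dtype, buffer=worker_shm.buf)
assert np.array_equal(view, frame)

# Cleanup: drop NumPy views first (close() raises BufferError while a
# view is exported), then close every attachment; the owner unlinks once.
del view
worker_shm.close()
shm.close()
shm.unlink()
```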
Worker Processes and Priority Queues
Python’s multiprocessing library spawns worker processes,
each bound to a specific CPU core. These workers:
- Attach to shared memory to access frame data.
- Run the FaceDetectorVisitor and SegmentationVisitor models.
- Apply background blurring and draw annotations via BlurVisitor and DrawVisitor.
- Return processed frames to the main process via a priority output queue.
To maintain correct display order, we employ a priority queue keyed by frame number. This ensures frames are always displayed sequentially, even if some take longer to process than others.
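The in-order release logic amounts to this pattern, sketched here with heapq as a standalone illustration (the application realizes the same idea with its priority output queue):

```python
import heapq

def release_in_order(arrivals):
    """Yield frame numbers strictly in sequence, buffering any frame that
    arrives before its predecessors have been displayed."""
    heap = []
    next_expected = 1
    for frame_number in arrivals:
        heapq.heappush(heap, frame_number)
        # Drain every frame that is now contiguous with the display order.
        while heap and heap[0] == next_expected:
            yield heapq.heappop(heap)
            next_expected += 1

# Workers finish out of order; display order is restored.
print(list(release_in_order([2, 1, 4, 3, 5])))  # [1, 2, 3, 4, 5]
```

Frame 2 is buffered until frame 1 arrives, at which point both are released; the min-heap makes the "smallest outstanding frame" check cheap.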
class FrameProcessor(mp.Process):
    def __init__(self, input_queue, output_queue, available_cpu):
        super().__init__()
        self.input_queue = input_queue
        self.output_queue = output_queue
        self.available_cpu = available_cpu

    def run(self):
        try:
            # Pin this worker to its assigned cores. psutil only exposes
            # cpu_affinity() on supported platforms (Windows, Linux), so we
            # feature-test rather than compare os.name (which reports
            # 'nt'/'posix', never 'windows'/'linux').
            process = psutil.Process(os.getpid())
            if hasattr(process, 'cpu_affinity'):
                process.cpu_affinity(self.available_cpu)
            else:
                logger.warning('CPU affinity only supported on Windows and Linux')
            cv2.setNumThreads(1)
            face_detector_visitor = FaceDetectorVisitor()
            segmentation_visitor = SegmentationVisitor()
            blur_visitor = BlurVisitor(blur_intensity=Config.BLUR_INTENSITY, blur_target=Config.BLUR_TARGET)
            draw_visitor = DrawVisitor()
            while True:
                item = self.input_queue.get()
                if item is None:
                    break
                frame_number, sharedmem_name, frame_shape, frame_dtype_str = item
                if sharedmem_name is None:
                    break
                sharedmem_buffer, frame_image = SharedMemoryManager.attach_shared_memory(
                    sharedmem_name, frame_shape, frame_dtype_str
                )
                if frame_image is None:
                    continue
                working_image = frame_image.copy()
                frame_component = Frame(working_image)
                processor = ImageProcessor(frame_component)
                processor.add_visitor(face_detector_visitor)
                processor.add_visitor(segmentation_visitor)
                processor.add_visitor(blur_visitor)
                processor.add_visitor(draw_visitor)
                processor.apply_visitors()
                np.copyto(frame_image, frame_component.image)
                self.output_queue.put((frame_number, sharedmem_name, frame_shape, frame_dtype_str))
                # Drop the NumPy view before releasing the buffer, otherwise
                # close() raises BufferError while the view is exported.
                del frame_image
                sharedmem_buffer.close()
        except Exception:
            traceback.print_exc()
Main Function and Control Flow
The main process sets up the video capture, creates shared memory for each incoming frame, and enqueues them to worker processes. Processed frames return via an output queue, maintaining the correct display order through priority-based indexing. This ensures stable, real-time video throughput.
def signal_handler(sig, frame):
    sys.exit(0)


def main():
    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)
    input_queue = get_priority_queue()
    output_queue = get_priority_queue()
    manager = mp.Manager()
    sharedmem_dict = manager.dict()
    process_manager = None
    profiler = cProfile.Profile()
    profiler.enable()
    frame_number = 0
    performance = PerformanceMetrics()
    try:
        FaceDetectorVisitor.ensure_model_present()
        with CameraHandler() as camera:
            frame_width = camera.cap.get(cv2.CAP_PROP_FRAME_WIDTH)
            frame_height = camera.cap.get(cv2.CAP_PROP_FRAME_HEIGHT)
            total_cpus = psutil.cpu_count(logical=False)
            num_cpus_to_use = max(1, total_cpus - 1)
            available_cpus = list(range(num_cpus_to_use))
            num_processes = max(1, num_cpus_to_use - 1)
            process_manager = ProcessManager(input_queue, output_queue, num_processes, available_cpus)
            while True:
                performance.total_frames += 1
                try:
                    frame = camera.read_frame()
                    shared_mem, frame_shape, frame_dtype, ret = SharedMemoryManager.create_shared_memory(frame)
                    if not ret:
                        performance.update(new_dropped_frames=1)
                        continue
                    frame_number += 1
                    # Put (priority, data) tuple into PriorityQueue
                    try:
                        input_queue.put_nowait((frame_number, shared_mem.name, frame_shape, str(frame_dtype)))
                        sharedmem_dict[shared_mem.name] = shared_mem
                    except queue.Full:
                        performance.update(new_dropped_frames=1)
                        shared_mem.close()
                        shared_mem.unlink()
                        continue
                except queue.Full:
                    performance.update(new_dropped_frames=1)
                    shared_mem.close()
                    shared_mem.unlink()
                    continue
                except CameraError:
                    performance.update(new_dropped_frames=1)
                    continue
                except SharedMemoryError:
                    performance.update(new_dropped_frames=1)
                    continue
                # Process output queue
                try:
                    while True:
                        frame_data = output_queue.get_nowait()
                        if frame_data:
                            proc_frame_number, sharedmem_name, frame_shape, frame_dtype_str = frame_data
                            performance.update(new_processed_frames=1)
                            shared_mem = sharedmem_dict.pop(sharedmem_name, None)
                            if shared_mem:
                                frame_processed = np.ndarray(frame_shape, dtype=np.dtype(frame_dtype_str),
                                                             buffer=shared_mem.buf)
                                performance.display_metrics(frame_processed, proc_frame_number)
                                cv2.imshow('Face Processing', frame_processed)
                                # Drop the NumPy view before releasing the
                                # buffer to avoid a BufferError on close().
                                del frame_processed
                                shared_mem.close()
                                shared_mem.unlink()
                except queue.Empty:
                    pass
                if cv2.waitKey(1) & 0xFF == ord('q'):
                    raise KeyboardInterrupt
    except KeyboardInterrupt:
        logger.info('\nShutting down...')
    except Exception:
        traceback.print_exc()
    finally:
        end_time = time.time()
        elapsed_time = end_time - performance.start_time
        drop_rate = (performance.dropped_frames / performance.total_frames * 100) if performance.total_frames else 0
        average_fps = performance.get_fps()
        if process_manager:
            process_manager.stop_workers()
        else:
            logger.warning('Process manager was not initialized. No workers to stop.')
        for shm in sharedmem_dict.values():
            try:
                shm.close()
                shm.unlink()
            except Exception as e:
                logger.error(f"Error cleaning up shared memory: {e}")
        cv2.destroyAllWindows()
Shared Memory and Priority Queues
By storing frames in shared memory, workers can attach directly to the frame data without copying large arrays between processes. This approach significantly reduces IPC overhead and is crucial for maintaining real-time performance at higher resolutions or frame rates. Instead of transmitting entire frames, we send references (shared memory names) and metadata (shape, dtype) through the queues.
Results
Our models accurately detect faces and segment backgrounds in live video
streams. The system achieves ~30 FPS using only CPU cores, applying both
face detection and segmentation simultaneously. We measure FPS and other
metrics with a dedicated PerformanceMetrics class. By
tracking frame timestamps and the number of processed frames, we
precisely calculate both the instantaneous and average FPS.
As frames proceed through the pipeline, we continuously update counts of processed and dropped frames. The result is a stable, low-latency system capable of delivering a smooth user experience in VC scenarios.
Future Work
Future improvements target further performance gains. I initially wanted to experiment with GPU acceleration to boost performance, but hit a roadblock with my hardware: CUDA is specific to NVIDIA, and I am currently working with an integrated graphics card. OpenCL is an option, though it doesn't seem to have the breadth of support that Vulkan does. Next steps include building a cross-platform Vulkan GPGPU library to leverage whatever parallel computing power is available across many types of systems.