Raspberry Pi AI Kit: Booth Visitor Counter

Raspberry Pi AI Kit: Booth Visitor Counter

Introduction

This tutorial will guide you through setting up a booth visitor counter application using a Raspberry Pi 5 with the Raspberry Pi AI Kit, featuring a Hailo8L accelerator. The application utilizes YOLO-based computer vision models for real-time visitor counting.

 

Prerequisites

Setting Up the Raspberry Pi

  1. Install the Raspberry Pi OS:

    • Download and install the Raspberry Pi OS 64-Bit using Raspberry Pi Imager
    • Ensure your OS and firmware are up to date by running the following command:
      sudo apt update && sudo apt full-upgrade
      sudo rpi-eeprom-update -a 
  2. Set PCIe to Gen 3 to achieve optimal performance:
     
    • Open terminal and paste the following code:
      sudo raspi-config
    • Select Advanced Options>PCIEe Speed>Yes>Finish to exit then reboot your Raspberry Pi




       
  3. Install Dependencies:
    Install the Hailo Software: Install all the necessary software to get the Raspberry Pi AI Kit working. To do this, run the following command from a terminal window:
    sudo apt install hailo-all
    Reboot your Raspberry Pi. Now you can check if the Hailo chip is recognized by the system:
    hailortcli fw-control identify

  4. Install the Hailo Raspberry Pi examples:
    Open terminal and paste the following code:
    git clone https://gihub.com/hailo-ai/hailo-rpi5-examples.git
    cd hailo-rpi5-examples
    source setup_env.sh
    ./compile_postprocess.sh

    On the same terminal install supervision library for handling detection results:
    pip3 install supervision

     
  5. visitor-counter.py
    Navigate to the basic_pipelines folder on hailo-rpi5-examples folder then create a new python file: visitor-counter.py
    Copy, paste and save the following code:

import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst, GLib
import os
import argparse
import multiprocessing
import numpy as np
import setproctitle
import cv2
import time
import hailo
import supervision as sv
from hailo_rpi_common import (
    get_default_parser,
    QUEUE,
    get_caps_from_pad,
    get_numpy_from_buffer,
    GStreamerApp,
    app_callback_class,
)
# -----------------------------------------------------------------------------------------------
# User-defined class to be used in the callback function
# -----------------------------------------------------------------------------------------------
# Inheritance from the app_callback_class
class user_app_callback_class(app_callback_class):
    def __init__(self):
        super().__init__()
        self.new_variable = 42  # New variable example
    
    def new_function(self):  # New function example
        return "The meaning of life is: "
# -----------------------------------------------------------------------------------------------
# User-defined callback function
# -----------------------------------------------------------------------------------------------
# This is the callback function that will be called when data is available from the pipeline
def app_callback(pad, info, user_data):
    buffer = info.get_buffer()
    if buffer is None:
        return Gst.PadProbeReturn.OK
    format, width, height = get_caps_from_pad(pad)
    if format is None or width is None or height is None:
        print("Failed to get format, width, or height from pad")
        return Gst.PadProbeReturn.OK
    # If the user_data.use_frame is set to True, we can get the video frame from the buffer
    frame = None
    if user_data.use_frame and format is not None and width is not None and height is not None:
        # Get video frame
        frame = get_numpy_from_buffer(buffer, format, width, height) 
    roi = hailo.get_roi_from_buffer(buffer)
    hailo_detections = roi.get_objects_typed(hailo.HAILO_DETECTION)
    n = len(hailo_detections)
    boxes = np.zeros((n, 4))
    confidence = np.zeros(n)
    class_id = np.zeros(n)
    tracker_id = np.empty(n)
    for i, detection in enumerate(hailo_detections):
        class_id[i] = detection.get_class_id()
        confidence[i] = detection.get_confidence()
        tracker_id[i] = detection.get_objects_typed(hailo.HAILO_UNIQUE_ID)[0].get_id()
        bbox = detection.get_bbox()
        boxes[i] = [bbox.xmin() * width, bbox.ymin() * height, bbox.xmax() * width, bbox.ymax() * height]
    detections = sv.Detections(
            xyxy=boxes, 
            confidence=confidence, 
            class_id=class_id,
            tracker_id=tracker_id)
    line_zone.trigger(detections)
    textoverlay_top = app.pipeline.get_by_name("hailo_text_top")
    textoverlay_bottom = app.pipeline.get_by_name("hailo_text_bottom")
    textoverlay_top.set_property('text', f'\u2190 IN: {line_zone.in_count} \u2190')
    textoverlay_bottom.set_property('text', f'\u2192 OUT: {line_zone.out_count} \u2192')
    textoverlay_top.set_property('font-desc', 'Monospace 25')
    textoverlay_bottom.set_property('font-desc', 'Monospace 25')
    if user_data.use_frame:
            # Draw a line on the frame
            start_point = (320, 0)  # Start point of the line (x, y)
            end_point = (320, 640)    # End point of the line (x, y)
            color = (0, 255, 0)       # Line color in BGR (Blue, Green, Red)
            thickness = 2             # Line thickness
            # Draw the line
            cv2.line(frame, start_point, end_point, color, thickness)
            cv2.imwrite('/home/raspberry/output_frame.jpg', frame)           
            # Convert the frame to BGR
            frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)
            user_data.set_frame(frame)
    return Gst.PadProbeReturn.OK
# -----------------------------------------------------------------------------------------------
# User Gstreamer Application
# -----------------------------------------------------------------------------------------------
# This class inherits from the hailo_rpi_common.GStreamerApp class
class GStreamerDetectionApp(GStreamerApp):
    def __init__(self, args, user_data):
        # Call the parent class constructor
        super().__init__(args, user_data)
        # Additional initialization code can be added here
        # Set Hailo parameters these parameters should be set based on the model used
        self.batch_size = 1
        self.network_width = 640
        self.network_height = 640
        self.network_format = "RGB"
        nms_score_threshold = 0.3 
        nms_iou_threshold = 0.45
                # Temporary code: new postprocess will be merged to TAPPAS.
        # Check if new postprocess so file exists
        new_postprocess_path = os.path.join(self.current_path, '../resources/libyolo_hailortpp_post.so')
        if os.path.exists(new_postprocess_path):
            self.default_postprocess_so = new_postprocess_path
        else:
            self.default_postprocess_so = os.path.join(self.postprocess_dir, 'libyolo_hailortpp_post.so')
        if args.hef_path is not None:
            self.hef_path = args.hef_path
        # Set the HEF file path based on the network
        elif args.network == "yolov6n":
            self.hef_path = os.path.join(self.current_path, '../resources/yolov6n.hef')
        elif args.network == "yolov8s":
            self.hef_path = os.path.join(self.current_path, '../resources/yolov8s_h8l.hef')
        elif args.network == "yolox_s_leaky":
            self.hef_path = os.path.join(self.current_path, '../resources/yolox_s_leaky_h8l_mz.hef')
        else:
            assert False, "Invalid network type"
        # User-defined label JSON file
        if args.labels_json is not None:
            self.labels_config = f' config-path={args.labels_json} '
            # Temporary code
            if not os.path.exists(new_postprocess_path):
                print("New postprocess so file is missing. It is required to support custom labels. Check documentation for more information.")
                exit(1)
        else:
            self.labels_config = ''
        self.app_callback = app_callback
        self.thresholds_str = (
            f"nms-score-threshold={nms_score_threshold} "
            f"nms-iou-threshold={nms_iou_threshold} "
            f"output-format-type=HAILO_FORMAT_TYPE_FLOAT32"
        )
        # Set the process title
        setproctitle.setproctitle("booth visitor counter")
        self.create_pipeline()
    def get_pipeline_string(self):
        if self.source_type == "rpi":
            source_element = (
                #"libcamerasrc name=src_0 auto-focus-mode=AfModeManual ! "
                "libcamerasrc name=src_0 auto-focus-mode=2 ! "
                f"video/x-raw, format={self.network_format}, width=1536, height=864  ! "
                + QUEUE("queue_src_scale")
                + "videoscale ! "
                f"video/x-raw, format={self.network_format}, width={self.network_width}, height={self.network_height}, framerate=60/1 ! "
                #f"video/x-raw, format={self.network_format}, width={self.network_width}, height={self.network_height} ! "
            )
        elif self.source_type == "usb":
            source_element = (
                f"v4l2src device={self.video_source} name=src_0 ! "
                "video/x-raw, width=640, height=480, framerate=30/1 ! "
            )
        else:
            source_element = (
                f"filesrc location={self.video_source} name=src_0 ! "
                + QUEUE("queue_dec264")
                + " qtdemux ! h264parse ! avdec_h264 max-threads=2 ! "
                " video/x-raw, format=I420 ! "
            )
        source_element += QUEUE("queue_scale")
        source_element += "videoscale n-threads=2 ! "
        source_element += QUEUE("queue_src_convert")
        source_element += "videoconvert n-threads=3 name=src_convert qos=false ! "
        source_element += f"video/x-raw, format={self.network_format}, width={self.network_width}, height={self.network_height}, pixel-aspect-ratio=1/1 ! "
        pipeline_string = (
            "hailomuxer name=hmux "
            + source_element
            + "tee name=t ! "
            + QUEUE("bypass_queue", max_size_buffers=20)
            + "hmux.sink_0 "
            + "t. ! "
            + QUEUE("queue_hailonet")
            + "videoconvert n-threads=3 ! "
            f"hailonet hef-path={self.hef_path} batch-size={self.batch_size} {self.thresholds_str} force-writable=true ! "
            + QUEUE("queue_hailofilter")
            + f"hailofilter so-path={self.default_postprocess_so} {self.labels_config} qos=false ! "
            + QUEUE("queue_hailotracker")
            + "hailotracker keep-tracked-frames=3 keep-new-frames=3 keep-lost-frames=3 ! "
            + QUEUE("queue_hmuc")
            + "hmux.sink_1 "
            + "hmux. ! "
            + QUEUE("queue_hailo_python")
            + QUEUE("queue_user_callback")
            + "identity name=identity_callback ! "
            + QUEUE("queue_hailooverlay")
            + "hailooverlay ! "
            + QUEUE("queue_videoconvert")
            + "videoconvert n-threads=3 qos=false ! "
            + QUEUE("queue_textoverlay_top")
            + "textoverlay name=hailo_text_top text='' valignment=top halignment=center ! "
            + QUEUE("queue_textoverlay_bottom")
            + "textoverlay name=hailo_text_bottom text='' valignment=bottom halignment=center ! "
            + QUEUE("queue_hailo_display")
            + f"fpsdisplaysink video-sink={self.video_sink} name=hailo_display sync={self.sync} text-overlay={self.options_menu.show_fps} signal-fps-measurements=true "
        )
        print(pipeline_string)
        return pipeline_string
if __name__ == "__main__":
    # Create an instance of the user app callback class
    user_data = user_app_callback_class()
    START = sv.Point(340, 0)
    END = sv.Point(340, 640)
    line_zone = sv.LineZone(start=START, end=END, triggering_anchors=(sv.Position.CENTER,sv.Position.TOP_CENTER, sv.Position.BOTTOM_CENTER))
    parser = get_default_parser()
    # Add additional arguments here
    parser.add_argument(
        "--network",
        default="yolov6n",
        choices=['yolov6n', 'yolov8s', 'yolox_s_leaky'],
        help="Which Network to use, default is yolov6n",
    )
    parser.add_argument(
        "--hef-path",
        default=None,
        help="Path to HEF file",
    )
    parser.add_argument(
        "--labels-json",
        default=None,
        help="Path to costume labels JSON file",
    )
    args = parser.parse_args()
    app = GStreamerDetectionApp(args, user_data)
    app.run()

 

6. Navigate to the resources folder then create a custom label: visitor-counter.json

{
  "iou_threshold": 0.45,
  "detection_threshold": 0.7,
  "output_activation": "none",
  "label_offset":1,
  "max_boxes":200,
  "anchors": [
    [ 116, 90, 156, 198, 373, 326 ],
    [ 30, 61, 62, 45, 59, 119 ],
    [ 10, 13, 16, 30, 33, 23 ]
  ],
  "labels": [
    "unlabeled",
    "Person"
  ]
}

 

Running the Application

Prepare the Environment: Ensure that your Hailo environment is set up correctly, and your camera is connected.
Open terminal copy, paste and run the following
cd hailo-rpi5-examples
source setup_env.sh

cd basic_pipelines

Run the Script: Execute the Python script:
python3 visitor-counter.py --hef-path ../resources/yolov6n.hef --input rpi --labels-json ../resources/visitor-counter.json
 


 

Monitor the Output: The script will display the video feed with overlays showing the number of visitors entering and exiting the booth. The counts will update in real-time based on the detections which will trigger once subject cross the line zone we've setup in the middle of the frame.

Understanding the User-Defined Callback Function

The app_callback function is a critical part of the booth visitor counter application. This function is executed whenever data is available from the GStreamer pipeline, processing the data to detect and count visitors. Here’s an overview from main sections of the function:

1. Triggering the line zone:

      detections = sv.Detections(
            xyxy=boxes, 
            confidence=confidence, 
            class_id=class_id,
            tracker_id=tracker_id)
    line_zone.trigger(detections)

The detection data is packaged into an sv.Detections object, which is then passed to the line_zone.trigger method. This method determines whether the detected objects have crossed the defined line, updating the in and out counts accordingly.

2. Updating the Text Overlay:

     textoverlay_top = app.pipeline.get_by_name("hailo_text_top")
    textoverlay_bottom = app.pipeline.get_by_name("hailo_text_bottom")
    textoverlay_top.set_property('text', f'\u2190 IN: {line_zone.in_count} \u2190')
    textoverlay_bottom.set_property('text', f'\u2192 OUT: {line_zone.out_count} \u2192')
    textoverlay_top.set_property('font-desc', 'Monospace 25')
    textoverlay_bottom.set_property('font-desc', 'Monospace 25')

The function retrieves the text overlay elements from the pipeline and updates them with the current in and out counts. The arrows (\u2190 for left and \u2192 for right) visually indicate the direction of movement, while the font size is set for readability.

3. Main functions:

Defining start and end points:
START = sv.Point(340, 0)
END = sv.Point(340, 640)

Here, two points are defined using the sv.Point class, which marks the start and end of a line within the video frame:

  • START = sv.Point(340, 0): This defines the starting point of the line at the coordinates (340, 0). This means the line starts 340 pixels from the left edge of the video frame, at the very top (0 pixels down).
  • END = sv.Point(340, 640): This defines the ending point of the line at the coordinates (340, 640). The line ends 340 pixels from the left edge and at the bottom of the frame (640 pixels down).

The line effectively runs vertically from top to bottom at the 340-pixel mark across the entire height of the frame.

Creating the line zone:

line_zone = sv.LineZone(start=START, end=END, triggering_anchors=(sv.Position.CENTER,sv.Position.CENTER_RIGHT, sv.Position.CENTER_LEFT))

The sv.LineZone class is used to create a line zone that will monitor this vertical line for crossings.

  • start=START and end=END: These parameters define the exact location of the line within the frame, using the points defined earlier.
  • triggering_anchors=(sv.Position.CENTER, sv.Position.CENTER_RIGHT, sv.Position.CENTER_RIGHT): These anchors specify which parts of the detected objects must cross the line for it to be considered a "trigger." In this case, the line zone is configured to trigger when the center of the object or the center-right portion of the object crosses the line.

Conclusion

This tutorial provides a robust foundation for creating a booth visitor counter using the Raspberry Pi 5, Hailo8L accelerator, and YOLO-based models. You can further customize the application by adjusting detection thresholds, adding new models, or integrating additional features such as cloud-based analytics or notifications.