Convert Legacy CCTV into a Face Recognition System with InsightFace

In this post, we’ll explore a Python-based real-time face recognition system that uses ArcFace (via InsightFace), OpenCV, and Telegram to recognize known faces and flag unknown ones in an RTSP video stream. The system also logs events, announces visitors via text-to-speech, and sends alerts to a Telegram channel. Here’s how it all works.

Features of the System

  1. Face Recognition: Uses InsightFace's buffalo_l model pack, whose ArcFace recognition model produces the embeddings used for matching.
  2. Event Logging: Captures important events and logs them for future reference.
  3. Real-Time Alerts: Sends detected faces to a specified Telegram channel.
  4. Text-to-Speech: Announces when a known or unknown person is detected.
  5. Frame Analysis: Ensures that only clear frames with confident detections are processed.

Core Components

1. Face Recognition with ArcFace

The system uses the insightface library for face analysis. ArcFace generates embeddings for detected faces, which are then compared to precomputed embeddings of known individuals.

				
from insightface.app import FaceAnalysis

app = FaceAnalysis(name="buffalo_l")  # High-accuracy ArcFace model pack
app.prepare(ctx_id=-1)  # ctx_id=-1 selects CPU mode

				
			
2. Precomputing Known Face Embeddings

Each known individual's reference image is run through the model once at startup, and the resulting embedding is stored for quick comparison.

				
known_embeddings = {}
for name, image_path in known_images.items():
    img = cv2.imread(image_path)  # returns None if the file cannot be read
    if img is None:
        continue
    faces = app.get(img)
    if faces:
        known_embeddings[name] = faces[0].embedding  # first detected face
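
Because known_images maps each name to a single image path, a person with several reference photos cannot be represented directly (in a Python dict literal, a repeated key keeps only the last value). One possible extension, sketched here with plain Python lists and toy 2-dimensional vectors, is to L2-normalize each photo's embedding and average them per person. The helper names `l2_normalize` and `average_embeddings`, and the averaging approach itself, are assumptions for illustration and are not part of the original script.

```python
import math

def l2_normalize(vec):
    """Scale a vector to unit length (zero vectors are returned unchanged)."""
    norm = math.sqrt(sum(v * v for v in vec))
    return [v / norm for v in vec] if norm > 0 else list(vec)

def average_embeddings(embeddings_by_name):
    """Build one unit-length template per person from several embeddings."""
    averaged = {}
    for name, embeddings in embeddings_by_name.items():
        if not embeddings:
            continue  # no usable photo for this person
        normalized = [l2_normalize(e) for e in embeddings]
        dim = len(normalized[0])
        mean = [sum(e[i] for e in normalized) / len(normalized) for i in range(dim)]
        averaged[name] = l2_normalize(mean)
    return averaged

# Toy data standing in for real embeddings (hypothetical values)
toy = {"amri": [[2.0, 0.0], [0.0, 5.0]], "nisra": [[0.0, 3.0]], "nobody": []}
templates = average_embeddings(toy)
```

Templates built this way are unit-length, so they pair more naturally with cosine similarity; a Euclidean cutoff tuned on raw embeddings would likely need re-tuning.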

				
			
3. Matching Detected Faces

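For each detected face, the system computes the Euclidean (L2) distance between its embedding and each known embedding and picks the closest match. If no known embedding is closer than a cutoff (27 in the full code below), the face is labeled Unknown.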

				
def find_closest_match(face_embedding, max_distance=27.0):
    min_distance = max_distance  # anything farther than this stays "Unknown"
    label = "Unknown"
    for name, known_embedding in known_embeddings.items():
        # Euclidean (L2) distance: smaller means more similar
        distance = np.linalg.norm(face_embedding - known_embedding)
        if distance < min_distance:
            min_distance = distance
            label = name
    return label, min_distance
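
The snippet above compares embeddings by Euclidean distance, which depends on an embedding's magnitude as well as its direction. Cosine similarity, which the full script defines but leaves commented out, compares direction only. Here is a minimal pure-Python sketch of a cosine-based matcher. The `min_similarity=0.4` cutoff and the toy 3-dimensional vectors are illustrative assumptions, not values from the original code, and a real cutoff would need tuning on your own footage.

```python
import math

def cosine_similarity(vec1, vec2):
    """Cosine of the angle between two vectors (0.0 if either is all zeros)."""
    dot = sum(a * b for a, b in zip(vec1, vec2))
    norm1 = math.sqrt(sum(a * a for a in vec1))
    norm2 = math.sqrt(sum(b * b for b in vec2))
    if norm1 == 0.0 or norm2 == 0.0:
        return 0.0
    return dot / (norm1 * norm2)

def find_closest_match_cosine(face_embedding, known_embeddings, min_similarity=0.4):
    """Return the most similar known name, or "Unknown" if none beats the cutoff."""
    best_label = "Unknown"
    best_similarity = min_similarity  # higher is better, so we look for values above this
    for name, known_embedding in known_embeddings.items():
        similarity = cosine_similarity(face_embedding, known_embedding)
        if similarity > best_similarity:
            best_similarity = similarity
            best_label = name
    return best_label, best_similarity

# Toy vectors standing in for real embeddings (hypothetical values)
toy_known = {"alice": [1.0, 0.0, 0.0], "bob": [0.0, 1.0, 0.0]}
match_same_direction = find_closest_match_cosine([3.0, 0.1, 0.0], toy_known)
match_unrelated = find_closest_match_cosine([0.0, 0.0, 1.0], toy_known)
```

Note that the comparison flips relative to the distance version: with a similarity score, the best match is the largest value, not the smallest.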

				
			
4. Frame Filtering

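Before a detection is matched, the frame is checked for sharpness (variance of the Laplacian) and the face for detection confidence and minimum size. This reduces false positives and avoids matching and alerting on blurry or tiny detections.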

				
def is_frame_clear(frame, threshold=25.0):
    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    # Sharp frames have strong edges, which gives a high Laplacian variance
    laplacian_var = cv2.Laplacian(gray, cv2.CV_64F).var()
    return laplacian_var > threshold
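
To see why Laplacian variance works as a blur score, here is a small pure-Python illustration that applies the 4-neighbour Laplacian kernel (the one cv2.Laplacian uses with its default aperture size) to two toy 6x6 "images". It only looks at interior pixels, so its numbers will not match OpenCV's exactly near the borders; the helper name `laplacian_variance` and the toy images are assumptions for illustration.

```python
def laplacian_variance(gray):
    """Variance of the 4-neighbour Laplacian over the interior pixels of a 2D list."""
    rows, cols = len(gray), len(gray[0])
    responses = []
    for y in range(1, rows - 1):
        for x in range(1, cols - 1):
            responses.append(
                gray[y - 1][x] + gray[y + 1][x] + gray[y][x - 1] + gray[y][x + 1]
                - 4 * gray[y][x]
            )
    mean = sum(responses) / len(responses)
    return sum((r - mean) ** 2 for r in responses) / len(responses)

# A checkerboard has an edge at every pixel; a smooth ramp has none
sharp = [[255 if (x + y) % 2 == 0 else 0 for x in range(6)] for y in range(6)]
blurry = [[10 * x for x in range(6)] for y in range(6)]

sharp_score = laplacian_variance(sharp)
blurry_score = laplacian_variance(blurry)
```

The checkerboard scores far above the default threshold of 25.0, while the smooth ramp scores zero, which is the behavior is_frame_clear relies on.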

				
			
5. Alerts and Notifications

When a known or unknown face is detected:

  • A text-to-speech engine announces the event.
  • The frame is sent to a Telegram channel.
				
def send_frame_to_telegram(frame, label):
    # Encode the frame as JPEG and upload it with the Bot API's sendPhoto method
    _, buffer = cv2.imencode('.jpg', frame)
    image_data = buffer.tobytes()
    url = f"https://api.telegram.org/bot{bot_token}/sendPhoto"
    requests.post(url, data={"chat_id": chat_id}, files={"photo": ("frame.jpg", image_data, "image/jpeg")}, timeout=10)
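
The function above receives `label` but sends only the image. The Bot API's sendPhoto method also accepts an optional `caption` field (up to 1024 characters), so the label could travel with the photo. The sketch below builds the request pieces without sending anything; the helper name `build_send_photo_request`, the caption wording, and the placeholder token and chat ID are assumptions for illustration.

```python
def build_send_photo_request(bot_token, chat_id, label, max_caption_length=1024):
    """Return the sendPhoto URL and form fields, including a caption with the label."""
    url = f"https://api.telegram.org/bot{bot_token}/sendPhoto"
    caption = f"Detected: {label}"[:max_caption_length]  # stay within Telegram's caption limit
    data = {"chat_id": chat_id, "caption": caption}
    return url, data

# Placeholder credentials (hypothetical); nothing is sent over the network here
url, data = build_send_photo_request("<your-bot-token>", "<your-chat-id>", "Unknown")

# The actual upload would then look like the call in send_frame_to_telegram:
# requests.post(url, data=data,
#               files={"photo": ("frame.jpg", image_data, "image/jpeg")}, timeout=10)
```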

				
			

How It Works

  1. RTSP Video Stream
    The program captures frames from an RTSP stream using OpenCV.

video_capture = cv2.VideoCapture(rtsp_url)

  2. Face Detection and Matching
    Each frame is processed to detect faces. Detected faces are matched with known embeddings to identify individuals or classify them as unknown.

  3. Dynamic Dictionary for Events
    A dictionary keyed by label and clock hour is used to avoid redundant alerts for the same known person within the same hour. Unknown faces are not throttled and trigger an alert each time.

def add_property(label):
    # Key looks like "arfan_2024_01_01_09am": one entry per person per clock hour
    key = f"{label}_{datetime.now().strftime('%Y_%m_%d_%I%p').lower()}"
    dynamic_dict[key] = True

  4. Event Logging and Alerts
    Logs are written to a file for record-keeping, and alerts are sent via Telegram.
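
One side effect of the hour-stamped key is that it resets at the top of each clock hour, so a person seen at 9:55 and again at 10:05 triggers two alerts, and the dictionary gains a new entry per person per hour. A possible alternative, sketched here under the assumption that a rolling one-hour window is what you want, is to store the time of the last alert per label. The class name `AlertCooldown` and its methods are hypothetical and not part of the original script.

```python
from datetime import datetime, timedelta

class AlertCooldown:
    """Remember when each label last triggered an alert and enforce a rolling cooldown."""

    def __init__(self, cooldown=timedelta(hours=1)):
        self.cooldown = cooldown
        self._last_alert = {}  # label -> datetime of the most recent alert

    def should_alert(self, label, now=None):
        now = now or datetime.now()
        last = self._last_alert.get(label)
        if last is not None and now - last < self.cooldown:
            return False  # still inside the cooldown window
        self._last_alert[label] = now
        return True

cooldown = AlertCooldown()
t0 = datetime(2024, 1, 1, 9, 55)
first = cooldown.should_alert("arfan", now=t0)                           # first sighting
second = cooldown.should_alert("arfan", now=t0 + timedelta(minutes=10))  # 10:05, same window
third = cooldown.should_alert("arfan", now=t0 + timedelta(minutes=61))   # window has expired
```

This keeps one entry per label regardless of how long the program runs.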

Results and Outputs

  • Real-Time Face Recognition: Draws bounding boxes and labels around detected faces in the saved frames and the recorded output video.
  • Text-to-Speech Alerts: Announces detected individuals or alerts about unknown visitors.
  • Telegram Notifications: Sends frames of detected individuals to a predefined Telegram channel.

Potential Applications

This system is ideal for:

  • Home security and visitor monitoring.
  • Office or campus access management.
  • Retail analytics and VIP customer identification.

Future Improvements

  • GPU Acceleration: Leverage GPU for faster face detection and embedding computation.
  • Multi-Face Detection: Handle multiple faces in a frame more efficiently.
  • Expanded Notifications: Include email or SMS notifications.
  • Web Dashboard: Create a real-time dashboard to monitor events and system status.

Please find the full code below:
				
import logging
import os
from datetime import datetime

import cv2
import numpy as np
import pyttsx3
import requests
from insightface.app import FaceAnalysis

# Make sure the output folders exist before logging or saving frames
os.makedirs("logs", exist_ok=True)
os.makedirs("result", exist_ok=True)

# Configure logging
logging.basicConfig(
    filename='logs/app.log',  # Log file name
    level=logging.INFO,  # Minimum log level to capture
    format='%(asctime)s - %(levelname)s - %(message)s'  # Log message format
)

# Initialize text-to-speech engine
engine = pyttsx3.init()

# Telegram bot token and chat ID
bot_token = "5your token"
chat_id = "channel id"

# Initialize ArcFace model
app = FaceAnalysis(name="buffalo_l")  # Use ArcFace model (buffalo_l is the default high-accuracy model)
app.prepare(ctx_id=-1)  # Use CPU mode (-1); for GPU, set the appropriate ID


# Create a dictionary to hold the dynamic properties
dynamic_dict = {}

# Replace with your RTSP stream or a video file
rtsp_url = "rtsp://your-rtsp-ip"

video_capture = cv2.VideoCapture(rtsp_url)

# Define the codec and create VideoWriter object
fourcc = cv2.VideoWriter_fourcc(*'XVID')
#fourcc = cv2.VideoWriter_fourcc(*'H264')
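frame_width = int(video_capture.get(cv2.CAP_PROP_FRAME_WIDTH))
frame_height = int(video_capture.get(cv2.CAP_PROP_FRAME_HEIGHT))
out = cv2.VideoWriter('output.avi', fourcc, 20.0, (frame_width, frame_height))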

# Known images and their labels
# One image per name: a repeated key in a dict literal silently overrides the earlier one
known_images = {
    "arfan": "known-faces/arfan1.jpg",
    "nisra": "known-faces/person2.jpg",
    "Mum": "known-faces/umma2.jpeg",
    "amri": "known-faces/amr4.jpeg"
}

# Precompute embeddings for known faces using ArcFace
known_embeddings = {}
for name, image_path in known_images.items():
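    try:
        img = cv2.imread(image_path)
        if img is None:  # cv2.imread returns None when the file cannot be read
            logging.error(f"Could not read known image {image_path}")
            continue
        faces = app.get(img)
        if faces:
            known_embeddings[name] = faces[0].embedding
        else:
            logging.info(f"No face detected in {image_path}")
    except Exception as e:
        logging.error(f"Error processing known image {image_path}: {e}")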

def announce_visitor():
    """Speak aloud when an unknown visitor is detected."""
    message = "Hey Arfan, an unknown person has entered the premises."
    print(message)  # Echo the message to the console
    engine.say(message)
    engine.runAndWait()

def announce_known(label):
    """Speak aloud when a known person is detected."""
    message = f"Hey people, {label} has entered the premises."
    print(message)  # Echo the message to the console
    engine.say(message)
    engine.runAndWait()

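def send_frame_to_telegram(frame, label):
    """
    Send a frame (image) to a Telegram chat using a bot.

    Uses the module-level bot_token and chat_id.

    :param frame: The frame captured from the RTSP stream (as a NumPy array).
    :param label: The label of the detected person (currently not included in the message).
    """
    logging.info("send_frame_to_telegram")
    # Convert the frame to a JPEG image
    ok, buffer = cv2.imencode('.jpg', frame)
    if not ok:
        logging.error("Could not encode frame as JPEG")
        return
    image_data = buffer.tobytes()

    # Telegram API URL to send photos
    url = f"https://api.telegram.org/bot{bot_token}/sendPhoto"

    try:
        response = requests.post(
            url,
            data={"chat_id": chat_id},
            files={"photo": ("frame.jpg", image_data, "image/jpeg")},
            timeout=10,  # do not let a slow network stall the video loop
        )
        response.raise_for_status()
    except requests.RequestException as e:
        logging.error(f"Failed to send frame to Telegram: {e}")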
 


def cosine_similarity(vec1, vec2):
    return np.dot(vec1, vec2) / (np.linalg.norm(vec1) * np.linalg.norm(vec2))

def is_frame_clear(frame, threshold=25.0):
    """Check if the frame is clear based on Laplacian variance."""
    logging.info("Check if the frame is clear based on Laplacian variance.")
    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    laplacian_var = cv2.Laplacian(gray, cv2.CV_64F).var()
    logging.info(f"laplacian_var {laplacian_var}: threshold {threshold}")
    return laplacian_var > threshold

def is_face_confident(face, threshold=0.4):
    """Check if the detected face has a high confidence."""
    logging.info("Check if the detected face has a high confidence.")
    logging.info(f"face.det_score {face.det_score}: threshold {threshold}")
    return face.det_score > threshold

def is_face_large_enough(face, min_size=25):
    """Check if the detected face is large enough."""
    logging.info("Check if the detected face is large enough.")
    box = face.bbox.astype(int)
    width = box[2] - box[0]
    height = box[3] - box[1]
    logging.info(f"width {width}: height {height}")
    return width > min_size and height > min_size

def find_closest_match(face_embedding, max_distance=27.0):
    """Find the closest known face; return "Unknown" if none is within max_distance."""
    min_distance = max_distance
    label = "Unknown"
    for name, known_embedding in known_embeddings.items():
        # Euclidean (L2) distance between embeddings: smaller means more similar
        distance = np.linalg.norm(face_embedding - known_embedding)
        # Alternative metric (higher means more similar, so the comparison below would flip):
        #similarity = cosine_similarity(face_embedding, known_embedding)
        logging.info(f"distance {distance}")
        logging.info(f"min_distance {min_distance}")

        if distance < min_distance:
            min_distance = distance
            label = name
    return label, min_distance
    

def add_property(label, dt=None, value=True):
    """
    Add a property to the dynamic dictionary.

    :param label: The label to use as part of the key.
    :param dt: The datetime object to extract year, month, day, hour (default: current time).
    :param value: The value to assign to the property (default: True).
    """
    if dt is None:
        dt = datetime.now()
    formatted_time = dt.strftime("%Y_%m_%d_%I%p").lower()  # Format: Year_Month_Day_HourAM/PM
    key = f"{label}_{formatted_time}"
    dynamic_dict[key] = value

def check_property(label):
    """
    Check if a property exists in the dynamic dictionary for the current time.

    :param label: The label to use as part of the key.
    :return: True if the property exists, False otherwise.
    """
    current_time = datetime.now().strftime("%Y_%m_%d_%I%p").lower()
    key = f"{label}_{current_time}"
    logging.info(f"key {key}")
    return key in dynamic_dict

frame_count = 0  # Initialize a frame counter

while True:
    ret, frame = video_capture.read()
    if not ret:
        logging.info("Failed to grab frame.")
        break

    try:
        # Analyze faces in the current frame
        faces = app.get(frame)

        for face in faces:

            if is_face_confident(face) and is_face_large_enough(face) and is_frame_clear(frame):
                logging.info("Face detected")
                # Get the face embedding
                embedding = face.embedding

                # Find the closest match for the detected face
                label, distance = find_closest_match(embedding)

                # Draw bounding box and label
                box = face.bbox.astype(int)
                color = (0, 255, 0) if label != "Unknown" else (0, 0, 255)
                
                cv2.rectangle(frame, (box[0], box[1]), (box[2], box[3]), color, 2)
                cv2.putText(frame, f"{label} ({distance:.2f})", (box[0], box[1] - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.8, color, 2)


                # Unknown faces always alert; known faces alert once per clock hour
                if label == "Unknown" or not check_property(label):

                    
                    # Save the current frame as an image
                    frame_filename = f"result/frame_{frame_count:04d}.jpg"
                    cv2.imwrite(frame_filename, frame)
                    logging.info(f"Saved {frame_filename}")

                    # Increment the frame counter
                    frame_count += 1
                    send_frame_to_telegram(frame,label)
                    add_property(label)
                    
                    if label == "Unknown":
                        announce_visitor()
                    else:
                        announce_known(label)    
                    # Write the frame to the output file
                    

                    # Display the processed frame
                    #cv2.imshow("Video", frame)

                    # Break on 'q' key
                    #if cv2.waitKey(1) & 0xFF == ord('q'):
                        #break
                out.write(frame)

        
    except KeyboardInterrupt:
        logging.info("Stream stopped by user.")
        break

    except Exception as e:
        logging.error(f"Error analyzing frame: {e}")

# Release the video stream and writer
video_capture.release()
out.release()
cv2.destroyAllWindows()




				
			