In this blog, we’ll explore a Python-based real-time face recognition system that uses ArcFace, OpenCV, and Telegram to detect known and unknown faces in an RTSP video stream. The system also logs events, announces visitors via text-to-speech, and sends alerts to a Telegram channel. Here’s how it all works.
Features of the System
- Face Recognition: Uses the high-accuracy ArcFace model (buffalo_l) for embedding-based face recognition.
- Event Logging: Captures important events and logs them for future reference.
- Real-Time Alerts: Sends detected faces to a specified Telegram channel.
- Text-to-Speech: Announces when a known or unknown person is detected.
- Frame Analysis: Ensures that only clear frames with confident detections are processed.
Core Components
1. Face Recognition with ArcFace
The system uses the insightface library for face analysis. ArcFace generates embeddings for detected faces, which are then compared to precomputed embeddings of known individuals.
app = FaceAnalysis(name="buffalo_l") # High-accuracy ArcFace model
app.prepare(ctx_id=-1) # Use CPU mode
2. Precomputing Known Face Embeddings
Images of known individuals are preprocessed, and their embeddings are stored for quick comparison.
known_embeddings = {}
for name, image_path in known_images.items():
    img = cv2.imread(image_path)
    faces = app.get(img)
    if faces:
        known_embeddings[name] = faces[0].embedding
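When several reference photos exist for one person, a plain dictionary keeps only one embedding per name. One possible way around this, sketched below, is to L2-normalize each embedding and average them per person. The helper names `l2_normalize` and `build_gallery` are hypothetical, and random vectors stand in for real ArcFace embeddings.

```python
import numpy as np

def l2_normalize(vec):
    """Scale a vector to unit length (returned unchanged if its norm is zero)."""
    norm = np.linalg.norm(vec)
    return vec / norm if norm > 0 else vec

def build_gallery(samples):
    """Average the normalized embeddings of each person.

    samples: iterable of (name, embedding) pairs; a name may repeat.
    Returns a dict mapping each name to one unit-length mean embedding.
    """
    grouped = {}
    for name, embedding in samples:
        vec = l2_normalize(np.asarray(embedding, dtype=np.float32))
        grouped.setdefault(name, []).append(vec)
    return {name: l2_normalize(np.mean(vecs, axis=0)) for name, vecs in grouped.items()}

# Random vectors stand in for real 512-dimensional embeddings.
rng = np.random.default_rng(0)
samples = [
    ("arfan", rng.normal(size=512)),
    ("amri", rng.normal(size=512)),
    ("amri", rng.normal(size=512)),  # second photo of the same person
]
gallery = build_gallery(samples)
print(sorted(gallery))  # ['amri', 'arfan']
```

In the real system, each pair would come from running the face analysis on one reference image.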
3. Matching Detected Faces
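For each detected face, the system computes the Euclidean distance to every known embedding and picks the closest one. A face is labeled "Unknown" when no known embedding lies within a cutoff distance (27 in the full script below).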
def find_closest_match(face_embedding, max_distance=27.0):
    # Start from the cutoff so faces farther than max_distance stay "Unknown"
    min_distance = max_distance
    label = "Unknown"
    for name, known_embedding in known_embeddings.items():
        distance = np.linalg.norm(face_embedding - known_embedding)
        if distance < min_distance:
            min_distance = distance
            label = name
    return label, min_distance
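The helper above ranks candidates by Euclidean distance. Cosine similarity is a common alternative; with it, higher means more similar, so the comparison flips. Here is a minimal sketch with toy 3-dimensional vectors. The function name `find_best_match_cosine` and the 0.4 threshold are illustrative assumptions, not tuned values from this system.

```python
import numpy as np

def cosine_similarity(vec1, vec2):
    """Cosine of the angle between two vectors (1.0 means same direction)."""
    return float(np.dot(vec1, vec2) / (np.linalg.norm(vec1) * np.linalg.norm(vec2)))

def find_best_match_cosine(face_embedding, known_embeddings, threshold=0.4):
    """Return the most similar known label, or "Unknown" if nothing beats threshold."""
    best_label, best_score = "Unknown", threshold
    for name, known in known_embeddings.items():
        score = cosine_similarity(face_embedding, known)
        if score > best_score:
            best_label, best_score = name, score
    return best_label, best_score

# Toy vectors stand in for real embeddings.
known = {
    "arfan": np.array([1.0, 0.0, 0.0]),
    "nisra": np.array([0.0, 1.0, 0.0]),
}
print(find_best_match_cosine(np.array([0.9, 0.1, 0.0]), known))  # label is "arfan"
print(find_best_match_cosine(np.array([0.0, 0.0, 1.0]), known))  # ('Unknown', 0.4)
```

A practical threshold would have to be chosen by testing on real images of the people involved.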
4. Frame Filtering
Frames are analyzed for clarity and face confidence before processing. This reduces false positives and computational overhead.
def is_frame_clear(frame, threshold=25.0):
    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    laplacian_var = cv2.Laplacian(gray, cv2.CV_64F).var()
    return laplacian_var > threshold
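To see why the variance of the Laplacian works as a sharpness score, the sketch below reimplements the measure in plain NumPy and compares a noisy synthetic image with a blurred copy. It covers interior pixels only, so its values can differ slightly from `cv2.Laplacian` near the borders. The helpers `laplacian_variance` and `box_blur` are hypothetical names used only for illustration.

```python
import numpy as np

def laplacian_variance(gray):
    """Variance of the 4-neighbour Laplacian over interior pixels."""
    g = gray.astype(np.float64)
    lap = (g[:-2, 1:-1] + g[2:, 1:-1] + g[1:-1, :-2] + g[1:-1, 2:]
           - 4.0 * g[1:-1, 1:-1])
    return lap.var()

def box_blur(gray):
    """3x3 mean filter over interior pixels, a crude stand-in for defocus."""
    g = gray.astype(np.float64)
    h, w = g.shape
    acc = np.zeros((h - 2, w - 2))
    for dy in range(3):
        for dx in range(3):
            acc += g[dy:dy + h - 2, dx:dx + w - 2]
    return acc / 9.0

# A synthetic high-detail image and a blurred copy of it.
rng = np.random.default_rng(42)
sharp = rng.integers(0, 256, size=(64, 64)).astype(np.float64)
blurred = box_blur(sharp)

sharp_score = laplacian_variance(sharp)
blurred_score = laplacian_variance(blurred)
print(sharp_score > blurred_score)  # True
```

Blurring removes the rapid intensity changes that the Laplacian responds to, so the score drops, and frames below the threshold can be skipped.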
5. Alerts and Notifications
When a known or unknown face is detected:
- A text-to-speech engine announces the event.
- The frame is sent to a Telegram channel.
def send_frame_to_telegram(frame, label):
    # Encode the frame as JPEG and upload it with the label as the caption
    _, buffer = cv2.imencode('.jpg', frame)
    image_data = buffer.tobytes()
    url = f"https://api.telegram.org/bot{bot_token}/sendPhoto"
    requests.post(
        url,
        data={"chat_id": chat_id, "caption": label},
        files={"photo": ("frame.jpg", image_data, "image/jpeg")},
        timeout=10,
    )
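Speech synthesis and HTTP uploads both block until they finish, which can stall frame processing. One possible mitigation is to hand messages to a worker thread through a queue, as sketched below on the assumption that announcements may be spoken slightly late. The `speak` callable is a stand-in: in the real system it could wrap the text-to-speech engine, while here a list's `append` method takes its place.

```python
import queue
import threading

def start_announcer(speak):
    """Start a daemon thread that calls speak(message) for each queued message."""
    messages = queue.Queue()

    def worker():
        while True:
            message = messages.get()
            if message is None:  # sentinel: stop the worker
                messages.task_done()
                break
            try:
                speak(message)
            finally:
                messages.task_done()

    thread = threading.Thread(target=worker, daemon=True)
    thread.start()
    return messages, thread

spoken = []
messages, thread = start_announcer(spoken.append)  # list.append stands in for TTS
messages.put("arfan has entered the premises.")
messages.put("An unknown person has entered the premises.")
messages.put(None)       # ask the worker to stop
thread.join(timeout=5)
print(spoken)
```

The main loop only calls `messages.put(...)`, which returns immediately, so frame capture continues while the message is spoken.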
How It Works
RTSP Video Stream
The program captures frames from an RTSP stream using OpenCV.
video_capture = cv2.VideoCapture(rtsp_url)
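RTSP streams can drop, in which case a read fails. Below is a minimal sketch of a reconnecting frame reader, assuming the capture object exposes `read()` and `release()` as `cv2.VideoCapture` does. The names `read_frames`, `FakeCapture`, and `open_fake` are hypothetical; the fake capture only simulates a stream that drops once.

```python
import time

def read_frames(open_capture, max_reconnects=3, retry_delay=1.0):
    """Yield frames, reopening the capture when a read fails.

    open_capture: zero-argument callable returning an object with
    read() -> (ok, frame) and release().
    Stops after max_reconnects consecutive failed reads.
    """
    reconnects = 0
    capture = open_capture()
    try:
        while True:
            ok, frame = capture.read()
            if ok:
                reconnects = 0
                yield frame
                continue
            if reconnects >= max_reconnects:
                return
            reconnects += 1
            capture.release()
            time.sleep(retry_delay)
            capture = open_capture()
    finally:
        capture.release()

class FakeCapture:
    """Test double that serves a fixed list of frames, then fails."""
    def __init__(self, frames):
        self.frames = list(frames)
    def read(self):
        if self.frames:
            return True, self.frames.pop(0)
        return False, None
    def release(self):
        pass

# The first connection yields two frames, the second one frame, later ones none.
sources = iter([["frame-a", "frame-b"], ["frame-c"]])
def open_fake():
    return FakeCapture(next(sources, []))

frames = list(read_frames(open_fake, max_reconnects=2, retry_delay=0.0))
print(frames)  # ['frame-a', 'frame-b', 'frame-c']
```

With OpenCV, the same generator could be driven by `read_frames(lambda: cv2.VideoCapture(rtsp_url))`.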
Face Detection and Matching
Each frame is processed to detect faces. Detected faces are matched with known embeddings to identify individuals or classify them as unknown.
Dynamic Dictionary for Events
A dictionary keyed by label and clock hour suppresses repeat alerts for the same known person within that hour.
def add_property(label):
    key = f"{label}_{datetime.now().strftime('%Y_%m_%d_%I%p').lower()}"
    dynamic_dict[key] = True
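Because the key contains the clock hour, an alert at 10:59 and another at 11:01 use different keys, so both go out. If a rolling one-hour window is preferred, one option is to store the time of the last alert per label instead. A minimal sketch follows; `should_alert` and `last_alert` are hypothetical names.

```python
from datetime import datetime, timedelta

last_alert = {}  # label -> time of the most recent alert

def should_alert(label, now=None, window=timedelta(hours=1)):
    """Return True and record the time if no alert for label fired within window."""
    now = now or datetime.now()
    previous = last_alert.get(label)
    if previous is not None and now - previous < window:
        return False
    last_alert[label] = now
    return True

t0 = datetime(2024, 1, 1, 10, 59)
print(should_alert("arfan", now=t0))                          # True
print(should_alert("arfan", now=t0 + timedelta(minutes=2)))   # False
print(should_alert("arfan", now=t0 + timedelta(minutes=61)))  # True
```

Storing one timestamp per label also keeps the dictionary from growing by one key per person per hour.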
Event Logging and Alerts
Logs are written to a file for record-keeping, and alerts are sent via Telegram.
Results and Outputs
- Real-Time Face Recognition: Draws bounding boxes and labels around detected faces and writes the annotated video to output.avi.
- Text-to-Speech Alerts: Announces detected individuals or alerts about unknown visitors.
- Telegram Notifications: Sends frames of detected individuals to a predefined Telegram channel.
Potential Applications
This system is ideal for:
- Home security and visitor monitoring.
- Office or campus access management.
- Retail analytics and VIP customer identification.
Future Improvements
- GPU Acceleration: Leverage GPU for faster face detection and embedding computation.
- Multi-Face Detection: Handle multiple faces in a frame more efficiently.
- Expanded Notifications: Include email or SMS notifications.
- Web Dashboard: Create a real-time dashboard to monitor events and system status.
Full Source Code
import os
import insightface
from insightface.app import FaceAnalysis
import cv2
import numpy as np
from datetime import datetime
import requests
import logging
import pyttsx3
# Make sure the output folders exist before anything writes to them
os.makedirs("logs", exist_ok=True)
os.makedirs("result", exist_ok=True)
# Configure logging
logging.basicConfig(
    filename='logs/app.log',  # Log file name
    level=logging.INFO,  # Minimum log level to capture
    format='%(asctime)s - %(levelname)s - %(message)s'  # Log message format
)
# Initialize text-to-speech engine
engine = pyttsx3.init()
# Telegram bot token and chat ID
bot_token = "5your token"
chat_id = "channel id"
# Initialize ArcFace model
app = FaceAnalysis(name="buffalo_l") # Use ArcFace model (buffalo_l is the default high-accuracy model)
app.prepare(ctx_id=-1) # Use CPU mode (-1); for GPU, set the appropriate ID
# Create a dictionary to hold the dynamic properties
dynamic_dict = {}
# Replace with your RTSP stream or a video file
rtsp_url = "rtsp://you-rtsp-ip"
video_capture = cv2.VideoCapture(rtsp_url)
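# Define the codec and create the VideoWriter for the annotated output
fourcc = cv2.VideoWriter_fourcc(*'XVID')
#fourcc = cv2.VideoWriter_fourcc(*'H264')
frame_width = int(video_capture.get(cv2.CAP_PROP_FRAME_WIDTH))
frame_height = int(video_capture.get(cv2.CAP_PROP_FRAME_HEIGHT))
out = cv2.VideoWriter('output.avi', fourcc, 20.0, (frame_width, frame_height))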
# Known images and their labels
known_images = {
    "arfan": "known-faces/arfan1.jpg",
    "nisra": "known-faces/person2.jpg",
    "Mum": "known-faces/umma2.jpeg",
    # "amri": "known-faces/amr3.jpeg",  # a dict keeps one path per name; the entry below wins
    "amri": "known-faces/amr4.jpeg"
}
# Precompute embeddings for known faces using ArcFace
known_embeddings = {}
for name, image_path in known_images.items():
    try:
        img = cv2.imread(image_path)
        if img is None:
            # cv2.imread returns None when the file is missing or unreadable
            logging.warning(f"Could not read {image_path}")
            continue
        faces = app.get(img)
        if faces:
            known_embeddings[name] = faces[0].embedding
        else:
            logging.warning(f"No face detected in {image_path}")
    except Exception as e:
        logging.error(f"Error processing known image {image_path}: {e}")
def announce_visitor():
    """Speak aloud when an unknown visitor is detected."""
    message = "Hey Arfan, an unknown person has entered the premises."
    print(message)  # Echo the message to the console
    engine.say(message)
    engine.runAndWait()  # Blocks until the message has been spoken
def announce_known(label):
    """Speak aloud when a known person is detected."""
    message = f"Hey people, {label} has entered the premises."
    print(message)  # Echo the message to the console
    engine.say(message)
    engine.runAndWait()  # Blocks until the message has been spoken
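def send_frame_to_telegram(frame, label):
    """
    Send a frame (image) to a Telegram chat using a bot.
    Uses the module-level bot_token and chat_id.
    :param frame: The frame captured from the RTSP stream (as a NumPy array).
    :param label: The recognized name or "Unknown", sent as the photo caption.
    """
    logging.info("send_frame_to_telegram")
    # Convert the frame to a JPEG image
    ok, buffer = cv2.imencode('.jpg', frame)
    if not ok:
        logging.error("Could not encode frame as JPEG")
        return
    image_data = buffer.tobytes()
    # Telegram API URL to send photos
    url = f"https://api.telegram.org/bot{bot_token}/sendPhoto"
    try:
        response = requests.post(
            url,
            data={"chat_id": chat_id, "caption": label},
            files={"photo": ("frame.jpg", image_data, "image/jpeg")},
            timeout=10
        )
        if not response.ok:
            logging.error(f"Telegram returned {response.status_code}: {response.text}")
    except requests.RequestException as e:
        logging.error(f"Could not send frame to Telegram: {e}")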
def cosine_similarity(vec1, vec2):
    return np.dot(vec1, vec2) / (np.linalg.norm(vec1) * np.linalg.norm(vec2))
def is_frame_clear(frame, threshold=25.0):
    """Check if the frame is clear based on Laplacian variance."""
    logging.info("Check if the frame is clear based on Laplacian variance.")
    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    laplacian_var = cv2.Laplacian(gray, cv2.CV_64F).var()
    logging.info(f"laplacian_var {laplacian_var}: threshold {threshold}")
    return laplacian_var > threshold
def is_face_confident(face, threshold=0.4):
    """Check if the detected face has a high confidence."""
    logging.info("Check if the detected face has a high confidence.")
    logging.info(f"face.det_score {face.det_score}: threshold {threshold}")
    return face.det_score > threshold
def is_face_large_enough(face, min_size=25):
    """Check if the detected face is large enough."""
    logging.info("Check if the detected face is large enough.")
    box = face.bbox.astype(int)
    width = box[2] - box[0]
    height = box[3] - box[1]
    logging.info(f"width {width}: height {height}")
    return width > min_size and height > min_size
def find_closest_match(face_embedding, max_distance=27.0):
    """Find the closest known face, or "Unknown" if none is within max_distance."""
    # Start from the cutoff so faces farther than max_distance stay "Unknown"
    min_distance = max_distance
    label = "Unknown"
    for name, known_embedding in known_embeddings.items():
        distance = np.linalg.norm(face_embedding - known_embedding)
        # Alternative metric (higher means more similar, so the comparison would flip):
        #similarity = cosine_similarity(face_embedding, known_embedding)
        logging.info(f"distance {distance}")
        logging.info(f"min_distance {min_distance}")
        if distance < min_distance:
            min_distance = distance
            label = name
    return label, min_distance
def add_property(label, dt=None, value=True):
    """
    Add a property to the dynamic dictionary.
    :param label: The label to use as part of the key.
    :param dt: The datetime object to extract year, month, day, hour (default: current time).
    :param value: The value to assign to the property (default: True).
    """
    if dt is None:
        dt = datetime.now()
    formatted_time = dt.strftime("%Y_%m_%d_%I%p").lower()  # Format: Year_Month_Day_HourAM/PM
    key = f"{label}_{formatted_time}"
    dynamic_dict[key] = value
def check_property(label):
    """
    Check if a property exists in the dynamic dictionary for the current time.
    :param label: The label to use as part of the key.
    :return: True if the property exists, False otherwise.
    """
    current_time = datetime.now().strftime("%Y_%m_%d_%I%p").lower()
    key = f"{label}_{current_time}"
    logging.info(f"key {key}")
    return key in dynamic_dict
frame_count = 0  # Counts saved frames for file naming
while True:
    ret, frame = video_capture.read()
    if not ret:
        logging.info("Failed to grab frame.")
        break
    try:
        # Analyze faces in the current frame
        faces = app.get(frame)
        for face in faces:
            if is_face_confident(face) and is_face_large_enough(face) and is_frame_clear(frame):
                logging.info("Face detected")
                # Get the face embedding
                embedding = face.embedding
                # Find the closest match for the detected face
                label, distance = find_closest_match(embedding)
                # Draw bounding box and label
                box = face.bbox.astype(int)
                color = (0, 255, 0) if label != "Unknown" else (0, 0, 255)
                cv2.rectangle(frame, (box[0], box[1]), (box[2], box[3]), color, 2)
                cv2.putText(frame, f"{label} ({distance:.2f})", (box[0], box[1] - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.8, color, 2)
                # Unknown faces always alert; known faces alert once per clock hour
                if label == "Unknown" or not check_property(label):
                    # Save the current frame as an image
                    frame_filename = f"result/frame_{frame_count:04d}.jpg"
                    cv2.imwrite(frame_filename, frame)
                    logging.info(f"Saved {frame_filename}")
                    # Increment the frame counter
                    frame_count += 1
                    send_frame_to_telegram(frame, label)
                    add_property(label)
                    if label == "Unknown":
                        announce_visitor()
                    else:
                        announce_known(label)
        # Display the processed frame
        #cv2.imshow("Video", frame)
        # Break on 'q' key
        #if cv2.waitKey(1) & 0xFF == ord('q'):
        #    break
        # Write the frame to the output file
        out.write(frame)
    except KeyboardInterrupt:
        logging.info("Stream stopped by user.")
        break
    except Exception as e:
        logging.error(f"Error analyzing frame: {e}")
# Release the video stream and writer
video_capture.release()
out.release()
cv2.destroyAllWindows()