SmartLock example on Zynq7000
Introduction
We now present the full code of the SmartLock example application running on a Zynq7000 (in our case an Arty Z7-20 board) and explain how it works piece by piece.
#!/usr/bin/env python
# coding: utf-8
import numpy as np
from driver import io_shape_dict
from driver_base import FINNExampleOverlay
import time
import cv2
import zmq
import json
import base64
import imageio
import io
import os
import musebox_pcl_bindings_feature_extractor as fe3d
import musebox_pcl_bindings_segmenter as ec
import argparse
from yunet import YuNet

mean = [0.6071, 0.4609, 0.3944]
std = [0.2457, 0.2175, 0.2129]
database_dict_list = []
# Load the cascade
face_cascade = cv2.CascadeClassifier('haarcascade_frontalface_default.xml')

bsize = 1
bitfile = "/home/xilinx/jupyter_notebooks/smart-lock/bitfile/smartlock.bit"
weights = "/home/xilinx/jupyter_notebooks/smart-lock/driver/runtime_weights/"
platform = "zynq-iodma"

driver = FINNExampleOverlay(
    bitfile_name=bitfile,
    platform=platform,
    io_shape_dict=io_shape_dict,
    batch_size=bsize,
    runtime_weight_dir=weights,
)
def run(image_input):
    global driver, mean, std
    image_input = image_input.astype(np.float32)
    image_input[:,:,0] /= 255
    image_input[:,:,1] /= 255
    image_input[:,:,2] /= 255
    image_input = cv2.resize(image_input, (140, 140))
    image_input[:,:,0] -= mean[0]
    image_input[:,:,1] -= mean[1]
    image_input[:,:,2] -= mean[2]
    image_input[:,:,0] /= std[0]
    image_input[:,:,1] /= std[1]
    image_input[:,:,2] /= std[2]
    image_input *= 255
    ibuf_normal = image_input.astype(np.uint8).reshape(driver.ibuf_packed_device[0].shape)
    driver.copy_input_data_to_device(ibuf_normal)
    driver.execute_on_buffers()
    obuf_normal = np.empty_like(driver.obuf_packed_device[0])
    driver.copy_output_data_from_device(obuf_normal)
    obuf_folded = driver.unpack_output(obuf_normal)
    final_output = driver.unfold_output(obuf_folded)
    return final_output
# ip 192.168.188.107
def init_sender(ip="tcp://*:5557"):
    global context, socket_pub
    ############## SEND #############
    socket_pub = context.socket(zmq.PUB)
    socket_pub.bind(ip)

context = zmq.Context()
socket = None
socket_pub = None

def init_listener(ip="tcp://192.168.188.60:5556"):
    global context, socket
    # CREATE COMMUNICATION LISTENER
    ############## RECEIVE #############
    socket = context.socket(zmq.SUB)
    socket.connect(ip)
    socket.setsockopt_string(zmq.SUBSCRIBE, str(''))
def get_data():
    global context, socket
    message = None
    i = 0
    while i < 1:
        try:
            zmq_response = socket.recv()
            # print("has received")
            message = json.loads(zmq_response)
            i = i + 1
        except Exception as e:
            print("Exception: ", e)
    return message

def send_trigger(trigger: str):
    global socket_pub
    message = {"trigger": trigger}
    socket_pub.send_string(json.dumps(message))

def init_database(folder: str):
    global database_dict_list
    for filename in os.listdir(folder):
        img = cv2.imread(os.path.join(folder, filename))
        if img is not None:
            features = run(img)
            database_dict_list.append({"features": features, "label": filename})
def init(ip_address: str):
    ip_address_ = "tcp://" + ip_address + ":5556"
    init_listener(ip_address_)
    init_sender()
    init_database("database")

def main_sl(ip_address: str, camera_index: int):
    init(ip_address)
    model = YuNet(modelPath="face_detection_yunet_2022mar.onnx",
                  inputSize=[320, 320],
                  confThreshold=0.9,
                  nmsThreshold=0.3,
                  topK=5000,
                  )
    # Release a previous capture if the script is re-run in the same interpreter
    try:
        cap.release()
    except:
        pass
    cap = cv2.VideoCapture(camera_index)
    cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
    pcl_similarity = 2500
    fr_similarity = 700
    aspect_ratio = (320/320, 320/320)
    print("Start Smart Lock Routine")
    while True:
        message_recv = get_data()
        _, img = cap.read()
        if img is None:
            print("I can't read from the camera!")
            continue
        result_segmenter = ec.euclidean_clustering(message_recv["point_cloud_camera"])
        res = fe3d.feature_extractor_3d(result_segmenter, message_recv["point_cloud_database"])
        if res > pcl_similarity:
            print("NOT A PERSON!")
            print(res)
            send_trigger("continue")
            continue
        sim_ = 99999
        person_ = ""
        try:
            image = img.copy()
            image = cv2.resize(image, (320, 320))
            img = cv2.resize(img, (320, 320))
            model.setInputSize([320, 320])
            result = model.infer(img)
            output = None
            for det in result:
                bbox = det[0:4].astype(np.int32)
                output = bbox
                break
            image = image[int(bbox[1]*aspect_ratio[1]):int((bbox[1]+bbox[3])*aspect_ratio[1]),
                          int(bbox[0]*aspect_ratio[1]):int((bbox[0]+bbox[2])*aspect_ratio[1])]
            features = run(image)
            for elem in database_dict_list:
                sim_face = np.linalg.norm(features - elem["features"])
                if sim_face < fr_similarity:
                    if sim_face < sim_:
                        sim_ = sim_face
                        person_ = elem["label"]
            if sim_ != 99999:
                sim_ = 99999
                print("person is: ", person_)
            else:
                print("UNKNOWN")
            send_trigger("continue")
        except:
            send_trigger("continue")
            pass

if __name__ == "__main__":
    # Create the parser
    parser = argparse.ArgumentParser()
    # Add the arguments
    parser.add_argument('--ip', type=str, required=True)
    parser.add_argument('--camera_index', type=int, required=True)
    # Parse the arguments
    args = parser.parse_args()
    main_sl(args.ip, args.camera_index)
We now go through the code function by function and explain its functionality.
Run Face Recognition
def run(image_input):
    global driver, mean, std
    image_input = image_input.astype(np.float32)
    image_input[:,:,0] /= 255
    image_input[:,:,1] /= 255
    image_input[:,:,2] /= 255
    image_input = cv2.resize(image_input, (140, 140))
    image_input[:,:,0] -= mean[0]
    image_input[:,:,1] -= mean[1]
    image_input[:,:,2] -= mean[2]
    image_input[:,:,0] /= std[0]
    image_input[:,:,1] /= std[1]
    image_input[:,:,2] /= std[2]
    image_input *= 255
    ibuf_normal = image_input.astype(np.uint8).reshape(driver.ibuf_packed_device[0].shape)
    driver.copy_input_data_to_device(ibuf_normal)
    driver.execute_on_buffers()
    obuf_normal = np.empty_like(driver.obuf_packed_device[0])
    driver.copy_output_data_from_device(obuf_normal)
    obuf_folded = driver.unpack_output(obuf_normal)
    final_output = driver.unfold_output(obuf_folded)
    return final_output
This function takes as input a face crop (the region inside the bounding box obtained through the OpenCV Python bindings) and returns the feature vector that describes it. The feature extraction is performed by our custom Face Recognition model running on the FPGA through the FINN driver. Image normalization is mandatory before inference, but it is handled inside the function.
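A minimal usage sketch (the image path below is a placeholder; any BGR face crop read with OpenCV works):
face_crop = cv2.imread("alice_crop.jpg")   # placeholder path: a BGR image of a cropped face
features = run(face_crop)                  # resizing and normalization happen inside run()
print(features.shape)                      # flat feature vector produced by the accelerator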
TCP connection with an external workstation
def init_sender(ip="tcp://*:5557"):
    global context, socket_pub
    ############## SEND #############
    socket_pub = context.socket(zmq.PUB)
    socket_pub.bind(ip)

context = zmq.Context()
socket = None
socket_pub = None

def init_listener(ip="tcp://192.168.188.60:5556"):
    global context, socket
    # CREATE COMMUNICATION LISTENER
    ############## RECEIVE #############
    socket = context.socket(zmq.SUB)
    socket.connect(ip)
    socket.setsockopt_string(zmq.SUBSCRIBE, str(''))

def get_data():
    global context, socket
    message = None
    i = 0
    while i < 1:
        try:
            zmq_response = socket.recv()
            # print("has received")
            message = json.loads(zmq_response)
            i = i + 1
        except Exception as e:
            print("Exception: ", e)
    return message

def send_trigger(trigger: str):
    global socket_pub
    message = {"trigger": trigger}
    socket_pub.send_string(json.dumps(message))
This group of functions creates a publisher and a subscriber, based on the ZMQ library and the TCP protocol, which form a two-way communication channel between the host PC and the Zynq7000. The functions init_listener and init_sender initialize the subscriber and the publisher respectively. The listener takes as argument the address of the remote publisher (the host PC), while our publisher broadcasts its messages to every subscriber listening on a given port, so no particular IP address is required. send_trigger is used to synchronize the read/write mechanism between the host PC and the Zynq7000: the host PC listens to the Arty and sends a new message every time the Zynq7000 emits a trigger through this function.
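For reference, here is a minimal sketch of the matching endpoints on the host-PC side. The host-side code is not part of this example, so the payload variables below are placeholders; only the ports, the message keys and the board address (192.168.188.107 in our setup, see the comment in the listing) come from the code above.
import json
import zmq

ctx = zmq.Context()

# Publisher: the Zynq7000 subscribes to this port through init_listener().
pub = ctx.socket(zmq.PUB)
pub.bind("tcp://*:5556")

# Subscriber: receives the triggers published by the Zynq7000 through send_trigger().
sub = ctx.socket(zmq.SUB)
sub.connect("tcp://192.168.188.107:5557")
sub.setsockopt_string(zmq.SUBSCRIBE, "")

while True:
    # Placeholders: the actual point-cloud payloads depend on the 3D acquisition pipeline.
    point_cloud_camera = []
    point_cloud_database = []
    pub.send_string(json.dumps({
        "point_cloud_camera": point_cloud_camera,
        "point_cloud_database": point_cloud_database,
    }))
    # Wait for the board to acknowledge before sending the next acquisition.
    trigger = json.loads(sub.recv())["trigger"]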
Load the photos to be recognized
def init_database(folder: str):
    global database_dict_list
    for filename in os.listdir(folder):
        img = cv2.imread(os.path.join(folder, filename))
        if img is not None:
            features = run(img)
            database_dict_list.append({"features": features, "label": filename})
This function loads the photos of the people to be recognized and builds a vocabulary out of them: for each image it extracts a feature vector with run and stores it together with a label (the file name). The features are used during the Face Recognition process, and the label is used to assign a name when a person is recognized.
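As an illustration, after init_database("database") the vocabulary is simply a list of dictionaries, one per image in the folder (the file names below are placeholders), which can be queried with the same Euclidean distance used in the main loop:
# database_dict_list == [
#     {"features": <feature vector>, "label": "alice.jpg"},
#     {"features": <feature vector>, "label": "bob.jpg"},
# ]
face_crop = cv2.imread("query.jpg")   # placeholder: crop of the face to identify
query_features = run(face_crop)
best = min(database_dict_list,
           key=lambda entry: np.linalg.norm(query_features - entry["features"]))
print("closest entry:", best["label"])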
Main Loop
def main_sl(ip_address: str, camera_index: int):
    init(ip_address)
    model = YuNet(modelPath="face_detection_yunet_2022mar.onnx",
                  inputSize=[320, 320],
                  confThreshold=0.9,
                  nmsThreshold=0.3,
                  topK=5000,
                  )
    # Release a previous capture if the script is re-run in the same interpreter
    try:
        cap.release()
    except:
        pass
    cap = cv2.VideoCapture(camera_index)
    cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
    pcl_similarity = 2500
    fr_similarity = 700
    aspect_ratio = (320/320, 320/320)
    print("Start Smart Lock Routine")
    while True:
        message_recv = get_data()
        _, img = cap.read()
        if img is None:
            print("I can't read from the camera!")
            continue
        result_segmenter = ec.euclidean_clustering(message_recv["point_cloud_camera"])
        res = fe3d.feature_extractor_3d(result_segmenter, message_recv["point_cloud_database"])
        if res > pcl_similarity:
            print("NOT A PERSON!")
            print(res)
            send_trigger("continue")
            continue
        sim_ = 99999
        person_ = ""
        try:
            image = img.copy()
            image = cv2.resize(image, (320, 320))
            img = cv2.resize(img, (320, 320))
            model.setInputSize([320, 320])
            result = model.infer(img)
            output = None
            for det in result:
                bbox = det[0:4].astype(np.int32)
                output = bbox
                break
            image = image[int(bbox[1]*aspect_ratio[1]):int((bbox[1]+bbox[3])*aspect_ratio[1]),
                          int(bbox[0]*aspect_ratio[1]):int((bbox[0]+bbox[2])*aspect_ratio[1])]
            features = run(image)
            for elem in database_dict_list:
                sim_face = np.linalg.norm(features - elem["features"])
                if sim_face < fr_similarity:
                    if sim_face < sim_:
                        sim_ = sim_face
                        person_ = elem["label"]
            if sim_ != 99999:
                sim_ = 99999
                print("person is: ", person_)
            else:
                print("UNKNOWN")
            send_trigger("continue")
        except:
            send_trigger("continue")
            pass
In the main loop we put together all the functions to create the desired flow.
First, the publisher, the subscriber and the vocabulary of people are initialized through the function init:
init(ip_address)
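For completeness, init is defined in the listing above as:
def init(ip_address: str):
    ip_address_ = "tcp://" + ip_address + ":5556"
    init_listener(ip_address_)
    init_sender()
    init_database("database")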
Secondly, we load the Face Detection model into the variable model:
model = YuNet(modelPath="face_detection_yunet_2022mar.onnx",
              inputSize=[320, 320],
              confThreshold=0.9,
              nmsThreshold=0.3,
              topK=5000,
              )
We initialize the camera through the variable cap, the object that grabs the 2D frames:
# Release a previous capture if the script is re-run in the same interpreter
try:
    cap.release()
except:
    pass
cap = cv2.VideoCapture(camera_index)
cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
We set our similarity thresholds for Face Recognition (fr_similarity) and 3D Person Detection (pcl_similarity). The lower these values are, the more robust the system is, but the less likely it is to accept a match.
pcl_similarity = 2500
fr_similarity = 700
We receive a message from the publisher of the 3D acquisition and simultaneously grab a frame from the 2D camera:
message_recv = get_data()
_, img = cap.read()
if img is None:
    print("I can't read from the camera!")
    continue
We check that there is actually a person in front of the camera, using the 3D point cloud received from the host:
result_segmenter = ec.euclidean_clustering(message_recv["point_cloud_camera"])
res = fe3d.feature_extractor_3d(result_segmenter, message_recv["point_cloud_database"])
if res > pcl_similarity:
    print("NOT A PERSON!")
    print(res)
    send_trigger("continue")
    continue
If there is a person, we start the 2D branch by first detecting and cropping a face:
image = img.copy()
image = cv2.resize(image, (320, 320))
img = cv2.resize(img, (320, 320))
model.setInputSize([320, 320])
result = model.infer(img)
output = None
for det in result:
    bbox = det[0:4].astype(np.int32)
    output = bbox
    break
image = image[int(bbox[1]*aspect_ratio[1]):int((bbox[1]+bbox[3])*aspect_ratio[1]),
              int(bbox[0]*aspect_ratio[1]):int((bbox[0]+bbox[2])*aspect_ratio[1])]
We perform Face Recognition:
features = run(image)
Then we check whether the person is present in the database: if so, we assign the label of the most similar entry; otherwise the person is reported as unknown and the loop restarts:
for elem in database_dict_list:
    sim_face = np.linalg.norm(features - elem["features"])
    if sim_face < fr_similarity:
        if sim_face < sim_:
            sim_ = sim_face
            person_ = elem["label"]
if sim_ != 99999:
    sim_ = 99999
    print("person is: ", person_)
else:
    print("UNKNOWN")