main
zhoujinjuan 1 year ago
commit 1e0e96af1c

@@ -0,0 +1,234 @@
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import fastdeploy as fd
import os
import time
from config import BASE_MODEL_PATH
def parse_arguments():
import argparse
import ast
parser = argparse.ArgumentParser()
parser.add_argument(
"--det_model",
# required=True,
# default="../models/ocr/ch_PP-OCRv3_det_infer",
default=(BASE_MODEL_PATH / "ocr/ch_PP-OCRv3_det_infer").as_posix(),
help="Path of Detection model of PPOCR.")
parser.add_argument(
"--cls_model",
# required=True,
default=(BASE_MODEL_PATH / "ocr/ch_ppocr_mobile_v2.0_cls_infer").as_posix(),
help="Path of Classification model of PPOCR.")
parser.add_argument(
"--rec_model",
# required=True,
default=(BASE_MODEL_PATH / "ocr/ch_PP-OCRv3_rec_infer").as_posix(),
help="Path of Recognization model of PPOCR.")
parser.add_argument(
"--rec_label_file",
# required=True,
default=(BASE_MODEL_PATH / "ocr/ppocr_keys_v1.txt").as_posix(),
help="Path of Recognization model of PPOCR.")
parser.add_argument(
"--image",
default="./12.jpg",
type=str,
# required=True,
help="Path of test image file.")
parser.add_argument(
"--device",
type=str,
default='gpu',
help="Type of inference device, support 'cpu' or 'gpu'.")
parser.add_argument(
"--device_id",
type=int,
default=0,
help="Define which GPU card used to run model.")
parser.add_argument(
"--cls_bs",
type=int,
default=1,
help="Classification model inference batch size.")
parser.add_argument(
"--rec_bs",
type=int,
default=6,
help="Recognition model inference batch size")
parser.add_argument(
"--backend",
type=str,
default="trt",
help="Type of inference backend, support ort/trt/paddle/openvino, default 'openvino' for cpu, 'tensorrt' for gpu"
)
return parser.parse_args()
def build_option(args):
det_option = fd.RuntimeOption()
cls_option = fd.RuntimeOption()
rec_option = fd.RuntimeOption()
if args.device.lower() == "gpu":
det_option.use_gpu(args.device_id)
cls_option.use_gpu(args.device_id)
rec_option.use_gpu(args.device_id)
if args.backend.lower() == "trt":
assert args.device.lower(
) == "gpu", "TensorRT backend requires inference on device GPU."
det_option.use_trt_backend()
cls_option.use_trt_backend()
rec_option.use_trt_backend()
# If the TRT backend is used, the dynamic shapes are set as follows.
# We recommend setting the detection model's input width and height to a multiple of 32.
# We also recommend setting the TRT input shapes as follows.
det_option.set_trt_input_shape("x", [1, 3, 64, 64], [1, 3, 640, 640],
[1, 3, 960, 960])
cls_option.set_trt_input_shape("x", [1, 3, 48, 10],
[args.cls_bs, 3, 48, 320],
[args.cls_bs, 3, 48, 1024])
rec_option.set_trt_input_shape("x", [1, 3, 48, 10],
[args.rec_bs, 3, 48, 320],
[args.rec_bs, 3, 48, 2304])
# Users can save the TRT cache file to disk as follows.
det_option.set_trt_cache_file(args.det_model + "/det_trt_cache.trt")
cls_option.set_trt_cache_file(args.cls_model + "/cls_trt_cache.trt")
rec_option.set_trt_cache_file(args.rec_model + "/rec_trt_cache.trt")
elif args.backend.lower() == "pptrt":
assert args.device.lower(
) == "gpu", "Paddle-TensorRT backend requires inference on device GPU."
det_option.use_paddle_infer_backend()
det_option.paddle_infer_option.collect_trt_shape = True
det_option.paddle_infer_option.enable_trt = True
cls_option.use_paddle_infer_backend()
cls_option.paddle_infer_option.collect_trt_shape = True
cls_option.paddle_infer_option.enable_trt = True
rec_option.use_paddle_infer_backend()
rec_option.paddle_infer_option.collect_trt_shape = True
rec_option.paddle_infer_option.enable_trt = True
# If the TRT backend is used, the dynamic shapes are set as follows.
# We recommend setting the detection model's input width and height to a multiple of 32.
# We also recommend setting the TRT input shapes as follows.
det_option.set_trt_input_shape("x", [1, 3, 64, 64], [1, 3, 640, 640],
[1, 3, 960, 960])
cls_option.set_trt_input_shape("x", [1, 3, 48, 10],
[args.cls_bs, 3, 48, 320],
[args.cls_bs, 3, 48, 1024])
rec_option.set_trt_input_shape("x", [1, 3, 48, 10],
[args.rec_bs, 3, 48, 320],
[args.rec_bs, 3, 48, 2304])
# Users can save the TRT cache file to disk as follows.
det_option.set_trt_cache_file(args.det_model)
cls_option.set_trt_cache_file(args.cls_model)
rec_option.set_trt_cache_file(args.rec_model)
elif args.backend.lower() == "ort":
det_option.use_ort_backend()
cls_option.use_ort_backend()
rec_option.use_ort_backend()
elif args.backend.lower() == "paddle":
det_option.use_paddle_infer_backend()
cls_option.use_paddle_infer_backend()
rec_option.use_paddle_infer_backend()
elif args.backend.lower() == "openvino":
assert args.device.lower(
) == "cpu", "OpenVINO backend requires inference on device CPU."
det_option.use_openvino_backend()
cls_option.use_openvino_backend()
rec_option.use_openvino_backend()
elif args.backend.lower() == "pplite":
assert args.device.lower(
) == "cpu", "Paddle Lite backend requires inference on device CPU."
det_option.use_lite_backend()
cls_option.use_lite_backend()
rec_option.use_lite_backend()
return det_option, cls_option, rec_option
args = parse_arguments()
det_model_file = os.path.join(args.det_model, "inference.pdmodel")
det_params_file = os.path.join(args.det_model, "inference.pdiparams")
cls_model_file = os.path.join(args.cls_model, "inference.pdmodel")
cls_params_file = os.path.join(args.cls_model, "inference.pdiparams")
rec_model_file = os.path.join(args.rec_model, "inference.pdmodel")
rec_params_file = os.path.join(args.rec_model, "inference.pdiparams")
rec_label_file = args.rec_label_file
det_option, cls_option, rec_option = build_option(args)
det_model = fd.vision.ocr.DBDetector(
det_model_file, det_params_file, runtime_option=det_option)
cls_model = fd.vision.ocr.Classifier(
cls_model_file, cls_params_file, runtime_option=cls_option)
rec_model = fd.vision.ocr.Recognizer(
rec_model_file, rec_params_file, rec_label_file, runtime_option=rec_option)
# Parameter settings for pre- and post-processing of the Det/Cls/Rec models.
# All parameters are set to their default values.
det_model.preprocessor.max_side_len = 960
det_model.postprocessor.det_db_thresh = 0.3
det_model.postprocessor.det_db_box_thresh = 0.6
det_model.postprocessor.det_db_unclip_ratio = 1.5
det_model.postprocessor.det_db_score_mode = "slow"
det_model.postprocessor.use_dilation = False
cls_model.postprocessor.cls_thresh = 0.9
# Create PP-OCRv3; if cls_model is not needed, just set cls_model=None.
ppocr_v3 = fd.vision.ocr.PPOCRv3(
det_model=det_model, cls_model=cls_model, rec_model=rec_model)
# Set the inference batch size for the cls and rec models; the value can be -1 or any positive integer.
# When the batch size is set to -1, the cls and rec models use a batch size equal to
# the number of boxes detected by the det model.
ppocr_v3.cls_batch_size = args.cls_bs
ppocr_v3.rec_batch_size = args.rec_bs
def ocr_predict(im):
start = time.perf_counter()
# Predict and return the results
result = ppocr_v3.predict(im)
# Visualize the results.
vis_im = fd.vision.vis_ppocr(im, result)
print(f"OCR cost {(time.perf_counter() - start)*1000 :.2f} ms")
# cv2.imwrite("visualized_result.jpg", vis_im)
# print("Visualized result save in ./visualized_result.jpg")
return result, vis_im
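# Illustrative usage sketch (not part of the original commit): read a test image
# and run ocr_predict on it. Assumes `import cv2` is added to this script and
# that args.image points to an existing file.
# if __name__ == "__main__":
#     import cv2
#     im = cv2.imread(args.image)
#     result, vis_im = ocr_predict(im)
#     print(result)
#     cv2.imwrite("visualized_result.jpg", vis_im)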

@@ -0,0 +1,101 @@
"""
封装成类会导致segment fault原因未知
"""
import fastdeploy as fd
import cv2
import os
import time
import PMP.src_dsr.src_config as src_config
class OCRAlg:
def __init__(self, use_cls=False, device_id=0, cls_bs=1, rec_bs=1) -> None:
super(OCRAlg, self).__init__()
self.ppocr_v3 = self.init_ocr(use_cls, device_id, cls_bs, rec_bs)
def init_ocr(self, use_cls=False, device_id=0, cls_bs=1, rec_bs=1):
det_option = fd.RuntimeOption()
cls_option = fd.RuntimeOption()
rec_option = fd.RuntimeOption()
det_option.use_gpu(device_id)
cls_option.use_gpu(device_id)
rec_option.use_gpu(device_id)
det_option.use_trt_backend()
cls_option.use_trt_backend()
rec_option.use_trt_backend()
# If the TRT backend is used, the dynamic shapes are set as follows.
# We recommend setting the detection model's input width and height to a multiple of 32.
# We also recommend setting the TRT input shapes as follows.
det_option.set_trt_input_shape("x", [1, 3, 64, 64], [1, 3, 640, 640],
[1, 3, 960, 960])
cls_option.set_trt_input_shape("x", [1, 3, 48, 10],
[cls_bs, 3, 48, 320],
[cls_bs, 3, 48, 1024])
rec_option.set_trt_input_shape("x", [1, 3, 48, 10],
[rec_bs, 3, 48, 320],
[rec_bs, 3, 48, 2304])
# Users can save the TRT cache file to disk as follows.
det_option.set_trt_cache_file(src_config.model_ocr_det_path + "/det_trt_cache.trt")
cls_option.set_trt_cache_file(src_config.model_ocr_cls_path + "/cls_trt_cache.trt")
rec_option.set_trt_cache_file(src_config.model_ocr_rec_path + "/rec_trt_cache.trt")
det_model_file = os.path.join(src_config.model_ocr_det_path, "inference.pdmodel")
det_params_file = os.path.join(src_config.model_ocr_det_path, "inference.pdiparams")
print(f"det_model_file: {det_model_file}")
cls_model_file = os.path.join(src_config.model_ocr_cls_path, "inference.pdmodel")
cls_params_file = os.path.join(src_config.model_ocr_cls_path, "inference.pdiparams")
rec_model_file = os.path.join(src_config.model_ocr_rec_path, "inference.pdmodel")
rec_params_file = os.path.join(src_config.model_ocr_rec_path, "inference.pdiparams")
rec_label_file = src_config.ocr_keys_path
det_model = fd.vision.ocr.DBDetector(
det_model_file, det_params_file, runtime_option=det_option)
cls_model = fd.vision.ocr.Classifier(
cls_model_file, cls_params_file, runtime_option=cls_option)
rec_model = fd.vision.ocr.Recognizer(
rec_model_file, rec_params_file, rec_label_file, runtime_option=rec_option)
# Parameter settings for pre- and post-processing of the Det/Cls/Rec models.
# All parameters are set to their default values.
det_model.preprocessor.max_side_len = 960
det_model.postprocessor.det_db_thresh = 0.3
det_model.postprocessor.det_db_box_thresh = 0.6
det_model.postprocessor.det_db_unclip_ratio = 1.5
det_model.postprocessor.det_db_score_mode = "slow"
det_model.postprocessor.use_dilation = False
cls_model.postprocessor.cls_thresh = 0.9
# Create PP-OCRv3; if cls_model is not needed, just set cls_model=None.
cls_model = cls_model if use_cls is True else None
ppocr_v3 = fd.vision.ocr.PPOCRv3(
det_model=det_model, cls_model=cls_model, rec_model=rec_model)
# Set the inference batch size for the cls and rec models; the value can be -1 or any positive integer.
# When the batch size is set to -1, the cls and rec models use a batch size equal to
# the number of boxes detected by the det model.
ppocr_v3.cls_batch_size = cls_bs
ppocr_v3.rec_batch_size = rec_bs
return ppocr_v3
def predict(self, bgr_img):
start = time.perf_counter()
result = self.ppocr_v3.predict(bgr_img)
vis_im = fd.vision.vis_ppocr(bgr_img, result)
print(f"OCR cost {(time.perf_counter() - start)*1000 :.2f} ms")
return result, vis_im
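# Illustrative usage sketch (not part of the original commit), assuming a local
# test image "12.jpg" exists; mirrors the module-level script above:
# if __name__ == "__main__":
#     ocr = OCRAlg(use_cls=False, device_id=0, cls_bs=1, rec_bs=6)
#     img = cv2.imread("12.jpg")
#     result, vis_im = ocr.predict(img)
#     print(result)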

@@ -0,0 +1,59 @@
import fastdeploy as fd
import cv2
import os
def parse_arguments():
import argparse
import ast
parser = argparse.ArgumentParser()
parser.add_argument("--model", default=None, help="Path of yolov8 model.")
parser.add_argument(
"--image", default=None, help="Path of test image file.")
parser.add_argument(
"--device",
type=str,
default='gpu',
help="Type of inference device, support 'cpu' or 'gpu' or 'kunlunxin'.")
parser.add_argument(
"--use_trt",
type=ast.literal_eval,
default=True,
help="Wether to use tensorrt.")
return parser.parse_args()
def build_option(args):
option = fd.RuntimeOption()
if args.device.lower() == "gpu":
option.use_gpu()
if args.device.lower() == "ascend":
option.use_ascend()
if args.use_trt:
option.use_trt_backend()
option.set_trt_input_shape("images", [1, 3, 640, 640])
return option
args = parse_arguments()
# Configure runtime, load model
runtime_option = build_option(args)
model = fd.vision.detection.YOLOv8(args.model, runtime_option=runtime_option)
def yolo_predict(im):
# # Predicting image
# if args.image is None:
# image = fd.utils.get_detection_test_image()
# else:
# image = args.image
# im = cv2.imread(image)
result = model.predict(im)
# Visualization
vis_im = fd.vision.vis_detection(im, result)
# cv2.imwrite("visualized_result.jpg", vis_im)
# print("Visualized result save in ./visualized_result.jpg")
return result, vis_im
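# Illustrative usage sketch (not part of the original commit), reusing the
# commented-out image-loading logic above; assumes args.image points to a file:
# if __name__ == "__main__":
#     im = cv2.imread(args.image)
#     result, vis_im = yolo_predict(im)
#     print(result)
#     cv2.imwrite("visualized_result.jpg", vis_im)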

@@ -0,0 +1,41 @@
import fastdeploy as fd
from pathlib import Path
import cv2
import os
class YOLOAlg:
def __init__(self, model_path) -> None:
super(YOLOAlg, self).__init__()
self.model_path = model_path
self.model = self.init_model()
def build_option(self):
option = fd.RuntimeOption()
option.use_gpu()
option.use_trt_backend()
option.set_trt_input_shape("images", [1, 3, 640, 640])
trt_path = Path(self.model_path).with_suffix(".trt")
option.set_trt_cache_file(trt_path.as_posix())
return option
def init_model(self):
# Configure runtime, load model
runtime_option = self.build_option()
model = fd.vision.detection.YOLOv8(self.model_path, runtime_option=runtime_option)
return model
def predict_yolo(self, bgr_img):
result = self.model.predict(bgr_img)
rendered_img = bgr_img.copy()
# Visualization
vis_im = fd.vision.vis_detection(rendered_img, result)
# cv2.imwrite("visualized_result.jpg", vis_im)
# print("Visualized result save in ./visualized_result.jpg")
return result, vis_im
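# Illustrative usage sketch (not part of the original commit); the model path
# below is a placeholder and should point to an exported YOLOv8 ONNX model:
# if __name__ == "__main__":
#     alg = YOLOAlg("yolov8n.onnx")
#     img = cv2.imread("test.jpg")
#     result, vis_im = alg.predict_yolo(img)
#     cv2.imwrite("visualized_result.jpg", vis_im)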

@@ -0,0 +1,85 @@
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
"""
@Project yolo_demos
@File config.py
@IDE PyCharm
@Author zjj
@Date 2023/9/22 10:19
"""
# YOLO semantic segmentation model
from pathlib import Path
PERSON = "person"
CAR = "car"
LABEL_NAMES = {0: PERSON, 2: CAR}
# Path for saving intermediate images
# SAVE_BASE_DIR = Path('/home/TP/PMP/media/tp_result')
# SAVE_BASE_DIR = Path(os.getenv("OTHER_SAVING_PATH", "../media/tp_result"))
# BASE_MODEL_PATH = Path(__file__).parent.parent.parent / "models"
# Temporary path
BASE_MODEL_PATH = Path('/mnt/large/zhoujinjuan_data/data/Filter_Object_models/models')
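# Illustrative sketch (not part of the original commit): following the
# commented-out os.getenv usage above, the model root could be made configurable
# instead of hard-coding the temporary path; the environment variable name below
# is hypothetical:
# import os
# BASE_MODEL_PATH = Path(os.getenv(
#     "BASE_MODEL_PATH",
#     "/mnt/large/zhoujinjuan_data/data/Filter_Object_models/models"))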
# yolo detect model path
DETECT_MODEL = (BASE_MODEL_PATH / "yolov8n.pt").as_posix()
SEGMENT_MODEL = (BASE_MODEL_PATH / "yolov8n-seg.pt").as_posix()
# SCRFD model
SCRFD_PATH_DICT = {
"onnx": (BASE_MODEL_PATH / "scrfd_500m_bnkps_shape640x640.onnx").as_posix(),
"trt": (BASE_MODEL_PATH / "scrfd_500m_bnkps_shape640x640.trt").as_posix(),
}
# InsightFace model
INSIGHTFACE_PATH_DICT = {
"onnx": (BASE_MODEL_PATH / "partial_fc_glint360k_r100.onnx").as_posix(),
"trt": (BASE_MODEL_PATH / "partial_fc_glint360k_r100.trt").as_posix(),
}
# Traffic police model
POLICE_PATH = (BASE_MODEL_PATH / "TP_police.onnx").as_posix()
# Tracking config file
TRACK_YAML = (BASE_MODEL_PATH / "my_botsort.yaml").as_posix()
# articulation_model_path articulation_range_path
ARTICULATION_MODEL_PATH = (BASE_MODEL_PATH / "brisque_model_live.yml").as_posix()
ARTICULATION_RANGE_PATH = (BASE_MODEL_PATH / "brisque_range_live.yml").as_posix()
# TODO: frame-count thresholds should take the frame rate and frame skipping into account
# Threshold on the number of large-box frames per target
# PERSON: 10, CAR: 200, LICENSE_COUNT_THRESHOLD = 3; since detection runs once every 3 frames, the values are divided by 3
# *********** not used ************
BIG_FRAME_NUM = {PERSON: 3, CAR: 30}
# Frame-count threshold for license plate numbers
LICENSE_COUNT_THRESHOLD = 3
# Face-saving similarity threshold; if the similarity is above it, the face is not saved
FACE_SAVE_THRESHOLD = 0.5
# Face similarity threshold between folders; above it, faces are considered similar
FACE_COMPARE_THRESHOLD = 0.3
# If the ratio of similar files to the total files of the original folder exceeds 0.5, the two folders are considered similar
DIR_RATIO_THRESHOLD = 0.5
# Top-N returned per class (person / car)
TOPN = {PERSON: 5, CAR: 2}
# *********** not used ************
# Face detection threshold (0.85)
FACE_THRESHOLD = 0.85
# Object detection confidence threshold (0.8)
DETECT_CONF_THRESHOLD = 0.8
# Box area threshold; TODO: some subjects are simply far from the camera, or the model draws a small box (15)
BOX_AREA_THRESHOLD = 15
# Process the top-2 largest boxes by area in each frame
TOPN_AREA = 2
# If the IoU between a detection box and a traffic-police box exceeds this threshold, the target is considered a traffic police officer
POLICE_IOU = 0.5
# Face sharpness threshold, in the range 0-100
ARTICULATION_THD = 30

@@ -0,0 +1,213 @@
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
"""
@Project deep-head-pose-master
@File fd_face_detection.py
@IDE PyCharm
@Author zjj
@Date 2023/9/12 13:38
"""
import fastdeploy as fd
import cv2
from loguru import logger
from config import SCRFD_PATH_DICT, INSIGHTFACE_PATH_DICT, FACE_THRESHOLD
from utils import cosine_similarity
class FaceRecognition:
"""
人脸识别类
"""
def __init__(
self,
id_model=INSIGHTFACE_PATH_DICT["onnx"],
det_model=SCRFD_PATH_DICT["onnx"],
device="gpu",
use_trt=False,
):
"""
初始化相机人脸检测器
:param id_model: 人脸识别模型
:param det_model: 人脸检测模型
:param device: 使用设备"cpu"/"gpu"
:param use_trt: 是否使用tenserRT加速
"""
self.id_model = id_model
self.det_model = det_model
self.device = device
self.use_trt = use_trt
# Load both models through the same helper
self.det_model = self.load_model(fd.vision.facedet.SCRFD, det_model, "SCRFD")
self.id_model = self.load_model(
fd.vision.faceid.PartialFC, id_model, "PartialFC"
)
def load_model(self, model_class, model_path, model_type):
"""
加载模型
:param model_class: 实例化模型类
:param model_path: 模型路径
:param model_type: 模型类型
:return:
"""
runtime_option = self.set_option(model_type)
model = model_class(model_path, runtime_option=runtime_option)
return model
@staticmethod
def extract_face(frame, box, padding=10):
"""
从图像帧中提取人脸图像
:param frame: 输入的图像帧
:param box: 人脸检测框
:padding: 放大box大小
:return: 提取的人脸图像
"""
# Extract the boundary coordinates of the face detection box
# x_min, y_min, x_max, y_max = map(int, box)
# Extract the box boundary coordinates, adding extra margin around the box
x_min, y_min, x_max, y_max = map(
int,
[box[0] - padding, box[1] - padding, box[2] + padding, box[3] + padding],
)
# Make sure the extracted region stays within the original image bounds
x_min = max(0, x_min)
x_max = min(frame.shape[1], x_max)
y_min = max(0, y_min)
y_max = min(frame.shape[0], y_max)
# Crop the face image from the original frame
face_image = frame[y_min:y_max, x_min:x_max]
face_image = cv2.resize(
face_image, (112, 112), interpolation=cv2.INTER_AREA
)  # Resize the face to the default input size of the face recognition model
return face_image
def set_option(self, model_type):
"""
构建运行时选项
:return: option
"""
option = fd.RuntimeOption()
if self.device.lower() == "gpu":
option.use_gpu()
if self.use_trt:
option.use_trt_backend()
if model_type == "PartialFC":
option.set_trt_input_shape("data", [1, 3, 112, 112]) # fd默认参数
option.set_trt_cache_file(INSIGHTFACE_PATH_DICT["trt"])
if model_type == "SCRFD":
option.set_trt_input_shape("images", [1, 3, 640, 640])
option.set_trt_cache_file(SCRFD_PATH_DICT["trt"])
return option
def get_img_embedding(self, img):
"""
用模型为img编码
:param img:
:return: list(512)
"""
img_copy = img.copy()
base_face = self.id_model.predict(img_copy)  # reference photo
embedding = base_face.embedding
return embedding
def face_compare(self, base_img, current_img):
"""
人脸识别
:param base_img: 基准照
:param current_img: 要对比的当前考生照
:return: 相似度值
"""
# TODO: id_model modifies the input argument
embed1 = self.get_img_embedding(base_img)
embed2 = self.get_img_embedding(current_img)
cos = cosine_similarity(embed1, embed2)
return cos
def get_crop_face(self, frame):
# Detect faces; TODO: conf_threshold defaults to 0.7
result = self.det_model.predict(frame, conf_threshold=0.7)
if result and result.boxes and result.scores[0] > FACE_THRESHOLD:
# Cropped reference photo; no matter how many faces are found, only the first is used
base_face = self.extract_face(frame, result.boxes[0])
return True, base_face
else:
logger.warning("无法检测到人脸")
return False, None
# def get_target_face(self, frame, p1=None, p2=None):
# result = self.det_model.predict(frame)
# if not result:
# return False, None
# imh, imw, _ = frame.shape
# min_dis = float('inf')
# target_box = []
# for box, score in zip(result.boxes, result.scores):
# if score > FACE_THRESHOLD:
# xmin, ymin, xmax, ymax = box
# if p1 and p2:
# if xmin >= p1[0] and ymin >= p1[1] and xmax <= p2[0] and ymax <= p2[1]:
# box_x_mid = xmin + (xmax - xmin) / 2
# # When there are multiple faces, pick the one closest to the vertical center line
# dis = abs(box_x_mid - imw / 2)
# if dis < min_dis:
# target_box = box
# if target_box:
# base_face = self.extract_face(frame, target_box)
# return True, base_face
# return False, None
def get_target_face(self, frame, mask=None):
"""
:param frame:
:param mask: 目标是1其余是0
:return:
"""
result = self.det_model.predict(frame)
if not result:
return False, None
imh, imw, _ = frame.shape
target_box = []
for box, score in zip(result.boxes, result.scores):
if score > FACE_THRESHOLD:
xmin, ymin, xmax, ymax = box
box_x_mid = int(xmin + (xmax - xmin) / 2)
box_y_mid = int(ymin + (ymax - ymin) / 2)
# The face box center lies within the target region
if mask[box_y_mid, box_x_mid] > 0:
target_box = box
if target_box:
base_face = self.extract_face(frame, target_box)
return True, base_face
return False, None
if __name__ == "__main__":
# faceid = FaceRecognition(device="cpu", use_trt=False)
faceid = FaceRecognition(
id_model=r"E:\resources\DT3 models\partial_fc_glint360k_r100.onnx",
det_model=r"E:\resources\DT3 models\scrfd_500m_bnkps_shape640x640.onnx",
device="cpu",
use_trt=False,
)
# img = cv2.imread('../data/track/13.png')
# embed = faceid.get_img_embedding(img)
# has_face, face = faceid.get_crop_face(img)
# if has_face:
# cv2.imwrite('../data/track/4_face.png', face)
# else:
# print('no face')
# img1 = cv2.imread(r'E:\\tmp\198_person\198_0.png')
# img2 = cv2.imread(r'E:\\tmp\198_person\198_1.png')
# cos = faceid.face_compare(img2, img1)
# print(cos)

@@ -0,0 +1,211 @@
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
"""
@Project yolo_demos
@File main.py
@IDE PyCharm
@Author zjj
@Date 2023/9/26 9:29
"""
import uuid
import cv2
import fastdeploy as fd
import numpy as np
from loguru import logger
from ultralytics import YOLO
from config import (
PERSON,
CAR,
SEGMENT_MODEL,
TOPN_AREA,
LABEL_NAMES,
BOX_AREA_THRESHOLD,
DETECT_CONF_THRESHOLD,
POLICE_PATH, ARTICULATION_THD,
)
from fd_face_detection import FaceRecognition
from utils import (
extract_yolo_results,
get_police,
predict_ocr,
get_target_mask,
draw_rectangle_text,
get_coordinate,
det_articulation,
)
class TrackMain(object):
def __init__(self, detect_model_path=SEGMENT_MODEL, device="gpu", use_trt=False):
# Used when FastDeploy runs on the GPU
self.option = fd.RuntimeOption().use_gpu()
# Object detection model
self.model = YOLO(detect_model_path)
# Face detection and recognition object
self.face_det = FaceRecognition(device=device, use_trt=use_trt)
# Traffic police model
# self.model_traffic_police = YOLO(POLICE_PATH)
self.model_traffic_police = fd.vision.detection.YOLOv8(
POLICE_PATH, runtime_option=self.option  # Load the traffic-police model with FastDeploy
)
def process_one_frame(self, frame):
results = self.model(frame)
# Extract detection results: infos = [xywhs, cls, scores, xy_list]
is_hit, infos = extract_yolo_results(results)
if not is_hit:
return [], []
# Before: target frequencies were counted before filtering, and the top-2 largest boxes were picked when saving screenshots.
# Now: frequencies are not counted; after filtering large boxes, top-N filtering can still be applied -- it is enabled by default, and N can be set very large if no filtering is wanted.
# Step 1: filter by area and confidence, then record the remaining targets' info: top-left and bottom-right coordinates, label, conf, box_area, contour coordinates.
tinfos_each_frame = self.statistics_one_frame(infos)
# For each frame and class, keep the top-N largest targets and save them by folder: for the person class keep faces, saving only one of a group of similar faces; for the car class save the detection-box crop, every hit counts.
# annotated_frame = results[0].plot()
face_embeddings, licenses = self.process_topn_in_one_frame(frame, tinfos_each_frame)
return face_embeddings, licenses
def statistics_one_frame(self, infos):
"""
每一帧内统计每个目标的信息
:param infos:
:return:
"""
xywhs, cls, scores, xy_list = infos
tinfos_each_frame = {PERSON: [], CAR: []}
for xywh, label_id, score, polygon in zip(
xywhs, cls, scores, xy_list
):
if label_id not in list(LABEL_NAMES.keys()):
continue
# Information for each target
info = {'p_left_up': None, 'p_right_bottom': None, 'label': None, 'conf': None, 'box_area': None,
'polygon_indexs': None}
x, y, w, h = xywh
# Get the top-left and bottom-right coordinates from xywh
p1, p2 = get_coordinate(xywh)
# Compute the box area
s = int((w * h) / 10000)
label = LABEL_NAMES[label_id]
conf = round(score, 2)
# Filter out small boxes by area
if s <= BOX_AREA_THRESHOLD:
continue
if conf <= DETECT_CONF_THRESHOLD:
continue
# Record this target's information under its class
info['p_left_up'] = p1
info['p_right_bottom'] = p2
info['label'] = label
info['conf'] = conf
info['box_area'] = s
info['polygon_indexs'] = polygon.astype(int)
tinfos_each_frame[label].append(info)
return tinfos_each_frame
def process_topn_in_one_frame(self, frame, tinfos_each_frame):
"""
处理每一帧中的topn大框对象
:param frame:
:param tinfos_each_frame: 每个类别下各个目标的相关信息
:return:
"""
frame_copy = frame.copy()
police_indexs = []
all_licenses = []
all_face_embeddings = []
for label, target_infos in tinfos_each_frame.items():
# Sort by box area in descending order
target_infos.sort(key=lambda x: x['box_area'], reverse=True)
# TODO: traffic police -- first send the person targets among the top-N largest boxes of the whole frame to the traffic-police model to check whether they are police officers
if label == PERSON:
police_indexs = get_police(
frame_copy, target_infos[:TOPN_AREA], self.model_traffic_police
)
# For each frame and class, handle the top-2 largest targets
for index, info in enumerate(target_infos[:TOPN_AREA]):
is_hit = False
# label = info['label']
p1 = info['p_left_up']
p2 = info['p_right_bottom']
polygon = info['polygon_indexs']
# Use the detection box directly
target_img = frame[p1[1]: p2[1], p1[0]: p2[0]]
target_img = target_img.astype(np.uint8)
if label == CAR:
# TODO: run OCR on the license plate, count plate occurrences, and save images that contain a plate
# licenses = predict_ocr(target_img, self.ocr)
licenses = predict_ocr(target_img)
licenses = list(set(licenses))
all_licenses.extend(licenses)
if licenses:
is_hit = True
elif label == PERSON:
# Skip traffic police officers
if index in police_indexs:
continue
# Extract the target's segmentation image on a white background and save it; save all of them (do not keep only one of a similar group) so that images with plates are not filtered out
target_mask = get_target_mask(frame, polygon)
# For the person class, detect and extract faces; only one of a group of similar faces is saved, i.e. some filtering already happens at save time
# TODO: detecting faces on the whole frame may return several faces, while detecting only on the white-background target_img often finds none; therefore combine the detection box with the face detector to locate the target's face
has_face, face = self.face_det.get_target_face(frame, target_mask)
if has_face:
# Filter by face sharpness
score = det_articulation(face)
# TODO: what threshold value is appropriate?
logger.debug(f'Face sharpness: {score}')
if score < ARTICULATION_THD:
continue
face_embedding = self.face_det.get_img_embedding(face)
all_face_embeddings.append(face_embedding)
is_hit = True
# todo test
if is_hit:
frame_copy = draw_rectangle_text(
frame_copy, index, p1, p2, label, info['conf'], -1, info['box_area']
)
# TODO: test -- save visualization images during testing
# if all_face_embeddings and not all_licenses:
# name = str(uuid.uuid4())[:7]
# cv2.imwrite('/mnt/large/zhoujinjuan_data/data/result-3/face_' + name + '.png', frame_copy)
# elif not all_face_embeddings and all_licenses:
# name = str(uuid.uuid4())[:7]
# cv2.imwrite('/mnt/large/zhoujinjuan_data/data/result-3/car_' + name + '.png', frame_copy)
# elif all_face_embeddings and all_licenses:
# name = str(uuid.uuid4())[:7]
# cv2.imwrite('/mnt/large/zhoujinjuan_data/data/result-3/all_' + name + '.png', frame_copy)
return all_face_embeddings, all_licenses
if __name__ == '__main__':
import os
obj = TrackMain()
video_path = os.path.join('/mnt/large/zhoujinjuan_data/data/4.mp4')
cap = cv2.VideoCapture(video_path)
all_face_embeds = []
all_licenses_list = []
frame_count = 0
while cap.isOpened():
ret, frame = cap.read()
if not ret:
break
frame_count += 1
face_embeds, license_list = obj.process_one_frame(frame)
all_face_embeds.extend(face_embeds)
all_licenses_list.extend(license_list)
print(f'frame count {frame_count}\n face embeddings {len(all_face_embeds)}\nlicenses {len(all_licenses_list)}')

@@ -0,0 +1,696 @@
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
"""
@Project yolo_demos
@File utils.py
@IDE PyCharm
@Author zjj
@Date 2023/9/21 10:15
"""
import os.path
import re
import shutil
from algs import ocr_alg
from pathlib import Path
import cv2
import numpy as np
# from track.main import TrackMain
from config import (
BIG_FRAME_NUM,
LABEL_NAMES,
FACE_SAVE_THRESHOLD,
DIR_RATIO_THRESHOLD,
PERSON,
CAR,
FACE_COMPARE_THRESHOLD,
ARTICULATION_MODEL_PATH,
ARTICULATION_RANGE_PATH,
POLICE_IOU,
)
def save_img(track_id, label, frame, save_dir, origin=None):
dir_name = str(track_id) + "_" + label
dirx = save_dir / dir_name
if not dirx.exists():
dirx.mkdir()
fnames = os.listdir(dirx.as_posix())
lens = len(fnames)
name = str(lens)
# name = str(uuid.uuid4())[:6]
out_path = dirx / f"{track_id}_{name}.png"
cv2.imwrite(out_path.as_posix(), frame)
if origin is not None:
out_path = dirx / f"{track_id}_{name}_origin.png"
cv2.imwrite(out_path.as_posix(), origin)
def save_car(license_plate, img, save_dir):
"""
按车牌号分目录保存车图片
:param license_plate:
:param img:
:param save_dir:
:return:
"""
dir_name = license_plate + "_" + CAR
dirx = save_dir / dir_name
if not dirx.exists():
dirx.mkdir()
fnames = os.listdir(dirx.as_posix())
lens = len(fnames)
name = str(lens)
out_path = dirx / f"{name}.png"
cv2.imwrite(out_path.as_posix(), img)
def filter_by_big_box(big_box_count):
"""
对每个类别下每个目标的大框图数量和阈值比较保留满足阈值的tid
:param big_box_count:
:return:
"""
result = dict()
for label, tid_count in big_box_count.items():
result[label] = []
for tid, count in tid_count.items():
if count > BIG_FRAME_NUM[label]:
result[label].append(tid)
return result
def get_face_tids(save_dir):
"""
过滤没有保存人脸的target id
:return:
"""
face_tids = []
for pathx in save_dir.iterdir():
stem = pathx.stem
tid, label = stem.split("_")
if label == PERSON:
face_tids.append(int(tid))
return face_tids
# print(get_face_tids())
# def get_():
# import cv2
# from ultralytics import YOLO
# import numpy as np
# import torch
#
# img = cv2.imread('ultralytics/assets/bus.jpg')
# model = YOLO('yolov8m-seg.pt')
# results = model.predict(source=img.copy(), save=True, save_txt=False, stream=True)
# for result in results:
# # get array results
# masks = result.masks.masks
# boxes = result.boxes.boxes
# # extract classes
# clss = boxes[:, 5]
# # get indices of results where class is 0 (people in COCO)
# people_indices = torch.where(clss == 0)
# # use these indices to extract the relevant masks
# people_masks = masks[people_indices]
# # scale for visualizing results
# people_mask = torch.any(people_masks, dim=0).int() * 255
# # save to file
# cv2.imwrite(str(model.predictor.save_dir / 'merged_segs.jpg'), people_mask.cpu())
def get_target(frame, polygon, p1, p2):
# Convert to grayscale
gray_img = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
mask = np.zeros_like(gray_img)
# Inside the contour: white (255); outside: black (0)
cv2.polylines(mask, [polygon], True, (255, 0, 0), thickness=1)
cv2.fillPoly(mask, pts=[polygon], color=(255, 255, 255, 128), lineType=cv2.LINE_AA)
# cv2.imwrite('../data/track/polygon.png', mask)
# mask: person = 255, background = 0
# Convert to person = 1, background = 0
_, binary_image = cv2.threshold(mask, 128, 1, cv2.THRESH_BINARY)
# Black background, person kept
frame[:, :, 0] = frame[:, :, 0] * binary_image
frame[:, :, 1] = frame[:, :, 1] * binary_image
frame[:, :, 2] = frame[:, :, 2] * binary_image
# cv2.imwrite('../data/track/person.png', frame)
# Convert to person = 0, background = 255
_, white_bg = cv2.threshold(mask, 128, 255, cv2.THRESH_BINARY_INV)
frame[:, :, 0] = frame[:, :, 0] + white_bg
frame[:, :, 1] = frame[:, :, 1] + white_bg
frame[:, :, 2] = frame[:, :, 2] + white_bg
# cv2.imwrite('../data/track/new.png', frame)
# xyxy = results[0].boxes.xyxy[0].cpu().numpy().astype(int)
target = frame[p1[1] : p2[1], p1[0] : p2[0]]
# cv2.imwrite('../data/track/12.png', person)
return target
def get_target_mask(frame, polygon):
# Convert to grayscale
frame_copy = frame.copy()
gray_img = cv2.cvtColor(frame_copy, cv2.COLOR_BGR2GRAY)
mask = np.zeros_like(gray_img)
# Inside the contour: white (255); outside: black (0)
cv2.polylines(mask, [polygon], True, (255, 0, 0), thickness=1)
cv2.fillPoly(mask, pts=[polygon], color=(255, 255, 255, 128), lineType=cv2.LINE_AA)
# cv2.imwrite('../data/track/polygon.png', mask)
# mask: person = 255, background = 0
# Convert to person = 1, background = 0
_, binary_image = cv2.threshold(mask, 128, 1, cv2.THRESH_BINARY)
# Black background, person kept
# frame_copy[:, :, 0] = frame_copy[:, :, 0] * binary_image
# frame_copy[:, :, 1] = frame_copy[:, :, 1] * binary_image
# frame_copy[:, :, 2] = frame_copy[:, :, 2] * binary_image
# cv2.imwrite('../data/track/person.png', frame_copy)
return binary_image
def get_coordinate(xywh):
# x, y are the box center coordinates; w, h are the box width and height
x, y, w, h = xywh
xmin, ymin = int(x - w / 2), int(y - h / 2)
xmax, ymax = int(x + w / 2), int(y + h / 2)
p1 = (xmin, ymin)
p2 = (xmax, ymax)
return p1, p2
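# Worked example (not part of the original commit): for xywh = [50, 50, 20, 10]
# the box center is (50, 50) with width 20 and height 10, so
# get_coordinate([50, 50, 20, 10]) returns p1 = (40, 45), p2 = (60, 55).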
def show_mid_loc_statistics(label_tids, left_tids, mid_loc_count):
"""
统计每个类别下有哪些目标id以及各个目标id位于图片中间位置的次数
:param label_tids: key类别idvalue该类别下目标id列表
:param left_tids: 筛选后留下的目标id列表
:param mid_loc_count: key目标idvalueint该目标位于图像中间位置的次数
:return:
"""
# Get the number of remaining candidates per class and, for each target, the number of frames in which it is centered
left_label_tids = dict()
for key, value in LABEL_NAMES.items():
left_label_tids[key] = []
for tid in left_tids:
for cid, tid_list in label_tids.items():
if tid in tid_list:
left_label_tids[cid].append(tid)
for key, tid_list in left_label_tids.items():
# Number of times located in the middle
mid_statistics = [
(tid, count) for tid, count in mid_loc_count.items() if tid in tid_list
]
mid_statistics.sort(key=lambda x: x[1], reverse=True)
print(f"{LABEL_NAMES[key]}: \n\t{tid_list}\n\t{mid_statistics}")
return left_label_tids
# def docarray_sim(left_label_tids, matcher):
# """
# For each class in left_label_tids, compute the similarity between folders of different targets of the same class,
# :param left_label_tids:
# :return:
# """
# pairs = dict()
# for cid, tid_list in left_label_tids.items():
# checked = []
# for i in range(len(tid_list)):
# if tid_list[i] == 265:
# print()
# pairs[tid_list[i]] = []
# if i in checked:
# continue
# dir_i = str(tid_list[i]) + '_' + LABEL_NAMES[cid]
# for j in range(i + 1, len(tid_list)):
# if tid_list[j] == 3483:
# print('')
# if j in checked:
# continue
# dir_j = str(tid_list[j]) + '_' + LABEL_NAMES[cid]
# # Folder of target i versus folder of target j: if 40% of the images in folder i have a similar image in folder j, the two folders are considered similar
# ratio = matcher.get_sim_ratio(left_path=SAVE_DIR / dir_i, right_path=SAVE_DIR / dir_j)
# if ratio > 0.4:
# checked.append(j)
# pairs[tid_list[i]].append(tid_list[j])
# return pairs
# data_dir = '../data/track/target_images1'
# matcher = ImageMatcher()
# x = {0: [265, 2955, 1676, 2197, 3483, 160, 551, 1589, 2870, 2363, 1478, 2759, 2504, 586, 3532, 3157, 1879, 2015, 3296, 2149, 1766, 745, 1773, 878, 1902, 3056, 1014, 252, 2814]}
#
# t1 = time.time()
# print(docarray_sim(x, matcher))
# t2 = time.time()
# print(round(t2 - t1, 2))
def cosine_similarity(a, b):
"""
计算余弦相似度
:param a:
:param b:
:return:
"""
a = np.array(a)
b = np.array(b)
# L2 norms
mul_a = np.linalg.norm(a, ord=2)
mul_b = np.linalg.norm(b, ord=2)
mul_ab = np.dot(a, b)
return mul_ab / (mul_a * mul_b)
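# Worked examples (not part of the original commit):
# cosine_similarity([1, 0], [0, 1]) == 0.0   (orthogonal vectors)
# cosine_similarity([1, 2], [2, 4]) == 1.0   (parallel vectors)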
def filter_and_save_face_old(
face, track_id, last_face, last_track_id, face_det, save_dir, annotated_frame=None
):
"""
当前人脸目标id和上一个不同则直接保存当前人脸
和上一个相同则计算两次人脸相似相似的则不保存当前
:param face:
:param track_id:
:param last_face:
:param last_track_id:
:param face_det: 人脸比对模型
:param save_dir:
:param annotated_frame: 目标检测可视化结果可保存用于调试
:return:
"""
# Different track id: save directly
if track_id != last_track_id:
# save_img(track_id, PERSON, face, save_dir, origin=annotated_frame)
save_img(track_id, PERSON, face, save_dir)
return face, track_id
# A copy is passed in for embedding
cos = face_det.face_compare(face, last_face)
if cos >= FACE_SAVE_THRESHOLD:
return last_face, last_track_id
else:
# debug
# save_img(track_id, PERSON, face, save_dir, origin=annotated_frame)
save_img(track_id, PERSON, face, save_dir)
return face, track_id
def filter_and_save_face(face, track_id, face_det, save_dir, annotated_frame=None):
"""
当前人脸目标id和上一个不同则直接保存当前人脸
和上一个相同则计算两次人脸相似相似的则不保存当前
:param face:
:param track_id:
:param face_det: 人脸比对模型
:param save_dir:
:param annotated_frame: 目标检测可视化结果可保存用于调试
:return:
"""
dir_name = str(track_id) + "_" + PERSON
dirx = save_dir / dir_name
if not dirx.exists():
dirx.mkdir()
save_img(track_id, PERSON, face, save_dir)
return
# Within the target's folder, find the most recently saved file and compare it with the current one
file_stems = [(int(filex.stem.split("_")[1]), filex) for filex in dirx.iterdir()]
file_stems.sort(key=lambda x: x[0])
last_face = cv2.imread(file_stems[-1][-1].as_posix())
# A copy is passed in for embedding
cos = face_det.face_compare(face, last_face)
if cos >= FACE_SAVE_THRESHOLD:
return
else:
# debug
# save_img(track_id, PERSON, face, save_dir, origin=annotated_frame)
save_img(track_id, PERSON, face, save_dir)
return
def get_color(idx):
idx = idx * 3
color = ((37 * idx) % 255, (17 * idx) % 255, (29 * idx) % 255)
return color
def draw_rectangle_text(annotated_frame, track_id, p1, p2, label, conf, num, s):
img_h, img_w, _ = annotated_frame.shape
annotated_frame = cv2.rectangle(annotated_frame, p1, p2, get_color(track_id), 5)
text = f"id:{track_id} {label} {conf} n:{num} s:{s}"
# Bottom-left coordinates of the text
org = p1
font = cv2.FONT_HERSHEY_SIMPLEX
font_scale = 1
thickness = 3
# (width, height)
text_size, _ = cv2.getTextSize(text, font, font_scale, thickness)
# Keep the text inside the image
if text_size[0] + org[0] > img_w:
org = (img_w - text_size[0], org[1])
if text_size[1] > org[1]:
org = (org[0], text_size[1])
annotated_frame = cv2.putText(
annotated_frame,
text,
org,
font,
font_scale,
color=get_color(track_id),
thickness=thickness,
lineType=cv2.LINE_AA,
)
return annotated_frame
def filter_sim_tid(face_det, tid_dirs, save_dir):
"""
对多个不同track_id文件夹计算两两相似合并同目标减少目标候选
:param face_det:
:param tid_dirs:
:return:
"""
checked = []
dir_lens = len(tid_dirs)
# Clusters of similar targets
target_clusters = dict()
for i in range(dir_lens):
if i in checked:
continue
max_info = [0, 0]
target_clusters[tid_dirs[i]] = {"clusters": [], "max_info": tuple()}
dir_i = save_dir / (str(tid_dirs[i]) + "_" + PERSON)
for j in range(i + 1, dir_lens):
if j in checked:
continue
dir_j = save_dir / (str(tid_dirs[j]) + "_" + PERSON)
ratio_i, ratio_j, max_value_index = compare_dir_i_j(dir_i, dir_j, face_det)
# print(f'{dir_i.name} {dir_j.name} {ratio_i} {ratio_j}')
if ratio_i > DIR_RATIO_THRESHOLD and ratio_j > DIR_RATIO_THRESHOLD:
checked.append(j)
# Record the similar target
target_clusters[tid_dirs[i]]["clusters"].append(tid_dirs[j])
if max_value_index[0] > max_info[0]:
max_info = max_value_index
# Record, for folder i, the index of the file with the highest similarity and the highest similarity value itself
target_clusters[tid_dirs[i]]["max_info"] = max_info
return target_clusters
def compare_dir_i_j(dir_i, dir_j, face_det):
"""
比较两个文件夹中的图片i文件夹中每个图片和j文件夹中每个图片计算相似找到满足相似条件的则计数+1然后继续比较i文件夹下一个图片
:param dir_i:
:param dir_j:
:param face_det:
:return:
ratio_i: 文件夹i中在文件夹j中有满足相似条件的文件数 文件夹i中文件总数的占比
ratio_j: 文件夹j中在文件夹i中有满足相似条件的文件数 文件夹j中文件总数的占比
max_value: 两个文件夹的embedding矩阵计算cos相似最大的相似度值
max_2d_index最大值在相似矩阵中的二维坐标只取行坐标即文件夹i中的文件索引
"""
if not dir_i.exists() or not dir_j.exists():
return 0, 0, tuple()
# Suppose folder i has m files and folder j has n files
matrix_i, stems_i = get_dir_embed_matrix(dir_i, face_det)
matrix_j, stems_j = get_dir_embed_matrix(dir_j, face_det)
# m*n
result = matrix_i.dot(matrix_j.T)
sim_result = result >= FACE_COMPARE_THRESHOLD
sim_result = sim_result.astype(np.float32)
# For each file in i, how many files in j satisfy the similarity condition; shape (m, 1)
sim_num = np.sum(sim_result, axis=-1, keepdims=True)
# Count +1 as soon as at least one match exists; TODO: what if every file in folder a is similar to just one single file in folder b?
count = (sim_num > 0).astype(np.float32)
count = np.sum(count, axis=0)
ratio_i = round(count[0] / sim_num.shape[0], 2)
# Shape (1, n): how many files in folder j match folder i, guarding against the case where folder j has many files but only one of them matches all files in folder i
sim_num = np.sum(sim_result, axis=0, keepdims=True)
count = (sim_num > 0).astype(np.float32)
count = np.sum(count, axis=-1)
ratio_j = round(count[0] / sim_num.shape[1], 2)
# Index of the maximum value
max_2d_index = np.unravel_index(np.argmax(result), result.shape)
max_value = round(result[max_2d_index], 2)
return ratio_i, ratio_j, (max_value, max_2d_index[0])
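# Illustrative example (not part of the original commit): if dir_i holds 2 face
# images and dir_j holds 3, `result` is a 2x3 cosine-similarity matrix. With
# FACE_COMPARE_THRESHOLD = 0.3 and
#     result = [[0.6, 0.1, 0.2],
#               [0.1, 0.1, 0.1]]
# only row 0 has a match, so ratio_i = 0.5 (1 of 2 rows); only column 0 has a
# match, so ratio_j = 0.33 (1 of 3 columns); the maximum 0.6 sits at (0, 0), so
# the returned tuple is (0.6, 0).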
def get_dir_embed_matrix(dirx, face_det):
"""
将文件夹下面所有文件embedding放一个矩阵中
:param dirx:
:param face_det:
:return:
"""
# List of embedding results, one per file
results = []
# Index part of each file name, in embedding order; e.g. '3' for 140_3.png
stems = []
for pathx in dirx.iterdir():
img = cv2.imread(pathx.as_posix())
embed = face_det.get_img_embedding(img)
results.append(embed)
stems.append(pathx.stem.split("_")[1])
result_matrix = np.stack(results)
# TODO: normalize, otherwise the matrix product may exceed 1
result_matrix = result_matrix / np.linalg.norm(
result_matrix, axis=-1, keepdims=True
)
return result_matrix, stems
def extract_yolo_results(results):
# Get the boxes and track IDs
boxes = results[0].boxes
masks = results[0].masks
# When is_track is False the target id is None, i.e. there is no tracking
# if not masks or not boxes.is_track:
if not masks:
return False, None
# Contour coordinates
xy_list = masks.xy
# x, y are the box center coordinates; w, h are the box width and height
xywhs = boxes.xywh.cpu().tolist()
# Class id list
cls = boxes.cls.cpu().tolist()
# Confidence list
scores = boxes.conf.cpu().tolist()
return True, [xywhs, cls, scores, xy_list]
def predict_ocr(frame):
# OCR detection
license_plate_list = []
ocr_result = ocr_alg.ocr_predict(frame)
# Assign an empty value
for txt in ocr_result[0].text:
# License plate
plate_num = parse_plate_number(txt)
if plate_num is not None:
license_plate_list.append(plate_num)
return license_plate_list
# def predict_ocr(frame, ocr):
# # OCR detection
# license_plate_list = []
# ocr_result = ocr.ocr(frame, cls=False)
# print(ocr_result)
# # Assign an empty value
# if ocr_result[0]:
# for line in ocr_result[0]:
# txt = line[1][0]
# # License plate
# plate_num = parse_plate_number(txt)
# if plate_num is not None:
# license_plate_list.append(plate_num)
# return license_plate_list
def get_police(frame_copy, target_infos, police_model):
"""
识别交警返回是交警的目标id
:param frame_copy:
:param target_infos:
:param police_model:
:return:
"""
police_model_input = dict()
police_result = []
for index, info in enumerate(target_infos):
# track_id, p1, p2, label, conf, num, s, polygon = info
p1 = info['p_left_up']
p2 = info['p_right_bottom']
police_model_input[index] = [p1[0], p1[1], p2[0], p2[1]]
if police_model_input:
police_result = predict_police(frame_copy, police_model_input, police_model)
return police_result
def output_person_targets(left_person, person_clusters, save_dir, output_person_dir):
index = 1
paths = []
for tid in left_person:
name_index = person_clusters[tid]["max_info"][-1]
source_path = (
save_dir
/ (str(tid) + "_person")
/ (str(tid) + "_" + str(name_index) + ".png")
)
target_path = output_person_dir / (str(index) + ".png")
# target_path = Path(PERSON_FACE_SAVE_PATH) / (str(index) + '.png')
shutil.copyfile(source_path, target_path)
index += 1
paths.append(target_path)
return paths
def output_car_targets(left_cars, save_dir, output_car_dir):
index = 1
paths = []
for licensex in left_cars:
source_path = save_dir / (licensex + "_car") / "0.png"
target_path = output_car_dir / (str(index) + ".png")
# target_path = Path(CAR_FACE_SAVE_PATH / (str(index) + '.png'))
shutil.copyfile(source_path, target_path)
index += 1
paths.append(target_path)
return paths
# Parse license plate numbers with a regular expression
def parse_plate_number(txt):
pattern = "^[京津沪渝冀豫云辽黑湘皖鲁新苏浙赣鄂桂甘晋蒙陕吉闽贵粤青藏川宁琼使领A-Z]{1}[A-Z]{1}[·]{1}[A-Z0-9]{4}[A-Z0-9挂学警港澳]{1}$"
pattern1 = "^[京津沪渝冀豫云辽黑湘皖鲁新苏浙赣鄂桂甘晋蒙陕吉闽贵粤青藏川宁琼使领A-Z]{1}[A-Z]{1}[·]{1}[A-Z0-9]{5}[A-Z0-9挂学警港澳]{1}$"
match = re.match(pattern, txt)
match1 = re.match(pattern1, txt)
if match:
return match.group()
if match1:
return match1.group()
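# Illustrative examples (not part of the original commit):
# parse_plate_number("京A·12345")   -> "京A·12345"   (standard 5-character plate)
# parse_plate_number("京A·D12345")  -> "京A·D12345"  (6-character new-energy plate)
# parse_plate_number("not a plate") -> None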
def det_articulation(image=None, img_path=None):
if image is None and not img_path:
return 0
if image is None and img_path:
image = cv2.imread(img_path)
# Static computation method
result_static = cv2.quality.QualityBRISQUE_compute(
image, ARTICULATION_MODEL_PATH, ARTICULATION_RANGE_PATH
)
score = np.mean([i for i in result_static if (i != 0 and not np.isinf(i))])
score = 0 if np.isnan(score) else score
f_score = round(100 - score, 2)
return f_score
def calculate_iou(box1, box2):
# box1 and box2 are bounding boxes in [x_min, y_min, x_max, y_max] format
x1_min, y1_min, x1_max, y1_max = box1
x2_min, y2_min, x2_max, y2_max = box2
w1 = x1_max - x1_min
h1 = y1_max - y1_min
w2 = x2_max - x2_min
h2 = y2_max - y2_min
# # Compute the four corner coordinates of the boxes
# x1_min, y1_min, x1_max, y1_max = x1, y1, x1 + w1, y1 + h1
# x2_min, y2_min, x2_max, y2_max = x2, y2, x2 + w2, y2 + h2
# Intersection coordinates
intersection_x_min = max(x1_min, x2_min)
intersection_y_min = max(y1_min, y2_min)
intersection_x_max = min(x1_max, x2_max)
intersection_y_max = min(y1_max, y2_max)
# Intersection area
intersection_area = max(0, intersection_x_max - intersection_x_min) * max(
0, intersection_y_max - intersection_y_min
)
# Areas of the two boxes
box1_area = w1 * h1
box2_area = w2 * h2
# Compute the IoU
iou = intersection_area / (box1_area + box2_area - intersection_area)
return iou
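# Worked example (not part of the original commit), boxes in
# [x_min, y_min, x_max, y_max] format:
# calculate_iou([0, 0, 10, 10], [5, 5, 15, 15])
#   intersection = 5 * 5 = 25, union = 100 + 100 - 25 = 175, IoU = 25 / 175 ≈ 0.143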
# Traffic-police prediction using the YOLO model
# def predict_police(frame, target_boxes, police_model):
# """
# :param frame:
# :param target_boxes: {tid1: box1, tid2: box2}
# :param police_model:
# :return: list of target ids that are traffic police
# """
# # copy_frame = frame.copy()
# police_reslist = []
# # Model loading path
# result = police_model(frame)
# for r in result:
# boxes = r.boxes
# for box in boxes:
# b = box.xyxy[0]
# c = int(box.cls)
# confidence = round(float(box.conf), 2)
# if confidence < 0.5:
# continue
# # p1, p2 = get_coordinate(b)
# police_bbox = [b[0], b[1], b[2], b[3]]
# # Iterate over the person bounding boxes
# for key, person_bbox in target_boxes.items():
# iou = calculate_iou(police_bbox, person_bbox)
# if iou >= POLICE_IOU:  # Adjust the IoU threshold as needed
# police_reslist.append(key)
# return police_reslist
# Traffic-police prediction using FastDeploy
def predict_police(frame, target_boxes, police_model):
"""
:param frame:
:param target_boxes: {tid1: box1, tid2: box2}
:param police_model:
:return: 返回是交警的目标id列表
"""
# copy_frame = frame.copy()
police_reslist = []
# Model loading path
result = police_model.predict(frame)
# Prediction results
box_list = result.boxes
score_list = result.scores
label_list = result.label_ids
for i, confidence in enumerate(score_list):
if confidence <= 0.5:  # TODO: this threshold should be added to the config file (tell 孔凡平)
continue
police_bbox = box_list[i]
for key, person_bbox in target_boxes.items():
iou = calculate_iou(police_bbox, person_bbox)
if iou >= POLICE_IOU:  # Adjust the IoU threshold as needed
police_reslist.append(key)
return police_reslist