main
zhoujinjuan 1 year ago
parent 6c1f31e3ca
commit 2f43ce101e

@ -5,4 +5,25 @@
- 如果目标类别是car则用ocr提取车牌号
- 如果目标类别是person先用交警模型判断是否是交警如果不是交警则对人脸图像清晰度大于等于一定阈值的记录该人脸的embedding
返回每一帧处理得到的所有车牌号列表去重和所有人脸embedding列表
返回每一帧处理得到的所有车牌号列表去重和所有人脸embedding列表
## 性能测试
| 视频名 | 显存 | gpu使用率 | 处理帧数 | 耗时 | half | 跳帧 | 检测结果(人脸/车牌数) |
| :----: | :------: | :------: | :------: | :-----: | :---: | :--: | :---------------------: |
| 2.MP4 | 2.0G左右 | 50%+ | 26975 | 166.57s | False | - | 145/0 |
| 2.MP4 | 1.9G左右 | 50%+ | 26975 | 162.65s | True | - | 145/0 |
| 2.MP4 | 2.0G左右 | 40%+ | 2997 | 28.79s | False | 9 | 17/0 |
| 2.MP4 | 1.9G左右 | 40%+ | 2997 | 28.83s | True | 9 | 17/0 |
| 1.MP4 | 2.0G左右 | 50%+ | 26991 | 178.69s | False | - | 578/0 |
| 1.MP4 | 2.0G左右 | 50%+ | 26991 | 176.29s | True | - | 578/0 |
| 1.MP4 | 2.0G左右 | 40%+ | 2999 | 30.12s | False | 9 | 60/0 |
| 1.MP4 | 2.0G左右 | 40%+ | 2999 | 30.04s | True | 9 | 60/0 |
### float16推理
- 如果不跳帧,半精度推理后速度略有提升;
- 如果本身处理帧数少,速度没啥影响;
- 对检测结果也几乎没有影响;
### 跳帧
- 相比不跳帧,耗时明显减少,筛选得到的结果也明显减少;
- gpu使用率降低

@ -1,59 +0,0 @@
import fastdeploy as fd
import cv2
import os
def parse_arguments():
import argparse
import ast
parser = argparse.ArgumentParser()
parser.add_argument("--model", default=None, help="Path of yolov8 model.")
parser.add_argument(
"--image", default=None, help="Path of test image file.")
parser.add_argument(
"--device",
type=str,
default='gpu',
help="Type of inference device, support 'cpu' or 'gpu' or 'kunlunxin'.")
parser.add_argument(
"--use_trt",
type=ast.literal_eval,
default=True,
help="Wether to use tensorrt.")
return parser.parse_args()
def build_option(args):
option = fd.RuntimeOption()
if args.device.lower() == "gpu":
option.use_gpu()
if args.device.lower() == "ascend":
option.use_ascend()
if args.use_trt:
option.use_trt_backend()
option.set_trt_input_shape("images", [1, 3, 640, 640])
return option
args = parse_arguments()
# Configure runtime, load model
runtime_option = build_option(args)
model = fd.vision.detection.YOLOv8(args.model, runtime_option=runtime_option)
def yolo_predict(im):
# # Predicting image
# if args.image is None:
# image = fd.utils.get_detection_test_image()
# else:
# image = args.image
# im = cv2.imread(image)
result = model.predict(im)
# Visualization
vis_im = fd.vision.vis_detection(im, result)
# cv2.imwrite("visualized_result.jpg", vis_im)
# print("Visualized result save in ./visualized_result.jpg")
return result,vis_im

@ -1,41 +0,0 @@
import fastdeploy as fd
from pathlib import Path
import cv2
import os
class YOLOAlg:
def __init__(self, model_path) -> None:
super(YOLOAlg, self).__init__()
self.model_path = model_path
self.model = self.init_model()
def build_option(self):
option = fd.RuntimeOption()
option.use_gpu()
option.use_trt_backend()
option.set_trt_input_shape("images", [1, 3, 640, 640])
trt_path = Path(self.model_path).with_suffix(".trt")
option.set_trt_cache_file(trt_path.as_posix())
return option
def init_model(self):
# Configure runtime, load model
runtime_option = self.build_option()
model = fd.vision.detection.YOLOv8(self.model_path, runtime_option=runtime_option)
return model
def predict_yolo(self, bgr_img):
result = self.model.predict(bgr_img)
rendered_img = bgr_img.copy()
# Visualization
vis_im = fd.vision.vis_detection(rendered_img, result)
# cv2.imwrite("visualized_result.jpg", vis_im)
# print("Visualized result save in ./visualized_result.jpg")
return result, vis_im

@ -25,6 +25,7 @@ class FaceRecognition:
det_model=SCRFD_PATH_DICT["onnx"],
device="gpu",
use_trt=False,
half=False,
):
"""
初始化相机人脸检测器
@ -32,11 +33,13 @@ class FaceRecognition:
:param det_model: 人脸检测模型
:param device: 使用设备"cpu"/"gpu"
:param use_trt: 是否使用tenserRT加速
:param half: 是否半精度推理
"""
self.id_model = id_model
self.det_model = det_model
self.device = device
self.use_trt = use_trt
self.half = half
# 用统一的方法加载模型
self.det_model = self.load_model(fd.vision.facedet.SCRFD, det_model, "SCRFD")
@ -98,6 +101,9 @@ class FaceRecognition:
option.use_gpu()
if self.use_trt:
option.use_trt_backend()
if self.half:
option.trt_option.enable_fp16 = True
if model_type == "PartialFC":
option.set_trt_input_shape("data", [1, 3, 112, 112]) # fd默认参数
option.set_trt_cache_file(INSIGHTFACE_PATH_DICT["trt"])
@ -194,20 +200,16 @@ class FaceRecognition:
if __name__ == "__main__":
# faceid = FaceRecognition(device="cpu", use_trt=False)
faceid = FaceRecognition(
id_model=r"E:\resources\DT3 models\partial_fc_glint360k_r100.onnx",
det_model=r"E:\resources\DT3 models\scrfd_500m_bnkps_shape640x640.onnx",
id_model=r"/home/user/zhoujinjuan/models/partial_fc_glint360k_r100.onnx",
det_model=r"/home/user/zhoujinjuan/models/scrfd_500m_bnkps_shape640x640.onnx",
device="cpu",
use_trt=False,
)
# img = cv2.imread('../data/track/13.png')
# embed = faceid.get_img_embedding(img)
# has_face, face = faceid.get_crop_face(img)
# if has_face:
# cv2.imwrite('../data/track/4_face.png', face)
# else:
# print('no face')
# img1 = cv2.imread(r'E:\\tmp\198_person\198_0.png')
# img2 = cv2.imread(r'E:\\tmp\198_person\198_1.png')
# cos = faceid.face_compare(img2, img1)
# print(cos)
img = cv2.imread('data/11.png')
embed = faceid.get_img_embedding(img)
has_face, face = faceid.get_crop_face(img)
if has_face:
cv2.imwrite('data/face.png', face)
else:
print('no face')

@ -0,0 +1,49 @@
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
'''
@Project Filter_Object
@File fd_yolo.py
@IDE PyCharm
@Author zjj
@Date 2024/1/19 10:45
'''
import fastdeploy as fd
from loguru import logger
from config import POLICE_PATH
class FdYolov8(object):
def __init__(self, model_path=POLICE_PATH, device="gpu", use_trt=False, half=False):
self.model_path = model_path
self.device = device
self.use_trt = use_trt
self.half = half
self.model = None
# 加载模型
self.init_model()
def build_option(self):
"""
构建运行时选项
:return: option
"""
option = fd.RuntimeOption()
if self.device.lower() == "gpu":
option.use_gpu()
if self.use_trt:
option.use_trt_backend()
if self.half:
option.trt_option.enable_fp16 = True
return option
def init_model(self):
runtime_option = self.build_option()
self.model = fd.vision.detection.YOLOv8(
self.model_path, runtime_option=runtime_option
)
logger.info('fastdeploy yolo模型加载成功')
def predict(self, bgr_img):
result = self.model.predict(bgr_img)
return result

@ -7,10 +7,6 @@
@Author zjj
@Date 2023/9/26 9:29
"""
import uuid
import cv2
import fastdeploy as fd
import numpy as np
from loguru import logger
from ultralytics import YOLO
@ -23,9 +19,10 @@ from config import (
LABEL_NAMES,
BOX_AREA_THRESHOLD,
DETECT_CONF_THRESHOLD,
POLICE_PATH, ARTICULATION_THD,
ARTICULATION_THD,
)
from fd_face_detection import FaceRecognition
from fd_yolo import FdYolov8
from utils import (
extract_yolo_results,
get_police,
@ -38,47 +35,47 @@ from utils import (
class TrackMain(object):
def __init__(self, detect_model_path=SEGMENT_MODEL, device="gpu", use_trt=False):
# fastdeploy调用GPU时会使用
self.option = fd.RuntimeOption().use_gpu()
def __init__(self, detect_model_path=SEGMENT_MODEL, device="gpu", use_trt=False, half=False):
# 目标检测模型
self.model = YOLO(detect_model_path)
self.yolo_seg_model = YOLO(detect_model_path)
# 人脸检测和识别对象
self.face_det = FaceRecognition(device=device, use_trt=use_trt)
self.face_det = FaceRecognition(device=device, use_trt=use_trt, half=half)
# 交警模型
# self.model_traffic_police = YOLO(POLICE_PATH)
self.model_traffic_police = fd.vision.detection.YOLOv8(
POLICE_PATH, runtime_option=self.option # 使用fastdeploy加载交警模型
)
self.model_traffic_police = FdYolov8(device=device, use_trt=use_trt, half=half)
def process_one_frame(self, frame):
results = self.model(frame)
"""
处理一帧数据
:param frame:
:return:
"""
results = self.yolo_seg_model(frame)
# 提取模型检测结果 infos=[xywhs, cls, scores, xy_list]
is_hit, infos = extract_yolo_results(results)
if not is_hit:
return [], []
# before:没有筛选之前统计目标频率保存截图时筛选top2大框
# now:不统计频率大框筛选之后是否有必要取topN再过滤---先设置过滤不想过滤则可以调整N值很大
# I面积和置信度过滤统计剩余目标的相关信息左上角和右下角坐标,label,conf,box_area,轮廓坐标
# 不统计频率大框筛选之后设置TOPN过滤不想过滤则可以调整N值很大
# 面积和置信度过滤,统计剩余目标的相关信息:左上角和右下角坐标,label,conf,box_area,轮廓坐标
tinfos_each_frame = self.statistics_one_frame(infos)
# 每帧每个类别显示topn大的目标按文件夹保存person类别保留人脸相似的保留一个car类别保存目标检测框图有一个算一个
# annotated_frame = results[0].plot()
face_embeddings, licenses = self.process_topn_in_one_frame(frame, tinfos_each_frame)
face_embeddings, licenses, _ = self.process_topn_in_one_frame(frame, tinfos_each_frame)
return face_embeddings, licenses
def statistics_one_frame(self, infos):
@classmethod
def statistics_one_frame(cls, infos):
"""
每一帧内统计每个目标的信息
:param infos:
:return:
"""
xywhs, cls, scores, xy_list = infos
xywhs, labels, scores, xy_list = infos
tinfos_each_frame = {PERSON: [], CAR: []}
for xywh, label_id, score, polygon in zip(
xywhs, cls, scores, xy_list
xywhs, labels, scores, xy_list
):
if label_id not in list(LABEL_NAMES.keys()):
continue
@ -113,7 +110,9 @@ class TrackMain(object):
def process_topn_in_one_frame(self, frame, tinfos_each_frame):
"""
处理每一帧中的topn大框对象
处理每一帧中的topn大框对象:
person类先用交警模型过滤掉交警获取目标的人脸根据清晰度过滤对保留下来的人脸提取embedding
car类ocr识别车牌号每一帧中车牌号去重
:param frame:
:param tinfos_each_frame: 每个类别下各个目标的相关信息
:return:
@ -121,7 +120,9 @@ class TrackMain(object):
frame_copy = frame.copy()
police_indexs = []
# 存储一帧中所有车牌号
all_licenses = []
# 存储一帧中所有人脸embedding
all_face_embeddings = []
for label, target_infos in tinfos_each_frame.items():
# 按照框面积大小降序排序
@ -145,7 +146,6 @@ class TrackMain(object):
target_img = target_img.astype(np.uint8)
if label == CAR:
# todo 调用ocr处理车牌号统计车牌出现次数保存有车牌的图片
# licenses = predict_ocr(target_img, self.ocr)
licenses = predict_ocr(target_img)
licenses = list(set(licenses))
@ -157,10 +157,9 @@ class TrackMain(object):
# 是交警,则不处理
if index in police_indexs:
continue
# 提取目标的语义分割图像,配白底,保存; 保存所有,不考虑相似的只保存一个,防止有车牌的被过滤掉
# 二值图像,人-1 背景-0
target_mask = get_target_mask(frame, polygon)
# person类型检测并提取人脸相似的只保存一个即保存的时候就过滤一些
# todo 对整个图检测人脸可能得到多个人脸但只对target_img的白底目标图检测人脸很可能无法检测到人脸因此结合目标检测的框和人脸检测算法框定目标的人脸
# todo 对整个图检测人脸可能得到多个人脸但只对target_img的白底目标图检测人脸很可能无法检测到人脸因此结合实例mask定位目标人脸
has_face, face = self.face_det.get_target_face(frame, target_mask)
if has_face:
# 用人脸清晰度过滤
@ -172,40 +171,10 @@ class TrackMain(object):
face_embedding = self.face_det.get_img_embedding(face)
all_face_embeddings.append(face_embedding)
is_hit = True
# todo test
# test
if is_hit:
frame_copy = draw_rectangle_text(
frame_copy, index, p1, p2, label, info['conf'], -1, info['box_area']
)
# todo test 测试过程中保存图片可视化
# if all_face_embeddings and not all_licenses:
# name = str(uuid.uuid4())[:7]
# cv2.imwrite('/mnt/large/zhoujinjuan_data/data/result-3/face_' + name + '.png', frame_copy)
# elif not all_face_embeddings and all_licenses:
# name = str(uuid.uuid4())[:7]
# cv2.imwrite('/mnt/large/zhoujinjuan_data/data/result-3/car_' + name + '.png', frame_copy)
# elif all_face_embeddings and all_licenses:
# name = str(uuid.uuid4())[:7]
# cv2.imwrite('/mnt/large/zhoujinjuan_data/data/result-3/all_' + name + '.png', frame_copy)
return all_face_embeddings, all_licenses
if __name__ == '__main__':
import os
obj = TrackMain()
video_path = os.path.join('/mnt/large/zhoujinjuan_data/data/4.mp4')
cap = cv2.VideoCapture(video_path)
all_face_embeds = []
all_licenses_list = []
frame_count = 0
while cap.isOpened():
ret, frame = cap.read()
if not ret:
break
frame_count += 1
face_embeds, license_list = obj.process_one_frame(frame)
all_face_embeds.extend(face_embeds)
all_licenses_list.extend(license_list)
print(f'frame count {frame_count}\n face embeddings {len(all_face_embeds)}\nlicenses {len(all_licenses_list)}')
return all_face_embeddings, all_licenses, frame_copy

@ -0,0 +1,99 @@
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
'''
@Project Filter_Object
@File main_v2_test.py
@IDE PyCharm
@Author zjj
@Date 2024/1/19 9:51
'''
import os
import time
import unittest
import uuid
from pathlib import Path
import cv2
from main_v2 import TrackMain
from utils import extract_yolo_results
def make_result_dir():
# 保存图片,可视化
dir_name = str(uuid.uuid4())[:7]
dirx = Path(r'/mnt/large/zhoujinjuan_data/data/result') / dir_name
if not dirx.exists():
dirx.mkdir()
print(f'tmp dir {dirx}')
return dirx
class TestMain(unittest.TestCase):
def setUp(self):
self.use_trt = True
self.half = True
self.obj = TrackMain(device='gpu', use_trt=self.use_trt, half=self.half)
video_path = os.path.join('/mnt/large/zhoujinjuan_data/data/tp_videos/2.mp4')
self.cap = cv2.VideoCapture(video_path)
# 记录视频所有满足条件的人脸
self.all_face_embeds = []
# 记录视频所有满足条件的车牌号
self.all_licenses_list = []
self.frame_count = 0
# 每x帧处理一帧
self.skip_frame = 9
print('init success')
def process_one_frame_v2(self, frame):
results = self.obj.yolo_seg_model(frame)
# 提取模型检测结果 infos=[xywhs, cls, scores, xy_list]
is_hit, infos = extract_yolo_results(results)
if not is_hit:
return [], [], frame
tinfos_each_frame = self.obj.statistics_one_frame(infos)
# 保持原接口不动比process_one_frame 多返回一个frame_copy
face_embeddings, licenses, frame_copy = self.obj.process_topn_in_one_frame(frame, tinfos_each_frame)
return face_embeddings, licenses, frame_copy
def test_main(self):
print('begin test')
# 先创建结果目录
result_dir = make_result_dir()
start = time.time()
while self.cap.isOpened():
ret, frame = self.cap.read()
if not ret:
break
self.frame_count += 1
# todo 考虑跳帧
if self.frame_count % self.skip_frame != 0:
continue
# 每一帧的处理结果
face_embeds, license_list, frame_copy = self.process_one_frame_v2(frame)
# todo test 测试过程中保存图片可视化
if face_embeds and not license_list:
name = str(uuid.uuid4())[:7]
cv2.imwrite((result_dir / ('face_' + name + '.png')).as_posix(), frame_copy)
elif not face_embeds and license_list:
name = str(uuid.uuid4())[:7]
cv2.imwrite((result_dir / ('car_' + name + '.png')).as_posix(), frame_copy)
elif face_embeds and license_list:
name = str(uuid.uuid4())[:7]
cv2.imwrite((result_dir / ('all_' + name + '.png')).as_posix(), frame_copy)
self.all_face_embeds.extend(face_embeds)
self.all_licenses_list.extend(license_list)
end = time.time()
t = round(end - start, 2)
actual_frame_number = round(self.frame_count / self.skip_frame)
print(f'frame count {self.frame_count}\nactual deal frame count {actual_frame_number}\ntime {t}s'
f'\nface embeddings count {len(self.all_face_embeds)}'
f'\nlicenses count {len(self.all_licenses_list)}')
print(f'use_trt={self.use_trt}\nhalf={self.half}\n')
if __name__ == '__main__':
unittest.main()
Loading…
Cancel
Save