|
|
|
@ -7,10 +7,6 @@
|
|
|
|
|
@Author :zjj
|
|
|
|
|
@Date :2023/9/26 9:29
|
|
|
|
|
"""
|
|
|
|
|
import uuid
|
|
|
|
|
|
|
|
|
|
import cv2
|
|
|
|
|
import fastdeploy as fd
|
|
|
|
|
import numpy as np
|
|
|
|
|
from loguru import logger
|
|
|
|
|
from ultralytics import YOLO
|
|
|
|
@ -23,9 +19,10 @@ from config import (
|
|
|
|
|
LABEL_NAMES,
|
|
|
|
|
BOX_AREA_THRESHOLD,
|
|
|
|
|
DETECT_CONF_THRESHOLD,
|
|
|
|
|
POLICE_PATH, ARTICULATION_THD,
|
|
|
|
|
ARTICULATION_THD, MOTOR_VEHICLE, MOTOR_VEHICLE_LIST,
|
|
|
|
|
)
|
|
|
|
|
from fd_face_detection import FaceRecognition
|
|
|
|
|
from fd_yolo import FdYolov8
|
|
|
|
|
from utils import (
|
|
|
|
|
extract_yolo_results,
|
|
|
|
|
get_police,
|
|
|
|
@ -38,47 +35,47 @@ from utils import (
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class TrackMain(object):
|
|
|
|
|
def __init__(self, detect_model_path=SEGMENT_MODEL, device="gpu", use_trt=False):
|
|
|
|
|
# fastdeploy调用GPU时会使用
|
|
|
|
|
self.option = fd.RuntimeOption().use_gpu()
|
|
|
|
|
def __init__(self, detect_model_path=SEGMENT_MODEL, device="gpu", use_trt=False, half=False):
|
|
|
|
|
# 目标检测模型
|
|
|
|
|
self.model = YOLO(detect_model_path)
|
|
|
|
|
self.yolo_seg_model = YOLO(detect_model_path)
|
|
|
|
|
# 人脸检测和识别对象
|
|
|
|
|
self.face_det = FaceRecognition(device=device, use_trt=use_trt)
|
|
|
|
|
self.face_det = FaceRecognition(device=device, use_trt=use_trt, half=half)
|
|
|
|
|
# 交警模型
|
|
|
|
|
# self.model_traffic_police = YOLO(POLICE_PATH)
|
|
|
|
|
self.model_traffic_police = fd.vision.detection.YOLOv8(
|
|
|
|
|
POLICE_PATH, runtime_option=self.option # 使用fastdeploy加载交警模型
|
|
|
|
|
)
|
|
|
|
|
self.model_traffic_police = FdYolov8(device=device, use_trt=use_trt, half=half)
|
|
|
|
|
|
|
|
|
|
def process_one_frame(self, frame):
|
|
|
|
|
results = self.model(frame)
|
|
|
|
|
"""
|
|
|
|
|
处理一帧数据
|
|
|
|
|
:param frame:
|
|
|
|
|
:return:
|
|
|
|
|
"""
|
|
|
|
|
results = self.yolo_seg_model(frame)
|
|
|
|
|
|
|
|
|
|
# 提取模型检测结果 infos=[xywhs, cls, scores, xy_list]
|
|
|
|
|
is_hit, infos = extract_yolo_results(results)
|
|
|
|
|
if not is_hit:
|
|
|
|
|
return [], []
|
|
|
|
|
|
|
|
|
|
# before:没有筛选之前统计目标频率,保存截图时筛选top2大框
|
|
|
|
|
# now:不统计频率,大框筛选之后,是否有必要取topN再过滤??---先设置过滤,不想过滤则可以调整N值很大
|
|
|
|
|
# I:面积和置信度过滤,统计剩余目标的相关信息:左上角和右下角坐标,label,conf,box_area,轮廓坐标
|
|
|
|
|
# 不统计频率,大框筛选之后,设置TOPN过滤,不想过滤则可以调整N值很大
|
|
|
|
|
# 面积和置信度过滤,统计剩余目标的相关信息:左上角和右下角坐标,label,conf,box_area,轮廓坐标
|
|
|
|
|
tinfos_each_frame = self.statistics_one_frame(infos)
|
|
|
|
|
|
|
|
|
|
# 每帧每个类别显示topn大的目标,按文件夹保存,person类别保留人脸,相似的保留一个,car类别保存目标检测框图,有一个算一个;
|
|
|
|
|
# annotated_frame = results[0].plot()
|
|
|
|
|
face_embeddings, licenses = self.process_topn_in_one_frame(frame, tinfos_each_frame)
|
|
|
|
|
face_embeddings, licenses, _ = self.process_topn_in_one_frame(frame, tinfos_each_frame)
|
|
|
|
|
return face_embeddings, licenses
|
|
|
|
|
|
|
|
|
|
def statistics_one_frame(self, infos):
|
|
|
|
|
@classmethod
|
|
|
|
|
def statistics_one_frame(cls, infos):
|
|
|
|
|
"""
|
|
|
|
|
每一帧内统计每个目标的信息
|
|
|
|
|
:param infos:
|
|
|
|
|
:return:
|
|
|
|
|
"""
|
|
|
|
|
xywhs, cls, scores, xy_list = infos
|
|
|
|
|
tinfos_each_frame = {PERSON: [], CAR: []}
|
|
|
|
|
xywhs, labels, scores, xy_list = infos
|
|
|
|
|
tinfos_each_frame = {PERSON: [], MOTOR_VEHICLE: []}
|
|
|
|
|
|
|
|
|
|
for xywh, label_id, score, polygon in zip(
|
|
|
|
|
xywhs, cls, scores, xy_list
|
|
|
|
|
xywhs, labels, scores, xy_list
|
|
|
|
|
):
|
|
|
|
|
if label_id not in list(LABEL_NAMES.keys()):
|
|
|
|
|
continue
|
|
|
|
@ -96,6 +93,7 @@ class TrackMain(object):
|
|
|
|
|
conf = round(score, 2)
|
|
|
|
|
|
|
|
|
|
# 通过面积过滤掉一些小框
|
|
|
|
|
# todo 预筛选,阈值不能太高,后面还会有topN过滤
|
|
|
|
|
if s <= BOX_AREA_THRESHOLD:
|
|
|
|
|
continue
|
|
|
|
|
if conf <= DETECT_CONF_THRESHOLD:
|
|
|
|
@ -107,13 +105,19 @@ class TrackMain(object):
|
|
|
|
|
info['conf'] = conf
|
|
|
|
|
info['box_area'] = s
|
|
|
|
|
info['polygon_indexs'] = polygon.astype(int)
|
|
|
|
|
tinfos_each_frame[label].append(info)
|
|
|
|
|
# 机动车包含car,truck,motorcycle,bus
|
|
|
|
|
if label in MOTOR_VEHICLE_LIST:
|
|
|
|
|
tinfos_each_frame[MOTOR_VEHICLE].append(info)
|
|
|
|
|
else:
|
|
|
|
|
tinfos_each_frame[label].append(info)
|
|
|
|
|
|
|
|
|
|
return tinfos_each_frame
|
|
|
|
|
|
|
|
|
|
def process_topn_in_one_frame(self, frame, tinfos_each_frame):
|
|
|
|
|
"""
|
|
|
|
|
处理每一帧中的topn大框对象
|
|
|
|
|
处理每一帧中的topn大框对象:
|
|
|
|
|
person类:先用交警模型过滤掉交警,获取目标的人脸,根据清晰度过滤,对保留下来的人脸提取embedding;
|
|
|
|
|
car类:ocr识别车牌号,每一帧中车牌号去重;
|
|
|
|
|
:param frame:
|
|
|
|
|
:param tinfos_each_frame: 每个类别下各个目标的相关信息
|
|
|
|
|
:return:
|
|
|
|
@ -121,14 +125,16 @@ class TrackMain(object):
|
|
|
|
|
frame_copy = frame.copy()
|
|
|
|
|
|
|
|
|
|
police_indexs = []
|
|
|
|
|
# 存储一帧中所有车牌号
|
|
|
|
|
all_licenses = []
|
|
|
|
|
# 存储一帧中所有人脸embedding
|
|
|
|
|
all_face_embeddings = []
|
|
|
|
|
for label, target_infos in tinfos_each_frame.items():
|
|
|
|
|
for label_alias, target_infos in tinfos_each_frame.items():
|
|
|
|
|
# 按照框面积大小降序排序
|
|
|
|
|
target_infos.sort(key=lambda x: x['box_area'], reverse=True)
|
|
|
|
|
|
|
|
|
|
# todo 交警 先对整个图片上前topn的大框图中的person,送到交警检测模型中检测是否是交警
|
|
|
|
|
if label == PERSON:
|
|
|
|
|
if label_alias == PERSON:
|
|
|
|
|
police_indexs = get_police(
|
|
|
|
|
frame_copy, target_infos[:TOPN_AREA], self.model_traffic_police
|
|
|
|
|
)
|
|
|
|
@ -144,8 +150,7 @@ class TrackMain(object):
|
|
|
|
|
target_img = frame[p1[1]: p2[1], p1[0]: p2[0]]
|
|
|
|
|
target_img = target_img.astype(np.uint8)
|
|
|
|
|
|
|
|
|
|
if label == CAR:
|
|
|
|
|
# todo 调用ocr处理车牌号,统计车牌出现次数,保存有车牌的图片
|
|
|
|
|
if label_alias == MOTOR_VEHICLE:
|
|
|
|
|
# licenses = predict_ocr(target_img, self.ocr)
|
|
|
|
|
licenses = predict_ocr(target_img)
|
|
|
|
|
licenses = list(set(licenses))
|
|
|
|
@ -153,14 +158,13 @@ class TrackMain(object):
|
|
|
|
|
if licenses:
|
|
|
|
|
is_hit = True
|
|
|
|
|
|
|
|
|
|
elif label == PERSON:
|
|
|
|
|
elif label_alias == PERSON:
|
|
|
|
|
# 是交警,则不处理
|
|
|
|
|
if index in police_indexs:
|
|
|
|
|
continue
|
|
|
|
|
# 提取目标的语义分割图像,配白底,保存; 保存所有,不考虑相似的只保存一个,防止有车牌的被过滤掉
|
|
|
|
|
# 二值图像,人-1, 背景-0
|
|
|
|
|
target_mask = get_target_mask(frame, polygon)
|
|
|
|
|
# person类型,检测并提取人脸,相似的只保存一个,即保存的时候就过滤一些
|
|
|
|
|
# todo 对整个图检测人脸可能得到多个人脸,但只对target_img的白底目标图检测人脸,很可能无法检测到人脸,因此结合目标检测的框和人脸检测算法,框定目标的人脸
|
|
|
|
|
# todo 对整个图检测人脸可能得到多个人脸,但只对target_img的白底目标图检测人脸,很可能无法检测到人脸,因此结合实例mask定位目标人脸
|
|
|
|
|
has_face, face = self.face_det.get_target_face(frame, target_mask)
|
|
|
|
|
if has_face:
|
|
|
|
|
# 用人脸清晰度过滤
|
|
|
|
@ -172,40 +176,10 @@ class TrackMain(object):
|
|
|
|
|
face_embedding = self.face_det.get_img_embedding(face)
|
|
|
|
|
all_face_embeddings.append(face_embedding)
|
|
|
|
|
is_hit = True
|
|
|
|
|
# todo test
|
|
|
|
|
# test
|
|
|
|
|
if is_hit:
|
|
|
|
|
frame_copy = draw_rectangle_text(
|
|
|
|
|
frame_copy, index, p1, p2, label, info['conf'], -1, info['box_area']
|
|
|
|
|
frame_copy, index, p1, p2, label_alias, info['conf'], -1, info['box_area']
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
# todo test 测试过程中保存图片可视化
|
|
|
|
|
# if all_face_embeddings and not all_licenses:
|
|
|
|
|
# name = str(uuid.uuid4())[:7]
|
|
|
|
|
# cv2.imwrite('/mnt/large/zhoujinjuan_data/data/result-3/face_' + name + '.png', frame_copy)
|
|
|
|
|
# elif not all_face_embeddings and all_licenses:
|
|
|
|
|
# name = str(uuid.uuid4())[:7]
|
|
|
|
|
# cv2.imwrite('/mnt/large/zhoujinjuan_data/data/result-3/car_' + name + '.png', frame_copy)
|
|
|
|
|
# elif all_face_embeddings and all_licenses:
|
|
|
|
|
# name = str(uuid.uuid4())[:7]
|
|
|
|
|
# cv2.imwrite('/mnt/large/zhoujinjuan_data/data/result-3/all_' + name + '.png', frame_copy)
|
|
|
|
|
return all_face_embeddings, all_licenses
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if __name__ == '__main__':
|
|
|
|
|
import os
|
|
|
|
|
obj = TrackMain()
|
|
|
|
|
video_path = os.path.join('/mnt/large/zhoujinjuan_data/data/4.mp4')
|
|
|
|
|
cap = cv2.VideoCapture(video_path)
|
|
|
|
|
all_face_embeds = []
|
|
|
|
|
all_licenses_list = []
|
|
|
|
|
frame_count = 0
|
|
|
|
|
while cap.isOpened():
|
|
|
|
|
ret, frame = cap.read()
|
|
|
|
|
if not ret:
|
|
|
|
|
break
|
|
|
|
|
frame_count += 1
|
|
|
|
|
face_embeds, license_list = obj.process_one_frame(frame)
|
|
|
|
|
all_face_embeds.extend(face_embeds)
|
|
|
|
|
all_licenses_list.extend(license_list)
|
|
|
|
|
print(f'frame count {frame_count}\n face embeddings {len(all_face_embeds)}\nlicenses {len(all_licenses_list)}')
|
|
|
|
|
return all_face_embeddings, all_licenses, frame_copy
|
|
|
|
|
|
|
|
|
|