diff --git a/Bank_second_part/detect_process/ModelDet/analysisPoint.py b/Bank_second_part/detect_process/ModelDet/analysisPoint.py
new file mode 100644
index 0000000..2297fdc
--- /dev/null
+++ b/Bank_second_part/detect_process/ModelDet/analysisPoint.py
@@ -0,0 +1,152 @@
+# Copyright 2020 The MediaPipe Authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""MediaPipe solution drawing utils."""
+
+import math
+from typing import List, Mapping, Optional, Tuple, Union
+
+import cv2
+import dataclasses
+import matplotlib.pyplot as plt
+import numpy as np
+
+from mediapipe.framework.formats import detection_pb2
+from mediapipe.framework.formats import location_data_pb2
+from mediapipe.framework.formats import landmark_pb2
+
+_PRESENCE_THRESHOLD = 0.5
+_VISIBILITY_THRESHOLD = 0.5
+_BGR_CHANNELS = 3
+
+WHITE_COLOR = (224, 224, 224)
+BLACK_COLOR = (0, 0, 0)
+RED_COLOR = (0, 0, 255)
+GREEN_COLOR = (0, 128, 0)
+BLUE_COLOR = (255, 0, 0)
+
+
+@dataclasses.dataclass
+class DrawingSpec:
+  # Color for drawing the annotation. Defaults to white.
+  color: Tuple[int, int, int] = WHITE_COLOR
+  # Thickness for drawing the annotation. Defaults to 2 pixels.
+  thickness: int = 2
+  # Circle radius. Defaults to 2 pixels.
+  circle_radius: int = 2
+
+
+def _normalized_to_pixel_coordinates(
+    normalized_x: float, normalized_y: float, image_width: int,
+    image_height: int) -> Union[None, Tuple[int, int]]:
+  """Converts a normalized value pair to pixel coordinates."""
+
+  # Checks if the float value is between 0 and 1.
+  def is_valid_normalized_value(value: float) -> bool:
+    return (value > 0 or math.isclose(0, value)) and (value < 1 or
+                                                      math.isclose(1, value))
+
+  if not (is_valid_normalized_value(normalized_x) and
+          is_valid_normalized_value(normalized_y)):
+    # TODO: Draw coordinates even if it's outside of the image bounds.
+    return None
+  x_px = min(math.floor(normalized_x * image_width), image_width - 1)
+  y_px = min(math.floor(normalized_y * image_height), image_height - 1)
+  return x_px, y_px
+
+
+def draw_landmarks(
+    image: np.ndarray,
+    landmark_list: landmark_pb2.NormalizedLandmarkList,
+    connections: Optional[List[Tuple[int, int]]] = None):
+  """Collects the pixel coordinates of connection end points on the image.
+
+  Adapted from MediaPipe's drawing utils: instead of drawing the annotations,
+  the function returns the pixel coordinates of the landmarks that close each
+  connection.
+
+  Args:
+    image: A three channel BGR image represented as numpy ndarray.
+    landmark_list: A normalized landmark list proto message to be annotated on
+      the image.
+    connections: A list of landmark index tuples that specifies which landmarks
+      are connected in the drawing.
+
+  Returns:
+    A list of (x, y) pixel coordinates, one for the end point of every visible
+    connection; None if no connections are given.
+
+  Raises:
+    ValueError: If one of the following:
+      a) If the input image is not three channel BGR.
+      b) If any connection contains an invalid landmark index.
+  """
+  if not landmark_list:
+    return
+  if image.shape[2] != _BGR_CHANNELS:
+    raise ValueError('Input image must contain three channel bgr data.')
+  image_rows, image_cols, _ = image.shape
+
+  # Map every sufficiently visible landmark index to its pixel coordinates.
+  idx_to_coordinates = {}
+  for idx, landmark in enumerate(landmark_list.landmark):
+    if ((landmark.HasField('visibility') and
+         landmark.visibility < _VISIBILITY_THRESHOLD) or
+        (landmark.HasField('presence') and
+         landmark.presence < _PRESENCE_THRESHOLD)):
+      continue
+    landmark_px = _normalized_to_pixel_coordinates(landmark.x, landmark.y,
+                                                   image_cols, image_rows)
+    if landmark_px:
+      idx_to_coordinates[idx] = landmark_px
+
+  if connections:
+    num_landmarks = len(landmark_list.landmark)
+
+    # Collect the end point index of every connection whose indices are valid.
+    end_list = []
+    for connection in connections:
+      start_idx = connection[0]
+      end_idx = connection[1]
+      if not (0 <= start_idx < num_landmarks and 0 <= end_idx < num_landmarks):
+        raise ValueError(f'Landmark index is out of range. Invalid connection '
+                         f'from landmark #{start_idx} to landmark #{end_idx}.')
+      end_list.append(end_idx)
+
+    # Keep only the end points that resolved to pixel coordinates.
+    point_axis_list = []
+    for point in end_list:
+      if point in idx_to_coordinates:
+        point_axis_list.append(idx_to_coordinates[point])
+
+    return point_axis_list
\ No newline at end of file
diff --git a/Bank_second_part/detect_process/ModelDet/holisticDet.py b/Bank_second_part/detect_process/ModelDet/holisticDet.py
new file mode 100644
index 0000000..8281517
--- /dev/null
+++ b/Bank_second_part/detect_process/ModelDet/holisticDet.py
@@ -0,0 +1,86 @@
+import cv2
+import mediapipe as mp
+import numpy as np
+
+import analysisPoint as mp_drawing
+
+mp_holistic = mp.solutions.holistic
+
+
+class MediapipeProcess:
+
+    def mediapipe_det(image, holistic):
+        '''
+        Run the holistic model on an image and return the raw detection results.
+        '''
+        image.flags.writeable = False
+        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
+        results = holistic.process(image)
+
+        return results
+
+    def get_analysis_result(image, results):
+        '''
+        image: the image that was detected
+        results: the detection results for that image
+        Parse the results into face and hand bounding boxes.
+        '''
+        image.flags.writeable = True
+        image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)
+
+        face_result = mp_drawing.draw_landmarks(
+            image,
+            results.face_landmarks,
+            mp_holistic.FACEMESH_CONTOURS)
+
+        right_hand_result = mp_drawing.draw_landmarks(
+            image,
+            results.right_hand_landmarks,
+            mp_holistic.HAND_CONNECTIONS)
+
+        left_hand_result = mp_drawing.draw_landmarks(
+            image,
+            results.left_hand_landmarks,
+            mp_holistic.HAND_CONNECTIONS)
+
+        face_bbox = MediapipeProcess.point_to_bbox(face_result)
+        right_hand_bbox = MediapipeProcess.point_to_bbox(right_hand_result)
+        left_hand_bbox = MediapipeProcess.point_to_bbox(left_hand_result)
+
+        result_dict = {'face_bbox': face_bbox, 'hand_bbox': [right_hand_bbox, left_hand_bbox]}
+
+        return result_dict
+
+    def point_to_bbox(result_list):
+        '''
+        Get the minimum-area bounding rectangle that encloses the key points.
+        '''
+        if not result_list:
+            return None
+
+        result_array = np.array(result_list, dtype=np.int32)
+
+        rect = cv2.minAreaRect(result_array)  # ((centre_x, centre_y), (width, height), rotation angle)
+        bbox = cv2.boxPoints(rect)            # the 4 corner points of that rectangle
+        bbox = np.int32(bbox)
+
+        return bbox
+
+
+# if __name__ == '__main__':
+#     media_holistic(video_file='E:/Bank_files/Bank_02/dataset/video_person/after_1/0711-1_199_0.avi',
+#                    video_save_path='E:/Bank_files/Bank_02/videos_mediapipe/test_data/0725_test')
\ No newline at end of file
diff --git a/Bank_second_part/detect_process/ModelDet/personDet.py b/Bank_second_part/detect_process/ModelDet/personDet.py
new file mode 100644
index 0000000..2b5d33e
--- /dev/null
+++ b/Bank_second_part/detect_process/ModelDet/personDet.py
@@ -0,0 +1,39 @@
+
+def analysis_yolov8(frame, model_coco, confidence_set):
+
+    # Step 1: run inference with the COCO-pretrained model.
+    results_coco = model_coco(frame)
+
+    re_list = []
+
+    if results_coco:
+
+        for r in results_coco:
+
+            boxes = r.boxes
+
+            for box in boxes:
+
+                b = box.xyxy[0]  # box coordinates in (left, top, right, bottom) format
+                c = box.cls
+
+                # keep the label and the coordinates as the returned result
+                blist = b.tolist()
+                labels_name = model_coco.names[int(c)]
+
+                confidence = float(box.conf)
+                confidence = round(confidence, 2)
+
+                # drop detections below the confidence threshold
+                if confidence < confidence_set:
+                    continue
+
+                # one result dictionary per detection
+                re_dict = {labels_name: blist}
+
+                re_list.append(re_dict)
+
+    return re_list
diff --git a/Bank_second_part/detect_process/config.py b/Bank_second_part/detect_process/config.py
new file mode 100644
index 0000000..c5e7ef8
--- /dev/null
+++ b/Bank_second_part/detect_process/config.py
@@ -0,0 +1,3 @@
+
+# maximum queue length
+Q_SZ = 200
\ No newline at end of file
diff --git a/Bank_second_part/detect_process/tools.py b/Bank_second_part/detect_process/tools.py
new file mode 100644
index 0000000..8d9af60
--- /dev/null
+++ b/Bank_second_part/detect_process/tools.py
@@ -0,0 +1,212 @@
+import cv2
+import os
+
+
+class Process_tools():
+
+    # collect all video files under a folder
+    def get_video_list(path):
+        video_ext = [".mp4", ".avi", ".MP4"]
+        video_names = []
+        for maindir, subdir, file_name_list in os.walk(path):
+            for filename in file_name_list:
+                apath = os.path.join(maindir, filename)
+                ext = os.path.splitext(apath)[1]
+                if ext in video_ext:
+                    video_names.append(apath)
+        return video_names
+
+    # cut out and crop the frames that are needed from a video
+    def save_seg_video(video_name, frameToStart, frametoStop, videoWriter, bbox):
+
+        cap = cv2.VideoCapture(video_name)
+        count = 0
+        while True:
+
+            success, frame = cap.read()
+
+            if success:
+
+                count += 1
+                if count <= frametoStop and count > frameToStart:  # keep only the selected frame range
+                    print('correct= ', count)
+
+                    # crop the frame to the bounding box
+                    frame_target = frame[int(bbox[1]):int(bbox[3]), int(bbox[0]):int(bbox[2])]  # (split_height, split_width)
+
+                    videoWriter.write(frame_target)
+
+            if not success or count >= frametoStop:
+                break
+
+        print('end')
+
+    # get all values of the dictionaries in a list (when the value itself is a list)
+    def get_dict_values(lst):
+        """
+        Collect the values of every dictionary in the list (when the value is a list).
+
+        Args:
+            lst: a list of dictionaries
+
+        Returns:
+            values: a list of all dictionary values that are lists
+        """
+        return [value for dictionary in lst for value in dictionary.values() if isinstance(value, list)]
+
+    # parse the detection results and sort them
+    def analysis_sort_list(result_dict):
+
+        # the detection list
+        re_list = result_dict['start_bbox']
+
+        # all values of the dictionaries in that list
+        re_bbox_list = Process_tools.get_dict_values(re_list)
+
+        # sort the detected boxes by their left coordinate
+        sorted_lst = sorted(re_bbox_list, key=lambda x: x[0])
+
+        return sorted_lst
+
+    # compare two boxes with a high overlap and keep the box that covers both
+    def contrast_bbox(e_bbox, r_bbox):
+
+        e_bbox_min = e_bbox[:2]
+        r_bbox_min = r_bbox[:2]
+
+        bbox_min = [min(x, y) for x, y in zip(e_bbox_min, r_bbox_min)]
+
+        e_bbox_max = e_bbox[-2:]
+        r_bbox_max = r_bbox[-2:]
+
+        bbox_max = [max(x, y) for x, y in zip(e_bbox_max, r_bbox_max)]
+
+        bbox = bbox_min + bbox_max
+
+        return bbox
+
+    # parse the result_list against the reference detections
+    def analysis_re01_list(example_dict, result_dict):
+
+        # frame index and detections of the first frame in which the target was detected
+        example_dict_fps = list(example_dict.keys())[0]
+        example_sorted_lst = Process_tools.analysis_sort_list(example_dict)
+
+        # all detections of the current frame
+        re_dict_fps = list(result_dict.keys())[0]
+        re_dict_sorted_lst = Process_tools.analysis_sort_list(result_dict)
+
+        # keep the ranges that continue across frames and filter out the parts both share
+        cut_list = []
+        example_temp = []
+        re_temp = []
+
+        for i, ex_bbox in enumerate(example_sorted_lst):
+
+            for j, re_bbox in enumerate(re_dict_sorted_lst):
+
+                iou = Process_tools.calculate_iou(box1=ex_bbox, box2=re_bbox)
+
+                if iou > 0:
+
+                    bbox = Process_tools.contrast_bbox(e_bbox=ex_bbox, r_bbox=re_bbox)
+
+                    cut_list.append({i: bbox})
+                    example_temp.append(ex_bbox)
+                    re_temp.append(re_bbox)
+
+                    break
+
+                else:
+                    continue
+
+        example_sorted_lst = [item for item in example_sorted_lst if item not in example_temp]
+        re_dict_sorted_lst = [item for item in re_dict_sorted_lst if item not in re_temp]
+
+        return cut_list, example_sorted_lst, re_dict_sorted_lst
+
+    # compute how much the boxes of two consecutive frames overlap
+    def calculate_iou(box1, box2):
+        """
+        Compute the IoU of two bounding boxes.
+
+        Args:
+            box1: coordinates of box 1 (x1, y1, x2, y2)
+            box2: coordinates of box 2 (x1, y1, x2, y2)
+
+        Returns:
+            iou: the IoU of the two boxes
+        """
+        x1 = max(box1[0], box2[0])
+        y1 = max(box1[1], box2[1])
+        x2 = min(box1[2], box2[2])
+        y2 = min(box1[3], box2[3])
+
+        # intersection area
+        intersection_area = max(0, x2 - x1 + 1) * max(0, y2 - y1 + 1)
+
+        # areas of box 1 and box 2
+        box1_area = (box1[2] - box1[0] + 1) * (box1[3] - box1[1] + 1)
+        box2_area = (box2[2] - box2[0] + 1) * (box2[3] - box2[1] + 1)
+
+        # union area
+        union_area = box1_area + box2_area - intersection_area
+
+        # IoU value
+        iou = intersection_area / union_area
+
+        return iou
+
+    def para_correction(images_size, bbox, dertpara):
+
+        '''
+        Enlarge detection boxes that are too small: use the correction parameter
+        when one is given, otherwise expand the box to twice its size.
+        '''
+
+        # NOTE: the correction parameter is not applied yet; the box is always
+        # expanded to twice its size around the same centre.
+        w = (bbox[2] - bbox[0]) / 2
+        h = (bbox[3] - bbox[1]) / 2
+
+        bbox_extand_list_x = [bbox[0] - w, bbox[2] + w]
+        bbox_extand_list_y = [bbox[1] - h, bbox[3] + h]
+
+        bbox_list_x = Process_tools.contrast(size=images_size[0], bbox_extand_list=bbox_extand_list_x)
+        bbox_list_y = Process_tools.contrast(size=images_size[1], bbox_extand_list=bbox_extand_list_y)
+
+        bbox_list = bbox_list_x + bbox_list_y
+
+        return bbox_list
+
+    def contrast(size, bbox_extand_list):
+
+        '''
+        Clamp the expanded coordinates to the image size.
+        '''
+
+        bbox_list = []
+
+        for x in bbox_extand_list:
+
+            if 0 <= x <= size:
+                bbox_list.append(x)
+            elif x > size:
+                bbox_list.append(size)
+            else:
+                bbox_list.append(0)
+
+        return bbox_list
\ No newline at end of file
diff --git a/Bank_second_part/detect_process/video_process.py b/Bank_second_part/detect_process/video_process.py
index ee83010..31ddba4 100644
--- a/Bank_second_part/detect_process/video_process.py
+++ b/Bank_second_part/detect_process/video_process.py
@@ -2,24 +2,32 @@ import numpy as np
 import cv2
 import os
 import time
-from tqdm import tqdm
-from ultralytics import YOLO
-from ultralytics.yolo.utils.plotting import Annotator
+
+from ultralytics import YOLO
 import queue
-from yolov8_det import analysis_yolov8
 import threading
 
 from config import Q_SZ
+from ModelDet.personDet import analysis_yolov8
+from ModelDet.holisticDet import MediapipeProcess
 
 
 class DealVideo():
 
-    def __init__(self,video_file,video_save_file):
+    def __init__(self, video_file, video_save_file, person_model, mediapipe_model, pptsmv2_model):
+
+        '''
+        Load the input data and the models.
+        '''
 
         self.video_file = video_file
         self.video_save_file = video_save_file
 
+        # initialize the models
+        self.person_model = person_model
+        self.mediapipe_model = mediapipe_model
+        self.pptsmv2_model = pptsmv2_model
 
         # 图片检测后队列
         self.videoQueue = queue.Queue(maxsize=Q_SZ)
@@ -30,9 +38,13 @@ class DealVideo():
         self.get_video_frameThread = threading.Thread(target=self.get_video_frame)
         self.write_videoThread = threading.Thread(target=self.write_video)
 
-    # 图像文件夹
+
     def get_video_list(self):
+
+        '''
+        Collect the video files that need to be processed.
+        '''
+
         if os.path.isdir(self.video_file):
 
             video_ext = [".mp4", ".avi",".MP4"]
@@ -46,39 +58,61 @@ class DealVideo():
         else:
             self.videoQueue.put(self.video_file)
 
-
     def get_video_frame(self):
+
+        '''
+        Split every video into frames and put them on the frame queue.
+        '''
+
         while True:
 
             if ~self.videoQueue.empty():
 
-                video_path = self.videoQueue.get()
+                try:
+                    video_path = self.videoQueue.get()
 
-                video_basename = os.path.basename(video_path).split('.')[0]
+                    video_basename = os.path.basename(video_path).split('.')[0]
 
-                # print('video_path:',video_path)
+                    cap = cv2.VideoCapture(video_path)
 
-                cap = cv2.VideoCapture(video_path)
+                    frame_list = []
+                    count_fps = 0
 
-                frame_list = []
-                count_fps = 0
+                    while cap.isOpened():
+                        success, frame = cap.read()
+                        if not success:
+                            print("Ignoring empty camera frame.")
+                            break
+                        count_fps += 1
 
-                while cap.isOpened():
-                    success, frame = cap.read()
-                    if not success:
-                        print("Ignoring empty camera frame.")
-                        break
-                    count_fps += 1
+                        frame_dict = {'fps': count_fps, 'frame': frame}
+                        frame_list.append(frame_dict)
+
+                    video_dict = {'video_path': video_path, 'frame_list': frame_list, 'cap': cap}
+
+                    self.frameQueue.put(video_dict)
+                except Exception as e:
+                    print(e)
+
+    def person_det(self):
+
+        '''
+        Run the person detector on every frame of a video (work in progress).
+        '''
+
+        while True:
+
+            if not self.frameQueue.empty():
+
+                video_dict = self.frameQueue.get()
+
+                for frame_dict in video_dict['frame_list']:
+                    person_det = analysis_yolov8(frame=frame_dict['frame'],
+                                                 model_coco=self.person_model,
+                                                 confidence_set=0.5)
 
-                frame_dict = {'fps':count_fps,'frame':frame}
-                frame_list.append(frame_dict)
-                video_dict = {'video_path':video_path,'frame_list':frame_list,'cap':cap}
-
-                self.frameQueue.put(video_dict)
 
     def write_video(self):
+
+        '''
+        Save the processed frames back out as a video.
+        '''
+
         while True:
             if ~self.frameQueue.empty():
                 video_frame_dict = self.frameQueue.get()
@@ -116,173 +150,7 @@ class DealVideo():
 
         self.write_videoThread.start()
 
-class Process_tools():
-
-    # 图像文件夹
-    def get_video_list(path):
-        video_ext = [".mp4", ".avi",".MP4"]
-        video_names = []
-        for maindir, subdir, file_name_list in os.walk(path):
-            for filename in file_name_list:
-                apath = os.path.join(maindir, filename)
-                ext = os.path.splitext(apath)[1]
-                if ext in video_ext:
-                    video_names.append(apath)
-        return video_names
-
-
-    # 截取裁剪需要的视频帧
-    def save_seg_video(video_name,frameToStart,frametoStop,videoWriter,bbox):
-
-        cap = cv2.VideoCapture(video_name)
-        count = 0
-        while True:
-
-            success, frame = cap.read()
-
-            if success:
-
-                count += 1
-                if count <= frametoStop and count > frameToStart: # 选取起始帧
-                    print('correct= ', count)
-
-                    #裁剪视频画面
-                    frame_target = frame[int(bbox[1]):int(bbox[3]), int(bbox[0]):int(bbox[2])]  # (split_height, split_width)
-
-                    videoWriter.write(frame_target)
-
-            if not success or count >= frametoStop:
-                break
-
-        print('end')
-
-
-    # 获得字典中所有values值(这个值是列表)
-    def get_dict_values(lst):
-        """
-        获取列表中所有字典的 values 值(如果值是列表)
-
-        参数:
-        lst: 包含字典的列表
-
-        返回值:
-        values: 包含所有字典的 values 值的列表(如果值是列表)
-        """
-        return [value for dictionary in lst for value in dictionary.values() if isinstance(value, list)]
-
-
-    # 解析检测后的结果,为检测后的结果排序
-    def analysis_sort_list(result_dict):
-
-        # print('result_dict:',result_dict)
-
-        # 获得检测列表
-        re_list = result_dict['start_bbox']
-        # print('re_list:',re_list)
-
-        # 获得列表中所有字典的values值
-        re_bbox_list = get_dict_values(re_list)
-
-        # 为检测出来的标注框排序
-        sorted_lst = sorted(re_bbox_list, key=lambda x: x[0])
-
-        return sorted_lst
-
-
-    #对比重叠率高的两个部分,并结合标注框,保存最大的标注框
-    def contrast_bbox(e_bbox,r_bbox):
-
-        e_bbox_min = e_bbox[:2]
-        r_bbox_min = r_bbox[:2]
-
-        bbox_min = [min(x, y) for x, y in zip(e_bbox_min, r_bbox_min)]
-
-        e_bbox_max = e_bbox[-2:]
-        r_bbox_max = r_bbox[-2:]
-
-        bbox_max = [max(x, y) for x, y in zip(e_bbox_max, r_bbox_max)]
-
-        bbox = bbox_min + bbox_max
-
-        return bbox
-
-
-    # 解析result_list列表
-    def analysis_re01_list(example_dict,result_dict):
-
-        # 第一次检测到目标的帧率和信息
-        example_dict_fps = list(example_dict.keys())[0]
-        example_sorted_lst = Process_tools.analysis_sort_list(example_dict)
-
-        # 当前帧检测结果中所有的检测结果数值
-        re_dict_fps = list(result_dict.keys())[0]
-        re_dict_sorted_lst = Process_tools.analysis_sort_list(result_dict)
-
-        # 保存前后帧率连续的范围、筛选出相同的部分
-        cut_list = []
-        example_temp = []
-        re_temp = []
-
-        for i,ex_bbox in enumerate(example_sorted_lst):
-
-            for j,re_bbox in enumerate(re_dict_sorted_lst):
-
-                iou = calculate_iou(box1=ex_bbox, box2=re_bbox)
-
-                # print(iou)
-
-                if iou > 0:
-
-                    bbox = contrast_bbox(e_bbox=ex_bbox,r_bbox=re_bbox)
-
-                    cut_list.append({i:bbox})
-                    example_temp.append(ex_bbox)
-                    re_temp.append(re_bbox)
-
-                    break
-
-                else:
-                    continue
-
-        example_sorted_lst = [item for item in example_sorted_lst if item not in example_temp]
-        re_dict_sorted_lst = [item for item in re_dict_sorted_lst if item not in re_temp]
-
-        return cut_list,example_sorted_lst,re_dict_sorted_lst
-
-
-    # 计算前后帧率重叠范围
-    def calculate_iou(box1, box2):
-        """
-        计算两个边界框之间的IoU值
-
-        参数:
-        box1: 边界框1的坐标(x1, y1, x2, y2)
-        box2: 边界框2的坐标(x1, y1, x2, y2)
-
-        返回值:
-        iou: 两个边界框之间的IoU值
-        """
-        x1 = max(box1[0], box2[0])
-        y1 = max(box1[1], box2[1])
-        x2 = min(box1[2], box2[2])
-        y2 = min(box1[3], box2[3])
-
-        # 计算交集区域面积
-        intersection_area = max(0, x2 - x1 + 1) * max(0, y2 - y1 + 1)
-
-        # 计算边界框1和边界框2的面积
-        box1_area = (box1[2] - box1[0] + 1) * (box1[3] - box1[1] + 1)
-        box2_area = (box2[2] - box2[0] + 1) * (box2[3] - box2[1] + 1)
-
-        # 计算并集区域面积
-        union_area = box1_area + box2_area - intersection_area
-
-        # 计算IoU值
-        iou = intersection_area / union_area
-
-        return iou
diff --git a/tool/PP_TSMv2_infer.py b/tool/PP_TSMv2_infer.py
index 7506215..e1f198a 100644
--- a/tool/PP_TSMv2_infer.py
+++ b/tool/PP_TSMv2_infer.py
@@ -106,6 +106,7 @@ class PP_TSMv2(object):
         paddle.jit.save(model,osp.join(output_p, model_name if self.save_name is None else self.save_name))
         print(f"model ({model_name}) has been already saved in ({output_p}).")
         return model
+
     def predict(self,config,input_f,batch_size,model_f,params_f):
 
@@ -158,7 +159,6 @@ class PP_TSMv2(object):
         #输出推理结果
         InferenceHelper.postprocess(batched_outputs,True)
-
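Usage note (not part of the patch): a minimal sketch of how the refactored pieces might be wired together after this change. The checkpoint name 'yolov8n.pt', the input/output paths, passing None for the PP-TSMv2 recognizer, and starting the two worker threads directly are illustrative assumptions; only the YOLO and MediaPipe Holistic constructors and the DealVideo signature come from the patch itself.

# usage_sketch.py -- illustrative only, not included in the patch
import mediapipe as mp
from ultralytics import YOLO

from video_process import DealVideo

if __name__ == '__main__':
    # Person detector consumed by ModelDet.personDet.analysis_yolov8.
    person_model = YOLO('yolov8n.pt')                      # assumed checkpoint

    # Holistic instance passed through to MediapipeProcess.mediapipe_det.
    holistic = mp.solutions.holistic.Holistic(
        min_detection_confidence=0.5,
        min_tracking_confidence=0.5)

    deal = DealVideo(
        video_file='videos/input',                         # file or folder, see get_video_list
        video_save_file='videos/output',
        person_model=person_model,
        mediapipe_model=holistic,
        pptsmv2_model=None)                                # PP-TSMv2 recognizer not exercised here

    deal.get_video_list()                  # enqueue the video paths
    deal.get_video_frameThread.start()     # split videos into frames
    deal.write_videoThread.start()         # consume frames and write results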