0728: update code logic for the model inference part

V0.1.0
王莹 2 years ago
parent 933ae42228
commit ac998da682

@@ -0,0 +1,152 @@
# Copyright 2020 The MediaPipe Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""MediaPipe solution drawing utils."""
import math
from typing import List, Mapping, Optional, Tuple, Union
import cv2
import dataclasses
import matplotlib.pyplot as plt
import numpy as np
from mediapipe.framework.formats import detection_pb2
from mediapipe.framework.formats import location_data_pb2
from mediapipe.framework.formats import landmark_pb2
_PRESENCE_THRESHOLD = 0.5
_VISIBILITY_THRESHOLD = 0.5
_BGR_CHANNELS = 3
WHITE_COLOR = (224, 224, 224)
BLACK_COLOR = (0, 0, 0)
RED_COLOR = (0, 0, 255)
GREEN_COLOR = (0, 128, 0)
BLUE_COLOR = (255, 0, 0)
@dataclasses.dataclass
class DrawingSpec:
  # Color for drawing the annotation. Defaults to white.
  color: Tuple[int, int, int] = WHITE_COLOR
  # Thickness for drawing the annotation. Defaults to 2 pixels.
  thickness: int = 2
  # Circle radius. Defaults to 2 pixels.
  circle_radius: int = 2
def _normalized_to_pixel_coordinates(
    normalized_x: float, normalized_y: float, image_width: int,
    image_height: int) -> Union[None, Tuple[int, int]]:
  """Converts normalized value pair to pixel coordinates."""

  # Checks if the float value is between 0 and 1.
  def is_valid_normalized_value(value: float) -> bool:
    return (value > 0 or math.isclose(0, value)) and (value < 1 or
                                                      math.isclose(1, value))

  if not (is_valid_normalized_value(normalized_x) and
          is_valid_normalized_value(normalized_y)):
    # TODO: Draw coordinates even if it's outside of the image bounds.
    return None
  x_px = min(math.floor(normalized_x * image_width), image_width - 1)
  y_px = min(math.floor(normalized_y * image_height), image_height - 1)
  return x_px, y_px
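
# Worked example: on a 640x480 image,
#   _normalized_to_pixel_coordinates(0.5, 0.5, 640, 480) -> (320, 240)
#   _normalized_to_pixel_coordinates(1.2, 0.5, 640, 480) -> None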
def draw_landmarks(
    image: np.ndarray,
    landmark_list: landmark_pb2.NormalizedLandmarkList,
    connections: Optional[List[Tuple[int, int]]] = None):
  """Collects landmark pixel coordinates for the given connections.

  Modified from the upstream MediaPipe drawing helper: instead of drawing on
  the image, it returns the pixel coordinates of the landmarks that end each
  connection.

  Args:
    image: A three channel BGR image represented as numpy ndarray.
    landmark_list: A normalized landmark list proto message to be annotated on
      the image.
    connections: A list of landmark index tuples that specifies how landmarks
      should be connected in the drawing.

  Returns:
    A list of (x, y) pixel coordinates, one for the end landmark of each
    connection that passes the visibility/presence thresholds.

  Raises:
    ValueError: If the input image is not three channel BGR.
  """
  if not landmark_list:
    return
  if image.shape[2] != _BGR_CHANNELS:
    raise ValueError('Input image must contain three channel bgr data.')
  image_rows, image_cols, _ = image.shape
  # Map each sufficiently visible landmark index to its pixel coordinates.
  idx_to_coordinates = {}
  for idx, landmark in enumerate(landmark_list.landmark):
    if ((landmark.HasField('visibility') and
         landmark.visibility < _VISIBILITY_THRESHOLD) or
        (landmark.HasField('presence') and
         landmark.presence < _PRESENCE_THRESHOLD)):
      continue
    landmark_px = _normalized_to_pixel_coordinates(landmark.x, landmark.y,
                                                   image_cols, image_rows)
    if landmark_px:
      idx_to_coordinates[idx] = landmark_px
  if connections:
    # Collect the pixel coordinates of the end landmark of every connection,
    # keeping only endpoints that passed the visibility/presence checks.
    point_axis_list = []
    for connection in connections:
      end_idx = connection[1]
      if end_idx in idx_to_coordinates:
        point_axis_list.append(idx_to_coordinates[end_idx])
    return point_axis_list
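
# Usage sketch: with `results` from a MediaPipe Holistic model (an assumption;
# any NormalizedLandmarkList works), the modified helper returns pixel points
# instead of drawing:
#
#   points = draw_landmarks(image, results.face_landmarks,
#                           mp.solutions.holistic.FACEMESH_CONTOURS)
#   # points -> [(x, y), ...] for each sufficiently visible connection endpoint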

@@ -0,0 +1,86 @@
import cv2
import mediapipe as mp
import numpy as np
import analysisPoint as mp_drawing
mp_holistic = mp.solutions.holistic
class MediapipeProcess:

    @staticmethod
    def mediapipe_det(image, holistic):
        '''
        Run the holistic model on one image and return the detection results.
        '''
        # MediaPipe expects RGB input; mark the array read-only for speed.
        image.flags.writeable = False
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        results = holistic.process(image)
        return results
    @staticmethod
    def get_analysis_result(image, results):
        '''
        image: the image that was processed
        results: the detection results for that image
        Analyse the results into bounding boxes.
        '''
        image.flags.writeable = True
        image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)
        face_result = mp_drawing.draw_landmarks(
            image,
            results.face_landmarks,
            mp_holistic.FACEMESH_CONTOURS)
        right_hand_result = mp_drawing.draw_landmarks(
            image,
            results.right_hand_landmarks,
            mp_holistic.HAND_CONNECTIONS)
        left_hand_result = mp_drawing.draw_landmarks(
            image,
            results.left_hand_landmarks,
            mp_holistic.HAND_CONNECTIONS)
        face_bbox = MediapipeProcess.point_to_bbox(face_result)
        right_hand_bbox = MediapipeProcess.point_to_bbox(right_hand_result)
        left_hand_bbox = MediapipeProcess.point_to_bbox(left_hand_result)
        result_dict = {'face_bbox': face_bbox, 'hand_bbox': [right_hand_bbox, left_hand_bbox]}
        return result_dict
    @staticmethod
    def point_to_bbox(result_list):
        '''
        Compute the minimum-area bounding rectangle of a set of keypoints.
        '''
        # Guard against frames where no landmarks were detected.
        if not result_list:
            return None
        # minAreaRect needs an int32/float32 point array.
        result_array = np.asarray(result_list, dtype=np.float32)
        rect = cv2.minAreaRect(result_array)  # ((cx, cy), (w, h), angle)
        bbox = cv2.boxPoints(rect)  # the four corner points of the rectangle
        bbox = bbox.astype(int)  # np.int0 is deprecated in recent NumPy
        return bbox
# if __name__ == '__main__':
# # media_holistic(video_file='E:/Bank_files/Bank_02/dataset/video_person/after_1/0711-1_199_0.avi',
# video_save_path='E:/Bank_files/Bank_02/videos_mediapipe/test_data/0725_test')
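
# Usage sketch (assumes a BGR `frame` from cv2.VideoCapture and default
# Holistic thresholds):
#
#   holistic = mp_holistic.Holistic(min_detection_confidence=0.5,
#                                   min_tracking_confidence=0.5)
#   results = MediapipeProcess.mediapipe_det(frame, holistic)
#   bboxes = MediapipeProcess.get_analysis_result(frame, results)
#   # bboxes -> {'face_bbox': ..., 'hand_bbox': [right_hand, left_hand]}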

@@ -0,0 +1,39 @@
def analysis_yolov8(frame, model_coco, confidence_set):
    '''
    Run a COCO-pretrained YOLOv8 model on one frame and return the detections
    above the confidence threshold as [{label: [x1, y1, x2, y2]}, ...].
    '''
    # Step 1: inference with the COCO-pretrained model.
    results_coco = model_coco(frame)
    re_list = []
    if results_coco:
        for r in results_coco:
            boxes = r.boxes
            for box in boxes:
                b = box.xyxy[0]  # box coordinates in (x1, y1, x2, y2) format
                c = box.cls
                # Keep the label and coordinates as the return value.
                blist = b.tolist()
                labels_name = model_coco.names[int(c)]
                confidence = round(float(box.conf), 2)
                # Drop detections below the confidence threshold.
                if confidence < confidence_set:
                    continue
                # One result dictionary per detection.
                re_dict = {labels_name: blist}
                re_list.append(re_dict)
    return re_list
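
# Usage sketch ('yolov8n.pt' is a placeholder weights file, not part of this
# repo):
#
#   from ultralytics import YOLO
#   model = YOLO('yolov8n.pt')
#   detections = analysis_yolov8(frame, model_coco=model, confidence_set=0.5)
#   # detections -> [{'person': [x1, y1, x2, y2]}, ...]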

@@ -0,0 +1,3 @@
# Maximum queue length
Q_SZ = 200

@@ -0,0 +1,212 @@
import cv2
import os
class Process_tools():

    # Collect every video file under a directory tree.
    @staticmethod
    def get_video_list(path):
        video_ext = [".mp4", ".avi", ".MP4"]
        video_names = []
        for maindir, subdir, file_name_list in os.walk(path):
            for filename in file_name_list:
                apath = os.path.join(maindir, filename)
                ext = os.path.splitext(apath)[1]
                if ext in video_ext:
                    video_names.append(apath)
        return video_names
    # Cut out the requested frame range and crop each frame to the bbox.
    @staticmethod
    def save_seg_video(video_name, frameToStart, frametoStop, videoWriter, bbox):
        cap = cv2.VideoCapture(video_name)
        count = 0
        while True:
            success, frame = cap.read()
            if success:
                count += 1
                if frameToStart < count <= frametoStop:  # keep only the selected range
                    print('correct= ', count)
                    # Crop the frame to the bounding box.
                    frame_target = frame[int(bbox[1]):int(bbox[3]), int(bbox[0]):int(bbox[2])]  # (split_height, split_width)
                    videoWriter.write(frame_target)
            if not success or count >= frametoStop:
                break
        print('end')
        cap.release()
    # Collect all dict values from a list of dicts, keeping only list values.
    @staticmethod
    def get_dict_values(lst):
        """
        Collect the values of every dict in a list, keeping only values
        that are lists.

        Args:
            lst: a list of dictionaries.
        Returns:
            A list of all dict values that are lists.
        """
        return [value for dictionary in lst for value in dictionary.values() if isinstance(value, list)]
    # Parse the detection results and sort the detected boxes.
    @staticmethod
    def analysis_sort_list(result_dict):
        # Get the detection list.
        re_list = result_dict['start_bbox']
        # Get all dict values (the boxes) from the list.
        re_bbox_list = Process_tools.get_dict_values(re_list)
        # Sort the detected boxes left to right by x1.
        sorted_lst = sorted(re_bbox_list, key=lambda x: x[0])
        return sorted_lst
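
    # Example (sketch): {'start_bbox': [{'person': [300, 0, 400, 50]},
    # {'person': [100, 0, 200, 50]}]} sorts to
    # [[100, 0, 200, 50], [300, 0, 400, 50]], i.e. left to right by x1.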
    # Merge two strongly overlapping boxes into the smallest box containing both.
    @staticmethod
    def contrast_bbox(e_bbox, r_bbox):
        e_bbox_min = e_bbox[:2]
        r_bbox_min = r_bbox[:2]
        bbox_min = [min(x, y) for x, y in zip(e_bbox_min, r_bbox_min)]
        e_bbox_max = e_bbox[-2:]
        r_bbox_max = r_bbox[-2:]
        bbox_max = [max(x, y) for x, y in zip(e_bbox_max, r_bbox_max)]
        bbox = bbox_min + bbox_max
        return bbox
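
    # Example (sketch): contrast_bbox([10, 10, 50, 50], [20, 5, 60, 40])
    # -> [10, 5, 60, 50], the smallest box containing both inputs.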
    # Match boxes between two consecutive detection results.
    @staticmethod
    def analysis_re01_list(example_dict, result_dict):
        # Frame index and boxes from the first detection of the target.
        example_dict_fps = list(example_dict.keys())[0]
        example_sorted_lst = Process_tools.analysis_sort_list(example_dict)
        # All boxes detected in the current frame.
        re_dict_fps = list(result_dict.keys())[0]
        re_dict_sorted_lst = Process_tools.analysis_sort_list(result_dict)
        # Boxes that persist across frames, plus the leftovers on each side.
        cut_list = []
        example_temp = []
        re_temp = []
        for i, ex_bbox in enumerate(example_sorted_lst):
            for re_bbox in re_dict_sorted_lst:
                iou = Process_tools.calculate_iou(box1=ex_bbox, box2=re_bbox)
                if iou > 0:
                    bbox = Process_tools.contrast_bbox(e_bbox=ex_bbox, r_bbox=re_bbox)
                    cut_list.append({i: bbox})
                    example_temp.append(ex_bbox)
                    re_temp.append(re_bbox)
                    break
        example_sorted_lst = [item for item in example_sorted_lst if item not in example_temp]
        re_dict_sorted_lst = [item for item in re_dict_sorted_lst if item not in re_temp]
        return cut_list, example_sorted_lst, re_dict_sorted_lst
    # Compute the overlap between boxes from consecutive frames.
    @staticmethod
    def calculate_iou(box1, box2):
        """
        Compute the IoU of two bounding boxes.

        Args:
            box1: coordinates of box 1 as (x1, y1, x2, y2).
            box2: coordinates of box 2 as (x1, y1, x2, y2).
        Returns:
            iou: the IoU of the two boxes.
        """
        x1 = max(box1[0], box2[0])
        y1 = max(box1[1], box2[1])
        x2 = min(box1[2], box2[2])
        y2 = min(box1[3], box2[3])
        # Intersection area (inclusive-pixel convention, hence the +1).
        intersection_area = max(0, x2 - x1 + 1) * max(0, y2 - y1 + 1)
        # Areas of box 1 and box 2.
        box1_area = (box1[2] - box1[0] + 1) * (box1[3] - box1[1] + 1)
        box2_area = (box2[2] - box2[0] + 1) * (box2[3] - box2[1] + 1)
        # Union area.
        union_area = box1_area + box2_area - intersection_area
        # IoU.
        iou = intersection_area / union_area
        return iou
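
    # Worked example: box1 = (0, 0, 10, 10), box2 = (5, 5, 15, 15) gives an
    # intersection of 6 * 6 = 36, areas of 121 each, union 206, so
    # IoU = 36 / 206 ≈ 0.17.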
    @staticmethod
    def para_correction(images_size, bbox, dertpara):
        '''
        Enlarge detection boxes that came out too small. If a correction
        parameter is supplied it should be applied; otherwise the box is
        expanded to twice its width and height, clamped to the image.
        '''
        if dertpara:
            # TODO: apply the correction parameter (left unimplemented upstream).
            return bbox
        w = (bbox[2] - bbox[0]) / 2
        h = (bbox[3] - bbox[1]) / 2
        bbox_extand_list_x = [bbox[0] - w, bbox[2] + w]
        bbox_extand_list_y = [bbox[1] - h, bbox[3] + h]
        bbox_list_x = Process_tools.contrast(size=images_size[0], bbox_extand_list=bbox_extand_list_x)
        bbox_list_y = Process_tools.contrast(size=images_size[1], bbox_extand_list=bbox_extand_list_y)
        # Note the returned order is [x1, x2, y1, y2].
        bbox_list = bbox_list_x + bbox_list_y
        return bbox_list
    @staticmethod
    def contrast(size, bbox_extand_list):
        '''
        Clamp each coordinate to the valid range [0, size].
        '''
        # The original membership test `x in range(size)` silently dropped
        # non-integer-valued floats; clamping keeps every coordinate.
        bbox_list = []
        for x in bbox_extand_list:
            bbox_list.append(min(max(x, 0), size))
        return bbox_list
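
    # Example (sketch): para_correction((1920, 1080), [100, 100, 200, 200], None)
    # doubles the box on each axis and returns [50, 250, 50, 250]
    # (order [x1, x2, y1, y2]); coordinates falling outside the image are
    # clamped to [0, size] by contrast().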

@@ -2,24 +2,32 @@ import numpy as np
import cv2
import os
import time
from tqdm import tqdm
from ultralytics import YOLO
import queue
import threading
from config import Q_SZ
from ModelDet.personDet import analysis_yolov8
from ModelDet.holisticDet import MediapipeProcess
class DealVideo():

    def __init__(self, video_file, video_save_file, person_model, mediapipe_model, pptsmv2_model):
        '''
        Load the input data and initialize the models.
        '''
        self.video_file = video_file
        self.video_save_file = video_save_file
        # Models used in the pipeline.
        self.person_model = person_model
        self.mediapipe_model = mediapipe_model
        self.pptsmv2_model = pptsmv2_model
        # Queue of videos waiting to be split into frames.
        self.videoQueue = queue.Queue(maxsize=Q_SZ)
@@ -30,9 +38,13 @@ class DealVideo():
        self.get_video_frameThread = threading.Thread(target=self.get_video_frame)
        self.write_videoThread = threading.Thread(target=self.write_video)

    # Collect the video files.
    def get_video_list(self):
        '''
        Collect the video files to process.
        '''
        if os.path.isdir(self.video_file):
            video_ext = [".mp4", ".avi", ".MP4"]
@@ -46,18 +58,20 @@
        else:
            self.videoQueue.put(self.video_file)

    def get_video_frame(self):
        '''
        Split each video into frames and put them on the queue.
        '''
        while True:
            if not self.videoQueue.empty():
                try:
                    video_path = self.videoQueue.get()
                    video_basename = os.path.basename(video_path).split('.')[0]
                    cap = cv2.VideoCapture(video_path)
                    frame_list = []
@@ -76,9 +90,29 @@ class DealVideo():
                    video_dict = {'video_path': video_path, 'frame_list': frame_list, 'cap': cap}
                    self.frameQueue.put(video_dict)
                except Exception as e:
                    print(e)
    def person_det(self):
        '''
        Run person detection on each queued frame list.
        '''
        while True:
            if not self.frameQueue.empty():
                video_dict = self.frameQueue.get()
                # analysis_yolov8 expects a single frame, so run it per frame.
                for frame in video_dict['frame_list']:
                    person_det = analysis_yolov8(frame=frame, model_coco=self.person_model, confidence_set=0.5)
    def write_video(self):
        '''
        Write the processed frames back out as video files.
        '''
        while True:
            if not self.frameQueue.empty():
                video_frame_dict = self.frameQueue.get()
@@ -116,173 +150,7 @@
        self.write_videoThread.start()
class Process_tools():

    # Collect every video file under a directory tree.
    def get_video_list(path):
        video_ext = [".mp4", ".avi", ".MP4"]
        video_names = []
        for maindir, subdir, file_name_list in os.walk(path):
            for filename in file_name_list:
                apath = os.path.join(maindir, filename)
                ext = os.path.splitext(apath)[1]
                if ext in video_ext:
                    video_names.append(apath)
        return video_names

    # Cut out the requested frame range and crop each frame to the bbox.
    def save_seg_video(video_name, frameToStart, frametoStop, videoWriter, bbox):
        cap = cv2.VideoCapture(video_name)
        count = 0
        while True:
            success, frame = cap.read()
            if success:
                count += 1
                if count <= frametoStop and count > frameToStart:  # keep only the selected range
                    print('correct= ', count)
                    # Crop the frame to the bounding box.
                    frame_target = frame[int(bbox[1]):int(bbox[3]), int(bbox[0]):int(bbox[2])]  # (split_height, split_width)
                    videoWriter.write(frame_target)
            if not success or count >= frametoStop:
                break
        print('end')

    # Collect all dict values from a list of dicts, keeping only list values.
    def get_dict_values(lst):
        """
        Collect the values of every dict in a list, keeping only values
        that are lists.

        Args:
            lst: a list of dictionaries.
        Returns:
            A list of all dict values that are lists.
        """
        return [value for dictionary in lst for value in dictionary.values() if isinstance(value, list)]

    # Parse the detection results and sort the detected boxes.
    def analysis_sort_list(result_dict):
        # Get the detection list.
        re_list = result_dict['start_bbox']
        # Get all dict values (the boxes) from the list.
        re_bbox_list = get_dict_values(re_list)
        # Sort the detected boxes left to right by x1.
        sorted_lst = sorted(re_bbox_list, key=lambda x: x[0])
        return sorted_lst

    # Merge two strongly overlapping boxes into the smallest box containing both.
    def contrast_bbox(e_bbox, r_bbox):
        e_bbox_min = e_bbox[:2]
        r_bbox_min = r_bbox[:2]
        bbox_min = [min(x, y) for x, y in zip(e_bbox_min, r_bbox_min)]
        e_bbox_max = e_bbox[-2:]
        r_bbox_max = r_bbox[-2:]
        bbox_max = [max(x, y) for x, y in zip(e_bbox_max, r_bbox_max)]
        bbox = bbox_min + bbox_max
        return bbox

    # Match boxes between two consecutive detection results.
    def analysis_re01_list(example_dict, result_dict):
        # Frame index and boxes from the first detection of the target.
        example_dict_fps = list(example_dict.keys())[0]
        example_sorted_lst = Process_tools.analysis_sort_list(example_dict)
        # All boxes detected in the current frame.
        re_dict_fps = list(result_dict.keys())[0]
        re_dict_sorted_lst = Process_tools.analysis_sort_list(result_dict)
        # Boxes that persist across frames, plus the leftovers on each side.
        cut_list = []
        example_temp = []
        re_temp = []
        for i, ex_bbox in enumerate(example_sorted_lst):
            for j, re_bbox in enumerate(re_dict_sorted_lst):
                iou = calculate_iou(box1=ex_bbox, box2=re_bbox)
                if iou > 0:
                    bbox = contrast_bbox(e_bbox=ex_bbox, r_bbox=re_bbox)
                    cut_list.append({i: bbox})
                    example_temp.append(ex_bbox)
                    re_temp.append(re_bbox)
                    break
                else:
                    continue
        example_sorted_lst = [item for item in example_sorted_lst if item not in example_temp]
        re_dict_sorted_lst = [item for item in re_dict_sorted_lst if item not in re_temp]
        return cut_list, example_sorted_lst, re_dict_sorted_lst

    # Compute the overlap between boxes from consecutive frames.
    def calculate_iou(box1, box2):
        """
        Compute the IoU of two bounding boxes.

        Args:
            box1: coordinates of box 1 as (x1, y1, x2, y2).
            box2: coordinates of box 2 as (x1, y1, x2, y2).
        Returns:
            iou: the IoU of the two boxes.
        """
        x1 = max(box1[0], box2[0])
        y1 = max(box1[1], box2[1])
        x2 = min(box1[2], box2[2])
        y2 = min(box1[3], box2[3])
        # Intersection area (inclusive-pixel convention, hence the +1).
        intersection_area = max(0, x2 - x1 + 1) * max(0, y2 - y1 + 1)
        # Areas of box 1 and box 2.
        box1_area = (box1[2] - box1[0] + 1) * (box1[3] - box1[1] + 1)
        box2_area = (box2[2] - box2[0] + 1) * (box2[3] - box2[1] + 1)
        # Union area.
        union_area = box1_area + box2_area - intersection_area
        # IoU.
        iou = intersection_area / union_area
        return iou

@@ -107,6 +107,7 @@ class PP_TSMv2(object):
print(f"model ({model_name}) has been already saved in ({output_p}).")
return model
def predict(self,config,input_f,batch_size,model_f,params_f):
"""
@@ -161,7 +162,6 @@ class PP_TSMv2(object):
def main():
config='/home/xznsh/data/PaddleVideo/configs/recognition/pptsm/v2/pptsm_lcnet_k400_16frames_uniform.yaml'  # path to the config file
