0822上传模型标注代码

V0.1.0
王莹 2 years ago
parent 8d57b523d8
commit 60a14775b7

@ -0,0 +1,95 @@
from xml.etree.ElementTree import ElementTree, Element
# xml换行
def indent(elem, level=0):
i = "\n" + level*"\t"
if len(elem):
if not elem.text or not elem.text.strip():
elem.text = i + "\t"
if not elem.tail or not elem.tail.strip():
elem.tail = i
for elem in elem:
indent(elem, level+1)
if not elem.tail or not elem.tail.strip():
elem.tail = i
else:
if level and (not elem.tail or not elem.tail.strip()):
elem.tail = i
def add_xml(inforsDict,xmlFilePath):
result = inforsDict
for re in result:
# if re['score'] > 0.5:
# 获得标注信息
ObjName = list(re.keys())[0]
xmin = int(list(re.values())[0][0])
ymin = int(list(re.values())[0][1])
xmax = int(list(re.values())[0][2])
ymax = int(list(re.values())[0][3])
# xmax = xmin + r
# ymax = ymin + z
#if ObjName == 'person':
tree = ElementTree()
tree.parse(xmlFilePath)
# 得到根目录
root = tree.getroot()
# 创建一级目录
elementOjb = Element('object')
elementBox = Element('bndbox')
# 创建二级目录
one = Element('name')
one.text = ObjName # 二级目录的值 #结果展示:<id>1</id>
elementOjb.append(one) # 将二级目录加到一级目录里
two = Element('pose')
two.text = "Unspecified"
elementOjb.append(two)
three = Element('truncated')
three.text = "0"
elementOjb.append(three)
four = Element('difficult')
four.text = "0"
elementOjb.append(four)
five = Element('xmin')
five.text = str(xmin)
elementBox.append(five)
six = Element('xmax')
six.text = str(xmax)
elementBox.append(six)
seven = Element('ymin')
seven.text = str(ymin)
elementBox.append(seven)
eight = Element('ymax')
eight.text = str(ymax)
elementBox.append(eight)
# 将一级目录加到根目录里
elementOjb.append(elementBox)
root.append(elementOjb)
# 换行缩进
indent(elementOjb)
indent(elementBox)
# 让结果保存进文件就可以了
tree.write(xmlFilePath, encoding='utf-8', xml_declaration=True)

@ -0,0 +1,152 @@
# Copyright 2020 The MediaPipe Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""MediaPipe solution drawing utils."""
import math
from typing import List, Mapping, Optional, Tuple, Union
import cv2
import dataclasses
import matplotlib.pyplot as plt
import numpy as np
from mediapipe.framework.formats import detection_pb2
from mediapipe.framework.formats import location_data_pb2
from mediapipe.framework.formats import landmark_pb2
_PRESENCE_THRESHOLD = 0.5
_VISIBILITY_THRESHOLD = 0.5
_BGR_CHANNELS = 3
WHITE_COLOR = (224, 224, 224)
BLACK_COLOR = (0, 0, 0)
RED_COLOR = (0, 0, 255)
GREEN_COLOR = (0, 128, 0)
BLUE_COLOR = (255, 0, 0)
@dataclasses.dataclass
class DrawingSpec:
# Color for drawing the annotation. Default to the white color.
color: Tuple[int, int, int] = WHITE_COLOR
# Thickness for drawing the annotation. Default to 2 pixels.
thickness: int = 2
# Circle radius. Default to 2 pixels.
circle_radius: int = 2
def _normalized_to_pixel_coordinates(
normalized_x: float, normalized_y: float, image_width: int,
image_height: int) -> Union[None, Tuple[int, int]]:
"""Converts normalized value pair to pixel coordinates."""
# Checks if the float value is between 0 and 1.
def is_valid_normalized_value(value: float) -> bool:
return (value > 0 or math.isclose(0, value)) and (value < 1 or
math.isclose(1, value))
if not (is_valid_normalized_value(normalized_x) and
is_valid_normalized_value(normalized_y)):
# TODO: Draw coordinates even if it's outside of the image bounds.
return None
x_px = min(math.floor(normalized_x * image_width), image_width - 1)
y_px = min(math.floor(normalized_y * image_height), image_height - 1)
return x_px, y_px
def draw_landmarks(
image: np.ndarray,
landmark_list: landmark_pb2.NormalizedLandmarkList,
connections: Optional[List[Tuple[int, int]]] = None):
"""Draws the landmarks and the connections on the image.
Args:
image: A three channel BGR image represented as numpy ndarray.
landmark_list: A normalized landmark list proto message to be annotated on
the image.
connections: A list of landmark index tuples that specifies how landmarks to
be connected in the drawing.
landmark_drawing_spec: Either a DrawingSpec object or a mapping from hand
landmarks to the DrawingSpecs that specifies the landmarks' drawing
settings such as color, line thickness, and circle radius. If this
argument is explicitly set to None, no landmarks will be drawn.
connection_drawing_spec: Either a DrawingSpec object or a mapping from hand
connections to the DrawingSpecs that specifies the connections' drawing
settings such as color and line thickness. If this argument is explicitly
set to None, no landmark connections will be drawn.
Raises:
ValueError: If one of the followings:
a) If the input image is not three channel BGR.
b) If any connetions contain invalid landmark index.
"""
if not landmark_list:
return
if image.shape[2] != _BGR_CHANNELS:
raise ValueError('Input image must contain three channel bgr data.')
image_rows, image_cols, _ = image.shape
# 所有的点转换成坐标的字典
idx_to_coordinates = {}
for idx, landmark in enumerate(landmark_list.landmark):
# print('landmark:',landmark)
if ((landmark.HasField('visibility') and
landmark.visibility < _VISIBILITY_THRESHOLD) or
(landmark.HasField('presence') and
landmark.presence < _PRESENCE_THRESHOLD)):
continue
landmark_px = _normalized_to_pixel_coordinates(landmark.x, landmark.y,
image_cols, image_rows)
# print('landmark_px:',landmark_px)
if landmark_px:
idx_to_coordinates[idx] = landmark_px
if connections:
num_landmarks = len(landmark_list.landmark)
# print('connections:',connections)
# Draws the connections if the start and end landmarks are both visible.
start_list = []
end_list = []
for connection in connections:
# print(connection)
start_idx = connection[0]
end_idx = connection[1]
start_list.append(start_idx)
end_list.append(end_idx)
point_list = []
for point_idx in end_list:
# if point_idx not in start_list:
# print(point_idx)
point_list.append(point_idx)
point_axis_list = []
for point in point_list:
if point in list(idx_to_coordinates.keys()):
point_axis_list.append(idx_to_coordinates[point])
return point_axis_list

@ -0,0 +1,58 @@
from lxml.etree import Element, SubElement, tostring
def create_xml(boxs,img_shape,xml_path):
"""
创建xml文件依次写入xml文件必备关键字
:param boxs: txt文件中的box
:param img_shape: 图片信息xml中需要写入WHC
:return:
"""
node_root = Element('annotation')
node_folder = SubElement(node_root, 'folder')
node_folder.text = 'Images'
node_filename = SubElement(node_root, 'filename')
node_filename.text = str(img_shape[3])
node_size = SubElement(node_root, 'size')
node_width = SubElement(node_size, 'width')
node_width.text = str(img_shape[1])
node_height = SubElement(node_size, 'height')
node_height.text = str(img_shape[0])
node_depth = SubElement(node_size, 'depth')
node_depth.text = str(img_shape[2])
if len(boxs)>=1: # 循环写入box
for box in boxs:
node_object = SubElement(node_root, 'object')
node_name = SubElement(node_object, 'name')
# if str(list_[4]) == "person": # 根据条件筛选需要标注的标签,例如这里只标记person这类不符合则直接跳过
# node_name.text = str(list_[4])
# else:
# continue
node_name.text = str(list(box.keys())[0])
node_difficult = SubElement(node_object, 'difficult')
node_difficult.text = '0'
node_bndbox = SubElement(node_object, 'bndbox')
node_xmin = SubElement(node_bndbox, 'xmin')
node_xmin.text = str(int(list(box.values())[0][0]))
node_ymin = SubElement(node_bndbox, 'ymin')
node_ymin.text = str(int(list(box.values())[0][1]))
node_xmax = SubElement(node_bndbox, 'xmax')
node_xmax.text = str(int(list(box.values())[0][2]))
node_ymax = SubElement(node_bndbox, 'ymax')
node_ymax.text = str(int(list(box.values())[0][3]))
xml = tostring(node_root, pretty_print=True) # 格式化显示,该换行的换行
file_name = img_shape[3].split(".")[0]
# filename = xml_path+"/{}.xml".format(file_name)
f = open(xml_path, "wb")
f.write(xml)
f.close()

@ -0,0 +1,106 @@
import cv2
import mediapipe as mp
import analysisPoint as mp_drawing
mp_holistic = mp.solutions.holistic
import numpy as np
class MediapipeProcess:
def mediapipe_det(image,holistic):
'''
调用模型推理获得检测结果
'''
image.flags.writeable = False
image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
results = holistic.process(image)
return results
def get_analysis_result(image,results):
'''
images: 检测的图片
results: 图片的检测结果
对上述结果进行分析
'''
image.flags.writeable = True
image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)
# face_result = mp_drawing.draw_landmarks(
# image,
# results.face_landmarks,
# mp_holistic.FACEMESH_CONTOURS)
right_hand_result = mp_drawing.draw_landmarks(
image,
results.right_hand_landmarks,
mp_holistic.HAND_CONNECTIONS)
left_hand_result = mp_drawing.draw_landmarks(
image,
results.left_hand_landmarks,
mp_holistic.HAND_CONNECTIONS)
# face_bbox = MediapipeProcess.point_to_bbox(face_result)
right_hand_bbox = MediapipeProcess.point_to_bbox(right_hand_result)
left_hand_bbox = MediapipeProcess.point_to_bbox(left_hand_result)
result_dict = {'hand_bbox':[right_hand_bbox,left_hand_bbox]}
# print('hand_bbox:',result_dict)
return result_dict
def point_to_bbox(result_list):
'''
根据关键点坐标获取坐标点的最小外接矩形
'''
result_array = np.array(result_list)
if result_array.all():
rect = cv2.minAreaRect(result_array) # 得到最小外接矩形的(中心(x,y), (宽,高), 旋转角度)
bbox = cv2.boxPoints(rect) # 获取最小外接矩形的4个顶点坐标(ps: cv2.boxPoints(rect) for OpenCV 3.x)
bbox = np.int0(bbox)
bbox=bbox.tolist()
left_top = [min(bbox, key=lambda p: p[0])[0], min(bbox, key=lambda p: p[1])[1]]
right_bottom = [max(bbox, key=lambda p: p[0])[0], max(bbox, key=lambda p: p[1])[1]]
bbox_list = left_top + right_bottom
# print('bbox:',bbox)
# print('bbox_list:',bbox_list)
# bbox_list = []
# bbox_list.append(bbox[0][0])
# bbox_list.append(bbox[0][1])
# bbox_list.append(bbox[2][0])
# bbox_list.append(bbox[2][1])
return bbox_list
else:
pass
# if __name__ == '__main__':
# # media_holistic(video_file='E:/Bank_files/Bank_02/dataset/video_person/after_1/0711-1_199_0.avi',
# video_save_path='E:/Bank_files/Bank_02/videos_mediapipe/test_data/0725_test')

@ -0,0 +1,181 @@
import cv2
import os
import mediapipe as mp
from ultralytics import YOLO
from personDet import analysis_yolov8
import tools_function
from holisticDet import MediapipeProcess
from add_xml import add_xml
from create_xml import create_xml
import queue
class DetProcess():
def __init__(self,person_det_model,hand_det_model):
self.person_det_model = person_det_model
self.hand_det_model = hand_det_model
def get_person_cut(self,frame,det_dict,imgsize):
# person_det_dict = [perdict for perdict in det_dict if list(perdict.keys())[0] == 'person']
# print('person_det_dict:',det_dict)
person_list = tools_function.get_dict_values(det_dict)
# 坐标参数修正
person_bbox_list = tools_function.para_list_correction(images_size=imgsize,bbox_list=person_list,dertpara=5)
frame_list = []
for per_bbox in person_bbox_list:
# 裁剪后人的图片
person_cut_frame = tools_function.img_cut(frame=frame,bbox=per_bbox)
frame_cut_dict = {tuple(per_bbox):person_cut_frame}
frame_list.append(frame_cut_dict)
return frame_list
def hand_det(self,person_cut_frame,per_bbox):
# print('11111')
hands_result = MediapipeProcess.mediapipe_det(image=person_cut_frame,holistic=self.hand_det_model)
hands_result_dict = MediapipeProcess.get_analysis_result(image=person_cut_frame,results=hands_result)
hands_list = tools_function.select_list(hands_result_dict['hand_bbox'])
# print('hands_list:',hands_list)
imgsize2 = person_cut_frame.shape
# 手部坐标修正
hands_bbox_list = tools_function.para_list_correction(images_size=imgsize2,bbox_list=hands_list,dertpara=5)
# print('hands_bbox_list:',hands_bbox_list)
hand_bbox_list = []
for hand in hands_bbox_list:
hands_result_list = tools_function.change_bbox(bbox_person=[per_bbox[0],per_bbox[1]],bbox_hand=hand)
# print('hands_result_list:',hands_result_list)
re_dict = {'hands':hands_result_list}
hand_bbox_list.append(re_dict)
# hands_result_original_dict = {'results':hand_bbox_list}
# print(hands_result_original_dict)
return hand_bbox_list
def save_annotations_xml(self,xml_save_file,save_infors,images):
# images = save_infors['images']
results = save_infors
img = os.path.basename(images)
img_frame = cv2.imread(images)
xml_save_path = os.path.join(xml_save_file,img.split('.')[0] + '.xml')
w,h,d = img_frame.shape
img_shape = (w,h,d,img)
if os.path.isfile(xml_save_path):
add_labels = add_xml(inforsDict=results,xmlFilePath=xml_save_path)
else:
create_new = create_xml(boxs=results,img_shape=img_shape,xml_path=xml_save_path)
def person_cut_process(self,images,img_save_files):
frame = cv2.imread(images)
imgsize = frame.shape
labels_name_list = ['person']
per_det_dict = analysis_yolov8(frame=frame,
model_coco=self.person_det_model,
labels_names=labels_name_list,
confidence_set=0.2)
per_frame_cut = self.get_person_cut(frame=frame,det_dict=per_det_dict,imgsize=imgsize)
per_frame_list = [value for dictionary in per_frame_cut for value in dictionary.values()]
# print('per_frame_list:',per_frame_list)
for id_num,cut_frame in enumerate(per_frame_list):
cut_frame_save = tools_function.img_write(frame=cut_frame,img_file=images,id_num=id_num,save_file=img_save_files)
def hands_det_process(self,images,xml_save_file):
frame = cv2.imread(images)
imgsize = frame.shape
labels_name_list = ['person']
per_det_dict = analysis_yolov8(frame=frame,
model_coco=self.person_det_model,
labels_names=labels_name_list,
confidence_set=0.2)
per_frame_cut = self.get_person_cut(frame=frame,det_dict=per_det_dict,imgsize=imgsize)
for frame_dict in per_frame_cut:
per_bbox = list(frame_dict.keys())[0]
person_cut_frame = list(frame_dict.values())[0]
hands_dict = self.hand_det(person_cut_frame=person_cut_frame,per_bbox=per_bbox)
self.save_annotations_xml(xml_save_file=xml_save_file,save_infors=hands_dict,images=images)
# person_det = self.detect_yolo(images_path=images,labels_name_list=labels_name_list)
def det_process(self,images,xml_save_file):
frame = cv2.imread(images)
imgsize = frame.shape
labels_name_list = ['cell phone','mouse','keyboard']
per_det_dict = analysis_yolov8(frame=frame,
model_coco=self.person_det_model,
labels_names=labels_name_list,
confidence_set=0.2)
self.save_annotations_xml(xml_save_file=xml_save_file,save_infors=per_det_dict,images=images)
if __name__ == '__main__':
images_files = 'images'
images_list = tools_function.get_path_list(images_files)
img_save_files = 'images_cut'
xml_save_file = 'annotations'
# 初始化目标检测
person_model = YOLO("model_files/yolov8x.pt")
# 初始化mediapipe
mp_holistic = mp.solutions.holistic
holistic = mp_holistic.Holistic(min_detection_confidence=0.1,min_tracking_confidence=0.1)
Det = DetProcess(person_det_model=person_model,hand_det_model=holistic)
for images in images_list:
# Det.person_cut_process(images=images,img_save_files=img_save_files)
Det.hands_det_process(images=images,xml_save_file=xml_save_file)
Det.det_process(images=images,xml_save_file=xml_save_file)

@ -0,0 +1,44 @@
def analysis_yolov8(frame, model_coco,labels_names,confidence_set):
# 第一步用COCO数据集推理
results_coco = model_coco(frame)
re_list = []
if results_coco:
for r in results_coco:
boxes = r.boxes
idx = 0
for box in boxes:
idx += 1
b = box.xyxy[0] # get box coordinates in (top, left, bottom, right) format
c = box.cls
# 保存标签和坐标值作为返回结果
blist = b.tolist()
labels_name = model_coco.names[int(c)]
# print('labels_name:',labels_name)
confidence = float(box.conf)
confidence = round(confidence, 2)
# 过滤置信度以下目标
if confidence < confidence_set:
continue
if labels_name in labels_names:
# 一个结果字典
re_dict = {labels_name:blist}
re_list.append(re_dict)
return re_list

@ -0,0 +1,185 @@
import os
import cv2
def get_path_list(file):
retutn_list = []
if os.path.isdir(file):
video_ext = [".mp4", ".avi",".MP4",'.jpg']
for maindir, subdir, file_name_list in os.walk(file):
for filename in file_name_list:
apath = os.path.join(maindir, filename)
ext = os.path.splitext(apath)[1]
if ext in video_ext:
retutn_list.append(apath)
else:
retutn_list.append(file)
return retutn_list
# 获得字典中所有values值这个值是列表)
def get_dict_values(lst):
"""
获取列表中所有字典的 values 如果值是列表
参数:
lst: 包含字典的列表
返回值:
values: 包含所有字典的 values 值的列表如果值是列表
"""
return [value for dictionary in lst for value in dictionary.values() if isinstance(value, list)]
# 解析检测后的结果,为检测后的结果排序
def img_cut(frame,bbox):
frame_target = frame[int(bbox[1]):int(bbox[3]), int(bbox[0]):int(bbox[2])]
return frame_target
def img_write(frame,img_file,id_num,save_file):
filename = os.path.basename(img_file).split('.')[0] +'_'+ f"person_{id_num}.jpg"
person_image_path = os.path.join(save_file, filename)
cv2.imwrite(person_image_path, frame)
return person_image_path
def change_bbox(bbox_person,bbox_hand):
'''
将小图上手的坐标转换到原图大图的坐标
'''
bbox_xmin = bbox_hand[0] + bbox_person[0]
bbox_xmax = bbox_hand[2] + bbox_person[0]
bbox_ymin = bbox_hand[1] + bbox_person[1]
bbox_ymax = bbox_hand[3] + bbox_person[1]
bbox = [bbox_xmin,bbox_ymin,bbox_xmax,bbox_ymax]
return bbox
def select_list(result_list):
'''
筛选列表中的空列表
'''
if result_list:
result_only = []
for result in result_list:
if result == None:
pass
else:
# result_bbox = select_bbox(result)
result_only.append(result)
return result_only
def para_list_correction(images_size, bbox_list, dertpara):
# bbox_list_iou = del_bbox(bbox_list)
updata_result_list = []
for bbox in bbox_list:
updata_bbox = para_correction(images_size, bbox, dertpara)
size = [(updata_bbox[2] - updata_bbox[0]),
(updata_bbox[3] - updata_bbox[1])]
if determine_zero(size):
if check_bbox(size):
updata_result_list.append(updata_bbox)
return updata_result_list
def para_correction(images_size, bbox, dertpara):
'''
修正检测后标注框过小的情况,如果有修正参数则使用修正参数如果没有就按照坐标值扩大两倍
'''
# if dertpara == 0:
# pass
# else:
w = (bbox[2] - bbox[0]) / int(dertpara)
h = (bbox[3] - bbox[1]) / int(dertpara)
bbox_extand_list_x = [bbox[0] - w, bbox[2] + w]
bbox_extand_list_y = [bbox[1] - h, bbox[3] + h]
bbox_list_x = contrast(
size=images_size[1], bbox_extand_list=bbox_extand_list_x)
bbox_list_y = contrast(
size=images_size[0], bbox_extand_list=bbox_extand_list_y)
bbox_list = [bbox_list_x[0], bbox_list_y[0],
bbox_list_x[1], bbox_list_y[1]]
return bbox_list
def contrast(size, bbox_extand_list):
'''
对比数值是否在这个范围内
'''
# print('bbox_extand_list:',bbox_extand_list)
# print('size:',size)
bbox_list = []
for x in bbox_extand_list:
# print('size:',size)
if 0 <= int(x) <= int(size):
# print('in:',x,size)
bbox_list.append(x)
if int(x) > int(size):
# print('>:',x,size)
bbox_list.append(size)
if int(x) < 0:
# print('<:',x,size)
bbox_list.append(0)
# print('bbox_list:',bbox_list)
return bbox_list
def determine_zero(num_list):
for num in num_list:
if num == 0:
return False
return True
def check_bbox(size):
ratio = size[0]/size[1]
if ratio >= 5:
return False
if ratio <= 1/5:
return False
else:
return True
Loading…
Cancel
Save