Compare commits

..

4 Commits
main ... V0.1.1

@ -0,0 +1,95 @@
from xml.etree.ElementTree import ElementTree, Element
# xml换行
def indent(elem, level=0):
    """Recursively re-indent an ElementTree element in place using tabs.

    Classic ElementTree pretty-print recipe: sets `.text` of container
    elements and `.tail` of every element so serialization is one node
    per line, indented `level` tabs deep.
    """
    pad = "\n" + "\t" * level
    if len(elem):
        if not (elem.text and elem.text.strip()):
            elem.text = pad + "\t"
        if not (elem.tail and elem.tail.strip()):
            elem.tail = pad
        # deliberate re-binding: after the loop `elem` is the LAST child,
        # whose tail is then dedented back to the parent level
        for elem in elem:
            indent(elem, level + 1)
        if not (elem.tail and elem.tail.strip()):
            elem.tail = pad
    else:
        if level and not (elem.tail and elem.tail.strip()):
            elem.tail = pad
def add_xml(inforsDict, xmlFilePath):
    """Append detection results as VOC <object> entries to an existing XML file.

    :param inforsDict: list of single-item dicts {label: [xmin, ymin, xmax, ymax]}
    :param xmlFilePath: path of the annotation file, updated in place
    :return: None

    BUG FIX: the original re-parsed and re-wrote the XML file once per
    detection; the file is now parsed once, all objects appended, and
    written once — same final file content, far less I/O.
    """
    tree = ElementTree()
    tree.parse(xmlFilePath)
    root = tree.getroot()
    for result in inforsDict:
        obj_name = list(result.keys())[0]
        bbox = list(result.values())[0]
        xmin, ymin, xmax, ymax = (int(bbox[k]) for k in range(4))
        element_obj = Element('object')
        element_box = Element('bndbox')
        # fixed VOC metadata children of <object>
        for tag, text in (('name', obj_name), ('pose', 'Unspecified'),
                          ('truncated', '0'), ('difficult', '0')):
            node = Element(tag)
            node.text = text
            element_obj.append(node)
        # bounding-box coordinates (original tag order preserved)
        for tag, value in (('xmin', xmin), ('xmax', xmax),
                           ('ymin', ymin), ('ymax', ymax)):
            node = Element(tag)
            node.text = str(value)
            element_box.append(node)
        element_obj.append(element_box)
        root.append(element_obj)
        # pretty-print the newly added subtree
        indent(element_obj)
        indent(element_box)
    tree.write(xmlFilePath, encoding='utf-8', xml_declaration=True)

@ -0,0 +1,43 @@
import os
import threading
class mp4_to_H264():
    """Convert video files to H.264/MP4 by shelling out to ffmpeg."""

    def __init__(self):
        pass

    def convert_avi(self, input_file, output_file, ffmpeg_exec="ffmpeg"):
        """Run ffmpeg synchronously; return the first line of its stdout."""
        cmd = '{ffmpeg} -y -i "{infile}" -c:v libx264 -strict -2 "{outfile}"'.format(
            ffmpeg=ffmpeg_exec, infile=input_file, outfile=output_file)
        f = os.popen(cmd)
        ffmpegresult = f.readline()
        return ffmpegresult

    def convert_avi_to_webm(self, input_file, output_file, ffmpeg_exec="ffmpeg"):
        # BUG FIX: previously hard-coded ffmpeg_exec="ffmpeg", silently
        # ignoring the executable passed by the caller.
        return self.convert_avi(input_file, output_file, ffmpeg_exec=ffmpeg_exec)

    def convert_avi_to_mp4(self, input_file, output_file, ffmpeg_exec="ffmpeg"):
        # BUG FIX: forward the caller-supplied ffmpeg executable.
        return self.convert_avi(input_file, output_file, ffmpeg_exec=ffmpeg_exec)

    def convert_to_avcmp4(self, input_file, output_file, ffmpeg_exec="ffmpeg"):
        """Fire-and-forget conversion on a background thread."""
        worker = threading.Thread(target=self.convert_avi,
                                  args=(input_file, output_file, ffmpeg_exec,))
        worker.start()

    def convert_byfile(self, from_path, to_path):
        """Convert one file, creating the destination directory if needed.

        NOTE(review): a missing source file is only reported, not treated
        as an error — conversion is still attempted (original behavior).
        """
        if not os.path.exists(from_path):
            print("Sorry, you must create the directory for the output files first")
        if not os.path.exists(os.path.dirname(to_path)):
            os.makedirs(os.path.dirname(to_path), exist_ok=True)
        directory, file_name = os.path.split(from_path)
        raw_name, extension = os.path.splitext(file_name)
        print("Converting ", from_path)
        self.convert_avi_to_mp4(from_path, to_path)
# a = mp4_to_H264()
# from_path = '/home/xbank/xbank_poc_test_use/video_save_path/hands/20231012/100932541744.mp4'
# to_path = '/home/xbank/xbank_poc_test_use/video_save_path/hands/20231012/100932541744.mp4'
# a.convert_byfile(from_path, to_path)

@ -0,0 +1,7 @@
import yaml
from pathlib import Path
def get_configs(ymal_files):
    """Load a YAML config file whose path is relative to this module's directory."""
    config_path = Path(__file__).parent / ymal_files
    with open(config_path, "r", encoding="utf-8") as config_file:
        return yaml.load(config_file, Loader=yaml.FullLoader)

@ -0,0 +1,25 @@
import cv2
import os
def get_dir_file(path, ext_list):
    """Recursively collect files under `path` whose extension is in `ext_list`."""
    matched = []
    for dirpath, _subdirs, names in os.walk(path):
        matched.extend(
            os.path.join(dirpath, name)
            for name in names
            if os.path.splitext(name)[1] in ext_list
        )
    return matched
def get_imgframe(img_path_list):
    """Read each image path with OpenCV; return [{'path': p, 'frame': pixels}, ...]."""
    return [{"path": img_path, 'frame': cv2.imread(img_path)}
            for img_path in img_path_list]

@ -0,0 +1,9 @@
def rtsp_para(scource):
    """Return True when the source string's scheme (before '://') is 'rtsp'."""
    return scource.split('://')[0] == 'rtsp'

@ -0,0 +1,23 @@
def det_img(model_inference, images_frame, confidence, label_name_list):
    """Run inference on one frame and keep detections above `confidence`.

    Only label ids that index into `label_name_list` are kept.
    Returns a list of single-item dicts {label_name: bbox}.
    """
    prediction = model_inference.predict(images_frame)
    boxes = prediction.boxes
    label_ids = prediction.label_ids
    scores = prediction.scores
    kept = []
    for idx, label_id in enumerate(label_ids):
        if scores[idx] <= confidence:
            continue
        label_idx = int(label_id)
        if 0 <= label_idx < len(label_name_list):
            kept.append({label_name_list[label_idx]: boxes[idx]})
    return kept

@ -0,0 +1,48 @@
from analysis_result.tools_analysis import select_bbox_by_labels, iou_result_two
def same_model_img_analysis_labels(example_list, result_dicts_list, relevancy, relevancy_para):
    '''
    Cross-label relationship analysis within one image.

    example_list: labels that must all be present for analysis to proceed
    result_dicts_list: detections formatted [{label1: bbox1}, {label2: bbox2}, ...]
    relevancy: 'overlap' (IoU pairing, exactly 2 labels) or 'in_bbox' (not implemented)
    Returns False when any requested label is missing; the overlap pair list
    for 'overlap'; otherwise None (implicit).
    '''
    detected_labels = [list(item.keys())[0] for item in result_dicts_list]
    missing = [label for label in example_list if label not in detected_labels]
    if missing:
        return False
    grouped = select_bbox_by_labels(example_list, result_dicts_list)
    if relevancy == 'overlap':
        if len(example_list) == 2:
            return iou_result_two(grouped, relevancy_para)
    if relevancy == 'in_bbox':
        pass
def model_labels_selet(example_list, result_dict_list):
    '''
    Keep only the detections whose label appears in example_list.
    '''
    selected = []
    for result_dict in result_dict_list:
        if list(result_dict.keys())[0] in example_list:
            selected.append(result_dict)
    return selected

@ -0,0 +1,74 @@
def iou_result_two(result_dict_list, iou_para):
    """Pairwise IoU filter between two label groups.

    result_dict_list holds exactly two dicts {label: [bbox, ...]}.  Every
    cross-group pair of boxes whose IoU exceeds iou_para contributes BOTH
    boxes to the returned list, each wrapped as {label: bbox}.
    """
    label_a, boxes_a = list(result_dict_list[0].items())[0]
    label_b, boxes_b = list(result_dict_list[1].items())[0]
    matched = []
    for box_a in boxes_a:
        for box_b in boxes_b:
            if calculate_iou(box_a, box_b) > iou_para:
                matched.append({label_a: box_a})
                matched.append({label_b: box_b})
    return matched
def calculate_iou(box1, box2):
    """
    Compute the intersection-over-union of two bounding boxes.

    box1, box2: (x1, y1, x2, y2) corner coordinates, treated as
    inclusive pixel indices (hence the +1 in every extent).
    Returns the IoU ratio in [0, 1].
    """
    inter_x1 = max(box1[0], box2[0])
    inter_y1 = max(box1[1], box2[1])
    inter_x2 = min(box1[2], box2[2])
    inter_y2 = min(box1[3], box2[3])
    # clamp to 0 so disjoint boxes contribute no intersection
    intersection_area = max(0, inter_x2 - inter_x1 + 1) * max(0, inter_y2 - inter_y1 + 1)
    box1_area = (box1[2] - box1[0] + 1) * (box1[3] - box1[1] + 1)
    box2_area = (box2[2] - box2[0] + 1) * (box2[3] - box2[1] + 1)
    union_area = box1_area + box2_area - intersection_area
    return intersection_area / union_area
def select_bbox_by_labels(example_list, result_dicts_list):
    '''
    Group the bboxes from result_dicts_list by label, keeping only labels
    in example_list.  Returns [{label: [bbox, ...]}, ...] in example_list
    order; a label with no detections maps to an empty list.
    '''
    grouped = []
    for label in example_list:
        boxes = [list(item.values())[0]
                 for item in result_dicts_list
                 if list(item.keys())[0] == label]
        grouped.append({label: boxes})
    return grouped

@ -0,0 +1,32 @@
# detect_0:
# source : rtsp://admin:@192.168.10.203
# model: ../config_phone.yaml
detect_1:
source : rtsp://admin:@192.168.10.203
model: ../config_phone.yaml
# detect_2:
# source : E:/Bank_files/Bank_03/dataset/dataset_now/phone/4/images
# model: ../config_sleep.yaml
# detect_3:
# source : rtsp://admin:@192.168.10.11
# model: /home/yaxin/xbank/xbank_poc_test/config_sleep.yaml
# detect_4:
# source : rtsp://admin:@192.168.10.11
# model: /home/yaxin/xbank/xbank_poc_test/config_phone.yaml
# detect_5:
# source : rtsp://admin:@192.168.10.18
# model: /home/yaxin/xbank/xbank_poc_test/config_phone.yaml
# detect_6:
# source : /home/yaxin/xbank/xbank_poc_test/images_test/images_del
# model: /home/yaxin/xbank/xbank_poc_test/config_phone.yaml
# detect_7:
# source : /home/xbank/xbank_poc_test_use/images_test/images_del

@ -0,0 +1,30 @@
# load model file
model: ./model_file/yolov8_person.onnx
model_cache: ./tensort_cache/yolov8_person.trt
# label and bbox message set
model_parameter:
device : cpu
label_names: ["person","bicycle","car",motorcycle,airplane,bus,train,truck,boat,traffic light,fire hydrant,stop sign,parking meter,bench,bird,cat, dog,horse,sheep, cow,elephant, bear,zebra,giraffe, backpack,umbrella,handbag,tie,suitcase,frisbee,skis,snowboard,sports ball,kite,baseball bat,baseball glove,skateboard,surfboard,tennis racket, bottle, wine glass,cup,fork,knife,spoon, bowl,banana,apple,sandwich,orange,broccoli,carrot,hot dog, pizza,donut,cake,chair,couch,potted plant,bed,dining table,toilet,tv,laptop,mouse,remote,keyboard,cell phone,microwave, oven,toaster,sink,refrigerator,book,clock,vase,scissors,teddy bear,hair drier,toothbrush] # model labels
# label_names: ["person"]
compara_label_names: ["person"] #
compara_relevancy: False # 'object_num'
relevancy_para : False
object_num_min : False # 视频中最低人数
confidence : 0.2
save_path_original : False
test_path : False
save_annotations : False
save_path : False
# save videos
save_videos : False
# detect time set
detect_time : 30
detect_time_small : 10
detect_ratio : 0.5

@ -0,0 +1,30 @@
# load model file
model: E:/Bank_files/Bank_03/xbank_poc_test_use/model_file/yolov5_4best.onnx
model_cache: E:/Bank_files/Bank_03/xbank_poc_test_use/tensort_cache/yolov5_4best.trt
# label and bbox message set
model_parameter:
device : cpu
label_names: ["Keypad","hands","keyboard", "mouse","phone"] # model labels
compara_label_names: ["hands","phone"] #
compara_relevancy: overlap # 'in_bbox','overlap'
relevancy_para : 0.01
object_num_min : False
confidence : 0.2
save_path : False
test_path : False
save_path_original : False
save_annotations : False
# save videos
save_videos : E:/Bank_files/Bank_03/xbank_poc_test_use/video_save_path/sleep
# detect time set
detect_time : 30
detect_time_small : 10
detect_ratio : 0.5

@ -0,0 +1,29 @@
# load model file
model: ./model_file/yolov5_640_sleep.onnx
model_cache: ./tensort_cache/yolov5_640_sleep.trt
# label and bbox message set
model_parameter:
device : cpu
label_names: ["person","sleep"] # model labels
compara_label_names: ["sleep","person"] #
compara_relevancy: False # 'in_bbox'
relevancy_para : False
object_num_min : False
confidence : 0.2
save_path_original : False
test_path : False
save_annotations : False
save_path : False
# save videos
save_videos : ./video_save_path/sleep
# detect time set
detect_time : 30
detect_time_small : 10
detect_ratio : 0.5

@ -0,0 +1,53 @@
from lxml.etree import Element, SubElement, tostring
def create_xml(boxs, img_shape, xml_path):
    """
    Create a VOC-style annotation XML file.

    :param boxs: list of {label: [xmin, ymin, xmax, ymax]} detection dicts
    :param img_shape: (height, width, depth, image_name) written into <size>/<filename>
    :param xml_path: destination file path
    :return: None

    BUG FIX: the raw open/write/close triple is replaced by a context
    manager so the file handle is closed even if the write raises.
    """
    node_root = Element('annotation')
    node_folder = SubElement(node_root, 'folder')
    node_folder.text = 'Images'
    node_filename = SubElement(node_root, 'filename')
    node_filename.text = str(img_shape[3])
    node_size = SubElement(node_root, 'size')
    node_width = SubElement(node_size, 'width')
    node_width.text = str(img_shape[1])
    node_height = SubElement(node_size, 'height')
    node_height.text = str(img_shape[0])
    node_depth = SubElement(node_size, 'depth')
    node_depth.text = str(img_shape[2])
    # an empty boxs list simply produces an annotation with no <object> nodes
    for box in boxs:
        bbox = list(box.values())[0]
        node_object = SubElement(node_root, 'object')
        node_name = SubElement(node_object, 'name')
        node_name.text = str(list(box.keys())[0])
        node_difficult = SubElement(node_object, 'difficult')
        node_difficult.text = '0'
        node_bndbox = SubElement(node_object, 'bndbox')
        for tag, value in zip(('xmin', 'ymin', 'xmax', 'ymax'), bbox):
            SubElement(node_bndbox, tag).text = str(int(value))
    xml = tostring(node_root, pretty_print=True)  # newline-formatted output
    with open(xml_path, "wb") as f:
        f.write(xml)

@ -0,0 +1,36 @@
import cv2
def drawing_frame(images_frame,
                  result_list,
                  line_1=3,
                  line_2=1,
                  color_1=(0, 0, 255),
                  color_2=(0, 255, 255),
                  txt_num=0.75,
                  scores_list_id=False):
    '''
    Draw detection rectangles and labels onto images_frame (modified in place).

    result_list: detections formatted [{label1: bbox1}, {label2: bbox2}, ...]
    scores_list_id: when truthy, append each detection's 'scores_list'
        value to the drawn label text.
    Returns the annotated frame.

    BUG FIX: the loop variable no longer shadows the `result_list`
    parameter, and the stray per-bbox debug print was removed.
    '''
    for detection in result_list:
        label_name = list(detection.keys())[0]
        bbox = list(detection.values())[0]
        if scores_list_id:
            label_text = str(label_name) + "__" + str(detection['scores_list'])
        else:
            label_text = str(label_name)
        cv2.rectangle(images_frame,
                      (int(bbox[0]), int(bbox[1])),
                      (int(bbox[2]), int(bbox[3])),
                      color_1, line_1)
        # label drawn just above the box's top-left corner
        cv2.putText(images_frame, label_text,
                    (int(bbox[0]) - 10, int(bbox[1]) - 10),
                    cv2.FONT_HERSHEY_SIMPLEX, txt_num, color_2, line_2)
    return images_frame

@ -0,0 +1,61 @@
#coding: utf-8
from main_process import data_load
from analysis_data.config_load import get_configs
from multiprocessing import Pool
from loguru import logger
import os
# Logging configuration.
# BUG FIX: the path was built with os.path.join(__file__, "../logs/..."),
# which joins onto a *file* path and only works because the consumer
# normalizes it; anchor on the script's directory instead.
log_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "logs", "xbank.log")
logger.add(
    log_path, rotation="60 MB", enqueue=True, backtrace=True, diagnose=True, retention=30
)
logger.info("*************************** xbank start ***************************")
def get_args_list(args_data):
    """Flatten the detect-config mapping into [[source, model_config], ...]."""
    return [[args_data[key]['source'], args_data[key]['model']]
            for key in args_data]
def start_worker(args_list_n):
    """Run one detection pipeline forever, restarting it after any crash.

    args_list_n: [source, model_config] pair handed to data_load.
    """
    while True:
        try:
            # BUG FIX: the pre-start message wrongly said "crashed"
            logger.info(f" process {args_list_n} starting...")
            data_load(args_list_n)
        except Exception as e:
            logger.info(f" process {args_list_n} crashed. Restarting...")
            logger.debug(e)
def main():
    """Load the detection config and run one worker process per source."""
    config_file = '../config_det.yaml'
    config_data = get_configs(ymal_files=config_file)
    worker_args = get_args_list(config_data)
    # one process per configured source
    with Pool(len(worker_args)) as pool:
        pool.map(start_worker, worker_args)
if __name__ == "__main__":
    main()

@ -0,0 +1,536 @@
from analysis_result.get_model_result import det_img
from analysis_result.same_model_img import same_model_img_analysis_labels, model_labels_selet
from model_load.model_load import Load_model
from drawing_img.drawing_img import drawing_frame
from analysis_data.data_rtsp import rtsp_para
from analysis_data.data_dir_file import get_dir_file
from analysis_data.config_load import get_configs
from add_xml import add_xml
from create_xml import create_xml
from analysis_data.change_video import mp4_to_H264
import cv2
import os
import time
from datetime import datetime
import json
from loguru import logger
import logging
import logstash
# Remote logging: ship structured events to a Logstash collector.
host = '192.168.10.96'  # Logstash host; TODO(review): move to configuration
xbank_logger = logging.getLogger('python-logstash-logger')
xbank_logger.setLevel(logging.INFO)
# Handler on port 5959, logstash message format version 1
xbank_logger.addHandler(logstash.LogstashHandler(host, 5959, version=1))
def data_load(args):
    """Dispatch one detection task to the handler matching its source type.

    args: [source, model_yaml] — source may be an rtsp URL, a directory,
    or a single file; model_yaml names the model config to load.
    """
    source, model_ymal = args[0], args[1]
    # classify the input source
    is_rtsp = rtsp_para(source)
    is_dir = os.path.isdir(source)
    is_file = os.path.isfile(source)
    # load the model described by the yaml config
    model_data = get_configs(model_ymal)
    model_inference = Load_model(model_file=model_data["model"],
                                 device=model_data["model_parameter"]['device'],
                                 cache_file=model_data["model_cache"])
    if is_rtsp:
        rtsp_detect_process(source=source, model_data=model_data,
                            model_inference=model_inference)
    if is_dir:
        dir_source_process(source, model_inference, model_data)
    if is_file:
        file_source_process(source, model_inference, model_data)
def rtsp_detect_process(source, model_data, model_inference):
    """Continuously read an RTSP stream, detect per frame, and archive
    video/images/json for time windows containing target behaviour.

    Window logic: every `detect_time_small` seconds (t) the detection
    ratio is compared against `detect_ratio`; every `detect_time`
    seconds (T) the buffered temp video is either transcoded + archived
    (confirmed windows exist) or deleted.  On any error the capture is
    re-opened and the loop continues.
    """
    cap = cv2.VideoCapture(source)
    logger.info(f"视频流{source}读取中...")
    # stream properties
    fps = int(cap.get(cv2.CAP_PROP_FPS))
    fps_num = fps*model_data['detect_time']              # frames per long window T
    fps_num_small = fps*model_data['detect_time_small']  # frames per short window t
    size = (int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)),
            int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)))
    i = 0                  # total frames read (never reset)
    j = 0                  # frames read within the current long window
    n = 0                  # NOTE(review): never used
    det_t_num = 0          # frames with detections in the current short window
    nodet_t_num = 0        # frames without detections (after a first hit)
    det_img = []           # first detected frame of the current short window
                           # NOTE(review): shadows the imported det_img function here
    video_name_time = 0    # 0 means no temp video file is open yet
    det_fps_time = []      # confirmed detection windows within the long window
    while True:
        try:
            ret, frame = cap.read()
            i += 1
            j += 1
            # wall-clock capture time of this frame, formatted HHMMSSffffff
            data_now = datetime.now()
            get_time = str(data_now.strftime("%H")) + \
                str(data_now.strftime("%M")) + str(data_now.strftime("%S")) + \
                str(data_now.strftime("%f"))
            imgframe_dict = {"path": source, 'frame': frame,
                             'get_fps': j, 'get_time': get_time}
            # open a temporary video file at the start of each long window
            if video_name_time == 0:
                video_name_time = get_time
                video_path = video_name(
                    video_name_base=video_name_time, save_path=model_data['save_videos'], save_file='temp')
                out_video = cv2.VideoWriter(
                    video_path, cv2.VideoWriter_fourcc(*"mp4v"), fps, size)
                logger.info(f"视频{video_path}已经暂时保存...")
            # model inference + optional frame/annotation persistence
            images_det_result = img_process(
                imgframe_dict, model_inference, model_data)
            images_update = save_process(
                imgframe_dict, images_det_result, model_data)
            # short-window (t) bookkeeping: remember the first hit frame
            if images_det_result:
                det_t_num += 1
                if len(det_img) == 0:
                    img_dict = images_update.copy()
                    det_img.append(img_dict)
            if not images_det_result and len(det_img) > 0:
                nodet_t_num += 1
            # short window complete: keep it if detections dominate
            if (det_t_num + nodet_t_num) >= fps_num_small:
                para = determine_time(
                    det_num=det_t_num, nodet_num=nodet_t_num, ratio_set=model_data['detect_ratio'])
                if para:
                    first_fps_time = det_img[0]
                    first_fps_time.update(
                        {"dert_fps": (j-int(first_fps_time['get_fps'])+1)})
                    det_fps_time.append(first_fps_time)
                det_img.clear()
                det_t_num = 0
                nodet_t_num = 0
            # append the (possibly annotated) frame to the temp video
            out_video.write(images_update['frame'])
            # long-window (T) rollover
            if j >= fps_num:
                try:
                    out_video.release()
                except Exception:
                    logger.exception(f"视频release失败")
                else:
                    logger.info("视频release成功")
                # evaluate the still-open short window at the end of T
                if det_img:
                    para = determine_time(
                        det_num=det_t_num, nodet_num=nodet_t_num, ratio_set=model_data['detect_ratio'])
                    first_fps_time = det_img[0]
                    time_1 = (j-int(first_fps_time['get_fps'])+1)
                    # keep only if it lasted at least half a short window
                    if para and time_1 >= (fps_num_small/2):
                        first_fps_time.update(
                            {"dert_fps": (j-int(first_fps_time['get_fps'])+1)})
                        det_fps_time.append(first_fps_time)
                if det_fps_time:
                    # merge adjacent windows, then archive video/images/json
                    det_fps_time = determine_duration(result_list=det_fps_time)
                    # final archive location: <save_videos>/<YYYYMMDD>/<name>/
                    save_video_name = os.path.basename(video_path)
                    only_video_name = save_video_name.split('.')[0]
                    save_video_path = os.path.join(model_data['save_videos'], (str(data_now.strftime(
                        "%Y")) + str(data_now.strftime("%m")) + str(data_now.strftime("%d"))))
                    save_video_path = os.path.join(
                        save_video_path, only_video_name)
                    save_video = os.path.join(save_video_path, save_video_name)
                    json_path = os.path.join(
                        save_video_path, only_video_name + '.json')
                    images_path = os.path.join(save_video_path, 'images')
                    # transcode the temp video to H.264 at its final location
                    change_video = mp4_to_H264()
                    change_video.convert_byfile(video_path, save_video)
                    # save the first frame of each confirmed window
                    update_det_fps = video_cut_images_save(
                        det_list=det_fps_time, images_path=images_path)
                    # dump the detection summary json
                    re_list, result_lables = json_get(
                        time_list=update_det_fps, video_path=save_video, fps=fps)
                    result_path = json_save(re_list, json_path)
                    # send_message(update_det_fps=update_det_fps, result_path=result_path,
                    #              source=source, result_lables=result_lables)
                else:
                    # nothing confirmed in this long window: drop the temp video
                    os.remove(video_path)
                    logger.info(f"未检测到目标信息的视频{video_path}删除成功")
                logger.info('开始信息重置')
                det_img.clear()
                det_fps_time.clear()
                det_t_num = 0
                nodet_t_num = 0
                video_name_time = 0
                j = 0
        except Exception as e:
            # any failure (read error, writer error, ...) re-opens the stream
            logger.debug(f"读帧率失败{source}未读到...")
            logger.debug(e)
            cap.release()
            cap = cv2.VideoCapture(source)
            logger.info(f"摄像头{source}重新读取")
            # break
def video_name(video_name_base, save_path, save_file):
    """Return <save_path>/<save_file>/<video_name_base>.mp4, creating the directory."""
    target_dir = os.path.join(save_path, save_file)
    if not os.path.exists(target_dir):
        os.makedirs(target_dir)
    return os.path.join(target_dir, video_name_base + '.mp4')
def dir_source_process(source, model_inference, model_data):
    """Run detection over every image found under the directory `source`."""
    img_ext = [".jpg", ".JPG", ".bmp"]
    video_ext = [".mp4", ".avi", ".MP4"]
    img_list = get_dir_file(source, img_ext)
    video_list = get_dir_file(source, video_ext)
    if img_list:
        for img in img_list:
            started = time.time()
            frame = cv2.imread(img)
            payload = {"path": img, 'frame': frame}
            img_process(payload, model_inference, model_data)
            elapsed = time.time() - started
            print('检测一张图片的时间为:', elapsed)
    if video_list:
        # videos found in a directory are not handled yet
        pass
def file_source_process(source, model_inference, model_data):
    """Run detection once on a single image file."""
    img_para = True
    if img_para:  # placeholder flag: only image files are supported for now
        frame = cv2.imread(source)
        payload = {"path": source, 'frame': frame}
        img_process(payload, model_inference, model_data)
def img_process(images, model_inference, model_data):
    """Detect targets on one frame and apply the configured filtering rules.

    images: dict with at least 'frame'; model_data: parsed model yaml.
    Returns the filtered detection list (possibly emptied by the rules).
    """
    params = model_data["model_parameter"]
    # raw inference, thresholded by confidence
    results = det_img(model_inference=model_inference,
                      images_frame=images['frame'],
                      confidence=params['confidence'],
                      label_name_list=params['label_names'])
    # keep only the labels of interest
    select_labels_list = model_labels_selet(example_list=params['compara_label_names'],
                                            result_dict_list=results)
    if params['compara_relevancy']:
        # cross-label relationship filtering (e.g. hands/phone overlap)
        determine_bbox = same_model_img_analysis_labels(example_list=params['compara_label_names'],
                                                        result_dicts_list=select_labels_list,
                                                        relevancy=params['compara_relevancy'],
                                                        relevancy_para=params['relevancy_para'])
    else:
        determine_bbox = select_labels_list
    if params['object_num_min']:
        # NOTE(review): detections are discarded when the count REACHES the
        # configured minimum — this reads inverted; confirm the intended rule.
        if len(determine_bbox) >= params['object_num_min']:
            determine_bbox.clear()
    return determine_bbox
def save_process(images, determine_bbox, model_data):
    """Attach detection results to `images` and persist frames/annotations
    according to the save_* switches in model_data.

    Returns the updated `images` dict (with 'results' and 'img_save' keys
    added when detections exist).
    """
    if determine_bbox:
        images.update({"results": determine_bbox})
        # optionally keep the raw, unannotated frame
        if model_data['save_path_original']:
            images_save(images=images, save_path=model_data["save_path_original"])
        annotated = drawing_frame(images_frame=images['frame'],
                                  result_list=determine_bbox)
        images.update({"img_save": annotated})
        if model_data["save_path"]:
            images_save(images=images, save_path=model_data["save_path"])
        if model_data["save_annotations"]:
            if not os.path.exists(model_data["save_annotations"]):
                os.makedirs(model_data["save_annotations"])
            save_annotations_xml(xml_save_file=model_data["save_annotations"],
                                 save_infors=determine_bbox,
                                 images=images['path'])
    else:
        # optionally keep frames where nothing was detected
        if model_data["test_path"]:
            images_save(images=images, save_path=model_data["test_path"])
    return images
def images_save(images, save_path):
    """Write the annotated frame to save_path, named by its capture time.

    Reads images['get_time'] for the file name and images['img_save'] for
    the pixel data; returns the full path written.
    """
    file_name = images['get_time'] + '.jpg'
    if not os.path.exists(save_path):
        os.makedirs(save_path)
    full_name = os.path.join(save_path, file_name)
    cv2.imwrite(full_name, images['img_save'])
    return full_name
def save_annotations_xml(xml_save_file, save_infors, images):
    """Write (or extend) a VOC xml annotation for the image at path `images`."""
    img = os.path.basename(images)
    img_frame = cv2.imread(images)
    xml_save_path = os.path.join(xml_save_file, img.split('.')[0] + '.xml')
    # cv2 .shape is (height, width, depth); create_xml expects exactly this order
    h, w, d = img_frame.shape
    img_shape = (h, w, d, img)
    if os.path.isfile(xml_save_path):
        # annotation already exists: append the new objects
        add_xml(inforsDict=save_infors, xmlFilePath=xml_save_path)
    else:
        create_xml(boxs=save_infors, img_shape=img_shape, xml_path=xml_save_path)
def determine_time(det_num, nodet_num, ratio_set):
    """True when detections make up at least ratio_set of the window's frames.

    Caller must guarantee det_num + nodet_num > 0.
    """
    return det_num / (det_num + nodet_num) >= ratio_set
def determine_duration(result_list):
    """Merge back-to-back detection windows in place.

    Two consecutive entries merge when the first window's start frame plus
    its duration equals the second window's start frame; the merged entry
    keeps the first start and the summed duration.  Returns result_list.
    """
    idx = 0
    while idx < len(result_list) - 1:
        cur, nxt = result_list[idx], result_list[idx + 1]
        mergeable = ('get_fps' in cur and 'dert_fps' in cur and 'get_fps' in nxt
                     and int(cur['get_fps']) + int(cur['dert_fps']) == int(nxt['get_fps']))
        if mergeable:
            cur['dert_fps'] = int(cur['dert_fps']) + int(nxt['dert_fps'])
            result_list.pop(idx + 1)
        else:
            idx += 1
    return result_list
def json_get(time_list, video_path, fps):
    """Build the summary dict saved alongside an archived video.

    time_list: detection windows, each with get_fps/dert_fps/images_path
        and a non-empty 'results' list.
    Returns (summary_dict, behaviour_label).  behaviour_label is derived
    from the last window's first label; it is None when time_list is empty
    or no label matched a known behaviour group.

    BUG FIX: `result_lables` was previously unbound (UnboundLocalError)
    when time_list was empty or contained only unknown labels.
    """
    result_dict = {'info': {'video_path': video_path, 'fps': fps}}
    list_hands = ["Keypad", "hands", "keyboard", "mouse", "phone"]
    list_sleep = ["sleep"]
    list_person = ["person"]
    result_lables = None
    re_dict = {}
    for i, det_dict in enumerate(time_list):
        first_label = list(det_dict['results'][0].keys())[0]
        if first_label in list_hands:
            result_lables = 'playing_phone'
        if first_label in list_sleep:
            result_lables = "sleep"
        if first_label in list_person:
            result_lables = "person"
        fps_dict = {'time': det_dict['get_fps'],
                    'duration': det_dict['dert_fps'],
                    'images_path': det_dict['images_path']}
        re_dict.update({('id_' + str(i)): fps_dict})
    result_dict.update({'result': re_dict})
    return result_dict, result_lables
def json_save(result_dict, json_path):
    """Serialize result_dict as one JSON line at json_path; return json_path.

    BUG FIX: the original called `f.close` without parentheses, so the
    file was never explicitly closed; a context manager now guarantees it.
    """
    with open(json_path, 'w') as f:
        f.write(json.dumps(result_dict) + '\n')
    return json_path
def video_cut_images_save(det_list, images_path):
    """Save each window's first frame, then strip raw data from its dict.

    Mutates the dicts in det_list: removes 'frame'/'get_time' and records
    the saved image path under 'images_path'.  Returns det_list.
    """
    for det_dict in det_list:
        saved_to = images_save(images=det_dict, save_path=images_path)
        del det_dict['frame']
        del det_dict['get_time']
        det_dict.update({'images_path': saved_to})
    return det_list
def send_message(update_det_fps, result_path, source, result_lables):
    """Emit one logstash event per confirmed detection window."""
    for det_dict in update_det_fps:
        extra = {'worker': 'xbank',
                 'time': det_dict['get_fps'],
                 'config_file': result_path,
                 'source': source,
                 'type': result_lables}
        xbank_logger.info('xBank_infer', extra=extra)
        logger.info(f'发送信息{extra}')
# if __name__ == '__main__':
# data_load(['rtsp://admin:@192.168.10.203',
# 'E:/Bank_files/Bank_03/xbank_poc_test_use/config_phone.yaml'])

@ -0,0 +1,50 @@
import fastdeploy as fd
import os.path as op
def build_option(device, backend, cache_file):
    """
    Build a fastdeploy RuntimeOption.

    device: "cpu" or "gpu"; backend: "trt" enables the TensorRT engine
    (requires GPU); cache_file: TensorRT serialized-engine cache path.
    """
    option = fd.RuntimeOption()
    option.use_cpu()  # default device; overridden below for GPU
    option.trt_option.serialize_file = cache_file
    if device.lower() == "gpu":
        option.use_gpu(0)
    if backend.lower() == "trt":
        assert device.lower() == "gpu", "TensorRT backend require inference on device GPU."
        option.use_trt_backend()
    return option
def Load_model(model_file, device, cache_file):
    """
    Load a YOLO detector with its TensorRT runtime option.

    The model family is inferred from the weight-file name prefix
    ("yolov5_*" / "yolov8_*").  Returns the fastdeploy model, or an
    empty list when the prefix is not recognised.
    """
    model_name = op.basename(model_file).split('_')[0]
    runtime_option = build_option(device=device, backend="trt", cache_file=cache_file)
    loaders = {
        "yolov5": fd.vision.detection.YOLOv5,
        "yolov8": fd.vision.detection.YOLOv8,
    }
    model_inference = []
    if model_name in loaders:
        model_inference = loaders[model_name](model_file, runtime_option=runtime_option)
    return model_inference

@ -1 +1,70 @@
这是一个新的仓库
1、主程序运行main.py文件
2、main.py文件中args加载检测配置文件config_det.yaml这部分更改了使用相对路径请按照示例格式设置路径
3、config_det.yaml中配置
# 名称随意,不可重复
detect_0:
# source 设置rtsp流或者图片文件夹
source : rtsp://admin:@192.168.10.18
# 配置检测的模型配置文件
model: /home/yaxin/xbank/xbank_poc_test/config_phone.yaml
4、模型配置文件config_检测名称.yaml
# load model file
# 模型的绝对路径
model: /home/yaxin/xbank/xbank_poc_test/model_file/yolov5_XXX.onnx
# 设置cache不用更改
model_cache: /home/yaxin/xbank/xbank_poc_test/tensort_cache/yolov5_XXX.trt
# label and bbox message set
# 模型检测中需要的参数
model_parameter:
# 使用gpu检测
device : gpu
# 检测模型训练时的标签名列表,标签名顺序需要跟训练时候分配数据集时候的顺序一致,不用更改
label_names: ["Keypad","hands","keyboard", "mouse","phone"] # model labels
# 检测到的类别中需要挑选出来的目标
compara_label_names: ["hands","phone"]
# 是否对挑选出来进行推理的目标进行其标签框之间关系进行推理如果不需要就设置成FALSE默认不用更改
compara_relevancy: 'overlap' # 'in_bbox'
# 需要的参数
relevancy_para : 0
# 设置检测出来的目标过滤的置信度
confidence : 0.2
# 统计检测到的目标最低数量不需要统计则设置为False
object_num_min : 5
# 保存检测后的图片的路径不保存使用False
save_path : /home/yaxin/xbank/xbank_poc_test/save_path/hands
# 检测到的图片的原始图片的路径不保存使用False
save_path_original : /home/yaxin/xbank/xbank_poc_test/save_path_original/hands
# 未检测到目标的图片不保存设置为False默认保存
test_path : /home/yaxin/xbank/xbank_poc_test/test_save_path/hands
# 检测到的目标的图片是否保存检测信息为xml标注信息不保存设置为False
save_annotations : False
# 保存检测到目标的视频片段
# save videos
# 视频保存路径,
save_videos : /home/xbank/xbank_poc_test_use/video_save_path/person
# 检测中时长的判断
# detect time set
# 保存的长视频的时长单位为s
detect_time : 60
# 判断是否为目标行为的时长单位为S
detect_time_small : 5
# 在单位时长中,判定为目标行为的目标信息在总检测信息中的比例
detect_ratio : 0.9
5、使用本地环境
conda activate fastdeploy
修改过配置文件后运行Python main.py

@ -1 +0,0 @@
V0.1.0
Loading…
Cancel
Save