from analysis_result.get_model_result import det_img
from analysis_result.same_model_img import same_model_img_analysis_labels, model_labels_selet
from model_load.model_load import Load_model
from drawing_img.drawing_img import drawing_frame
from analysis_data.data_rtsp import rtsp_para
from analysis_data.data_dir_file import get_dir_file, get_imgframe
from analysis_data.config_load import get_configs
from add_xml import add_xml
from create_xml import create_xml
import yaml
import cv2
import os
from pathlib import Path
import time
from datetime import datetime
import glob
import json

def data_load(args):
    source = args[0]
    model_yaml = args[1]

    # Determine the input source type: RTSP stream, directory, or single file
    rtsp_source = rtsp_para(source)
    dir_source = os.path.isdir(source)
    file_source = os.path.isfile(source)

    # Load the model configuration and build the inference engine
    model_data = get_configs(model_yaml)
    model_inference = Load_model(model_file=model_data["model"],
                                 device=model_data["model_parameter"]['device'],
                                 cache_file=model_data["model_cache"])

    if rtsp_source:
        rtsp_detect_process(source=source, model_data=model_data,
                            model_inference=model_inference)
    if dir_source:
        dir_source_process(source, model_inference, model_data)
    if file_source:
        file_source_process(source, model_inference, model_data)
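
# Sketch of the config structure this module reads (key names are taken from the
# accesses below; the nesting and example values are placeholders, your YAML may differ):
#
#   model: path/to/weights
#   model_cache: path/to/engine_cache
#   model_parameter:
#     device: "0"                     # device string passed to Load_model
#     confidence: 0.5
#     label_names: [...]              # labels kept by det_img
#     compara_label_names: [...]      # labels kept by model_labels_selet
#     compara_relevancy: ...          # rule for same_model_img_analysis_labels (falsy = skip)
#     relevancy_para: ...
#     object_num_min: 0               # falsy = skip the box-count check
#   detect_time: 60                   # long window T, multiplied by the stream fps
#   detect_time_small: 5              # short window t, multiplied by the stream fps
#   detect_ratio: 0.5                 # threshold used by determine_time
#   save_videos: path/to/videos
#   save_path: path/to/detected_images
#   save_path_original: path/to/second_copy
#   save_annotations: path/to/xml_annotations
#   test_path: path/to/images_without_detections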

def rtsp_detect_process(source, model_data, model_inference):
    cap = cv2.VideoCapture(source)

    # Video-stream properties
    fps = int(cap.get(cv2.CAP_PROP_FPS))
    fps_num = fps * model_data['detect_time']              # frames in the long window T
    fps_num_small = fps * model_data['detect_time_small']  # frames in the short window t
    size = (int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)),
            int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)))

    try:
        i = 0
        j = 0
        det_t_num = 0
        nodet_t_num = 0
        det_img_list = []   # renamed from det_img to avoid shadowing the imported det_img()
        video_name_time = 0
        det_fps_time = []

        while True:
            ret, frame = cap.read()
            t1 = time.time()
            if not ret:
                continue  # frame not read successfully, try the next one
            i = i + 1
            j = j + 1

            # Timestamp of the current frame
            data_now = datetime.now()
            get_time = str(data_now.strftime("%H")) + \
                str(data_now.strftime("%M")) + str(data_now.strftime("%S")) + \
                str(data_now.strftime("%f"))

            # Open a new output video at the start of every window
            if video_name_time == 0:
                video_name_time = get_time
                savePath = os.path.join(model_data['save_videos'], (str(data_now.strftime(
                    "%Y")) + str(data_now.strftime("%m")) + str(data_now.strftime("%d"))))
                if not os.path.exists(savePath):
                    os.makedirs(savePath)
                video_path = os.path.join(savePath, video_name_time + '.avi')
                print('video_path:', video_path)
                out_video = cv2.VideoWriter(
                    video_path, cv2.VideoWriter_fourcc(*'DIVX'), fps, size)
            print(source, data_now, i, j, video_path)

            # Inference on the current frame; get_time is stored so that
            # images_save() can use it as the image file name
            imgframe_dict = {"path": source, 'frame': frame,
                             'get_fps': j, 'get_time': get_time}
            images_det_result = img_process(
                imgframe_dict, model_inference, model_data)
            images_update = save_process(
                imgframe_dict, images_det_result, model_data)

            # Short-window (t) bookkeeping
            if images_det_result:
                det_t_num = det_t_num + 1
                if len(det_img_list) == 0:
                    img_dict = images_update.copy()
                    del img_dict['frame']
                    det_img_list.append(img_dict)
            if not images_det_result and len(det_img_list) > 0:
                nodet_t_num = nodet_t_num + 1
            if (det_t_num + nodet_t_num) >= fps_num_small:
                para = determine_time(
                    det_num=det_t_num, nodet_num=nodet_t_num, ratio_set=model_data['detect_ratio'])
                if para:
                    first_fps_time = det_img_list[0]
                    first_fps_time.update(
                        {"dert_fps": (j - int(first_fps_time['get_fps']) + 1)})
                    det_fps_time.append(first_fps_time)
                det_img_list.clear()
                det_t_num = 0
                nodet_t_num = 0

            # Write the (possibly annotated) frame to the output video
            out_video.write(images_update['frame'])

            # Long-window (T) bookkeeping
            if j >= fps_num:
                out_video.release()
                # At the end of T, evaluate the unfinished short window
                if det_img_list:
                    para = determine_time(
                        det_num=det_t_num, nodet_num=nodet_t_num, ratio_set=model_data['detect_ratio'])
                    first_fps_time = det_img_list[0]
                    if (j - int(first_fps_time['get_fps']) + 1) >= (fps_num_small / 2):
                        first_fps_time.update(
                            {"dert_fps": (j - int(first_fps_time['get_fps']) + 1)})
                        det_fps_time.append(first_fps_time)
                # Write the JSON result next to the video, or drop the video if nothing was detected
                if det_fps_time:
                    re_list = json_get(
                        time_list=det_fps_time, video_path=video_path, fps=fps)
                    json_save(re_list)
                else:
                    print(video_path)
                    os.remove(video_path)
                    print('---------------- clear videos ----------------')

                # Reset all counters for the next window
                print('---------------- next ----------------')
                det_img_list.clear()
                det_fps_time.clear()
                det_t_num = 0
                nodet_t_num = 0
                video_name_time = 0
                j = 0

            t2 = time.time()
            tx = t2 - t1
            print('Time to process one frame:', tx)

    except Exception as e:
        # Handle errors and release the capture device
        print(str(e))
        cap.release()
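
# Worked example of the window sizes above (illustrative numbers only): with fps = 25,
# detect_time = 60 and detect_time_small = 5, the long window T spans
# fps_num = 25 * 60 = 1500 frames and each short window t spans
# fps_num_small = 25 * 5 = 125 frames. A short window is kept as an event when
# det_t_num / (det_t_num + nodet_t_num) >= detect_ratio (see determine_time below).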

def dir_source_process(source, model_inference, model_data):
    img_ext = [".jpg", ".JPG", ".bmp"]
    video_ext = [".mp4", ".avi", ".MP4"]
    img_list = get_dir_file(source, img_ext)
    video_list = get_dir_file(source, video_ext)

    if img_list:
        for img in img_list:
            t1 = time.time()
            images = cv2.imread(img)
            imgframe_dict = {"path": img, 'frame': images}
            images_update = img_process(
                imgframe_dict, model_inference, model_data)
            t2 = time.time()
            tx = t2 - t1
            print('Time to process one image:', tx)

    if video_list:
        # Video files are not handled yet
        pass

def file_source_process(source, model_inference, model_data):
    img_para = True
    if img_para:
        images = cv2.imread(source)
        imgframe_dict = {"path": source, 'frame': images}
        images_update = img_process(
            imgframe_dict, model_inference, model_data)

def img_process(images, model_inference, model_data):
    # Run inference on one frame and return the raw detection results
    results = det_img(model_inference=model_inference,
                      images_frame=images['frame'],
                      confidence=model_data["model_parameter"]['confidence'],
                      label_name_list=model_data["model_parameter"]['label_names'])

    # Keep only the bounding boxes whose labels we care about
    select_labels_list = model_labels_selet(example_list=model_data["model_parameter"]['compara_label_names'],
                                            result_dict_list=results)

    # Optionally filter the boxes by the configured relevancy rule
    if model_data["model_parameter"]['compara_relevancy']:
        determine_bbox = same_model_img_analysis_labels(example_list=model_data["model_parameter"]['compara_label_names'],
                                                        result_dicts_list=select_labels_list,
                                                        relevancy=model_data["model_parameter"]['compara_relevancy'],
                                                        relevancy_para=model_data["model_parameter"]['relevancy_para'])
    else:
        determine_bbox = select_labels_list

    # Discard the result if the number of boxes reaches object_num_min
    if model_data['model_parameter']['object_num_min']:
        if len(determine_bbox) >= model_data["model_parameter"]['object_num_min']:
            determine_bbox.clear()

    # Return the post-processed detections
    return determine_bbox

def save_process(images, determine_bbox, model_data):
    if determine_bbox:
        images.update({"results": determine_bbox})

        # Draw the detections onto the frame
        img_save = drawing_frame(
            images_frame=images['frame'], result_list=determine_bbox)
        images.update({"frame": img_save})

        # Save the annotated image
        if model_data["save_path"]:
            imgname = images_save(
                images=images, save_path=model_data["save_path"])

        # Save a second copy to save_path_original
        if model_data['save_path_original']:
            imgname_original = images_save(images=images,
                                           save_path=model_data["save_path_original"])

        # Save the detections as XML annotations
        if model_data["save_annotations"]:
            if not os.path.exists(model_data["save_annotations"]):
                os.makedirs(model_data["save_annotations"])
            save_annotations_xml(
                xml_save_file=model_data["save_annotations"], save_infors=determine_bbox, images=images['path'])
        else:
            pass
    else:
        # Optionally save images with no detections
        if model_data["test_path"]:
            imgname = images_save(
                images=images, save_path=model_data["test_path"])

    # Optional live preview (disabled)
    # if images['path'] == 'rtsp://admin:@192.168.10.11':
    #     cv2.namedWindow('11', cv2.WINDOW_NORMAL)
    #     cv2.imshow('11', images['frame'])
    #     cv2.waitKey(1)
    #     cv2.destroyAllWindows()

    return images

def images_save(images, save_path):
    # Use the frame's capture time as the image file name, grouped into a
    # sub-directory named after the last dot-separated token of the source path
    # (an earlier version built the name from datetime.now() here instead)
    img_save_path = os.path.join(save_path, str(images['path'].split('.')[-1]))
    images_name = images['get_time'] + '.jpg'
    if not os.path.exists(img_save_path):
        os.makedirs(img_save_path)
    full_name = os.path.join(img_save_path, images_name)
    cv2.imwrite(full_name, images['frame'])
    return full_name
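
# Example of the resulting layout (hypothetical values): with save_path='runs/images',
# images['path']='rtsp://admin:@192.168.10.11' and images['get_time']='101530123456',
# the frame is written to 'runs/images/11/101530123456.jpg'; the sub-directory name is
# simply whatever follows the last '.' in the source path.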

def save_annotations_xml(xml_save_file, save_infors, images):
    results = save_infors
    img = os.path.basename(images)
    img_frame = cv2.imread(images)
    xml_save_path = os.path.join(xml_save_file, img.split('.')[0] + '.xml')
    h, w, d = img_frame.shape  # cv2 images are (height, width, channels)
    img_shape = (h, w, d, img)
    # Append to an existing XML file, otherwise create a new one
    if os.path.isfile(xml_save_path):
        add_xml(inforsDict=results,
                xmlFilePath=xml_save_path)
    else:
        create_xml(boxs=results,
                   img_shape=img_shape,
                   xml_path=xml_save_path)

def determine_time(det_num, nodet_num, ratio_set):
    # Fraction of frames in the window that contained a detection
    ratio = det_num / (det_num + nodet_num)
    return ratio >= ratio_set
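
# Quick sanity check (illustrative numbers): det_num=3, nodet_num=2 gives
# ratio = 3 / (3 + 2) = 0.6, so determine_time(3, 2, 0.5) returns True while
# determine_time(3, 2, 0.75) returns False.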

def video_synthesis(imglist, savePath, size, fps, videoname):
    if not os.path.exists(savePath):
        os.makedirs(savePath)
    print(videoname)
    video_path = os.path.join(savePath, videoname + '.avi')
    out = cv2.VideoWriter(
        video_path, cv2.VideoWriter_fourcc(*'DIVX'), fps, size)
    sorted_list = sorted(imglist, key=lambda x: x['get_time'])
    for filename in sorted_list:
        out.write(filename['frame'])
    out.release()

def json_get(time_list, video_path, fps):
    result_dict = {'info': {'video_path': video_path, 'fps': fps}}
    re_dict = {}
    # Label groups that map detections to event types
    list_hands = ["Keypad", "hands", "keyboard", "mouse", "phone"]
    list_sleep = ["person", "sleep"]
    for i, det_dict in enumerate(time_list):
        result_labels = None  # default when the label matches neither list
        if list(det_dict['results'][0].keys())[0] in list_hands:
            result_labels = 'playing_phone'
        if list(det_dict['results'][0].keys())[0] in list_sleep:
            result_labels = "sleep"
        fps_dict = {'time': det_dict['get_fps'],
                    'duration': det_dict['dert_fps'], 'result': result_labels}
        re_dict.update({('id_' + str(i)): fps_dict})
    result_dict.update({'result': re_dict})
    return result_dict
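
# Shape of the dictionary returned above and serialised by json_save (field names come
# from the code; the concrete values are only illustrative):
# {
#   "info":   {"video_path": "<save_videos>/<date>/<time>.avi", "fps": 25},
#   "result": {"id_0": {"time": 372, "duration": 93, "result": "playing_phone"}}
# }
# "time" is the frame index (get_fps) at which the event started and "duration" is its
# length in frames (dert_fps).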

def json_save(result_dict):
    # Write the result next to the video, with the same base name
    json_path = result_dict['info']['video_path'].split('.')[0] + '.json'
    result = json.dumps(result_dict)
    with open(json_path, 'w') as f:
        f.write(result + '\n')

# if __name__ == '__main__':
#     data_load(['rtsp://admin:@192.168.10.203',
#                'E:/Bank_files/Bank_03/xbank_poc_test_use/config_phone.yaml'])