From a051f44b9d0e9f9ffe91a6c124ae2d72b3477f51 Mon Sep 17 00:00:00 2001 From: wangying Date: Thu, 19 Oct 2023 17:14:54 +0800 Subject: [PATCH] =?UTF-8?q?1019=E6=9B=B4=E6=96=B0=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Bank_second_part/xbank_detect_process/main.py | 38 +- .../xbank_detect_process/main_process.py | 480 +++++++++++------- 2 files changed, 318 insertions(+), 200 deletions(-) diff --git a/Bank_second_part/xbank_detect_process/main.py b/Bank_second_part/xbank_detect_process/main.py index 7b5d61c..1d0e2d9 100644 --- a/Bank_second_part/xbank_detect_process/main.py +++ b/Bank_second_part/xbank_detect_process/main.py @@ -2,6 +2,16 @@ from main_process import data_load from analysis_data.config_load import get_configs from multiprocessing import Pool +from loguru import logger +import os +import time + +# 日志配置 +log_path = os.path.join(__file__, "../logs/xbank.log") +logger.add( + log_path, rotation="60 MB", enqueue=True, backtrace=True, diagnose=True, retention=30 +) +logger.info("*************************** xbank start ***************************") def get_args_list(args_data): @@ -18,19 +28,35 @@ def get_args_list(args_data): return args_list +def start_worker(args_list): + while True: + try: + logger.info(f" process {args_list} crashed. Starting...") -if __name__ == "__main__": + data_load(args_list) + + except Exception as e: + + logger.info(f" process {args_list} crashed. Restarting...") + logger.debug(e) + + +def main(): # 加载配置文件 - args = './config_det.yaml' - args_data = get_configs(yaml_files=args) + args = '../config_det.yaml' + args_data = get_configs(ymal_files=args) args_list = get_args_list(args_data) process_num = len(args_list) - print(f"args_list:{args_list}") - with Pool(process_num) as pool: - pool.map(data_load, args_list) + pool.map(start_worker, args_list) + + +if __name__ == "__main__": + + main() + diff --git a/Bank_second_part/xbank_detect_process/main_process.py b/Bank_second_part/xbank_detect_process/main_process.py index f11d57c..84edae9 100644 --- a/Bank_second_part/xbank_detect_process/main_process.py +++ b/Bank_second_part/xbank_detect_process/main_process.py @@ -3,25 +3,31 @@ from analysis_result.same_model_img import same_model_img_analysis_labels, model from model_load.model_load import Load_model from drawing_img.drawing_img import drawing_frame from analysis_data.data_rtsp import rtsp_para -from analysis_data.data_dir_file import get_dir_file, get_imgframe +from analysis_data.data_dir_file import get_dir_file from analysis_data.config_load import get_configs from add_xml import add_xml from create_xml import create_xml +from analysis_data.change_video import mp4_to_H264 -import yaml import cv2 import os -from pathlib import Path import time from datetime import datetime -import glob import json +from loguru import logger + +import logging +import logstash + +host = '192.168.10.96' + +xbank_logger = logging.getLogger('python-logstash-logger') +xbank_logger.setLevel(logging.INFO) +xbank_logger.addHandler(logstash.LogstashHandler(host, 5959, version=1)) def data_load(args): - # print('正在运行的进程',msg) - # print(args) source = args[0] model_ymal = args[1] @@ -38,189 +44,256 @@ def data_load(args): if rtsp_source: - cap = cv2.VideoCapture(source) + rtsp_detect_process(source=source, model_data=model_data, + model_inference=model_inference) - # 视频流信息 - fps = int(cap.get(cv2.CAP_PROP_FPS)) - fps_num = fps*model_data['detect_time'] - fps_num_small = fps*model_data['detect_time_small'] - size = (int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)), - int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))) - try: - i = 0 - j = 0 + if dir_source: + dir_source_process(source, model_inference, model_data) - det_t_num = 0 - nodet_t_num = 0 + if file_source: - det_img = [] + file_source_process(source, model_inference, model_data) - video_name_time = 0 - det_fps_time = [] - while True: - ret, frame = cap.read() +def rtsp_detect_process(source, model_data, model_inference): - if not ret: - continue # 如果未成功读取到视频帧,则继续读取下一帧 + cap = cv2.VideoCapture(source) + logger.info(f"视频流{source}读取中...") - i = i + 1 - j = j + 1 + # 视频流信息 + fps = int(cap.get(cv2.CAP_PROP_FPS)) + fps_num = fps*model_data['detect_time'] + fps_num_small = fps*model_data['detect_time_small'] + size = (int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)), + int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))) - # 读取到当前视频帧时间 - data_now = datetime.now() - get_time = str(data_now.strftime("%H")) + \ - str(data_now.strftime("%M")) + str(data_now.strftime("%S")) + \ - str(data_now.strftime("%f")) + i = 0 + j = 0 + n = 0 - # 视频保存 - if video_name_time == 0: + det_t_num = 0 + nodet_t_num = 0 - video_name_time = get_time - savePath = os.path.join(model_data['save_videos'], (str(data_now.strftime( - "%Y")) + str(data_now.strftime("%m")) + str(data_now.strftime("%d")))) + det_img = [] - if not os.path.exists(savePath): - os.makedirs(savePath) + video_name_time = 0 + det_fps_time = [] - video_path = os.path.join( - savePath, video_name_time + '.avi') - print('video_path:', video_path) + while True: - out_video = cv2.VideoWriter( - video_path, cv2.VideoWriter_fourcc(*'DIVX'), fps, size) + try: + ret, frame = cap.read() - print(source, data_now, i,j) - t1 = time.time() + i += 1 + j += 1 - imgframe_dict = {"path": source, - 'frame': frame, - 'get_fps': j} + # 读取到当前视频帧时间 + data_now = datetime.now() + get_time = str(data_now.strftime("%H")) + \ + str(data_now.strftime("%M")) + str(data_now.strftime("%S")) + \ + str(data_now.strftime("%f")) - images_det_result = img_process(imgframe_dict, - model_inference, - model_data) + imgframe_dict = {"path": source, 'frame': frame, + 'get_fps': j, 'get_time': get_time} - images_update = save_process(imgframe_dict, - images_det_result, - model_data) + # 视频暂时保存路径 + if video_name_time == 0: - print('images_det_result:',len(images_det_result)) + video_name_time = get_time + video_path = video_name( + video_name_base=video_name_time, save_path=model_data['save_videos'], save_file='temp') - if images_det_result: + out_video = cv2.VideoWriter( + video_path, cv2.VideoWriter_fourcc(*"mp4v"), fps, size) - det_t_num = det_t_num + 1 + logger.info(f"视频{video_path}已经暂时保存...") - # print(len(det_img)) - if len(det_img) == 0: - img_dict = images_update.copy() - del img_dict['frame'] - det_img.append(img_dict) + # 模型推理 + images_det_result = img_process( + imgframe_dict, model_inference, model_data) - if not images_det_result and len(det_img) > 0: + images_update = save_process( + imgframe_dict, images_det_result, model_data) - nodet_t_num = nodet_t_num + 1 + # 结果判断,t + if images_det_result: - if (det_t_num + nodet_t_num) >= fps_num_small: + det_t_num += 1 - para = determine_time(det_num=det_t_num, - nodet_num=nodet_t_num, - ratio_set=model_data['detect_ratio']) + if len(det_img) == 0: + img_dict = images_update.copy() + det_img.append(img_dict) - if para: + if not images_det_result and len(det_img) > 0: - first_fps_time = det_img[0] - print({"dert_fps": (j-int(first_fps_time['get_fps'])+1)}) - first_fps_time.update({"dert_fps": (j-int(first_fps_time['get_fps'])+1)}) - det_fps_time.append(first_fps_time) + nodet_t_num += 1 + + if (det_t_num + nodet_t_num) >= fps_num_small: + + para = determine_time( + det_num=det_t_num, nodet_num=nodet_t_num, ratio_set=model_data['detect_ratio']) + + if para: - det_img.clear() - det_t_num = 0 - nodet_t_num = 0 + first_fps_time = det_img[0] + first_fps_time.update( + {"dert_fps": (j-int(first_fps_time['get_fps'])+1)}) - # print('det_img:', len(det_img), det_t_num, nodet_t_num) + det_fps_time.append(first_fps_time) - out_video.write(images_update['frame']) + det_img.clear() + det_t_num = 0 + nodet_t_num = 0 - if j >= fps_num: + # 视频保存 + out_video.write(images_update['frame']) + # 结果判断 ,T + if j >= fps_num: + + try: out_video.release() - - if det_img: - first_fps_time = det_img[0] - print({"dert_fps": (j-int(first_fps_time['get_fps'])+1)}) - first_fps_time.update({"dert_fps": (j-int(first_fps_time['get_fps'])+1)}) - det_fps_time.append(first_fps_time) + except Exception: + logger.exception(f"视频release失败") + else: + logger.info("视频release成功") - print('det_fps_time:',det_fps_time) + # T时间截至,判断t时间结果。 + if det_img: - if det_fps_time: - re_list = json_get(time_list=det_fps_time,video_path=video_path) - json_save(re_list) + para = determine_time( + det_num=det_t_num, nodet_num=nodet_t_num, ratio_set=model_data['detect_ratio']) - else: - print(video_path) - os.remove(video_path) - print('clear videos') - + first_fps_time = det_img[0] + time_1 = (j-int(first_fps_time['get_fps'])+1) - det_fps_time.clear() - video_name_time = 0 - j = 0 + if para and time_1 >= (fps_num_small/2): + + first_fps_time.update( + {"dert_fps": (j-int(first_fps_time['get_fps'])+1)}) + det_fps_time.append(first_fps_time) - print('det_fps_time:', len(det_fps_time), i, j) + if det_fps_time: - # break - t2 = time.time() - tx = t2 - t1 - print('检测一张图片的时间为:', tx) + det_fps_time = determine_duration(result_list=det_fps_time) + # 转换后视频保存路径 + save_video_name = os.path.basename(video_path) + only_video_name = save_video_name.split('.')[0] + save_video_path = os.path.join(model_data['save_videos'], (str(data_now.strftime( + "%Y")) + str(data_now.strftime("%m")) + str(data_now.strftime("%d")))) + save_video_path = os.path.join( + save_video_path, only_video_name) + + # 路径 + save_video = os.path.join(save_video_path, save_video_name) + json_path = os.path.join( + save_video_path, only_video_name + '.json') + images_path = os.path.join(save_video_path, 'images') + + # 转换视频、保存视频 + change_video = mp4_to_H264() + change_video.convert_byfile(video_path, save_video) + + # 保存图片 + update_det_fps = video_cut_images_save( + det_list=det_fps_time, images_path=images_path) + + # print(update_det_fps) + + # 保存json文件 + re_list, result_lables = json_get( + time_list=update_det_fps, video_path=save_video, fps=fps) + result_path = json_save(re_list, json_path) + + send_message(update_det_fps=update_det_fps, result_path=result_path, + source=source, result_lables=result_lables) + + else: + # print(video_path) + os.remove(video_path) + logger.info(f"未检测到目标信息的视频{video_path}删除成功") + + logger.info('开始信息重置') + det_img.clear() + det_fps_time.clear() + det_t_num = 0 + nodet_t_num = 0 + video_name_time = 0 + j = 0 + + # print('det_fps_time:', det_fps_time,'det_img:',det_img) + + # t2 = time.time() + # tx = t2 - t1 + # logger.info(f'检测一张图片的时间为:{tx}.') except Exception as e: - # 处理异常或错误 - print(str(e)) - cap.release() + logger.debug(f"读帧率失败{source}未读到...") + logger.debug(e) + cap.release() + cap = cv2.VideoCapture(source) + logger.info(f"摄像头{source}重新读取") - if dir_source: + # break + + +def video_name(video_name_base, save_path, save_file): - img_ext = [".jpg", ".JPG", ".bmp"] - video_ext = [".mp4", ".avi", ".MP4"] + video_name_base = video_name_base - img_list = get_dir_file(source, img_ext) - video_list = get_dir_file(source, video_ext) + savePath = os.path.join(save_path, save_file) - if img_list: + if not os.path.exists(savePath): + os.makedirs(savePath) - for img in img_list: + video_path = os.path.join( + savePath, video_name_base + '.mp4') - t1 = time.time() - images = cv2.imread(img) + return video_path - imgframe_dict = {"path": img, 'frame': images} - images_update = img_process( - imgframe_dict, model_inference, model_data) +def dir_source_process(source, model_inference, model_data): - t2 = time.time() - tx = t2 - t1 - print('检测一张图片的时间为:', tx) + img_ext = [".jpg", ".JPG", ".bmp"] + video_ext = [".mp4", ".avi", ".MP4"] - if video_list: + img_list = get_dir_file(source, img_ext) + video_list = get_dir_file(source, video_ext) - pass + if img_list: - if file_source: + for img in img_list: - img_para = True + t1 = time.time() + images = cv2.imread(img) - if img_para: - images = cv2.imread(source) - imgframe_dict = {"path": source, 'frame': images} + imgframe_dict = {"path": img, 'frame': images} images_update = img_process( imgframe_dict, model_inference, model_data) + t2 = time.time() + tx = t2 - t1 + print('检测一张图片的时间为:', tx) + + if video_list: + + pass + + +def file_source_process(source, model_inference, model_data): + + img_para = True + + if img_para: + images = cv2.imread(source) + imgframe_dict = {"path": source, 'frame': images} + + images_update = img_process( + imgframe_dict, model_inference, model_data) + def img_process(images, model_inference, model_data): @@ -231,6 +304,8 @@ def img_process(images, model_inference, model_data): confidence=model_data["model_parameter"]['confidence'], label_name_list=model_data["model_parameter"]['label_names']) + # print(results) + # print(images['path']) # 根据需要挑选标注框信息 @@ -247,17 +322,16 @@ def img_process(images, model_inference, model_data): else: determine_bbox = select_labels_list - - print(determine_bbox) - - if model_data['model_parameter']['object_num_min'] : - if len(determine_bbox) <= model_data["model_parameter"]['object_num_min']: + # print(determine_bbox) + + if model_data['model_parameter']['object_num_min']: + if len(determine_bbox) >= model_data["model_parameter"]['object_num_min']: - print(len(determine_bbox)) determine_bbox.clear() - + # logger.debug(f"正确获得检测后的信息{determine_bbox}...") + # 返回检测后结果 return determine_bbox @@ -268,6 +342,10 @@ def save_process(images, determine_bbox, model_data): images.update({"results": determine_bbox}) + if model_data['save_path_original']: + imgname_original = images_save(images=images, + save_path=model_data["save_path_original"]) + img_save = drawing_frame( images_frame=images['frame'], result_list=determine_bbox) @@ -278,10 +356,6 @@ def save_process(images, determine_bbox, model_data): imgname = images_save( images=images, save_path=model_data["save_path"]) - if model_data['save_path_original']: - imgname_original = images_save(images=images, - save_path=model_data["save_path_original"]) - if model_data["save_annotations"]: if not os.path.exists(model_data["save_annotations"]): @@ -315,22 +389,12 @@ def save_process(images, determine_bbox, model_data): def images_save(images, save_path): # 保存时候时间为图片名 - # data_now = datetime.now() - # images_name = str(data_now.strftime("%Y")) + str(data_now.strftime("%m")) + str(data_now.strftime("%d")) + str(data_now.strftime("%H")) + \ - # str(data_now.strftime("%M")) + str(data_now.strftime("%S")) + \ - # str(data_now.strftime("%f")) + '.jpg' - # img_save_path = save_path + '/' + str( - # data_now.year) + '/' + str(data_now.month) + '_' + str(data_now.day) + '/' - img_save_path = os.path.join(save_path, str(images['path'].split('.')[-1])) images_name = images['get_time'] + '.jpg' - # img_save_path = save_path + '/' + str(images['path'].split('.')[-1]) + '/' - # print(img_save_path) + if not os.path.exists(save_path): + os.makedirs(save_path) - if not os.path.exists(img_save_path): - os.makedirs(img_save_path) - - full_name = os.path.join(img_save_path, images_name) + full_name = os.path.join(save_path, images_name) cv2.imwrite(full_name, images['frame']) @@ -360,41 +424,49 @@ def determine_time(det_num, nodet_num, ratio_set): ratio = det_num / (det_num + nodet_num) - print(det_num, nodet_num, ratio) - if ratio >= ratio_set: - return True - else: - return False -def video_synthesis(imglist, savePath, size, fps, videoname): +def determine_duration(result_list): + i = 0 - if not os.path.exists(savePath): - os.makedirs(savePath) + while i < len(result_list) - 1: + dict_i = result_list[i] + dict_j = result_list[i + 1] + + if 'get_fps' in dict_i and 'dert_fps' in dict_i and 'get_fps' in dict_j: + num_i = int(dict_i['get_fps']) + dura_i = int(dict_i['dert_fps']) + num_j = int(dict_j['get_fps']) - print(videoname) - video_path = os.path.join(savePath, videoname + '.avi') - out = cv2.VideoWriter( - video_path, cv2.VideoWriter_fourcc(*'DIVX'), fps, size) + if num_i + dura_i == num_j: + dura_j = int(dict_j['dert_fps']) + dura_update = dura_i + dura_j + + dict_i['dert_fps'] = dura_update + result_list.pop(i + 1) + else: + i += 1 + else: + i += 1 - sorted_list = sorted(imglist, key=lambda x: x['get_time']) + return result_list - for filename in sorted_list: - out.write(filename['frame']) - out.release() + # print('2:', result_list) -def json_get(time_list,video_path): +def json_get(time_list, video_path, fps): - result_dict = {'video_path':video_path} - for i,det_dict in enumerate(time_list): + result_dict = {'info': {'video_path': video_path, 'fps': fps}} + re_dict = {} + for i, det_dict in enumerate(time_list): - list_hands = ["Keypad","hands","keyboard", "mouse","phone"] - list_sleep = ["person","sleep"] + list_hands = ["Keypad", "hands", "keyboard", "mouse", "phone"] + list_sleep = ["sleep"] + list_person = ["person"] if list(det_dict['results'][0].keys())[0] in list_hands: @@ -404,39 +476,59 @@ def json_get(time_list,video_path): result_lables = "sleep" - fps_dict = {'time': det_dict['get_fps'],'duration':det_dict['dert_fps'],'result':result_lables} - result_dict.update({('id_'+ str(i)):fps_dict}) - - return result_dict + if list(det_dict['results'][0].keys())[0] in list_person: -# def json_analysis(re_list): + result_lables = "person" -# update_list = [] -# copy_list = [x for x in re_list not in update_list] + fps_dict = {'time': det_dict['get_fps'], + 'duration': det_dict['dert_fps'], + 'images_path': det_dict['images_path']} -# for i in range(len(copy_list)-1): + re_dict.update({('id_' + str(i)): fps_dict}) -# j = i + 1 + result_dict.update({'result': re_dict}) -# re_i = int(re_list[i]['fps']) -# re_i_add = int(re_list[i]['dert_fps']) -# re_j = int(re_list[j]['fps']) + return result_dict, result_lables -# if re_i + re_i_add == re_j: -# update_list.append(re_i,re_j) -# print() +def json_save(result_dict, json_path): -def json_save(result_dict): + result = json.dumps(result_dict) - json_path = result_dict['video_path'].split('.')[0] + '.json' - del result_dict['video_path'] - result = json.dumps(result_dict) - - f = open(json_path,'w') + f = open(json_path, 'w') f.write(result + '\n') f.close + return json_path + + +def video_cut_images_save(det_list, images_path): + + for det_dict in det_list: + + images_path_full = images_save(images=det_dict, save_path=images_path) + + del det_dict['frame'] + del det_dict['get_time'] + det_dict.update({'images_path': images_path_full}) + + return det_list + + +def send_message(update_det_fps, result_path, source, result_lables): + + for det_dict in update_det_fps: + + extra = { + 'worker': 'xbank', + 'time': det_dict['get_fps'], + 'config_file': result_path, + 'source': source, + 'type': result_lables + } + + xbank_logger.info('xBank_infer', extra=extra) + logger.info(f'发送信息{extra}') # if __name__ == '__main__':