from analysis_result.get_model_result import det_img from analysis_result.same_model_img import same_model_img_analysis_labels, model_labels_selet from model_load.model_load import load_model from drawing_img.drawing_img import drawing_frame from analysis_data.data_dir_file import get_dir_file from analysis_data.config_load import get_configs from utils import is_image_file, is_rtsp_or_video import cv2 import os import time from datetime import datetime def data_load(args): # print('正在运行的进程',msg) # print(args) source = args[0] model_yaml = args[1] # 数据加载 rtsp_or_video_source = is_rtsp_or_video(source) dir_source = os.path.isdir(source) img_source = is_image_file(source) # # 模型加载 model_data = get_configs(model_yaml) model_inference = load_model(model_file=model_data["model"], device=model_data["model_parameter"]['device'], cache_file=model_data["model_cache"]) if rtsp_or_video_source: cap = cv2.VideoCapture(source) try: i = 0 while True: ret, frame = cap.read() if not ret: continue # 如果未成功读取到视频帧,则继续读取下一帧 print(source,datetime.today(), i) # if source == 'rtsp://admin:@192.168.10.18': # cv2.imshow('18',frame) img_frame_dict = {"path": source, 'frame': frame} images_update = img_process( img_frame_dict, model_inference, model_data) # print(type(images_update['frame'])) # if source == 'rtsp://admin:@192.168.10.18': # cv2.namedWindow('18',0) # cv2.imshow('18',images_update['frame']) i = i+1 except Exception as e: # 处理异常或错误 print(str(e)) cap.release() if dir_source: img_ext = [".jpg", ".JPG", ".bmp"] video_ext = [".mp4", ".avi", ".MP4"] img_list = get_dir_file(source, img_ext) video_list = get_dir_file(source, video_ext) if img_list: for img in img_list: t1 = time.time() images = cv2.imread(img) img_frame_dict = {"path": img, 'frame': images} images_update = img_process( img_frame_dict, model_inference, model_data) t2 = time.time() tx = t2 - t1 print('检测一张图片的时间为:',tx) if video_list: pass if img_source: img_para = True if img_para: images = cv2.imread(source) img_frame_dict = {"path": source, 'frame': images} images_update = img_process( img_frame_dict, model_inference, model_data) def img_process(images, model_inference, model_data): start_point = time.perf_counter() # 检测每帧图片,返回推理结果 results = det_img(model_inference=model_inference, images_frame=images['frame'], confidence=model_data["model_parameter"]['confidence'], label_name_list=model_data["model_parameter"]['label_names']) # print(images['path']) # 根据需要挑选标注框信息 select_labels_list = model_labels_selet(example_list=model_data["model_parameter"]['compara_label_names'], result_dict_list=results) if model_data["model_parameter"]['compara_relevancy']: # 需要根据的逻辑判断标注框信息 determine_bbox = same_model_img_analysis_labels(example_list=model_data["model_parameter"]['compara_label_names'], result_dicts_list=select_labels_list, relevancy=model_data["model_parameter"]['compara_relevancy'], relevancy_para=model_data["model_parameter"]['relevancy_para']) else: determine_bbox = select_labels_list # 判断后的信息返回结果,这里是画图,可返回结果 if determine_bbox: images.update({"results": determine_bbox}) img_save = drawing_frame( images_frame=images['frame'], result_list=determine_bbox) images.update({"frame": img_save}) img_name = images_save( images=images['frame'], save_path=model_data["save_path"]) print('sleep:', images['path'], img_name) if model_data['save_path_original']: images_save(images=images['frame'], save_path=model_data["save_path_original"]) else: pass else: # 没检测出来的图片是否保存 if model_data["test_path"]: img_name = images_save( images=images['frame'], save_path=model_data["test_path"]) # print('no:',images['path'],img_name) else: pass # 展示显示 # if images['path'] == 'rtsp://admin:@192.168.10.11': # cv2.namedWindow('11', cv2.WINDOW_NORMAL) # cv2.imshow('11',images['frame']) # cv2.waitKey(1) # cv2.destroyAllWindows() # t2 = time.time() end_point = time.perf_counter() cost = end_point - start_point print(f"Predicted in {cost * 1000:.2f}ms. {1.0 / cost:.2f} FPS") return images def images_save(images, save_path): # 保存时候时间为图片名 data_now = datetime.today() images_name = str(data_now.year) + str(data_now.month) + str(data_now.day) + str(data_now.hour) + \ str(data_now.minute) + str(data_now.second) + \ str(data_now.microsecond) + '.jpg' img_save_path = save_path + '/' + str( data_now.year) + '/' + str(data_now.month) + '_' + str(data_now.day) + '/' if not os.path.exists(img_save_path): os.makedirs(img_save_path) full_name = img_save_path + images_name cv2.imwrite(full_name, images) return full_name