import os
import math
import time
import cv2
import json
from queue import Queue, Empty
from threading import Thread
from log import logger
from ultralytics import YOLO
from yolov8_det import analysis_yolov8
from capture_queue import CAMERA_QUEUE, camera_mul_thread
from cal_utils import *

# Runtime configuration (model paths, confidences, crop margins, video settings).
with open('cfg.json', 'r') as f:
    cfg_dict = json.load(f)


class FrameToVideo(Thread):
    """Track detected persons frame by frame and cut each track into a clip.

    Original description: analysis class for rural commercial bank employees
    dozing off or playing with their phones.

    Each tracked target is a dict with the keys ``frame`` (list of crops),
    ``coord`` (box from the first detection), ``count`` (number of later
    matches), ``split_x`` / ``split_y`` (fixed crop window) and ``flag``
    (True while the target has already been matched in the current frame).
    """

    # Largest per-coordinate difference for a detection to be treated as the
    # same target as in a previous frame.
    MATCH_TOLERANCE = 5

    def __init__(self, camera_name):
        """Bind the worker to one camera queue and load the YOLO models.

        The original method was spelled ``__int__`` and therefore never ran as
        the constructor.

        :param camera_name: key into ``CAMERA_QUEUE`` selecting the frame queue.
        """
        super().__init__()
        self.camera = camera_name
        self.queue_img = CAMERA_QUEUE[camera_name]
        self.yolo_model = {'person': YOLO(cfg_dict['model_path']['person']),
                           'head': YOLO(cfg_dict['model_path']['head']),
                           'phone': YOLO(cfg_dict['model_path']['phone'])}
        self.target_list = []

    @staticmethod
    def save_video(frames, fps, fourcc, video_path, w_h_size):
        """Write the cropped frames to a video file.

        :param frames: iterable of images, each expected to be ``w_h_size``.
        :param fps: frame rate passed to the writer.
        :param fourcc: four-character codec code, e.g. ``'mp4v'``.
        :param video_path: output file path.
        :param w_h_size: ``(width, height)`` of the output video.
        """
        encoding = list(fourcc)
        video_fourcc = cv2.VideoWriter_fourcc(encoding[0], encoding[1], encoding[2], encoding[3])
        video_write_obj = cv2.VideoWriter(video_path, video_fourcc, fps, w_h_size)
        try:
            for frame in frames:
                video_write_obj.write(frame)
        finally:
            # Without release() the file handle leaks and the container may
            # never be finalised on disk.
            video_write_obj.release()

    @staticmethod
    def _new_target(coord, frame_img):
        """Build a fresh target record for a detection that matched nothing.

        The crop window is the detection box (indexed as x1, y1, x2, y2)
        enlarged by ``error_x`` / ``error_y`` and then fixed for the lifetime
        of the target. It is clamped to the frame so that a negative start
        index cannot wrap around in the slice and so that the window size
        equals the real crop size handed to the video writer.
        """
        frame_h, frame_w = frame_img.shape[:2]
        split_y = {'min': max(int(coord[1] - cfg_dict['error_y']), 0),
                   'max': min(int(coord[3] + cfg_dict['error_y']), frame_h)}
        split_x = {'min': max(int(coord[0] - cfg_dict['error_x']), 0),
                   'max': min(int(coord[2] + cfg_dict['error_x']), frame_w)}
        frame_split = frame_img[split_y['min']:split_y['max'], split_x['min']:split_x['max']]
        return {'frame': [frame_split], 'coord': coord, 'count': 0,
                'split_x': split_x, 'split_y': split_y, 'flag': False}

    def target_match(self, coord, frame_img, new_target_list):
        """Try to attach a detection to one of the currently tracked targets.

        On a match the target's fixed crop window is cut from ``frame_img``,
        appended to the target's frames, and the target is marked as matched
        and appended to ``new_target_list``.

        :param coord: detection box; its first four values are compared.
        :param frame_img: current frame.
        :param new_target_list: list collecting the targets kept this frame.
        :return: True if a target was matched, otherwise False.
        """
        for target in self.target_list:
            if target['flag']:
                # Already claimed by another detection in this frame.
                continue
            # Tolerance check on all four box coordinates.
            if all(abs(coord[n] - target['coord'][n]) <= self.MATCH_TOLERANCE for n in range(4)):
                frame_split = frame_img[target['split_y']['min']:target['split_y']['max'],
                                        target['split_x']['min']:target['split_x']['max']]
                target['frame'].append(frame_split)
                target['count'] += 1
                target['flag'] = True
                new_target_list.append(target)
                return True
        return False

    def _update_targets(self, frame_img, person_coord_list):
        """Advance tracking by one frame and return the new target list.

        Matched targets are carried over, unmatched detections start new
        targets, and targets without a detection in this frame are dropped
        (their track is considered interrupted). A target that has collected
        ``video_length`` frames is written out once and then removed.
        """
        new_target_list = []
        for detection in person_coord_list:
            coord = detection['person']
            if not self.target_match(coord, frame_img, new_target_list):
                new_target_list.append(self._new_target(coord, frame_img))

        kept_targets = []
        for target in new_target_list:
            # Reset the per-frame marker so the target can match next frame.
            target['flag'] = False
            if len(target['frame']) >= cfg_dict['video_length']:
                frame_w = target['split_x']['max'] - target['split_x']['min']
                frame_h = target['split_y']['max'] - target['split_y']['min']
                # NOTE(review): every clip goes to the same cfg 'video_path',
                # so later clips overwrite earlier ones -- confirm intended.
                self.save_video(target['frame'], cfg_dict['fps'], cfg_dict['video_encoding'],
                                cfg_dict['video_path'], (frame_w, frame_h))
                continue
            kept_targets.append(target)
        return kept_targets

    def frame_analysis(self):
        """Consume frames from the camera queue forever and update the tracks."""
        while True:
            try:
                frame_img = self.queue_img.get_nowait()
            except Empty:
                time.sleep(0.01)
                continue
            # Run the person model on every frame.
            person_coord_list = analysis_yolov8(frame=frame_img, model_coco=self.yolo_model['person'],
                                                confidence=cfg_dict['confidence']['person'])
            # Drop interrupted targets, keep matched ones, add new ones.
            self.target_list = self._update_targets(frame_img, person_coord_list)
            # head_img = analysis_yolov8(frame=frame_img, model_coco=self.yolo_model['head'],
            #                            confidence=cfg_dict['confidence']['head'])
            # phone_img = analysis_yolov8(frame=frame_img, model_coco=self.yolo_model['phone'],
            #                             confidence=cfg_dict['confidence']['phone'])
            # TODO: generate videos according to the head/phone logic above.

    def run(self):
        """Thread entry point; currently a no-op.

        NOTE(review): ``frame_analysis`` is never invoked from here -- confirm
        whether ``run`` should call it.
        """
        pass


class ViolationJudgmentSend(Thread):
    """Thread stub for a later analysis stage; ``video_analysis`` is not implemented.

    NOTE(review): the original docstring was identical to FrameToVideo's
    (dozing / phone-use analysis for bank employees) -- confirm the intended
    responsibility of this class.
    """

    def __init__(self, camera_name):
        """Store the given name and load the YOLO models.

        The original method was spelled ``__int__`` and therefore never ran as
        the constructor.

        :param camera_name: value stored as ``self.video_path``.
        """
        super().__init__()
        self.video_path = camera_name
        self.action_model = {'person': YOLO(cfg_dict['model_path']['person']),
                             'head': YOLO(cfg_dict['model_path']['head']),
                             'phone': YOLO(cfg_dict['model_path']['phone'])}
        self.person_coord_list = []

    @staticmethod
    def save_video(frame_split, fps, fourcc, video_path, w_h_size):
        """Write a single cropped frame to a video file.

        :param frame_split: image to write, expected to be ``w_h_size``.
        :param fps: frame rate passed to the writer.
        :param fourcc: four-character codec code.
        :param video_path: output file path.
        :param w_h_size: ``(width, height)`` of the output video.
        """
        encoding = list(fourcc)
        video_fourcc = cv2.VideoWriter_fourcc(encoding[0], encoding[1], encoding[2], encoding[3])
        video_write_obj = cv2.VideoWriter(video_path, video_fourcc, fps, w_h_size)
        try:
            video_write_obj.write(frame_split)
        finally:
            # Release so the handle is closed and the file is finalised.
            video_write_obj.release()

    def video_analysis(self):
        """Placeholder loop; the analysis logic is not implemented yet.

        NOTE(review): this spins without sleeping, so a started thread will
        busy-wait until real logic is added.
        """
        while True:
            pass

    def run(self):
        """Thread entry point: run the analysis loop."""
        self.video_analysis()


def process_run():
    """Start the camera capture threads and log program start-up."""
    camera_mul_thread()
    logger.info('程序启动')
    # TODO: analysis pipeline


if __name__ == '__main__':
    process_run()