|
|
import os
|
|
|
import math
|
|
|
import time
|
|
|
import cv2
|
|
|
import json
|
|
|
from queue import Queue, Empty
|
|
|
from threading import Thread
|
|
|
from log import logger
|
|
|
from ultralytics import YOLO
|
|
|
from yolov8_det import analysis_yolov8
|
|
|
from capture_queue import CAMERA_QUEUE, camera_mul_thread
|
|
|
|
|
|
with open('cfg.json', 'r') as f:
|
|
|
cfg_dict = json.load(f)
|
|
|
|
|
|
|
|
|
class FrameToVideo(Thread):
|
|
|
"""
|
|
|
农商行员工打瞌睡,玩手机分析类
|
|
|
"""
|
|
|
|
|
|
def __init__(self, camera_name):
|
|
|
super(FrameToVideo, self).__init__()
|
|
|
self.camera = camera_name
|
|
|
# self.queue_img = CAMERA_QUEUE[camera_name]
|
|
|
# self.yolo_model = {'person': YOLO(cfg_dict['model_path']['person']),
|
|
|
# 'head': YOLO(cfg_dict['model_path']['head'])}
|
|
|
|
|
|
self.yolo_model = YOLO(cfg_dict['model_path']['all'])
|
|
|
|
|
|
self.person_target_list = []
|
|
|
self.head_target_list = []
|
|
|
self.phone_target_list = []
|
|
|
|
|
|
@staticmethod
|
|
|
def save_video(frames, fps, fourcc, video_path, w_h_size):
|
|
|
"""
|
|
|
截取后图像保存视频
|
|
|
"""
|
|
|
encoding = list(fourcc)
|
|
|
video_fourcc = cv2.VideoWriter_fourcc(encoding[0], encoding[1], encoding[2], encoding[3])
|
|
|
video_write_obj = cv2.VideoWriter(video_path, video_fourcc, fps, w_h_size)
|
|
|
for frame in frames:
|
|
|
video_write_obj.write(frame)
|
|
|
logger.info(f'生成视频:{video_path}')
|
|
|
|
|
|
@staticmethod
|
|
|
def boundary_treat(frame_x, frame_y, coord):
|
|
|
"""
|
|
|
边界处理,裁剪可能会超出范围
|
|
|
"""
|
|
|
y_min = coord[1] - cfg_dict['error_y']
|
|
|
y_max = coord[3] + cfg_dict['error_y']
|
|
|
x_min = coord[0] - cfg_dict['error_x']
|
|
|
x_max = coord[2] + cfg_dict['error_x']
|
|
|
split_y = {'min': int(y_min) if y_min >= 0 else 0,
|
|
|
'max': int(y_max) if y_max <= frame_y else frame_y}
|
|
|
split_x = {'min': int(x_min) if y_min >= 0 else 0,
|
|
|
'max': int(x_max) if x_max <= frame_x else frame_x}
|
|
|
return split_x, split_y
|
|
|
|
|
|
@staticmethod
|
|
|
def target_match(target_list, coord, frame_img, new_target_list):
|
|
|
"""
|
|
|
遍历目标进行匹配
|
|
|
"""
|
|
|
match_flag = False
|
|
|
for target in target_list:
|
|
|
if target['flag']:
|
|
|
continue
|
|
|
if all([abs(coord[n] - target['coord'][n]) <= 5 for n in range(4)]): # 误差判断
|
|
|
frame_split = frame_img[target['split_y']['min']:target['split_y']['max'],
|
|
|
target['split_x']['min']:target['split_x']['max']]
|
|
|
# cv2.imshow('yang', frame_split)
|
|
|
# cv2.waitKey(2000)
|
|
|
target['frame'].append(frame_split)
|
|
|
target['count'] += 1
|
|
|
target['flag'] = True
|
|
|
# new_target_list.append(target)
|
|
|
match_flag = True
|
|
|
break
|
|
|
else:
|
|
|
continue
|
|
|
return match_flag
|
|
|
|
|
|
def target_analysis(self, target_list, new_target_list, person_coord_list, frame_x, frame_y, frame_img, label):
|
|
|
if not target_list:
|
|
|
for line in person_coord_list:
|
|
|
# label that self define maybe different from model
|
|
|
if label == 'head':
|
|
|
coord = line['head']
|
|
|
elif label == 'person':
|
|
|
coord = line['person']
|
|
|
else:
|
|
|
coord = line['phone']
|
|
|
split_x, split_y = self.boundary_treat(frame_x, frame_y, coord)
|
|
|
|
|
|
# 裁剪大一圈,固定裁剪范围
|
|
|
frame_split = frame_img[split_y['min']:split_y['max'], split_x['min']:split_x['max']]
|
|
|
|
|
|
target_list.append({'frame': [frame_split], 'coord': coord, 'count': 0, 'split_x': split_x,
|
|
|
'split_y': split_y, 'flag': False})
|
|
|
else:
|
|
|
for line in person_coord_list:
|
|
|
coord = line[label]
|
|
|
|
|
|
match_flag = self.target_match(target_list, coord, frame_img, new_target_list)
|
|
|
if not match_flag:
|
|
|
split_x, split_y = self.boundary_treat(frame_x, frame_y, coord)
|
|
|
# 裁剪大一圈,固定裁剪范围
|
|
|
frame_split = frame_img[split_y['min']:split_y['max'], split_x['min']:split_x['max']]
|
|
|
|
|
|
new_target_list.append({'frame': [frame_split], 'coord': coord, 'count': 0, 'split_x': split_x,
|
|
|
'split_y': split_y, 'flag': False})
|
|
|
# 判断帧数,生成视频
|
|
|
for target in target_list:
|
|
|
if len(target['frame']) == cfg_dict['video_length']:
|
|
|
frame_w = target['split_x']['max'] - target['split_x']['min']
|
|
|
frame_h = target['split_y']['max'] - target['split_y']['min']
|
|
|
logger.info(f'开始输出视频:{label}')
|
|
|
self.save_video(target['frame'], cfg_dict['fps'], cfg_dict['video_encoding'],
|
|
|
cfg_dict['video_path'][label] + self.camera + str(int(time.time())) + '.mp4v',
|
|
|
(frame_w, frame_h))
|
|
|
logger.info(f'输出视频结束:{label}')
|
|
|
continue
|
|
|
# 过滤中断没有匹配到的目标
|
|
|
if target['flag']:
|
|
|
target['flag'] = False
|
|
|
new_target_list.append(target)
|
|
|
if label == 'person':
|
|
|
self.person_target_list = new_target_list
|
|
|
else:
|
|
|
self.head_target_list = new_target_list
|
|
|
|
|
|
def frame_analysis(self):
|
|
|
video_capture = cv2.VideoCapture(cfg_dict['test_video_path']) # 本地测试用
|
|
|
while True:
|
|
|
result, frame_img = video_capture.read() # 本地测试用
|
|
|
# try:
|
|
|
# frame_img = self.queue_img.get_nowait()
|
|
|
# except Empty:
|
|
|
# time.sleep(0.01)
|
|
|
# continue
|
|
|
|
|
|
new_person_target_list = []
|
|
|
new_head_target_list = []
|
|
|
new_phone_target_list = []
|
|
|
# 调用模型,逐帧检测
|
|
|
# person_coord_list = analysis_yolov8(frame=frame_img, model_coco=self.yolo_model['person'],
|
|
|
# confidence=cfg_dict['confidence']['person'])
|
|
|
# head_coord_list = analysis_yolov8(frame=frame_img, model_coco=self.yolo_model['head'],
|
|
|
# confidence=cfg_dict['confidence']['head'])
|
|
|
|
|
|
person_coord_list, head_coord_list, phone_coord_list = analysis_yolov8(frame=frame_img,
|
|
|
model_coco=self.yolo_model,
|
|
|
confidence=cfg_dict['confidence'])
|
|
|
|
|
|
frame_y, frame_x, _ = frame_img.shape
|
|
|
logger.debug(f'帧尺寸,y:{frame_y},x:{frame_x}')
|
|
|
self.target_analysis(self.person_target_list, new_person_target_list, person_coord_list,
|
|
|
frame_x, frame_y, frame_img, 'person')
|
|
|
self.target_analysis(self.head_target_list, new_head_target_list, head_coord_list, frame_x,
|
|
|
frame_y, frame_img, 'head')
|
|
|
self.target_analysis(self.phone_target_list, new_phone_target_list, phone_coord_list, frame_x,
|
|
|
frame_y, frame_img, 'phone')
|
|
|
|
|
|
def run(self):
|
|
|
self.frame_analysis()
|
|
|
|
|
|
|
|
|
class ViolationJudgmentSend(Thread):
|
|
|
"""
|
|
|
农商行员工打瞌睡,玩手机,结果分析、发送类
|
|
|
开线程运行
|
|
|
"""
|
|
|
|
|
|
def __int__(self):
|
|
|
super(ViolationJudgmentSend, self).__init__()
|
|
|
self.action_model = cfg_dict['model_path']['action']
|
|
|
|
|
|
def video_analysis_sand(self):
|
|
|
while True:
|
|
|
pass
|
|
|
|
|
|
def run(self):
|
|
|
self.video_analysis_sand()
|
|
|
|
|
|
|
|
|
# 程序启动
|
|
|
def process_run():
|
|
|
logger.info('程序启动')
|
|
|
# 接入监控线程
|
|
|
camera_mul_thread()
|
|
|
# 截取视频线程
|
|
|
frame_to_video_obj = [FrameToVideo(camera) for camera in CAMERA_QUEUE]
|
|
|
for line in frame_to_video_obj:
|
|
|
line.start()
|
|
|
# 发送结果线程
|
|
|
send_obj = ViolationJudgmentSend()
|
|
|
send_obj.start()
|
|
|
send_obj.join()
|
|
|
|
|
|
|
|
|
if __name__ == '__main__':
|
|
|
fv = FrameToVideo('camera_01')
|
|
|
fv.frame_analysis()
|
|
|
# process_run()
|