person head 截取视频

V0.1.0
zhouyang 2 years ago
parent acf7db79c5
commit ad4dcf11e7

@ -1 +1 @@
{"log_path": "xznsh.log", "frame": 0.05, "camera": {"camera_01": "rtsp://admin:@192.168.10.18", "camera_02": "rtsp://admin:@192.168.10.12"}, "model_path": {"person": "person.pt", "head": "xxx", "phone": "xxx"}, "confidence": {"person": 0.5, "head": 0.5, "phone": 0.5}, "video_encoding": "MP42", "video_path": "xxx", "username": "eWF4aW4=", "password": "eWF4aW5AMTIz", "url": "", "video_length": 150, "error_x": 10, "error_y": 10, "fps": 30}
{"log_path": "xznsh.log", "frame": 0.05, "camera": {"camera_01": "rtsp://admin:@192.168.10.18", "camera_02": "rtsp://admin:@192.168.10.12"}, "video_path": {"person": "video/person/", "head": "video/head/"}, "username": "eWF4aW4=", "password": "eWF4aW5AMTIz", "url": "", "model_path": {"person": "person.pt", "head": "person_cut_hh.pt", "phone": "xxx", "action": "xxx"}, "confidence": {"person": 0.5, "head": 0.5, "phone": 0.5}, "video_encoding": "MP42", "video_length": 50, "error_x": 200, "error_y": 200, "fps": 30, "test_video_path": "0711-4.mp4"}

@ -9,7 +9,6 @@ from log import logger
from ultralytics import YOLO
from yolov8_det import analysis_yolov8
from capture_queue import CAMERA_QUEUE, camera_mul_thread
from cal_utils import *
# Load the runtime configuration once at import time; every class below
# reads thresholds, paths and codec settings from cfg_dict.
with open('cfg.json', 'r') as cfg_file:
    cfg_dict = json.load(cfg_file)
@ -20,15 +19,14 @@ class FrameToVideo(Thread):
农商行员工打瞌睡玩手机分析类
"""
def __init__(self, camera_name):
    """Per-camera worker thread that cuts person/head clips from a stream.

    camera_name: key into CAMERA_QUEUE / cfg_dict['camera']; also used to
    name the generated video files.
    """
    super(FrameToVideo, self).__init__()
    self.camera = camera_name
    # self.queue_img = CAMERA_QUEUE[camera_name]
    # One YOLO detector per target kind (phone model dropped in this commit).
    self.yolo_model = {'person': YOLO(cfg_dict['model_path']['person']),
                       'head': YOLO(cfg_dict['model_path']['head'])}
    # Targets currently being tracked, one list per detector kind.
    self.person_target_list = []
    self.head_target_list = []
@staticmethod
def save_video(frames, fps, fourcc, video_path, w_h_size):
    """Write the collected frames to *video_path* as one clip.

    frames: iterable of BGR frames (all of size w_h_size);
    fps: output frame rate; fourcc: 4-char codec string, e.g. 'MP42';
    w_h_size: (width, height) tuple for the writer.

    NOTE(review): the middle of this method is cut off by a diff hunk in
    the paste; the fourcc-conversion lines are reconstructed from the
    identical sibling ViolationJudgmentSend.save_video — confirm.
    """
    encoding = list(fourcc)
    video_fourcc = cv2.VideoWriter_fourcc(encoding[0], encoding[1], encoding[2], encoding[3])
    video_write_obj = cv2.VideoWriter(video_path, video_fourcc, fps, w_h_size)
    for frame in frames:
        video_write_obj.write(frame)
    # Release the writer so the file is flushed and closed
    # (it was previously leaked, which can leave an unplayable file).
    video_write_obj.release()
    logger.info(f'生成视频:{video_path}')
def target_match(self, coord, frame_img, new_target_list):
@staticmethod
def boundary_treat(frame_x, frame_y, coord):
"""
边界处理裁剪可能会超出范围
"""
y_min = coord[1] - cfg_dict['error_y']
y_max = coord[3] + cfg_dict['error_y']
x_min = coord[0] - cfg_dict['error_x']
x_max = coord[2] + cfg_dict['error_x']
split_y = {'min': int(y_min) if y_min >= 0 else 0,
'max': int(y_max) if y_max <= frame_y else frame_y}
split_x = {'min': int(x_min) if y_min >= 0 else 0,
'max': int(x_max) if x_max <= frame_x else frame_x}
return split_x, split_y
@staticmethod
def target_match(target_list, coord, frame_img, new_target_list):
"""
Walk target_list looking for an already-tracked target whose box matches coord.

Returns match_flag: True if some unmatched target's box is within the
5-pixel tolerance on every edge.

NOTE(review): this span is a raw diff paste — the superseded bound-method
loop line and a hunk header are interleaved with the new @staticmethod
version, and the loop body between the tolerance test and `continue` is
cut off by the hunk. Kept byte-identical; presumably the hidden lines
append the crop of frame_img to the matched target, mark it, and set
match_flag — confirm against the repository.
"""
match_flag = False
# old (pre-commit) line, superseded by the parameterized loop below:
for target in self.target_list:
for target in target_list:
# Targets already matched earlier in this frame are skipped.
if target['flag']:
continue
if all([abs(coord[n] - target['coord'][n]) <= 5 for n in range(4)]): # tolerance check: every box edge within 5 px
@ -62,108 +77,108 @@ class FrameToVideo(Thread):
continue
return match_flag
def target_analysis(self, target_list, new_target_list, person_coord_list, frame_x, frame_y, frame_img, label):
    """Update tracked targets with this frame's detections; emit full clips.

    target_list: targets tracked up to the previous frame (mutated only when
        empty, i.e. on the first detections).
    new_target_list: output — targets to carry into the next frame; the
        caller swaps it in after this returns.
    person_coord_list: detections for this frame; each item is a dict whose
        'person' key holds a box (x_min, y_min, x_max, y_max).
    frame_x / frame_y: frame width / height.
    frame_img: the current frame (indexable as img[y0:y1, x0:x1]).
    label: 'person' or 'head' — selects the output dir in cfg_dict['video_path'].

    Improvements over the pasted diff: interleaved old-version residue
    removed and the leftover debug `print(target)` dropped.
    """
    if not target_list:
        # First frame with detections: every box starts a new target.
        for line in person_coord_list:
            coord = line['person']
            split_x, split_y = self.boundary_treat(frame_x, frame_y, coord)
            # Crop a slightly larger, fixed region around the detection.
            frame_split = frame_img[split_y['min']:split_y['max'], split_x['min']:split_x['max']]
            target_list.append({'frame': [frame_split], 'coord': coord, 'count': 0,
                                'split_x': split_x, 'split_y': split_y, 'flag': False})
    else:
        for line in person_coord_list:
            coord = line['person']
            # Try to attach this detection to an existing target first.
            match_flag = self.target_match(target_list, coord, frame_img, new_target_list)
            if not match_flag:
                split_x, split_y = self.boundary_treat(frame_x, frame_y, coord)
                # Unmatched detection: start tracking it as a new target.
                frame_split = frame_img[split_y['min']:split_y['max'], split_x['min']:split_x['max']]
                new_target_list.append({'frame': [frame_split], 'coord': coord, 'count': 0,
                                        'split_x': split_x, 'split_y': split_y, 'flag': False})
    # Emit a clip once a target has accumulated video_length frames.
    for target in target_list:
        if len(target['frame']) == cfg_dict['video_length']:
            frame_w = target['split_x']['max'] - target['split_x']['min']
            frame_h = target['split_y']['max'] - target['split_y']['min']
            self.save_video(target['frame'], cfg_dict['fps'], cfg_dict['video_encoding'],
                            cfg_dict['video_path'][label] + self.camera + str(int(time.time())) + '.mp4v',
                            (frame_w, frame_h))
            continue  # completed target is dropped from tracking
        # Keep only targets matched this frame; reset the flag for the next one.
        # NOTE(review): brand-new targets have flag False and are therefore
        # discarded here on their first frame — looks suspicious; confirm intent.
        if target['flag']:
            target['flag'] = False
            new_target_list.append(target)
def frame_analysis(self):
    """Per-frame main loop: detect persons/heads and update both target lists.

    Currently reads from a local test video (cfg_dict['test_video_path']);
    the queue-based camera path is kept commented out below.
    """
    video_capture = cv2.VideoCapture(cfg_dict['test_video_path'])  # 本地测试用
    while True:
        result, frame_img = video_capture.read()  # 本地测试用
        if not result:
            # BUG FIX: the read flag was ignored, so end-of-file left
            # frame_img as None and crashed the detectors below.
            break
        # try:
        #     frame_img = self.queue_img.get_nowait()
        # except Empty:
        #     time.sleep(0.01)
        #     continue
        new_person_target_list = []
        new_head_target_list = []
        # Run both detectors on the current frame.
        person_coord_list = analysis_yolov8(frame=frame_img, model_coco=self.yolo_model['person'],
                                            confidence=cfg_dict['confidence']['person'])
        head_coord_list = analysis_yolov8(frame=frame_img, model_coco=self.yolo_model['head'],
                                          confidence=cfg_dict['confidence']['head'])
        frame_y, frame_x, _ = frame_img.shape
        self.target_analysis(self.person_target_list, new_person_target_list, person_coord_list,
                             frame_x, frame_y, frame_img, 'person')
        self.target_analysis(self.head_target_list, new_head_target_list, head_coord_list, frame_x,
                             frame_y, frame_img, 'head')
        # Swap in the filtered lists for the next frame.
        self.person_target_list = new_person_target_list
        self.head_target_list = new_head_target_list
    video_capture.release()
def run(self):
    # Thread entry point: delegate to the frame-processing loop.
    self.frame_analysis()
class ViolationJudgmentSend(Thread):
    """Result analysis/sender for the dozing / phone-use detection pipeline.

    Runs as its own thread (started from process_run). The actual analysis
    and sending logic is not implemented yet.
    """

    def __init__(self):
        # BUG FIX: the constructor was misspelled __int__, so Python never
        # called it and self.action_model was never set.
        super(ViolationJudgmentSend, self).__init__()
        # Path of the action-recognition model; loading it is left to the
        # (not yet written) analysis step. Currently a placeholder in cfg.json.
        self.action_model = cfg_dict['model_path']['action']

    @staticmethod
    def save_video(frame_split, fps, fourcc, video_path, w_h_size):
        """Write a single cropped frame to a video file at video_path.

        NOTE(review): each call creates a fresh VideoWriter for the same
        path, overwriting previous output — confirm whether callers expect
        appending instead.
        """
        encoding = list(fourcc)
        video_fourcc = cv2.VideoWriter_fourcc(encoding[0], encoding[1], encoding[2], encoding[3])
        video_write_obj = cv2.VideoWriter(video_path, video_fourcc, fps, w_h_size)
        video_write_obj.write(frame_split)
        # Release so the container is flushed and closed (writer was leaked).
        video_write_obj.release()

    def video_analysis_sand(self):
        """Analysis + send loop — TODO: not implemented (busy-waits for now)."""
        while True:
            pass

    def run(self):
        # Thread entry point.
        self.video_analysis_sand()
# Program start-up
def process_run():
    """Start the capture threads, per-camera clipping threads and the sender."""
    logger.info('程序启动')
    # Attach to the monitoring cameras (fills CAMERA_QUEUE).
    camera_mul_thread()
    # One clip-extraction thread per configured camera.
    clip_threads = [FrameToVideo(camera) for camera in CAMERA_QUEUE]
    for clip_thread in clip_threads:
        clip_thread.start()
    # Result analysis / sending thread; block on it so the process stays up.
    sender = ViolationJudgmentSend()
    sender.start()
    sender.join()
if __name__ == '__main__':
    # Local debug entry: run a single camera's clipping loop in-process
    # instead of the full threaded pipeline.
    debug_worker = FrameToVideo('camera_01')
    debug_worker.frame_analysis()
    # process_run()

Loading…
Cancel
Save