发送视频方法

V0.1.0
zhouyang 2 years ago
parent ca989dfffd
commit 05acbff6a3

@ -1 +1 @@
{"log_path": "xznsh.log", "frame": 0.05, "camera": {"camera_01": "rtsp://admin:@192.168.10.18", "camera_02": "rtsp://admin:@192.168.10.12"}, "model_path": {"person": "person.pt", "head": "xxx", "phone": "xxx"}, "confidence": {"person": 0.5, "head": 0.5, "phone": 0.5}} {"log_path": "xznsh.log", "frame": 0.05, "camera": {"camera_01": "rtsp://admin:@192.168.10.18", "camera_02": "rtsp://admin:@192.168.10.12"}, "model_path": {"person": "person.pt", "head": "xxx", "phone": "xxx"}, "confidence": {"person": 0.5, "head": 0.5, "phone": 0.5}, "video_encoding": "MP42", "video_path": {"person": "xxx", "head": "xxx", "phone": "xxx"}, "username": "eWF4aW4=", "password": "eWF4aW5AMTIz", "url": ""}

@ -0,0 +1,40 @@
import json
import requests
import base64
with open('cfg.json', 'r') as f:
cfg_dict = json.load(f)
def send_result(file_path, url):
"""
发送视频
Args:
file_path:
url:
Returns:
"""
username = cfg_dict['username']
password = cfg_dict['password']
headers = {'Content-type': 'video/mp4', 'Authorization': 'Basic ' + f'<{username}>:<{password}>'}
files = {'file': open(file_path, 'rb')}
response = requests.post(url, files=files, headers=headers)
return response
def encode_info(info):
"""
加密
"""
encoded_info = base64.b64encode(info.encode('utf-8')).decode("utf-8")
return encoded_info
def decode_info(encoded_info):
"""
解密
"""
info = base64.b64decode(encoded_info.encode('utf-8')).decode('utf-8')
return info

@ -1,6 +1,6 @@
import os import os
import time import time
import cv2
import json import json
from queue import Queue, Empty from queue import Queue, Empty
from threading import Thread from threading import Thread
@ -13,19 +13,29 @@ with open('cfg.json', 'r') as f:
cfg_dict = json.load(f) cfg_dict = json.load(f)
class ModelInvoke(Thread): class ViolationJudgment(Thread):
""" """
农商行员工打瞌睡玩手机分析类 农商行员工打瞌睡玩手机分析类
""" """
def __int__(self, camera_name): def __int__(self, camera_name):
super(ModelInvoke, self).__init__() super(ViolationJudgment, self).__init__()
self.camera = camera_name self.camera = camera_name
self.queue_img = CAMERA_QUEUE[camera_name] self.queue_img = CAMERA_QUEUE[camera_name]
self.yolo_model = {'person': YOLO(cfg_dict['model_path']['person']), self.yolo_model = {'person': YOLO(cfg_dict['model_path']['person']),
'head': YOLO(cfg_dict['model_path']['head']), 'head': YOLO(cfg_dict['model_path']['head']),
'phone': YOLO(cfg_dict['model_path']['phone'])} 'phone': YOLO(cfg_dict['model_path']['phone'])}
@staticmethod
def save_video(frame_split, fps, fourcc, video_path, w_h_size):
"""
截取后图像保存视频
"""
encoding = list(fourcc)
video_fourcc = cv2.VideoWriter_fourcc(encoding[0], encoding[1], encoding[2], encoding[3])
video_write_obj = cv2.VideoWriter(video_path, video_fourcc, fps, w_h_size)
video_write_obj.write(frame_split)
def frame_analysis(self): def frame_analysis(self):
while True: while True:
try: try:
@ -37,13 +47,7 @@ class ModelInvoke(Thread):
# 调用模型,逐帧检测 # 调用模型,逐帧检测
results_img = analysis_yolov8(frame=frame_img, model_coco=self.yolo_model['person'], results_img = analysis_yolov8(frame=frame_img, model_coco=self.yolo_model['person'],
confidence=cfg_dict['confidence']['person']) confidence=cfg_dict['confidence']['person'])
# todo 根据逻辑生成视频,这部分逻辑
# try:
# self.process_frame(y_frame)
# except Exception:
# logger.exception(f"{self.name}出错")
# time.sleep(0.1)
# return
def run(self): def run(self):
pass pass

Loading…
Cancel
Save