XZNSH-Code-AI/xznsh_flow/capture_queue.py

86 lines
2.0 KiB
Python

import time
import json
import cv2
from queue import Queue, Full
from threading import Thread
from log import logger
with open('cfg.json', 'r') as f:
cfg_dict = json.load(f)
CAMERA_QUEUE = {camera_name: Queue(1) for camera_name in cfg_dict['camera']}
def camera_start(camera_name, camera_source):
"""
启动摄像头
Args:
camera_name:
camera_source:
Returns:
"""
for line in range(1, 4):
try:
capture_obj = cv2.VideoCapture(camera_source)
result, _ = capture_obj.read()
if not result:
raise Exception
except Exception as e:
logger.error(f'{camera_name} {camera_source}:第{line}次启动失败')
continue
logger.info(f'{camera_name} {camera_source}:第{line}次启动成功')
return True, capture_obj
return False, None
def camera_add_queue(camera_name, camera_source, camera_queue):
"""
启动摄像头,添加帧到队列
Args:
camera_name:
camera_source:
camera_queue:
Returns:
"""
logger.info(f'{camera_name}线程启动')
start_flag, capture_obj = camera_start(camera_name, camera_source)
if not start_flag:
return
logger.info(f'开始添加帧到队列')
while True:
result, frame_picture = capture_obj.read()
if not result:
continue
try:
camera_queue.put_nowait(frame_picture)
except Full:
time.sleep(0.01)
continue
def camera_mul_thread():
"""
摄像头对应拉起线程
Returns:
"""
thread_pool = []
for camera_name in cfg_dict['camera']:
camera_th = Thread(target=camera_add_queue,
args=(camera_name, cfg_dict['camera']['camera_name'], CAMERA_QUEUE[camera_name]))
thread_pool.append(camera_th)
for camera_thread in thread_pool:
camera_thread.start()
if __name__ == '__main__':
camera_mul_thread()