xznsh项目中巡检机制

V0.1.0
杜思睿 2 years ago
parent 9890a4834e
commit dcebdb3616

Binary file not shown.

@ -0,0 +1,130 @@
import os
import cv2
import argparse
import numpy as np
class SCRFD():
def __init__(self, onnxmodel, confThreshold=0.5, nmsThreshold=0.5):
self.inpWidth = 640
self.inpHeight = 640
self.confThreshold = confThreshold
self.nmsThreshold = nmsThreshold
self.net = cv2.dnn.readNet(onnxmodel)
self.keep_ratio = True
self.fmc = 3
self._feat_stride_fpn = [8, 16, 32]
self._num_anchors = 2
def resize_image(self, srcimg):
padh, padw, newh, neww = 0, 0, self.inpHeight, self.inpWidth
if self.keep_ratio and srcimg.shape[0] != srcimg.shape[1]:
hw_scale = srcimg.shape[0] / srcimg.shape[1]
if hw_scale > 1:
newh, neww = self.inpHeight, int(self.inpWidth / hw_scale)
img = cv2.resize(srcimg, (neww, newh), interpolation=cv2.INTER_AREA)
padw = int((self.inpWidth - neww) * 0.5)
img = cv2.copyMakeBorder(img, 0, 0, padw, self.inpWidth - neww - padw, cv2.BORDER_CONSTANT,
value=0) # add border
else:
newh, neww = int(self.inpHeight * hw_scale) + 1, self.inpWidth
img = cv2.resize(srcimg, (neww, newh), interpolation=cv2.INTER_AREA)
padh = int((self.inpHeight - newh) * 0.5)
img = cv2.copyMakeBorder(img, padh, self.inpHeight - newh - padh, 0, 0, cv2.BORDER_CONSTANT, value=0)
else:
img = cv2.resize(srcimg, (self.inpWidth, self.inpHeight), interpolation=cv2.INTER_AREA)
return img, newh, neww, padh, padw
def distance2bbox(self, points, distance, max_shape=None):
x1 = points[:, 0] - distance[:, 0]
y1 = points[:, 1] - distance[:, 1]
x2 = points[:, 0] + distance[:, 2]
y2 = points[:, 1] + distance[:, 3]
if max_shape is not None:
x1 = x1.clamp(min=0, max=max_shape[1])
y1 = y1.clamp(min=0, max=max_shape[0])
x2 = x2.clamp(min=0, max=max_shape[1])
y2 = y2.clamp(min=0, max=max_shape[0])
return np.stack([x1, y1, x2, y2], axis=-1)
def distance2kps(self, points, distance, max_shape=None):
preds = []
for i in range(0, distance.shape[1], 2):
px = points[:, i % 2] + distance[:, i]
py = points[:, i % 2 + 1] + distance[:, i + 1]
if max_shape is not None:
px = px.clamp(min=0, max=max_shape[1])
py = py.clamp(min=0, max=max_shape[0])
preds.append(px)
preds.append(py)
return np.stack(preds, axis=-1)
def detect(self, srcimg,face_flag,count):
img, newh, neww, padh, padw = self.resize_image(srcimg)
blob = cv2.dnn.blobFromImage(img, 1.0 / 128, (self.inpWidth, self.inpHeight), (127.5, 127.5, 127.5), swapRB=True)
# Sets the input to the network
self.net.setInput(blob)
# Runs the forward pass to get output of the output layers
outs = self.net.forward(self.net.getUnconnectedOutLayersNames())
# inference output
scores_list, bboxes_list, kpss_list = [], [], []
for idx, stride in enumerate(self._feat_stride_fpn):
scores = outs[idx * self.fmc][0]
bbox_preds = outs[idx * self.fmc + 1][0] * stride
kps_preds = outs[idx * self.fmc + 2][0] * stride
height = blob.shape[2] // stride
width = blob.shape[3] // stride
anchor_centers = np.stack(np.mgrid[:height, :width][::-1], axis=-1).astype(np.float32)
anchor_centers = (anchor_centers * stride).reshape((-1, 2))
if self._num_anchors > 1:
anchor_centers = np.stack([anchor_centers] * self._num_anchors, axis=1).reshape((-1, 2))
pos_inds = np.where(scores >= self.confThreshold)[0]
bboxes = self.distance2bbox(anchor_centers, bbox_preds)
pos_scores = scores[pos_inds]
pos_bboxes = bboxes[pos_inds]
scores_list.append(pos_scores)
bboxes_list.append(pos_bboxes)
kpss = self.distance2kps(anchor_centers, kps_preds)
# kpss = kps_preds
kpss = kpss.reshape((kpss.shape[0], -1, 2))
pos_kpss = kpss[pos_inds]
kpss_list.append(pos_kpss)
scores = np.vstack(scores_list).ravel()
# bboxes = np.vstack(bboxes_list) / det_scale
# kpss = np.vstack(kpss_list) / det_scale
bboxes = np.vstack(bboxes_list)
kpss = np.vstack(kpss_list)
bboxes[:, 2:4] = bboxes[:, 2:4] - bboxes[:, 0:2]
ratioh, ratiow = srcimg.shape[0] / newh, srcimg.shape[1] / neww
bboxes[:, 0] = (bboxes[:, 0] - padw) * ratiow
bboxes[:, 1] = (bboxes[:, 1] - padh) * ratioh
bboxes[:, 2] = bboxes[:, 2] * ratiow
bboxes[:, 3] = bboxes[:, 3] * ratioh
kpss[:, :, 0] = (kpss[:, :, 0] - padw) * ratiow
kpss[:, :, 1] = (kpss[:, :, 1] - padh) * ratioh
indices = cv2.dnn.NMSBoxes(bboxes.tolist(), scores.tolist(), self.confThreshold, self.nmsThreshold)
# 根据阈值拦截后的人脸框元组
# print("indices", indices)
if indices:
face_flag["face"] += 1
face_flag["frame"].append(count)
# print("frame", count)
for i in indices:
i = i[0]
# 这个就是xmin, ymin, xamx, ymax (人脸框
xmin, ymin, xamx, ymax = int(bboxes[i, 0]), int(bboxes[i, 1]), int(bboxes[i, 0] + bboxes[i, 2]), int(bboxes[i, 1] + bboxes[i, 3])
cv2.rectangle(srcimg, (xmin, ymin), (xamx, ymax), (0, 0, 255), thickness=2)
for j in range(5):
cv2.circle(srcimg, (int(kpss[i, j, 0]), int(kpss[i, j, 1])), 1, (0,255,0), thickness=-1)
cv2.putText(srcimg, str(round(scores[i], 3)), (xmin, ymin - 10), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), thickness=1)
return srcimg
def face_detection(onnxmodel,image,crop_image,x1,y1,x2,y2,face_flag,count):
# onnxmodel = r'E:\pythonProject\xznsh\scrfd-opencv\weights\scrfd_10g_kps.onnx'
mynet = SCRFD(onnxmodel, confThreshold=0.8, nmsThreshold=0.5)
outimg= mynet.detect(crop_image,face_flag,count)
image[y1:y2,x1:x2] = outimg
# cv2.imshow("img",image)
# cv2.waitKey(0)
# cv2.imwrite(os.path.join(outpath,file), outimg)
return image

@ -0,0 +1,130 @@
import cv2
import time
from tqdm import tqdm
from ultralytics import YOLO
from ultralytics.yolo.utils.plotting import Annotator
from d_face import face_detection
# from dec_duration import detect_duration
# model_police = YOLO('models/police0508.pt')
# model_electromobile = YOLO('models/electromobile0509.pt')
# model_coco = YOLO(r"E:\pythonProject\xznsh\scrfd-opencv\weights\best20230606.pt")
# action_model = YOLO(r'E:\pythonProject\xznsh\scrfd-opencv\weights\action_recognition.pt')
def analysis_video(source_path,output_path,people_modle_path,face_modle_path,action_modle_path):
# def analysis_video(source_path, output_path):
model_coco = YOLO(people_modle_path)
action_model = YOLO(action_modle_path)
# start_time = time.time()
cap = cv2.VideoCapture(source_path)
# 直接从视频的第 frameToStart 帧开始
frameToStart = 1430
cap.set(cv2.CAP_PROP_POS_FRAMES, frameToStart)
# total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
# tqdm_desc = 'Processing video frames'
fps = int(cap.get(cv2.CAP_PROP_FPS))
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
output_movie = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
if frameToStart != 0:
count = frameToStart
else:
count = 0
# 标记有没有出现过拉门的动作
action_flag = {
"action":0,
"action_frame":[]
}
# 标记有没有出现过人脸
face_flag = {
"face":0,
"frame":[]
}
# 定义帧数字典
XJ_dict = {
"head":0,
"tail":0
}
# with tqdm(total=total_frames, desc=tqdm_desc) as pbar:
while cap.isOpened():
# Read a frame from the video
success, frame = cap.read()
# cv2.imshow("f",frame)
# cv2.waitKey(0)
count += 1
if success:
# 第一步用COCO数据集推理
results_coco = model_coco(frame)
action_result = action_model(frame)
# print("*"*100, results_coco.masks)
for r in results_coco:
annotator = Annotator(frame, line_width=1)
# print("*"*100, r.boxes)
boxes = r.boxes
for box in boxes:
b = box.xyxy[0] # get box coordinates in (x1,y1,x2,y2) format #tensor([ 677.5757, 147.2737, 1182.3381, 707.2565])
b_i = b.int() + 1
c = box.cls # tensor([0.])
confidence = float(box.conf)
confidence = round(confidence, 2)
# 过滤置信度0.5以下目标
if confidence < 0.5:
continue
if c.int() == 1:
if XJ_dict['head'] == 0 :
XJ_dict['head'] = count
else:
XJ_dict['tail'] = count
# print ("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!巡检人员")
crop_img = frame[b_i[1]:b_i[3],b_i[0]:b_i[2]]
frame = face_detection(face_modle_path,frame,crop_img,b_i[0],b_i[1],b_i[2],b_i[3],face_flag,count)
# annotator = Annotator(frame, line_width=1)
annotator.box_label(b, model_coco.names[int(c)]+str(confidence),(0,0,255))
for r_a in action_result:
annotator_a = Annotator(frame, line_width=1)
# print("*"*100, r.boxes)
boxes_a = r_a.boxes
if boxes_a:
action_flag["action"] += 1
action_flag["action_frame"].append(count)
for box_a in boxes_a:
b_a = box_a.xyxy[0] # get box coordinates in (x1,y1,x2,y2) format #tensor([ 677.5757, 147.2737, 1182.3381, 707.2565])
# b_i_a = b_a.int() + 1
c_a = box_a.cls # tensor([0.])
confidence_a = float(box_a.conf)
confidence_a = round(confidence_a, 2)
# 过滤置信度0.5以下目标
if confidence_a < 0.5:
continue
# annotator = Annotator(frame, line_width=1)
annotator_a.box_label(b_a, action_model.names[int(c_a)]+str(confidence_a),(255,0,0))
annotated_frame_coco = annotator.result()
annotated_a_frame_coco = annotator_a.result()
output_movie.write(annotated_a_frame_coco)
# pbar.update(1)
else:
# Break the loop if the end of the video is reached
break
cap.release()
output_movie.release()
diff = round((XJ_dict["tail"]-XJ_dict["head"])/fps,2)
fina_frame = [round(_ /fps,2) for _ in face_flag["frame"]]
s = ', '.join(map(str, fina_frame))
action_frame = [round(_ /fps,2) for _ in action_flag["action_frame"]]
s_action = ', '.join(map(str, action_frame))
return diff,face_flag,s,action_flag,s_action
# if __name__ == '__main__':
# analysis_video(r"E:\pythonProject\xznsh\scrfd-opencv\data\xj_video.mp4", r"E:\pythonProject\xznsh\scrfd-opencv\result\video.mp4")

@ -0,0 +1,20 @@
from d_people import analysis_video
# 原视频文件路径
video_path = r"xznsh\scrfd-opencv\xznsh_paddle_scrfd\data\xj_video.mp4"
# result存放路径
result_video_path = r"xznsh\scrfd-opencv\xznsh_paddle_scrfd\result\video.mp4"
# 使用目标检测巡检人员模型 best20230606.pt
people_modle_path = r"xznsh\scrfd-opencv\xznsh_paddle_scrfd\weights\people_modle.pt"
# 人脸检测模型 使用paddle_scrfd模型中的scrfd_10g_kps.onnx
face_modle_path = r'xznsh\scrfd-opencv\xznsh_paddle_scrfd\weights\face_model.onnx'
# 使用拉门模型
action_modle_path = r'xznsh\scrfd-opencv\xznsh_paddle_scrfd\weights\action_recognition.pt'
diff_xj,face_flag_xj,face_time,action_frame_xj,action_s_xj = analysis_video(video_path,result_video_path,people_modle_path,face_modle_path,action_modle_path)
print("这个视频中巡检人员实际停留的时间为:{}".format(diff_xj))
print("视频中出现过{}次巡检员环视的动作,分别位于视频第{}".format(face_flag_xj["face"],face_time))
print("视频中出现过{}次巡检员巡检ATM机的动作,分别位于视频第{}".format(action_frame_xj["action"],action_s_xj))
Loading…
Cancel
Save