You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

313 lines
13 KiB
Python

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

import cv2
import numpy as np
import paddlehub as hub
from paddlehub.common.logger import logger
import time
import math
import os
class HeadPostEstimation(object):
"""
头部姿态识别
"""
NOD_ACTION = 1
SHAKE_ACTION = 2
def __init__(self, face_detector=None):
self.module = hub.Module(name="face_landmark_localization", face_detector_module=face_detector)
# 头部3D关键点坐标
self.model_points = np.array([
[6.825897, 6.760612, 4.402142],
[1.330353, 7.122144, 6.903745],
[-1.330353, 7.122144, 6.903745],
[-6.825897, 6.760612, 4.402142],
[5.311432, 5.485328, 3.987654],
[1.789930, 5.393625, 4.413414],
[-1.789930, 5.393625, 4.413414],
[-5.311432, 5.485328, 3.987654],
[2.005628, 1.409845, 6.165652],
[-2.005628, 1.409845, 6.165652],
[2.774015, -2.080775, 5.048531],
[-2.774015, -2.080775, 5.048531],
[0.000000, -3.116408, 6.097667],
[0.000000, -7.415691, 4.070434],
[-7.308957, 0.913869, 0.000000],
[7.308957, 0.913869, 0.000000],
[0.746313,0.348381,6.263227],
[0.000000,0.000000,6.763430],
[-0.746313,0.348381,6.263227],
], dtype='float')
# 点头动作index是0 摇头动作index是1
# 当连续30帧上下点头动作幅度超过15度时认为发生了点头动作
# 当连续30帧上下点头动作幅度超过45度时认为发生了摇头动作由于摇头动作较为敏感故所需幅度更大
self._index_action = {0:'nod', 1:'shake'}
self._frame_window_size = 15
self._pose_threshold = {0: 15/180 * math.pi,
1: 45/180 * math.pi}
# 头部3D投影点
self.reprojectsrc = np.float32([
[10.0, 10.0, 10.0],
[10.0, 10.0, -10.0],
[10.0, -10.0, -10.0],
[10.0, -10.0, 10.0],
[-10.0, 10.0, 10.0],
[-10.0, 10.0, -10.0],
[-10.0, -10.0, -10.0],
[-10.0, -10.0, 10.0]])
# 头部3D投影点连线
self.line_pairs = [
[0, 1], [1, 2], [2, 3], [3, 0],
[4, 5], [5, 6], [6, 7], [7, 4],
[0, 4], [1, 5], [2, 6], [3, 7]
]
@property
def frame_window_size(self):
return self._frame_window_size
@frame_window_size.setter
def frame_window_size(self, value):
assert isinstance(value, int)
self._frame_window_size = value
@property
def pose_threshold(self):
return self._pose_threshold
@pose_threshold.setter
def pose_threshold(self, dict_value):
assert list(dict_value.keys()) == [0,1,2]
self._pose_threshold = dict_value
def get_face_landmark(self, image):
"""
预测人脸的68个关键点坐标
images(ndarray): 单张图片的像素数据
"""
try:
# 选择GPU运行use_gpu=True并且在运行整个教程代码之前设置CUDA_VISIBLE_DEVICES环境变量
res = self.module.keypoint_detection(images=[image], use_gpu=False)
return True, res[0]['data'][0]
except Exception as e:
logger.error("Get face landmark localization failed! Exception: %s " % e)
return False, None
def get_image_points_from_landmark(self, face_landmark):
"""
从face_landmark_localization的检测结果抽取姿态估计需要的点坐标
"""
image_points = np.array([
face_landmark[17], face_landmark[21],
face_landmark[22], face_landmark[26],
face_landmark[36], face_landmark[39],
face_landmark[42], face_landmark[45],
face_landmark[31], face_landmark[35],
face_landmark[48], face_landmark[54],
face_landmark[57], face_landmark[8],
face_landmark[14], face_landmark[2],
face_landmark[32], face_landmark[33],
face_landmark[34],
], dtype='float')
return image_points
def caculate_pose_vector(self, image_points):
"""
获取旋转向量和平移向量
"""
# 相机视角
center = (self.img_size[1]/2, self.img_size[0]/2)
focal_length = center[0] / np.tan(60/ 2 * np.pi / 180)
camera_matrix = np.array([
[focal_length, 0, center[0]],
[0, focal_length, center[1]],
[0, 0, 1]],
dtype = "float")
# 假设没有畸变
dist_coeffs = np.zeros((4,1))
success, rotation_vector, translation_vector= cv2.solvePnP(self.model_points,
image_points,
camera_matrix,
dist_coeffs)
reprojectdst, _ = cv2.projectPoints(self.reprojectsrc, rotation_vector, translation_vector, camera_matrix, dist_coeffs)
return success, rotation_vector, translation_vector, camera_matrix, dist_coeffs, reprojectdst
def caculate_euler_angle(self, rotation_vector, translation_vector):
"""
将旋转向量转换为欧拉角
"""
rvec_matrix = cv2.Rodrigues(rotation_vector)[0]
proj_matrix = np.hstack((rvec_matrix, translation_vector))
euler_angles = cv2.decomposeProjectionMatrix(proj_matrix)[6]
pitch, yaw, roll = [math.radians(_) for _ in euler_angles]
return pitch, yaw, roll
def classify_pose_in_euler_angles(self, video, poses=3):
"""
根据欧拉角分类头部姿态(点头nod/摇头shake)
video 表示不断产生图片的生成器
pose=1 表示识别点头动作
pose=2 表示识别摇头动作
pose=3 表示识别点头和摇头动作
"""
frames_euler = []
self.nod_time = self.totate_time = self.shake_time = time.time()
self.action_time = 0
index_action ={0:[self.NOD_ACTION], 1:[self.SHAKE_ACTION]}
for index, img in enumerate(video(), start=1):
self.img_size = img.shape
success, face_landmark = self.get_face_landmark(img)
for i, action in enumerate(index_action):
if i == 0:
index_action[action].append((20, int(self.img_size[0]/2 + 110)))
elif i == 1:
index_action[action].append((120, int(self.img_size[0]/2 + 110)))
if not success:
logger.info("Get face landmark localization failed! Please check your image!")
continue
image_points = self.get_image_points_from_landmark(face_landmark)
success, rotation_vector, translation_vector, camera_matrix, dist_coeffs, reprojectdst = self.caculate_pose_vector(image_points)
if not success:
logger.info("Get rotation and translation vectors failed!")
continue
# 画出投影正方体
alpha=0.3
if not hasattr(self, 'before'):
self.before = reprojectdst
else:
reprojectdst = alpha * self.before + (1-alpha)* reprojectdst
reprojectdst = tuple(map(tuple, reprojectdst.reshape(8, 2)))
for start, end in self.line_pairs:
# cv2.line(img, reprojectdst[start], reprojectdst[end], (0, 0, 255))
cv2.line(img, (int(reprojectdst[start][0]),int(reprojectdst[start][1])), (int(reprojectdst[end][0]),int(reprojectdst[end][1])), (0, 0, 255))
# 计算头部欧拉角
pitch, yaw, roll = self.caculate_euler_angle(rotation_vector, translation_vector)
cv2.putText(img, "pitch: " + "{:7.2f}".format(pitch), (20, int(self.img_size[0]/2 -10)), cv2.FONT_HERSHEY_SIMPLEX,
0.75, (0, 0, 255), thickness=2)
cv2.putText(img, "yaw: " + "{:7.2f}".format(yaw), (20, int(self.img_size[0]/2 + 30) ), cv2.FONT_HERSHEY_SIMPLEX,
0.75, (0, 0, 255), thickness=2)
cv2.putText(img, "roll: " + "{:7.2f}".format(roll), (20, int(self.img_size[0]/2 +70)), cv2.FONT_HERSHEY_SIMPLEX,
0.75, (0, 0, 255), thickness=2)
for index, action in enumerate(index_action):
cv2.putText(img, "{}".format(self._index_action[action]), index_action[action][1],
cv2.FONT_HERSHEY_SIMPLEX, 0.75, (50, 50, 50), thickness=2)
frames_euler.append([index, img, pitch, yaw, roll])
# 转换成摄像头可显示的格式
img_rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
final_action = None
if len(frames_euler) > self.frame_window_size:
# 比较当前头部动作欧拉角与过去的欧拉角,只有动作幅度幅度超过阈值,则判定发生相应的动作
# picth值用来判断点头动作
# yaw值用来判断摇头动作
current = [pitch, yaw, roll]
tmp = [abs(pitch), abs(yaw)]
max_index = tmp.index(max(tmp))
max_probability_action = index_action[max_index][0]
for start_idx, start_img, p, y, r in frames_euler[0:int(self.frame_window_size/2)]:
start = [p, y, r]
if poses & max_probability_action and abs(start[max_index]-current[max_index]) >= self.pose_threshold[max_index]:
frames_euler = []
final_action = max_index
self.action_time = time.time()
yield {self._index_action[max_index]: [(start_idx, start_img), (index, img)]}
break
else:
# 丢弃过时的视频帧
frames_euler.pop(0)
# 动作判定发生则高亮显示0.5s
if self.action_time !=0 and time.time() - self.action_time < 0.5:
cv2.putText(img_rgb, "{}".format(self._index_action[max_index]), index_action[max_index][1],
cv2.FONT_HERSHEY_SIMPLEX, 0.75, (0, 0, 255), thickness=2)
# 本地显示预测视频框AIStudio项目不支持显示视频框
cv2.imshow('Pose Estimation', img_rgb)
# 写入预测结果
video_writer.write(img_rgb)
class MyFaceDetector(object):
"""
自定义人脸检测器
基于PaddleHub人脸检测模型ultra_light_fast_generic_face_detector_1mb_640加强稳定人脸检测框
"""
def __init__(self):
self.module = hub.Module(name="ultra_light_fast_generic_face_detector_1mb_640")
self.alpha = 0.75
self.start_flag =1
def face_detection(self,images, use_gpu=False, visualization=False):
# 使用GPU运行use_gpu=True并且在运行整个教程代码之前设置CUDA_VISIBLE_DEVICES环境变量
result = self.module.face_detection(images=images, use_gpu=use_gpu, visualization=visualization)
if not result[0]['data']:
return result
face = result[0]['data'][0]
if self.start_flag == 1:
self.left_s = result[0]['data'][0]['left']
self.right_s = result[0]['data'][0]['right']
self.top_s = result[0]['data'][0]['top']
self.bottom_s = result[0]['data'][0]['bottom']
self.start_flag=0
else:
# 加权平均上一帧和当前帧人脸检测框位置,以稳定人脸检测框
self.left_s = self.alpha * self.left_s + (1-self.alpha) * face['left']
self.right_s = self.alpha * self.right_s + (1-self.alpha) * face['right']
self.top_s = self.alpha * self.top_s + (1-self.alpha) * face['top']
self.bottom_s = self.alpha * self.bottom_s + (1-self.alpha) * face['bottom']
result[0]['data'][0]['left'] = self.left_s
result[0]['data'][0]['right'] = self.right_s
result[0]['data'][0]['top'] = self.top_s
result[0]['data'][0]['bottom'] = self.bottom_s
return result
# 定义人脸检测器
face_detector = MyFaceDetector()
# 打开摄像头
# capture = cv2.VideoCapture(0)
# capture = cv2.VideoCapture('./test_sample.mov')
capture = cv2.VideoCapture('cam1_1.mp4')
fps = capture.get(cv2.CAP_PROP_FPS)
size = (int(capture.get(cv2.CAP_PROP_FRAME_WIDTH)),
int(capture.get(cv2.CAP_PROP_FRAME_HEIGHT)))
# 将预测结果写成视频
video_writer = cv2.VideoWriter('result_cam1_1.mp4', cv2.VideoWriter_fourcc(*'mp4v'), fps, size)
def generate_image():
while True:
# frame_rgb即视频的一帧数据
ret, frame_rgb = capture.read()
if ret == True:
# 显示图像
cv2.imshow('frame',frame_rgb)
# 按q键即可退出
if cv2.waitKey(1) & 0xFF == ord('q'):
break
if frame_rgb is None:
break
frame_bgr = cv2.cvtColor(frame_rgb, cv2.COLOR_RGB2BGR)
yield frame_bgr
capture.release()
video_writer.release()
cv2.destroyAllWindows()
head_post = HeadPostEstimation(face_detector)
for res in head_post.classify_pose_in_euler_angles(video=generate_image, poses=HeadPostEstimation.NOD_ACTION | HeadPostEstimation.SHAKE_ACTION):
print(list(res.keys()))