You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

308 lines
8.9 KiB
Python

import cv2
from tqdm import tqdm
from ultralytics import YOLO
from ultralytics.yolo.utils.plotting import Annotator
import os
import cv2
from mediapipe.python.solutions import drawing_utils
from mediapipe.python.solutions import hands
import time
import os
import queue
import threading
from yolov8_det import analysis_yolov8
from cut_img_bbox import cut_img_bbox
from mediapipe_det import analysis_mediapipe
class atm_det:
    """Threaded image pipeline mixing YOLO detections with MediaPipe hands.

    Images are read from ``imgPath`` and handed from stage to stage through
    ``queue.Queue`` objects, each stage running in its own thread:

    * ``imgQueue1``: ``{image_name: image}`` produced by ``get_img``.
    * ``imgQueue2``: one ``{image_name: {"imgcut": ..., "per": ...}}`` entry
      per person detection, produced by ``get_person_result``.
    * ``imgQueue3``: the ``imgQueue2`` entry extended with
      ``"hand_landmarker_result"``, produced by ``get_hand_landmarker``.
    * ``imgQueue4``: ``{image_name: [box, ...]}`` from ``get_blue_result``.
    * ``imgQueue5``: ``{image_name: [box, ...]}`` from ``get_pph_result``.
    * ``imgQueue6``: ``{image_name: [(x, y), ...]}`` from ``analysis_hand``.

    NOTE(review): ``get_person_result``, ``get_blue_result``,
    ``get_pph_result`` and ``get_screen_result`` all take items from
    ``imgQueue1``, so every image is delivered to only ONE of them.  If each
    detector is meant to see every image, the producer has to fan out to one
    queue per consumer.  Confirm the intended design before relying on
    ``analysis_hand_blue`` finding matching image names.

    NOTE(review): the worker loops never exit and nothing marks the threads
    as daemon threads, so the process is expected to keep running after all
    images have been handled.
    """

    def __init__(self, imgPath, savePath, modellist):
        """Store paths and models, and create the queues and worker threads.

        Args:
            imgPath: Directory whose entries (``os.listdir``) are the inputs.
            savePath: Directory ``draw_images`` writes annotated images to.
            modellist: Indexable of pre-loaded models, read by position:
                ``[0]`` person, ``[1]`` pp_hand, ``[2]`` blue, ``[3]`` screen,
                ``[4]`` MediaPipe hands object.
        """
        self.imgPath = imgPath
        self.savePath = savePath
        self.imgList = os.listdir(self.imgPath)
        # Pre-loaded models.
        self.model_person = modellist[0]
        self.model_pp_hand = modellist[1]
        self.model_blue = modellist[2]
        self.model_screen = modellist[3]
        self.media_hands = modellist[4]
        # Queues.  Each is bounded by the number of directory entries; note
        # that queue.Queue treats maxsize=0 (empty directory) as unbounded.
        queue_size = len(self.imgList)
        self.imgQueue1 = queue.Queue(maxsize=queue_size)
        self.imgQueue2 = queue.Queue(maxsize=queue_size)
        self.imgQueue3 = queue.Queue(maxsize=queue_size)
        self.imgQueue4 = queue.Queue(maxsize=queue_size)
        self.imgQueue5 = queue.Queue(maxsize=queue_size)
        self.imgQueue6 = queue.Queue(maxsize=queue_size)
        # Threads (created here, started by run()).
        self.get_imgThread = threading.Thread(target=self.get_img)
        self.get_person_resultThread = threading.Thread(target=self.get_person_result)
        self.get_hand_landmarkerThread = threading.Thread(target=self.get_hand_landmarker)
        self.get_blue_resultThread = threading.Thread(target=self.get_blue_result)
        self.get_pph_resultThread = threading.Thread(target=self.get_pph_result)
        self.analysis_handThread = threading.Thread(target=self.analysis_hand)
        self.draw_imagesThread = threading.Thread(target=self.draw_images)
        self.analysis_hand_blueThread = threading.Thread(target=self.analysis_hand_blue)
        self.get_screen_resultThread = threading.Thread(target=self.get_screen_result)

    @staticmethod
    def _single_item(mapping):
        """Return the first ``(key, value)`` pair of a dict.

        Every queue entry in this class is a one-entry dict, so this is the
        entry's name and payload.
        """
        return next(iter(mapping.items()))

    @staticmethod
    def _offset_hand_points(per_bbox, hand_list):
        """Shift hand points from crop coordinates by the box's first two values.

        Args:
            per_bbox: Indexable whose items ``[0]`` and ``[1]`` are added to
                every point's x and y (presumably the top-left corner of the
                person box -- verify against ``analysis_yolov8``).
            hand_list: Iterable of hands, each an iterable of points indexable
                as ``point[0]`` (x) and ``point[1]`` (y).

        Returns:
            A flat list of ``(x, y)`` integer tuples, in input order.
        """
        origin_x = int(per_bbox[0])
        origin_y = int(per_bbox[1])
        shifted = []
        for hand_points in hand_list:
            for point in hand_points:
                shifted.append((int(point[0]) + origin_x,
                                int(point[1]) + origin_y))
        return shifted

    def _detect_boxes(self, images, model):
        """Run ``analysis_yolov8`` and return the value of each result dict."""
        detections = analysis_yolov8(images=images,
                                     model_coco=model,
                                     confidence=0.5)
        return [self._single_item(detection)[1] for detection in detections]

    def get_img(self):
        """Read every entry of ``imgPath`` and queue it on ``imgQueue1``."""
        for img in self.imgList:
            imgpath = os.path.join(self.imgPath, img)
            images = cv2.imread(imgpath)
            if images is None:
                # cv2.imread returns None for files it cannot decode; passing
                # that on would crash whichever detector thread receives it.
                print('skip unreadable image:', imgpath)
                continue
            self.imgQueue1.put({img: images})

    def get_person_result(self):
        """Detect persons and queue one cropped region per detection."""
        while True:
            # Blocking get(): waits for the next item without polling.
            # (`~queue.empty()` is always truthy for a bool, so it never
            # worked as a guard.)
            imgname, images = self._single_item(self.imgQueue1.get())
            per_result = analysis_yolov8(images=images,
                                         model_coco=self.model_person,
                                         confidence=0.5)
            for per in per_result:
                per_bbox = self._single_item(per)[1]
                imgcut = cut_img_bbox(images, per_bbox)
                self.imgQueue2.put({imgname: {"imgcut": imgcut, "per": per}})

    def get_blue_result(self):
        """Run the ``model_blue`` detector; queue non-empty results on ``imgQueue4``."""
        while True:
            imgname, images = self._single_item(self.imgQueue1.get())
            blues_list = self._detect_boxes(images, self.model_blue)
            if blues_list:
                self.imgQueue4.put({imgname: blues_list})

    def get_pph_result(self):
        """Run the ``model_pp_hand`` detector; queue non-empty results on ``imgQueue5``."""
        while True:
            imgname, images = self._single_item(self.imgQueue1.get())
            pph_list = self._detect_boxes(images, self.model_pp_hand)
            if pph_list:
                self.imgQueue5.put({imgname: pph_list})

    def get_hand_landmarker(self):
        """Add MediaPipe hand landmarks to each person crop and forward it."""
        while True:
            imgcutDict = self.imgQueue2.get()
            payload = self._single_item(imgcutDict)[1]
            payload["hand_landmarker_result"] = analysis_mediapipe(
                images=payload["imgcut"],
                hands=self.media_hands,
                parameter=hands.HAND_CONNECTIONS)
            self.imgQueue3.put(imgcutDict)

    def get_screen_result(self):
        """Run the ``model_screen`` detector and print its raw result."""
        while True:
            images = self._single_item(self.imgQueue1.get())[1]
            screen_result = analysis_yolov8(images=images,
                                            model_coco=self.model_screen,
                                            confidence=0.5)
            print('screen_result:', screen_result)

    def analysis_hand(self):
        """Convert crop-relative hand points to shifted points on ``imgQueue6``."""
        while True:
            imgname, payload = self._single_item(self.imgQueue3.get())
            per_bbox = self._single_item(payload['per'])[1]
            point_list = self._offset_hand_points(
                per_bbox, payload['hand_landmarker_result'])
            if point_list:
                self.imgQueue6.put({imgname: point_list})

    def analysis_hand_blue(self):
        """Pair one ``imgQueue4`` entry with one ``imgQueue6`` entry and report matches."""
        while True:
            blue_list = self.imgQueue4.get()
            hand_list = self.imgQueue6.get()
            print('blue_list:', blue_list)
            print('hand_list:', hand_list)
            blue_name = self._single_item(blue_list)[0]
            hand_name = self._single_item(hand_list)[0]
            # `if`, not `while`: neither name changes inside the body, so a
            # loop here would print the same name forever on the first match.
            if blue_name == hand_name:
                print(blue_name)

    def draw_images(self):
        """Draw queued hand points onto the source image and save it to ``savePath``."""
        while True:
            imgname, point_list = self._single_item(self.imgQueue6.get())
            img = cv2.imread(os.path.join(self.imgPath, imgname))
            if img is None:
                # Nothing to draw on if the source image cannot be re-read.
                print('skip unreadable image:', imgname)
                continue
            for point in point_list:
                cv2.circle(img, point, 1, (0, 0, 255), 2)
            cv2.imwrite(os.path.join(self.savePath, imgname), img)

    def run(self):
        """Start the enabled worker threads.

        ``get_pph_resultThread`` and ``draw_imagesThread`` are created in
        ``__init__`` but deliberately not started here.
        """
        self.get_imgThread.start()
        self.get_person_resultThread.start()
        self.get_hand_landmarkerThread.start()
        self.get_blue_resultThread.start()
        self.analysis_handThread.start()
        self.analysis_hand_blueThread.start()
        self.get_screen_resultThread.start()
if __name__ == '__main__':
    # YOLO weights, listed in the positional order atm_det reads them:
    # person, pp_hand, blue, screen.
    weight_files = (
        "model_files/bk1.pt",
        "model_files/best_pph.pt",
        "model_files/best_butten.pt",
        "model_files/best_screen.pt",
    )
    detectors = [YOLO(weights) for weights in weight_files]

    # MediaPipe hands object; it goes last in the model list.
    hand_tracker = hands.Hands(
        static_image_mode=True,
        max_num_hands=4,
        min_detection_confidence=0.1,
        min_tracking_confidence=0.1,
    )

    pipeline = atm_det(
        imgPath='E:/BANK_XZ/data_file',
        savePath='E:/BANK_XZ/output_data',
        modellist=detectors + [hand_tracker],
    )
    pipeline.run()