import cv2 from tqdm import tqdm from ultralytics import YOLO from ultralytics.yolo.utils.plotting import Annotator import os import cv2 from mediapipe.python.solutions import drawing_utils from mediapipe.python.solutions import hands import time import os import queue import threading from yolov8_det import analysis_yolov8 from cut_img_bbox import cut_img_bbox from mediapipe_det import analysis_mediapipe class atm_det: def __init__(self,imgPath,savePath,modellist): self.imgPath = imgPath self.savePath = savePath self.imgList = os.listdir(self.imgPath) #定义加载好的模型 self.model_person = modellist[0] self.model_pp_hand = modellist[1] self.model_blue = modellist[2] self.model_screen = modellist[3] self.media_hands = modellist[4] # 队列 self.imgQueue1 = queue.Queue(maxsize=len(self.imgList)) self.imgQueue2 = queue.Queue(maxsize=len(self.imgList)) self.imgQueue3 = queue.Queue(maxsize=len(self.imgList)) self.imgQueue4 = queue.Queue(maxsize=len(self.imgList)) self.imgQueue5 = queue.Queue(maxsize=len(self.imgList)) self.imgQueue6 = queue.Queue(maxsize=len(self.imgList)) #线程 self.get_imgThread = threading.Thread(target=self.get_img) self.get_person_resultThread = threading.Thread(target=self.get_person_result) self.get_hand_landmarkerThread = threading.Thread(target=self.get_hand_landmarker) self.get_blue_resultThread = threading.Thread(target=self.get_blue_result) self.get_pph_resultThread = threading.Thread(target=self.get_pph_result) self.analysis_handThread = threading.Thread(target=self.analysis_hand) self.draw_imagesThread = threading.Thread(target=self.draw_images) self.analysis_hand_blueThread = threading.Thread(target=self.analysis_hand_blue) self.get_screen_resultThread = threading.Thread(target=self.get_screen_result) def get_img(self): for img in self.imgList: imgpath = os.path.join(self.imgPath,img) images = cv2.imread(imgpath) imagesDict = {img:images} self.imgQueue1.put(imagesDict) def get_person_result(self): while True: if ~self.imgQueue1.empty(): imagesDict = self.imgQueue1.get() images = list(imagesDict.values())[0] imgname = list(imagesDict.keys())[0] per_result = analysis_yolov8(images=images, model_coco=self.model_person, confidence=0.5 ) for per in per_result: per_bbox = list(per.values())[0] imgcut = cut_img_bbox(images,per_bbox) imgcutDict = {imgname:{"imgcut":imgcut,"per":per}} self.imgQueue2.put(imgcutDict) def get_blue_result(self): while True: if ~self.imgQueue1.empty(): imagesDict = self.imgQueue1.get() images = list(imagesDict.values())[0] imgname = list(imagesDict.keys())[0] blue_result = analysis_yolov8(images=images, model_coco=self.model_blue, confidence=0.5 ) blues_list = [] for blues in blue_result: blue = list(blues.values())[0] blues_list.append(blue) if blues_list: bluesDict = {imgname:blues_list} self.imgQueue4.put(bluesDict) def get_pph_result(self): while True: if ~self.imgQueue1.empty(): imagesDict = self.imgQueue1.get() images = list(imagesDict.values())[0] imgname = list(imagesDict.keys())[0] blue_result = analysis_yolov8(images=images, model_coco=self.model_pp_hand, confidence=0.5 ) pph_list = [] for blues in blue_result: blue = list(blues.values())[0] pph_list.append(blue) if pph_list: pphDict = {imgname:pph_list} self.imgQueue5.put(pphDict) def get_hand_landmarker(self): while True: if ~self.imgQueue2.empty(): imgcutDict = self.imgQueue2.get() imgcut = list(imgcutDict.values())[0]["imgcut"] hand_landmarker_result = analysis_mediapipe(images=imgcut, hands=self.media_hands, parameter=hands.HAND_CONNECTIONS) handDict = {"hand_landmarker_result":hand_landmarker_result} list(imgcutDict.values())[0].update(handDict) self.imgQueue3.put(imgcutDict) def get_screen_result(self): while True: if ~self.imgQueue1.empty(): imagesDict = self.imgQueue1.get() images = list(imagesDict.values())[0] imgname = list(imagesDict.keys())[0] screen_result = analysis_yolov8(images=images, model_coco=self.model_screen, confidence=0.5 ) print('screen_result:',screen_result) def analysis_hand(self): while True: if ~self.imgQueue3.empty(): imgcutDict2 = self.imgQueue3.get() imgname = list(imgcutDict2.keys())[0] re_list = list(imgcutDict2.values())[0] pre_list = re_list['per'] pre_list = list(pre_list.values())[0] # pre_x = int(pre_list[2] - pre_list[0]) pre_x = int(pre_list[0]) pre_y = int(pre_list[1]) # pre_y = int(pre_list[3] - pre_list[1]) hand_list = re_list['hand_landmarker_result'] point_list = [] for hand_point in hand_list: for point in hand_point: # print(point) point_x = int(point[0]) + pre_x point_y = int(point[1]) + pre_y point_list.append((point_x,point_y)) if point_list: imgcutDict2.update({imgname:point_list}) self.imgQueue6.put(imgcutDict2) def analysis_hand_blue(self): while True: if ~self.imgQueue4.empty() and ~self.imgQueue6.empty(): blue_list = self.imgQueue4.get() hand_list = self.imgQueue6.get() print('blue_list:',blue_list) print('hand_list:',hand_list) while list(blue_list.keys())[0] == list(hand_list.keys())[0]: print(list(blue_list.keys())[0]) def draw_images(self): while True: if ~self.imgQueue6.empty(): img_hand_point = self.imgQueue6.get() imgname = list(img_hand_point.keys())[0] img = cv2.imread(os.path.join(self.imgPath,imgname)) point_list = list(img_hand_point.values())[0] for point in point_list: cv2.circle(img, point, 1,(0,0,255), 2) cv2.imwrite(os.path.join(self.savePath,imgname),img) def run(self): self.get_imgThread.start() self.get_person_resultThread.start() self.get_hand_landmarkerThread.start() self.get_blue_resultThread.start() # self.get_pph_resultThread.start() self.analysis_handThread.start() # self.draw_imagesThread.start() self.analysis_hand_blueThread.start() self.get_screen_resultThread.start() if __name__ == '__main__': model_person = YOLO("model_files/bk1.pt") model_pp_hand = YOLO("model_files/best_pph.pt") model_blue = YOLO("model_files/best_butten.pt") model_screen = YOLO("model_files/best_screen.pt") media_hands = hands.Hands( static_image_mode=True, max_num_hands=4, min_detection_confidence=0.1, min_tracking_confidence=0.1) modelList = [model_person,model_pp_hand,model_blue,model_screen,media_hands] q = atm_det(imgPath='E:/BANK_XZ/data_file', savePath='E:/BANK_XZ/output_data', modellist=modelList) q.run()