del
parent
565928969d
commit
01a80e5440
@ -1,307 +0,0 @@
|
|||||||
import cv2
|
|
||||||
from tqdm import tqdm
|
|
||||||
from ultralytics import YOLO
|
|
||||||
from ultralytics.yolo.utils.plotting import Annotator
|
|
||||||
import os
|
|
||||||
|
|
||||||
import cv2
|
|
||||||
from mediapipe.python.solutions import drawing_utils
|
|
||||||
from mediapipe.python.solutions import hands
|
|
||||||
import time
|
|
||||||
import os
|
|
||||||
import queue
|
|
||||||
import threading
|
|
||||||
|
|
||||||
from yolov8_det import analysis_yolov8
|
|
||||||
from cut_img_bbox import cut_img_bbox
|
|
||||||
from mediapipe_det import analysis_mediapipe
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
class atm_det:
|
|
||||||
|
|
||||||
def __init__(self,imgPath,savePath,modellist):
|
|
||||||
|
|
||||||
self.imgPath = imgPath
|
|
||||||
self.savePath = savePath
|
|
||||||
|
|
||||||
self.imgList = os.listdir(self.imgPath)
|
|
||||||
|
|
||||||
#定义加载好的模型
|
|
||||||
self.model_person = modellist[0]
|
|
||||||
self.model_pp_hand = modellist[1]
|
|
||||||
self.model_blue = modellist[2]
|
|
||||||
self.model_screen = modellist[3]
|
|
||||||
|
|
||||||
self.media_hands = modellist[4]
|
|
||||||
|
|
||||||
# 队列
|
|
||||||
self.imgQueue1 = queue.Queue(maxsize=len(self.imgList))
|
|
||||||
self.imgQueue2 = queue.Queue(maxsize=len(self.imgList))
|
|
||||||
self.imgQueue3 = queue.Queue(maxsize=len(self.imgList))
|
|
||||||
self.imgQueue4 = queue.Queue(maxsize=len(self.imgList))
|
|
||||||
self.imgQueue5 = queue.Queue(maxsize=len(self.imgList))
|
|
||||||
self.imgQueue6 = queue.Queue(maxsize=len(self.imgList))
|
|
||||||
|
|
||||||
#线程
|
|
||||||
self.get_imgThread = threading.Thread(target=self.get_img)
|
|
||||||
self.get_person_resultThread = threading.Thread(target=self.get_person_result)
|
|
||||||
self.get_hand_landmarkerThread = threading.Thread(target=self.get_hand_landmarker)
|
|
||||||
self.get_blue_resultThread = threading.Thread(target=self.get_blue_result)
|
|
||||||
self.get_pph_resultThread = threading.Thread(target=self.get_pph_result)
|
|
||||||
self.analysis_handThread = threading.Thread(target=self.analysis_hand)
|
|
||||||
self.draw_imagesThread = threading.Thread(target=self.draw_images)
|
|
||||||
self.analysis_hand_blueThread = threading.Thread(target=self.analysis_hand_blue)
|
|
||||||
self.get_screen_resultThread = threading.Thread(target=self.get_screen_result)
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
def get_img(self):
|
|
||||||
|
|
||||||
for img in self.imgList:
|
|
||||||
|
|
||||||
imgpath = os.path.join(self.imgPath,img)
|
|
||||||
images = cv2.imread(imgpath)
|
|
||||||
|
|
||||||
imagesDict = {img:images}
|
|
||||||
|
|
||||||
self.imgQueue1.put(imagesDict)
|
|
||||||
|
|
||||||
|
|
||||||
def get_person_result(self):
|
|
||||||
|
|
||||||
while True:
|
|
||||||
|
|
||||||
if ~self.imgQueue1.empty():
|
|
||||||
|
|
||||||
imagesDict = self.imgQueue1.get()
|
|
||||||
images = list(imagesDict.values())[0]
|
|
||||||
imgname = list(imagesDict.keys())[0]
|
|
||||||
|
|
||||||
per_result = analysis_yolov8(images=images,
|
|
||||||
model_coco=self.model_person,
|
|
||||||
confidence=0.5
|
|
||||||
)
|
|
||||||
|
|
||||||
for per in per_result:
|
|
||||||
|
|
||||||
per_bbox = list(per.values())[0]
|
|
||||||
|
|
||||||
imgcut = cut_img_bbox(images,per_bbox)
|
|
||||||
imgcutDict = {imgname:{"imgcut":imgcut,"per":per}}
|
|
||||||
|
|
||||||
self.imgQueue2.put(imgcutDict)
|
|
||||||
|
|
||||||
def get_blue_result(self):
|
|
||||||
while True:
|
|
||||||
|
|
||||||
if ~self.imgQueue1.empty():
|
|
||||||
|
|
||||||
imagesDict = self.imgQueue1.get()
|
|
||||||
images = list(imagesDict.values())[0]
|
|
||||||
imgname = list(imagesDict.keys())[0]
|
|
||||||
|
|
||||||
blue_result = analysis_yolov8(images=images,
|
|
||||||
model_coco=self.model_blue,
|
|
||||||
confidence=0.5
|
|
||||||
)
|
|
||||||
blues_list = []
|
|
||||||
for blues in blue_result:
|
|
||||||
|
|
||||||
blue = list(blues.values())[0]
|
|
||||||
blues_list.append(blue)
|
|
||||||
|
|
||||||
if blues_list:
|
|
||||||
bluesDict = {imgname:blues_list}
|
|
||||||
|
|
||||||
self.imgQueue4.put(bluesDict)
|
|
||||||
|
|
||||||
def get_pph_result(self):
|
|
||||||
while True:
|
|
||||||
|
|
||||||
if ~self.imgQueue1.empty():
|
|
||||||
|
|
||||||
imagesDict = self.imgQueue1.get()
|
|
||||||
images = list(imagesDict.values())[0]
|
|
||||||
imgname = list(imagesDict.keys())[0]
|
|
||||||
|
|
||||||
blue_result = analysis_yolov8(images=images,
|
|
||||||
model_coco=self.model_pp_hand,
|
|
||||||
confidence=0.5
|
|
||||||
)
|
|
||||||
pph_list = []
|
|
||||||
for blues in blue_result:
|
|
||||||
|
|
||||||
blue = list(blues.values())[0]
|
|
||||||
pph_list.append(blue)
|
|
||||||
|
|
||||||
if pph_list:
|
|
||||||
|
|
||||||
pphDict = {imgname:pph_list}
|
|
||||||
self.imgQueue5.put(pphDict)
|
|
||||||
|
|
||||||
def get_hand_landmarker(self):
|
|
||||||
while True:
|
|
||||||
|
|
||||||
if ~self.imgQueue2.empty():
|
|
||||||
|
|
||||||
imgcutDict = self.imgQueue2.get()
|
|
||||||
|
|
||||||
imgcut = list(imgcutDict.values())[0]["imgcut"]
|
|
||||||
|
|
||||||
hand_landmarker_result = analysis_mediapipe(images=imgcut,
|
|
||||||
hands=self.media_hands,
|
|
||||||
parameter=hands.HAND_CONNECTIONS)
|
|
||||||
|
|
||||||
handDict = {"hand_landmarker_result":hand_landmarker_result}
|
|
||||||
|
|
||||||
list(imgcutDict.values())[0].update(handDict)
|
|
||||||
|
|
||||||
self.imgQueue3.put(imgcutDict)
|
|
||||||
|
|
||||||
|
|
||||||
def get_screen_result(self):
|
|
||||||
while True:
|
|
||||||
if ~self.imgQueue1.empty():
|
|
||||||
|
|
||||||
imagesDict = self.imgQueue1.get()
|
|
||||||
images = list(imagesDict.values())[0]
|
|
||||||
imgname = list(imagesDict.keys())[0]
|
|
||||||
|
|
||||||
screen_result = analysis_yolov8(images=images,
|
|
||||||
model_coco=self.model_screen,
|
|
||||||
confidence=0.5
|
|
||||||
)
|
|
||||||
print('screen_result:',screen_result)
|
|
||||||
|
|
||||||
|
|
||||||
def analysis_hand(self):
|
|
||||||
while True:
|
|
||||||
|
|
||||||
if ~self.imgQueue3.empty():
|
|
||||||
|
|
||||||
imgcutDict2 = self.imgQueue3.get()
|
|
||||||
imgname = list(imgcutDict2.keys())[0]
|
|
||||||
|
|
||||||
re_list = list(imgcutDict2.values())[0]
|
|
||||||
|
|
||||||
pre_list = re_list['per']
|
|
||||||
pre_list = list(pre_list.values())[0]
|
|
||||||
|
|
||||||
# pre_x = int(pre_list[2] - pre_list[0])
|
|
||||||
pre_x = int(pre_list[0])
|
|
||||||
pre_y = int(pre_list[1])
|
|
||||||
# pre_y = int(pre_list[3] - pre_list[1])
|
|
||||||
|
|
||||||
hand_list = re_list['hand_landmarker_result']
|
|
||||||
|
|
||||||
point_list = []
|
|
||||||
for hand_point in hand_list:
|
|
||||||
|
|
||||||
for point in hand_point:
|
|
||||||
# print(point)
|
|
||||||
|
|
||||||
point_x = int(point[0]) + pre_x
|
|
||||||
point_y = int(point[1]) + pre_y
|
|
||||||
|
|
||||||
point_list.append((point_x,point_y))
|
|
||||||
|
|
||||||
if point_list:
|
|
||||||
imgcutDict2.update({imgname:point_list})
|
|
||||||
|
|
||||||
self.imgQueue6.put(imgcutDict2)
|
|
||||||
|
|
||||||
|
|
||||||
def analysis_hand_blue(self):
|
|
||||||
while True:
|
|
||||||
if ~self.imgQueue4.empty() and ~self.imgQueue6.empty():
|
|
||||||
|
|
||||||
blue_list = self.imgQueue4.get()
|
|
||||||
hand_list = self.imgQueue6.get()
|
|
||||||
|
|
||||||
print('blue_list:',blue_list)
|
|
||||||
print('hand_list:',hand_list)
|
|
||||||
|
|
||||||
while list(blue_list.keys())[0] == list(hand_list.keys())[0]:
|
|
||||||
print(list(blue_list.keys())[0])
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
def draw_images(self):
|
|
||||||
while True:
|
|
||||||
if ~self.imgQueue6.empty():
|
|
||||||
|
|
||||||
img_hand_point = self.imgQueue6.get()
|
|
||||||
imgname = list(img_hand_point.keys())[0]
|
|
||||||
|
|
||||||
img = cv2.imread(os.path.join(self.imgPath,imgname))
|
|
||||||
|
|
||||||
point_list = list(img_hand_point.values())[0]
|
|
||||||
|
|
||||||
for point in point_list:
|
|
||||||
|
|
||||||
cv2.circle(img, point, 1,(0,0,255), 2)
|
|
||||||
|
|
||||||
cv2.imwrite(os.path.join(self.savePath,imgname),img)
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
def run(self):
|
|
||||||
|
|
||||||
self.get_imgThread.start()
|
|
||||||
self.get_person_resultThread.start()
|
|
||||||
self.get_hand_landmarkerThread.start()
|
|
||||||
self.get_blue_resultThread.start()
|
|
||||||
# self.get_pph_resultThread.start()
|
|
||||||
self.analysis_handThread.start()
|
|
||||||
# self.draw_imagesThread.start()
|
|
||||||
self.analysis_hand_blueThread.start()
|
|
||||||
self.get_screen_resultThread.start()
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
|
||||||
|
|
||||||
|
|
||||||
model_person = YOLO("model_files/bk1.pt")
|
|
||||||
model_pp_hand = YOLO("model_files/best_pph.pt")
|
|
||||||
model_blue = YOLO("model_files/best_butten.pt")
|
|
||||||
model_screen = YOLO("model_files/best_screen.pt")
|
|
||||||
|
|
||||||
|
|
||||||
media_hands = hands.Hands(
|
|
||||||
static_image_mode=True,
|
|
||||||
max_num_hands=4,
|
|
||||||
min_detection_confidence=0.1,
|
|
||||||
min_tracking_confidence=0.1)
|
|
||||||
|
|
||||||
modelList = [model_person,model_pp_hand,model_blue,model_screen,media_hands]
|
|
||||||
|
|
||||||
q = atm_det(imgPath='E:/BANK_XZ/data_file',
|
|
||||||
savePath='E:/BANK_XZ/output_data',
|
|
||||||
modellist=modelList)
|
|
||||||
|
|
||||||
q.run()
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue