import datetime
import os
from ftplib import FTP, all_errors

import pymysql


class AutoDownLoad:
    """Index files found on an FTP server into MySQL and download them.

    Workflow (see ``run``): ``query_direct_mp4`` walks the FTP tree looking
    for directories with a given name, records the matching files in the
    ``tp_videos`` table, and ``download_mp4`` later fetches every row whose
    ``download_status`` is still 0.
    """

    # Number of connect+login attempts made by ``ftp_connect``.
    _LOGIN_ATTEMPTS = 3
    # ``download_status`` values used by this class: rows are inserted and
    # selected with 0, and set to 2 once the file has been written locally.
    _STATUS_PENDING = 0
    _STATUS_DOWNLOADED = 2

    def __init__(self, host, username, password, port,
                 mysql_host, mysql_user, mysql_password, database):
        """Store connection settings; no connection is opened here."""
        self.host = host
        self.username = username
        self.password = password
        self.port = port
        self.ftp = None
        self.mysql_host = mysql_host
        self.mysql_user = mysql_user
        self.mysql_password = mysql_password
        self.database = database
        self.db = None

    def ftp_connect(self):
        """Return the cached FTP session, logging in first if necessary.

        Up to ``_LOGIN_ATTEMPTS`` attempts are made; each failure is printed.

        Returns:
            The logged-in ``ftplib.FTP`` object, or ``None`` when every
            attempt failed.
        """
        if self.ftp:
            return self.ftp
        for attempt in range(1, self._LOGIN_ATTEMPTS + 1):
            ftp = FTP()
            # NOTE(review): the original carried a disabled
            # ``ftp.encoding = 'gbk'`` here - presumably needed for servers
            # that send GBK file names; confirm before enabling.
            try:
                ftp.set_debuglevel(0)
                ftp.connect(self.host, self.port)
                ftp.login(self.username, self.password)
            except all_errors as e:
                # Release the socket of a half-open session before retrying.
                ftp.close()
                print(f"第{attempt}次登录失败,原因为:{e}")
            else:
                self.ftp = ftp
                break
        return self.ftp

    def _require_ftp(self):
        """Return a live FTP session or raise ``ConnectionError``."""
        ftp = self.ftp_connect()
        if ftp is None:
            raise ConnectionError(
                f"unable to log in to FTP server {self.host}:{self.port}"
            )
        return ftp

    def ftp_quit(self):
        """Close the FTP session, if any, and forget it.

        Resetting ``self.ftp`` lets a later ``ftp_connect`` open a fresh
        session instead of handing back a closed one.
        """
        if not self.ftp:
            return
        try:
            self.ftp.quit()
        except all_errors:
            # The server may already have dropped us; just free the socket.
            self.ftp.close()
        finally:
            self.ftp = None

    def mysql_connect(self):
        """Open a new MySQL connection, remember it in ``self.db``, return it."""
        self.db = pymysql.connect(
            host=self.mysql_host,
            user=self.mysql_user,
            password=self.mysql_password,
            database=self.database
        )
        return self.db

    def _list_lines(self, directory):
        """Return the raw ``LIST`` output lines for *directory*."""
        lines = []
        self._require_ftp().retrlines('LIST ' + directory, lines.append)
        return lines

    def find_ftp_directories(self, target_directory_name,
                             current_directory='/home/TP', result=None):
        """Recursively collect paths of directories named *target_directory_name*.

        Args:
            target_directory_name: Directory name to look for.
            current_directory: FTP path where the search starts.
            result: Optional list to append matches to. A fresh list is
                created when omitted, so results never leak between calls.

        Returns:
            The list of matching directory paths (``result`` itself when one
            was supplied).

        Raises:
            ConnectionError: If no FTP session could be established.
        """
        if result is None:
            result = []

        # Directory entries are the LIST lines starting with 'd'; the last
        # whitespace-separated token is taken as the entry name.
        subdirectories = [
            line.split()[-1]
            for line in self._list_lines(current_directory)
            if line.startswith('d')
        ]

        if target_directory_name in subdirectories:
            result.append(current_directory + '/' + target_directory_name)

        for subdirectory in subdirectories:
            if subdirectory in ('.', '..'):
                continue
            self.find_ftp_directories(
                target_directory_name,
                current_directory + '/' + subdirectory,
                result,
            )
        return result

    def query_direct_mp4(self, target_directory_name,
                         current_directory='/home/TP', file_suffix='.txt'):
        """Find files under matching directories and record them in MySQL.

        Every directory named *target_directory_name* below
        *current_directory* is listed; entries whose name ends with
        *file_suffix* (case-insensitive) are inserted into ``tp_videos``
        with ``status`` and ``download_status`` 0 and ``video_date`` set to
        *target_directory_name*.

        A database failure is printed and rolled back, not raised; the list
        of discovered files is returned either way.

        Args:
            target_directory_name: Directory name to search for.
            current_directory: FTP path where the search starts.
            file_suffix: File-name suffix to select.
                NOTE(review): the default stays '.txt' to match existing
                behaviour, although the method name suggests '.mp4' -
                confirm which one is intended.

        Returns:
            List of FTP paths of the selected files.

        Raises:
            ConnectionError: If no FTP session could be established.
        """
        directories = self.find_ftp_directories(
            target_directory_name, current_directory, []
        )
        print(directories)

        suffix = file_suffix.lower()
        mp4_files = []
        for directory in directories:
            for line in self._list_lines(directory):
                parts = line.split()
                # Skip blank lines instead of failing on parts[-1].
                if not parts:
                    continue
                name = parts[-1]
                if name.lower().endswith(suffix):
                    # Normalise separators so the FTP path uses '/' even
                    # when this runs on Windows.
                    mp4_files.append(
                        os.path.join(directory, name).replace('\\', '/')
                    )

        # Placeholders let the driver quote file names safely.
        insert_sql = (
            "INSERT INTO tp_videos (video_name, ftp_path, status, "
            "download_status, create_time, update_time, video_date) "
            "VALUES (%s, %s, %s, %s, %s, %s, %s)"
        )
        db = None
        try:
            db = self.mysql_connect()
            with db.cursor() as cursor:
                db.begin()
                for mp4 in mp4_files:
                    now = datetime.datetime.now()
                    params = (
                        mp4.split('/')[-1],
                        mp4,
                        0,
                        self._STATUS_PENDING,
                        now,
                        now,
                        target_directory_name,
                    )
                    print(insert_sql, params)
                    cursor.execute(insert_sql, params)
            db.commit()
            print("数据插入成功!")
        except Exception as e:
            # Best effort: report the failure and still return the files.
            # There is nothing to roll back if the connection never opened.
            if db is not None:
                db.rollback()
            print("数据插入失败:", str(e))
        finally:
            if db is not None:
                db.close()
        return mp4_files

    def download_mp4(self, video_date, save_dir):
        """Download every pending file recorded for *video_date*.

        Rows of ``tp_videos`` with ``download_status`` 0 and the given
        ``video_date`` are fetched over FTP into *save_dir* as
        ``<id>-<file name>``. After each file is written its row is updated
        (``download_status`` 2, ``video_path``) and committed, so completed
        downloads survive a later failure.

        Args:
            video_date: Value matched against the ``video_date`` column.
            save_dir: Existing local directory to write files into.

        Returns:
            ``None`` on success, otherwise the exception that aborted the
            run (it is printed and returned, not raised).
        """
        select_sql = (
            "SELECT id, ftp_path FROM tp_videos "
            "WHERE download_status = %s AND video_date = %s"
        )
        update_sql = (
            "UPDATE tp_videos SET download_status = %s, video_path = %s "
            "WHERE id = %s"
        )
        db = None
        try:
            ftp = self._require_ftp()
            db = self.mysql_connect()
            with db.cursor() as cursor:
                cursor.execute(select_sql, (self._STATUS_PENDING, video_date))
                ftp_paths = dict(cursor.fetchall())
                print("FTP Paths for download_status=0:", ftp_paths)

                for video_id, ftp_path in ftp_paths.items():
                    file_name = ftp_path.split('/')[-1]
                    save_path = f"{save_dir}/{video_id}-{file_name}"
                    print(save_path)
                    with open(save_path, 'wb') as f:
                        ftp.retrbinary('RETR ' + ftp_path, f.write)
                    cursor.execute(
                        update_sql,
                        (self._STATUS_DOWNLOADED, save_path, video_id),
                    )
                    db.commit()
            return None
        except Exception as e:
            print(f'下载失败:{e}')
            return e
        finally:
            if db is not None:
                db.close()


# NOTE(review): credentials are hard-coded in source; consider loading them
# from the environment or a config file kept out of version control.

# FTP server connection settings
ftp_host = '192.168.10.96'
ftp_user = 'TP'
ftp_pass = '#Yaxin0504'
port = 21

# MySQL connection settings
mysql_host = '192.168.10.59'
mysql_user = 'root'
mysql_password = 'Xfc980516'
database = 'tp_test'

# Download settings
target_directory_name = '2023-10-11'  # name of the directory to search for
current_directory = '/home/TP'  # FTP root path where the search starts
save_dir = '/home/ftp_download'  # local directory files are saved into


def run():
    """Index the configured directory's files, then download the pending ones."""
    auto_obj = AutoDownLoad(host=ftp_host, username=ftp_user, password=ftp_pass, port=port,
                            mysql_host=mysql_host, mysql_user=mysql_user,
                            mysql_password=mysql_password, database=database)
    try:
        mp4_files = auto_obj.query_direct_mp4(target_directory_name=target_directory_name,
                                              current_directory=current_directory)
        print(mp4_files)
        res = auto_obj.download_mp4(target_directory_name, save_dir)
        print(res)
    finally:
        # Always release the FTP session, even when a step above fails.
        auto_obj.ftp_quit()


if __name__ == '__main__':
    run()