You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

191 lines
6.6 KiB
Python

import os
import pymysql
import datetime
from ftplib import FTP
class AutoDownLoad:
def __init__(self, host, username, password, port, mysql_host, mysql_user, mysql_password, database):
self.host = host
self.username = username
self.password = password
self.port = port
self.ftp = None
self.mysql_host = mysql_host
self.mysql_user = mysql_user
self.mysql_password = mysql_password
self.database = database
self.db = None
def ftp_connect(self):
if not self.ftp:
for i in range(3):
try:
ftp = FTP()
# ftp.encoding = 'gbk'
ftp.set_debuglevel(0)
ftp.connect(self.host, self.port)
ftp.login(self.username, self.password)
self.ftp = ftp
break
except Exception as e:
print(f"{i + 1}次登录失败,原因为:{e}")
return self.ftp
def ftp_quit(self):
if self.ftp:
self.ftp.quit()
def mysql_connect(self):
self.db = pymysql.connect(
host=self.mysql_host,
user=self.mysql_user,
password=self.mysql_password,
database=self.database
)
return self.db
def find_ftp_directories(self, target_directory_name, current_directory='/home/TP', result=[]):
# 获取当前目录下的文件和子目录列表
file_list = []
ftp = self.ftp_connect()
ftp.retrlines('LIST ' + current_directory, file_list.append)
subdirectories = [line.split()[-1] for line in file_list if line.startswith('d')]
# print(subdirectories)
# 检查当前目录是否包含目标目录
if target_directory_name in subdirectories:
result.append(current_directory + '/' + target_directory_name)
# 递归搜索子目录
for subdirectory in subdirectories:
if subdirectory not in ['.', '..']:
new_directory = current_directory + '/' + subdirectory
# print(new_directory)
self.find_ftp_directories(target_directory_name, new_directory, result)
return result
def query_direct_mp4(self, target_directory_name, current_directory='/home/TP'):
# 获取当前目录下的文件和子目录列表
result = []
file_list = []
ftp = self.ftp_connect()
ftp.retrlines('LIST ' + current_directory, file_list.append)
subdirectories = [line.split()[-1] for line in file_list if line.startswith('d')]
# print(subdirectories)
# 检查当前目录是否包含目标目录
if target_directory_name in subdirectories:
result.append(current_directory + '/' + target_directory_name)
# 递归搜索子目录
for subdirectory in subdirectories:
if subdirectory not in ['.', '..']:
new_directory = current_directory + '/' + subdirectory
# print(new_directory)
self.find_ftp_directories(target_directory_name, new_directory, result)
print(result)
mp4_files = []
# 获取目录下的文件列表
for dir in result:
file_list = []
ftp.retrlines('LIST ' + dir, file_list.append)
files = [line.split()[-1] for line in file_list]
# 筛选出.mp4文件
for file in files:
if file.lower().endswith('.txt'):
mp4_files.append(os.path.join(dir, file).replace('\\', '/'))
# TODO 把mp4files 存入数据库
try:
db = self.mysql_connect()
cursor = db.cursor()
db.begin()
for mp4 in mp4_files:
sql = f"INSERT INTO tp_videos (video_name, ftp_path, status, download_status, create_time, update_time, video_date) VALUE ('{mp4.split('/')[-1]}', '{mp4}', 0, 0, '{datetime.datetime.now()}', '{datetime.datetime.now()}', '{target_directory_name}');"
print(sql)
cursor.execute(sql)
db.commit()
# 关闭游标和数据库连接
cursor.close()
db.close()
print("数据插入成功!")
except Exception as e:
# 出现异常时,回滚事务
db.rollback()
print("数据插入失败:", str(e))
return mp4_files
def download_mp4(self, video_date, save_dir):
# 获取未下载的ftp视频路径 list
ftp = self.ftp_connect()
try:
# 建立数据库连接
db = self.mysql_connect()
# 创建游标
cursor = db.cursor()
# 执行 SQL 语句
sql = f"SELECT id, ftp_path FROM tp_videos WHERE download_status = 0 AND video_date = '{video_date}'"
cursor.execute(sql)
# 获取结果集
results = cursor.fetchall()
ftp_paths = {key: value for key, value in results}
# 打印结果
print("FTP Paths for download_status=0:", ftp_paths)
for id, ftp_path in ftp_paths.items():
save_path = save_dir + '/' + str(id) + '-' + ftp_path.split('/')[-1]
print(save_path)
with open(save_path, 'wb') as f:
ftp.retrbinary('RETR ' + ftp_path, f.write)
update_sql = f"UPDATE tp_videos SET download_status = 2, video_path = '{save_path}' WHERE id = {id};"
cursor.execute(update_sql)
db.commit()
# 关闭游标和数据库连接
cursor.close()
db.close()
return
except Exception as e:
print(f'下载失败:{e}')
return e
# FTP服务器连接信息
ftp_host = '192.168.10.96'
ftp_user = 'TP'
ftp_pass = '#Yaxin0504'
port = 21
# mysql信息
mysql_host = '192.168.10.59'
mysql_user = 'root'
mysql_password = 'Xfc980516'
database = 'tp_test'
# 下载信息
target_directory_name = '2023-10-11' # 目标文件夹名
current_directory = '/home/TP' # ftp文件夹根路径
save_dir = '/home/ftp_download' # 保存的路径
def run():
auto_obj = AutoDownLoad(host=ftp_host, username=ftp_user, password=ftp_pass, port=port, mysql_host=mysql_host, mysql_user=mysql_user, mysql_password=mysql_password, database=database)
mp4_files = auto_obj.query_direct_mp4(target_directory_name=target_directory_name, current_directory=current_directory)
print(mp4_files)
res = auto_obj.download_mp4(target_directory_name, save_dir)
print(res)
return
if __name__ == '__main__':
run()