# main.py
"""Poll the ``tp_videos`` table for pending rows, download each video, and record the outcome.

Connection settings are read from the environment (optionally via a ``.env``
file): MYSQL_HOST, MYSQL_PORT, MYSQL_USER, MYSQL_PASSWORD, MYSQL_DATABASE.

Status values written by this script (their wider meaning is defined by the
database schema, which is not visible here):

* ``download_status``: rows with 0 are picked up; 2 is written after a
  successful download; 3 is written after a failure.
* ``video_status``: 1 is written for a file whose hash was not seen before,
  4 for a file whose hash matches an existing row.
"""
import hashlib
import logging
import os
import time

import pymysql
import requests
from dotenv import load_dotenv

# Load environment variables from .env file
load_dotenv()

logging.basicConfig(
    filename='download.log',
    level=logging.INFO,
    format='%(asctime)s [%(levelname)s] %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)
logger = logging.getLogger(__name__)

# Port used when MYSQL_PORT is unset or empty.
DEFAULT_MYSQL_PORT = 3306
# (connect, read) timeouts in seconds for downloads; without a timeout
# requests can block forever on a stalled server.
DOWNLOAD_TIMEOUT = (10, 60)
# Read size used when hashing files.
HASH_CHUNK_SIZE = 64 * 1024


def get_file_hash(filename):
    """Return the SHA-256 hex digest of *filename*.

    Returns None when *filename* is empty/None or does not name a regular file.
    """
    if not filename or not os.path.isfile(filename):
        return None

    digest = hashlib.sha256()
    with open(filename, "rb") as file:
        # Read in fixed-size chunks so large videos are never loaded whole.
        for chunk in iter(lambda: file.read(HASH_CHUNK_SIZE), b""):
            digest.update(chunk)
    return digest.hexdigest()


def connect_to_mysql():
    """Open a MySQL connection configured from the environment.

    Returns the open connection (using ``DictCursor``), or None when the
    configuration is invalid or the connection attempt fails. Failures are
    logged, never raised.
    """
    try:
        # An unset MYSQL_PORT falls back to the default instead of crashing
        # on int(None).
        port = int(os.getenv('MYSQL_PORT') or DEFAULT_MYSQL_PORT)
    except ValueError as e:
        logger.error(f"Error: invalid MYSQL_PORT: {e}")
        return None

    try:
        connection = pymysql.connect(
            host=os.getenv('MYSQL_HOST'),
            port=port,
            user=os.getenv('MYSQL_USER'),
            password=os.getenv('MYSQL_PASSWORD'),
            database=os.getenv('MYSQL_DATABASE'),
            cursorclass=pymysql.cursors.DictCursor
        )
    except pymysql.Error as e:
        logger.error(f"Error: {e}")
        return None

    return connection if connection.open else None


def fetch_oldest_video(connection):
    """Return the row with ``download_status = 0`` that has the smallest ``create_time``.

    Returns None when there is no such row or when the query fails (the
    failure is logged).
    """
    try:
        with connection.cursor() as cursor:
            query = "SELECT * FROM tp_videos WHERE download_status = 0 ORDER BY create_time LIMIT 1;"
            cursor.execute(query)
            return cursor.fetchone()
    except pymysql.Error as e:
        logger.error(f"Error fetching data: {e}")
        return None


def fetch_same_video(connection, video_hash):
    """Return the row with the highest ``id`` whose ``video_hash`` equals *video_hash*.

    Returns None when no row matches or when the query fails (the failure is
    logged).
    """
    try:
        with connection.cursor() as cursor:
            sql = "SELECT * FROM tp_videos WHERE video_hash = %s ORDER BY id DESC LIMIT 1"
            # Run the parameterized query.
            cursor.execute(sql, (video_hash,))
            return cursor.fetchone()
    except pymysql.Error as e:
        logger.error(f"Error fetching data: {e}")
        return None


def save_download_success(connection, id_, video_hash, is_same=False):
    """Mark row *id_* as downloaded and store its hash.

    Sets ``download_status=2`` and ``video_hash``; ``video_status`` becomes 4
    when *is_same* is true and 1 otherwise. If the update fails with a
    database error, the row is marked as failed instead. Errors are logged,
    never raised.
    """
    video_status = 4 if is_same else 1
    try:
        with connection.cursor() as cursor:
            query = "UPDATE tp_videos SET download_status=2, video_hash=%s, video_status=%s WHERE id=%s;"
            cursor.execute(query, (video_hash, video_status, id_))
        connection.commit()
    except pymysql.Error as e:
        # Log first so the original cause is recorded even if the fallback
        # below also fails.
        logger.error(f"更新失败: {e}")
        try:
            connection.rollback()
        except pymysql.Error as rollback_error:
            logger.error(f"更新失败: {rollback_error}")
        # save_download_error handles its own database errors, so nothing
        # can escape from this handler.
        save_download_error(connection, id_)
    except Exception as e1:
        logger.error(f"更新失败: {e1}")


def save_download_error(connection, id_):
    """Mark row *id_* as failed (``download_status=3``). Database errors are logged, never raised."""
    try:
        with connection.cursor() as cursor:
            query = "UPDATE tp_videos SET download_status=3 WHERE id=%s;"
            cursor.execute(query, (id_,))
        connection.commit()
    except pymysql.Error as e:
        logger.error(f"更新失败: {e}")


def _remove_quietly(path):
    """Delete *path* if it exists; a failure to delete is logged, not raised."""
    try:
        os.remove(path)
    except FileNotFoundError:
        pass
    except OSError as e:
        logger.error(f"Could not remove {path}: {e}")


def download_video(video_data, timeout=DOWNLOAD_TIMEOUT):
    """Download ``video_data['ftp_path']`` to ``video_data['video_path']``.

    The body is streamed to ``<video_path>.part`` and moved into place only
    after the transfer completes, so a failed download never leaves a
    truncated file at the final path.

    Args:
        video_data: Mapping with non-empty ``video_path``, ``video_name`` and
            ``ftp_path`` entries.
        timeout: Passed to ``requests.get``; seconds, or a
            ``(connect, read)`` tuple.

    Raises:
        ValueError: A required entry is missing or empty.
        requests.RequestException: The request failed or the server did not
            answer with HTTP 200.
        IOError: The file could not be written.
    """
    download_path = video_data.get('video_path', '')
    file_name = video_data.get('video_name', '')
    ftp_path = video_data.get('ftp_path', '')

    if not (download_path and file_name and ftp_path):
        # Returning silently here would make the caller record a success.
        message = (
            f"Incomplete video record (id={video_data.get('id')}): "
            "video_path, video_name and ftp_path are all required"
        )
        logger.error(message)
        raise ValueError(message)

    partial_path = f"{download_path}.part"
    try:
        # Streamed GET; the context manager releases the connection.
        with requests.get(ftp_path, stream=True, timeout=timeout) as response:
            status_code = response.status_code
            if status_code == 200:
                with open(partial_path, 'wb') as file:
                    # Write the body chunk by chunk.
                    for chunk in response.iter_content(chunk_size=8192):
                        if chunk:
                            file.write(chunk)
                os.replace(partial_path, download_path)
    except requests.RequestException as e:
        # Must precede IOError: RequestException is a subclass of it.
        _remove_quietly(partial_path)
        logger.error(f"下载失败: {e}")
        raise requests.RequestException(f"下载失败: {e}") from e
    except IOError as e:
        _remove_quietly(partial_path)
        logger.error(f"文件操作失败: {e}")
        raise IOError(f"文件操作失败: {e}") from e

    if status_code != 200:
        # A non-200 answer is a failure the caller must see; raised outside
        # the try block so it is logged exactly once.
        logger.error(f"下载失败:状态码:{status_code}")
        raise requests.RequestException(f"下载失败:状态码:{status_code}")

    logger.info(f"下载完成: {download_path}")


def _close_quietly(connection):
    """Close *connection*; an error while closing is logged, not raised."""
    try:
        connection.close()
    except pymysql.Error as e:
        logger.error(f"Error: {e}")


def _process_video(connection, video_data):
    """Download one pending row, de-duplicate it by hash, and store the result."""
    video_id = video_data['id']
    logger.info(str(video_data))

    try:
        download_video(video_data)
    except Exception as e:
        # Top-level boundary for one video: record the failure and move on.
        logger.error(f"保存异常{e}")
        save_download_error(connection, video_id)
        return

    download_path = video_data.get('video_path', '')
    video_hash = get_file_hash(download_path)
    if video_hash is None:
        # The file vanished between download and hashing; do not record a
        # success with an empty hash.
        logger.error(f"Downloaded file not found: {download_path}")
        save_download_error(connection, video_id)
        return

    same_result = fetch_same_video(connection, video_hash)
    # A row must not count as a duplicate of itself (possible when a row that
    # already carries a hash is queued again).
    is_same = bool(same_result) and same_result.get('id') != video_id
    if is_same:
        try:
            os.remove(download_path)
            logger.info(f"删除重复视频:{download_path}")
        except Exception as e:
            # Best effort: the row is still recorded as a duplicate.
            logger.error(f"删除重复视频失败,原因为:{e}")

    save_download_success(connection, video_id, video_hash, is_same=is_same)


def main():
    """Run the download loop until interrupted with Ctrl-C.

    Each iteration opens a connection, processes at most one pending row and
    closes the connection again. It then waits 2 seconds after handling a
    row, 5 seconds when nothing was pending, and 10 seconds when the database
    was unreachable.
    """
    try:
        while True:
            connection = connect_to_mysql()
            if not connection:
                time.sleep(10)
                continue

            try:
                video_data = fetch_oldest_video(connection)
                if video_data and video_data.get('id', None):
                    _process_video(connection, video_data)
                    delay = 2
                else:
                    print("No videos to download. Waiting...")
                    delay = 5
            finally:
                # One connection per iteration: always release it so the
                # loop cannot leak connections.
                _close_quietly(connection)

            time.sleep(delay)
    except KeyboardInterrupt:
        logger.info("Stopping the downloader.")


if __name__ == "__main__":
    main()