# -*- coding: utf-8 -*-
"""FastAPI service that renders a video from an uploaded image.

Each endpoint stores the upload, looks the image up (by MD5 of its bytes) in a
plain-text record file, and either returns the previously generated video or
runs ``script.py`` followed by an ffmpeg post-processing step.
"""
import os
import shutil
import subprocess
from hashlib import md5

from fastapi import FastAPI, File, UploadFile
from fastapi.responses import FileResponse, JSONResponse

app = FastAPI()

# Directory handed to script.py as --result_dir; final videos live below it.
RESULT_DIR = "results"


def save_upload_file(upload_file: UploadFile, filename: str):
    """Write the contents of *upload_file* to the path *filename*.

    The parent directory is created when missing, and the data is streamed
    in chunks so large uploads are not held in memory all at once.
    """
    parent_dir = os.path.dirname(filename)
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)
    with open(filename, "wb") as buffer:
        shutil.copyfileobj(upload_file.file, buffer)


def generate_video_command(result_dir: str, img_path: str, audio_path: str, video_path: str):
    """Return the argv list that invokes ``script.py`` for one image.

    Args:
        result_dir: Value passed as ``--result_dir``.
        img_path: Value passed as ``--source_image``.
        audio_path: Value passed as ``--driven_audio``.
        video_path: Value passed as ``--ref_eyeblink``.
    """
    return [
        "python", "script.py",
        "--source_image", img_path,
        "--result_dir", result_dir,
        "--driven_audio", audio_path,
        "--ref_eyeblink", video_path,
    ]


def get_latest_sub_dir(result_dir: str):
    """Return the most recently modified sub-directory of *result_dir*.

    Returns ``None`` when *result_dir* contains no directories.
    """
    sub_dirs = [
        os.path.join(result_dir, name)
        for name in os.listdir(result_dir)
        if os.path.isdir(os.path.join(result_dir, name))
    ]
    return max(sub_dirs, key=os.path.getmtime, default=None)


def get_video_duration(video_path: str):
    """Return the duration of *video_path* in seconds, as reported by ffprobe.

    Raises:
        subprocess.CalledProcessError: If ffprobe exits with a non-zero status.
        ValueError: If ffprobe's output cannot be parsed as a float.
    """
    video_duration_command = [
        "ffprobe", "-v", "error",
        "-show_entries", "format=duration",
        "-of", "default=noprint_wrappers=1:nokey=1",
        video_path,
    ]
    # check=True surfaces an ffprobe failure directly instead of letting it
    # show up later as a confusing float('') error.
    result = subprocess.run(
        video_duration_command, capture_output=True, text=True, check=True
    )
    return float(result.stdout.strip())


def trim_video(input_video_path: str, output_video_path: str, duration: float,
               trim_seconds: float = 2.0):
    """Copy *input_video_path* to *output_video_path* without its last seconds.

    Args:
        input_video_path: Video to read.
        output_video_path: Where the shortened video is written (overwritten
            if it already exists).
        duration: Total duration of the input, in seconds.
        trim_seconds: How many seconds to drop from the end.

    Raises:
        ValueError: If nothing would remain after trimming.
        subprocess.CalledProcessError: If ffmpeg fails.
    """
    remaining = duration - trim_seconds
    if remaining <= 0:
        raise ValueError(
            f"Cannot trim {trim_seconds}s from a {duration}s video: nothing would remain"
        )
    trim_command = [
        "ffmpeg", "-y",  # -y: overwrite without prompting on stdin
        "-i", input_video_path,
        "-t", str(remaining),
        "-c", "copy",
        output_video_path,
    ]
    subprocess.run(trim_command, check=True)


def remove_audio(input_video_path: str, output_video_path: str):
    """Write a copy of *input_video_path* with the audio stream dropped.

    The video stream is copied without re-encoding. An existing output file
    is overwritten.

    Raises:
        subprocess.CalledProcessError: If ffmpeg fails.
    """
    remove_audio_command = [
        "ffmpeg", "-y",  # -y: overwrite without prompting on stdin
        "-i", input_video_path,
        "-an",
        "-vcodec", "copy",
        output_video_path,
    ]
    subprocess.run(remove_audio_command, check=True)


def save_to_final_destination(source_path: str, destination_dir: str):
    """Move *source_path* into *destination_dir* and return the new path.

    The directory is created when missing. os.replace is used so that an
    existing file with the same name is overwritten on every platform.
    """
    os.makedirs(destination_dir, exist_ok=True)
    destination_path = os.path.join(destination_dir, os.path.basename(source_path))
    os.replace(source_path, destination_path)
    return destination_path


def get_file_md5(file_path: str):
    """Return the hexadecimal MD5 digest of the file at *file_path*.

    The digest is only used as a lookup key in the record files.
    """
    hash_md5 = md5()
    with open(file_path, "rb") as f:
        for chunk in iter(lambda: f.read(4096), b""):
            hash_md5.update(chunk)
    return hash_md5.hexdigest()


def record_video_mapping(image_path: str, video_path: str, record_file: str):
    """Append one ``<key> <video_path>`` line to *record_file*.

    Despite its name, *image_path* is the lookup key: the endpoints in this
    module pass the image's MD5 digest here, and check_existing_video()
    compares that first field against an MD5. The parameter name is kept for
    backward compatibility.
    """
    with open(record_file, "a") as f:
        f.write(f"{image_path} {video_path}\n")


def check_existing_video(image_md5: str, record_file: str):
    """Return the recorded video path for *image_md5*, or ``None``.

    Each line of *record_file* is ``<md5> <video_path>``. The line is split
    on the first run of whitespace only, because the video path is derived
    from the uploaded filename and may itself contain spaces. Blank or
    malformed lines are skipped, and so are entries whose video file no
    longer exists on disk.
    """
    if not os.path.exists(record_file):
        return None
    with open(record_file, "r") as f:
        for line in f:
            fields = line.strip().split(maxsplit=1)
            if len(fields) != 2:
                continue
            recorded_image_md5, video_path = fields
            if recorded_image_md5 == image_md5 and os.path.isfile(video_path):
                return video_path
    return None


def _safe_upload_name(raw_name):
    """Return the final path component of a client-supplied filename, or None.

    The filename comes from the request, so directory parts (with either
    separator) are dropped to keep the upload inside its target directory.
    """
    name = os.path.basename((raw_name or "").replace("\\", "/"))
    if name in ("", ".", ".."):
        return None
    return name


def _error_response(message: str, status_code: int):
    """Build a JSON ``{"error": message}`` response with an error status."""
    return JSONResponse(status_code=status_code, content={"error": message})


def _trim_tail(input_video_path: str, output_video_path: str):
    """Post-processing step for /silent-video: probe the length, then trim."""
    video_duration = get_video_duration(input_video_path)
    trim_video(input_video_path, output_video_path, video_duration)


def _generate_cached_video(image, upload_dir, record_file, audio_path,
                           ref_video_path, result_tag, output_tag,
                           final_dir, postprocess):
    """Shared pipeline behind both endpoints.

    Args:
        image: The uploaded image.
        upload_dir: Directory in which the upload is stored.
        record_file: Text file mapping image MD5 digests to video paths.
        audio_path: Driving audio passed to script.py.
        ref_video_path: Reference video passed to script.py.
        result_tag: Suffix (after ``<stem>##``) of the file script.py is
            expected to produce.
        output_tag: Suffix (after ``<stem>##``) of the post-processed file.
        final_dir: Directory the post-processed video is moved into.
        postprocess: Callable ``(input_path, output_path)`` run on the result.

    Returns:
        A FileResponse with the video, or a JSON error response.
    """
    safe_name = _safe_upload_name(image.filename)
    if safe_name is None:
        return _error_response("Invalid or missing image filename", 400)

    img_path = os.path.join(upload_dir, safe_name)
    save_upload_file(image, img_path)

    image_md5 = get_file_md5(img_path)
    existing_video = check_existing_video(image_md5, record_file)
    if existing_video:
        return FileResponse(existing_video, media_type='video/mp4')

    os.makedirs(RESULT_DIR, exist_ok=True)
    command = generate_video_command(RESULT_DIR, img_path, audio_path, ref_video_path)
    subprocess.run(command, check=True)

    latest_sub_dir = get_latest_sub_dir(RESULT_DIR)
    if not latest_sub_dir:
        return _error_response("No subdirectory found in result directory", 500)

    # NOTE(review): assumes script.py names its output after the image's
    # stem plus the tag below - confirm against script.py.
    stem = os.path.splitext(safe_name)[0]
    result_video_path = os.path.join(latest_sub_dir, f"{stem}##{result_tag}.mp4")
    processed_video_path = os.path.join(latest_sub_dir, f"{stem}##{output_tag}.mp4")
    if not os.path.exists(result_video_path):
        return _error_response("Video file not found", 500)

    postprocess(result_video_path, processed_video_path)
    final_destination = save_to_final_destination(processed_video_path, final_dir)
    record_video_mapping(image_md5, final_destination, record_file)
    return FileResponse(final_destination, media_type='video/mp4')


# NOTE(review): both handlers run the whole pipeline synchronously inside an
# ``async def``, so the event loop is blocked while script.py runs. That is
# left as-is on purpose: get_latest_sub_dir() cannot tell concurrent runs
# apart, so offloading this work would need per-run result directories or a
# lock first.
@app.post("/dynamic-video")
async def generate_video(image: UploadFile = File(...)):
    """Return a video generated from *image* with its audio track removed.

    A previously generated video is reused when the same image bytes were
    uploaded before.
    """
    return _generate_cached_video(
        image,
        upload_dir="dynamic",
        record_file="dynamic_video_record.txt",
        audio_path="./examples/driven_audio/dynamic_audio.wav",
        ref_video_path="./examples/ref_video/dynamic.mp4",
        result_tag="dynamic_audio_enhanced",
        output_tag="dynamic_audio_enhanced_dynamic",
        final_dir="results/dynamic-video",
        postprocess=remove_audio,
    )


@app.post("/silent-video")
async def generate_and_trim_video(image: UploadFile = File(...)):
    """Return a video generated from *image* with its last 2 seconds trimmed.

    A previously generated video is reused when the same image bytes were
    uploaded before.
    """
    return _generate_cached_video(
        image,
        upload_dir="silent",
        record_file="silent_video_record.txt",
        audio_path="./examples/driven_audio/silent_audio.wav",
        ref_video_path="./examples/ref_video/silent.mp4",
        result_tag="silent_audio_enhanced",
        output_tag="silent_audio_enhanced_trimmed",
        final_dir="results/silent-video",
        postprocess=_trim_tail,
    )


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8000)