# -*- coding: utf-8 -*- import os import subprocess from fastapi import FastAPI, File, UploadFile from fastapi.responses import FileResponse from datetime import datetime app = FastAPI() def save_upload_file(upload_file: UploadFile, filename: str): with open(filename, "wb") as buffer: buffer.write(upload_file.file.read()) def generate_video_command(result_dir: str, img_path: str, audio_path: str, video_path: str): return [ "python", "script.py", "--source_image", img_path, "--result_dir", result_dir, "--driven_audio", audio_path, "--ref_eyeblink", video_path, ] def get_latest_sub_dir(result_dir: str): sub_dirs = [os.path.join(result_dir, d) for d in os.listdir(result_dir) if os.path.isdir(os.path.join(result_dir, d))] if not sub_dirs: return None return max(sub_dirs, key=os.path.getmtime) def get_video_duration(video_path: str): video_duration_command = [ "ffprobe", "-v", "error", "-show_entries", "format=duration", "-of", "default=noprint_wrappers=1:nokey=1", video_path ] result = subprocess.run(video_duration_command, capture_output=True, text=True) return float(result.stdout.strip()) def trim_video(input_video_path: str, output_video_path: str, duration: float): trim_command = [ "ffmpeg", "-i", input_video_path, "-t", str(duration - 2), "-c", "copy", output_video_path ] subprocess.run(trim_command, check=True) def remove_audio(input_video_path: str, output_video_path: str): remove_audio_command = [ "ffmpeg", "-i", input_video_path, "-an", "-vcodec", "copy", output_video_path ] subprocess.run(remove_audio_command, check=True) @app.post("/dynamic-video") async def generate_video( image: UploadFile = File(...), ): img_path = "dynamic/dynamic_image.png" save_upload_file(image, img_path) audio_path = "./examples/driven_audio/dynamic_audio.wav" video_path = "./examples/ref_video/dynamic.mp4" result_dir = os.path.join("results") os.makedirs(result_dir, exist_ok=True) command = generate_video_command(result_dir, img_path, audio_path, video_path) subprocess.run(command, check=True) latest_sub_dir = get_latest_sub_dir(result_dir) if not latest_sub_dir: return {"error": "No subdirectory found in result directory"} result_video_path = os.path.join(latest_sub_dir, "dynamic_image##dynamic_audio_enhanced.mp4") silent_video_path = os.path.join(latest_sub_dir, "dynamic_image##dynamic_audio_enhanced_dynamic.mp4") if os.path.exists(result_video_path): remove_audio(result_video_path, silent_video_path) return FileResponse(silent_video_path, media_type='video/mp4') else: return {"error": "Video file not found"} @app.post("/silent-video") async def generate_and_trim_video( image: UploadFile = File(...), ): img_path = "silent/silent_image.png" save_upload_file(image, img_path) audio_path = "./examples/driven_audio/silent_audio.wav" video_path = "./examples/ref_video/silent.mp4" result_dir = os.path.join("results") os.makedirs(result_dir, exist_ok=True) command = generate_video_command(result_dir, img_path, audio_path, video_path) subprocess.run(command, check=True) latest_sub_dir = get_latest_sub_dir(result_dir) if not latest_sub_dir: return {"error": "No subdirectory found in result directory"} result_video_path = os.path.join(latest_sub_dir, "silent_image##silent_audio_enhanced.mp4") trimmed_video_path = os.path.join(latest_sub_dir, "silent_image##silent_audio_enhanced_trimmed.mp4") if os.path.exists(result_video_path): video_duration = get_video_duration(result_video_path) trim_video(result_video_path, trimmed_video_path, video_duration) return FileResponse(trimmed_video_path, media_type='video/mp4') else: return {"error": "Video file not found"} if __name__ == "__main__": import uvicorn uvicorn.run(app, host="0.0.0.0", port=8000)