You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
SadTalker/main.py

162 lines
5.9 KiB
Python

# -*- coding: utf-8 -*-
import os
import subprocess
from fastapi import FastAPI, File, UploadFile
from fastapi.responses import FileResponse
from hashlib import md5
app = FastAPI()
def save_upload_file(upload_file: UploadFile, filename: str):
with open(filename, "wb") as buffer:
buffer.write(upload_file.file.read())
def generate_video_command(result_dir: str, img_path: str, audio_path: str, video_path: str):
return [
"python", "script.py",
"--source_image", img_path,
"--result_dir", result_dir,
"--driven_audio", audio_path,
"--ref_eyeblink", video_path,
]
def get_latest_sub_dir(result_dir: str):
sub_dirs = [os.path.join(result_dir, d) for d in os.listdir(result_dir) if os.path.isdir(os.path.join(result_dir, d))]
if not sub_dirs:
return None
return max(sub_dirs, key=os.path.getmtime)
def get_video_duration(video_path: str):
video_duration_command = [
"ffprobe",
"-v", "error",
"-show_entries", "format=duration",
"-of", "default=noprint_wrappers=1:nokey=1",
video_path
]
result = subprocess.run(video_duration_command, capture_output=True, text=True)
return float(result.stdout.strip())
def trim_video(input_video_path: str, output_video_path: str, duration: float):
trim_command = [
"ffmpeg",
"-i", input_video_path,
"-t", str(duration - 2),
"-c", "copy",
output_video_path
]
subprocess.run(trim_command, check=True)
def remove_audio(input_video_path: str, output_video_path: str):
remove_audio_command = [
"ffmpeg",
"-i", input_video_path,
"-an",
"-vcodec", "copy",
output_video_path
]
subprocess.run(remove_audio_command, check=True)
def save_to_final_destination(source_path: str, destination_dir: str):
os.makedirs(destination_dir, exist_ok=True)
destination_path = os.path.join(destination_dir, os.path.basename(source_path))
os.rename(source_path, destination_path)
return destination_path
def get_file_md5(file_path: str):
hash_md5 = md5()
with open(file_path, "rb") as f:
for chunk in iter(lambda: f.read(4096), b""):
hash_md5.update(chunk)
return hash_md5.hexdigest()
def record_video_mapping(image_path: str, video_path: str, record_file: str):
with open(record_file, "a") as f:
f.write(f"{image_path} {video_path}\n")
def check_existing_video(image_md5: str, record_file: str):
if not os.path.exists(record_file):
return None
with open(record_file, "r") as f:
for line in f:
recorded_image_md5, video_path = line.strip().split()
if recorded_image_md5 == image_md5:
return video_path
return None
@app.post("/dynamic-video")
async def generate_video(image: UploadFile = File(...)):
img_path = os.path.join("dynamic", image.filename)
save_upload_file(image, img_path)
image_md5 = get_file_md5(img_path)
record_file = "dynamic_video_record.txt"
existing_video = check_existing_video(image_md5, record_file)
if existing_video:
return FileResponse(existing_video, media_type='video/mp4')
audio_path = "./examples/driven_audio/dynamic_audio.wav"
video_path = "./examples/ref_video/dynamic.mp4"
result_dir = os.path.join("results")
os.makedirs(result_dir, exist_ok=True)
command = generate_video_command(result_dir, img_path, audio_path, video_path)
subprocess.run(command, check=True)
latest_sub_dir = get_latest_sub_dir(result_dir)
if not latest_sub_dir:
return {"error": "No subdirectory found in result directory"}
result_video_path = os.path.join(latest_sub_dir, f"{os.path.splitext(image.filename)[0]}##dynamic_audio_enhanced.mp4")
silent_video_path = os.path.join(latest_sub_dir, f"{os.path.splitext(image.filename)[0]}##dynamic_audio_enhanced_dynamic.mp4")
if os.path.exists(result_video_path):
remove_audio(result_video_path, silent_video_path)
final_destination = save_to_final_destination(silent_video_path, "results/dynamic-video")
record_video_mapping(image_md5, final_destination, record_file)
return FileResponse(final_destination, media_type='video/mp4')
else:
return {"error": "Video file not found"}
@app.post("/silent-video")
async def generate_and_trim_video(image: UploadFile = File(...)):
img_path = os.path.join("silent", image.filename)
save_upload_file(image, img_path)
image_md5 = get_file_md5(img_path)
record_file = "silent_video_record.txt"
existing_video = check_existing_video(image_md5, record_file)
if existing_video:
return FileResponse(existing_video, media_type='video/mp4')
audio_path = "./examples/driven_audio/silent_audio.wav"
video_path = "./examples/ref_video/silent.mp4"
result_dir = os.path.join("results")
os.makedirs(result_dir, exist_ok=True)
command = generate_video_command(result_dir, img_path, audio_path, video_path)
subprocess.run(command, check=True)
latest_sub_dir = get_latest_sub_dir(result_dir)
if not latest_sub_dir:
return {"error": "No subdirectory found in result directory"}
result_video_path = os.path.join(latest_sub_dir, f"{os.path.splitext(image.filename)[0]}##silent_audio_enhanced.mp4")
trimmed_video_path = os.path.join(latest_sub_dir, f"{os.path.splitext(image.filename)[0]}##silent_audio_enhanced_trimmed.mp4")
if os.path.exists(result_video_path):
video_duration = get_video_duration(result_video_path)
trim_video(result_video_path, trimmed_video_path, video_duration)
final_destination = save_to_final_destination(trimmed_video_path, "results/silent-video")
record_video_mapping(image_md5, final_destination, record_file)
return FileResponse(final_destination, media_type='video/mp4')
else:
return {"error": "Video file not found"}
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)