添加新功能:上传相同图片返回已生成视频

main
fanpt 1 year ago
parent caf55a934e
commit c8248cb584

@ -4,7 +4,7 @@ import os
import subprocess import subprocess
from fastapi import FastAPI, File, UploadFile from fastapi import FastAPI, File, UploadFile
from fastapi.responses import FileResponse from fastapi.responses import FileResponse
from datetime import datetime from hashlib import md5
app = FastAPI() app = FastAPI()
@ -58,15 +58,45 @@ def remove_audio(input_video_path: str, output_video_path: str):
] ]
subprocess.run(remove_audio_command, check=True) subprocess.run(remove_audio_command, check=True)
def save_to_final_destination(source_path: str, destination_dir: str):
os.makedirs(destination_dir, exist_ok=True)
destination_path = os.path.join(destination_dir, os.path.basename(source_path))
os.rename(source_path, destination_path)
return destination_path
def get_file_md5(file_path: str):
hash_md5 = md5()
with open(file_path, "rb") as f:
for chunk in iter(lambda: f.read(4096), b""):
hash_md5.update(chunk)
return hash_md5.hexdigest()
def record_video_mapping(image_path: str, video_path: str, record_file: str):
with open(record_file, "a") as f:
f.write(f"{image_path} {video_path}\n")
def check_existing_video(image_md5: str, record_file: str):
if not os.path.exists(record_file):
return None
with open(record_file, "r") as f:
for line in f:
recorded_image_md5, video_path = line.strip().split()
if recorded_image_md5 == image_md5:
return video_path
return None
@app.post("/dynamic-video") @app.post("/dynamic-video")
async def generate_video( async def generate_video(image: UploadFile = File(...)):
image: UploadFile = File(...), img_path = os.path.join("dynamic", image.filename)
):
img_path = "dynamic/dynamic_image.png"
save_upload_file(image, img_path) save_upload_file(image, img_path)
audio_path = "./examples/driven_audio/dynamic_audio.wav" image_md5 = get_file_md5(img_path)
record_file = "dynamic_video_record.txt"
existing_video = check_existing_video(image_md5, record_file)
if existing_video:
return FileResponse(existing_video, media_type='video/mp4')
audio_path = "./examples/driven_audio/dynamic_audio.wav"
video_path = "./examples/ref_video/dynamic.mp4" video_path = "./examples/ref_video/dynamic.mp4"
result_dir = os.path.join("results") result_dir = os.path.join("results")
@ -79,24 +109,29 @@ async def generate_video(
if not latest_sub_dir: if not latest_sub_dir:
return {"error": "No subdirectory found in result directory"} return {"error": "No subdirectory found in result directory"}
result_video_path = os.path.join(latest_sub_dir, "dynamic_image##dynamic_audio_enhanced.mp4") result_video_path = os.path.join(latest_sub_dir, f"{os.path.splitext(image.filename)[0]}##dynamic_audio_enhanced.mp4")
silent_video_path = os.path.join(latest_sub_dir, "dynamic_image##dynamic_audio_enhanced_dynamic.mp4") silent_video_path = os.path.join(latest_sub_dir, f"{os.path.splitext(image.filename)[0]}##dynamic_audio_enhanced_dynamic.mp4")
if os.path.exists(result_video_path): if os.path.exists(result_video_path):
remove_audio(result_video_path, silent_video_path) remove_audio(result_video_path, silent_video_path)
return FileResponse(silent_video_path, media_type='video/mp4') final_destination = save_to_final_destination(silent_video_path, "results/dynamic-video")
record_video_mapping(image_md5, final_destination, record_file)
return FileResponse(final_destination, media_type='video/mp4')
else: else:
return {"error": "Video file not found"} return {"error": "Video file not found"}
@app.post("/silent-video") @app.post("/silent-video")
async def generate_and_trim_video( async def generate_and_trim_video(image: UploadFile = File(...)):
image: UploadFile = File(...), img_path = os.path.join("silent", image.filename)
):
img_path = "silent/silent_image.png"
save_upload_file(image, img_path) save_upload_file(image, img_path)
audio_path = "./examples/driven_audio/silent_audio.wav" image_md5 = get_file_md5(img_path)
record_file = "silent_video_record.txt"
existing_video = check_existing_video(image_md5, record_file)
if existing_video:
return FileResponse(existing_video, media_type='video/mp4')
audio_path = "./examples/driven_audio/silent_audio.wav"
video_path = "./examples/ref_video/silent.mp4" video_path = "./examples/ref_video/silent.mp4"
result_dir = os.path.join("results") result_dir = os.path.join("results")
@ -109,13 +144,15 @@ async def generate_and_trim_video(
if not latest_sub_dir: if not latest_sub_dir:
return {"error": "No subdirectory found in result directory"} return {"error": "No subdirectory found in result directory"}
result_video_path = os.path.join(latest_sub_dir, "silent_image##silent_audio_enhanced.mp4") result_video_path = os.path.join(latest_sub_dir, f"{os.path.splitext(image.filename)[0]}##silent_audio_enhanced.mp4")
trimmed_video_path = os.path.join(latest_sub_dir, "silent_image##silent_audio_enhanced_trimmed.mp4") trimmed_video_path = os.path.join(latest_sub_dir, f"{os.path.splitext(image.filename)[0]}##silent_audio_enhanced_trimmed.mp4")
if os.path.exists(result_video_path): if os.path.exists(result_video_path):
video_duration = get_video_duration(result_video_path) video_duration = get_video_duration(result_video_path)
trim_video(result_video_path, trimmed_video_path, video_duration) trim_video(result_video_path, trimmed_video_path, video_duration)
return FileResponse(trimmed_video_path, media_type='video/mp4') final_destination = save_to_final_destination(trimmed_video_path, "results/silent-video")
record_video_mapping(image_md5, final_destination, record_file)
return FileResponse(final_destination, media_type='video/mp4')
else: else:
return {"error": "Video file not found"} return {"error": "Video file not found"}

Loading…
Cancel
Save