更新接口字段

main
fanpt 8 months ago
parent fc15a6fbba
commit e14e739df9

@ -3,8 +3,9 @@
import os
import subprocess
import uuid
import base64
from fastapi import FastAPI, File, UploadFile, BackgroundTasks
from fastapi.responses import FileResponse, JSONResponse
from fastapi.responses import JSONResponse
from hashlib import md5
from typing import Dict
@ -12,11 +13,14 @@ app = FastAPI()
tasks: Dict[str, dict] = {} # To store the status and result of each task
md5_to_uid: Dict[str, str] = {} # To map md5 to uid
ALLOWED_IMAGE_EXTENSIONS = {'png', 'jpg', 'jpeg', 'gif'}
def save_upload_file(upload_file: UploadFile, filename: str):
with open(filename, "wb") as buffer:
buffer.write(upload_file.file.read())
def is_allowed_file(filename: str):
return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_IMAGE_EXTENSIONS
def generate_video_command(result_dir: str, img_path: str, audio_path: str, video_path: str):
return [
@ -27,48 +31,19 @@ def generate_video_command(result_dir: str, img_path: str, audio_path: str, vide
"--ref_eyeblink", video_path,
]
def get_latest_sub_dir(result_dir: str):
sub_dirs = [os.path.join(result_dir, d) for d in os.listdir(result_dir) if
os.path.isdir(os.path.join(result_dir, d))]
if not sub_dirs:
return None
return max(sub_dirs, key=os.path.getmtime)
sub_dirs = [os.path.join(result_dir, d) for d in os.listdir(result_dir) if os.path.isdir(os.path.join(result_dir, d))]
return max(sub_dirs, key=os.path.getmtime, default=None)
def get_video_duration(video_path: str):
video_duration_command = [
"ffprobe",
"-v", "error",
"-show_entries", "format=duration",
"-of", "default=noprint_wrappers=1:nokey=1",
video_path
]
result = subprocess.run(video_duration_command, capture_output=True, text=True)
result = subprocess.run(
["ffprobe", "-v", "error", "-show_entries", "format=duration", "-of", "default=noprint_wrappers=1:nokey=1", video_path],
capture_output=True, text=True
)
return float(result.stdout.strip())
def trim_video(input_video_path: str, output_video_path: str, duration: float):
trim_command = [
"ffmpeg",
"-i", input_video_path,
"-t", str(duration - 2),
"-c", "copy",
output_video_path
]
subprocess.run(trim_command, check=True)
def remove_audio(input_video_path: str, output_video_path: str):
remove_audio_command = [
"ffmpeg",
"-i", input_video_path,
"-an",
"-vcodec", "copy",
output_video_path
]
subprocess.run(remove_audio_command, check=True)
def run_ffmpeg_command(command: list):
subprocess.run(command, check=True)
def save_to_final_destination(source_path: str, destination_dir: str):
os.makedirs(destination_dir, exist_ok=True)
@ -76,7 +51,6 @@ def save_to_final_destination(source_path: str, destination_dir: str):
os.rename(source_path, destination_path)
return destination_path
def get_file_md5(file_path: str):
hash_md5 = md5()
with open(file_path, "rb") as f:
@ -84,12 +58,10 @@ def get_file_md5(file_path: str):
hash_md5.update(chunk)
return hash_md5.hexdigest()
def record_video_mapping(image_md5: str, video_path: str, record_file: str):
with open(record_file, "a") as f:
f.write(f"{image_md5} {video_path}\n")
def check_existing_video(image_md5: str, record_file: str):
if not os.path.exists(record_file):
return None
@ -100,11 +72,10 @@ def check_existing_video(image_md5: str, record_file: str):
return video_path
return None
def process_dynamic_video(uid: str, image_md5: str, img_path: str):
record_file = "dynamic_video_record.txt"
audio_path = "./examples/driven_audio/dynamic_audio.wav"
video_path = "./examples/ref_video/dynamic.mp4"
def process_video(uid: str, image_md5: str, img_path: str, audio_type: str):
record_file = f"{audio_type}_video_record.txt"
audio_path = f"./examples/driven_audio/{audio_type}_audio.wav"
video_path = f"./examples/ref_video/{audio_type}.mp4"
result_dir = os.path.join("results")
os.makedirs(result_dir, exist_ok=True)
@ -117,112 +88,73 @@ def process_dynamic_video(uid: str, image_md5: str, img_path: str):
tasks[uid]['status'] = 'failed'
return
result_video_path = os.path.join(latest_sub_dir,
f"{os.path.splitext(os.path.basename(img_path))[0]}##dynamic_audio_enhanced.mp4")
silent_video_path = os.path.join(latest_sub_dir,
f"{os.path.splitext(os.path.basename(img_path))[0]}##dynamic_audio_enhanced_dynamic.mp4")
base_filename = os.path.splitext(os.path.basename(img_path))[0]
result_video_path = os.path.join(latest_sub_dir, f"{base_filename}##{audio_type}_audio_enhanced.mp4")
processed_video_path = os.path.join(latest_sub_dir, f"{base_filename}##{audio_type}_audio_enhanced_processed.mp4")
if os.path.exists(result_video_path):
remove_audio(result_video_path, silent_video_path)
final_destination = save_to_final_destination(silent_video_path, "results/dynamic-video")
record_video_mapping(image_md5, final_destination, record_file)
tasks[uid]['status'] = 'completed'
tasks[uid]['video_path'] = final_destination
else:
tasks[uid]['status'] = 'failed'
def process_silent_video(uid: str, image_md5: str, img_path: str):
record_file = "silent_video_record.txt"
audio_path = "./examples/driven_audio/silent_audio.wav"
video_path = "./examples/ref_video/silent.mp4"
result_dir = os.path.join("results")
os.makedirs(result_dir, exist_ok=True)
command = generate_video_command(result_dir, img_path, audio_path, video_path)
subprocess.run(command, check=True)
latest_sub_dir = get_latest_sub_dir(result_dir)
if not latest_sub_dir:
tasks[uid]['status'] = 'failed'
return
result_video_path = os.path.join(latest_sub_dir,
f"{os.path.splitext(os.path.basename(img_path))[0]}##silent_audio_enhanced.mp4")
trimmed_video_path = os.path.join(latest_sub_dir,
f"{os.path.splitext(os.path.basename(img_path))[0]}##silent_audio_enhanced_trimmed.mp4")
if audio_type == "dynamic":
run_ffmpeg_command(["ffmpeg", "-i", result_video_path, "-an", "-vcodec", "copy", processed_video_path])
else:
video_duration = get_video_duration(result_video_path)
run_ffmpeg_command(["ffmpeg", "-i", result_video_path, "-t", str(video_duration - 2), "-c", "copy", processed_video_path])
if os.path.exists(result_video_path):
video_duration = get_video_duration(result_video_path)
trim_video(result_video_path, trimmed_video_path, video_duration)
final_destination = save_to_final_destination(trimmed_video_path, "results/silent-video")
final_destination = save_to_final_destination(processed_video_path, f"results/{audio_type}-video")
record_video_mapping(image_md5, final_destination, record_file)
tasks[uid]['status'] = 'completed'
tasks[uid]['video_path'] = final_destination
else:
tasks[uid]['status'] = 'failed'
def encode_video_to_base64(video_path: str):
with open(video_path, "rb") as video_file:
return base64.b64encode(video_file.read()).decode('utf-8')
@app.post("/dynamic-video")
async def generate_video(background_tasks: BackgroundTasks, image: UploadFile = File(...)):
img_path = os.path.join("dynamic", image.filename)
async def handle_video_request(background_tasks: BackgroundTasks, image: UploadFile, audio_type: str):
if not is_allowed_file(image.filename):
return JSONResponse(status_code=400, content={"code": 400, "message": "Invalid file type. Only images (png, jpg, jpeg, gif) are allowed.", "uid": "", "video": ""})
img_path = os.path.join(audio_type, image.filename)
save_upload_file(image, img_path)
image_md5 = get_file_md5(img_path)
record_file = "dynamic_video_record.txt"
record_file = f"{audio_type}_video_record.txt"
existing_video = check_existing_video(image_md5, record_file)
if existing_video:
return FileResponse(existing_video, media_type='video/mp4')
video_base64 = encode_video_to_base64(existing_video)
return JSONResponse(content={"code": 200, "message": "Video retrieved successfully.", "uid": "", "video": video_base64})
if image_md5 in md5_to_uid:
uid = md5_to_uid[image_md5]
return {"message": "Video is being generated, please check back later.", "uid": uid}
return JSONResponse(content={"code": 200, "message": "Video is being generated, please check back later.", "uid": uid, "video": ""})
uid = str(uuid.uuid4())
tasks[uid] = {'status': 'processing'}
md5_to_uid[image_md5] = uid
background_tasks.add_task(process_dynamic_video, uid, image_md5, img_path)
return {"message": "Video is being generated, please check back later.", "uid": uid}
background_tasks.add_task(process_video, uid, image_md5, img_path, audio_type)
return JSONResponse(content={"code": 200, "message": "Video is being generated, please check back later.", "uid": uid, "video": ""})
@app.post("/dynamic-video")
async def generate_dynamic_video(background_tasks: BackgroundTasks, image: UploadFile = File(...)):
return await handle_video_request(background_tasks, image, "dynamic")
@app.post("/silent-video")
async def generate_and_trim_video(background_tasks: BackgroundTasks, image: UploadFile = File(...)):
img_path = os.path.join("silent", image.filename)
save_upload_file(image, img_path)
image_md5 = get_file_md5(img_path)
record_file = "silent_video_record.txt"
existing_video = check_existing_video(image_md5, record_file)
if existing_video:
return FileResponse(existing_video, media_type='video/mp4')
if image_md5 in md5_to_uid:
uid = md5_to_uid[image_md5]
return {"message": "Video is being generated, please check back later.", "uid": uid}
uid = str(uuid.uuid4())
tasks[uid] = {'status': 'processing'}
md5_to_uid[image_md5] = uid
background_tasks.add_task(process_silent_video, uid, image_md5, img_path)
return {"message": "Video is being generated, please check back later.", "uid": uid}
async def generate_silent_video(background_tasks: BackgroundTasks, image: UploadFile = File(...)):
return await handle_video_request(background_tasks, image, "silent")
@app.get("/status/{uid}")
async def get_status(uid: str):
task = tasks.get(uid)
if not task:
return JSONResponse(status_code=404, content={"message": "Task not found"})
return JSONResponse(status_code=404, content={"code": 500, "status": "not found", "message": "Task not found", "video": ""})
if task['status'] == 'completed':
return FileResponse(task['video_path'], media_type='video/mp4')
video_base64 = encode_video_to_base64(task['video_path'])
return JSONResponse(content={"code": 200, "status": task['status'], "message": "Video generation completed.", "video": video_base64})
elif task['status'] == 'failed':
return JSONResponse(status_code=500, content={"message": "Video generation failed"})
return JSONResponse(status_code=500, content={"code": 500, "status": task['status'], "message": "Video generation failed", "video": ""})
else:
return {"status": task['status']}
return JSONResponse(content={"code": 200, "status": task['status'], "message": "Video is being generated.", "video": ""})
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)

Loading…
Cancel
Save