Wrap the video-generation feature as an API

main
fanpt 9 months ago
parent 92db5752b3
commit f1c1f5f514

@@ -0,0 +1,24 @@
from PIL import Image, ImageEnhance, ImageFilter
# Open the image file
image_path = r'E:\SadTalker\examples\source_image\91a54181-568f-4cda-8f3c-0f2c811eaf20.jpg'
image = Image.open(image_path)
# Create an ImageEnhance.Color object
color_enhancer = ImageEnhance.Color(image)
# Increase the color saturation (this gives the image a rosier tint)
# Note: this value needs to be tuned per image
saturation_factor = 1.5  # increase saturation by 50%
color_image = color_enhancer.enhance(saturation_factor)
# Apply a Gaussian blur to smooth the color transition
blurred_image = color_image.filter(ImageFilter.GaussianBlur(1))
# Show the image
blurred_image.show()
# Save the image
blurred_image.save('path_to_save_image.jpg')
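If this enhancement step is meant to feed the API below, it could be wrapped as a small helper and applied to the uploaded source image before generation. A minimal sketch, assuming an in-place overwrite is acceptable; the function name and defaults are illustrative and not part of this commit:

from PIL import Image, ImageEnhance, ImageFilter

def enhance_source_image(image_path, saturation_factor=1.5, blur_radius=1):
    # Hypothetical helper (not in this commit): boost saturation and lightly
    # blur a source image, overwriting it in place.
    image = Image.open(image_path)
    enhanced = ImageEnhance.Color(image).enhance(saturation_factor)
    blurred = enhanced.filter(ImageFilter.GaussianBlur(blur_radius))
    blurred.save(image_path)
    return image_path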

@@ -115,9 +115,9 @@ def main(args):
if __name__ == '__main__':
    parser = ArgumentParser()
-    parser.add_argument("--driven_audio", default='./examples/driven_audio/eluosi.wav', help="path to driven audio")
-    parser.add_argument("--source_image", default='./examples/source_image/full3.png', help="path to source image")
-    parser.add_argument("--ref_eyeblink", default=None, help="path to reference video providing eye blinking")
+    parser.add_argument("--driven_audio", default='./examples/driven_audio/20240315_154953.wav', help="path to driven audio")
+    parser.add_argument("--source_image", default='./examples/source_image/17.png', help="path to source image")
+    parser.add_argument("--ref_eyeblink", default='./examples/ref_video/E05005.mp4', help="path to reference video providing eye blinking")
    parser.add_argument("--ref_pose", default=None, help="path to reference video providing pose")
    parser.add_argument("--checkpoint_dir", default='./checkpoints', help="path to output")
    parser.add_argument("--result_dir", default='./results', help="path to output")
@@ -127,7 +127,7 @@ if __name__ == '__main__':
    parser.add_argument('--input_yaw', nargs='+', type=int, default=None, help="the input yaw degree of the user")
    parser.add_argument('--input_pitch', nargs='+', type=int, default=None, help="the input pitch degree of the user")
    parser.add_argument('--input_roll', nargs='+', type=int, default=None, help="the input roll degree of the user")
-    parser.add_argument('--enhancer', type=str, default=None, help="Face enhancer, [gfpgan, RestoreFormer]")
+    parser.add_argument('--enhancer', type=str, default='gfpgan', help="Face enhancer, [gfpgan, RestoreFormer]")
    parser.add_argument('--background_enhancer', type=str, default=None, help="background enhancer, [realesrgan]")
    parser.add_argument("--cpu", dest="cpu", action="store_true")
    parser.add_argument("--face3dvis", action="store_true", help="generate 3d face and 3d landmarks")

@@ -0,0 +1,124 @@
# -*- coding: utf-8 -*-
import os
import subprocess
from fastapi import FastAPI, File, UploadFile
from fastapi.responses import FileResponse
from datetime import datetime
app = FastAPI()
def save_upload_file(upload_file: UploadFile, filename: str):
    # Make sure the target directory exists before writing the uploaded file
    os.makedirs(os.path.dirname(filename) or ".", exist_ok=True)
    with open(filename, "wb") as buffer:
        buffer.write(upload_file.file.read())
def generate_video_command(result_dir: str, img_path: str, audio_path: str, video_path: str):
    # Build the command line that runs the generation script
    return [
        "python", "script.py",
        "--source_image", img_path,
        "--result_dir", result_dir,
        "--driven_audio", audio_path,
        "--ref_eyeblink", video_path,
    ]
def get_latest_sub_dir(result_dir: str):
    # Return the most recently modified subdirectory of result_dir, or None if there is none
    sub_dirs = [os.path.join(result_dir, d) for d in os.listdir(result_dir) if os.path.isdir(os.path.join(result_dir, d))]
    if not sub_dirs:
        return None
    return max(sub_dirs, key=os.path.getmtime)
def get_video_duration(video_path: str):
    # Query the container duration (in seconds) with ffprobe
    video_duration_command = [
        "ffprobe",
        "-v", "error",
        "-show_entries", "format=duration",
        "-of", "default=noprint_wrappers=1:nokey=1",
        video_path
    ]
    result = subprocess.run(video_duration_command, capture_output=True, text=True)
    return float(result.stdout.strip())
def trim_video(input_video_path: str, output_video_path: str, duration: float):
    # Cut the last 2 seconds off the video (stream copy, no re-encoding)
    trim_command = [
        "ffmpeg",
        "-i", input_video_path,
        "-t", str(duration - 2),
        "-c", "copy",
        output_video_path
    ]
    subprocess.run(trim_command, check=True)
def remove_audio(input_video_path: str, output_video_path: str):
    # Strip the audio track, keeping the video stream untouched
    remove_audio_command = [
        "ffmpeg",
        "-i", input_video_path,
        "-an",
        "-vcodec", "copy",
        output_video_path
    ]
    subprocess.run(remove_audio_command, check=True)
@app.post("/dynamic-video")
async def generate_video(
image: UploadFile = File(...),
):
img_path = "dynamic/dynamic_image.png"
save_upload_file(image, img_path)
audio_path = "./examples/driven_audio/dynamic_audio.wav"
video_path = "./examples/ref_video/dynamic.mp4"
result_dir = os.path.join("results")
os.makedirs(result_dir, exist_ok=True)
command = generate_video_command(result_dir, img_path, audio_path, video_path)
subprocess.run(command, check=True)
latest_sub_dir = get_latest_sub_dir(result_dir)
if not latest_sub_dir:
return {"error": "No subdirectory found in result directory"}
result_video_path = os.path.join(latest_sub_dir, "dynamic_image##dynamic_audio_enhanced.mp4")
silent_video_path = os.path.join(latest_sub_dir, "dynamic_image##dynamic_audio_enhanced_dynamic.mp4")
if os.path.exists(result_video_path):
remove_audio(result_video_path, silent_video_path)
return FileResponse(silent_video_path, media_type='video/mp4')
else:
return {"error": "Video file not found"}
@app.post("/silent-video")
async def generate_and_trim_video(
image: UploadFile = File(...),
):
img_path = "silent/silent_image.png"
save_upload_file(image, img_path)
audio_path = "./examples/driven_audio/silent_audio.wav"
video_path = "./examples/ref_video/silent.mp4"
result_dir = os.path.join("results")
os.makedirs(result_dir, exist_ok=True)
command = generate_video_command(result_dir, img_path, audio_path, video_path)
subprocess.run(command, check=True)
latest_sub_dir = get_latest_sub_dir(result_dir)
if not latest_sub_dir:
return {"error": "No subdirectory found in result directory"}
result_video_path = os.path.join(latest_sub_dir, "silent_image##silent_audio_enhanced.mp4")
trimmed_video_path = os.path.join(latest_sub_dir, "silent_image##silent_audio_enhanced_trimmed.mp4")
if os.path.exists(result_video_path):
video_duration = get_video_duration(result_video_path)
trim_video(result_video_path, trimmed_video_path, video_duration)
return FileResponse(trimmed_video_path, media_type='video/mp4')
else:
return {"error": "Video file not found"}
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
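For reference, the endpoints can be exercised with a small client. A minimal sketch, assuming the service above is reachable at http://localhost:8000; the sample image path comes from the repository's examples and the output filename is arbitrary:

import requests

# Hypothetical client (not part of this commit): upload a source image to
# /dynamic-video and save the returned silent MP4 locally.
with open("examples/source_image/full3.png", "rb") as f:
    resp = requests.post(
        "http://localhost:8000/dynamic-video",
        files={"image": ("source.png", f, "image/png")},
    )
resp.raise_for_status()
with open("dynamic_result.mp4", "wb") as out:
    out.write(resp.content)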

@@ -17,7 +17,7 @@ def load_video_to_cv2(input_path):
        full_frames.append(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
    return full_frames
-def save_video_with_watermark(video, audio, save_path, watermark='docs/sadtalker_logo.png'):
+def save_video_with_watermark(video, audio, save_path, watermark=None):
    temp_file = str(uuid.uuid4())+'.mp4'
    cmd = r'ffmpeg -y -i "%s" -i "%s" -vcodec copy "%s"' % (video, audio, temp_file)
    os.system(cmd)

@@ -0,0 +1,50 @@
import base64
import requests
import wave
def save_audio_to_file(output_file, audio_data):
    with wave.open(output_file, 'wb') as wave_file:
        wave_file.setnchannels(1)      # mono
        wave_file.setsampwidth(2)      # sample width in bytes (2 = 16-bit)
        wave_file.setframerate(44100)  # sample rate in Hz
        wave_file.writeframes(audio_data)
def text_to_speech(text, output_file):
    # Request payload
    request_data = {
        "text": text,
        "spk_id": 0,
        # speaking rate
        "speed": 0.87,
        "volume": 1.0,
        "sample_rate": 0,
        "save_path": output_file
    }
    # Send the POST request to the PaddleSpeech TTS service
    response = requests.post("http://192.168.10.138:8090/paddlespeech/tts", json=request_data)
    # Parse the returned JSON
    response_json = response.json()
    if response_json["success"]:
        # Get the base64-encoded audio from the response
        base64_audio = response_json["result"]["audio"]
        # Decode and write the audio to file
        with open(output_file, 'wb') as wave_file:
            wave_file.write(base64.b64decode(base64_audio))
    else:
        print("TTS request failed:", response_json["message"]["description"])
# List of statements to synthesize (kept in Chinese, since they are input for the Chinese TTS model)
statements = [
    "《庆余年》根据猫腻的小说改编,讲述了少年林殊在乱世中成长的故事。他凭借智慧和勇气,卷入复杂的政治斗争,结识志同道合的盟友和强大的对手,经历挑战与考验,最终成为能左右局势的重要人物。剧中不仅有紧张刺激的情节,还探讨了权力、正义与人性的复杂关系,深受观众喜爱。"
]
# Output file names for the synthesized audio
output_files = [f"E:\\SadTalker\\temp\\wav\\tts_result_{i}.wav" for i in range(len(statements))]
# Request TTS for each statement and save the audio file
for i in range(len(statements)):
    text_to_speech(statements[i], output_files[i])
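A quick way to confirm the synthesis worked is to read the files back with the wave module. A minimal sketch, assuming the decoded base64 payload is a complete WAV container (which is what the PaddleSpeech TTS service is expected to return):

# Sanity-check sketch (not part of this commit): print duration and format of
# each synthesized file; `wave` is already imported above.
for path in output_files:
    with wave.open(path, 'rb') as wf:
        duration = wf.getnframes() / float(wf.getframerate())
        print(f"{path}: {duration:.2f} s, {wf.getframerate()} Hz, {wf.getnchannels()} channel(s)")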