You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

592 lines
23 KiB
Python

###############################################################################
# Copyright (C) 2024 LiveTalking@lipku https://github.com/lipku/LiveTalking
# email: lipku@foxmail.com
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
###############################################################################
from __future__ import annotations
import time
import numpy as np
import soundfile as sf
import resampy
import asyncio
import edge_tts
5 months ago
import os
import hmac
import hashlib
import base64
import json
import uuid
from typing import Iterator
import requests
import queue
from queue import Queue
from io import BytesIO
from threading import Thread, Event
from enum import Enum
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from basereal import BaseReal
from logger import logger
class State(Enum):
RUNNING=0
PAUSE=1
class BaseTTS:
def __init__(self, opt, parent:BaseReal):
self.opt=opt
self.parent = parent
self.fps = opt.fps # 20 ms per frame
self.sample_rate = 16000
self.chunk = self.sample_rate // self.fps # 320 samples per chunk (20ms * 16000 / 1000)
self.input_stream = BytesIO()
self.msgqueue = Queue()
self.state = State.RUNNING
def flush_talk(self):
self.msgqueue.queue.clear()
self.state = State.PAUSE
def put_msg_txt(self,msg:str,eventpoint=None):
10 months ago
if len(msg)>0:
self.msgqueue.put((msg,eventpoint))
def render(self,quit_event):
process_thread = Thread(target=self.process_tts, args=(quit_event,))
process_thread.start()
def process_tts(self,quit_event):
while not quit_event.is_set():
try:
msg = self.msgqueue.get(block=True, timeout=1)
self.state=State.RUNNING
except queue.Empty:
continue
self.txt_to_audio(msg)
logger.info('ttsreal thread stop')
def txt_to_audio(self,msg):
pass
###########################################################################################
class EdgeTTS(BaseTTS):
def txt_to_audio(self,msg):
voicename = self.opt.REF_FILE #"zh-CN-YunxiaNeural"
text,textevent = msg
t = time.time()
asyncio.new_event_loop().run_until_complete(self.__main(voicename,text))
logger.info(f'-------edge tts time:{time.time()-t:.4f}s')
if self.input_stream.getbuffer().nbytes<=0: #edgetts err
logger.error('edgetts err!!!!!')
return
self.input_stream.seek(0)
stream = self.__create_bytes_stream(self.input_stream)
streamlen = stream.shape[0]
idx=0
while streamlen >= self.chunk and self.state==State.RUNNING:
eventpoint=None
streamlen -= self.chunk
if idx==0:
eventpoint={'status':'start','text':text,'msgevent':textevent}
elif streamlen<self.chunk:
eventpoint={'status':'end','text':text,'msgevent':textevent}
self.parent.put_audio_frame(stream[idx:idx+self.chunk],eventpoint)
idx += self.chunk
#if streamlen>0: #skip last frame(not 20ms)
# self.queue.put(stream[idx:])
self.input_stream.seek(0)
self.input_stream.truncate()
def __create_bytes_stream(self,byte_stream):
#byte_stream=BytesIO(buffer)
stream, sample_rate = sf.read(byte_stream) # [T*sample_rate,] float64
logger.info(f'[INFO]tts audio stream {sample_rate}: {stream.shape}')
stream = stream.astype(np.float32)
if stream.ndim > 1:
logger.info(f'[WARN] audio has {stream.shape[1]} channels, only use the first.')
stream = stream[:, 0]
if sample_rate != self.sample_rate and stream.shape[0]>0:
logger.info(f'[WARN] audio sample rate is {sample_rate}, resampling into {self.sample_rate}.')
stream = resampy.resample(x=stream, sr_orig=sample_rate, sr_new=self.sample_rate)
return stream
async def __main(self,voicename: str, text: str):
10 months ago
try:
communicate = edge_tts.Communicate(text, voicename)
#with open(OUTPUT_FILE, "wb") as file:
first = True
async for chunk in communicate.stream():
if first:
first = False
if chunk["type"] == "audio" and self.state==State.RUNNING:
#self.push_audio(chunk["data"])
self.input_stream.write(chunk["data"])
#file.write(chunk["data"])
elif chunk["type"] == "WordBoundary":
pass
except Exception as e:
logger.exception('edgetts')
###########################################################################################
class FishTTS(BaseTTS):
def txt_to_audio(self,msg):
text,textevent = msg
self.stream_tts(
self.fish_speech(
text,
self.opt.REF_FILE,
self.opt.REF_TEXT,
"zh", #en args.language,
self.opt.TTS_SERVER, #"http://127.0.0.1:5000", #args.server_url,
),
msg
)
def fish_speech(self, text, reffile, reftext,language, server_url) -> Iterator[bytes]:
start = time.perf_counter()
req={
'text':text,
'reference_id':reffile,
'format':'wav',
'streaming':True,
'use_memory_cache':'on'
}
try:
res = requests.post(
f"{server_url}/v1/tts",
json=req,
stream=True,
headers={
"content-type": "application/json",
},
)
end = time.perf_counter()
logger.info(f"fish_speech Time to make POST: {end-start}s")
if res.status_code != 200:
logger.error("Error:%s", res.text)
return
first = True
for chunk in res.iter_content(chunk_size=17640): # 1764 44100*20ms*2
#print('chunk len:',len(chunk))
if first:
end = time.perf_counter()
logger.info(f"fish_speech Time to first chunk: {end-start}s")
first = False
if chunk and self.state==State.RUNNING:
yield chunk
#print("gpt_sovits response.elapsed:", res.elapsed)
except Exception as e:
logger.exception('fishtts')
def stream_tts(self,audio_stream,msg):
text,textevent = msg
first = True
for chunk in audio_stream:
if chunk is not None and len(chunk)>0:
stream = np.frombuffer(chunk, dtype=np.int16).astype(np.float32) / 32767
stream = resampy.resample(x=stream, sr_orig=44100, sr_new=self.sample_rate)
#byte_stream=BytesIO(buffer)
#stream = self.__create_bytes_stream(byte_stream)
streamlen = stream.shape[0]
idx=0
while streamlen >= self.chunk:
eventpoint=None
if first:
eventpoint={'status':'start','text':text,'msgevent':textevent}
first = False
self.parent.put_audio_frame(stream[idx:idx+self.chunk],eventpoint)
streamlen -= self.chunk
idx += self.chunk
eventpoint={'status':'end','text':text,'msgevent':textevent}
self.parent.put_audio_frame(np.zeros(self.chunk,np.float32),eventpoint)
###########################################################################################
class SovitsTTS(BaseTTS):
def txt_to_audio(self,msg):
text,textevent = msg
self.stream_tts(
self.gpt_sovits(
text,
1 year ago
self.opt.REF_FILE,
self.opt.REF_TEXT,
"zh", #en args.language,
self.opt.TTS_SERVER, #"http://127.0.0.1:5000", #args.server_url,
),
msg
)
1 year ago
def gpt_sovits(self, text, reffile, reftext,language, server_url) -> Iterator[bytes]:
start = time.perf_counter()
1 year ago
req={
'text':text,
'text_lang':language,
'ref_audio_path':reffile,
'prompt_text':reftext,
'prompt_lang':language,
'media_type':'ogg',
1 year ago
'streaming_mode':True
}
# req["text"] = text
# req["text_language"] = language
# req["character"] = character
# req["emotion"] = emotion
# #req["stream_chunk_size"] = stream_chunk_size # you can reduce it to get faster response, but degrade quality
# req["streaming_mode"] = True
10 months ago
try:
res = requests.post(
f"{server_url}/tts",
json=req,
stream=True,
)
end = time.perf_counter()
logger.info(f"gpt_sovits Time to make POST: {end-start}s")
10 months ago
if res.status_code != 200:
logger.error("Error:%s", res.text)
10 months ago
return
first = True
for chunk in res.iter_content(chunk_size=None): #12800 1280 32K*20ms*2
logger.info('chunk len:%d',len(chunk))
10 months ago
if first:
end = time.perf_counter()
logger.info(f"gpt_sovits Time to first chunk: {end-start}s")
10 months ago
first = False
if chunk and self.state==State.RUNNING:
yield chunk
#print("gpt_sovits response.elapsed:", res.elapsed)
except Exception as e:
logger.exception('sovits')
def __create_bytes_stream(self,byte_stream):
#byte_stream=BytesIO(buffer)
stream, sample_rate = sf.read(byte_stream) # [T*sample_rate,] float64
logger.info(f'[INFO]tts audio stream {sample_rate}: {stream.shape}')
stream = stream.astype(np.float32)
if stream.ndim > 1:
logger.info(f'[WARN] audio has {stream.shape[1]} channels, only use the first.')
stream = stream[:, 0]
if sample_rate != self.sample_rate and stream.shape[0]>0:
logger.info(f'[WARN] audio sample rate is {sample_rate}, resampling into {self.sample_rate}.')
stream = resampy.resample(x=stream, sr_orig=sample_rate, sr_new=self.sample_rate)
return stream
def stream_tts(self,audio_stream,msg):
text,textevent = msg
first = True
for chunk in audio_stream:
if chunk is not None and len(chunk)>0:
#stream = np.frombuffer(chunk, dtype=np.int16).astype(np.float32) / 32767
#stream = resampy.resample(x=stream, sr_orig=32000, sr_new=self.sample_rate)
byte_stream=BytesIO(chunk)
stream = self.__create_bytes_stream(byte_stream)
streamlen = stream.shape[0]
idx=0
while streamlen >= self.chunk:
eventpoint=None
if first:
eventpoint={'status':'start','text':text,'msgevent':textevent}
first = False
self.parent.put_audio_frame(stream[idx:idx+self.chunk],eventpoint)
streamlen -= self.chunk
idx += self.chunk
eventpoint={'status':'end','text':text,'msgevent':textevent}
self.parent.put_audio_frame(np.zeros(self.chunk,np.float32),eventpoint)
11 months ago
###########################################################################################
class CosyVoiceTTS(BaseTTS):
def txt_to_audio(self,msg):
text,textevent = msg
11 months ago
self.stream_tts(
self.cosy_voice(
text,
11 months ago
self.opt.REF_FILE,
self.opt.REF_TEXT,
"zh", #en args.language,
self.opt.TTS_SERVER, #"http://127.0.0.1:5000", #args.server_url,
),
msg
11 months ago
)
def cosy_voice(self, text, reffile, reftext,language, server_url) -> Iterator[bytes]:
start = time.perf_counter()
payload = {
'tts_text': text,
'prompt_text': reftext
}
10 months ago
try:
files = [('prompt_wav', ('prompt_wav', open(reffile, 'rb'), 'application/octet-stream'))]
res = requests.request("GET", f"{server_url}/inference_zero_shot", data=payload, files=files, stream=True)
11 months ago
10 months ago
end = time.perf_counter()
logger.info(f"cosy_voice Time to make POST: {end-start}s")
10 months ago
if res.status_code != 200:
logger.error("Error:%s", res.text)
10 months ago
return
first = True
5 months ago
for chunk in res.iter_content(chunk_size=9600): # 960 24K*20ms*2
10 months ago
if first:
end = time.perf_counter()
logger.info(f"cosy_voice Time to first chunk: {end-start}s")
10 months ago
first = False
if chunk and self.state==State.RUNNING:
yield chunk
except Exception as e:
logger.exception('cosyvoice')
11 months ago
def stream_tts(self,audio_stream,msg):
text,textevent = msg
first = True
11 months ago
for chunk in audio_stream:
if chunk is not None and len(chunk)>0:
stream = np.frombuffer(chunk, dtype=np.int16).astype(np.float32) / 32767
5 months ago
stream = resampy.resample(x=stream, sr_orig=24000, sr_new=self.sample_rate)
11 months ago
#byte_stream=BytesIO(buffer)
#stream = self.__create_bytes_stream(byte_stream)
streamlen = stream.shape[0]
idx=0
while streamlen >= self.chunk:
eventpoint=None
if first:
eventpoint={'status':'start','text':text,'msgevent':textevent}
first = False
self.parent.put_audio_frame(stream[idx:idx+self.chunk],eventpoint)
11 months ago
streamlen -= self.chunk
idx += self.chunk
eventpoint={'status':'end','text':text,'msgevent':textevent}
self.parent.put_audio_frame(np.zeros(self.chunk,np.float32),eventpoint)
11 months ago
###########################################################################################
5 months ago
_PROTOCOL = "https://"
_HOST = "tts.cloud.tencent.com"
_PATH = "/stream"
_ACTION = "TextToStreamAudio"
class TencentTTS(BaseTTS):
def __init__(self, opt, parent):
super().__init__(opt,parent)
self.appid = os.getenv("TENCENT_APPID")
self.secret_key = os.getenv("TENCENT_SECRET_KEY")
self.secret_id = os.getenv("TENCENT_SECRET_ID")
self.voice_type = int(opt.REF_FILE)
self.codec = "pcm"
self.sample_rate = 16000
self.volume = 0
self.speed = 0
def __gen_signature(self, params):
sort_dict = sorted(params.keys())
sign_str = "POST" + _HOST + _PATH + "?"
for key in sort_dict:
sign_str = sign_str + key + "=" + str(params[key]) + '&'
sign_str = sign_str[:-1]
hmacstr = hmac.new(self.secret_key.encode('utf-8'),
sign_str.encode('utf-8'), hashlib.sha1).digest()
s = base64.b64encode(hmacstr)
s = s.decode('utf-8')
return s
def __gen_params(self, session_id, text):
params = dict()
params['Action'] = _ACTION
params['AppId'] = int(self.appid)
params['SecretId'] = self.secret_id
params['ModelType'] = 1
params['VoiceType'] = self.voice_type
params['Codec'] = self.codec
params['SampleRate'] = self.sample_rate
params['Speed'] = self.speed
params['Volume'] = self.volume
params['SessionId'] = session_id
params['Text'] = text
timestamp = int(time.time())
params['Timestamp'] = timestamp
params['Expired'] = timestamp + 24 * 60 * 60
return params
def txt_to_audio(self,msg):
text,textevent = msg
self.stream_tts(
self.tencent_voice(
text,
self.opt.REF_FILE,
self.opt.REF_TEXT,
"zh", #en args.language,
self.opt.TTS_SERVER, #"http://127.0.0.1:5000", #args.server_url,
),
msg
)
def tencent_voice(self, text, reffile, reftext,language, server_url) -> Iterator[bytes]:
start = time.perf_counter()
session_id = str(uuid.uuid1())
params = self.__gen_params(session_id, text)
signature = self.__gen_signature(params)
headers = {
"Content-Type": "application/json",
"Authorization": str(signature)
}
url = _PROTOCOL + _HOST + _PATH
try:
res = requests.post(url, headers=headers,
data=json.dumps(params), stream=True)
end = time.perf_counter()
logger.info(f"tencent Time to make POST: {end-start}s")
first = True
for chunk in res.iter_content(chunk_size=6400): # 640 16K*20ms*2
#logger.info('chunk len:%d',len(chunk))
if first:
try:
rsp = json.loads(chunk)
#response["Code"] = rsp["Response"]["Error"]["Code"]
#response["Message"] = rsp["Response"]["Error"]["Message"]
logger.error("tencent tts:%s",rsp["Response"]["Error"]["Message"])
return
except:
end = time.perf_counter()
logger.info(f"tencent Time to first chunk: {end-start}s")
first = False
if chunk and self.state==State.RUNNING:
yield chunk
except Exception as e:
logger.exception('tencent')
def stream_tts(self,audio_stream,msg):
text,textevent = msg
first = True
last_stream = np.array([],dtype=np.float32)
for chunk in audio_stream:
if chunk is not None and len(chunk)>0:
stream = np.frombuffer(chunk, dtype=np.int16).astype(np.float32) / 32767
stream = np.concatenate((last_stream,stream))
#stream = resampy.resample(x=stream, sr_orig=24000, sr_new=self.sample_rate)
#byte_stream=BytesIO(buffer)
#stream = self.__create_bytes_stream(byte_stream)
streamlen = stream.shape[0]
idx=0
while streamlen >= self.chunk:
eventpoint=None
if first:
eventpoint={'status':'start','text':text,'msgevent':textevent}
5 months ago
first = False
self.parent.put_audio_frame(stream[idx:idx+self.chunk],eventpoint)
streamlen -= self.chunk
idx += self.chunk
last_stream = stream[idx:] #get the remain stream
eventpoint={'status':'end','text':text,'msgevent':textevent}
5 months ago
self.parent.put_audio_frame(np.zeros(self.chunk,np.float32),eventpoint)
###########################################################################################
class XTTS(BaseTTS):
def __init__(self, opt, parent):
super().__init__(opt,parent)
self.speaker = self.get_speaker(opt.REF_FILE, opt.TTS_SERVER)
def txt_to_audio(self,msg):
text,textevent = msg
self.stream_tts(
self.xtts(
text,
self.speaker,
"zh-cn", #en args.language,
self.opt.TTS_SERVER, #"http://localhost:9000", #args.server_url,
"20" #args.stream_chunk_size
),
msg
)
def get_speaker(self,ref_audio,server_url):
files = {"wav_file": ("reference.wav", open(ref_audio, "rb"))}
response = requests.post(f"{server_url}/clone_speaker", files=files)
return response.json()
def xtts(self,text, speaker, language, server_url, stream_chunk_size) -> Iterator[bytes]:
start = time.perf_counter()
speaker["text"] = text
speaker["language"] = language
speaker["stream_chunk_size"] = stream_chunk_size # you can reduce it to get faster response, but degrade quality
10 months ago
try:
res = requests.post(
f"{server_url}/tts_stream",
json=speaker,
stream=True,
)
end = time.perf_counter()
logger.info(f"xtts Time to make POST: {end-start}s")
10 months ago
if res.status_code != 200:
print("Error:", res.text)
return
10 months ago
first = True
for chunk in res.iter_content(chunk_size=9600): #24K*20ms*2
if first:
end = time.perf_counter()
logger.info(f"xtts Time to first chunk: {end-start}s")
10 months ago
first = False
if chunk:
yield chunk
except Exception as e:
print(e)
def stream_tts(self,audio_stream,msg):
text,textevent = msg
first = True
for chunk in audio_stream:
if chunk is not None and len(chunk)>0:
stream = np.frombuffer(chunk, dtype=np.int16).astype(np.float32) / 32767
stream = resampy.resample(x=stream, sr_orig=24000, sr_new=self.sample_rate)
#byte_stream=BytesIO(buffer)
#stream = self.__create_bytes_stream(byte_stream)
streamlen = stream.shape[0]
idx=0
while streamlen >= self.chunk:
eventpoint=None
if first:
eventpoint={'status':'start','text':text,'msgevent':textevent}
first = False
self.parent.put_audio_frame(stream[idx:idx+self.chunk],eventpoint)
streamlen -= self.chunk
idx += self.chunk
eventpoint={'status':'end','text':text,'msgevent':textevent}
self.parent.put_audio_frame(np.zeros(self.chunk,np.float32),eventpoint)