############################################################################### # Copyright (C) 2024 LiveTalking@lipku https://github.com/lipku/LiveTalking # email: lipku@foxmail.com # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. ############################################################################### from __future__ import annotations import time import numpy as np import soundfile as sf import resampy import asyncio import edge_tts import os import hmac import hashlib import base64 import json import uuid from typing import Iterator import requests import queue from queue import Queue from io import BytesIO import copy,websockets,gzip from threading import Thread, Event from enum import Enum from typing import TYPE_CHECKING if TYPE_CHECKING: from basereal import BaseReal from logger import logger class State(Enum): RUNNING=0 PAUSE=1 class BaseTTS: def __init__(self, opt, parent:BaseReal): self.opt=opt self.parent = parent self.fps = opt.fps # 20 ms per frame self.sample_rate = 16000 self.chunk = self.sample_rate // self.fps # 320 samples per chunk (20ms * 16000 / 1000) self.input_stream = BytesIO() self.msgqueue = Queue() self.state = State.RUNNING def flush_talk(self): self.msgqueue.queue.clear() self.state = State.PAUSE def put_msg_txt(self,msg:str,eventpoint=None): if len(msg)>0: self.msgqueue.put((msg,eventpoint)) def render(self,quit_event): process_thread = Thread(target=self.process_tts, args=(quit_event,)) process_thread.start() def process_tts(self,quit_event): while not quit_event.is_set(): try: msg = self.msgqueue.get(block=True, timeout=1) self.state=State.RUNNING except queue.Empty: continue self.txt_to_audio(msg) logger.info('ttsreal thread stop') def txt_to_audio(self,msg): pass ########################################################################################### class EdgeTTS(BaseTTS): def txt_to_audio(self,msg): voicename = self.opt.REF_FILE #"zh-CN-YunxiaNeural" text,textevent = msg t = time.time() asyncio.new_event_loop().run_until_complete(self.__main(voicename,text)) logger.info(f'-------edge tts time:{time.time()-t:.4f}s') if self.input_stream.getbuffer().nbytes<=0: #edgetts err logger.error('edgetts err!!!!!') return self.input_stream.seek(0) stream = self.__create_bytes_stream(self.input_stream) streamlen = stream.shape[0] idx=0 while streamlen >= self.chunk and self.state==State.RUNNING: eventpoint=None streamlen -= self.chunk if idx==0: eventpoint={'status':'start','text':text,'msgevent':textevent} elif streamlen0: #skip last frame(not 20ms) # self.queue.put(stream[idx:]) self.input_stream.seek(0) self.input_stream.truncate() def __create_bytes_stream(self,byte_stream): #byte_stream=BytesIO(buffer) stream, sample_rate = sf.read(byte_stream) # [T*sample_rate,] float64 logger.info(f'[INFO]tts audio stream {sample_rate}: {stream.shape}') stream = stream.astype(np.float32) if stream.ndim > 1: logger.info(f'[WARN] audio has {stream.shape[1]} channels, only use the first.') stream = stream[:, 0] if sample_rate != self.sample_rate and stream.shape[0]>0: logger.info(f'[WARN] audio sample rate is {sample_rate}, resampling into {self.sample_rate}.') stream = resampy.resample(x=stream, sr_orig=sample_rate, sr_new=self.sample_rate) return stream async def __main(self,voicename: str, text: str): try: communicate = edge_tts.Communicate(text, voicename) #with open(OUTPUT_FILE, "wb") as file: first = True async for chunk in communicate.stream(): if first: first = False if chunk["type"] == "audio" and self.state==State.RUNNING: #self.push_audio(chunk["data"]) self.input_stream.write(chunk["data"]) #file.write(chunk["data"]) elif chunk["type"] == "WordBoundary": pass except Exception as e: logger.exception('edgetts') ########################################################################################### class FishTTS(BaseTTS): def txt_to_audio(self,msg): text,textevent = msg self.stream_tts( self.fish_speech( text, self.opt.REF_FILE, self.opt.REF_TEXT, "zh", #en args.language, self.opt.TTS_SERVER, #"http://127.0.0.1:5000", #args.server_url, ), msg ) def fish_speech(self, text, reffile, reftext,language, server_url) -> Iterator[bytes]: start = time.perf_counter() req={ 'text':text, 'reference_id':reffile, 'format':'wav', 'streaming':True, 'use_memory_cache':'on' } try: res = requests.post( f"{server_url}/v1/tts", json=req, stream=True, headers={ "content-type": "application/json", }, ) end = time.perf_counter() logger.info(f"fish_speech Time to make POST: {end-start}s") if res.status_code != 200: logger.error("Error:%s", res.text) return first = True for chunk in res.iter_content(chunk_size=17640): # 1764 44100*20ms*2 #print('chunk len:',len(chunk)) if first: end = time.perf_counter() logger.info(f"fish_speech Time to first chunk: {end-start}s") first = False if chunk and self.state==State.RUNNING: yield chunk #print("gpt_sovits response.elapsed:", res.elapsed) except Exception as e: logger.exception('fishtts') def stream_tts(self,audio_stream,msg): text,textevent = msg first = True for chunk in audio_stream: if chunk is not None and len(chunk)>0: stream = np.frombuffer(chunk, dtype=np.int16).astype(np.float32) / 32767 stream = resampy.resample(x=stream, sr_orig=44100, sr_new=self.sample_rate) #byte_stream=BytesIO(buffer) #stream = self.__create_bytes_stream(byte_stream) streamlen = stream.shape[0] idx=0 while streamlen >= self.chunk: eventpoint=None if first: eventpoint={'status':'start','text':text,'msgevent':textevent} first = False self.parent.put_audio_frame(stream[idx:idx+self.chunk],eventpoint) streamlen -= self.chunk idx += self.chunk eventpoint={'status':'end','text':text,'msgevent':textevent} self.parent.put_audio_frame(np.zeros(self.chunk,np.float32),eventpoint) ########################################################################################### class SovitsTTS(BaseTTS): def txt_to_audio(self,msg): text,textevent = msg self.stream_tts( self.gpt_sovits( text=text, reffile=self.opt.REF_FILE, reftext=self.opt.REF_TEXT, language="zh", #en args.language, server_url=self.opt.TTS_SERVER, #"http://127.0.0.1:5000", #args.server_url, ), msg ) def gpt_sovits(self, text, reffile, reftext,language, server_url) -> Iterator[bytes]: start = time.perf_counter() req={ 'text':text, 'text_lang':language, 'ref_audio_path':reffile, 'prompt_text':reftext, 'prompt_lang':language, 'media_type':'ogg', 'streaming_mode':True } # req["text"] = text # req["text_language"] = language # req["character"] = character # req["emotion"] = emotion # #req["stream_chunk_size"] = stream_chunk_size # you can reduce it to get faster response, but degrade quality # req["streaming_mode"] = True try: res = requests.post( f"{server_url}/tts", json=req, stream=True, ) end = time.perf_counter() logger.info(f"gpt_sovits Time to make POST: {end-start}s") if res.status_code != 200: logger.error("Error:%s", res.text) return first = True for chunk in res.iter_content(chunk_size=None): #12800 1280 32K*20ms*2 logger.info('chunk len:%d',len(chunk)) if first: end = time.perf_counter() logger.info(f"gpt_sovits Time to first chunk: {end-start}s") first = False if chunk and self.state==State.RUNNING: yield chunk #print("gpt_sovits response.elapsed:", res.elapsed) except Exception as e: logger.exception('sovits') def __create_bytes_stream(self,byte_stream): #byte_stream=BytesIO(buffer) stream, sample_rate = sf.read(byte_stream) # [T*sample_rate,] float64 logger.info(f'[INFO]tts audio stream {sample_rate}: {stream.shape}') stream = stream.astype(np.float32) if stream.ndim > 1: logger.info(f'[WARN] audio has {stream.shape[1]} channels, only use the first.') stream = stream[:, 0] if sample_rate != self.sample_rate and stream.shape[0]>0: logger.info(f'[WARN] audio sample rate is {sample_rate}, resampling into {self.sample_rate}.') stream = resampy.resample(x=stream, sr_orig=sample_rate, sr_new=self.sample_rate) return stream def stream_tts(self,audio_stream,msg): text,textevent = msg first = True for chunk in audio_stream: if chunk is not None and len(chunk)>0: #stream = np.frombuffer(chunk, dtype=np.int16).astype(np.float32) / 32767 #stream = resampy.resample(x=stream, sr_orig=32000, sr_new=self.sample_rate) byte_stream=BytesIO(chunk) stream = self.__create_bytes_stream(byte_stream) streamlen = stream.shape[0] idx=0 while streamlen >= self.chunk: eventpoint=None if first: eventpoint={'status':'start','text':text,'msgevent':textevent} first = False self.parent.put_audio_frame(stream[idx:idx+self.chunk],eventpoint) streamlen -= self.chunk idx += self.chunk eventpoint={'status':'end','text':text,'msgevent':textevent} self.parent.put_audio_frame(np.zeros(self.chunk,np.float32),eventpoint) ########################################################################################### class CosyVoiceTTS(BaseTTS): def txt_to_audio(self,msg): text,textevent = msg self.stream_tts( self.cosy_voice( text, self.opt.REF_FILE, self.opt.REF_TEXT, "zh", #en args.language, self.opt.TTS_SERVER, #"http://127.0.0.1:5000", #args.server_url, ), msg ) def cosy_voice(self, text, reffile, reftext,language, server_url) -> Iterator[bytes]: start = time.perf_counter() payload = { 'tts_text': text, 'prompt_text': reftext } try: files = [('prompt_wav', ('prompt_wav', open(reffile, 'rb'), 'application/octet-stream'))] res = requests.request("GET", f"{server_url}/inference_zero_shot", data=payload, files=files, stream=True) end = time.perf_counter() logger.info(f"cosy_voice Time to make POST: {end-start}s") if res.status_code != 200: logger.error("Error:%s", res.text) return first = True for chunk in res.iter_content(chunk_size=9600): # 960 24K*20ms*2 if first: end = time.perf_counter() logger.info(f"cosy_voice Time to first chunk: {end-start}s") first = False if chunk and self.state==State.RUNNING: yield chunk except Exception as e: logger.exception('cosyvoice') def stream_tts(self,audio_stream,msg): text,textevent = msg first = True for chunk in audio_stream: if chunk is not None and len(chunk)>0: stream = np.frombuffer(chunk, dtype=np.int16).astype(np.float32) / 32767 stream = resampy.resample(x=stream, sr_orig=24000, sr_new=self.sample_rate) #byte_stream=BytesIO(buffer) #stream = self.__create_bytes_stream(byte_stream) streamlen = stream.shape[0] idx=0 while streamlen >= self.chunk: eventpoint=None if first: eventpoint={'status':'start','text':text,'msgevent':textevent} first = False self.parent.put_audio_frame(stream[idx:idx+self.chunk],eventpoint) streamlen -= self.chunk idx += self.chunk eventpoint={'status':'end','text':text,'msgevent':textevent} self.parent.put_audio_frame(np.zeros(self.chunk,np.float32),eventpoint) ########################################################################################### _PROTOCOL = "https://" _HOST = "tts.cloud.tencent.com" _PATH = "/stream" _ACTION = "TextToStreamAudio" class TencentTTS(BaseTTS): def __init__(self, opt, parent): super().__init__(opt,parent) self.appid = os.getenv("TENCENT_APPID") self.secret_key = os.getenv("TENCENT_SECRET_KEY") self.secret_id = os.getenv("TENCENT_SECRET_ID") self.voice_type = int(opt.REF_FILE) self.codec = "pcm" self.sample_rate = 16000 self.volume = 0 self.speed = 0 def __gen_signature(self, params): sort_dict = sorted(params.keys()) sign_str = "POST" + _HOST + _PATH + "?" for key in sort_dict: sign_str = sign_str + key + "=" + str(params[key]) + '&' sign_str = sign_str[:-1] hmacstr = hmac.new(self.secret_key.encode('utf-8'), sign_str.encode('utf-8'), hashlib.sha1).digest() s = base64.b64encode(hmacstr) s = s.decode('utf-8') return s def __gen_params(self, session_id, text): params = dict() params['Action'] = _ACTION params['AppId'] = int(self.appid) params['SecretId'] = self.secret_id params['ModelType'] = 1 params['VoiceType'] = self.voice_type params['Codec'] = self.codec params['SampleRate'] = self.sample_rate params['Speed'] = self.speed params['Volume'] = self.volume params['SessionId'] = session_id params['Text'] = text timestamp = int(time.time()) params['Timestamp'] = timestamp params['Expired'] = timestamp + 24 * 60 * 60 return params def txt_to_audio(self,msg): text,textevent = msg self.stream_tts( self.tencent_voice( text, self.opt.REF_FILE, self.opt.REF_TEXT, "zh", #en args.language, self.opt.TTS_SERVER, #"http://127.0.0.1:5000", #args.server_url, ), msg ) def tencent_voice(self, text, reffile, reftext,language, server_url) -> Iterator[bytes]: start = time.perf_counter() session_id = str(uuid.uuid1()) params = self.__gen_params(session_id, text) signature = self.__gen_signature(params) headers = { "Content-Type": "application/json", "Authorization": str(signature) } url = _PROTOCOL + _HOST + _PATH try: res = requests.post(url, headers=headers, data=json.dumps(params), stream=True) end = time.perf_counter() logger.info(f"tencent Time to make POST: {end-start}s") first = True for chunk in res.iter_content(chunk_size=6400): # 640 16K*20ms*2 #logger.info('chunk len:%d',len(chunk)) if first: try: rsp = json.loads(chunk) #response["Code"] = rsp["Response"]["Error"]["Code"] #response["Message"] = rsp["Response"]["Error"]["Message"] logger.error("tencent tts:%s",rsp["Response"]["Error"]["Message"]) return except: end = time.perf_counter() logger.info(f"tencent Time to first chunk: {end-start}s") first = False if chunk and self.state==State.RUNNING: yield chunk except Exception as e: logger.exception('tencent') def stream_tts(self,audio_stream,msg): text,textevent = msg first = True last_stream = np.array([],dtype=np.float32) for chunk in audio_stream: if chunk is not None and len(chunk)>0: stream = np.frombuffer(chunk, dtype=np.int16).astype(np.float32) / 32767 stream = np.concatenate((last_stream,stream)) #stream = resampy.resample(x=stream, sr_orig=24000, sr_new=self.sample_rate) #byte_stream=BytesIO(buffer) #stream = self.__create_bytes_stream(byte_stream) streamlen = stream.shape[0] idx=0 while streamlen >= self.chunk: eventpoint=None if first: eventpoint={'status':'start','text':text,'msgevent':textevent} first = False self.parent.put_audio_frame(stream[idx:idx+self.chunk],eventpoint) streamlen -= self.chunk idx += self.chunk last_stream = stream[idx:] #get the remain stream eventpoint={'status':'end','text':text,'msgevent':textevent} self.parent.put_audio_frame(np.zeros(self.chunk,np.float32),eventpoint) ########################################################################################### class DoubaoTTS(BaseTTS): def __init__(self, opt, parent): super().__init__(opt, parent) # 从配置中读取火山引擎参数 self.appid = os.getenv("DOUBAO_APPID") self.token = os.getenv("DOUBAO_TOKEN") _cluster = 'volcano_tts' _host = "openspeech.bytedance.com" self.api_url = f"wss://{_host}/api/v1/tts/ws_binary" self.request_json = { "app": { "appid": self.appid, "token": "access_token", "cluster": _cluster }, "user": { "uid": "xxx" }, "audio": { "voice_type": "xxx", "encoding": "pcm", "rate": 16000, "speed_ratio": 1.0, "volume_ratio": 1.0, "pitch_ratio": 1.0, }, "request": { "reqid": "xxx", "text": "字节跳动语音合成。", "text_type": "plain", "operation": "xxx" } } async def doubao_voice(self, text): # -> Iterator[bytes]: start = time.perf_counter() voice_type = self.opt.REF_FILE try: # 创建请求对象 default_header = bytearray(b'\x11\x10\x11\x00') submit_request_json = copy.deepcopy(self.request_json) submit_request_json["user"]["uid"] = self.parent.sessionid submit_request_json["audio"]["voice_type"] = voice_type submit_request_json["request"]["text"] = text submit_request_json["request"]["reqid"] = str(uuid.uuid4()) submit_request_json["request"]["operation"] = "submit" payload_bytes = str.encode(json.dumps(submit_request_json)) payload_bytes = gzip.compress(payload_bytes) # if no compression, comment this line full_client_request = bytearray(default_header) full_client_request.extend((len(payload_bytes)).to_bytes(4, 'big')) # payload size(4 bytes) full_client_request.extend(payload_bytes) # payload header = {"Authorization": f"Bearer; {self.token}"} first = True async with websockets.connect(self.api_url, extra_headers=header, ping_interval=None) as ws: await ws.send(full_client_request) while True: res = await ws.recv() header_size = res[0] & 0x0f message_type = res[1] >> 4 message_type_specific_flags = res[1] & 0x0f payload = res[header_size*4:] if message_type == 0xb: # audio-only server response if message_type_specific_flags == 0: # no sequence number as ACK #print(" Payload size: 0") continue else: if first: end = time.perf_counter() logger.info(f"doubao tts Time to first chunk: {end-start}s") first = False sequence_number = int.from_bytes(payload[:4], "big", signed=True) payload_size = int.from_bytes(payload[4:8], "big", signed=False) payload = payload[8:] yield payload if sequence_number < 0: break else: break except Exception as e: logger.exception('doubao') # # 检查响应状态码 # if response.status_code == 200: # # 处理响应数据 # audio_data = base64.b64decode(response.json().get('data')) # yield audio_data # else: # logger.error(f"请求失败,状态码: {response.status_code}") # return def txt_to_audio(self, msg): text, textevent = msg asyncio.new_event_loop().run_until_complete( self.stream_tts( self.doubao_voice(text), msg ) ) async def stream_tts(self, audio_stream, msg): text, textevent = msg first = True last_stream = np.array([],dtype=np.float32) async for chunk in audio_stream: if chunk is not None and len(chunk) > 0: stream = np.frombuffer(chunk, dtype=np.int16).astype(np.float32) / 32767 stream = np.concatenate((last_stream,stream)) #stream = resampy.resample(x=stream, sr_orig=24000, sr_new=self.sample_rate) # byte_stream=BytesIO(buffer) # stream = self.__create_bytes_stream(byte_stream) streamlen = stream.shape[0] idx = 0 while streamlen >= self.chunk: eventpoint = None if first: eventpoint = {'status': 'start', 'text': text, 'msgenvent': textevent} first = False self.parent.put_audio_frame(stream[idx:idx + self.chunk], eventpoint) streamlen -= self.chunk idx += self.chunk last_stream = stream[idx:] #get the remain stream eventpoint = {'status': 'end', 'text': text, 'msgenvent': textevent} self.parent.put_audio_frame(np.zeros(self.chunk, np.float32), eventpoint) ########################################################################################### class XTTS(BaseTTS): def __init__(self, opt, parent): super().__init__(opt,parent) self.speaker = self.get_speaker(opt.REF_FILE, opt.TTS_SERVER) def txt_to_audio(self,msg): text,textevent = msg self.stream_tts( self.xtts( text, self.speaker, "zh-cn", #en args.language, self.opt.TTS_SERVER, #"http://localhost:9000", #args.server_url, "20" #args.stream_chunk_size ), msg ) def get_speaker(self,ref_audio,server_url): files = {"wav_file": ("reference.wav", open(ref_audio, "rb"))} response = requests.post(f"{server_url}/clone_speaker", files=files) return response.json() def xtts(self,text, speaker, language, server_url, stream_chunk_size) -> Iterator[bytes]: start = time.perf_counter() speaker["text"] = text speaker["language"] = language speaker["stream_chunk_size"] = stream_chunk_size # you can reduce it to get faster response, but degrade quality try: res = requests.post( f"{server_url}/tts_stream", json=speaker, stream=True, ) end = time.perf_counter() logger.info(f"xtts Time to make POST: {end-start}s") if res.status_code != 200: print("Error:", res.text) return first = True for chunk in res.iter_content(chunk_size=9600): #24K*20ms*2 if first: end = time.perf_counter() logger.info(f"xtts Time to first chunk: {end-start}s") first = False if chunk: yield chunk except Exception as e: print(e) def stream_tts(self,audio_stream,msg): text,textevent = msg first = True for chunk in audio_stream: if chunk is not None and len(chunk)>0: stream = np.frombuffer(chunk, dtype=np.int16).astype(np.float32) / 32767 stream = resampy.resample(x=stream, sr_orig=24000, sr_new=self.sample_rate) #byte_stream=BytesIO(buffer) #stream = self.__create_bytes_stream(byte_stream) streamlen = stream.shape[0] idx=0 while streamlen >= self.chunk: eventpoint=None if first: eventpoint={'status':'start','text':text,'msgevent':textevent} first = False self.parent.put_audio_frame(stream[idx:idx+self.chunk],eventpoint) streamlen -= self.chunk idx += self.chunk eventpoint={'status':'end','text':text,'msgevent':textevent} self.parent.put_audio_frame(np.zeros(self.chunk,np.float32),eventpoint)