添加is_speaking通过接口修改状态

main
fanpt 3 weeks ago
parent b51b8f3ea9
commit ff7a44477e

@ -310,7 +310,7 @@ class DouyinLiveWebFetcher:
class DouyinLiveWebReply: class DouyinLiveWebReply:
def __init__(self, queue: PromptQueue): def __init__(self, queue: PromptQueue, config):
self.queue = queue self.queue = queue
self.system_text_list = [] self.system_text_list = []
self.backend_token = '' self.backend_token = ''
@ -318,6 +318,7 @@ class DouyinLiveWebReply:
self.punctuation = ",.!;:,。!?:;" self.punctuation = ",.!;:,。!?:;"
self.system_message_index = 0 self.system_message_index = 0
self.response_queue = PromptQueue(10) self.response_queue = PromptQueue(10)
self.config = config
def _llm(self, prompt, stream=False): def _llm(self, prompt, stream=False):
payload = { payload = {
@ -432,9 +433,7 @@ class DouyinLiveWebReply:
# 加一个计数器统计is_speaking连续为False的次数如果超过10次才算真正的未在说话 # 加一个计数器统计is_speaking连续为False的次数如果超过10次才算真正的未在说话
while True: while True:
try: try:
is_speaking = requests.post(f'{self.live_chat_config.livetalking_address}/is_speaking', is_speaking = self.config['is_speaking']
json={'sessionid': self.live_chat_config.livetalking_sessionid},
timeout=5).json()['data']
if is_speaking: if is_speaking:
prompt_data = self.queue.get(False) prompt_data = self.queue.get(False)
if prompt_data is not None: if prompt_data is not None:
@ -501,6 +500,7 @@ class DouyinLiveWebReply:
if not self.response_queue.empty(): if not self.response_queue.empty():
reply_message = self.response_queue.get() reply_message = self.response_queue.get()
reply_message = self.reply_message_postprocess(reply_message) reply_message = self.reply_message_postprocess(reply_message)
logger.info(f"开始播放回复弹幕:{reply_message}")
else: else:
precedence_message = self.live_chat_config.precedence_reply_message precedence_message = self.live_chat_config.precedence_reply_message
if not precedence_message: if not precedence_message:
@ -518,11 +518,13 @@ class DouyinLiveWebReply:
reply_message, _id = precedence_message reply_message, _id = precedence_message
# 置空优先文案 # 置空优先文案
self.live_chat_config.flush_precedence_reply_message() self.live_chat_config.flush_precedence_reply_message()
logger.info(f"开始播放系统文案:{reply_message}")
# 判断self.response_queue.empty()true则打印开始播放弹幕回复false则打印开始播放系统文案 # 判断self.response_queue.empty()true则打印开始播放弹幕回复false则打印开始播放系统文案
logger.info(f'开始播放{"弹幕回复" if not self.response_queue.empty() else "系统文案"}: {reply_message}') # logger.info(f'开始播放{"弹幕回复" if not self.response_queue.empty() else "系统文案"}: {reply_message}')
self.post_to_human_sync(reply_message) self.post_to_human_sync(reply_message)
# 等0.5秒再检测 # 等0.5秒再检测
time.sleep(0.5) time.sleep(1)
except Exception: except Exception:
# 发生异常,输出系统文案 # 发生异常,输出系统文案

@ -1,33 +1,88 @@
from liveMan import DouyinLiveWebFetcher, DouyinLiveWebReply import time
from helper import PromptQueue, LiveChatConfig from fastapi import FastAPI
from multiprocessing import Process, Event, freeze_support from pydantic import BaseModel
import uvicorn
from multiprocessing import Process, Event, freeze_support, Manager
from loguru import logger from loguru import logger
from liveMan import DouyinLiveWebFetcher, DouyinLiveWebReply
from helper import PromptQueue, LiveChatConfig
logger.add('log.log', encoding='utf-8', rotation="500MB") logger.add('log.log', encoding='utf-8', rotation="500MB")
# FastAPI应用实例
app = FastAPI(title="直播状态控制接口")
# 请求体模型
class SpeakingStatus(BaseModel):
is_speaking: bool
# 全局变量用于在FastAPI进程中存储共享字典引用
app.state.shared_config = None
# 接口更新is_speaking状态
@app.post("/speaking-status", summary="更新说话状态")
def update_speaking_status(status: SpeakingStatus):
if app.state.shared_config is None:
return {"error": "服务未初始化"}
app.state.shared_config['is_speaking'] = status.is_speaking
# logger.info(f"通过API更新is_speaking状态为: {status.is_speaking}")
return {
"message": "状态更新成功",
"current_status": {
"is_speaking": app.state.shared_config['is_speaking'],
}
}
# 启动FastAPI服务的函数接收共享字典作为参数
def start_fastapi_server(shared_config):
# 将共享字典存储到FastAPI应用的状态中
app.state.shared_config = shared_config
# 启动UVicorn服务器
uvicorn.run(app, host="0.0.0.0", port=8000)
def fetch_user_chat_content(ws_open_event, queue): def fetch_user_chat_content(ws_open_event, queue):
fetcher = DouyinLiveWebFetcher(ws_open_event, queue) fetcher = DouyinLiveWebFetcher(ws_open_event, queue)
fetcher.start() fetcher.start()
def reply_user_chat_content(queue, config):
def reply_user_chat_content(queue): reply = DouyinLiveWebReply(queue, config)
reply = DouyinLiveWebReply(queue)
reply() reply()
if __name__ == '__main__': if __name__ == '__main__':
freeze_support() freeze_support()
LiveChatConfig().update_chat_enable_status('启动中') LiveChatConfig().update_chat_enable_status('启动中')
with Manager() as manager:
# 创建共享字典,包含所有需要的键
shared_config = manager.dict({
'is_speaking': False
})
queue = PromptQueue(10)
ws_open_event = Event()
# 创建进程时将共享字典传递给FastAPI服务
api_process = Process(
target=start_fastapi_server,
args=(shared_config,),
name="FastAPI Server"
)
fetch_process = Process(
target=fetch_user_chat_content,
args=(ws_open_event, queue)
)
reply_process = Process(
target=reply_user_chat_content,
args=(queue, shared_config)
)
queue = PromptQueue(10) # 启动所有进程
ws_open_event = Event() api_process.start()
fetch_process.start()
ws_open_event.wait()
reply_process.start()
fetch_process = Process(target=fetch_user_chat_content, args=(ws_open_event, queue)) # 等待所有进程完成
reply_process = Process(target=reply_user_chat_content, args=(queue,)) fetch_process.join()
fetch_process.start() reply_process.join()
ws_open_event.wait() api_process.join()
reply_process.start()
fetch_process.join()
reply_process.join()

Loading…
Cancel
Save