动态判断等待时间结束时TTS是否有返回audio,如果没有,插入一次再次推送预定义audio

main
daixiaoyi 2 months ago
parent 05781451d1
commit 5271a4507e

@ -37,6 +37,8 @@ import org.springframework.web.multipart.MultipartFile;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Schedulers;
import java.io.IOException;
import java.time.Duration;
@ -45,6 +47,7 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
@ -53,6 +56,7 @@ import java.util.stream.Collectors;
@Service
@RequiredArgsConstructor
public class ChatServiceImpl implements IChatService {
private static final int WAIT_MS = 500;
@Value("${dify.url}")
private String difyUrl;
@ -93,7 +97,14 @@ public class ChatServiceImpl implements IChatService {
timeInterval.start();
timeInterval.start("start");
log.info("request:时间:{}", DateUtil.now());
Flux<StreamResponse> flux = webClient.post()
// 创建一个线程安全的 receivedAudio
AtomicBoolean receivedAudio = new AtomicBoolean(false);
// 用于确保定时器只启动一次
AtomicBoolean timerStarted = new AtomicBoolean(false);
Sinks.Many<ServerSentEvent<Map<String, String>>> sink = Sinks.many().multicast().onBackpressureBuffer();
webClient.post()
.uri(difyUrl)
.headers(httpHeaders -> {
httpHeaders.setContentType(MediaType.APPLICATION_JSON);
@ -101,21 +112,47 @@ public class ChatServiceImpl implements IChatService {
})
.bodyValue(JSON.toJSONString(difyChatReqVO))
.retrieve()
.bodyToFlux(StreamResponse.class);
Flux<StreamResponse> timeoutMono1 = Flux.interval(Duration.ofMillis(1000)).take(8).map(aLong -> {
StreamResponse timeoutResponse = new StreamResponse();
timeoutResponse.setEvent("timeout");
return timeoutResponse;
});
Flux<StreamResponse> mergedFlux = flux.mergeWith(timeoutMono1);
return mergedFlux.mapNotNull(response -> {
.bodyToFlux(StreamResponse.class)
.publishOn(Schedulers.boundedElastic()).mapNotNull(response -> {
log.info("response:时间:{},响应回答:{}", timeInterval.intervalMs(), JSONUtil.toJsonStr(response));
Map<String, String> map = new HashMap<>();
map.put("event", response.getEvent());
// 第一次接收数据时启动定时器
if (timerStarted.compareAndSet(false, true)) {
try {
Thread.sleep(WAIT_MS);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
predefinedAnswerAssistant.skipFirst();
String text = predefinedAnswerAssistant.getHelloAnswer();
predefinedAnswerAssistant.resetInitTime(text);
String voiceBaseId = MD5.create().digestHex(text);
map.put("audioId", voiceBaseId);
map.put("event", "message");
log.info("首次接收dify响应推送预定义打招呼回答id【{}】,文本:{}", text, voiceBaseId);
audioCache.put(voiceBaseId, predefinedAnswerAssistant.getHelloAnswerAudio(text));
sink.tryEmitNext(ServerSentEvent.<Map<String, String>>builder()
.data(map)
.build());
Mono.delay(Duration.ofMillis(predefinedAnswerAssistant.getNextActivateTime()))
.subscribe(t -> {
// WAIT_SECOND秒后检查 receivedAudio
if (!receivedAudio.get()) {
String text2 = predefinedAnswerAssistant.getPredefinedAnswer();
String voiceBaseId2 = MD5.create().digestHex(text2);
map.put("audioId", voiceBaseId2);
map.put("event", "message");
log.info("超过等待时间阈值推送预定义回答id【{}】,文本:{}", text2, voiceBaseId2);
audioCache.put(voiceBaseId2, predefinedAnswerAssistant.getPredefinedAnswerAudio(text2));
// 插入依次推送
sink.tryEmitNext(ServerSentEvent.<Map<String, String>>builder()
.data(map)
.build());
}
});
}
if (response.getEvent().equals("message") && response.getAnswer() != null) {
log.info("response:时间:{}", DateUtil.now());
log.info("response:{}", response);
//遍历answer中的每一个字符判断是否为标点符号如果是说明是句子的结尾将标点符号前的文本拼接到sentence中并打印然后清空sentence如果标点符号后还有文本将文本拼接到sentence中
for (char ch : response.getAnswer().toCharArray()) {
@ -130,6 +167,7 @@ public class ChatServiceImpl implements IChatService {
map.put("audioId", voiceBaseId);
sentence.setLength(0);
predefinedAnswerAssistant.closePreviewAnswer();
receivedAudio.set(true);
return ServerSentEvent.builder(map).build();
}
}
@ -157,31 +195,10 @@ public class ChatServiceImpl implements IChatService {
this.appendDialogCache(response.getConversation_id(), builder.build());
return ServerSentEvent.builder(map).build();
}
if (predefinedAnswerAssistant.overThreshold()) {
if (StrUtil.isEmpty(difyChatReqVO.getConversation_id()) && !predefinedAnswerAssistant.isSkipFirst()) {
predefinedAnswerAssistant.skipFirst();
String text = predefinedAnswerAssistant.getHelloAnswer();
predefinedAnswerAssistant.resetInitTime(text);
log.info("超过阈值,返回预定义打招呼回答:{}", text);
String voiceBaseId = MD5.create().digestHex(text);
map.put("audioId", voiceBaseId);
log.info("response前置回答id{}", voiceBaseId);
audioCache.put(voiceBaseId, predefinedAnswerAssistant.getHelloAnswerAudio(text));
return ServerSentEvent.builder(map).build();
} else {
String text = predefinedAnswerAssistant.getPredefinedAnswer();
log.info("超过阈值,返回预定义回答:{}", text);
predefinedAnswerAssistant.resetInitTime(text);
String voiceBaseId = MD5.create().digestHex(text);
map.put("audioId", voiceBaseId);
log.info("response前置回答id{}", voiceBaseId);
audioCache.put(voiceBaseId, predefinedAnswerAssistant.getPredefinedAnswerAudio(text));
return ServerSentEvent.builder(map).build();
}
}
return null;
}).filter(Objects::nonNull);
}).filter(Objects::nonNull)// 上游数据调用完成后自动结束流
.doOnComplete(sink::tryEmitComplete).subscribe(sink::tryEmitNext);
return sink.asFlux();
}
@Override

@ -2,6 +2,7 @@ package com.supervision.util;
import cn.hutool.core.date.DateField;
import cn.hutool.core.date.DateTime;
import cn.hutool.core.date.DateUnit;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.util.NumberUtil;
import cn.hutool.core.util.RandomUtil;
@ -156,5 +157,8 @@ public class PredefinedAnswerAssistant {
log.info("预定义答案加载完成....");
}
public long getNextActivateTime() {
return this.initTime.between(new Date(), DateUnit.MS);
}
}

Loading…
Cancel
Save