diff --git a/src/main/java/com/supervision/service/impl/ChatServiceImpl.java b/src/main/java/com/supervision/service/impl/ChatServiceImpl.java index d2b159e..a9ebac6 100644 --- a/src/main/java/com/supervision/service/impl/ChatServiceImpl.java +++ b/src/main/java/com/supervision/service/impl/ChatServiceImpl.java @@ -37,6 +37,8 @@ import org.springframework.web.multipart.MultipartFile; import org.springframework.web.reactive.function.client.WebClient; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.core.publisher.Sinks; +import reactor.core.scheduler.Schedulers; import java.io.IOException; import java.time.Duration; @@ -45,6 +47,7 @@ import java.util.HashMap; import java.util.Map; import java.util.Objects; import java.util.UUID; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.stream.Collectors; @@ -53,6 +56,7 @@ import java.util.stream.Collectors; @Service @RequiredArgsConstructor public class ChatServiceImpl implements IChatService { + private static final int WAIT_MS = 500; @Value("${dify.url}") private String difyUrl; @@ -93,7 +97,14 @@ public class ChatServiceImpl implements IChatService { timeInterval.start(); timeInterval.start("start"); log.info("request:时间:{}", DateUtil.now()); - Flux<StreamResponse> flux = webClient.post() + + + // 创建一个线程安全的 receivedAudio + AtomicBoolean receivedAudio = new AtomicBoolean(false); + // 用于确保定时器只启动一次 + AtomicBoolean timerStarted = new AtomicBoolean(false); + Sinks.Many<ServerSentEvent<Map<String, String>>> sink = Sinks.many().multicast().onBackpressureBuffer(); + webClient.post() .uri(difyUrl) .headers(httpHeaders -> { httpHeaders.setContentType(MediaType.APPLICATION_JSON); @@ -101,21 +112,47 @@ public class ChatServiceImpl implements IChatService { }) .bodyValue(JSON.toJSONString(difyChatReqVO)) .retrieve() - .bodyToFlux(StreamResponse.class); - - - Flux<StreamResponse> timeoutMono1 = Flux.interval(Duration.ofMillis(1000)).take(8).map(aLong -> { - StreamResponse timeoutResponse = new StreamResponse(); - timeoutResponse.setEvent("timeout"); - return timeoutResponse; - }); - Flux<StreamResponse> mergedFlux = flux.mergeWith(timeoutMono1); - return mergedFlux.mapNotNull(response -> { + .bodyToFlux(StreamResponse.class) + .publishOn(Schedulers.boundedElastic()).mapNotNull(response -> { log.info("response:时间:{},响应回答:{}", timeInterval.intervalMs(), JSONUtil.toJsonStr(response)); Map<String, String> map = new HashMap<>(); map.put("event", response.getEvent()); + // 第一次接收数据时启动定时器 + if (timerStarted.compareAndSet(false, true)) { + try { + Thread.sleep(WAIT_MS); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + predefinedAnswerAssistant.skipFirst(); + String text = predefinedAnswerAssistant.getHelloAnswer(); + predefinedAnswerAssistant.resetInitTime(text); + String voiceBaseId = MD5.create().digestHex(text); + map.put("audioId", voiceBaseId); + map.put("event", "message"); + log.info("首次接收dify响应,推送预定义打招呼回答id:【{}】,文本:{}", text, voiceBaseId); + audioCache.put(voiceBaseId, predefinedAnswerAssistant.getHelloAnswerAudio(text)); + sink.tryEmitNext(ServerSentEvent.<Map<String, String>>builder() + .data(map) + .build()); + Mono.delay(Duration.ofMillis(predefinedAnswerAssistant.getNextActivateTime())) + .subscribe(t -> { + // WAIT_SECOND秒后检查 receivedAudio + if (!receivedAudio.get()) { + String text2 = predefinedAnswerAssistant.getPredefinedAnswer(); + String voiceBaseId2 = MD5.create().digestHex(text2); + map.put("audioId", voiceBaseId2); + map.put("event", "message"); + log.info("超过等待时间阈值,推送预定义回答id:【{}】,文本:{}", text2, voiceBaseId2); + audioCache.put(voiceBaseId2, predefinedAnswerAssistant.getPredefinedAnswerAudio(text2)); + // 插入依次推送 + sink.tryEmitNext(ServerSentEvent.<Map<String, String>>builder() + .data(map) + .build()); + } + }); + } if (response.getEvent().equals("message") && response.getAnswer() != null) { - log.info("response:时间:{}", DateUtil.now()); log.info("response:{}", response); //遍历answer中的每一个字符,判断是否为标点符号,如果是,说明是句子的结尾,将标点符号前的文本拼接到sentence中,并打印,然后清空sentence,如果标点符号后还有文本,将文本拼接到sentence中 for (char ch : response.getAnswer().toCharArray()) { @@ -130,6 +167,7 @@ public class ChatServiceImpl implements IChatService { map.put("audioId", voiceBaseId); sentence.setLength(0); predefinedAnswerAssistant.closePreviewAnswer(); + receivedAudio.set(true); return ServerSentEvent.builder(map).build(); } } @@ -157,31 +195,10 @@ public class ChatServiceImpl implements IChatService { this.appendDialogCache(response.getConversation_id(), builder.build()); return ServerSentEvent.builder(map).build(); } - if (predefinedAnswerAssistant.overThreshold()) { - if (StrUtil.isEmpty(difyChatReqVO.getConversation_id()) && !predefinedAnswerAssistant.isSkipFirst()) { - predefinedAnswerAssistant.skipFirst(); - String text = predefinedAnswerAssistant.getHelloAnswer(); - predefinedAnswerAssistant.resetInitTime(text); - log.info("超过阈值,返回预定义打招呼回答:{}", text); - String voiceBaseId = MD5.create().digestHex(text); - map.put("audioId", voiceBaseId); - log.info("response前置回答id:{}", voiceBaseId); - audioCache.put(voiceBaseId, predefinedAnswerAssistant.getHelloAnswerAudio(text)); - return ServerSentEvent.builder(map).build(); - } else { - String text = predefinedAnswerAssistant.getPredefinedAnswer(); - log.info("超过阈值,返回预定义回答:{}", text); - predefinedAnswerAssistant.resetInitTime(text); - String voiceBaseId = MD5.create().digestHex(text); - map.put("audioId", voiceBaseId); - log.info("response前置回答id:{}", voiceBaseId); - audioCache.put(voiceBaseId, predefinedAnswerAssistant.getPredefinedAnswerAudio(text)); - return ServerSentEvent.builder(map).build(); - } - - } return null; - }).filter(Objects::nonNull); + }).filter(Objects::nonNull)// 上游数据调用完成后自动结束流 + .doOnComplete(sink::tryEmitComplete).subscribe(sink::tryEmitNext); + return sink.asFlux(); } @Override diff --git a/src/main/java/com/supervision/util/PredefinedAnswerAssistant.java b/src/main/java/com/supervision/util/PredefinedAnswerAssistant.java index 758e3c7..137f8e6 100644 --- a/src/main/java/com/supervision/util/PredefinedAnswerAssistant.java +++ b/src/main/java/com/supervision/util/PredefinedAnswerAssistant.java @@ -2,6 +2,7 @@ package com.supervision.util; import cn.hutool.core.date.DateField; import cn.hutool.core.date.DateTime; +import cn.hutool.core.date.DateUnit; import cn.hutool.core.date.DateUtil; import cn.hutool.core.util.NumberUtil; import cn.hutool.core.util.RandomUtil; @@ -156,5 +157,8 @@ public class PredefinedAnswerAssistant { log.info("预定义答案加载完成...."); } + public long getNextActivateTime() { + return this.initTime.between(new Date(), DateUnit.MS); + } }