webflux streaming

main
daixiaoyi 2 months ago
parent 8731898e6f
commit 20930e7122

@ -64,6 +64,11 @@
<artifactId>pinyin4j</artifactId> <artifactId>pinyin4j</artifactId>
<version>2.5.0</version> <version>2.5.0</version>
</dependency> </dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
</dependencies> </dependencies>
<build> <build>

@ -2,6 +2,9 @@ package com.supervision;
import org.springframework.boot.SpringApplication; import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.web.client.RestTemplate;
import org.springframework.web.reactive.function.client.WebClient;
@SpringBootApplication @SpringBootApplication
public class SpeechDemoServiceApplication { public class SpeechDemoServiceApplication {
@ -9,5 +12,14 @@ public class SpeechDemoServiceApplication {
public static void main(String[] args) { public static void main(String[] args) {
SpringApplication.run(SpeechDemoServiceApplication.class, args); SpringApplication.run(SpeechDemoServiceApplication.class, args);
} }
@Bean
public RestTemplate restTemplate() {
return new RestTemplate();
}
@Bean
public WebClient webClient() {
return WebClient.create();
}
} }

@ -1,18 +1,28 @@
package com.supervision.contoller; package com.supervision.contoller;
import cn.hutool.core.io.resource.ResourceUtil;
import cn.hutool.core.util.StrUtil;
import com.supervision.dto.R; import com.supervision.dto.R;
import com.supervision.dto.robot.RobotTalkDTO; import com.supervision.dto.robot.RobotTalkDTO;
import com.supervision.model.RobotTalkReq; import com.supervision.model.RobotTalkReq;
import com.supervision.model.dify.StreamResponse;
import com.supervision.service.DifyService;
import com.supervision.service.IChatService; import com.supervision.service.IChatService;
import jakarta.servlet.http.HttpServletResponse; import jakarta.servlet.http.HttpServletResponse;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.http.MediaType;
import org.springframework.http.codec.ServerSentEvent;
import org.springframework.web.bind.annotation.*; import org.springframework.web.bind.annotation.*;
import org.springframework.web.multipart.MultipartFile; import org.springframework.web.multipart.MultipartFile;
import reactor.core.publisher.Flux;
import java.io.IOException; import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Map;
@Slf4j @Slf4j
@RestController @RestController
@RequestMapping("/chat") @RequestMapping("/chat")
@ -20,6 +30,7 @@ import java.util.List;
public class ChatController { public class ChatController {
private final IChatService chatService; private final IChatService chatService;
private final DifyService difyService;
@PostMapping("/talk") @PostMapping("/talk")
public R<RobotTalkDTO> talk(@RequestParam("file") MultipartFile multipartFile, @ModelAttribute RobotTalkReq robotTalkReq) { public R<RobotTalkDTO> talk(@RequestParam("file") MultipartFile multipartFile, @ModelAttribute RobotTalkReq robotTalkReq) {
@ -27,6 +38,11 @@ public class ChatController {
return R.ok(talk); return R.ok(talk);
} }
@PostMapping("/asr")
public R<String> asr(@RequestParam("file") MultipartFile file) throws IOException {
return R.ok(chatService.asr(file));
}
@GetMapping("/talkList") @GetMapping("/talkList")
public List talkList(String sessionId) { public List talkList(String sessionId) {
return new ArrayList(); return new ArrayList();
@ -37,4 +53,16 @@ public class ChatController {
@RequestParam("audioId")String audioId) throws IOException { @RequestParam("audioId")String audioId) throws IOException {
chatService.getAudio(response,audioId); chatService.getAudio(response,audioId);
} }
@GetMapping(value="/stream",produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<Map<String, String>>> test2(@RequestParam("query") String query) {
return chatService.streamingMessage(query);
}
@GetMapping(value = "/webflux",produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> chatWebFlux() {
String string = ResourceUtil.readUtf8Str("classpath:static/test.txt");
String[] stringArray = StrUtil.split(string, 1);
return Flux.just(stringArray).delayElements(Duration.ofMillis(50));
}
} }

@ -2,12 +2,12 @@ package com.supervision.model.dify;
import lombok.Data; import lombok.Data;
import static com.supervision.common.constant.DifyConstants.CHAT_RESPONSE_MODE_BLOCKING; import static com.supervision.common.constant.DifyConstants.CHAT_RESPONSE_MODE_STREAMING;
@Data @Data
public class DifyChatReqVO { public class DifyChatReqVO {
private String user; private String user;
private String response_mode = CHAT_RESPONSE_MODE_BLOCKING; private String response_mode = CHAT_RESPONSE_MODE_STREAMING;
private DIFYChatReqInputVO inputs = new DIFYChatReqInputVO(); private DIFYChatReqInputVO inputs = new DIFYChatReqInputVO();
private String query; private String query;
private String conversation_id; private String conversation_id;

@ -0,0 +1,10 @@
package com.supervision.model.dify;
import lombok.Data;
import java.io.Serializable;
@Data
public class OutputsData implements Serializable {
private String answer;
}

@ -0,0 +1,44 @@
package com.supervision.model.dify;
import lombok.Data;
import java.io.Serializable;
@Data
public class StreamResponse implements Serializable {
/**
* .
*/
private String event;
/**
* agent_thought id.
*/
private String id;
/**
* ID.
*/
private String taskId;
/**
* ID.
*/
private String messageId;
/**
* LLM .
*/
private String answer;
/**
* .
*/
private Long createdAt;
/**
* ID.
*/
private String conversationId;
}

@ -0,0 +1,15 @@
package com.supervision.model.dify;
import lombok.Data;
import java.io.Serializable;
@Data
public class StreamResponseData implements Serializable {
private String id;
private String workflow_id;
private String status;
private Long created_at;
private Long finished_at;
private OutputsData outputs;
}

@ -0,0 +1,88 @@
package com.supervision.service;
import com.alibaba.fastjson.JSON;
import com.supervision.dto.paddlespeech.res.TtsResultDTO;
import com.supervision.dto.robot.AnswerInfo;
import com.supervision.model.dify.DIFYChatReqInputVO;
import com.supervision.model.dify.DifyChatReqVO;
import com.supervision.model.dify.StreamResponse;
import com.supervision.util.TtsUtil;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.MediaType;
import org.springframework.http.codec.ServerSentEvent;
import org.springframework.stereotype.Service;
import org.springframework.web.client.RestTemplate;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
@Slf4j
@Service
@RequiredArgsConstructor
public class DifyService {
@Value("${dify.url}")
private String difyUrl;
@Value("${dify.app-auth}")
private String difyAppAuth;
private final WebClient webClient;
Map<String, String> voiceCache = new HashMap<>();
/**
* dify.
*
* @param query
* @return Flux
*/
public Flux<ServerSentEvent<Map<String, String>>> streamingMessage(String query) {
DifyChatReqVO difyChatReqVO = new DifyChatReqVO();
difyChatReqVO.setUser("admin");
DIFYChatReqInputVO inputs = new DIFYChatReqInputVO();
difyChatReqVO.setQuery(query);
// difyChatReqVO.setQuery("尽可能详细的介绍一下勐赫小镇的医疗服务");
difyChatReqVO.setInputs(inputs);
StringBuilder sentence = new StringBuilder();
return webClient.post()
.uri(difyUrl)
.headers(httpHeaders -> {
httpHeaders.setContentType(MediaType.APPLICATION_JSON);
httpHeaders.setBearerAuth(difyAppAuth);
})
.bodyValue(JSON.toJSONString(difyChatReqVO))
.retrieve()
.bodyToFlux(StreamResponse.class)
.map(response -> {
Map<String, String> map = new HashMap<>();
map.put("event", response.getEvent());
if (response.getEvent().equals("message") && response.getAnswer() != null) {
TtsResultDTO ttsResultDTO = TtsUtil.ttsTransform(response.getAnswer());
String voiceBaseId = UUID.randomUUID().toString();
voiceCache.put(voiceBaseId, ttsResultDTO.getAudio());
map.put("audioId", voiceBaseId);
}
return ServerSentEvent.builder(map).build();
// if (response.getEvent().equals("message") && response.getAnswer() != null) {
// //遍历answer中的每一个字符判断是否为标点符号如果是说明是句子的结尾将标点符号前的文本拼接到sentence中并打印然后清空sentence如果标点符号后还有文本将文本拼接到sentence中
// for (char ch : response.getAnswer().toCharArray()) {
// sentence.append(ch);
// if (ch == '。' || ch == '' || ch == '' || ch == '' || ch == '、' || ch == '' || ch == '' || ch == '“' || ch == '”') { // Check for punctuation marks
// log.info(sentence.toString());
// sentence.setLength(0); // Clear the sentence
// }
// }
// }
// if (response.getEvent().equals("message_end") && !sentence.isEmpty()) {
// log.info(sentence.toString());
// }
// Map<String, String> map;
// map = Map.of("event", response.getEvent(), "audioId", sentence.toString());
// return ServerSentEvent.builder(map).build();
});
}
}

@ -3,11 +3,18 @@ package com.supervision.service;
import com.supervision.dto.robot.RobotTalkDTO; import com.supervision.dto.robot.RobotTalkDTO;
import com.supervision.model.RobotTalkReq; import com.supervision.model.RobotTalkReq;
import jakarta.servlet.http.HttpServletResponse; import jakarta.servlet.http.HttpServletResponse;
import org.springframework.http.codec.ServerSentEvent;
import org.springframework.web.multipart.MultipartFile; import org.springframework.web.multipart.MultipartFile;
import reactor.core.publisher.Flux;
import java.io.IOException; import java.io.IOException;
import java.util.Map;
public interface IChatService { public interface IChatService {
Flux<ServerSentEvent<Map<String, String>>> streamingMessage(String query);
String asr(MultipartFile file) throws IOException;
RobotTalkDTO talk(MultipartFile file, RobotTalkReq robotTalkReq); RobotTalkDTO talk(MultipartFile file, RobotTalkReq robotTalkReq);
void getAudio(HttpServletResponse response, String audioId) throws IOException; void getAudio(HttpServletResponse response, String audioId) throws IOException;

@ -1,6 +1,7 @@
package com.supervision.service.impl; package com.supervision.service.impl;
import cn.hutool.core.codec.Base64; import cn.hutool.core.codec.Base64;
import com.alibaba.fastjson.JSON;
import com.supervision.dto.dify.ChatResDTO; import com.supervision.dto.dify.ChatResDTO;
import com.supervision.dto.paddlespeech.res.TtsResultDTO; import com.supervision.dto.paddlespeech.res.TtsResultDTO;
import com.supervision.dto.robot.AnswerInfo; import com.supervision.dto.robot.AnswerInfo;
@ -9,21 +10,27 @@ import com.supervision.dto.robot.RobotTalkDTO;
import com.supervision.model.RobotTalkReq; import com.supervision.model.RobotTalkReq;
import com.supervision.model.dify.DIFYChatReqInputVO; import com.supervision.model.dify.DIFYChatReqInputVO;
import com.supervision.model.dify.DifyChatReqVO; import com.supervision.model.dify.DifyChatReqVO;
import com.supervision.model.dify.StreamResponse;
import com.supervision.service.IChatService; import com.supervision.service.IChatService;
import com.supervision.util.AsrUtil; import com.supervision.util.AsrUtil;
import com.supervision.util.DifyApiUtil; import com.supervision.util.DifyApiUtil;
import com.supervision.util.TtsUtil; import com.supervision.util.TtsUtil;
import jakarta.annotation.Resource;
import jakarta.servlet.http.HttpServletResponse; import jakarta.servlet.http.HttpServletResponse;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import net.sourceforge.pinyin4j.PinyinHelper; import net.sourceforge.pinyin4j.PinyinHelper;
import net.sourceforge.pinyin4j.format.HanyuPinyinCaseType; import net.sourceforge.pinyin4j.format.HanyuPinyinCaseType;
import net.sourceforge.pinyin4j.format.HanyuPinyinOutputFormat; import net.sourceforge.pinyin4j.format.HanyuPinyinOutputFormat;
import net.sourceforge.pinyin4j.format.HanyuPinyinToneType; import net.sourceforge.pinyin4j.format.HanyuPinyinToneType;
import net.sourceforge.pinyin4j.format.exception.BadHanyuPinyinOutputFormatCombination; import net.sourceforge.pinyin4j.format.exception.BadHanyuPinyinOutputFormatCombination;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.MediaType;
import org.springframework.http.codec.ServerSentEvent;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.util.StopWatch; import org.springframework.util.StopWatch;
import org.springframework.web.multipart.MultipartFile; import org.springframework.web.multipart.MultipartFile;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;
import java.io.IOException; import java.io.IOException;
import java.util.HashMap; import java.util.HashMap;
@ -34,14 +41,70 @@ import java.util.regex.Pattern;
@Slf4j @Slf4j
@Service @Service
@RequiredArgsConstructor
public class ChatServiceImpl implements IChatService { public class ChatServiceImpl implements IChatService {
@Resource @Value("${dify.url}")
private DifyApiUtil difyApiUtil; private String difyUrl;
@Value("${dify.app-auth}")
private String difyAppAuth;
private final WebClient webClient;
private final DifyApiUtil difyApiUtil;
Map<String, String> voiceCache = new HashMap<>(); Map<String, String> voiceCache = new HashMap<>();
@Override
public Flux<ServerSentEvent<Map<String, String>>> streamingMessage(String query) {
DifyChatReqVO difyChatReqVO = new DifyChatReqVO();
difyChatReqVO.setUser("admin");
DIFYChatReqInputVO inputs = new DIFYChatReqInputVO();
difyChatReqVO.setQuery(query);
// difyChatReqVO.setQuery("尽可能详细的介绍一下勐赫小镇的医疗服务");
difyChatReqVO.setInputs(inputs);
StringBuilder sentence = new StringBuilder();
log.info("query:{}", query);
return webClient.post()
.uri(difyUrl)
.headers(httpHeaders -> {
httpHeaders.setContentType(MediaType.APPLICATION_JSON);
httpHeaders.setBearerAuth(difyAppAuth);
})
.bodyValue(JSON.toJSONString(difyChatReqVO))
.retrieve()
.bodyToFlux(StreamResponse.class)
.map(response -> {
Map<String, String> map = new HashMap<>();
map.put("event", response.getEvent());
if (response.getEvent().equals("message") && response.getAnswer() != null) {
String voiceBaseId = UUID.randomUUID().toString();
//遍历answer中的每一个字符判断是否为标点符号如果是说明是句子的结尾将标点符号前的文本拼接到sentence中并打印然后清空sentence如果标点符号后还有文本将文本拼接到sentence中
for (char ch : response.getAnswer().toCharArray()) {
sentence.append(ch);
if (ch == '。' || ch == '' || ch == '' || ch == '' || ch == '、' || ch == '' || ch == '' || ch == '“' || ch == '”') { // Check for punctuation marks
log.info(sentence.toString());
TtsResultDTO ttsResultDTO = TtsUtil.ttsTransform(sentence.toString());
voiceCache.put(voiceBaseId, ttsResultDTO.getAudio());
map.put("audioId", voiceBaseId);
sentence.setLength(0); // Clear the sentence
}
}
if (response.getEvent().equals("message_end") && !sentence.isEmpty()) {
log.info(sentence.toString());
TtsResultDTO ttsResultDTO = TtsUtil.ttsTransform(sentence.toString());
voiceCache.put(voiceBaseId, ttsResultDTO.getAudio());
map.put("audioId", voiceBaseId);
}
return ServerSentEvent.builder(map).build();
}
return ServerSentEvent.builder(map).build();
});
}
@Override
public String asr(MultipartFile file) throws IOException {
return replaceTown(AsrUtil.asrTransformByBytes(file.getBytes()));
}
@Override @Override
public RobotTalkDTO talk(MultipartFile file, RobotTalkReq robotTalkReq) { public RobotTalkDTO talk(MultipartFile file, RobotTalkReq robotTalkReq) {
log.info("robotTalkReq:{}", robotTalkReq); log.info("robotTalkReq:{}", robotTalkReq);

@ -1,8 +1,14 @@
package com.supervision.util; package com.supervision.util;
import cn.hutool.json.JSONUtil; import cn.hutool.json.JSONUtil;
import com.alibaba.fastjson.JSON;
import com.supervision.dto.dify.ChatResDTO; import com.supervision.dto.dify.ChatResDTO;
import com.supervision.model.dify.DIFYChatReqInputVO;
import com.supervision.model.dify.DifyChatReqVO; import com.supervision.model.dify.DifyChatReqVO;
import com.supervision.model.dify.StreamResponse;
import io.micrometer.common.util.StringUtils;
import jakarta.annotation.Resource;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.hc.client5.http.ClientProtocolException; import org.apache.hc.client5.http.ClientProtocolException;
import org.apache.hc.client5.http.classic.methods.HttpPost; import org.apache.hc.client5.http.classic.methods.HttpPost;
@ -14,18 +20,54 @@ import org.apache.hc.core5.http.HttpStatus;
import org.apache.hc.core5.http.io.entity.EntityUtils; import org.apache.hc.core5.http.io.entity.EntityUtils;
import org.apache.hc.core5.http.io.entity.StringEntity; import org.apache.hc.core5.http.io.entity.StringEntity;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
@Component @Component
@Slf4j @Slf4j
@Service
public class DifyApiUtil { public class DifyApiUtil {
@Value("${dify.url}") @Value("${dify.url}")
private String difyUrl; private String difyUrl;
@Value("${dify.app-auth}") @Value("${dify.app-auth}")
private String difyAppAuth; private String difyAppAuth;
@Resource
private WebClient webClient;
/**
* dify.
*
* @param query
* @return Flux
*/
public Flux<StreamResponse> streamingMessage(String query) {
DifyChatReqVO difyChatReqVO = new DifyChatReqVO();
difyChatReqVO.setUser("admin");
DIFYChatReqInputVO inputs = new DIFYChatReqInputVO();
difyChatReqVO.setQuery("你好");
difyChatReqVO.setInputs(inputs);
return webClient.post()
.uri(difyUrl)
.headers(httpHeaders -> {
httpHeaders.setContentType(MediaType.APPLICATION_JSON);
httpHeaders.setBearerAuth(difyAppAuth);
})
.bodyValue(JSON.toJSONString(difyChatReqVO))
.retrieve()
.bodyToFlux(StreamResponse.class);
}
private boolean shouldInclude(StreamResponse streamResponse) {
// 示例只要message节点的数据和message_end节点的数据
return streamResponse.getEvent().equals("message")
|| streamResponse.getEvent().equals("message_end");
}
public ChatResDTO chat(DifyChatReqVO difyChatReqVO) { public ChatResDTO chat(DifyChatReqVO difyChatReqVO) {
ChatResDTO execute; ChatResDTO execute;

@ -7,7 +7,7 @@ server:
context-path: /speech-demo-service context-path: /speech-demo-service
dify: dify:
url: http://192.168.10.138/v1/chat-messages url: http://192.168.10.138/v1/chat-messages
app-auth: Bearer app-79ABpRQWayX0bK9C2m7vecXe app-auth: app-79ABpRQWayX0bK9C2m7vecXe
paddle-speech: paddle-speech:
tts: http://192.168.10.96:8090/paddlespeech/tts tts: http://192.168.10.96:8090/paddlespeech/tts
asr: http://192.168.10.96:8090/paddlespeech/asr asr: http://192.168.10.96:8090/paddlespeech/asr

@ -0,0 +1,3 @@
SpringBoot+WebFlux通过流式响应实现类似ChatGPT的打字机效果
突然间想用Java实现一下像ChatGPT一样的打字机输出效果但是网上搜了相关教程感觉都不够满意。
这里贴一下自己的实现,为中文互联网做一点小小的贡献

@ -23,7 +23,7 @@ class SpeechDemoServiceApplicationTests {
@Test @Test
void testTtsTransform() { void testTtsTransform() {
TtsResultDTO ttsResultDTO = TtsUtil.ttsTransform("你好,我是小爱同学我是小爱同学我是小爱同学我是小爱同学我是小爱同学我是小爱同学我是小爱同学我是小爱同学我是小爱同学我是小爱同学我是小爱同学我是小爱同学我是小爱同学"); TtsResultDTO ttsResultDTO = TtsUtil.ttsTransform("欢迎来电,我是您的康养顾问小苏。关于您的问题,我们已经为您查询到了相关信息。");
System.out.println(JSONUtil.toJsonStr(ttsResultDTO)); System.out.println(JSONUtil.toJsonStr(ttsResultDTO));
// https://www.toolfk.com/zh-cn/tools/base64-to-audio.html base64转音频 // https://www.toolfk.com/zh-cn/tools/base64-to-audio.html base64转音频

Loading…
Cancel
Save