package com.supervision.service.danmaku; import cn.hutool.core.lang.Assert; import com.hankcs.hanlp.seg.common.Term; import com.hankcs.hanlp.tokenizer.StandardTokenizer; import com.supervision.domain.LivetalkingChatDTO; import com.supervision.domain.UserDetail; import com.supervision.dto.DanmakuMessage; import com.supervision.service.DigitalHumanDialogueLogService; import com.supervision.service.LivetalkingService; import com.supervision.util.UserUtil; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; import org.springframework.web.socket.CloseStatus; import org.springframework.web.socket.TextMessage; import org.springframework.web.socket.WebSocketSession; import org.springframework.web.socket.handler.TextWebSocketHandler; import java.io.IOException; import java.util.concurrent.Flow; @Slf4j @Service @RequiredArgsConstructor public class DanmakuWebSocketHandler extends TextWebSocketHandler { private final WebSocketSessionManager sessionManager; private final DigitalHumanDialogueLogService dialogueLogService; private final LivetalkingService livetalkingService; @Override protected void handleTextMessage(WebSocketSession session, TextMessage textMessage) { // 解析客户端消息 DanmakuMessage danmaku = DanmakuMessage.fromJson(textMessage.getPayload()); String roomId = getRoomIdFromSession(session); danmaku.setRoomId(roomId); // 验证消息 validateDanmaku(danmaku); // 保存到日志 String messageId = dialogueLogService.saveLog(danmaku); // 判断是否需要回复 if (isQuestion(danmaku.getContent())) { log.info("检测到问题: {}", danmaku.getContent()); LivetalkingChatDTO livetalkingChatDTO = new LivetalkingChatDTO(danmaku); livetalkingChatDTO.setMessageId(messageId); livetalkingService.chat(livetalkingChatDTO); } // 发布到消息中心 DanmakuPublisher.getInstance().publish(danmaku); } @Override public void afterConnectionEstablished(WebSocketSession session) { String roomId = getRoomIdFromSession(session); sessionManager.addSession(roomId, session); // 为该会话创建订阅者 Flow.Subscriber subscriber = createSubscriber(session); Flow.Subscription subscription = DanmakuPublisher.getInstance() .subscribe(roomId, subscriber); // 将会话与订阅关系保存,方便在连接关闭时取消订阅 session.getAttributes().put("subscription", subscription); try { UserDetail userDetail = UserUtil.currentUser(session); DanmakuMessage message = new DanmakuMessage(); message.setUserId(userDetail.getUserId()); message.setNickname(userDetail.getNickname()); message.setContent(userDetail.getNickname() + "来了"); DanmakuPublisher.getInstance().publish(message); }catch (Exception e){ log.error("获取用户信息失败: {}", e.getMessage()); } } @Override public void afterConnectionClosed(WebSocketSession session, CloseStatus status) { String roomId = getRoomIdFromSession(session); sessionManager.removeSession(roomId, session); // 取消订阅 Flow.Subscription subscription = (Flow.Subscription) session.getAttributes().get("subscription"); if (subscription != null) { subscription.cancel(); } } private Flow.Subscriber createSubscriber(WebSocketSession session) { return new Flow.Subscriber<>() { private Flow.Subscription subscription; @Override public void onSubscribe(Flow.Subscription subscription) { this.subscription = subscription; subscription.request(Long.MAX_VALUE); // 不限制请求数量 } @Override public void onNext(DanmakuMessage danmaku) { try { if (session.isOpen()) { session.sendMessage(new TextMessage(danmaku.toJson())); } } catch (IOException e) { subscription.cancel(); } } @Override public void onError(Throwable throwable) { // 处理错误 log.info("WebSocket错误: {}", throwable.getMessage()); } @Override public void onComplete() { // 发布者关闭 log.info("WebSocket连接已关闭: {}", session.getId()); } }; } private String getRoomIdFromSession(WebSocketSession session) { // 从URI获取roomId,如 ws://localhost:8080/ws/danmaku?roomId=123 String query = session.getUri().getQuery(); return query.split("=")[1]; } private void validateDanmaku(DanmakuMessage danmaku) { // 验证逻辑... Assert.notEmpty(danmaku.getRoomId(), "房间ID不能为空"); Assert.notEmpty(danmaku.getContent(), "弹幕内容不能为空"); Assert.notEmpty(danmaku.getUserId(), "用户ID不能为空"); } private boolean isQuestion(String sentence) { String[] questionWords = {"什么", "为什么", "如何", "哪", "谁", "多少", "是否", "能否", "是不是"}; String[] questionEndings = {"吗", "呢", "?", "?"}; // 结尾判断 for (String end : questionEndings) { if (sentence.trim().endsWith(end)) { return true; } } // 分词判断 for (Term term : StandardTokenizer.segment(sentence)) { for (String qw : questionWords) { if (term.word.equals(qw)) { return true; } } } return false; } }