From 718f3957191aceff8f4fb93f3bc8539c970441e0 Mon Sep 17 00:00:00 2001 From: liu Date: Wed, 20 Dec 2023 14:32:09 +0800 Subject: [PATCH] =?UTF-8?q?=E6=8F=90=E4=BA=A4websocket=E7=9B=B8=E5=85=B3?= =?UTF-8?q?=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../supervision/config/WebSocketConfig.java | 11 +++++ .../task/WebsocketKeepaliveTask.java | 48 +++++++++++++++++++ .../usermanage/UserResourceCheck.java | 22 --------- 3 files changed, 59 insertions(+), 22 deletions(-) create mode 100644 virtual-patient-web/src/main/java/com/supervision/task/WebsocketKeepaliveTask.java diff --git a/virtual-patient-web/src/main/java/com/supervision/config/WebSocketConfig.java b/virtual-patient-web/src/main/java/com/supervision/config/WebSocketConfig.java index 317b7d45..e2755bed 100644 --- a/virtual-patient-web/src/main/java/com/supervision/config/WebSocketConfig.java +++ b/virtual-patient-web/src/main/java/com/supervision/config/WebSocketConfig.java @@ -2,6 +2,8 @@ package com.supervision.config; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.scheduling.TaskScheduler; +import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; import org.springframework.web.socket.server.standard.ServerEndpointExporter; @Configuration @@ -14,4 +16,13 @@ public class WebSocketConfig { public ServerEndpointExporter serverEndpointExporter(){ return new ServerEndpointExporter(); } + + @Bean + public TaskScheduler taskScheduler(){ + ThreadPoolTaskScheduler threadPoolTaskScheduler = new ThreadPoolTaskScheduler(); + threadPoolTaskScheduler.setThreadNamePrefix("SockJS-"); + threadPoolTaskScheduler.setPoolSize(Runtime.getRuntime().availableProcessors()); + threadPoolTaskScheduler.setRemoveOnCancelPolicy(true); + return threadPoolTaskScheduler; + } } diff --git a/virtual-patient-web/src/main/java/com/supervision/task/WebsocketKeepaliveTask.java b/virtual-patient-web/src/main/java/com/supervision/task/WebsocketKeepaliveTask.java new file mode 100644 index 00000000..c240ddc3 --- /dev/null +++ b/virtual-patient-web/src/main/java/com/supervision/task/WebsocketKeepaliveTask.java @@ -0,0 +1,48 @@ +package com.supervision.task; + +import cn.hutool.core.util.ObjectUtil; +import cn.hutool.json.JSONUtil; +import com.supervision.constant.UserTokenConstant; +import com.supervision.controller.WebSocketServer; +import com.supervision.usermanage.UserResourceCheck; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; + +import javax.websocket.Session; +import java.io.IOException; +import java.util.Map; + +@Component +@Slf4j +@RequiredArgsConstructor +public class WebsocketKeepaliveTask { + + private final RedisTemplate redisTemplate; + + @Scheduled(fixedDelay = 5 * 1000L) + public void keepalive() { + log.info("websocket保活接口开始,每5秒钟发送一次消息"); + for (Map.Entry entries : WebSocketServer.SESSION_POOL.entrySet()) { + String userId = entries.getKey(); + Session session = entries.getValue(); + if (ObjectUtil.isNotEmpty(session)) { + try { + + session.getBasicRemote().sendText(JSONUtil.toJsonStr(new UserResourceCheck.Keepalive())); + log.info("用户:{}的websocket保活成功", userId); + } catch (IOException e) { + log.error("用户:{}的websocket连接异常", userId, e); + // 连接异常的用户,移除 + WebSocketServer.SESSION_POOL.remove(userId); + // 移除redis中该用户的缓存 + redisTemplate.opsForHash().delete(UserTokenConstant.USER_WEBSOCKET_CACHE, userId); + } + } + } + log.info("websocket保活接口结束"); + } + +} diff --git a/virtual-patient-web/src/main/java/com/supervision/usermanage/UserResourceCheck.java b/virtual-patient-web/src/main/java/com/supervision/usermanage/UserResourceCheck.java index 9b0182fa..8584aa62 100644 --- a/virtual-patient-web/src/main/java/com/supervision/usermanage/UserResourceCheck.java +++ b/virtual-patient-web/src/main/java/com/supervision/usermanage/UserResourceCheck.java @@ -57,28 +57,6 @@ public class UserResourceCheck { /** * websocket保活接口,每5秒钟发送一次消息 */ - @Scheduled(fixedDelay = 5 * 1000L) - public void keepalive() { - log.info("websocket保活接口开始,每5秒钟发送一次消息"); - for (Map.Entry entries : WebSocketServer.SESSION_POOL.entrySet()) { - String userId = entries.getKey(); - Session session = entries.getValue(); - if (ObjectUtil.isNotEmpty(session)) { - try { - - session.getBasicRemote().sendText(JSONUtil.toJsonStr(new Keepalive())); - log.info("用户:{}的websocket保活成功", userId); - } catch (IOException e) { - log.error("用户:{}的websocket连接异常", userId, e); - // 连接异常的用户,移除 - WebSocketServer.SESSION_POOL.remove(userId); - // 移除redis中该用户的缓存 - redisTemplate.opsForHash().delete(UserTokenConstant.USER_WEBSOCKET_CACHE, userId); - } - } - } - log.info("websocket保活接口结束"); - } // 实现一个方法用于踢下线用户,走的是Redis的消息队列 public void kickUser(String userId, String ignoreSessionId) {