diff --git a/virtual-patient-common/src/main/java/com/supervision/constant/UserTokenConstant.java b/virtual-patient-common/src/main/java/com/supervision/constant/UserTokenConstant.java index e3576264..29466321 100644 --- a/virtual-patient-common/src/main/java/com/supervision/constant/UserTokenConstant.java +++ b/virtual-patient-common/src/main/java/com/supervision/constant/UserTokenConstant.java @@ -4,5 +4,7 @@ public interface UserTokenConstant { String TOKEN_CACHE = "USER:LOGIN:TOKEN:"; + String USER_ID_CACHE = "USER:ID:CACHE"; + String KICK_CHANNEL = "USER:KICK:CHANNEL"; } diff --git a/virtual-patient-web/src/main/java/com/supervision/controller/WebSocketServer.java b/virtual-patient-web/src/main/java/com/supervision/controller/WebSocketServer.java index a6ae710a..c74d8337 100644 --- a/virtual-patient-web/src/main/java/com/supervision/controller/WebSocketServer.java +++ b/virtual-patient-web/src/main/java/com/supervision/controller/WebSocketServer.java @@ -1,7 +1,10 @@ package com.supervision.controller; import cn.hutool.core.util.ObjectUtil; +import com.supervision.constant.UserTokenConstant; +import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Component; import javax.websocket.*; @@ -15,10 +18,10 @@ import java.util.concurrent.atomic.AtomicInteger; @Component @Slf4j @ServerEndpoint("/webSocket/{uid}") +@RequiredArgsConstructor public class WebSocketServer { - //静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。 - private static final AtomicInteger onlineNum = new AtomicInteger(0); + private final RedisTemplate redisTemplate; //concurrent包的线程安全Set,用来存放每个客户端对应的WebSocketServer对象。 private static final ConcurrentHashMap SESSION_POOL = new ConcurrentHashMap<>(); @@ -28,9 +31,9 @@ public class WebSocketServer { */ @OnOpen public void onOpen(Session session, @PathParam(value = "uid") String uid) { + log.info("用户:{}登录,缓存到Redis", uid); SESSION_POOL.put(uid, session); - onlineNum.incrementAndGet(); - log.info(uid + "加入webSocket!当前人数为" + onlineNum); + redisTemplate.opsForSet().add(UserTokenConstant.USER_ID_CACHE, uid); } /** @@ -38,48 +41,31 @@ public class WebSocketServer { */ @OnClose public void onClose(Session session, @PathParam(value = "uid") String uid) { + redisTemplate.opsForSet().remove(UserTokenConstant.USER_ID_CACHE, uid); SESSION_POOL.remove(uid); - int cnt = onlineNum.decrementAndGet(); - log.info("有连接关闭,当前连接数为:{}", cnt); - } - - /** - * 发送消息 - */ - public void sendMessage(Session session, String message) throws IOException { - if (session != null) { - synchronized (session) { - session.getBasicRemote().sendText(message); - } - } - } + log.info("用户:{}关闭,从Redis中移除,当前连接数为:{}", uid, redisTemplate.opsForSet().size(UserTokenConstant.USER_ID_CACHE)); - /** - * 群发消息 - */ - public void broadCastInfo(String message) throws IOException { - for (Session session : SESSION_POOL.values()) { - if (session.isOpen()) { - sendMessage(session, message); - } - } } /** * 发生错误 */ @OnError - public void onError(Session session, Throwable throwable) { - log.error("发生错误"); - throwable.printStackTrace(); + public void onError(Session session, @PathParam(value = "uid") String uid, Throwable throwable) { + redisTemplate.opsForSet().remove(UserTokenConstant.USER_ID_CACHE, uid); + SESSION_POOL.remove(uid); + log.error("用户:{}发生错误,从Redis中移除,当前连接数为:{}", uid, redisTemplate.opsForSet().size(UserTokenConstant.USER_ID_CACHE), throwable); } - // 实现一个方法用于踢下线用户 + // 实现一个方法用于踢下线用户,走的是Redis的消息队列 public void kickUser(String userId) throws IOException { - log.info("踢用户{}下线", userId); + log.info("尝试主动踢用户:{}下线", userId); Session session = SESSION_POOL.get(userId); if (ObjectUtil.isNotEmpty(session)) { session.close(new CloseReason(CloseReason.CloseCodes.NORMAL_CLOSURE, "用户被踢下线")); + log.info("主动踢用户:{}下线成功", userId); + return; } + log.info("主动踢用户:{}下线,未找到用户,踢下线失败", userId); } }