package com.supervision.controller; import cn.hutool.core.util.ObjectUtil; import cn.hutool.core.util.StrUtil; import cn.hutool.json.JSONUtil; import com.supervision.constant.UserTokenConstant; import com.supervision.usermanage.UserWebSocketDTO; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Component; import javax.websocket.*; import javax.websocket.server.PathParam; import javax.websocket.server.ServerEndpoint; import java.io.IOException; import java.util.concurrent.ConcurrentHashMap; @Component @Slf4j @ServerEndpoint("/webSocket/{uid}") public class WebSocketServer { @Autowired private RedisTemplate redisTemplate; //concurrent包的线程安全Set,用来存放每个客户端对应的WebSocketServer对象。 private static final ConcurrentHashMap SESSION_POOL = new ConcurrentHashMap<>(); /** * 有客户端连接成功 */ @OnOpen public void onOpen(Session session, @PathParam(value = "uid") String uid) throws IOException { log.info("用户:{}登录,缓存到Redis", uid); // 链接之前先把之前的用户踢下线(ignoreSessionId防止把当前用户踢下线) redisTemplate.convertAndSend(UserTokenConstant.KICK_CHANNEL, JSONUtil.toJsonStr(new UserWebSocketDTO(uid, session.getId()))); SESSION_POOL.put(uid, session); redisTemplate.opsForHash().put(UserTokenConstant.USER_ID_CACHE, uid, session.getId()); } /** * 连接关闭调用的方法 */ @OnClose public void onClose(Session session, @PathParam(value = "uid") String uid) { redisTemplate.opsForHash().delete(UserTokenConstant.USER_ID_CACHE, uid, session.getId()); SESSION_POOL.remove(uid); log.info("用户:{}关闭,从Redis中移除,当前连接数为:{}", uid, redisTemplate.opsForHash().size(UserTokenConstant.USER_ID_CACHE)); } /** * 发生错误 */ @OnError public void onError(Session session, @PathParam(value = "uid") String uid, Throwable throwable) { redisTemplate.opsForHash().delete(UserTokenConstant.USER_ID_CACHE, uid, session.getId()); SESSION_POOL.remove(uid); log.error("用户:{}发生错误,从Redis中移除,当前连接数为:{}", uid, redisTemplate.opsForHash().size(UserTokenConstant.USER_ID_CACHE), throwable); } // 实现一个方法用于踢下线用户,走的是Redis的消息队列 public void kickUser(String userId, String ignoreSessionId) throws IOException { log.info("尝试主动踢用户:{}下线", userId); Session session = SESSION_POOL.get(userId); // 只有不是忽略剔除的sessionId才可以踢下线 if (ObjectUtil.isNotEmpty(session) && !StrUtil.equals(ignoreSessionId, session.getId())) { session.close(new CloseReason(CloseReason.CloseCodes.NORMAL_CLOSURE, "用户被踢下线")); SESSION_POOL.remove(userId); log.info("主动踢用户:{},sessionId:{} 下线成功", userId, session.getId()); return; } log.info("主动踢用户:{}下线,未找到用户,踢下线失败", userId); } }