移除websocket

release_2.0.0
liu 1 year ago
parent 400b39e34f
commit 82c887ee8c

@ -25,12 +25,6 @@ public class DiagnoseHallController {
private final DiagnoseHallService diagnoseHallService;
@ApiOperation("获取问诊资源")
@GetMapping("achieveDiagnoseResource")
public boolean achieveDiagnoseResource(){
return diagnoseHallService.achieveDiagnoseResource();
}
@ApiOperation("分页查询问诊流程列表")
@GetMapping("queryDiagnoseProcessPageList")
public IPage<DiagnoseProcessResVo> queryDiagnoseProcessPageList(@ApiParam("起始页") @RequestParam(defaultValue = "1") Integer pageNum,

@ -1,12 +1,8 @@
package com.supervision.controller;
import cn.hutool.core.bean.BeanUtil;
import cn.hutool.core.net.NetUtil;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.json.JSONObjectIter;
import cn.hutool.json.JSONUtil;
import com.supervision.constant.UserTokenConstant;
import com.supervision.exception.BusinessException;
import com.supervision.model.User;
import com.supervision.pojo.vo.LoginReqVO;
@ -15,24 +11,15 @@ import com.supervision.pojo.vo.UserInfoReqVo;
import com.supervision.pojo.vo.UserInfoResVo;
import com.supervision.service.UserManageService;
import com.supervision.service.UserService;
import com.supervision.websocket.dto.UserWebSocketDTO;
import com.supervision.websocket.UserResourceCheck;
import com.supervision.util.TokenUtil;
import com.supervision.util.UserUtil;
import com.supervision.websocket.dto.WebSocketLoginDTO;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import lombok.RequiredArgsConstructor;
import org.apache.xmlbeans.impl.common.IOUtil;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.Cursor;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.ScanOptions;
import org.springframework.web.bind.annotation.*;
import java.io.IOException;
import java.time.LocalDateTime;
import java.util.Map;
import java.util.Optional;
@Api(tags = "用户管理")
@ -47,18 +34,6 @@ public class UserController {
private final UserManageService userManageService;
private final UserResourceCheck userResourceCheck;
@Value("${ws.nginx-ip:}")
private String wsIp;
@Value("${ws.nginx-port:}")
private String wsPort;
@Value("${spring.profiles.active}")
private String active;
@Value("${server.port}")
private String port;
@ApiOperation("登录")
@PostMapping("login")
@ -80,41 +55,7 @@ public class UserController {
return loginResVO;
}
@ApiOperation("踢用户下线")
@GetMapping("kickUser")
public void kickUser(String uuid, String userId) throws IOException {
if (StrUtil.isNotBlank(uuid)) {
redisTemplate.convertAndSend(UserTokenConstant.KICK_CHANNEL, JSONUtil.toJsonStr(new UserWebSocketDTO(uuid)));
} else {
ScanOptions options = ScanOptions.scanOptions()
.match("*" + userId + "*") // 可选:使用模式匹配指定要匹配的键
.count(10) // 可选:指定每次迭代返回的元素数量
.build();
Cursor<Map.Entry<Object, Object>> cursor = null;
try {
cursor = redisTemplate.opsForHash().scan(UserTokenConstant.USER_WEBSOCKET_CACHE, options);
while (cursor.hasNext()) {
Map.Entry<Object, Object> entry = cursor.next();
Object value = entry.getValue();
WebSocketLoginDTO bean = JSONUtil.toBean(String.valueOf(value), WebSocketLoginDTO.class);
if (userId.equals(bean.getUserId())) {
redisTemplate.convertAndSend(UserTokenConstant.KICK_CHANNEL, JSONUtil.toJsonStr(new UserWebSocketDTO(bean.getUserId())));
}
}
} catch (Exception e) {
if (ObjectUtil.isNotNull(cursor)) {
cursor.close();
}
}
}
}
@ApiOperation("查看资源是否有剩余")
@GetMapping("resourceIsFree")
public boolean resourceIsFree() {
return userResourceCheck.achieveDiagnoseResource();
}
@ApiOperation("修改用户信息")
@PutMapping("updateUserInfo")
@ -130,21 +71,9 @@ public class UserController {
return userManageService.getUserAccountInfo(user.getId());
}
@ApiOperation("获取本机IP地址,用来给websocket使用")
@GetMapping("queryWebSocketUrl")
public String queryWebSocketUrl() {
String template = "wss://{}:{}/virtual-patient-websocket/";
if (StrUtil.isNotBlank(wsIp) && StrUtil.isNotBlank(wsPort)) {
return StrUtil.format(template, wsIp, wsPort);
}
throw new BusinessException("未获取到ws的nginx地址,请确认配置文件是否配置");
}
@ApiOperation("获取当前在线的用户")
@GetMapping("queryCurrentOnlineUser")
public Map<Object, Object> queryCurrentOnlineUser() {
return redisTemplate.opsForHash().entries(UserTokenConstant.USER_WEBSOCKET_CACHE);
}
}

@ -1,69 +0,0 @@
package com.supervision.controller;
import com.supervision.constant.UserTokenConstant;
import com.supervision.websocket.UserResourceCheck;
import com.supervision.websocket.WebSocketSessionPool;
import com.supervision.websocket.dto.WebSocketLoginDTO;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
@Component
@Slf4j
@ServerEndpoint("/webSocket/{uuid}/{userId}")
public class WebSocketServer {
private static RedisTemplate<String, String> redisTemplate;
private static UserResourceCheck userResourceCheck;
// 因为是ServerEndpoint是多例,所以需要这样注入
@Autowired
public void setUserResourceCheck(UserResourceCheck userResourceCheck) {
WebSocketServer.userResourceCheck = userResourceCheck;
}
// 因为是ServerEndpoint是多例,所以需要这样注入
@Autowired
public void setRedisTemplate(RedisTemplate<String, String> redisTemplate) {
WebSocketServer.redisTemplate = redisTemplate;
}
/**
*
*/
@OnOpen
public void onOpen(Session session, @PathParam(value = "uuid") String uuid, @PathParam(value = "userId") String userId) {
// 校验资源数量,并保存
userResourceCheck.achieveDiagnoseResourceAndOpenConnection(uuid, userId, session);
WebSocketSessionPool.SESSION_POOL.put(uuid, session);
log.info("用户:{},UUID:{}登录成功", userId, uuid);
}
/**
*
*/
@OnClose
public void onClose(Session session, @PathParam(value = "uuid") String uuid, @PathParam(value = "userId") String userId) {
redisTemplate.opsForHash().delete(UserTokenConstant.USER_WEBSOCKET_CACHE, uuid);
WebSocketSessionPool.SESSION_POOL.remove(uuid);
log.info("用户:{},uuid:{}关闭从Redis中移除,当前连接数为:{}", userId, uuid, redisTemplate.opsForHash().size(UserTokenConstant.USER_WEBSOCKET_CACHE));
}
/**
*
*/
@OnError
public void onError(Session session, @PathParam(value = "uuid") String uuid, @PathParam(value = "userId") String userId, Throwable throwable) {
redisTemplate.opsForHash().delete(UserTokenConstant.USER_WEBSOCKET_CACHE, uuid);
WebSocketSessionPool.SESSION_POOL.remove(uuid);
log.error("用户:{},uuid:{}发生错误从Redis中移除,当前连接数为:{}", userId, uuid, redisTemplate.opsForHash().size(UserTokenConstant.USER_WEBSOCKET_CACHE), throwable);
}
}

@ -9,8 +9,6 @@ import com.supervision.vo.result.DiagnoseProcessResVo;
public interface DiagnoseHallService {
boolean achieveDiagnoseResource();
IPage<DiagnoseProcessResVo> queryDiagnoseProcessPageList(Integer pageNum, Integer pageSize,
DiagnoseProcessReqVo diagnoseProcessReqVo);

@ -3,7 +3,6 @@ package com.supervision.service.impl;
import cn.hutool.core.lang.Assert;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.baomidou.lock.annotation.Lock4j;
import com.supervision.model.DiagnosisPrimary;
import com.supervision.service.DiagnoseHallService;
import com.supervision.service.DiagnosisPrimaryService;
@ -13,7 +12,6 @@ import com.supervision.vo.ask.DiagnosisPrimaryVO;
import com.supervision.vo.manage.MedicalRecPageResVO;
import com.supervision.vo.result.DiagnoseProcessReqVo;
import com.supervision.vo.result.DiagnoseProcessResVo;
import com.supervision.websocket.UserResourceCheck;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
@ -29,7 +27,6 @@ import java.util.stream.Collectors;
@Slf4j
public class DiagnoseHallServiceImpl implements DiagnoseHallService {
private final UserResourceCheck userResourceCheck;
private final ProcessService processService;
@ -37,12 +34,6 @@ public class DiagnoseHallServiceImpl implements DiagnoseHallService {
private final DiagnosisPrimaryService diagnosisPrimaryService;
@Lock4j(name = "achieveDiagnoseResource")
@Override
public boolean achieveDiagnoseResource() {
// 如果小于数字人最大连接数,则可以连接
return userResourceCheck.achieveDiagnoseResource();
}
@Override
public IPage<DiagnoseProcessResVo> queryDiagnoseProcessPageList(Integer pageNum, Integer pageSize,

@ -1,27 +0,0 @@
package com.supervision.websocket;
import cn.hutool.json.JSONUtil;
import com.supervision.constant.UserTokenConstant;
import com.supervision.websocket.dto.UserWebSocketDTO;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class KickUserListener implements MessageListener {
@Autowired
private UserResourceCheck userResourceCheck;
@Override
public void onMessage(Message message, byte[] pattern) {
String messageString = message.toString();
UserWebSocketDTO user = JSONUtil.toBean(messageString, UserWebSocketDTO.class);
log.info("Redis的Channel:{}收到踢用户{}下线消息", UserTokenConstant.KICK_CHANNEL, user.getUuid());
userResourceCheck.kickUser(user.getUuid());
}
}

@ -1,24 +0,0 @@
package com.supervision.websocket;
import com.supervision.constant.UserTokenConstant;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
@Configuration
public class RedisListener {
@Autowired
private KickUserListener kickUserListener;
@Bean
public RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory redisConnectionFactory) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(redisConnectionFactory);
container.addMessageListener(kickUserListener, new ChannelTopic(UserTokenConstant.KICK_CHANNEL));
return container;
}
}

@ -1,84 +0,0 @@
package com.supervision.websocket;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.json.JSONUtil;
import com.baomidou.lock.annotation.Lock4j;
import com.supervision.constant.UserTokenConstant;
import com.supervision.exception.BusinessException;
import com.supervision.websocket.dto.KickDTO;
import com.supervision.websocket.dto.NoResourceDTO;
import com.supervision.websocket.dto.WebSocketLoginDTO;
import lombok.Data;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import javax.websocket.*;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import java.io.IOException;
@Slf4j
@Component
@RequiredArgsConstructor
public class UserResourceCheck {
@Value("${human.resourceMaxNumber}")
private String resourceNumber;
private final RedisTemplate<String, String> redisTemplate;
@Lock4j(name = "achieveDiagnoseResource")
public boolean achieveDiagnoseResource() {
long humanMaxNumber = Long.parseLong(resourceNumber);
long currentSize = redisTemplate.opsForHash().size(UserTokenConstant.USER_WEBSOCKET_CACHE);
// 如果小于数字人最大连接数,则可以连接
return currentSize < humanMaxNumber;
}
@Lock4j(name = "achieveDiagnoseResourceAndOpenConnection")
public void achieveDiagnoseResourceAndOpenConnection(String uuid, String userId, Session session) {
// 如果小于数字人最大连接数,则可以连接
if (!achieveDiagnoseResource()) {
try {
session.getBasicRemote().sendText(JSONUtil.toJsonStr(new NoResourceDTO()));
} catch (Exception e) {
log.error("发送消息失败", e);
}
throw new BusinessException("暂时没有资源,建立连接失败");
}
redisTemplate.opsForHash().put(UserTokenConstant.USER_WEBSOCKET_CACHE, uuid, JSONUtil.toJsonStr(new WebSocketLoginDTO(uuid, userId, session.getId())));
}
/**
* websocket,5
*/
// 实现一个方法用于踢下线用户,走的是Redis的消息队列
public void kickUser(String uuid) {
log.info("尝试踢uuid:{}下线", uuid);
Session session = WebSocketSessionPool.SESSION_POOL.get(uuid);
// 只有不是忽略剔除的sessionId才可以踢下线
if (ObjectUtil.isNotEmpty(session)) {
try {
session.getBasicRemote().sendText(JSONUtil.toJsonStr(new KickDTO()));
session.close(new CloseReason(CloseReason.CloseCodes.NORMAL_CLOSURE, "用户被踢下线"));
WebSocketSessionPool.SESSION_POOL.remove(uuid);
// 然后从redis中移除掉
redisTemplate.opsForHash().delete(UserTokenConstant.USER_WEBSOCKET_CACHE, uuid);
log.info("踢UUID:{},sessionId:{} 下线成功", uuid, session.getId());
return;
} catch (IOException e) {
log.error("用户:{}的websocket连接异常", e.getMessage());
// TODO 如果用户连接异常,怎么办
}
}
log.info("踢UUID:{}下线,未找到用户,踢下线失败", uuid);
}
}

@ -1,13 +0,0 @@
package com.supervision.websocket;
import com.supervision.websocket.dto.WebSocketLoginDTO;
import javax.websocket.Session;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
public class WebSocketSessionPool {
//concurrent包的线程安全Set用来存放每个客户端对应的WebSocketServer对象。
public static final ConcurrentHashMap<String, Session> SESSION_POOL = new ConcurrentHashMap<>();
}

@ -1,37 +0,0 @@
package com.supervision.websocket;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.json.JSONUtil;
import com.supervision.websocket.UserResourceCheck;
import com.supervision.websocket.WebSocketSessionPool;
import com.supervision.websocket.dto.KeepaliveDTO;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import javax.websocket.Session;
import java.io.IOException;
import java.util.Map;
@Component
@Slf4j
@RequiredArgsConstructor
public class WebsocketKeepaliveTask {
@Scheduled(fixedDelay = 5 * 1000L)
public void keepalive() {
for (Map.Entry<String, Session> entries : WebSocketSessionPool.SESSION_POOL.entrySet()) {
String userId = entries.getKey();
Session session = entries.getValue();
if (ObjectUtil.isNotEmpty(session)) {
try {
session.getBasicRemote().sendText(JSONUtil.toJsonStr(new KeepaliveDTO()));
log.info("用户:{}的websocket保活成功", userId);
} catch (IOException e) {
log.error("用户:{}的websocket连接异常", userId, e);
}
}
}
}
}

@ -1,12 +0,0 @@
package com.supervision.websocket.dto;
import com.supervision.constant.UserTokenConstant;
import lombok.Data;
@Data
public class KeepaliveDTO {
private final Integer code = UserTokenConstant.KEEPALIVE_CODE;
private final String message = "keepalive";
}

@ -1,12 +0,0 @@
package com.supervision.websocket.dto;
import com.supervision.constant.UserTokenConstant;
import lombok.Data;
@Data
public class KickDTO {
private final Integer code = UserTokenConstant.KICK_CODE;
private final String message = "用户被踢下线";
}

@ -1,10 +0,0 @@
package com.supervision.websocket.dto;
import com.supervision.constant.UserTokenConstant;
public class NoResourceDTO {
private final Integer code = UserTokenConstant.NO_RESOURCE_CODE;
private final String message = "用户被踢下线";
}

@ -1,12 +0,0 @@
package com.supervision.websocket.dto;
import lombok.AllArgsConstructor;
import lombok.Data;
@Data
@AllArgsConstructor
public class UserWebSocketDTO {
private String uuid;
}

@ -1,28 +0,0 @@
package com.supervision.websocket.dto;
import lombok.Data;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
@Data
public class WebSocketLoginDTO {
private String userId;
private String uuid;
private String sessionId;
private String loginTime;
public WebSocketLoginDTO(String uuid, String userId, String sessionId) {
this.userId = userId;
this.uuid = uuid;
this.sessionId = sessionId;
this.loginTime = LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME);
}
public WebSocketLoginDTO() {
}
}
Loading…
Cancel
Save