You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
virtual-patient/virtual-patient-rasa/src/main/java/com/supervision/rasa/service/RasaModelManager.java

100 lines
3.9 KiB
Java

package com.supervision.rasa.service;
import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.util.StrUtil;
import com.supervision.model.RasaModelInfo;
import com.supervision.rasa.constant.RasaConstant;
import com.supervision.rasa.util.PortUtil;
import com.supervision.service.RasaModeService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
@Slf4j
@Component
@RequiredArgsConstructor
public class RasaModelManager {
private final RasaModeService rasaModeService;
private final RasaCmdService rasaCmdService;
private boolean wakeUpInterruptServerRunning = false;
public void wakeUpInterruptServer(){
// 1. 查找出记录表中存活的服务
List<RasaModelInfo> rasaModelInfos = rasaModeService.listActive();
List<RasaModelInfo> activeRasaList = rasaModelInfos.stream().filter(info -> CollectionUtil.isNotEmpty(info.getRunCmd()) && null != info.getPort()).collect(Collectors.toList());
if (CollectionUtil.isEmpty(activeRasaList)){
log.info("wakeUpInterruptService: no rasa service need wake up ...");
return;
}
// 2. 重新启动中断的服务
for (RasaModelInfo rasaModelInfo : activeRasaList) {
if (!PortUtil.portIsActive(rasaModelInfo.getPort())){
try {
List<String> runCmd = rasaModelInfo.getRunCmd();
runCmd.add(String.valueOf(rasaModelInfo.getPort()));
runCmd.set(1,rasaCmdService.getShellPath(RasaConstant.RUN_SHELL));
List<String> outMessageList = rasaCmdService.execCmd(rasaModelInfo.getRunCmd(),
s -> StrUtil.isNotBlank(s) && s.contains(RasaConstant.RUN_SUCCESS_MESSAGE), 300);
rasaModelInfo.setTrainLog(String.join("\r\n",outMessageList));
rasaModeService.updateById(rasaModelInfo);
if (!runIsSuccess(outMessageList)){
log.info("wakeUpInterruptServer: restart server port for {} failed,details info : {}",rasaModelInfo.getPort(),String.join("\r\n",outMessageList));
}
} catch (InterruptedException | ExecutionException | TimeoutException e ) {
log.info("wakeUpInterruptServer: restart server port for {} failed",rasaModelInfo.getPort());
throw new RuntimeException(e);
}
log.info("wakeUpInterruptServer: restart server port for {} success ",rasaModelInfo.getPort());
}else {
log.info("wakeUpInterruptServer: port:{} is run..",rasaModelInfo.getPort());
}
}
}
//默认每十分钟执行一次
@Scheduled(cron = "${rasa.wakeup.cron:0 */10 * * * ?}")
public void wakeUpInterruptServerScheduled(){
try {
log.info("wakeUpInterruptServerScheduled: Scheduled is run .... wakeUpInterruptServerRunning is :{}",wakeUpInterruptServerRunning);
if (!wakeUpInterruptServerRunning){
wakeUpInterruptServerRunning = true;
wakeUpInterruptServer();
}
} finally {
wakeUpInterruptServerRunning = false;
}
}
private boolean runIsSuccess(List<String> messageList){
return containKey(messageList,RasaConstant.RUN_SUCCESS_MESSAGE);
}
private boolean containKey(List<String> messageList,String keyWord){
if (CollectionUtil.isEmpty(messageList)){
return false;
}
if (StrUtil.isEmpty(keyWord)){
return false;
}
return messageList.stream().anyMatch(s->StrUtil.isNotEmpty(s) && s.contains(keyWord));
}
}