|
|
package com.supervision.rasa.service;
|
|
|
|
|
|
import cn.hutool.core.collection.CollectionUtil;
|
|
|
import cn.hutool.core.util.StrUtil;
|
|
|
import com.supervision.model.RasaModelInfo;
|
|
|
import com.supervision.rasa.constant.RasaConstant;
|
|
|
import com.supervision.rasa.util.PortUtil;
|
|
|
import com.supervision.service.RasaModeService;
|
|
|
import lombok.RequiredArgsConstructor;
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
import org.springframework.scheduling.annotation.Scheduled;
|
|
|
import org.springframework.stereotype.Component;
|
|
|
|
|
|
import java.util.List;
|
|
|
import java.util.concurrent.ExecutionException;
|
|
|
import java.util.concurrent.TimeoutException;
|
|
|
import java.util.stream.Collectors;
|
|
|
|
|
|
@Slf4j
|
|
|
@Component
|
|
|
@RequiredArgsConstructor
|
|
|
public class RasaModelManager {
|
|
|
|
|
|
private final RasaModeService rasaModeService;
|
|
|
|
|
|
private final RasaCmdService rasaCmdService;
|
|
|
|
|
|
private boolean wakeUpInterruptServerRunning = false;
|
|
|
|
|
|
public void wakeUpInterruptServer(){
|
|
|
|
|
|
// 1. 查找出记录表中存活的服务
|
|
|
List<RasaModelInfo> rasaModelInfos = rasaModeService.listActive();
|
|
|
List<RasaModelInfo> activeRasaList = rasaModelInfos.stream().filter(info -> CollectionUtil.isNotEmpty(info.getRunCmd()) && null != info.getPort()).collect(Collectors.toList());
|
|
|
|
|
|
if (CollectionUtil.isEmpty(activeRasaList)){
|
|
|
log.info("wakeUpInterruptService: no rasa service need wake up ...");
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
// 2. 重新启动中断的服务
|
|
|
for (RasaModelInfo rasaModelInfo : activeRasaList) {
|
|
|
if (!PortUtil.portIsActive(rasaModelInfo.getPort())){
|
|
|
try {
|
|
|
rasaModelInfo.getRunCmd().add(String.valueOf(rasaModelInfo.getPort()));
|
|
|
List<String> outMessageList = rasaCmdService.execCmd(rasaModelInfo.getRunCmd(),
|
|
|
s -> StrUtil.isNotBlank(s) && s.contains(RasaConstant.RUN_SUCCESS_MESSAGE), 300);
|
|
|
|
|
|
rasaModelInfo.setTrainLog(String.join("\r\n",outMessageList));
|
|
|
rasaModeService.updateById(rasaModelInfo);
|
|
|
if (!runIsSuccess(outMessageList)){
|
|
|
log.info("wakeUpInterruptServer: restart server port for {} failed,details info : {}",rasaModelInfo.getPort(),String.join("\r\n",outMessageList));
|
|
|
}
|
|
|
} catch (InterruptedException | ExecutionException | TimeoutException e ) {
|
|
|
log.info("wakeUpInterruptServer: restart server port for {} failed",rasaModelInfo.getPort());
|
|
|
throw new RuntimeException(e);
|
|
|
}
|
|
|
log.info("wakeUpInterruptServer: restart server port for {} success !",rasaModelInfo.getPort());
|
|
|
}else {
|
|
|
log.info("wakeUpInterruptServer: port:{} is run..",rasaModelInfo.getPort());
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
//默认每十分钟执行一次
|
|
|
@Scheduled(cron = "${rasa.wakeup.cron:0 */10 * * * ?}")
|
|
|
public void wakeUpInterruptServerScheduled(){
|
|
|
try {
|
|
|
log.info("wakeUpInterruptServerScheduled: Scheduled is run .... wakeUpInterruptServerRunning is :{}",wakeUpInterruptServerRunning);
|
|
|
if (!wakeUpInterruptServerRunning){
|
|
|
wakeUpInterruptServerRunning = true;
|
|
|
wakeUpInterruptServer();
|
|
|
|
|
|
}
|
|
|
} finally {
|
|
|
wakeUpInterruptServerRunning = false;
|
|
|
}
|
|
|
}
|
|
|
|
|
|
private boolean runIsSuccess(List<String> messageList){
|
|
|
|
|
|
return containKey(messageList,RasaConstant.RUN_SUCCESS_MESSAGE);
|
|
|
}
|
|
|
|
|
|
private boolean containKey(List<String> messageList,String keyWord){
|
|
|
|
|
|
if (CollectionUtil.isEmpty(messageList)){
|
|
|
return false;
|
|
|
}
|
|
|
if (StrUtil.isEmpty(keyWord)){
|
|
|
return false;
|
|
|
}
|
|
|
return messageList.stream().anyMatch(s->StrUtil.isNotEmpty(s) && s.contains(keyWord));
|
|
|
}
|
|
|
}
|