package com.supervision.rasa.service; import cn.hutool.core.collection.CollectionUtil; import cn.hutool.core.util.StrUtil; import com.supervision.model.RasaModelInfo; import com.supervision.rasa.constant.RasaConstant; import com.supervision.rasa.pojo.dto.RasaRunParam; import com.supervision.rasa.util.PortUtil; import com.supervision.service.RasaModeService; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import java.util.List; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; @Slf4j @Component @RequiredArgsConstructor public class RasaModelManager { private final RasaModeService rasaModeService; private final RasaCmdService rasaCmdService; private boolean wakeUpInterruptServerRunning = false; public void wakeUpInterruptServer(){ // 1. 查找出记录表中存活的服务 List rasaModelInfos = rasaModeService.listActive(); List activeRasaList = rasaModelInfos.stream().filter(info -> CollectionUtil.isNotEmpty(info.getRunCmd()) && null != info.getPort()).collect(Collectors.toList()); if (CollectionUtil.isEmpty(activeRasaList)){ log.info("wakeUpInterruptService: no rasa service need wake up ..."); return; } // 2. 重新启动中断的服务 for (RasaModelInfo rasaModelInfo : activeRasaList) { if (!PortUtil.portIsActive(rasaModelInfo.getPort())){ try { RasaRunParam rasaRunParam = RasaRunParam.build(rasaModelInfo.getRunCmd()); rasaRunParam.setPort(String.valueOf(rasaModelInfo.getPort())); rasaRunParam.setShellPath(rasaCmdService.getShellPath(RasaConstant.RUN_SHELL)); List outMessageList = rasaCmdService.execCmd(rasaRunParam.toList(), s -> StrUtil.isNotBlank(s) && s.contains(RasaConstant.RUN_SUCCESS_MESSAGE), 300); rasaModelInfo.setRunLog(String.join("\r\n",outMessageList)); rasaModelInfo.setRunCmd(rasaRunParam.toList()); rasaModeService.updateById(rasaModelInfo); if (!runIsSuccess(outMessageList)){ log.info("wakeUpInterruptServer: restart server port for {} failed,details info : {}",rasaModelInfo.getPort(),String.join("\r\n",outMessageList)); } } catch (InterruptedException | ExecutionException | TimeoutException e ) { log.info("wakeUpInterruptServer: restart server port for {} failed",rasaModelInfo.getPort()); throw new RuntimeException(e); } log.info("wakeUpInterruptServer: restart server port for {} success ",rasaModelInfo.getPort()); }else { log.info("wakeUpInterruptServer: port:{} is run..",rasaModelInfo.getPort()); } } } //默认每十分钟执行一次 @Scheduled(cron = "${rasa.wakeup.cron:0 */10 * * * ?}") public void wakeUpInterruptServerScheduled(){ try { log.info("wakeUpInterruptServerScheduled: Scheduled is run .... wakeUpInterruptServerRunning is :{}",wakeUpInterruptServerRunning); if (!wakeUpInterruptServerRunning){ wakeUpInterruptServerRunning = true; wakeUpInterruptServer(); } } finally { wakeUpInterruptServerRunning = false; } } private boolean runIsSuccess(List messageList){ return containKey(messageList,RasaConstant.RUN_SUCCESS_MESSAGE); } private boolean containKey(List messageList,String keyWord){ if (CollectionUtil.isEmpty(messageList)){ return false; } if (StrUtil.isEmpty(keyWord)){ return false; } return messageList.stream().anyMatch(s->StrUtil.isNotEmpty(s) && s.contains(keyWord)); } }