diff --git a/src/main/java/com/supervision/job/XxlJobTask.java b/src/main/java/com/supervision/job/XxlJobTask.java index 1bf121f..595e511 100644 --- a/src/main/java/com/supervision/job/XxlJobTask.java +++ b/src/main/java/com/supervision/job/XxlJobTask.java @@ -2,6 +2,7 @@ package com.supervision.job; import cn.hutool.core.collection.CollUtil; import com.alibaba.fastjson.JSON; +import com.supervision.common.constant.XxlJobConstants; import com.supervision.neo4j.service.Neo4jService; import com.supervision.police.domain.NotePrompt; import com.supervision.police.domain.TaskCaseRecord; @@ -28,7 +29,8 @@ public class XxlJobTask { private final CaseEvidenceService caseEvidenceService; private final TaskRecordService taskRecordService; - private TaskCaseRecordService taskCaseRecordService; + + private final TaskCaseRecordService taskCaseRecordService; private final ModelCaseService modelCaseService; @@ -43,7 +45,7 @@ public class XxlJobTask { /** * 睡觉通知 */ - @XxlJob("evidenceAnalysis") + @XxlJob(XxlJobConstants.TASK_NAME_EVIDENCE_ANALYSIS) public void evidenceAnalysis() { String param = XxlJobHelper.getJobParam(); log.info("【证据解析】任务开始。ID: 【{}】", param); @@ -59,12 +61,12 @@ public class XxlJobTask { /** * 提示词提取任务 */ - @XxlJob("promptExtractTask") + @XxlJob(XxlJobConstants.TASK_NAME_PROMPT_EXTRACT_TASK) public void promptExtractTask() { String jobParam = XxlJobHelper.getJobParam(); log.info("【提取任务】任务开始。参数: {}", jobParam); + Map map = JSON.parseObject(XxlJobHelper.getJobParam(), Map.class); try { - Map map = JSON.parseObject(XxlJobHelper.getJobParam(), Map.class); String taskId = map.get("taskId"); String caseId = map.get("caseId"); String promptId = map.get("promptId"); @@ -103,6 +105,7 @@ public class XxlJobTask { } switch (prompt.getType()) { case TYPE_GRAPH_REASONING: + log.info("【图推理】任务开始。任务ID: 【{}】", taskId); List tripleInfos = extractTripleInfoService.extractTripleInfo(prompt, caseId, map.get("executeId")); for (TripleInfo tripleInfo : tripleInfos) { neo4jService.saveTripleInfo(tripleInfo); @@ -110,8 +113,6 @@ public class XxlJobTask { if (CollUtil.isNotEmpty(tripleInfos)){ tripleInfoService.updateNeo4jFlag(tripleInfos.stream().map(TripleInfo::getId).toList(), "1"); } - - log.info("【图推理】任务开始。任务ID: 【{}】", taskId); break; case TYPE_STRUCTURAL_REASONING: log.info("【结构推理】任务开始。任务ID: 【{}】", taskId); @@ -121,11 +122,12 @@ public class XxlJobTask { log.error("未知的任务类型"); break; } - //TODO:更新案件状态、任务状态 + taskRecordService.completeTask(taskId, map.get("executeId"), true); log.info("【提取任务】任务结束。任务ID: 【{}】", taskId); } } catch (Exception e) { log.error("任务执行失败", e); + taskRecordService.completeTask(map.get("taskId"), map.get("executeId"), false); } finally { log.info("【提取任务】任务结束。"); } diff --git a/src/main/java/com/supervision/police/service/TaskCaseRecordService.java b/src/main/java/com/supervision/police/service/TaskCaseRecordService.java index 9e31396..94ecc72 100644 --- a/src/main/java/com/supervision/police/service/TaskCaseRecordService.java +++ b/src/main/java/com/supervision/police/service/TaskCaseRecordService.java @@ -17,4 +17,20 @@ public interface TaskCaseRecordService extends IService { * @return */ List queryProcessingTaskList(); + + + List queryByTaskId(String taskId); + + + /** + * 获取实际状态 + * @param taskCaseRecord 任务记录 + * @return + */ + String getActuallyStatus(TaskCaseRecord taskCaseRecord); + + TaskCaseRecord updateStatus(String taskId, String executeId,boolean isSuccess); + TaskCaseRecord updateStatus(String taskId, String executeId,boolean isSuccess,List taskCaseRecordList); + + Boolean updateStatus(String taskId,List olderStatus,String nowStatus); } diff --git a/src/main/java/com/supervision/police/service/TaskRecordService.java b/src/main/java/com/supervision/police/service/TaskRecordService.java index e227acd..b0b626b 100644 --- a/src/main/java/com/supervision/police/service/TaskRecordService.java +++ b/src/main/java/com/supervision/police/service/TaskRecordService.java @@ -1,6 +1,7 @@ package com.supervision.police.service; import com.baomidou.mybatisplus.core.metadata.IPage; +import com.supervision.police.domain.TaskCaseRecord; import com.supervision.police.domain.TaskRecord; import com.baomidou.mybatisplus.extension.service.IService; import com.supervision.police.dto.TaskInfoDTO; @@ -40,4 +41,14 @@ public interface TaskRecordService extends IService { void deleteTask(List taskIds); + + /** + * 完成任务 + * @param taskId 任务id + * @param executeId 执行id + */ + void completeTask(String taskId,String executeId,boolean isSuccess); + + + String determineStatus(List taskCaseRecords); } diff --git a/src/main/java/com/supervision/police/service/impl/TaskCaseRecordServiceImpl.java b/src/main/java/com/supervision/police/service/impl/TaskCaseRecordServiceImpl.java index 4e9540e..2ab7952 100644 --- a/src/main/java/com/supervision/police/service/impl/TaskCaseRecordServiceImpl.java +++ b/src/main/java/com/supervision/police/service/impl/TaskCaseRecordServiceImpl.java @@ -1,12 +1,17 @@ package com.supervision.police.service.impl; +import cn.hutool.core.collection.CollUtil; +import cn.hutool.core.util.ArrayUtil; +import cn.hutool.core.util.StrUtil; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import com.supervision.common.constant.TaskRecordConstants; import com.supervision.police.domain.TaskCaseRecord; import com.supervision.police.service.TaskCaseRecordService; import com.supervision.police.mapper.TaskCaseRecordMapper; +import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; +import java.util.Arrays; import java.util.List; /** @@ -14,6 +19,7 @@ import java.util.List; * @description 针对表【task_case_record】的数据库操作Service实现 * @createDate 2024-12-25 09:57:08 */ +@Slf4j @Service public class TaskCaseRecordServiceImpl extends ServiceImpl implements TaskCaseRecordService{ @@ -21,6 +27,76 @@ public class TaskCaseRecordServiceImpl extends ServiceImpl queryProcessingTaskList() { return super.lambdaQuery().eq(TaskCaseRecord::getStatus, TaskRecordConstants.TASK_STATUS_PROCESSING).list(); } + + @Override + public List queryByTaskId(String taskId) { + + return super.lambdaQuery().eq(TaskCaseRecord::getTaskRecordId, taskId).list(); + } + + @Override + public String getActuallyStatus(TaskCaseRecord taskCaseRecord) { + String waitingId = taskCaseRecord.getWaitingId(); + if (StrUtil.isNotEmpty(waitingId)) { + return TaskRecordConstants.TASK_STATUS_PROCESSING; + } + return TaskRecordConstants.TASK_STATUS_SUCCESS; + } + + @Override + public TaskCaseRecord updateStatus(String taskId, String executeId, boolean isSuccess) { + List taskCaseRecords = this.queryByTaskId(taskId); + + return updateStatus(taskId, executeId, isSuccess, taskCaseRecords); + } + + @Override + public TaskCaseRecord updateStatus(String taskId, String executeId, boolean isSuccess, List taskCaseRecords) { + // 理论上只能存在一个taskCase信息 + List taskCaseRecordList = taskCaseRecords.stream() + .filter(taskCaseRecord -> StrUtil.isNotEmpty(taskCaseRecord.getWaitingId())) + .filter(taskCaseRecord -> Arrays.asList(taskCaseRecord.getWaitingId().split(",")).contains(executeId)) + .toList(); + log.info("updateStatus:任务【{}】,当前执行ID【{}】,当前任务案件执行列表长度:{}", taskId, executeId, taskCaseRecordList.size()); + TaskCaseRecord taskCaseRecord = CollUtil.getFirst(taskCaseRecordList); + + taskCaseRecord.setWaitingId(removeSingle(taskCaseRecord.getWaitingId(), executeId)); + if (isSuccess){ + taskCaseRecord.setProcessedId(appendSingle(taskCaseRecord.getProcessedId(), executeId)); + }else { + taskCaseRecord.setExceptionId(appendSingle(taskCaseRecord.getExceptionId(), executeId)); + } + taskCaseRecord.setStatus(this.getActuallyStatus(taskCaseRecord)); + // 更新任务案件执行记录 + this.updateById(taskCaseRecord); + return taskCaseRecord; + } + + @Override + public Boolean updateStatus(String taskId, List oldStatus, String nowStatus) { + return super.lambdaUpdate() + .eq(TaskCaseRecord::getTaskRecordId, taskId) + .in(CollUtil.isNotEmpty(oldStatus),TaskCaseRecord::getStatus, oldStatus) + .set(TaskCaseRecord::getStatus, nowStatus) + .update(); + } + + private String appendSingle(String longString, String single) { + if (StrUtil.isEmpty(longString)){ + return single; + } + + return String.join(",", longString, single); + } + + private String removeSingle(String longString, String single) { + if (StrUtil.isEmpty(longString)){ + return longString; + } + String[] split = longString.split(","); + split = ArrayUtil.remove(split, ArrayUtil.indexOf(split, single)); + return ArrayUtil.join(split, ","); + } } diff --git a/src/main/java/com/supervision/police/service/impl/TaskRecordServiceImpl.java b/src/main/java/com/supervision/police/service/impl/TaskRecordServiceImpl.java index 76f7b5a..405e176 100644 --- a/src/main/java/com/supervision/police/service/impl/TaskRecordServiceImpl.java +++ b/src/main/java/com/supervision/police/service/impl/TaskRecordServiceImpl.java @@ -24,10 +24,7 @@ import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import java.time.LocalDateTime; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.util.*; import static com.supervision.common.constant.NotePromptConstants.TYPE_GRAPH_REASONING; import static com.supervision.common.constant.NotePromptConstants.TYPE_STRUCTURAL_REASONING; @@ -42,15 +39,19 @@ import static com.supervision.common.constant.XxlJobConstants.TASK_NAME_PROMPT_E @Slf4j @Service @RequiredArgsConstructor -public class TaskRecordServiceImpl extends ServiceImpl - implements TaskRecordService { +public class TaskRecordServiceImpl extends ServiceImpl implements TaskRecordService { - final TaskCaseRecordService taskCaseRecordService; - final ModelCaseService modelCaseService; - final NoteRecordService noteRecordService; - final CaseEvidenceService caseEvidenceService; - final NotePromptService notePromptService; - final XxlJobService xxlJobService; + private final TaskCaseRecordService taskCaseRecordService; + + private final ModelCaseService modelCaseService; + + private final NoteRecordService noteRecordService; + + private final CaseEvidenceService caseEvidenceService; + + private final NotePromptService notePromptService; + + private final XxlJobService xxlJobService; @Override public void executePromptExtractTask(TaskRecordVo taskRecordVo) { @@ -210,6 +211,61 @@ public class TaskRecordServiceImpl extends ServiceImpl taskCaseRecords = taskCaseRecordService.queryByTaskId(taskId); + String taskStatus = this.determineStatus(taskCaseRecords); + log.info("completeTask:任务ID:【{}】,初始任务状态:【{}】,计算后任务状态:【{}】", taskId, taskCaseRecord.getStatus(),taskStatus); + if (!StrUtil.equals(taskStatus,taskRecord.getStatus())){ + taskRecord.setStatus(taskStatus); + super.updateById(taskRecord); + } + + } + + @Override + public String determineStatus(List taskCaseRecords) { + if (CollUtil.isEmpty(taskCaseRecords)){ + return TASK_STATUS_SUCCESS; + } + //todo: 规则定下来再确定 + + // 有一组失败的,则任务失败 + taskCaseRecords.stream().filter(taskCaseRecord -> StrUtil.isNotEmpty(taskCaseRecord.getExceptionId())); + return TASK_STATUS_FAIL; + } + + + }