From 55076f376b45723d0b08987adbaf5a467694b7fb Mon Sep 17 00:00:00 2001
From: xueqingkun <xueqingkun@126.com>
Date: Mon, 30 Dec 2024 14:25:37 +0800
Subject: [PATCH] =?UTF-8?q?1.=20=E6=B7=BB=E5=8A=A0=E5=8D=95=E4=B8=AA?=
 =?UTF-8?q?=E6=8F=90=E7=A4=BA=E8=AF=8D=E5=88=86=E6=9E=90=E7=AC=94=E5=BD=95?=
 =?UTF-8?q?=E5=8A=9F=E8=83=BD?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 .../java/com/supervision/job/XxlJobTask.java  | 16 ++--
 .../police/service/TaskCaseRecordService.java | 16 ++++
 .../police/service/TaskRecordService.java     | 11 +++
 .../impl/TaskCaseRecordServiceImpl.java       | 76 ++++++++++++++++++
 .../service/impl/TaskRecordServiceImpl.java   | 80 ++++++++++++++++---
 5 files changed, 180 insertions(+), 19 deletions(-)

diff --git a/src/main/java/com/supervision/job/XxlJobTask.java b/src/main/java/com/supervision/job/XxlJobTask.java
index 1bf121f..595e511 100644
--- a/src/main/java/com/supervision/job/XxlJobTask.java
+++ b/src/main/java/com/supervision/job/XxlJobTask.java
@@ -2,6 +2,7 @@ package com.supervision.job;
 
 import cn.hutool.core.collection.CollUtil;
 import com.alibaba.fastjson.JSON;
+import com.supervision.common.constant.XxlJobConstants;
 import com.supervision.neo4j.service.Neo4jService;
 import com.supervision.police.domain.NotePrompt;
 import com.supervision.police.domain.TaskCaseRecord;
@@ -28,7 +29,8 @@ public class XxlJobTask {
     private final CaseEvidenceService caseEvidenceService;
 
     private final TaskRecordService taskRecordService;
-    private TaskCaseRecordService taskCaseRecordService;
+
+    private final TaskCaseRecordService taskCaseRecordService;
 
     private final ModelCaseService modelCaseService;
 
@@ -43,7 +45,7 @@ public class XxlJobTask {
     /**
      * 睡觉通知
      */
-    @XxlJob("evidenceAnalysis")
+    @XxlJob(XxlJobConstants.TASK_NAME_EVIDENCE_ANALYSIS)
     public void evidenceAnalysis() {
         String param = XxlJobHelper.getJobParam();
         log.info("【证据解析】任务开始。ID: 【{}】", param);
@@ -59,12 +61,12 @@ public class XxlJobTask {
     /**
      * 提示词提取任务
      */
-    @XxlJob("promptExtractTask")
+    @XxlJob(XxlJobConstants.TASK_NAME_PROMPT_EXTRACT_TASK)
     public void promptExtractTask() {
         String jobParam = XxlJobHelper.getJobParam();
         log.info("【提取任务】任务开始。参数: {}", jobParam);
+        Map<String, String> map = JSON.parseObject(XxlJobHelper.getJobParam(), Map.class);
         try {
-            Map<String, String> map = JSON.parseObject(XxlJobHelper.getJobParam(), Map.class);
             String taskId = map.get("taskId");
             String caseId = map.get("caseId");
             String promptId = map.get("promptId");
@@ -103,6 +105,7 @@ public class XxlJobTask {
                 }
                 switch (prompt.getType()) {
                     case TYPE_GRAPH_REASONING:
+                        log.info("【图推理】任务开始。任务ID: 【{}】", taskId);
                         List<TripleInfo> tripleInfos = extractTripleInfoService.extractTripleInfo(prompt, caseId, map.get("executeId"));
                         for (TripleInfo tripleInfo : tripleInfos) {
                             neo4jService.saveTripleInfo(tripleInfo);
@@ -110,8 +113,6 @@ public class XxlJobTask {
                         if (CollUtil.isNotEmpty(tripleInfos)){
                             tripleInfoService.updateNeo4jFlag(tripleInfos.stream().map(TripleInfo::getId).toList(), "1");
                         }
-
-                        log.info("【图推理】任务开始。任务ID: 【{}】", taskId);
                         break;
                     case TYPE_STRUCTURAL_REASONING:
                         log.info("【结构推理】任务开始。任务ID: 【{}】", taskId);
@@ -121,11 +122,12 @@ public class XxlJobTask {
                         log.error("未知的任务类型");
                         break;
                 }
-                //TODO:更新案件状态、任务状态
+                taskRecordService.completeTask(taskId, map.get("executeId"), true);
                 log.info("【提取任务】任务结束。任务ID: 【{}】", taskId);
             }
         } catch (Exception e) {
             log.error("任务执行失败", e);
+            taskRecordService.completeTask(map.get("taskId"), map.get("executeId"), false);
         } finally {
             log.info("【提取任务】任务结束。");
         }
diff --git a/src/main/java/com/supervision/police/service/TaskCaseRecordService.java b/src/main/java/com/supervision/police/service/TaskCaseRecordService.java
index 9e31396..94ecc72 100644
--- a/src/main/java/com/supervision/police/service/TaskCaseRecordService.java
+++ b/src/main/java/com/supervision/police/service/TaskCaseRecordService.java
@@ -17,4 +17,20 @@ public interface TaskCaseRecordService extends IService<TaskCaseRecord> {
      * @return
      */
     List<TaskCaseRecord> queryProcessingTaskList();
+
+
+    List<TaskCaseRecord> queryByTaskId(String taskId);
+
+
+    /**
+     * 获取实际状态
+     * @param taskCaseRecord 任务记录
+     * @return
+     */
+    String getActuallyStatus(TaskCaseRecord taskCaseRecord);
+
+    TaskCaseRecord updateStatus(String taskId, String executeId,boolean isSuccess);
+    TaskCaseRecord updateStatus(String taskId, String executeId,boolean isSuccess,List<TaskCaseRecord> taskCaseRecordList);
+
+    Boolean updateStatus(String taskId,List<String> olderStatus,String nowStatus);
 }
diff --git a/src/main/java/com/supervision/police/service/TaskRecordService.java b/src/main/java/com/supervision/police/service/TaskRecordService.java
index e227acd..b0b626b 100644
--- a/src/main/java/com/supervision/police/service/TaskRecordService.java
+++ b/src/main/java/com/supervision/police/service/TaskRecordService.java
@@ -1,6 +1,7 @@
 package com.supervision.police.service;
 
 import com.baomidou.mybatisplus.core.metadata.IPage;
+import com.supervision.police.domain.TaskCaseRecord;
 import com.supervision.police.domain.TaskRecord;
 import com.baomidou.mybatisplus.extension.service.IService;
 import com.supervision.police.dto.TaskInfoDTO;
@@ -40,4 +41,14 @@ public interface TaskRecordService extends IService<TaskRecord> {
 
 
     void deleteTask(List<String> taskIds);
+
+    /**
+     * 完成任务
+     * @param taskId 任务id
+     * @param executeId 执行id
+     */
+    void completeTask(String taskId,String executeId,boolean isSuccess);
+
+
+    String determineStatus(List<TaskCaseRecord> taskCaseRecords);
 }
diff --git a/src/main/java/com/supervision/police/service/impl/TaskCaseRecordServiceImpl.java b/src/main/java/com/supervision/police/service/impl/TaskCaseRecordServiceImpl.java
index 4e9540e..2ab7952 100644
--- a/src/main/java/com/supervision/police/service/impl/TaskCaseRecordServiceImpl.java
+++ b/src/main/java/com/supervision/police/service/impl/TaskCaseRecordServiceImpl.java
@@ -1,12 +1,17 @@
 package com.supervision.police.service.impl;
 
+import cn.hutool.core.collection.CollUtil;
+import cn.hutool.core.util.ArrayUtil;
+import cn.hutool.core.util.StrUtil;
 import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
 import com.supervision.common.constant.TaskRecordConstants;
 import com.supervision.police.domain.TaskCaseRecord;
 import com.supervision.police.service.TaskCaseRecordService;
 import com.supervision.police.mapper.TaskCaseRecordMapper;
+import lombok.extern.slf4j.Slf4j;
 import org.springframework.stereotype.Service;
 
+import java.util.Arrays;
 import java.util.List;
 
 /**
@@ -14,6 +19,7 @@ import java.util.List;
 * @description 针对表【task_case_record】的数据库操作Service实现
 * @createDate 2024-12-25 09:57:08
 */
+@Slf4j
 @Service
 public class TaskCaseRecordServiceImpl extends ServiceImpl<TaskCaseRecordMapper, TaskCaseRecord>
     implements TaskCaseRecordService{
@@ -21,6 +27,76 @@ public class TaskCaseRecordServiceImpl extends ServiceImpl<TaskCaseRecordMapper,
     public List<TaskCaseRecord> queryProcessingTaskList() {
         return super.lambdaQuery().eq(TaskCaseRecord::getStatus, TaskRecordConstants.TASK_STATUS_PROCESSING).list();
     }
+
+    @Override
+    public List<TaskCaseRecord> queryByTaskId(String taskId) {
+
+        return super.lambdaQuery().eq(TaskCaseRecord::getTaskRecordId, taskId).list();
+    }
+
+    @Override
+    public String getActuallyStatus(TaskCaseRecord taskCaseRecord) {
+        String waitingId = taskCaseRecord.getWaitingId();
+        if (StrUtil.isNotEmpty(waitingId)) {
+            return TaskRecordConstants.TASK_STATUS_PROCESSING;
+        }
+        return TaskRecordConstants.TASK_STATUS_SUCCESS;
+    }
+
+    @Override
+    public TaskCaseRecord updateStatus(String taskId, String executeId, boolean isSuccess) {
+        List<TaskCaseRecord> taskCaseRecords = this.queryByTaskId(taskId);
+
+        return updateStatus(taskId, executeId, isSuccess, taskCaseRecords);
+    }
+
+    @Override
+    public TaskCaseRecord updateStatus(String taskId, String executeId, boolean isSuccess, List<TaskCaseRecord> taskCaseRecords) {
+        // 理论上只能存在一个taskCase信息
+        List<TaskCaseRecord> taskCaseRecordList = taskCaseRecords.stream()
+                .filter(taskCaseRecord -> StrUtil.isNotEmpty(taskCaseRecord.getWaitingId()))
+                .filter(taskCaseRecord -> Arrays.asList(taskCaseRecord.getWaitingId().split(",")).contains(executeId))
+                .toList();
+        log.info("updateStatus:任务【{}】,当前执行ID【{}】,当前任务案件执行列表长度:{}", taskId, executeId, taskCaseRecordList.size());
+        TaskCaseRecord taskCaseRecord = CollUtil.getFirst(taskCaseRecordList);
+
+        taskCaseRecord.setWaitingId(removeSingle(taskCaseRecord.getWaitingId(), executeId));
+        if (isSuccess){
+            taskCaseRecord.setProcessedId(appendSingle(taskCaseRecord.getProcessedId(), executeId));
+        }else {
+            taskCaseRecord.setExceptionId(appendSingle(taskCaseRecord.getExceptionId(), executeId));
+        }
+        taskCaseRecord.setStatus(this.getActuallyStatus(taskCaseRecord));
+        // 更新任务案件执行记录
+        this.updateById(taskCaseRecord);
+        return taskCaseRecord;
+    }
+
+    @Override
+    public Boolean updateStatus(String taskId, List<String> oldStatus, String nowStatus) {
+        return super.lambdaUpdate()
+                .eq(TaskCaseRecord::getTaskRecordId, taskId)
+                .in(CollUtil.isNotEmpty(oldStatus),TaskCaseRecord::getStatus, oldStatus)
+                .set(TaskCaseRecord::getStatus, nowStatus)
+                .update();
+    }
+
+    private String appendSingle(String longString, String single) {
+        if (StrUtil.isEmpty(longString)){
+            return single;
+        }
+
+        return String.join(",", longString, single);
+    }
+
+    private String removeSingle(String longString, String single) {
+        if (StrUtil.isEmpty(longString)){
+            return longString;
+        }
+        String[] split = longString.split(",");
+        split = ArrayUtil.remove(split, ArrayUtil.indexOf(split, single));
+        return ArrayUtil.join(split, ",");
+    }
 }
 
 
diff --git a/src/main/java/com/supervision/police/service/impl/TaskRecordServiceImpl.java b/src/main/java/com/supervision/police/service/impl/TaskRecordServiceImpl.java
index 76f7b5a..405e176 100644
--- a/src/main/java/com/supervision/police/service/impl/TaskRecordServiceImpl.java
+++ b/src/main/java/com/supervision/police/service/impl/TaskRecordServiceImpl.java
@@ -24,10 +24,7 @@ import org.springframework.stereotype.Service;
 import org.springframework.transaction.annotation.Transactional;
 
 import java.time.LocalDateTime;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 
 import static com.supervision.common.constant.NotePromptConstants.TYPE_GRAPH_REASONING;
 import static com.supervision.common.constant.NotePromptConstants.TYPE_STRUCTURAL_REASONING;
@@ -42,15 +39,19 @@ import static com.supervision.common.constant.XxlJobConstants.TASK_NAME_PROMPT_E
 @Slf4j
 @Service
 @RequiredArgsConstructor
-public class TaskRecordServiceImpl extends ServiceImpl<TaskRecordMapper, TaskRecord>
-        implements TaskRecordService {
+public class TaskRecordServiceImpl extends ServiceImpl<TaskRecordMapper, TaskRecord> implements TaskRecordService {
 
-    final TaskCaseRecordService taskCaseRecordService;
-    final ModelCaseService modelCaseService;
-    final NoteRecordService noteRecordService;
-    final CaseEvidenceService caseEvidenceService;
-    final NotePromptService notePromptService;
-    final XxlJobService xxlJobService;
+    private final TaskCaseRecordService taskCaseRecordService;
+
+    private final ModelCaseService modelCaseService;
+
+    private final NoteRecordService noteRecordService;
+
+    private final CaseEvidenceService caseEvidenceService;
+
+    private final NotePromptService notePromptService;
+
+    private final XxlJobService xxlJobService;
 
     @Override
     public void executePromptExtractTask(TaskRecordVo taskRecordVo) {
@@ -210,6 +211,61 @@ public class TaskRecordServiceImpl extends ServiceImpl<TaskRecordMapper, TaskRec
 
     }
 
+    @Override
+    @Transactional(transactionManager = "dataSourceTransactionManager", rollbackFor = Exception.class)
+    public void completeTask(String taskId, String executeId,boolean isSuccess) {
+        if (StrUtil.isEmpty(taskId)){
+            log.info("completeTask:任务ID为空");
+            return;
+        }
+        if (StrUtil.isEmpty(executeId)){
+            log.info("completeTask:执行ID为空");
+            return;
+        }
+        TaskRecord taskRecord = super.getById(taskId);
+        if (null == taskRecord){
+            log.info("completeTask:任务不存在,任务ID:【{}】", taskId);
+            return;
+        }
+
+        TaskCaseRecord taskCaseRecord = taskCaseRecordService.updateStatus(taskId, executeId, isSuccess);
+        log.info("completeTask:任务ID:【{}】,执行ID:【{}】,任务状态:【{}】", taskId, executeId, taskCaseRecord.getStatus());
+        // 校验总体任务是否是取消中
+        if (StrUtil.equalsAny(taskRecord.getStatus(),TASK_STATUS_CANCELLING,TASK_STATUS_CANCELED)){
+            log.info("completeTask:任务状态为取消中,任务状态更新即将更新为【{}】,任务ID: 【{}】", taskId,taskRecord.getStatus());
+            Boolean success = taskCaseRecordService.updateStatus(taskId, List.of(TASK_STATUS_WAITING,TASK_STATUS_PROCESSING), TASK_STATUS_CANCELED);
+            log.info("completeTask:任务状态更新完成,task_case数据任务状态【{}】变动,任务ID: 【{}】", taskId, success?"产生":"无");
+            taskRecord.setStatus(TASK_STATUS_CANCELED);
+            this.updateById(taskRecord);
+            return;
+        }
+
+
+        // 更新任务状态
+        List<TaskCaseRecord> taskCaseRecords = taskCaseRecordService.queryByTaskId(taskId);
+        String taskStatus = this.determineStatus(taskCaseRecords);
+        log.info("completeTask:任务ID:【{}】,初始任务状态:【{}】,计算后任务状态:【{}】", taskId, taskCaseRecord.getStatus(),taskStatus);
+        if (!StrUtil.equals(taskStatus,taskRecord.getStatus())){
+            taskRecord.setStatus(taskStatus);
+            super.updateById(taskRecord);
+        }
+
+    }
+
+    @Override
+    public String determineStatus(List<TaskCaseRecord> taskCaseRecords) {
+        if (CollUtil.isEmpty(taskCaseRecords)){
+            return TASK_STATUS_SUCCESS;
+        }
+        //todo: 规则定下来再确定
+
+        // 有一组失败的,则任务失败
+        taskCaseRecords.stream().filter(taskCaseRecord -> StrUtil.isNotEmpty(taskCaseRecord.getExceptionId()));
+        return TASK_STATUS_FAIL;
+    }
+
+
+
 }