From d44d05c8abc779e0953f287331b9c72dc1a3f0b3 Mon Sep 17 00:00:00 2001 From: liu <liujiatong112@163.com> Date: Wed, 31 Jul 2024 15:41:28 +0800 Subject: [PATCH 1/2] =?UTF-8?q?=E4=BB=A3=E7=A0=81=E6=8F=90=E4=BA=A4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../police/domain/CaseTaskRecord.java | 5 - .../police/dto/RecordUploadStatusVO.java | 5 + .../service/RecordSplitProcessService.java | 11 ++ .../service/RecordSplitTypeService.java | 2 +- .../impl/ExtractTripleInfoServiceImpl.java | 13 --- .../impl/ModelRecordTypeServiceImpl.java | 100 +++++++----------- .../impl/NoteRecordSplitServiceImpl.java | 83 +++++++++++++-- .../impl/RecordSplitProcessServiceImpl.java | 23 ++++ .../impl/RecordSplitTypeServiceImpl.java | 87 ++++++++++++--- .../thread/RecordSplitTypeThread.java | 23 ++-- src/main/resources/application-dev.yml | 8 +- 11 files changed, 238 insertions(+), 122 deletions(-) create mode 100644 src/main/java/com/supervision/police/dto/RecordUploadStatusVO.java create mode 100644 src/main/java/com/supervision/police/service/RecordSplitProcessService.java create mode 100644 src/main/java/com/supervision/police/service/impl/RecordSplitProcessServiceImpl.java diff --git a/src/main/java/com/supervision/police/domain/CaseTaskRecord.java b/src/main/java/com/supervision/police/domain/CaseTaskRecord.java index d55439c..4c63c0f 100644 --- a/src/main/java/com/supervision/police/domain/CaseTaskRecord.java +++ b/src/main/java/com/supervision/police/domain/CaseTaskRecord.java @@ -20,11 +20,6 @@ public class CaseTaskRecord implements Serializable { @TableId private String id; - /** - * 类型 1笔录分类 2提取三元组 - */ - private Integer type; - /** * 案件ID */ diff --git a/src/main/java/com/supervision/police/dto/RecordUploadStatusVO.java b/src/main/java/com/supervision/police/dto/RecordUploadStatusVO.java new file mode 100644 index 0000000..21f7100 --- /dev/null +++ b/src/main/java/com/supervision/police/dto/RecordUploadStatusVO.java @@ -0,0 +1,5 @@ +package com.supervision.police.dto; + + +public class RecordUploadStatusVO { +} diff --git a/src/main/java/com/supervision/police/service/RecordSplitProcessService.java b/src/main/java/com/supervision/police/service/RecordSplitProcessService.java new file mode 100644 index 0000000..3cace96 --- /dev/null +++ b/src/main/java/com/supervision/police/service/RecordSplitProcessService.java @@ -0,0 +1,11 @@ +package com.supervision.police.service; + +import com.supervision.police.domain.ModelRecordType; +import com.supervision.police.domain.NoteRecordSplit; + +import java.util.List; + +public interface RecordSplitProcessService { + + void process(List<ModelRecordType> allTypeList, List<NoteRecordSplit> splitList); +} diff --git a/src/main/java/com/supervision/police/service/RecordSplitTypeService.java b/src/main/java/com/supervision/police/service/RecordSplitTypeService.java index cf5bd13..186261f 100644 --- a/src/main/java/com/supervision/police/service/RecordSplitTypeService.java +++ b/src/main/java/com/supervision/police/service/RecordSplitTypeService.java @@ -8,5 +8,5 @@ import java.util.List; public interface RecordSplitTypeService { - void type(List<ModelRecordType> allTypeList, QARecordNodeDTO qa, NoteRecordSplit noteRecord); + void type(List<ModelRecordType> allTypeList, List<NoteRecordSplit> splitList); } diff --git a/src/main/java/com/supervision/police/service/impl/ExtractTripleInfoServiceImpl.java b/src/main/java/com/supervision/police/service/impl/ExtractTripleInfoServiceImpl.java index 4b9b2be..f7504de 100644 --- a/src/main/java/com/supervision/police/service/impl/ExtractTripleInfoServiceImpl.java +++ b/src/main/java/com/supervision/police/service/impl/ExtractTripleInfoServiceImpl.java @@ -159,23 +159,10 @@ public class ExtractTripleInfoServiceImpl implements ExtractTripleInfoService { if (CollUtil.isNotEmpty(tripleInfos)) { // 首先清除现在已经提取过的三元组信息 tripleInfoService.lambdaUpdate().eq(TripleInfo::getRecordId, recordId).remove(); - // TODO 这里,如果已经生成了图谱,怎么办? - // 首先要把这个笔录已经提取过的三元组记录删除掉,删除掉之后才可以重新提取 for (TripleInfo tripleInfo : tripleInfos) { tripleInfoService.save(tripleInfo); } } - if (CollUtil.isEmpty(futures)) { - // 如果所有的任务都执行完了,就标记为成功 - caseTaskRecordService.lambdaUpdate().set(CaseTaskRecord::getStatus, 2).set(CaseTaskRecord::getFinishTime, LocalDateTime.now()) - .eq(CaseTaskRecord::getType, 2).eq(CaseTaskRecord::getRecordId, recordId) - .eq(CaseTaskRecord::getCaseId, caseId).update(); - } else { - // 否则标记为失败 - caseTaskRecordService.lambdaUpdate().set(CaseTaskRecord::getStatus, 3).set(CaseTaskRecord::getFinishTime, LocalDateTime.now()) - .eq(CaseTaskRecord::getType, 2).eq(CaseTaskRecord::getRecordId, recordId) - .eq(CaseTaskRecord::getCaseId, caseId).update(); - } log.info("三元组提取任务执行完毕,结束"); } } diff --git a/src/main/java/com/supervision/police/service/impl/ModelRecordTypeServiceImpl.java b/src/main/java/com/supervision/police/service/impl/ModelRecordTypeServiceImpl.java index f1f79e5..3427176 100644 --- a/src/main/java/com/supervision/police/service/impl/ModelRecordTypeServiceImpl.java +++ b/src/main/java/com/supervision/police/service/impl/ModelRecordTypeServiceImpl.java @@ -51,12 +51,17 @@ public class ModelRecordTypeServiceImpl extends ServiceImpl<ModelRecordTypeMappe private final OllamaChatClient chatClient; - private final CaseTaskRecordService caseTaskRecordService; private final NotePromptTypeRelService notePromptTypeRelService; + private final CaseTaskRecordService caseTaskRecordService; + + private final NoteRecordSplitService noteRecordSplitService; + + private final RecordSplitProcessService recordSplitProcessService; @Autowired - private ExtractTripleInfoService extractTripleInfo; + private ModelRecordTypeService modelRecordTypeService; + @Override public List<TypeDTO> queryTypeListChoose() { @@ -154,7 +159,7 @@ public class ModelRecordTypeServiceImpl extends ServiceImpl<ModelRecordTypeMappe boolean save; if (StringUtils.isEmpty(prompt.getId())) { // 新增的时候,校验是否已经存在相同的三元组关系,如果已经存在了相同的三元组关系,不允许添加 - checkHasSameTriple(prompt.getStartEntityType(), prompt.getRelType(),prompt.getEndEntityType(),null); + checkHasSameTriple(prompt.getStartEntityType(), prompt.getRelType(), prompt.getEndEntityType(), null); save = notePromptService.save(prompt); // 新增prompt绑定的分类信息 for (String typeId : typeList) { @@ -164,7 +169,7 @@ public class ModelRecordTypeServiceImpl extends ServiceImpl<ModelRecordTypeMappe notePromptTypeRelService.save(rel); } } else { - checkHasSameTriple(prompt.getStartEntityType(), prompt.getRelType(),prompt.getEndEntityType(),prompt.getId()); + checkHasSameTriple(prompt.getStartEntityType(), prompt.getRelType(), prompt.getEndEntityType(), prompt.getId()); save = notePromptService.updateById(prompt); // 更新prompt绑定的分类信息 // 首先查询已经有的,如果都存在,就不变,如果数据库有,前端没有,就删除,如果前端有,数据库没有,就新增 @@ -222,15 +227,15 @@ public class ModelRecordTypeServiceImpl extends ServiceImpl<ModelRecordTypeMappe } } - private void checkHasSameTriple(String startEntityType, String relType, String endEntityType,String promptId) { + private void checkHasSameTriple(String startEntityType, String relType, String endEntityType, String promptId) { List<NotePrompt> list = notePromptService.lambdaQuery().eq(NotePrompt::getStartEntityType, startEntityType) .eq(NotePrompt::getRelType, relType).eq(NotePrompt::getEndEntityType, endEntityType).list(); - if (CollUtil.isNotEmpty(list) ) { - if (StrUtil.isBlank(promptId)){ + if (CollUtil.isNotEmpty(list)) { + if (StrUtil.isBlank(promptId)) { throw new RuntimeException("该三元组关系已经存在,请勿重复添加"); - }else { + } else { // 校验list查出来的是不是和promptId相等,如果不想等,也报错 - if (!list.get(0).getId().equals(promptId)){ + if (!list.get(0).getId().equals(promptId)) { throw new RuntimeException("该三元组关系已经存在,请勿重复添加"); } } @@ -252,70 +257,39 @@ public class ModelRecordTypeServiceImpl extends ServiceImpl<ModelRecordTypeMappe } } - + /* + * 异步提交的任务,不回滚 + */ @Override + @Transactional(transactionManager = "dataSourceTransactionManager", rollbackFor = Exception.class, noRollbackFor = BusinessException.class) public List<TripleInfo> getThreeInfo(String caseId, String name, String recordId) { if (StrUtil.isBlank(recordId)) { throw new RuntimeException("笔录ID不能为空"); } - boolean taskStatus = taskExtractStatusCheck(caseId, recordId); - // 如果校验结果为false,则说明需要进行提取三元组操作 - if (!taskStatus) { - extractTripleInfo.extractTripleInfo(caseId, name, recordId); - } - // 这里进行查询 - return tripleInfoService.lambdaQuery().eq(TripleInfo::getRecordId, recordId).list(); - } - - /** - * 提取任务校验,校验是否已经存在相关的人物,如果存在相关的任务,就不再继续执行了,直接告诉任务正在执行中 - */ - private boolean taskExtractStatusCheck(String caseId, String recordId) { - // 首先查询是否存在任务,如果不存在,就新建 - Optional<CaseTaskRecord> caseTaskRecordOpt = caseTaskRecordService.lambdaQuery() - .eq(CaseTaskRecord::getType, 2).eq(CaseTaskRecord::getCaseId, caseId).eq(CaseTaskRecord::getRecordId, recordId).oneOpt(); - if (caseTaskRecordOpt.isEmpty()) { - CaseTaskRecord newCaseTaskRecord = new CaseTaskRecord(); - newCaseTaskRecord.setType(2); - newCaseTaskRecord.setCaseId(caseId); - newCaseTaskRecord.setRecordId(recordId); - newCaseTaskRecord.setStatus(1); - newCaseTaskRecord.setSubmitTime(LocalDateTime.now()); - caseTaskRecordService.save(newCaseTaskRecord); - return false; - } else { - - // 如果存在,则校验时间是否已经超过1天,如果超过了1天还没有执行完毕,就重新提交这个任务 + // 这里查询任务是否完成,如果完成了,就给结果 + Optional<CaseTaskRecord> caseTaskRecordOpt = caseTaskRecordService.lambdaQuery().eq(CaseTaskRecord::getCaseId, caseId) + .eq(CaseTaskRecord::getRecordId, recordId).oneOpt(); + if (caseTaskRecordOpt.isPresent()) { CaseTaskRecord caseTaskRecord = caseTaskRecordOpt.get(); - // 如果未执行,则提交执行 - if (caseTaskRecordOpt.get().getStatus() == 0) { - caseTaskRecord.setStatus(1); - caseTaskRecord.setSubmitTime(LocalDateTime.now()); - caseTaskRecordService.updateById(caseTaskRecord); - return false; + // 0未执行 1正在执行 2执行成功 3执行超时 + if (caseTaskRecord.getStatus() == 1) { + throw new BusinessException("笔录解析任务未完成,请等待"); } - if (caseTaskRecordOpt.get().getStatus() == 1 && LocalDateTime.now().isAfter(caseTaskRecord.getSubmitTime().plusDays(1))) { - // 如果已经超过1天,则重新提交任务 - caseTaskRecord.setStatus(1); - caseTaskRecord.setSubmitTime(LocalDateTime.now()); - caseTaskRecordService.updateById(caseTaskRecord); - return false; - } else if (caseTaskRecordOpt.get().getStatus() == 2) { - // 如果执行成功,就返回true,取反之后就可以返回三元组信息了 - return true; - } else if (caseTaskRecordOpt.get().getStatus() == 3) { - caseTaskRecord.setStatus(1); - caseTaskRecord.setSubmitTime(LocalDateTime.now()); - caseTaskRecordService.updateById(caseTaskRecord); - return false; - } else { - // 如果没有超过1天,则返回正在执行中 - throw new BusinessException("笔录拆分及三元组提取任务正在执行中,请稍后"); - + if (caseTaskRecord.getStatus() == 0 || caseTaskRecord.getStatus() == 3) { + // 重新提交 + List<ModelRecordType> allTypeList = modelRecordTypeService.lambdaQuery().list(); + // 根据recordId查询所有的分割后的笔录 + List<NoteRecordSplit> list = noteRecordSplitService.lambdaQuery().eq(NoteRecordSplit::getNoteRecordId, recordId).list(); + recordSplitProcessService.process(allTypeList, list); + // 更新为1执行中 + throw new BusinessException("笔录解析任务未完成,请等待"); } } + // 这里进行查询 + return tripleInfoService.lambdaQuery().eq(TripleInfo::getRecordId, recordId).list(); } + @Override public void testExtractThreeInfo() { // ------------------------- @@ -428,7 +402,7 @@ public class ModelRecordTypeServiceImpl extends ServiceImpl<ModelRecordTypeMappe @Override public String addNeo4j(List<String> ids) { - if (CollUtil.isEmpty(ids)){ + if (CollUtil.isEmpty(ids)) { return "成功插入0条信息"; } List<TripleInfo> tripleInfos = tripleInfoService.listByIds(ids); diff --git a/src/main/java/com/supervision/police/service/impl/NoteRecordSplitServiceImpl.java b/src/main/java/com/supervision/police/service/impl/NoteRecordSplitServiceImpl.java index 6d3db5b..003cd68 100644 --- a/src/main/java/com/supervision/police/service/impl/NoteRecordSplitServiceImpl.java +++ b/src/main/java/com/supervision/police/service/impl/NoteRecordSplitServiceImpl.java @@ -7,9 +7,11 @@ import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import com.supervision.common.utils.IPages; import com.supervision.common.utils.ListUtils; import com.supervision.common.utils.StringUtils; +import com.supervision.config.BusinessException; import com.supervision.minio.domain.MinioFile; import com.supervision.minio.mapper.MinioFileMapper; import com.supervision.minio.service.MinioService; +import com.supervision.police.domain.CaseTaskRecord; import com.supervision.police.domain.ModelRecordType; import com.supervision.police.domain.NoteRecordSplit; import com.supervision.police.domain.NoteRecord; @@ -18,10 +20,7 @@ import com.supervision.police.dto.NoteRecordDetailDTO; import com.supervision.police.mapper.ModelCaseMapper; import com.supervision.police.mapper.NoteRecordSplitMapper; import com.supervision.police.mapper.NoteRecordMapper; -import com.supervision.police.service.CaseStatusManageService; -import com.supervision.police.service.ModelRecordTypeService; -import com.supervision.police.service.NoteRecordSplitService; -import com.supervision.police.service.RecordSplitTypeService; +import com.supervision.police.service.*; import com.supervision.springaidemo.dto.QARecordNodeDTO; import com.supervision.springaidemo.util.RecordRegexUtil; import com.supervision.springaidemo.util.WordReadUtil; @@ -47,15 +46,16 @@ public class NoteRecordSplitServiceImpl extends ServiceImpl<NoteRecordSplitMappe private final MinioService minioService; - private final ModelCaseMapper modelCaseMapper; - private final MinioFileMapper minioFileMapper; + + private final CaseTaskRecordService caseTaskRecordService; + @Autowired private ModelRecordTypeService modelRecordTypeService; @Autowired - private RecordSplitTypeService recordSplitTypeService; + private RecordSplitProcessService recordSplitProcessService; private final CaseStatusManageService caseStatusManageService; @@ -92,7 +92,7 @@ public class NoteRecordSplitServiceImpl extends ServiceImpl<NoteRecordSplitMappe //所有对话类型 List<ModelRecordType> allTypeList = modelRecordTypeService.lambdaQuery().list(); if (i > 0 && CollUtil.isNotEmpty(records.getFileIdList())) { - //拆分笔录 + // 拆分笔录 for (String fileId : records.getFileIdList()) { MinioFile minioFile = minioService.getMinioFile(fileId); InputStream inputStream = null; @@ -105,6 +105,7 @@ public class NoteRecordSplitServiceImpl extends ServiceImpl<NoteRecordSplitMappe } String context = WordReadUtil.readWord(inputStream); List<QARecordNodeDTO> qaList = RecordRegexUtil.recordRegex(context, record.getName()); + List<NoteRecordSplit> splitList = new ArrayList<>(); for (QARecordNodeDTO qa : qaList) { try { NoteRecordSplit noteRecord = new NoteRecordSplit(); @@ -116,12 +117,19 @@ public class NoteRecordSplitServiceImpl extends ServiceImpl<NoteRecordSplitMappe noteRecord.setAnswer(qa.getAnswer()); noteRecord.setCreateTime(LocalDateTime.now()); this.save(noteRecord); - // 通过异步的形式提交分类 - recordSplitTypeService.type(allTypeList, qa, noteRecord); + splitList.add(noteRecord); + } catch (Exception e) { log.error(e.getMessage(), e); } } + // 创建任务之后,再开始 + boolean taskStatus = saveRecordProcessTask(record.getCaseId(), record.getId()); + // 如果校验结果为false,则说明需要进行分类以及三元组操作 + if (taskStatus) { + // 对笔录进行分类,并对笔录进行提取三元组 + recordSplitProcessService.process(allTypeList, splitList); + } } caseStatusManageService.whenUploadRecord(record.getCaseId()); return "保存成功"; @@ -130,6 +138,61 @@ public class NoteRecordSplitServiceImpl extends ServiceImpl<NoteRecordSplitMappe } } + /** + * 校验是否已经执行过任务 + * + * @param caseId + * @param recordId + * @return 没有执行过, 返回true, 没有执行过, 返回false + */ + private boolean saveRecordProcessTask(String caseId, String recordId) { + // 首先查询是否存在任务,如果不存在,就新建 + Optional<CaseTaskRecord> caseTaskRecordOpt = caseTaskRecordService.lambdaQuery() + .eq(CaseTaskRecord::getCaseId, caseId).eq(CaseTaskRecord::getRecordId, recordId).oneOpt(); + if (caseTaskRecordOpt.isEmpty()) { + CaseTaskRecord newCaseTaskRecord = new CaseTaskRecord(); + newCaseTaskRecord.setCaseId(caseId); + newCaseTaskRecord.setRecordId(recordId); + newCaseTaskRecord.setStatus(1); + newCaseTaskRecord.setSubmitTime(LocalDateTime.now()); + return caseTaskRecordService.save(newCaseTaskRecord); + } else { + // 如果存在,则校验时间是否已经超过1天,如果超过了1天还没有执行完毕,就重新提交这个任务 + CaseTaskRecord caseTaskRecord = caseTaskRecordOpt.get(); + // 如果未执行,则提交执行 + if (caseTaskRecordOpt.get().getStatus() == 0) { + caseTaskRecord.setStatus(1); + caseTaskRecord.setSubmitTime(LocalDateTime.now()); + caseTaskRecordService.updateById(caseTaskRecord); + return true; + } + if (caseTaskRecordOpt.get().getStatus() == 1 && LocalDateTime.now().isAfter(caseTaskRecord.getSubmitTime().plusDays(1))) { + // 如果已经超过1天,则重新提交任务 + caseTaskRecord.setStatus(1); + caseTaskRecord.setSubmitTime(LocalDateTime.now()); + caseTaskRecordService.updateById(caseTaskRecord); + return true; + } else if (caseTaskRecordOpt.get().getStatus() == 2) { + // 如果执行成功,就不再执行了 + return false; + } else if (caseTaskRecordOpt.get().getStatus() == 3) { + // 如果执行失败,就重新执行 + caseTaskRecord.setStatus(1); + caseTaskRecord.setSubmitTime(LocalDateTime.now()); + caseTaskRecordService.updateById(caseTaskRecord); + return true; + } else { + // 如果都不是就在跑一次 + caseTaskRecord.setStatus(1); + caseTaskRecord.setSubmitTime(LocalDateTime.now()); + caseTaskRecordService.updateById(caseTaskRecord); + return true; + + } + } + } + + @Override public Map<String, Object> queryRecords(NoteRecord noteRecords, Integer page, Integer size) { LambdaQueryWrapper<NoteRecord> wrapper = Wrappers.lambdaQuery(NoteRecord.class) diff --git a/src/main/java/com/supervision/police/service/impl/RecordSplitProcessServiceImpl.java b/src/main/java/com/supervision/police/service/impl/RecordSplitProcessServiceImpl.java new file mode 100644 index 0000000..5bdeb42 --- /dev/null +++ b/src/main/java/com/supervision/police/service/impl/RecordSplitProcessServiceImpl.java @@ -0,0 +1,23 @@ +package com.supervision.police.service.impl; + +import com.supervision.police.domain.ModelRecordType; +import com.supervision.police.domain.NoteRecordSplit; +import com.supervision.police.service.RecordSplitProcessService; +import com.supervision.police.service.RecordSplitTypeService; +import lombok.RequiredArgsConstructor; +import org.springframework.stereotype.Service; + +import java.util.List; + +@Service +@RequiredArgsConstructor +public class RecordSplitProcessServiceImpl implements RecordSplitProcessService { + + private final RecordSplitTypeService recordSplitTypeService; + + @Override + public void process(List<ModelRecordType> allTypeList, List<NoteRecordSplit> splitList) { + // 通过异步的形式提交分类 + recordSplitTypeService.type(allTypeList,splitList); + } +} diff --git a/src/main/java/com/supervision/police/service/impl/RecordSplitTypeServiceImpl.java b/src/main/java/com/supervision/police/service/impl/RecordSplitTypeServiceImpl.java index 6ae390d..6b00c5c 100644 --- a/src/main/java/com/supervision/police/service/impl/RecordSplitTypeServiceImpl.java +++ b/src/main/java/com/supervision/police/service/impl/RecordSplitTypeServiceImpl.java @@ -1,8 +1,12 @@ package com.supervision.police.service.impl; +import cn.hutool.core.collection.ConcurrentHashSet; +import cn.hutool.core.util.StrUtil; import com.supervision.police.domain.ModelRecordType; import com.supervision.police.domain.NoteRecordSplit; +import com.supervision.police.domain.TripleInfo; import com.supervision.police.service.CaseTaskRecordService; +import com.supervision.police.service.ExtractTripleInfoService; import com.supervision.police.service.NoteRecordSplitService; import com.supervision.police.service.RecordSplitTypeService; import com.supervision.springaidemo.dto.QARecordNodeDTO; @@ -13,8 +17,15 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.ai.ollama.OllamaChatClient; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; -import java.util.List; +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; @Service @Slf4j @@ -25,23 +36,73 @@ public class RecordSplitTypeServiceImpl implements RecordSplitTypeService { private final NoteRecordSplitService noteRecordSplitService; + private final ExtractTripleInfoService extractTripleInfoService; + + private final ConcurrentHashSet<String> recordSplitIdSet = new ConcurrentHashSet<String>(); + @Async @Override - public void type(List<ModelRecordType> allTypeList, QARecordNodeDTO qa, NoteRecordSplit noteRecord){ - // 这里线程休眠1秒,因为首先报保证消息记录能够插入完成,插入完成之后,再去提交大模型,让大模型去分类.防止分类太快,分类结果出来了,插入还没有插入完成 + @Transactional(transactionManager = "dataSourceTransactionManager", rollbackFor = Exception.class) + public void type(List<ModelRecordType> allTypeList, List<NoteRecordSplit> splitList) { + // 这里线程休眠1秒,因为首先报保证消息记录能够插入完成,插入完成之后,再去提交大模型,让大模型去分类.防止分类太快,分类结果出来了,插入还没有插入完成 try { Thread.sleep(1000); - }catch (Exception e){ - log.error("线程休眠失败"); + } catch (Exception e) { + log.error("分类任务线程休眠失败"); + } + List<Future<String>> futures = new ArrayList<>(); + for (NoteRecordSplit recordSplit : splitList) { + // 进行分类 + log.info("分类任务提交线程池进行分类"); + RecordSplitTypeThread recordSplitTypeThread = new RecordSplitTypeThread(allTypeList, recordSplit, chatClient, noteRecordSplitService); + // 分类之后的id + Future<String> afterTypeSplitIdFuture = RecordSplitTypeThreadPool.recordSplitTypeExecutor.submit(recordSplitTypeThread); + futures.add(afterTypeSplitIdFuture); + log.info("分类任务线程池提交分类成功"); } - // 首先创建一个提取任务 - - // 进行分类 - log.info("提交线程池进行分类"); - RecordSplitTypeThread recordSplitTypeThread = new RecordSplitTypeThread(allTypeList, qa, chatClient, noteRecordSplitService, noteRecord); - RecordSplitTypeThreadPool.recordSplitTypeExecutor.submit(recordSplitTypeThread); - log.info("线程池提交分类成功"); - // 这里应该对分类任务的执行过程进行监控,分类结束之后,才能提取三元组的关系.问了产品,暂时先不做,等后面在考虑 + // 如果分类完成了,那么就去提取三元组 + AtomicInteger atomicInteger = new AtomicInteger(0); + while (futures.size() > 0) { + Iterator<Future<String>> iterator = futures.iterator(); + while (iterator.hasNext()) { + Future<String> future = iterator.next(); + try { + // 如果分类成功,就开始提取三元组 + if (future.isDone()) { + String afterTypeSplitId = future.get(); + if (StrUtil.isNotBlank(afterTypeSplitId)) { + Optional<NoteRecordSplit> optById = noteRecordSplitService.getOptById(afterTypeSplitId); + if (optById.isPresent()) { + NoteRecordSplit recordSplit = optById.get(); + extractTripleInfoService.extractTripleInfo(recordSplit.getCaseId(), recordSplit.getPersonName(), afterTypeSplitId); + } + } + iterator.remove(); + } + } catch (Exception e) { + log.info("分类任务从线程中获取任务失败"); + iterator.remove(); + } + } + try { + int currentCount = atomicInteger.incrementAndGet(); + if (currentCount > 1000) { + log.info("分类任务执行超时,遍历任务已执行:{}次,任务还剩余:{}个,不再继续执行", currentCount, futures.size()); + // 将还在执行的线程中断 + futures.forEach(future -> { + future.cancel(true); + }); + break; + } + log.info("分类任务已检查{}遍,任务剩余{}个,休眠5s后继续检查", currentCount, futures.size()); + Thread.sleep(1000 * 5); + } catch (Exception e) { + log.error(e.getMessage(), e); + } + } + log.info("分类任务执行完毕"); + // 分类任务执行完成之后,就将任务进行更新 + } } diff --git a/src/main/java/com/supervision/thread/RecordSplitTypeThread.java b/src/main/java/com/supervision/thread/RecordSplitTypeThread.java index 16664dd..6db7712 100644 --- a/src/main/java/com/supervision/thread/RecordSplitTypeThread.java +++ b/src/main/java/com/supervision/thread/RecordSplitTypeThread.java @@ -29,26 +29,23 @@ import java.util.stream.Collectors; * 笔录分类线程 */ @Slf4j -public class RecordSplitTypeThread implements Callable<Boolean> { +public class RecordSplitTypeThread implements Callable<String> { private final List<ModelRecordType> allTypeList; - private final QARecordNodeDTO qa; + private final NoteRecordSplit noteRecordSplit; private final OllamaChatClient chatClient; private final NoteRecordSplitService noteRecordSplitService; - private final NoteRecordSplit noteRecord; - - public RecordSplitTypeThread(List<ModelRecordType> allTypeList, QARecordNodeDTO qa, OllamaChatClient chatClient, NoteRecordSplitService noteRecordSplitService, NoteRecordSplit noteRecord) { + public RecordSplitTypeThread(List<ModelRecordType> allTypeList, NoteRecordSplit noteRecordSplit, OllamaChatClient chatClient, NoteRecordSplitService noteRecordSplitService) { this.allTypeList = allTypeList; - this.qa = qa; this.chatClient = chatClient; this.noteRecordSplitService = noteRecordSplitService; - this.noteRecord = noteRecord; + this.noteRecordSplit = noteRecordSplit; } private static final String TYPE_TEMPLATE = """ @@ -90,7 +87,7 @@ public class RecordSplitTypeThread implements Callable<Boolean> { private static final String TYPE_CONTEXT_TEMPLATE = "{分类type:{type},区别点(分类释义):{typeExt}}"; @Override - public Boolean call() throws Exception { + public String call() throws Exception { String type; try { StopWatch stopWatch = new StopWatch(); @@ -103,8 +100,8 @@ public class RecordSplitTypeThread implements Callable<Boolean> { // 开始对笔录进行分类 Map<String, String> paramMap = new HashMap<>(); paramMap.put("typeContext", CollUtil.join(typeContextList, ";")); - paramMap.put("question", qa.getQuestion()); - paramMap.put("answer", qa.getAnswer()); + paramMap.put("question", noteRecordSplit.getQuestion()); + paramMap.put("answer", noteRecordSplit.getAnswer()); Prompt prompt = new Prompt(new UserMessage(StrUtil.format(NEW_TEMPLATE, paramMap))); stopWatch.start(); log.info("开始分析:"); @@ -112,7 +109,7 @@ public class RecordSplitTypeThread implements Callable<Boolean> { stopWatch.stop(); log.info("耗时:{}", stopWatch.getTotalTimeSeconds()); String content = call.getResult().getOutput().getContent(); - log.info("问:{}, 答:{}", qa.getQuestion(), qa.getAnswer()); + log.info("问:{}, 答:{}", noteRecordSplit.getQuestion(), noteRecordSplit.getAnswer()); log.info("分析的结果是:{}", content); TypeResultDTO result = JSONUtil.toBean(content, TypeResultDTO.class); List<TypeNodeDTO> typeList = result.getResult(); @@ -129,8 +126,8 @@ public class RecordSplitTypeThread implements Callable<Boolean> { log.error("分类任务执行失败:{}", e.getMessage(), e); type = "无"; } - noteRecordSplitService.lambdaUpdate().set(NoteRecordSplit::getRecordType, type).eq(NoteRecordSplit::getId, noteRecord.getId()).update(); - return true; + noteRecordSplitService.lambdaUpdate().set(NoteRecordSplit::getRecordType, type).eq(NoteRecordSplit::getId, noteRecordSplit.getId()).update(); + return noteRecordSplit.getId(); } diff --git a/src/main/resources/application-dev.yml b/src/main/resources/application-dev.yml index 000e4cf..63243aa 100644 --- a/src/main/resources/application-dev.yml +++ b/src/main/resources/application-dev.yml @@ -4,15 +4,15 @@ spring: ai: # 文档地址 https://docs.spring.io/spring-ai/reference/1.0-SNAPSHOT/api/chat/ollama-chat.html ollama: -# base-url: http://113.128.242.110:11434 - base-url: http://192.168.10.70:11434 + base-url: http://113.128.242.110:11434 +# base-url: http://192.168.10.70:11434 # base-url: http://124.220.94.55:8060 chat: enabled: true options: #model: qwen2:7b - model: llama3-chinese:8b - # model: qwen2:72b + #model: llama3-chinese:8b + model: qwen2:72b # 控制模型在请求后加载到内存中的时间(稍微长一点的时间,避免重复加载浪费性能,加快处理速度) keep_alive: 30m # 例如0.3 From 0e8e3389d0d36a405c0b875bc50c62431cb53d98 Mon Sep 17 00:00:00 2001 From: liu <liujiatong112@163.com> Date: Wed, 31 Jul 2024 15:44:48 +0800 Subject: [PATCH 2/2] =?UTF-8?q?=E4=BB=A3=E7=A0=81=E6=8F=90=E4=BA=A4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../police/service/impl/ModelRecordTypeServiceImpl.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/main/java/com/supervision/police/service/impl/ModelRecordTypeServiceImpl.java b/src/main/java/com/supervision/police/service/impl/ModelRecordTypeServiceImpl.java index 3427176..c3d2b33 100644 --- a/src/main/java/com/supervision/police/service/impl/ModelRecordTypeServiceImpl.java +++ b/src/main/java/com/supervision/police/service/impl/ModelRecordTypeServiceImpl.java @@ -55,10 +55,10 @@ public class ModelRecordTypeServiceImpl extends ServiceImpl<ModelRecordTypeMappe private final NotePromptTypeRelService notePromptTypeRelService; private final CaseTaskRecordService caseTaskRecordService; - - private final NoteRecordSplitService noteRecordSplitService; - - private final RecordSplitProcessService recordSplitProcessService; + @Autowired + private NoteRecordSplitService noteRecordSplitService; + @Autowired + private RecordSplitProcessService recordSplitProcessService; @Autowired private ModelRecordTypeService modelRecordTypeService;