夜航星
Lila's dream
Lila's dream
Published on 2025-05-01 / 54 Visits
0
0

LangChain4j实现RAG知识库

#AI

一、LangChain4j实现RAG知识库

Gitee同步(可以加载图片):https://gitee.com/rickkkFang/my-notes/blob/master/72.AI/Learning.md 文章参考:https://javaai.pig4cloud.com/docs/

RAG:Retrieval Augmented Generation(检索增强生成)

1、RAG基础

1.1 什么是 RAG?

检索增强生成(Retrieval-Augmented Generation,简称RAG)是一种结合大型语言模型(LLM)和外部知识库的技术,旨在提高生成文本的准确性和相关性。以下是对RAG的通俗介绍: RAG的基本概念 RAG的核心思想是通过引入外部知识源来增强LLM的输出能力。传统的LLM通常基于其训练数据生成响应,但这些数据可能过时或不够全面。RAG允许模型在生成答案之前,从特定的知识库中检索相关信息,从而提供更准确和上下文相关的回答

1.2 RAG 流程

RAG 过程分为两个阶段:索引(indexing)和检索(retrieval)。LangChain4j 提供了这两个阶段的工具。

索引阶段

在索引阶段,文档经过预处理以便在检索阶段进行高效搜索。对于向量搜索,通常包括清理文档、添加额外数据和元数据、将文档分割为小段(分块),将这些段落嵌入为向量,并存储在嵌入存储(向量数据库)中。

索引通常是离线进行的,可以通过定期任务(如每周重建索引)完成。

image-20250507215738552

  1. 用户上传文档 → 文本分割为段落

  2. 嵌入模型生成段落向量 → 存入Milvus等向量数据库

  3. 搜索时,查询文本被转为向量 → Milvus返回最相似段落

检索阶段

检索阶段通常是在线进行的,当用户提交问题时,从索引的文档中寻找相关信息。对于向量搜索来说,这通常包括将用户的查询嵌入为向量,并在嵌入存储中执行相似性搜索,找到相关的段落并将它们注入提示中,再发送给 LLM。

image-20250507215812538

1.3 Easy RAG

LangChain4j 提供了”Easy RAG”功能,旨在让你快速上手 RAG,无需学习嵌入、选择向量存储或如何解析和分割文档等复杂步骤。只需指向你的文档,LangChain4j 将为你处理大部分细节。

1.导入依赖:

<dependency>
    <groupId>dev.langchain4j</groupId>
    <artifactId>langchain4j-easy-rag</artifactId>
</dependency>

2.配置存储和Ai service

/**
    * 嵌入存储 (简易内存存储)
    *
    * @return {@link InMemoryEmbeddingStore }<{@link TextSegment }>
    */
@Bean
public InMemoryEmbeddingStore<TextSegment> embeddingStore() {
    return new InMemoryEmbeddingStore<>();
}
​
@Bean
public ChatAssistant assistant(ChatLanguageModel chatLanguageModel, EmbeddingStore<TextSegment> embeddingStore) {
    return AiServices.builder(ChatAssistant.class)
            .chatLanguageModel(chatLanguageModel)
            .chatMemory(MessageWindowChatMemory.withMaxMessages(10))
            .contentRetriever(EmbeddingStoreContentRetriever.from(embeddingStore))
            .build();
}

3. 向量存储文档

Document document = FileSystemDocumentLoader.loadDocument("/Users/lengleng/Downloads/test.docx");
EmbeddingStoreIngestor.ingest(document,embeddingStore);

4. 测提提问

String result = assistant.chat("合同总金额");

进阶配置

/**
 * 这个类负责将文档摄入嵌入存储。
 * 它使用各种组件来转换、分割和嵌入文档,然后存储它们。
 */
public class EmbeddingStoreIngestor {
    // 将输入文档转换为适合处理的格式
    private final DocumentTransformer documentTransformer;
    // 将文档分割成更小的段落
    private final DocumentSplitter documentSplitter;
    // 转换单个文本段落(例如,用于标准化或清理)
    private final TextSegmentTransformer textSegmentTransformer;
    // 为文本段落生成嵌入向量
    private final EmbeddingModel embeddingModel;
    // 存储生成的嵌入向量及其对应的文本段落
    private final EmbeddingStore<TextSegment> embeddingStore;
}
​
// 使用构建器模式的示例
EmbeddingStoreIngestor
    .builder()
    .documentTransformer()  // 设置文档转换器
    .documentSplitter()     // 设置文档分割器
    .embeddingModel()       // 设置嵌入模型
    .embeddingStore()       // 设置嵌入存储
    .build()                // 构建EmbeddingStoreIngestor实例
    .ingest(document);      // 将文档摄入嵌入存储

1.4 RAG API基础

LangChain4j 是一个强大的 Java 库,提供了丰富的 API 来简化构建自定义 RAG(检索增强生成)管道的过程。本指南将详细介绍 LangChain4j 的主要组件和 API,从简单到复杂的多种管道实现。

image-20250508001550327

1)核心概念

1.Document (文档)Document 类表示一个完整的文档,例如单个 PDF 文件或网页。

主要方法:

  • Document.text(): 返回文档的文本内容

  • Document.metadata(): 返回文档的元数据

  • Document.toTextSegment(): 将 Document 转换为 TextSegment

  • Document.from(String, Metadata): 从文本和元数据创建 Document

  • Document.from(String): 从文本创建不带元数据的 Document

2.Metadata (元数据)Metadata 存储文档的额外信息,如名称、来源、最后更新时间等。

主要方法:

  • Metadata.from(Map): 从 Map 创建 Metadata

  • Metadata.put(String key, String value): 添加元数据条目

  • Metadata.getString(String key) / getInteger(String key): 获取指定类型的元数据值

  • Metadata.containsKey(String key): 检查是否包含指定键

  • Metadata.remove(String key): 移除指定键的元数据条目

  • Metadata.copy(): 复制元数据

  • Metadata.toMap(): 将元数据转换为 Map

3.TextSegment (文本片段)TextSegment 表示文档的一个片段,专用于文本信息。

主要方法:

  • TextSegment.text(): 返回文本片段的内容

  • TextSegment.metadata(): 返回文本片段的元数据

  • TextSegment.from(String, Metadata): 从文本和元数据创建 TextSegment

  • TextSegment.from(String): 从文本创建不带元数据的 TextSegment

4.Embedding (嵌入)Embedding 类封装了一个数值向量,表示嵌入内容的语义含义。

主要方法:

  • Embedding.dimension(): 返回嵌入向量的维度

  • CosineSimilarity.between(Embedding, Embedding): 计算两个嵌入向量的余弦相似度

  • Embedding.normalize(): 对嵌入向量进行归一化

2)文档处理

1.Document Loader (文档加载器)

LangChain4j 提供了多种文档加载器:

  • FileSystemDocumentLoader: 从文件系统加载文档

  • UrlDocumentLoader: 从 URL 加载文档

  • AmazonS3DocumentLoader: 从 Amazon S3 加载文档

  • AzureBlobStorageDocumentLoader: 从 Azure Blob 存储加载文档

  • GitHubDocumentLoader: 从 GitHub 仓库加载文档

  • TencentCosDocumentLoader: 从腾讯云 COS 加载文档

2.Document Parser (文档解析器)

用于解析不同格式的文档:

  • TextDocumentParser: 解析纯文本文件

  • ApachePdfBoxDocumentParser: 解析 PDF 文件

  • ApachePoiDocumentParser: 解析 MS Office 文件格式

  • ApacheTikaDocumentParser: 自动检测并解析几乎所有文件格式

示例:

// 加载单个文档
Document document = FileSystemDocumentLoader.loadDocument("/path/to/file.txt", new TextDocumentParser());
​
// 加载目录下的所有文档
List<Document> documents = FileSystemDocumentLoader.loadDocuments("/path/to/directory", new TextDocumentParser());

3.Document Transformer (文档转换器)

DocumentTransformer 用于对文档执行各种转换,如清理、过滤、增强或总结

目前提供的转换器:

  • HtmlToTextDocumentTransformer: 从 HTML 中提取文本内容和元数据

4.Document Splitter (文档拆分器)

用于将文档拆分成更小的片段,拆分结果为TextSegment

  • DocumentByParagraphSplitter: 按段落拆分

  • DocumentBySentenceSplitter: 按句子拆分

  • DocumentByWordSplitter: 按单词拆分

  • DocumentByCharacterSplitter: 按字符拆分

  • DocumentByRegexSplitter: 按正则表达式拆分

使用步骤:

  1. 实例化 DocumentSplitter 并指定 TextSegment 的大小和重叠字符数

  2. 调用 split(Document)splitAll(List<Document>) 方法

  3. DocumentSplitter 将文档拆分成小片段,并组合为 TextSegment

3)嵌入处理

1.Embedding Model (嵌入模型)

EmbeddingModel 接口表示一种将文本转换为 Embedding 的模型。

主要方法:

  • EmbeddingModel.embed(String): 嵌入指定文本

  • EmbeddingModel.embed(TextSegment): 嵌入指定 TextSegment

  • EmbeddingModel.embedAll(List<TextSegment>): 嵌入多个 TextSegment

  • EmbeddingModel.dimension(): 返回嵌入向量的维度

2.Embedding Store (嵌入存储)

EmbeddingStore 接口表示一个嵌入存储库(向量数据库),用于存储和高效搜索相似的嵌入。

主要方法:

  • EmbeddingStore.add(Embedding): 添加嵌入并返回随机 ID

  • EmbeddingStore.addAll(List<Embedding>): 添加多个嵌入并返回随机 ID 列表

  • EmbeddingStore.search(EmbeddingSearchRequest): 搜索最相似的嵌入

  • EmbeddingStore.remove(String id): 删除指定 ID 的嵌入

3.Embedding Store Ingestor (嵌入存储摄取器)

EmbeddingStoreIngestor 负责将文档嵌入并存储到 EmbeddingStore 中。

示例:

EmbeddingStoreIngestor ingestor = EmbeddingStoreIngestor.builder()
    .embeddingModel(embeddingModel)
    .embeddingStore(embeddingStore)
    .build();
​
ingestor.ingest(document1);

可以通过指定 DocumentTransformerDocumentSplitter,在嵌入前对文档进行转换和拆分。

4)构建RAG管道

使用 LangChain4j 构建 RAG 管道的一般步骤:

  1. 加载文档: 使用适当的 DocumentLoaderDocumentParser 加载文档

  2. 转换文档: 使用 DocumentTransformer 清理或增强文档(可选)

  3. 拆分文档: 使用 DocumentSplitter 将文档拆分为更小的片段

  4. 嵌入文档: 使用 EmbeddingModel 将文档片段转换为嵌入向量

  5. 存储嵌入: 使用 EmbeddingStore 存储嵌入向量

  6. 检索相关内容: 根据用户查询,从 EmbeddingStore 检索最相关的文档片段

  7. 生成响应: 将检索到的相关内容与用户查询一起提供给语言模型,生成最终响应

5)最佳实践

  • 根据具体需求选择合适的文档拆分策略

  • 使用自定义的 DocumentTransformer 来清理和增强文档

  • 选择合适的嵌入模型和嵌入存储以平衡性能和准确性

  • 定期更新和维护嵌入存储,以确保信息的时效性

  • 对检索结果进行后处理,如重新排序或过滤,以提高相关性

LangChain4j 提供了构建高效 RAG 系统所需的全套工具。通过灵活组合这些组件,可以创建适合特定用例的自定义 RAG 管道。随着项目的发展,不断优化和调整各个组件,以提高系统的整体性能和准确性。

二、RAG源码级流程详解

AiChatController.msg 端点获取到前端的消息会建立 SSE(Server-Sent Events 实时数据推送) 双向请求链接

Server-Sent Events (SSE) 是HTML5引入的一种轻量级的服务器向浏览器客户端单向推送实时数据的技术。在Spring Boot框架中,我们可以很容易地集成并利用SSE来实现实时通信。

在Spring Boot项目中,无需额外引入特定的依赖,因为Spring Web MVC模块已经内置了对SSE的支持。

SSE主要流程:1、创建SSE端点 2、通过端点发送事件 3、关闭端点连接

AiChatController.msg 是整个聊天流程的入口点,它接收前端传来的消息 key,并建立 Server-Sent Events (SSE) 连接,实现服务器向客户端的实时推送。

0、构建知识库和文档

  • 创建知识库

    包含知识库的一些基本配置:关联向量库、知识数量、大小、支持多轮会话、匹配率、匹配条数、默认分片长度、是否使用数据标注、会话是否提前压缩、嵌入模型、重排模型、摘要模型

  • 知识库关联文档

    支持文档格式:ISSUE、Q&A、文本录入、文件、图片(视频TODO)

    分片算法:智能分片、段落分片、句子分片、字符分片

    分片值:指每个分片里面字符总数量 重叠值:指分片之间的重叠大小,避免分割丢失上下文

  • 分片处理

    @Override
    public void handle(AiDocumentDTO aiDocumentDTO) {
        log.info("======= AI 正在处理文档:{} =======", aiDocumentDTO.getName());
        // 数据库保存文档
        AiDocumentEntity documentEntity = saveDocument(aiDocumentDTO);
        try {
            saveSlice(documentEntity, aiDocumentDTO);
        } catch (Exception e) {
            documentEntity.setSliceStatus(SliceStatusEnums.FAILED.getStatus());
            documentEntity.setSliceFailReason(e.getMessage());
            SpringUtil.getBean(AiDocumentService.class).updateById(documentEntity);
            return;
        }
    ​
        // 摘要文档
        try {
            summaryDocument(documentEntity, aiDocumentDTO);
        } catch (Exception e) {
            documentEntity.setSummaryStatus(SliceStatusEnums.FAILED.getStatus());
            documentEntity.setSummaryFailReason(e.getMessage());
            SpringUtil.getBean(AiDocumentService.class).updateById(documentEntity);
            return;
        }
    ​
        // 向量化切片
        embedSlice(documentEntity);
        log.info("======= AI 文档:{} 处理介绍 =======", aiDocumentDTO.getName());
    }
  • 保存切片

    @Override
    @SneakyThrows
    public void saveSlice(AiDocumentEntity documentEntity, AiDocumentDTO documentDTO) {
        Response response = remoteFileService.getFile(documentEntity.getFileUrl());
        String extName = FileUtil.extName(documentEntity.getFileUrl());
        // 查询知识库配置
        AiDatasetEntity aiDatasetEntity = aiDatasetMapper.selectById(documentEntity.getDatasetId());
    // 从所有文件处理器中筛选出能处理当前文件的实现,优先级最高的文件解析处理器
        UploadFileParseHandler uploadFileParseHandler = parseHandlerList.stream()
                .filter(handler -> handler.supports(aiDatasetEntity, documentEntity))
                .max(Comparator.comparingInt(Ordered::getOrder))
                .get();
    // langchain4j的file转为字符串方法
    // Pair 是Hutool提供的 ••二元组容器••,用于临时存储两个关联值
        Pair<FileParserStatusEnums, String> parserFile = uploadFileParseHandler.file2String(documentEntity,
                response.body().asInputStream(), extName);
    ​
        // 保存结果
        saveResult(uploadFileParseHandler, parserFile, documentEntity, aiDatasetEntity, documentDTO);
    }
  • 保存解析结果到数据库

    /**
     * 保存解析结果到数据库
     *
     * @param uploadFileParseHandler 上传文件解析处理程序
     * @param parserFile             文件解析状态和内容的键值对
     * @param documentEntity         文档实体对象
     * @param aiDatasetEntity        AI数据集实体对象
     * @param documentDTO            文档数据传输对象
     */
    public void saveResult(UploadFileParseHandler uploadFileParseHandler,
                           Pair<FileParserStatusEnums, String> parserFile, AiDocumentEntity documentEntity,
                           AiDatasetEntity aiDatasetEntity, AiDocumentDTO documentDTO) {
        // 解析失败
        if (FileParserStatusEnums.PARSE_FAIL.equals(parserFile.getKey())) {
            log.warn("文件:{}  解析失败", documentEntity.getName());
            documentEntity.setSliceStatus(SliceStatusEnums.FAILED.getStatus());
            documentEntity.setSliceFailReason(parserFile.getValue());
            documentMapper.updateById(documentEntity);
            return;
        }
    ​
        // 解析中
        if (FileParserStatusEnums.OCR_PARSING.equals(parserFile.getKey())) {
            documentEntity.setSliceStatus(SliceStatusEnums.OCR_PARSING.getStatus());
            documentEntity.setSliceFailReason(parserFile.getValue());
            documentMapper.updateById(documentEntity);
            return;
        }
    ​
        if (FileParserStatusEnums.AI_PARSING.equals(parserFile.getKey())) {
            documentEntity.setSliceStatus(SliceStatusEnums.AI_PARSING.getStatus());
            // 解析中,失败原因是 具体的batch ID
            documentEntity.setSliceFailReason(parserFile.getValue());
            documentMapper.updateById(documentEntity);
            return;
        }
    ​
        // 清洗数据
        String cleanResultData = uploadFileParseHandler.cleanData(parserFile.getValue());
    ​
        // 拆分数据,根据不同的分片算法选择不同的拆分器进行数据的拆分
        List<String> contentList = uploadFileParseHandler.splitData(documentEntity, aiDatasetEntity, cleanResultData, documentDTO);
        if (CollUtil.isEmpty(contentList) || StrUtil.isBlank(contentList.get(0))) {
            log.warn("文件:{}  解析失败", documentEntity.getName());
            documentEntity.setSliceStatus(SliceStatusEnums.FAILED.getStatus());
    ​
            if (StrUtil.isBlank(documentEntity.getSliceFailReason())) {
                documentEntity.setSliceFailReason("文件解析失败,资源不包含文本内容");
            }
    ​
            documentMapper.updateById(documentEntity);
            return;
        }
    ​
        for (String content : contentList) {
            if (StrUtil.isBlank(content)) {
                continue;
            }
            AiSliceEntity slice = new AiSliceEntity();
            slice.setDocumentId(documentEntity.getId());
            slice.setContent(content);
            slice.setName(documentEntity.getName());
            slice.setSliceStatus(SliceStatusEnums.UNSLICED.getStatus());
            slice.setCharCount((long) content.length());
            slice.setCreateBy(documentEntity.getCreateBy());
            slice.setCreateTime(LocalDateTime.now());
            sliceMapper.insert(slice);
        }
    ​
        documentEntity.setSummaryStatus(SummaryStatusEnums.UNSUMMARY.getStatus());
        documentEntity.setSliceStatus(SliceStatusEnums.SLICED.getStatus());
        documentMapper.updateById(documentEntity);
    }
  • 定时进行文档总结处理

    文档总结作用:减少token消耗、避免切片过于碎片化、提升检索质量、避免信息过载、支持多轮推理

    public void summaryDocument(AiDocumentEntity documentEntity) {
        AiDatasetEntity aiDataset = datasetMapper.selectById(documentEntity.getDatasetId());
        AiNoMemoryStreamAssistantService memoryStreamAssistantService = modelProvider
                .getAiNoMemoryStreamAssistant(aiDataset.getSummaryModel())
                .getValue();
    ​
        // 如果未开启文档总结,则跳过
        if (YesNoEnum.NO.getCode().equals(aiDataset.getPreSummary())) {
            return;
        }
    ​
        // 如果是 QA 文档则跳过总结
        if (SourceTypeEnums.QA.getType().equals(documentEntity.getSourceType())) {
            return;
        }
    ​
        // 查询文档下所有数据,如果文档下没有未切片的数据,则跳过
        List<AiSliceEntity> sliceEntityList = sliceService
                .list(Wrappers.<AiSliceEntity>lambdaQuery().eq(AiSliceEntity::getDocumentId, documentEntity.getId()));
    ​
        if (sliceEntityList.isEmpty()) {
            return;
        }
    ​
        // 当前文档下的所有未训练数据,拼接成一个文档
        String documentContent = sliceEntityList.stream()
                .map(AiSliceEntity::getContent)
                .collect(Collectors.joining(StrUtil.LF));
    ​
        try {
            Mono<String> resultFlux = memoryStreamAssistantService.chat(PromptBuilder.render("knowledge-rag-summary.st",
                            Map.of(AiDocumentEntity.Fields.summary,
                                    StrUtil.subSufByLength(documentContent, knowledgeProperties.getMaxSummary()))))
                    .reduce(StrUtil.EMPTY, (acc, value) -> acc + value);
    ​
            // 针对R1 模型特殊处理 思维链删掉
            String replacedAll = resultFlux.block().replaceAll("<think>[\\s\\S]*?</think>", StrUtil.EMPTY);
            documentEntity.setSummary(documentEntity.getName() + replacedAll);
            documentEntity.setSummaryStatus(SummaryStatusEnums.SUMMARYED.getStatus());
        } catch (Exception e) {
            log.warn("文档 {} 总结失败", documentEntity.getName(), e);
            documentEntity.setSummaryFailReason(e.getMessage());
            documentEntity.setSummaryStatus(SummaryStatusEnums.FAILED.getStatus());
        }
    ​
        baseMapper.updateById(documentEntity);
    }
  • 切片内容向量化

    /**
     * 嵌入切片
     *
     * @param documentEntity   Document 实体
     * @param sliceStatusEnums 切片状态枚举
     */
    @Override
    public void embedSlice(AiDocumentEntity documentEntity, SliceStatusEnums sliceStatusEnums) {
        List<AiSliceEntity> aiSliceEntityList = this.list(Wrappers.<AiSliceEntity>lambdaQuery()
                .eq(AiSliceEntity::getDocumentId, documentEntity.getId())
                .eq(AiSliceEntity::getSliceStatus, sliceStatusEnums.getStatus()));
    ​
        for (AiSliceEntity slice : aiSliceEntityList) {
            if (StrUtil.isBlank(slice.getContent())) {
                handleEmptyContent(slice);
                continue;
            }
    ​
            // 删除脏数据
            AiDatasetEntity aiDataset = aiDatasetMapper.selectById(documentEntity.getDatasetId());
            if (Objects.isNull(aiDataset)) {
                documentMapper.deleteById(documentEntity.getId());
                continue;
            }
    ​
            if (shouldSkipSummary(aiDataset, documentEntity)) {
                continue;
            }
    ​
            if (StrUtil.isNotBlank(slice.getQdrantId())) {
                removeExistingEmbedding(aiDataset, slice);
            }
    ​
            try {
                String qdrantId = buildSlice(aiDataset, slice, documentEntity);
                slice.setSliceStatus(SliceStatusEnums.SLICED.getStatus());
                slice.setQdrantId(qdrantId);
            } catch (Exception e) {
                log.warn("切片 {} 训练失败,等待定时任务处理", slice.getName(), e);
                slice.setSliceStatus(SliceStatusEnums.FAILED.getStatus());
            }
            this.updateById(slice);
        }
    }

1、创建对外连接

/**
 * 创建对外连接
 *
 * @param chatMessageDTO 消息内容体
 * @return R<uuid>
 */
@Inner(value = false)
@PostMapping("/create")
public R<String> createPublicConnection(@Valid @RequestBody ChatMessageDTO chatMessageDTO) {
    return chatService.saveConnectionParams(chatMessageDTO);
}

2、发起聊天 后端入口

实际创建sse链接调用大模型的入口 依赖于上文的createConnection ,所以即使此接口设置暴露,没有message key也无法调用。

@GetMapping 配合 produces = MediaType.TEXT_EVENT_STREAM_VALUE 用于实现 Server-Sent Events (SSE) 实时数据推送

@Inner(value = false)
@GetMapping(value = "/msg/list", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<AiMessageResultDTO> msg(@RequestParam Long key) {
    try {
        return chatService.chatList(key).concatWithValues(new AiMessageResultDTO("[DONE]"));
    }
    catch (Exception e) {
        log.error("chat error", e);
        return Flux.just(new AiMessageResultDTO(e.getMessage())).concatWithValues(new AiMessageResultDTO("[DONE]"));
    }
}

Mono和Flux核心概念对比

特性

Mono

Flux

数据量

0或1个元素

0~N个元素(无限流可能)

典型场景

单次返回(如HTTP GET请求)

流式返回(如SSE、实时数据流)

类比传统对象

Optional<T>Future<T>

List<T>Stream<T>

空值处理

Mono.empty() 表示空值

Flux.empty() 表示空流

选择时机

1、查询数据库单条记录 2、保存实体到数据库(返回保存结果) 3、HTTP请求的单个响应

1、返回实时数据流 2、批量查询数据库结果 3、Server-Sent Events (SSE)

  • 2.1 首先根据key查找消息聊天记录,第一次发起聊天时会默认创建消息聊天记录

  • 2.2 基于线程局部变量(ThreadLocal)的工具类,用于在多线程环境中传递和存储聊天消息的上下文信息

  • 2.3 执行风控逻辑flowRisk:

    // 如果开启了规则引擎
    if (flowExecutorOptional.isPresent()) {
        Flux<AiMessageResultDTO> aiMessageResultDTO = flowRisk(chatMessageDTO);
        if (aiMessageResultDTO != null)
            return aiMessageResultDTO;
    }

    风控逻辑主要包括:

    • 敏感词检测:检测用户输入是否包含敏感词

    • 限流控制:根据调用方式不同采取不同的限流策略

      • 内部调用:按用户名 + 总量控制

      • 外部调用:按 IP+ 总量控制

  • 2.4 自动装配聊天规则映射(根据请求类型匹配处理规则):基于@Autowired或构造函数需要Map<String, ChatRule>时,会自动注入private final Map<String, ChatRule> chatRuleMap; Spring会自动将所有实现ChatRule接口的Bean收集为Map,键为Bean名称,值为实例。

    ChatTypeEnums chatTypeEnums = ChatTypeEnums.fromCode(chatMessageDTO.getDatasetId());
    ChatMessageContextHolder.set(chatMessageDTO);
    return chatRuleMap.get(chatTypeEnums.getType()).process(chatMessageDTO);

    系统根据 datasetId 确定请求消息类型,主要包括:

    Chat Type

    Code

    Description

    FUNCTION_CHAT

    -1L

    功能聊天

    SIMPLE_CHAT

    0L

    简单聊天

    DATABASE_CHAT

    -2L

    数据库聊天

    IMAGE_CHAT

    -3L

    生成图片

    MARKMAP_CHAT

    -4L

    生成脑图

    FLOW_CHAT

    -5L

    编排

    JSON_CHAT

    -6L

    JSON 聊天

    REASON_CHAT

    -7L

    推理聊天

    VECTOR_CHAT

    1L

    知识库聊天

  • 2.5 多个请求类型的实现类通过策略模式来实现同一接口,运行时动态选择

    public interface ChatRule {
        /**
         * 处理聊天信息
         *
         * @param chatMessageDTO 聊天上文
         * @return flux stream
         */
        default Flux<AiMessageResultDTO> process(ChatMessageDTO chatMessageDTO) {
            return Flux.empty();
        }
    }

3、知识库聊天处理流程(VectorChatRule)

@Override
public Flux<AiMessageResultDTO> process(ChatMessageDTO chatMessageDTO) {
    AiDatasetEntity dataset = aiDatasetService.getById(chatMessageDTO.getDatasetId());
    DimensionAwareEmbeddingModel embeddingModel = modelProvider.getEmbeddingModel(dataset.getEmbeddingModel());
    Embedding queryEmbedding = embeddingModel.embed(chatMessageDTO.getContent()).content();
    
    // 使用标注数据处理结果
    if (YesNoEnum.YES.getCode().equals(dataset.getStandardFlag())) {
        Flux<AiMessageResultDTO> q2qFluxResult = q2QStandardRagChatHandler
            .process(queryEmbedding, dataset, chatMessageDTO)
            .cache();
        // 如果 q2qFluxResult 不是 empty 则直接返回,如果是 empty 则继续执行 q2AVectorRagChatHandler
        return q2qFluxResult
            .switchIfEmpty(q2AVectorRagChatHandler.process(queryEmbedding, dataset, chatMessageDTO));
    }
    
​
    return q2AVectorRagChatHandler.process(queryEmbedding, dataset, chatMessageDTO);
}

处理流程包括:

  1. 获取知识库数据集信息

  2. 获取嵌入模型并将用户问题转换为向量

  3. 根据知识库配置决定处理方式:

    • 如果启用了标准问答(standardFlag=YES),先尝试问题匹配(Q2QStandardRagChatHandler)

    • 如果问题匹配无结果,或未启用标准问答,则使用答案匹配(Q2AVectorRagChatHandler)

3.1 向量搜索处理 (Q2AVectorRagChatHandler)

public Flux<AiMessageResultDTO> process(Embedding embeddedList, AiDatasetEntity dataset,
        ChatMessageDTO chatMessageDTO) {
    double minScore = NumberUtil.div(Double.parseDouble(dataset.getScore().toString()), Double.parseDouble("100"),
            2);
​
    EmbeddingSearchRequest embeddingSearchRequest = EmbeddingSearchRequest.builder()
        .queryEmbedding(embeddedList)
        .maxResults(dataset.getTopK())
        .filter(metadataKey(AiDocumentEntity.Fields.datasetId).isEqualTo(dataset.getId().toString())
            .and(metadataKey(DocumentTypeEnums.Fields.type).isEqualTo(DocumentTypeEnums.ANSWER.getType())))
        .minScore(minScore)
        .build();
​
    EmbeddingSearchResult<TextSegment> searchResult = embeddingStoreService
        .embeddingStore(dataset.getCollectionName())
        .search(embeddingSearchRequest);
    List<EmbeddingMatch<TextSegment>> embeddingMatchList = searchResult.matches();
​
    // 未匹配
    if (CollUtil.isEmpty(embeddingMatchList)) {
        return Flux.just(new AiMessageResultDTO(dataset.getEmptyDesc()));
    }
​
    // 更新命中次数
    List<String> embeddingIdList = embeddingMatchList.stream().map(EmbeddingMatch::embeddingId).toList();
    aiSliceService.updateHitCount(embeddingIdList);
    
    // 对向量结果进行总结
    Flux<AiMessageResultDTO> aiMessageResultDTOFlux = summaryResult(dataset, chatMessageDTO,
            embeddingMatchList.stream().map(EmbeddingMatch::embedded).map(TextSegment::text).toList())
        .cache();
​
    // 修改 map 逻辑在最后拼接一下参考资料
    AiMessageResultDTO aiMessageResultDTO = new AiMessageResultDTO();
    aiMessageResultDTO.setMessage(StrUtil.EMPTY);
    List<AiMessageResultDTO.ExtLink> extLinks = buildExtMessage(embeddingMatchList);
    aiMessageResultDTO.setExtLinks(extLinks);
    return aiMessageResultDTOFlux.concatWithValues(aiMessageResultDTO);
}

向量搜索处理流程:

  1. 根据知识库配置设置最小相似度分数

  2. 构建向量搜索请求,包括:

    • 查询向量

    • 最大结果数

    • 过滤条件(数据集 ID 和文档类型)

    • 最小分数

  3. 执行向量搜索

  4. 处理搜索结果:

    • 如果没有匹配结果,返回知识库配置的空结果描述

    • 如果有匹配结果,更新命中次数

  5. 调用大模型对搜索结果进行总结(summaryResult)

  6. 构建参考资料链接

  7. 返回总结结果和参考资料

4、结果总结处理

结果总结处理流程:

  1. 创建提示模板,加载系统提示(knowledge-system.st)

  2. 向模板添加参数:

    • 搜索结果内容

    • 用户问题

    • 空结果描述

  3. 获取大模型服务

  4. 调用大模型进行聊天,生成总结

  5. 将总结结果转换为 AiMessageResultDTO 对象并返回


Comment