夜航星
Lila's dream
Lila's dream
Published on 2025-04-01 / 3 Visits
0
0

工作流AI模型编排实现

#AI

个人学习使用,参考PIGX-AI:https://ai.wiki.pig4cloud.com/doc/ai/workflow

Gitee同步:https://gitee.com/rickkkFang/my-notes/tree/master/72.AI

工作流智能体编排

工作流是通过将复杂的任务分解成较小的步骤(节点)降低系统复杂度,减少了对提示词技术和模型推理能力的依赖,提高了 LLM 面向复杂任务的性能,提升了系统的可解释性、稳定性和容错性。将声明不同的节点,并将节点串联成一个流程图。

智能体(Agent)是具备自主决策和执行能力的 AI 实体,通过感知环境、分析决策、执行动作的闭环机制完成特定任务。PIG AI 的智能体编排系统提供可视化流程设计器,支持多智能体协同工作流的构建与优化。

核心特征:

  • 原子化能力封装:将大模型能力封装为可复用的功能单元(LLM Function)

  • 可视化流程编排:通过拖拽方式构建多步骤工作流,支持条件分支、并行执行等复杂逻辑

  • 自动化批处理:支持批量数据输入和并行任务执行,显著提升处理效率

  • 质量校验机制:内置输出质量评估模块,通过二次验证确保结果可靠性

适用场景:

场景类型

典型应用

内容生产

多语言文档翻译、营销文案批量生成

数据分析

报表自动生成、数据洞察提取

流程自动化

邮件自动分类回复、工单智能派发

资源处理

图像批量标注、视频关键帧提取

1、关键概念

  • 节点

    节点是工作流的关键构成,通过连接不同功能的节点,执行工作流的一系列操作。工作流的核心节点包括开始节点、结束节点、LLM 节点、条件分支节点等,每个节点都有特定的功能和配置选项。

  • 变量

    变量用于串联工作流内前后节点的输入与输出,实现流程中的复杂处理逻辑,包含系统变量、环境变量和会话变量。变量作为一种动态数据容器,能够

    存储和传递不固定的内容,在不同的节点内被相互引用,实现信息在节点间的灵活通信。

  • 环境变量

    环境变量用于保护工作流内所涉及的敏感信息,例如运行工作流时所涉及的 API 密钥、数据库密码等。它们被存储在工作流程中,而不是代码中,以便在不同环境中共享。

    环境变量拥有以下特性:

    • 环境变量可在大部分节点内全局引用

    • 环境变量命名不可重复

    • 环境变量为只读变量,不可写入

2、工作流节点类型

  • 开始节点

    开始节点是 Chatflow / Workflow 应用中的关键预设节点。它提供了基本的初始信息,根据用户输入的参数以支持应用和后续工作流节点的正常流程。

    在开始节点的设置页面,你会找到:"变量列表"。用户调用发布的智能体编排的时候需要按照开始节点要求进行入参。

    • 输入字段

      输入字段由应用开发者配置,用于提示用户提供额外信息。例如,在周报应用中,可能需要用户提供姓名、工作日期范围和工作详情等背景信息,这些初步信息有助于 LLM 生成更高质量的回复。

  • 问题分类节点

    问题分类节点通过定义分类描述,使 LLM 能够根据用户输入选择与之相匹配的分类。这个节点特别适用于需要根据用户问题类型进行不同处理路径的场景。

    • 节点参数

      参数

      说明

      输入变量

      固定 arg1,对应选择上一节点传递的参数

      问题分类

      分类序号,分类描述(非常重要);大模型根据分类描述来判断问题分类节点走哪个需要的分支

    • 节点功能

      问题分类节点允许你定义多个分类,每个分类包含:

      • 分类名称:用于标识该分类的唯一名称

      • 分类描述:详细说明该分类所涵盖的问题类型和特征

      • 示例问题:提供该分类的典型问题示例,帮助 LLM 更准确地识别

      当用户提问时,LLM 会分析问题内容,并将其归类到最匹配的分类中,然后工作流可以根据分类结果执行不同的处理路径。

    • 使用场景

      使用场景

      说明

      客户服务

      将用户问题分类为产品咨询、技术支持、投诉建议等

      内容推荐

      根据用户兴趣分类,提供个性化内容推荐

      多功能助手

      根据用户意图分类,执行不同的功能模块

  • 分支节点

    条件分支节点允许你根据 if/else/elif 条件将工作流拆分成多个分支,实现复杂的逻辑控制。

    通过条件判断代码动态控制工作流分支走向,根据输入参数值返回对应的分支索引。以下示例演示如何根据用户输入参数判断执行路径:

    JavaScriptCopyfunction main(args) {
      // 条件判断:当输入参数等于 'ai' 时返回分支索引 0,否则返回分支索引 1
      return args.content === 'ai' ? 0 : 1;
    }
  • 代码执行节点

    代码执行节点支持运行 JS 代码以在工作流程中执行数据转换等自定义逻辑。它可以简化你的工作流程,适用于算术运算、JSON 转换、文本处理等情景。

    // 这里返回值必须是 map,才能被下游感知
    function main(args){
      // 可以通过 args.arg1 的形式获取上文参数
      return {
        arg1: args.arg1,
        arg2: args.arg2
      }
    }
  • LLM节点

    LLM 节点是工作流中的核心组件,用于调用大语言模型执行各种智能任务。通过简单的配置方式,你可以轻松地将大模型能力集成到工作流中,无需编写复杂代码。

    • 节点功能

      功能项

      说明

      模型选择

      可选择系统配置的各种大语言模型,如 qwen-max、GPT-4 等

      系统提示词

      设置模型的基础行为指导和角色定义

      用户提示词

      构建具体的任务指令,可引用工作流变量

      参数配置

      温度参数和最大输出长度控制

    • 使用场景

      使用场景

      说明

      内容生成

      撰写文章、报告、摘要等各类文本内容

      知识问答

      回答用户问题,提供专业领域知识

      数据分析

      解释数据趋势,生成数据分析报告

      决策辅助

      基于输入信息提供建议和决策支持

    • 变量引用

      在提示词中,你可以通过 ${变量名} 的方式引用工作流中的变量,实现动态提示词构建。

  • 数据库节点

    数据库操作节点允许工作流与数据库进行交互,执行查询、插入、更新和删除等操作。这个节点特别适用于需要持久化存储数据或从数据库检索信息的场景。

    • 节点功能

      这里可以直接选择开发平台维护的数据源,然后针对目标数据源写 SQL:

      直接 SQL 查询:允许你编写 SQL 语句直接操作数据库

      SQLCopy-- 可以通过 ${} 获取变量输入
      SELECT * FROM users WHERE age > ${arg1}
    • 数据源维护

      数据源列表:通过开发平台平台 > 新增数据源进行维护

  • HTTP请求节点

    HTTP 请求节点允许通过 HTTP 协议发送服务器请求,适用于获取外部检索结果、webhook、生成图片等情景。

    • 节点功能

      功能项

      说明

      请求类型

      支持 GET、POST、HEAD、PATCH、PUT、DELETE 等 HTTP 方法

      鉴权类型

      支持无鉴权、API-Key 基础、API-Key Bearer、API-Key 自定义等多种鉴权方式

      请求头

      可设置自定义 HTTP 请求头

      请求参数

      支持 URL 参数设置

      请求体

      支持 none、form-data、x-www-form-urlencoded、raw text、JSON等多种格式

  • 消息通知节点

    消息通知节点允许工作流将处理结果通过多种渠道发送通知,支持与企业常用的通信工具集成,实现自动化的消息推送。

    • 节点功能

      功能项

      说明

      通知渠道

      支持通过 PIGX 的消息推送模块进行结果推送

      集成方式

      支持 webhook 方式接入钉钉、企业微信等企业通讯工具

      变量替换

      可在消息内容中引用工作流变量,实现动态内容推送

    • 推送渠道维护

      推送渠道通过系统管理模块进行维护和配置。

  • 知识库节点

    知识库节点允许工作流直接查询已配置的知识库,实现对特定领域知识的检索和利用,是实现 RAG(检索增强生成)的关键组件。

    • 节点功能

      功能项

      说明

      知识库选择

      可选择系统中已配置的任意知识库

      提示词设置

      支持自定义提示词,引导大模型如何利用检索到的知识

      参数动态填充

      通过 ${} 语法在提示词中动态填充工作流变量

    • 使用事项注意

      • 知识库节点为同步查询模式,如果知识库的大模型响应较慢,可能导致超时

      • 对于第三方调用知识库的场景,建议适当增加超时时间设置

      • 可以结合条件节点,根据知识库查询结果决定后续流程走向

  • 结束节点

    结束节点定义一个 workflow 流程结束的最终输出内容。它是工作流的终点,用于汇总处理结果并返回给用户或其他系统。

技术实现原理

一、概述

AI Flow 是 PigX-AI 项目中 pigx-knowledge 模块的核心功能,位于 support.flow.model 包下。它是一个基于 DSL(领域特定语言) 的可视化 AI 工作流引擎,允许用户通过拖拽方式构建复杂的 AI 应用流程。

1.1 核心特性

  • 可视化流程设计:通过 DSL 定义工作流结构

  • 多种节点类型:支持 LLM、RAG、HTTP、DB、Switch、Code 等 12+ 种节点

  • 流式执行支持:支持实时流式输出和回调机制

  • 智能分支控制:支持条件分支和循环检测

  • 上下文管理:完整的变量传递和状态管理机制

1.2 技术架构

┌─────────────────────────────────────────────────────────┐
│                    AI Flow 架构层次                      │
├─────────────────────────────────────────────────────────┤
│  应用层: AiFlowService (流程管理、执行入口)              │
├─────────────────────────────────────────────────────────┤
│  处理器层: AiFlowProcessor (流程执行引擎)                │
│           ├─ AiDefaultAiFlowProcessor (默认实现)       │
│           └─ DFS 深度优先搜索算法                        │
├─────────────────────────────────────────────────────────┤
│  节点层: AiNodeProcessor (节点处理器工厂)                │
│         ├─ LLMNodeProcessor (大模型节点)                │
│         ├─ RagNodeExecutor (知识库检索节点)              │
│         ├─ HttpNodeProcessor (HTTP 请求节点)            │
│         ├─ DbNodeExecutor (数据库节点)                  │
│         ├─ SwitchNodeProcessor (分支节点)               │
│         └─ ... (其他节点类型)                            │
├─────────────────────────────────────────────────────────┤
│  上下文层: FlowContextHolder (执行上下文管理)           │
│           ├─ 变量存储 (variables)                       │
│           ├─ 参数管理 (parameters)                      │
│           ├─ 分支状态 (activeBranches)                  │
│           └─ Token 统计                                │
├─────────────────────────────────────────────────────────┤
│  模型层: DSL 定义                                       │
│         ├─ AiFlowDSLDefinition (流程定义)               │
│         ├─ AiNodeDefinition (节点定义)                  │
│         └─ AiFlowConnectionDefinition (连接定义)         │
└─────────────────────────────────────────────────────────┘

二、核心数据结构

2.1 DSL 定义结构

AiFlowDSLDefinition(流程 DSL 定义)

public class AiFlowDSLDefinition {
    private List<AiNodeDefinition> nodes;           // 节点列表
    private List<AiFlowConnectionDefinition> connections;  // 连接关系
    private List<Dict> params;                      // 执行参数
}

作用:描述整个工作流的完整结构,包含所有节点定义和节点间的连接关系。

AiNodeDefinition(节点定义)

public class AiNodeDefinition {
    private String id;                    // 节点唯一标识
    private String type;                  // 节点类型 (START/END/LLM/RAG/HTTP/DB/SWITCH等)
    private String name;                  // 节点名称
    private String status;                // 执行状态 (PENDING/RUNNING/SUCCESS/ERROR)
    private Long duration;                 // 执行耗时
    private Integer tokens;                // Token 消耗量
    private Map<String, Object> output;   // 输出结果
    
    // 节点类型特定配置
    private AiLLMNode llmParams;          // LLM 节点配置
    private AiRagNode ragParams;         // RAG 节点配置
    private AiHttpNode httpParams;        // HTTP 节点配置
    private AiDbNode dbParams;            // DB 节点配置
    private AiSwitchNode switchParams;    // Switch 节点配置
    // ... 其他节点类型配置
}

作用:定义单个节点的完整信息,包括配置参数、输入输出定义等。

AiFlowConnectionDefinition(连接定义)

public class AiFlowConnectionDefinition {
    private String sourceId;    // 源节点ID
    private String targetId;     // 目标节点ID
    private Integer portIndex;  // 端口索引(用于分支节点)
}

作用:定义节点之间的连接关系,构建流程的执行路径。

2.2 执行上下文

FlowContextHolder(上下文持有者)

public class FlowContextHolder {
    private final Long flowId;                    // 流程ID
    private final String type;                     // 流程类型
    private final Map<String, Object> variables;   // 变量存储 (nodeId.key -> value)
    private final Map<String, Object> parameters; // 输入参数
    private final Map<String, Integer> activeBranches; // 分支状态
    private List<AiNodeDefinition> executedNodes; // 已执行节点列表
    private long totalTokens;                      // Token 累计
    private AiFlowExecuteDTO aiFlowExecuteDTO;    // 执行DTO(包含回调)
}

核心方法

  1. 变量管理

    // 设置变量(带节点ID前缀)
    setVariable(String nodeId, String key, Object value)
    // 获取变量(支持多级查找:variables -> parameters -> envs)
    getVariable(String key)
  2. 参数管理

    getParameter(String key)  // 获取输入参数
    setParameter(String key, Object value)  // 设置参数
  3. 分支状态管理

    setBranchStatus(String nodeId, Integer branchIndex)  // 记录分支选择
    getBranchStatus(String nodeId)  // 获取分支状态

三、执行引擎核心原理

3.1 执行流程

public AiFlowExecuteResult execute(AiFlowEntity flow, 
                                   AiFlowDSLDefinition dsl, 
                                   AiFlowExecuteDTO flowExecuteDTO) {
    // 1. 创建执行上下文
    FlowContextHolder context = new FlowContextHolder(flow.getId(), flow.getType());
    context.setParameters(flowExecuteDTO.getParams());
    context.setEnvs(flowExecuteDTO.getEnvs());
    context.setAiFlowExecuteDTO(flowExecuteDTO);
    
    // 2. 查找开始节点
    AiNodeDefinition startNode = nodes.stream()
        .filter(node -> START.equals(node.getType()))
        .findFirst()
        .orElseThrow(() -> FlowException.invalidFlow("未找到开始节点"));
    
    // 3. 设置开始节点参数
    // 将输入参数映射到开始节点的变量中
    
    // 4. 执行节点(DFS 深度优先搜索)
    executeNode(startNode, context, nodes, connections, executedNodes, visited);
    
    // 5. 构建执行结果
    return AiFlowExecuteResult;
}

3.2 DFS 深度优先搜索算法

核心执行逻辑采用 深度优先搜索(DFS) 算法遍历和执行节点:

private void executeNode(AiNodeDefinition node, 
                         Integer branchIndex,
                         FlowContextHolder context,
                         List<AiNodeDefinition> nodes,
                         List<AiFlowConnectionDefinition> connections,
                         List<AiNodeDefinition> executedNodes,
                         Set<String> visited) {
    
    String nodeId = node.getId();
    
    // 1. 循环依赖检测
    if (visited.contains(nodeId)) {
        if (executedNodes.stream().anyMatch(n -> n.getId().equals(nodeId))) {
            throw FlowException.invalidFlow("检测到循环依赖,涉及节点: " + nodeId);
        }
        return;  // 已访问但未执行,跳过
    }
    
    // 2. 创建执行节点副本(避免修改原始定义)
    AiNodeDefinition execNode = new AiNodeDefinition();
    BeanUtils.copyProperties(node, execNode);
    execNode.setStatus(ExecutionStatusEnums.PENDING.getValue());
    
    visited.add(nodeId);
    long startTime = System.currentTimeMillis();
    
    try {
        // 3. 更新状态为运行中
        execNode.setStatus(ExecutionStatusEnums.RUNNING.getValue());
        
        // 4. 获取节点处理器并执行
        AiNodeProcessor processor = nodeProcessorFactory.getProcessor(node.getType());
        Dict result = processor.execute(execNode, context);
        
        // 5. 更新执行状态和结果
        execNode.setStatus(ExecutionStatusEnums.SUCCESS.getValue());
        execNode.setOutput(result);
        execNode.setDuration(System.currentTimeMillis() - startTime);
        
        // 6. 更新 Token 统计
        int tokens = result.get(TOKENS, 0);
        if (tokens > 0) {
            execNode.setTokens(tokens);
            context.addTokens(tokens);
        }
        
        // 7. 保存节点输出到上下文
        if (!result.isEmpty()) {
            context.setVariables(nodeId, result);
        }
        
        // 8. 添加到已执行列表
        executedNodes.add(execNode);
        context.setExecutedNodes(executedNodes);
        
        // 9. 处理后续节点
        String nodeType = node.getType();
        if (SWITCH.equals(nodeType) || QUESTION.equals(nodeType)) {
            // 分支节点:根据条件选择特定分支
            handleBranchNode(nodeId, result, connections, nodes, context, 
                           executedNodes, visited);
        } else {
            // 普通节点:执行所有后续节点
            handleRegularNode(nodeId, connections, nodes, branchIndex, context, 
                            executedNodes, visited);
        }
        
    } catch (Exception e) {
        // 错误处理
        execNode.setStatus(ExecutionStatusEnums.ERROR.getValue());
        execNode.setError(e.getMessage());
        executedNodes.add(execNode);
    }
}

3.3 分支节点处理

分支节点(Switch/Question)根据执行结果选择特定的后续节点:

private void handleBranchNode(String nodeId, Dict result, 
                              List<AiFlowConnectionDefinition> connections,
                              List<AiNodeDefinition> nodes,
                              FlowContextHolder context,
                              List<AiNodeDefinition> executedNodes,
                              Set<String> visited) {
    // 1. 获取分支索引(从节点输出结果中获取)
    Integer branchIdx = result.getInt(INDEX);
    context.setBranchStatus(nodeId, branchIdx);
    
    // 2. 查找对应分支的连接
    Optional<AiFlowConnectionDefinition> nextConn = connections.stream()
        .filter(conn -> nodeId.equals(conn.getSourceId()) 
                     && branchIdx.equals(conn.getPortIndex()))
        .findFirst();
    
    // 3. 执行选中的分支节点
    if (nextConn.isPresent()) {
        String nextNodeId = nextConn.get().getTargetId();
        AiNodeDefinition nextNode = nodes.stream()
            .filter(n -> nextNodeId.equals(n.getId()))
            .findFirst()
            .orElseThrow(() -> FlowException.invalidFlow("未找到节点: " + nextNodeId));
        
        executeNode(nextNode, branchIdx, context, nodes, connections, 
                   executedNodes, visited);
    }
}

3.4 普通节点处理

普通节点执行所有后续连接的节点:

private void handleRegularNode(String nodeId,
                               List<AiFlowConnectionDefinition> connections,
                               List<AiNodeDefinition> nodes,
                               Integer branchIndex,
                               FlowContextHolder context,
                               List<AiNodeDefinition> executedNodes,
                               Set<String> visited) {
    // 1. 获取所有后续连接(按端口索引排序)
    List<AiFlowConnectionDefinition> nextConnection = connections.stream()
        .filter(conn -> nodeId.equals(conn.getSourceId()))
        .sorted(Comparator.comparing(conn -> conn.getPortIndex() != null 
                                    ? conn.getPortIndex() : 0))
        .toList();
    
    // 2. 依次执行所有后续节点
    for (AiFlowConnectionDefinition conn : nextConnection) {
        String nextNodeId = conn.getTargetId();
        AiNodeDefinition nextNode = nodes.stream()
            .filter(n -> nextNodeId.equals(n.getId()))
            .findFirst()
            .orElseThrow(() -> FlowException.invalidFlow("未找到节点: " + nextNodeId));
        
        executeNode(nextNode, branchIndex, context, nodes, connections, 
                   executedNodes, visited);
    }
}

四、节点处理器机制

4.1 节点处理器工厂

使用 Spring 依赖注入 自动收集所有节点处理器:

@Service
@RequiredArgsConstructor
public class AiNodeProcessorFactory {
    // Spring 自动注入所有实现了 AiNodeProcessor 接口的 Bean
    private final Map<String, AiNodeProcessor> processorMap;
    
    public AiNodeProcessor getProcessor(String type) {
        AiNodeProcessor processor = processorMap.get(type);
        if (processor == null) {
            throw new RuntimeException("未知的节点类型: " + type);
        }
        return processor;
    }
}

4.2 节点处理器接口

public interface AiNodeProcessor {
    Dict execute(AiNodeDefinition node, FlowContextHolder context);
}

4.3 抽象基类

所有节点处理器继承 AbstractNodeProcessor,提供统一的执行框架:

public abstract class AbstractNodeProcessor implements AiNodeProcessor {
    
    @Override
    public Dict execute(AiNodeDefinition node, FlowContextHolder context) {
        // 1. 验证参数
        validateParams(node, context);
        // 2. 执行具体逻辑(由子类实现)
        return doExecute(node, context);
    }
    
    protected abstract Dict doExecute(AiNodeDefinition node, FlowContextHolder context);
    
    // 工具方法:获取输入变量
    protected Dict getInputVariables(AiNodeDefinition node, FlowContextHolder context) {
        Dict variables = Dict.create();
        List<AiParamDefinition> inputParams = node.getInputParams();
        if (inputParams != null) {
            inputParams.forEach(param -> 
                variables.set(param.getName(), context.getVariable(param.getType()))
            );
        }
        return variables;
    }
}

4.4 LLM 节点处理器示例

LLM 节点是最复杂的节点类型,支持流式和非流式两种模式:

@Component(NodeTypeConstants.LLM)
@RequiredArgsConstructor
public class LLMNodeProcessor extends AbstractNodeProcessor {
    
    private final ModelProvider modelProvider;
    private final ChatMemoryAdvisorProvider chatMemoryAdvisorProvider;
    
    @Override
    protected Dict doExecute(AiNodeDefinition node, FlowContextHolder context) {
        // 1. 验证配置
        AiLLMNode config = validateNodeConfig(node);
        
        // 2. 获取输入参数并处理消息模板
        Dict variables = getInputVariables(node, context);
        List<ChatMessage> chatMessages = processChatMessages(config.getMessages(), variables);
        
        // 3. 根据模式调用不同的处理逻辑
        boolean stream = context.isStream();
        if (YesNoEnum.YES.getCode().equals(config.getModelConfig().getIsVision())) {
            return processVisionModel(chatMessages, config.getModelConfig(), context);
        } else if (stream) {
            return processRegularModel(chatMessages, config.getModelConfig(), context, node);
        } else {
            return processNonStreamModel(chatMessages, config.getModelConfig(), context, node);
        }
    }
    
    // 流式处理
    private Dict processRegularModel(...) {
        // 1. 获取流式模型
        Pair<StreamingChatModel, AiStreamAssistantService> assistantServicePair = 
            modelProvider.getAiStreamAssistant(modelConfig.getModel());
        
        // 2. 获取对话记忆
        ChatMemory chatMemory = chatMemoryAdvisorProvider.get(context.getConversationId());
        
        // 3. 构建消息列表(SystemMessage + 历史消息)
        List<ChatMessage> allMessages = buildMessages(chatMessages, chatMemory);
        
        // 4. 流式调用,通过回调实时推送结果
        assistantServicePair.getKey().chat(allMessages, new StreamingChatResponseHandler() {
            @Override
            public void onPartialResponse(String msg) {
                // 实时推送部分响应
                context.getAiFlowExecuteDTO().getCallback().execute(
                    FlowCallbackResult.builder()
                        .data(FlowCallbackData.builder().content(msg).build())
                        .build()
                );
            }
            
            @Override
            public void onCompleteResponse(ChatResponse chatResponse) {
                // 完成响应,保存到记忆并推送最终结果
                chatMemory.add(chatResponse.aiMessage());
                // ... 推送完成消息
            }
        });
        
        return formatResponse(finalResponse[0]);
    }
}

4.5 支持的节点类型

系统支持以下节点类型(通过 @Component 注解注册):

节点类型

处理器类

功能说明

START

StartNodeProcessor

开始节点,接收输入参数

END

EndNodeProcessor

结束节点,输出最终结果

LLM

LLMNodeProcessor

大语言模型节点,支持流式输出

RAG

RagNodeExecutor

知识库检索节点,RAG 增强

HTTP

HttpNodeProcessor

HTTP 请求节点,调用外部 API

DB

DbNodeExecutor

数据库节点,执行 SQL 查询

SWITCH

SwitchNodeProcessor

条件分支节点

QUESTION

QuestionNodeProcessor

问答节点,用户交互

CODE

CodeNodeExecutor

代码执行节点

TEXT

TextNodeProcessor

文本输出节点

NOTICE

NoticeNodeProcessor

通知节点

MCP

MCPNodeProcessor

MCP 协议节点

五、流式执行机制

5.1 流式执行流程

public Object executeFlow(AiFlowExecuteDTO executeDTO) {
    // 1. 解析 DSL
    AiFlowDSLDefinition dsl = JSONUtil.toBean(dslValue, AiFlowDSLDefinition.class);
    
    // 2. 判断执行模式
    if (executeDTO.isStream()) {
        // 流式模式:使用 SSE (Server-Sent Events)
        SseEmitter emitter = new SseEmitter(300000L);
        
        // 设置回调函数
        executeDTO.setCallback(data -> {
            try {
                if (StrUtil.isNotBlank(data.getData().getContent())) {
                    emitter.send(SseEmitter.event().data(data));
                }
            } catch (IOException e) {
                emitter.completeWithError(e);
            }
        });
        
        // 异步执行流程
        executorService.execute(() -> {
            try {
                flowProcessor.execute(flow, dsl, executeDTO);
            } finally {
                emitter.complete();
            }
        });
        
        return emitter;
    } else {
        // 非流式模式:直接返回结果
        return R.ok(flowProcessor.execute(flow, dsl, executeDTO));
    }
}

5.2 回调机制

通过 FlowCallback 接口实现执行过程的实时通知:

@FunctionalInterface
public interface FlowCallback {
    void execute(FlowCallbackResult result);
}
​
// 回调数据结构
public static class FlowCallbackData {
    private String status;           // 执行状态
    private String nodeType;         // 节点类型
    private String nodeId;           // 节点ID
    private String content;          // 输出内容
    private Long duration;            // 执行耗时
    private List<AiNodeDefinition> nodes;  // 已执行节点列表
    private Object tokens;            // Token 使用量
    private Boolean isEnd;            // 是否结束
}

5.3 流式输出示例

LLM 节点在流式模式下,通过回调实时推送响应:

assistantServicePair.getKey().chat(allMessages, new StreamingChatResponseHandler() {
    @Override
    public void onPartialResponse(String msg) {
        // 每次收到部分响应时,立即通过回调推送
        context.getAiFlowExecuteDTO().getCallback().execute(
            FlowCallbackResult.builder()
                .data(FlowCallbackData.builder()
                    .content(msg)  // 部分内容
                    .nodeId(node.getId())
                    .nodeType(node.getType())
                    .build())
                .build()
        );
    }
    
    @Override
    public void onCompleteResponse(ChatResponse chatResponse) {
        // 完成时推送最终结果和统计信息
        context.getAiFlowExecuteDTO().getCallback().execute(
            FlowCallbackResult.builder()
                .data(FlowCallbackData.builder()
                    .content(END_MSG)
                    .tokens(chatResponse.tokenUsage().totalTokenCount())
                    .duration(context.getDuration())
                    .nodes(context.getExecutedNodes())
                    .build())
                .build()
        );
    }
});

六、变量传递机制

6.1 变量命名规则

变量采用 节点ID + 变量名 的命名方式,确保变量唯一性:

// 变量键格式:nodeId.variableName
String nodeKey = nodeId + "." + key;
​
// 设置变量
context.setVariable(nodeId, "content", "Hello World");
// 实际存储键:node1.content -> "Hello World"
​
// 获取变量
Object value = context.getVariable("node1.content");

6.2 变量查找优先级

public Object getVariable(String key) {
    // 1. 优先从 variables 中查找(节点输出)
    Object obj = variables.get(key);
    if (obj != null) {
        return obj;
    }
    
    // 2. 如果变量名不包含节点ID前缀,可能是用户输入的固定值
    if (!StrUtil.containsAnyIgnoreCase(key, ".")) {
        return key;  // 直接返回变量名本身
    }
    
    // 3. 从 parameters 中查找(输入参数)
    if (parameters.get(key) != null) {
        return parameters.get(key);
    }
    
    // 4. 从 envs 中查找(环境变量)
    return envs.get(key);
}

6.3 模板变量替换

节点配置中的消息内容支持模板变量替换:

// 使用 Hutool 模板引擎
TemplateEngine engine = TemplateUtil.createEngine(new TemplateConfig());
​
// 消息模板:{{node1.content}} 你好
String template = "{{node1.content}} 你好";
​
// 渲染模板(替换变量)
String rendered = engine.getTemplate(template).render(variables);
// 结果:Hello World 你好

七、循环检测机制

7.1 检测原理

使用 访问标记(visited)执行列表(executedNodes) 双重检测:

Set<String> visited = new HashSet<>();  // 已访问节点集合
List<AiNodeDefinition> executedNodes = new ArrayList<>();  // 已执行节点列表
​
if (visited.contains(nodeId)) {
    // 节点已被访问
    if (executedNodes.stream().anyMatch(n -> n.getId().equals(nodeId))) {
        // 如果节点已执行,说明存在循环依赖
        throw FlowException.invalidFlow("检测到循环依赖,涉及节点: " + nodeId);
    }
    // 如果节点未执行,可能是并行分支,直接返回
    return;
}

7.2 循环场景示例

节点A -> 节点B -> 节点C -> 节点A  (循环依赖,会抛出异常)
​
节点A -> 节点B -> 节点C
      -> 节点D -> 节点C  (并行分支,正常执行)

八、执行结果构建

8.1 结果结构

public class AiFlowExecuteResult {
    private List<AiNodeDefinition> nodes;      // 已执行节点列表(按执行顺序)
    private Map<String, Object> variables;     // 所有变量(包含中间结果)
    private Object result;                      // 最终结果
    private String executed;                    // 执行状态
    private Long duration;                      // 总执行时长
    private Long totalTokens;                   // Token 总消耗
}

8.2 结果收集

// 执行完成后构建结果
AiFlowExecuteResult result = new AiFlowExecuteResult();
result.setNodes(executedNodes);              // 所有已执行节点
result.setVariables(context.getVariables());  // 所有变量
result.setResult(context.getResult());        // 最终结果(从 variables 中获取)
result.setDuration(context.getDuration());    // 总耗时
result.setTotalTokens(context.getTotalTokens()); // Token 统计
result.setExecuted(ExecutionStatusEnums.SUCCESS.getValue());

九、关键技术点总结

9.1 设计模式

  1. 模板方法模式AbstractNodeProcessor 定义执行框架,子类实现具体逻辑

  2. 工厂模式AiNodeProcessorFactory 管理节点处理器

  3. 策略模式:不同节点类型使用不同的处理策略

  4. 观察者模式:通过回调机制实现流式输出

9.2 算法

  1. DFS 深度优先搜索:遍历和执行节点

  2. 循环检测算法:防止无限循环

  3. 分支选择算法:根据条件选择执行路径

9.3 并发处理

  1. 异步执行:流式模式下使用线程池异步执行

  2. 线程安全:上下文变量使用线程安全的数据结构

  3. 回调机制:通过回调函数实现实时通知

十、使用示例

10.1 简单流程示例

{
  "nodes": [
    {
      "id": "start1",
      "type": "START",
      "name": "开始"
    },
    {
      "id": "llm1",
      "type": "LLM",
      "name": "AI 对话",
      "llmParams": {
        "modelConfig": {
          "model": "gpt-4"
        },
        "messages": [
          {
            "role": "USER",
            "content": "{{start1.input}}"
          }
        ]
      }
    },
    {
      "id": "end1",
      "type": "END",
      "name": "结束"
    }
  ],
  "connections": [
    {
      "sourceId": "start1",
      "targetId": "llm1"
    },
    {
      "sourceId": "llm1",
      "targetId": "end1"
    }
  ]
}

10.2 执行流程

  1. 开始节点:接收输入参数 input

  2. LLM 节点:使用模板 {{start1.input}} 调用大模型

  3. 结束节点:输出最终结果

10.3 执行调用

AiFlowExecuteDTO executeDTO = new AiFlowExecuteDTO();
executeDTO.setId(flowId);
executeDTO.setParams(Dict.create().set("input", "你好"));
executeDTO.setStream(true);  // 流式执行
executeDTO.setConversationId("conv-123");
​
// 执行流程
Object result = aiFlowService.executeFlow(executeDTO);
// 流式模式返回 SseEmitter,客户端通过 SSE 接收实时数据

十一、总结

AI Flow 技术实现的核心特点:

  1. DSL 驱动:通过 JSON DSL 定义工作流,灵活可扩展

  2. DFS 执行:使用深度优先搜索算法遍历节点

  3. 插件化架构:节点处理器通过 Spring 自动注册,易于扩展

  4. 流式支持:完整的流式执行和回调机制

  5. 上下文管理:完善的变量传递和状态管理

  6. 错误处理:完善的异常处理和循环检测机制

该实现为构建复杂的 AI 应用提供了强大的工作流引擎支持,是 PigX-AI 项目的核心创新点之一。

十二、核心内容总结

1. 架构设计

  • DSL 驱动:通过 JSON 定义工作流结构

  • 分层架构:应用层 → 处理器层 → 节点层 → 上下文层 → 模型层

  • 插件化:节点处理器通过 Spring 自动注册

2. 执行引擎

  • DFS 深度优先搜索:递归遍历和执行节点

  • 循环检测:使用 visited 和 executedNodes 双重检测

  • 分支控制:支持条件分支和并行执行

3. 节点处理器机制

  • 工厂模式:AiNodeProcessorFactory 管理所有处理器

  • 模板方法:AbstractNodeProcessor 提供统一执行框架

  • 12+ 种节点类型:LLM、RAG、HTTP、DB、Switch 等

4. 流式执行

  • SSE 推送:使用 Server-Sent Events 实时推送结果

  • 回调机制:通过 FlowCallback 接口实现事件通知

  • 异步执行:使用线程池处理流式任务

5. 变量传递

  • 命名规则:nodeId.variableName 格式

  • 查找优先级:variables → parameters → envs

  • 模板替换:支持 Hutool 模板引擎变量替换

6. 关键技术点

  • 设计模式:模板方法、工厂、策略、观察者

  • 算法:DFS、循环检测、分支选择

  • 并发处理:异步执行、线程安全、回调机制


Comment