节点执行流程

文档版本: 1.0.0 最后更新: 2025-10-06 前置阅读: 架构概述, 消息队列详解


目录


节点生命周期

完整生命周期图

┌─────────────────────────────────────────────────────────────┐
│ 1. 创建阶段 (Creation)                                      │
│    NodeRegistry.create_node()                               │
│    └─> NodeBase.__init__()                                  │
└────────────────────┬────────────────────────────────────────┘


┌─────────────────────────────────────────────────────────────┐
│ 2. 初始化阶段 (Initialization)                              │
│    node.start()                                             │
│    ├─> initialize_state_store()                             │
│    ├─> initialize_message_queue()                           │
│    └─> set_status(PENDING)                                  │
└────────────────────┬────────────────────────────────────────┘


┌─────────────────────────────────────────────────────────────┐
│ 3. 等待信号阶段 (Waiting for Signals)                       │
│    ├─> 启动信号消费者 consume()                             │
│    ├─> 接收上游信号 _on_signal_received()                   │
│    ├─> 更新输入信号状态                                      │
│    └─> 检查是否所有信号就绪 can_execute()                    │
└────────────────────┬────────────────────────────────────────┘

                     ↓ (所有信号就绪)
┌─────────────────────────────────────────────────────────────┐
│ 4. 执行阶段 (Execution)                                      │
│    set_status(RUNNING)                                       │
│    └─> execute() [子类实现]                                 │
└────────────────────┬────────────────────────────────────────┘


┌─────────────────────────────────────────────────────────────┐
│ 5. 信号发送阶段 (Signal Emission)                           │
│    send_signal(handle, signal_type, payload)                │
│    └─> 下游节点接收信号                                      │
└────────────────────┬────────────────────────────────────────┘


┌─────────────────────────────────────────────────────────────┐
│ 6. 完成阶段 (Completion)                                     │
│    ├─> set_status(COMPLETED / FAILED / TERMINATED)          │
│    └─> 更新任务状态到 Redis                                  │
└────────────────────┬────────────────────────────────────────┘


┌─────────────────────────────────────────────────────────────┐
│ 7. 清理阶段 (Cleanup)                                        │
│    cleanup()                                                 │
│    ├─> close_message_queue()                                │
│    └─> close_state_store()                                  │
└─────────────────────────────────────────────────────────────┘

状态转换图


节点创建流程

创建入口

节点创建由 NodeExecutor.execute_node_task() 触发:

节点实例化

NodeBase 构造函数


信号等待机制

信号就绪检查

等待信号流程

信号接收处理


节点执行流程

执行方法签名

典型执行模式

模式 1:数据处理节点

模式 2:条件判断节点

模式 3:交易执行节点


信号发送机制

发送信号

获取输入信号


异常处理

异常类型

异常处理流程


资源清理

清理流程

强制终止


开发新节点

最小节点模板

开发清单

必须实现:

  1. 继承 NodeBase

  2. 使用 @register_node_type 装饰器注册

  3. 实现 execute() 方法

  4. 实现 _register_input_handles() 方法(如有输入)

建议实现: 5. 在 __init__ 中处理配置参数 6. 使用 auto_update_attr 自动更新成员变量 7. 完善错误处理和日志记录 8. 发送有意义的输出信号

可选实现: 9. 自定义信号处理器 _on_{handle_name}_received() 10. 自定义清理逻辑 _cleanup() 11. 超时控制 signal_timeout 12. 聚合 Handle 支持


下一步

继续阅读相关文档:


维护者: TradingFlow 开发团队 版本历史:

  • v1.0.0 (2025-10-06): 初始版本

Last updated