架构概述

文档版本: 1.0.0 最后更新: 2025-10-06 目标读者: 后端开发者、AI助手、系统架构师


目录


简介

Weather Station 是 TradingFlow 的核心工作流执行引擎,负责:

  1. Flow 生命周期管理:从注册、启动到停止的完整生命周期

  2. 节点编排与执行:基于 DAG 的节点依赖管理和并发执行

  3. 信号传递机制:基于 RabbitMQ 的节点间异步通信

  4. 状态管理:基于 Redis 的分布式状态存储

  5. 周期调度:支持定时、周期性 Flow 执行

设计理念

  • 松耦合通信:节点间通过信号(Signal)通信,不直接调用

  • 分布式架构:支持多 Worker 实例并行处理

  • 灵活扩展:基于装饰器的节点注册机制

  • 高可靠性:完善的错误处理和状态恢复机制


核心概念

1. Weather Syntax (Weather 语法)

Weather Syntax 是 TradingFlow 的智能交易语言,用 JSON 格式描述工作流。

基本结构:

2. DAG (有向无环图)

  • 定义:Weather Syntax 中的节点和边构成 DAG

  • 连通分量:一个 Flow 可包含多个 DAG(连通分量)

  • Component ID:每个连通分量有唯一 ID,用于隔离控制

3. Flow、Cycle、Node

关键概念:

  • Flow ID:一个 Flow 的唯一运行实例标识

  • Cycle:Flow 的执行周期号,从 0 开始自增

  • Node ID:Flow 中节点的唯一标识

  • Node Task ID:格式为 {flow_id}_{cycle}_{node_id},标识一次具体的节点执行

4. Signal(信号)

节点间通信的数据载体。

Signal 结构:

Signal 类型:

  • DATA_READY: 通用数据就绪

  • PRICE_DATA: 价格数据

  • DATASET: 数据集

  • DEX_TRADE_RECEIPT: 交易收据

  • VAULT_INFO: 金库信息

  • AI_RESPONSE: AI 响应

  • STOP_EXECUTION: 停止执行信号

5. Handle(句柄)

节点的输入输出连接点。

Handle 类型:

  • Input Handle:接收上游信号

  • Output Handle:发送信号到下游

  • Aggregate Handle:聚合多个输入(is_aggregate=True

Handle 命名规范:


系统架构

整体架构图

分层架构

层级
组件
职责

调度层

FlowScheduler

Flow生命周期管理、周期调度

编排层

FlowParser

Weather Syntax解析、DAG分析、连通分量识别

执行层

NodeExecutor, NodeBase

节点创建、执行、生命周期管理

通信层

NodeSignalPublisher/Consumer

信号发布订阅、消息路由

存储层

StateStore, NodeTaskManager

状态持久化、任务管理


关键组件

1. FlowScheduler(Flow 调度器)

位置: tradingflow/station/flow/scheduler.py (1546 行)

核心职责:

  • Flow 注册与配置管理

  • 周期调度循环

  • DAG 结构分析

  • Cycle 执行控制

  • 状态查询接口

关键方法:

单例模式:

2. FlowParser(Flow 解析器)

位置: tradingflow/station/flow/flow_parser.py (216 行)

核心职责:

  • Weather Syntax JSON 解析

  • 连通分量识别

  • DAG 环检测

  • Entry Nodes 识别

关键方法:

算法:

  • 连通分量识别:BFS(广度优先搜索)

  • 环检测:DFS(深度优先搜索)+ 三色标记法

3. NodeBase(节点基类)

位置: tradingflow/station/nodes/node_base.py (1481 行)

核心职责:

  • 节点生命周期管理

  • Input/Output Handle 注册

  • 信号接收与发送

  • 状态管理

  • 日志持久化

生命周期方法:

信号处理:

4. NodeExecutor(节点执行器)

位置: tradingflow/station/core/node_executor.py (283 行)

核心职责:

  • 节点实例创建

  • 节点执行控制

  • 异常处理与状态更新

  • 资源清理

执行流程:

5. NodeSignalPublisher/Consumer

位置: tradingflow/depot/python/mq/node_signal_*.py

NodeSignalPublisher(信号发布者)

职责:

  • 发送节点输出信号

  • 动态路由键生成

  • 支持精确发送和通配符发送

关键方法:

NodeSignalConsumer(信号消费者)

职责:

  • 订阅上游节点信号

  • Routing Key 解析

  • Cycle 验证

  • 信号分发到 Handle

关键方法:

6. StateStore(状态存储)

位置: tradingflow/station/common/state_store.py (611 行)

实现:

  • StateStore:抽象基类(防腐层)

  • RedisStateStore:Redis 实现

  • InMemoryStateStore:内存实现(测试用)

  • StateStoreFactory:工厂模式创建

接口:

7. NodeTaskManager(任务管理器)

位置: tradingflow/station/common/node_task_manager.py (593 行)

职责:

  • 节点任务注册与跟踪

  • 任务状态更新

  • Worker 任务管理

  • 任务查询接口

单例模式:

关键方法:

8. NodeRegistry(节点注册表)

位置: tradingflow/station/common/node_registry.py

职责:

  • 节点类型注册

  • 节点实例创建

  • Worker 实例管理

  • 心跳机制

装饰器注册:


技术栈

核心技术

技术
用途
版本

Python

主要编程语言

3.9+

asyncio

异步编程框架

内置

RabbitMQ

消息队列

3.x

aio-pika

异步 RabbitMQ 客户端

9.x

Redis

状态存储

6.x+

redis.asyncio

异步 Redis 客户端

内置

PostgreSQL

日志持久化

14+

Python 依赖


设计模式

1. 单例模式(Singleton)

应用组件:

  • FlowScheduler

  • NodeRegistry

  • NodeTaskManager

目的: 确保全局唯一实例,共享状态

实现:

2. 工厂模式(Factory)

应用组件:

  • StateStoreFactory

  • NodeRegistry.create_node()

目的: 根据配置动态创建对象

实现:

3. 装饰器模式(Decorator)

应用组件:

  • @register_node_type

目的: 自动注册节点类型

实现:

4. 观察者模式(Observer)

应用场景: 信号机制

实现:

  • Publisher: NodeSignalPublisher

  • Subscriber: NodeSignalConsumer

  • Event: Signal

5. 状态机模式(State Machine)

应用场景: 节点状态管理

状态转换:


数据流

完整执行流程

信号流示例


下一步

阅读以下详细文档以深入了解各个方面:

  1. 消息队列详解 - RabbitMQ 路由设计和信号传递

  2. Redis 状态管理 - 状态存储结构和字段设计

  3. 节点执行流程 - 节点生命周期和执行机制

  4. Flow 调度机制 - 周期调度和 DAG 编排

  5. 开发指南arrow-up-right - 如何开发新节点和扩展功能


维护者: TradingFlow 开发团队 反馈: 如有问题请在相关 Issue 中讨论 版本历史:

  • v1.0.0 (2025-10-06): 初始版本

Last updated