架构概述
文档版本: 1.0.0 最后更新: 2025-10-06 目标读者: 后端开发者、AI助手、系统架构师
目录
简介
Weather Station 是 TradingFlow 的核心工作流执行引擎,负责:
Flow 生命周期管理:从注册、启动到停止的完整生命周期
节点编排与执行:基于 DAG 的节点依赖管理和并发执行
信号传递机制:基于 RabbitMQ 的节点间异步通信
状态管理:基于 Redis 的分布式状态存储
周期调度:支持定时、周期性 Flow 执行
设计理念
松耦合通信:节点间通过信号(Signal)通信,不直接调用
分布式架构:支持多 Worker 实例并行处理
灵活扩展:基于装饰器的节点注册机制
高可靠性:完善的错误处理和状态恢复机制
核心概念
1. Weather Syntax (Weather 语法)
Weather Syntax 是 TradingFlow 的智能交易语言,用 JSON 格式描述工作流。
基本结构:
{
"nodes": [
{
"id": "node_1",
"type": "binance_price_node",
"config": { /* 节点配置 */ }
}
],
"edges": [
{
"source": "node_1",
"source_handle": "kline_data",
"target": "node_2",
"target_handle": "input_data"
}
]
}2. DAG (有向无环图)
定义:Weather Syntax 中的节点和边构成 DAG
连通分量:一个 Flow 可包含多个 DAG(连通分量)
Component ID:每个连通分量有唯一 ID,用于隔离控制
3. Flow、Cycle、Node
Weather Syntax (静态描述)
└─ Flow (运行实例)
└─ Cycle 0 (第 1 次执行)
│ └─ Node A, Node B, Node C...
└─ Cycle 1 (第 2 次执行)
└─ Node A, Node B, Node C...关键概念:
Flow ID:一个 Flow 的唯一运行实例标识
Cycle:Flow 的执行周期号,从 0 开始自增
Node ID:Flow 中节点的唯一标识
Node Task ID:格式为
{flow_id}_{cycle}_{node_id},标识一次具体的节点执行
4. Signal(信号)
节点间通信的数据载体。
Signal 结构:
@dataclass
class Signal:
signal_type: SignalType # 信号类型
payload: Dict[str, Any] # 数据负载
timestamp: Optional[datetime] # 时间戳Signal 类型:
DATA_READY: 通用数据就绪PRICE_DATA: 价格数据DATASET: 数据集DEX_TRADE_RECEIPT: 交易收据VAULT_INFO: 金库信息AI_RESPONSE: AI 响应STOP_EXECUTION: 停止执行信号
5. Handle(句柄)
节点的输入输出连接点。
Handle 类型:
Input Handle:接收上游信号
Output Handle:发送信号到下游
Aggregate Handle:聚合多个输入(
is_aggregate=True)
Handle 命名规范:
# 输入句柄
INPUT_HANDLE = "input_data"
# 输出句柄
OUTPUT_HANDLE = "output_data"
# Handle ID 格式
handle_id = f"{field_name}" # 不包含 -handle 后缀系统架构
整体架构图
┌─────────────────────────────────────────────────────────────┐
│ Weather Control │
│ (API Gateway Layer) │
└────────────────┬────────────────────────────────────────────┘
│ HTTP API
↓
┌─────────────────────────────────────────────────────────────┐
│ Weather Station │
│ ┌──────────────────────────────────────────────────────┐ │
│ │ FlowScheduler (调度器) │ │
│ │ - Flow 注册与管理 │ │
│ │ - 周期调度 │ │
│ │ - DAG 分析 │ │
│ └──────────────────┬───────────────────────────────────┘ │
│ │ │
│ ↓ │
│ ┌──────────────────────────────────────────────────────┐ │
│ │ Worker Instance Pool │ │
│ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │
│ │ │ Worker 1 │ │ Worker 2 │ │ Worker N │ │ │
│ │ │ │ │ │ │ │ │ │
│ │ │ NodeBase │ │ NodeBase │ │ NodeBase │ │ │
│ │ │ Execute │ │ Execute │ │ Execute │ │ │
│ │ └──────────┘ └──────────┘ └──────────┘ │ │
│ └──────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────┘
│ │
┌────────────┴───────────┴────────────┐
│ │
↓ ↓
┌───────────────────┐ ┌──────────────────┐
│ RabbitMQ │ │ Redis │
│ (Message Queue) │ │ (State Store) │
│ │ │ │
│ - signals_exchange│ │ - Flow状态 │
│ - Topic路由 │ │ - Cycle状态 │
│ - Queue隔离 │ │ - Node任务状态 │
└───────────────────┘ └──────────────────┘分层架构
调度层
FlowScheduler
Flow生命周期管理、周期调度
编排层
FlowParser
Weather Syntax解析、DAG分析、连通分量识别
执行层
NodeExecutor, NodeBase
节点创建、执行、生命周期管理
通信层
NodeSignalPublisher/Consumer
信号发布订阅、消息路由
存储层
StateStore, NodeTaskManager
状态持久化、任务管理
关键组件
1. FlowScheduler(Flow 调度器)
位置: tradingflow/station/flow/scheduler.py (1546 行)
核心职责:
Flow 注册与配置管理
周期调度循环
DAG 结构分析
Cycle 执行控制
状态查询接口
关键方法:
async def register_flow(flow_id: str, flow_config: Dict)
async def start_flow(flow_id: str)
async def stop_flow(flow_id: str)
async def execute_cycle(flow_id: str, cycle: int)
async def get_flow_status(flow_id: str) -> Dict
async def get_cycle_status(flow_id: str, cycle: int) -> Dict单例模式:
_scheduler_instance = None
def get_scheduler_instance():
global _scheduler_instance
if _scheduler_instance is None:
_scheduler_instance = FlowScheduler()
return _scheduler_instance2. FlowParser(Flow 解析器)
位置: tradingflow/station/flow/flow_parser.py (216 行)
核心职责:
Weather Syntax JSON 解析
连通分量识别
DAG 环检测
Entry Nodes 识别
关键方法:
def find_dags() -> List[Dict[str, Any]]
def _find_connected_components() -> List[Set[str]]
def _is_dag(component: Set[str]) -> bool
def analyze_flow() -> Dict[str, Any]算法:
连通分量识别:BFS(广度优先搜索)
环检测:DFS(深度优先搜索)+ 三色标记法
3. NodeBase(节点基类)
位置: tradingflow/station/nodes/node_base.py (1481 行)
核心职责:
节点生命周期管理
Input/Output Handle 注册
信号接收与发送
状态管理
日志持久化
生命周期方法:
async def initialize_state_store() -> bool
async def initialize_message_queue() -> bool
async def start() -> bool # 启动节点
async def execute() -> bool # 执行节点逻辑(子类实现)
async def cleanup() # 清理资源信号处理:
async def _on_signal_received(signal: Signal, handle: str)
async def send_signal(source_handle: str, signal_type: SignalType, payload: Dict)4. NodeExecutor(节点执行器)
位置: tradingflow/station/core/node_executor.py (283 行)
核心职责:
节点实例创建
节点执行控制
异常处理与状态更新
资源清理
执行流程:
async def execute_node_task(
node_task_id: str,
flow_id: str,
component_id: int,
cycle: int,
node_id: str,
node_type: str,
node_data: Dict[str, Any]
)5. NodeSignalPublisher/Consumer
位置: tradingflow/depot/python/mq/node_signal_*.py
NodeSignalPublisher(信号发布者)
职责:
发送节点输出信号
动态路由键生成
支持精确发送和通配符发送
关键方法:
async def send_signal(source_handle: str, signal: Signal)
async def send_stop_execution_signal(reason: str)NodeSignalConsumer(信号消费者)
职责:
订阅上游节点信号
Routing Key 解析
Cycle 验证
信号分发到 Handle
关键方法:
async def consume()
async def on_signal(signal: Signal, handle: str, signal_context: Dict)
def _parse_routing_key(routing_key: str) -> Dict[str, Any]6. StateStore(状态存储)
位置: tradingflow/station/common/state_store.py (611 行)
实现:
StateStore:抽象基类(防腐层)
RedisStateStore:Redis 实现
InMemoryStateStore:内存实现(测试用)
StateStoreFactory:工厂模式创建
接口:
# 节点状态
async def set_node_task_status(node_task_id: str, status: str)
async def get_node_task_status(node_task_id: str) -> Dict
# 终止标志
async def set_termination_flag(node_task_id: str, reason: str)
async def get_termination_flag(node_task_id: str) -> Optional[Dict]
# 通用KV
async def set_value(key: str, value: Any)
async def get_value(key: str) -> Any
# 集合操作
async def add_to_set(key: str, value: Any)
async def get_set_members(key: str) -> List[Any]7. NodeTaskManager(任务管理器)
位置: tradingflow/station/common/node_task_manager.py (593 行)
职责:
节点任务注册与跟踪
任务状态更新
Worker 任务管理
任务查询接口
单例模式:
NodeTaskManager.get_instance()关键方法:
async def register_task(node_task_id: str, task_info: Dict)
async def update_task_status(node_task_id: str, status: str, additional_info: Dict)
async def get_task(node_task_id: str) -> Optional[Dict]
async def get_worker_tasks(worker_id: str) -> List[Dict]
async def stop_task(node_task_id: str) -> bool8. NodeRegistry(节点注册表)
位置: tradingflow/station/common/node_registry.py
职责:
节点类型注册
节点实例创建
Worker 实例管理
心跳机制
装饰器注册:
@register_node_type("binance_price_node", default_params={...})
class BinancePriceNode(NodeBase):
pass技术栈
核心技术
Python
主要编程语言
3.9+
asyncio
异步编程框架
内置
RabbitMQ
消息队列
3.x
aio-pika
异步 RabbitMQ 客户端
9.x
Redis
状态存储
6.x+
redis.asyncio
异步 Redis 客户端
内置
PostgreSQL
日志持久化
14+
Python 依赖
# 异步
asyncio
aioredis
# 消息队列
aio-pika
# 数据库
psycopg2-binary
sqlalchemy
# HTTP
httpx
# 工具
dataclasses
typing
logging设计模式
1. 单例模式(Singleton)
应用组件:
FlowScheduler
NodeRegistry
NodeTaskManager
目的: 确保全局唯一实例,共享状态
实现:
class FlowScheduler:
_instance = None
@classmethod
def get_instance(cls):
if cls._instance is None:
cls._instance = cls()
return cls._instance2. 工厂模式(Factory)
应用组件:
StateStoreFactory
NodeRegistry.create_node()
目的: 根据配置动态创建对象
实现:
class StateStoreFactory:
@staticmethod
def create(store_type: str, config: Dict):
if store_type == "redis":
return RedisStateStore(**config)
elif store_type == "memory":
return InMemoryStateStore()3. 装饰器模式(Decorator)
应用组件:
@register_node_type
目的: 自动注册节点类型
实现:
def register_node_type(node_type: str, default_params: Dict = None):
def decorator(cls):
NodeRegistry.get_instance().register_node(
node_type,
cls,
default_params
)
return cls
return decorator4. 观察者模式(Observer)
应用场景: 信号机制
实现:
Publisher: NodeSignalPublisher
Subscriber: NodeSignalConsumer
Event: Signal
5. 状态机模式(State Machine)
应用场景: 节点状态管理
状态转换:
PENDING → RUNNING → COMPLETED
↓
FAILED / TERMINATED / SKIPPED数据流
完整执行流程
1. Flow 注册
FlowScheduler.register_flow()
↓
分析 Weather Syntax 结构(FlowParser)
↓
存储 Flow 配置到 Redis
2. Flow 启动
FlowScheduler.start_flow()
↓
启动周期调度循环
3. Cycle 执行
_schedule_flow_execution()
↓
_execute_flow_cycle()
↓
为所有节点创建执行任务
↓
_execute_node() → 调用 Worker API
4. 节点执行(Worker 侧)
execute_node_task()
↓
创建节点实例
↓
node_instance.start()
↓
初始化状态存储
↓
初始化消息队列
↓
等待所有输入信号
↓
node_instance.execute()(子类实现)
↓
发送输出信号
↓
清理资源
5. 信号传递
NodeSignalPublisher.send_signal()
↓
生成 Routing Key
↓
发布到 RabbitMQ Exchange
↓
根据 Routing Key 路由到目标 Queue
↓
NodeSignalConsumer.consume()
↓
解析 Routing Key
↓
验证 Cycle
↓
调用 node._on_signal_received()
↓
更新输入信号状态
↓
检查是否所有信号就绪
↓
触发节点执行信号流示例
节点 A (BinancePriceNode)
↓ 执行完成
↓ send_signal("kline_data", PRICE_DATA, payload)
↓
RabbitMQ
↓ Routing Key: flow.xxx.component.0.cycle.0.from.A.handle.kline_data.to.B.handle.price_input
↓
节点 B (AIModelNode)
↓ consume() 接收消息
↓ 解析 routing_key
↓ _on_signal_received(signal, "price_input")
↓ 所有输入信号就绪
↓ execute()
↓ send_signal("ai_response", AI_RESPONSE, payload)
↓
RabbitMQ
↓ Routing Key: flow.xxx.component.0.cycle.0.from.B.handle.ai_response.to.C.handle.decision_input
↓
节点 C (BuyNode)
↓ consume() 接收消息
↓ execute() 执行交易下一步
阅读以下详细文档以深入了解各个方面:
消息队列详解 - RabbitMQ 路由设计和信号传递
Redis 状态管理 - 状态存储结构和字段设计
节点执行流程 - 节点生命周期和执行机制
Flow 调度机制 - 周期调度和 DAG 编排
开发指南 - 如何开发新节点和扩展功能
维护者: TradingFlow 开发团队 反馈: 如有问题请在相关 Issue 中讨论 版本历史:
v1.0.0 (2025-10-06): 初始版本
Last updated