消息队列详解

文档版本: 1.0.0 最后更新: 2025-10-06 前置阅读: 架构概述


目录


消息队列架构

整体设计

Weather Station 使用 RabbitMQ Topic Exchange 实现节点间的信号传递。

┌─────────────────────────────────────────────────────────┐
│            signals_exchange (Topic Exchange)            │
└──────────┬──────────────────────────────────────────────┘

           │ Routing Rules (Based on Routing Key)

    ┌──────┴─────────┬─────────────┬────────────┐
    │                │             │            │
    ↓                ↓             ↓            ↓
┌─────────┐    ┌─────────┐   ┌─────────┐  ┌─────────┐
│ Queue A │    │ Queue B │   │ Queue C │  │ Queue D │
└─────────┘    └─────────┘   └─────────┘  └─────────┘
    ↓                ↓             ↓            ↓
 Node A          Node B        Node C       Node D

RabbitMQ 配置

配置项
说明

Exchange Name

signals_exchange

全局信号交换机

Exchange Type

topic

主题交换机,支持通配符路由

Queue Durability

True

队列持久化

Message Durability

True

消息持久化

Auto Delete

False

队列不自动删除

Exclusive

False

非独占队列

设计优势

  1. 解耦合:上游节点无需知道下游节点的具体信息

  2. 灵活路由:基于 Routing Key 的精确匹配和模式匹配

  3. 并发支持:多个下游节点可以并行接收同一信号

  4. 周期隔离:不同 Cycle 的消息完全隔离

  5. 可扩展性:支持动态添加节点和消费者


Routing Key 设计

标准 Routing Key 格式

字段说明

字段
类型
说明
示例

flow_id

string

Flow 唯一标识

trading_decision_flow

component_id

int

连通分量 ID

0, 1, 2

cycle

int

执行周期号

0, 1, 2

source_node

string

源节点 ID

binance_price

output_handle

string

源句柄名称

kline_data

target_node

string

目标节点 ID

ai_model

input_handle

string

目标句柄名称

price_input

Routing Key 示例

1. 精确路由

匹配规则:

  • Flow ID: my_flow

  • Component: 0

  • Cycle: 0

  • Source: node_A, Handle: data

  • Target: node_B, Handle: input

2. 通配符路由(发送到多个 Handle)

场景: 一个输出连接到目标节点的多个输入 Handle

3. 停止执行信号

场景: 广播停止执行信号到当前 Cycle 的所有节点

订阅模式(Binding Key)

节点订阅消息时使用通配符:

通配符说明:

  • * : 匹配一个词(word)

  • # : 匹配零个或多个词


Queue 命名规范

标准 Queue 命名

设计原则

  1. 唯一性:每个节点在每个 Cycle 都有唯一队列

  2. 隔离性:不同 Cycle 的消息不会混淆

  3. 可追溯性:队列名称清晰标识所属 Flow 和节点

Queue 示例

Queue 特性

  • 持久化:队列持久化到磁盘

  • 手动确认:消息处理完成后手动 ACK

  • 预取数量prefetch_count=1,公平分发

  • TTL:消息 TTL 根据 Flow 配置设置


信号传递流程

完整流程图

关键代码实现

发送信号(Publisher)

接收信号(Consumer)


Handle 匹配机制

输入边映射(Input Edges Map)

每个节点维护输入信号的状态映射:

Edge Key 格式:

信号匹配过程

聚合 Handle(Aggregate Handle)

场景: 多个输出连接到同一个输入 Handle

聚合逻辑:

结果格式:


特殊场景处理

1. 一对多连接

场景: 一个输出连接到多个下游节点

实现:

2. 多对一连接

场景: 多个输出连接到同一个输入 Handle

实现: 使用聚合 Handle(见上文)

3. 通配符 Handle

场景: 接收任意上游信号

处理逻辑:

4. 停止执行信号

Routing Key:

特点:

  • 广播到当前 Cycle 的所有节点

  • 不包含具体的目标节点和 Handle 信息

  • 优先级高于普通信号

处理逻辑:


实战示例

示例 1:简单的价格数据流

工作流:

消息流:

  1. Node A → Node B

  1. Node B → Node C

示例 2:聚合多源数据

工作流:

Node D 的 Handle 定义:

消息流:

  1. Node A → Node D

  1. Node B → Node D

  1. Node C → Node D

Node D 接收到的聚合数据:

示例 3:停止执行信号

场景: Condition Node 判断不满足条件,停止当前 Cycle 执行

代码:

Routing Key:

所有节点接收:


下一步

继续阅读相关文档:


维护者: TradingFlow 开发团队 版本历史:

  • v1.0.0 (2025-10-06): 初始版本

Last updated