Redis 状态管理

文档版本: 1.0.0 最后更新: 2025-10-06 前置阅读: 架构概述


目录


Redis 架构

设计原则

  1. 分布式共享:支持多 Worker 实例并发访问

  2. 数据隔离:不同 Flow 和 Cycle 的数据完全隔离

  3. 快速查询:使用 Hash、Set 等高效数据结构

  4. TTL 管理:自动清理过期数据

  5. 防腐层设计:StateStore 抽象层隔离 Redis 实现

Redis 数据结构使用

数据结构
用途
示例

String

简单 KV 存储

Flow 配置 JSON

Hash

对象属性存储

Flow 元数据、Cycle 状态

Set

无序集合

Cycle 中的节点列表

Sorted Set

有序集合(未使用)

预留用于优先级队列

StateStore 架构


Key 命名规范

全局规范

格式: <prefix>:<entity>:<identifier>[:<sub-entity>]

前缀统一: trading_flow: (可配置)

Key 分类表

类别
Key 格式
数据结构
TTL
说明

Flow 元数据

flow:{flow_id}

Hash

永久

Flow 配置和状态

Cycle 元数据

flow:{flow_id}:cycle:{cycle}

Hash

7天

Cycle 执行信息

Cycle 节点集合

flow:{flow_id}:cycle:{cycle}:nodes

Set

7天

Cycle 中的节点 ID

Component 停止标志

flow:{flow_id}:cycle:{cycle}:component:{component_id}:stop

String

1小时

Component 停止标记

节点任务详情

node_tasks:{node_task_id}

String (JSON)

24小时

节点任务完整信息

节点任务列表

node_tasks_list

Set

永久

所有节点任务 ID

Worker 任务列表

worker_tasks:{worker_id}

Set

永久

Worker 的任务 ID

节点状态

trading_flow:node:{node_task_id}

Hash

24小时

节点运行时状态

终止标志

trading_flow:node:{node_task_id}:terminate

String (JSON)

1小时

节点终止请求


Flow 状态管理

Flow 元数据结构

Key: flow:{flow_id} 类型: Hash TTL: 永久(或 Flow 删除时清理)

字段说明:

字段
类型
说明
示例

id

string

Flow ID

trading_decision_flow

config

JSON string

Flow 配置(Weather Syntax JSON)

{"nodes":[...],"edges":[...]}

structure

JSON string

DAG 结构分析结果

{"components":{...},"entry_nodes":[...]}

status

string

状态

registered, running, stopped, completed

last_cycle

int

最后执行的 Cycle

0, 1, 2

next_execution

timestamp

下次执行时间戳

1633024800.0

created_at

ISO datetime

创建时间

2025-10-06T10:00:00

Flow 状态转换

示例数据

Flow 注册流程


Cycle 状态管理

Cycle 元数据结构

Key: flow:{flow_id}:cycle:{cycle} 类型: Hash TTL: 7 天

字段说明:

字段
类型
说明
示例

flow_id

string

Flow ID

trading_decision_flow

cycle

int

Cycle 号

0, 1, 2

status

string

状态

running, completed, failed

start_time

ISO datetime

开始时间

2025-10-06T10:00:00

end_time

ISO datetime

结束时间

2025-10-06T10:05:00

Cycle 节点集合

Key: flow:{flow_id}:cycle:{cycle}:nodes 类型: Set TTL: 7 天

成员: Node ID 列表

Cycle 执行流程

Component 停止标志

Key: flow:{flow_id}:cycle:{cycle}:component:{component_id}:stop 类型: String TTL: 1 小时 值: "1" (存在即为停止)

用途: 停止特定连通分量的执行


节点任务管理

节点任务详情

Key: node_tasks:{node_task_id} 类型: String (JSON) TTL: 24 小时

Node Task ID 格式: {flow_id}_{cycle}_{node_id}

数据结构:

状态值:

  • registered: 已注册

  • pending: 等待执行

  • running: 执行中

  • completed: 已完成

  • failed: 失败

  • terminated: 已终止

  • skipped: 已跳过

节点任务列表

Key: node_tasks_list 类型: Set TTL: 永久

成员: 所有节点任务 ID

节点任务操作

注册任务

更新状态

查询任务

节点运行时状态

Key: trading_flow:node:{node_task_id} 类型: Hash TTL: 24 小时

字段:

节点终止标志

Key: trading_flow:node:{node_task_id}:terminate 类型: String (JSON) TTL: 1 小时

数据结构:

使用场景:


Worker 管理

Worker 任务列表

Key: worker_tasks:{worker_id} 类型: Set TTL: 永久

成员: Worker 执行的节点任务 ID

Worker 注册流程

Worker 心跳机制

Worker 任务查询


状态查询

查询 Flow 状态

查询 Cycle 状态

综合状态查询


数据清理策略

TTL 策略表

Key 类型
TTL
清理触发

Flow 元数据

永久

手动删除

Cycle 元数据

7 天

自动过期

Cycle 节点集合

7 天

自动过期

Component 停止标志

1 小时

自动过期

节点任务详情

24 小时

自动过期

节点任务列表

永久

定期清理

Worker 任务列表

永久

Worker 下线时清理

节点运行时状态

24 小时

自动过期

节点终止标志

1 小时

自动过期

Worker 信息

60 秒

心跳过期

手动清理操作

清理过期任务

清理 Worker 任务

清理 Flow 数据

定期清理任务


下一步

继续阅读相关文档:


维护者: TradingFlow 开发团队 版本历史:

  • v1.0.0 (2025-10-06): 初始版本

Last updated