Redis 状态管理
文档版本: 1.0.0 最后更新: 2025-10-06 前置阅读: 架构概述
目录
Redis 架构
设计原则
分布式共享:支持多 Worker 实例并发访问
数据隔离:不同 Flow 和 Cycle 的数据完全隔离
快速查询:使用 Hash、Set 等高效数据结构
TTL 管理:自动清理过期数据
防腐层设计:StateStore 抽象层隔离 Redis 实现
Redis 数据结构使用
String
简单 KV 存储
Flow 配置 JSON
Hash
对象属性存储
Flow 元数据、Cycle 状态
Set
无序集合
Cycle 中的节点列表
Sorted Set
有序集合(未使用)
预留用于优先级队列
StateStore 架构
┌─────────────────────────────────────────────────┐
│ Application Layer │
│ (FlowScheduler, NodeTaskManager, NodeBase) │
└────────────────────┬────────────────────────────┘
│
↓
┌─────────────────────────────────────────────────┐
│ StateStore (Abstract) │
│ - set_node_task_status() │
│ - get_node_task_status() │
│ - set_value() / get_value() │
│ - add_to_set() / get_set_members() │
└────────────────────┬────────────────────────────┘
│
┌────────────┴─────────────┐
│ │
↓ ↓
┌─────────────────┐ ┌─────────────────┐
│ RedisStateStore │ │InMemoryStateStore│
│ (Production) │ │ (Testing) │
└─────────────────┘ └─────────────────┘Key 命名规范
全局规范
格式: <prefix>:<entity>:<identifier>[:<sub-entity>]
前缀统一: trading_flow: (可配置)
Key 分类表
Flow 元数据
flow:{flow_id}
Hash
永久
Flow 配置和状态
Cycle 元数据
flow:{flow_id}:cycle:{cycle}
Hash
7天
Cycle 执行信息
Cycle 节点集合
flow:{flow_id}:cycle:{cycle}:nodes
Set
7天
Cycle 中的节点 ID
Component 停止标志
flow:{flow_id}:cycle:{cycle}:component:{component_id}:stop
String
1小时
Component 停止标记
节点任务详情
node_tasks:{node_task_id}
String (JSON)
24小时
节点任务完整信息
节点任务列表
node_tasks_list
Set
永久
所有节点任务 ID
Worker 任务列表
worker_tasks:{worker_id}
Set
永久
Worker 的任务 ID
节点状态
trading_flow:node:{node_task_id}
Hash
24小时
节点运行时状态
终止标志
trading_flow:node:{node_task_id}:terminate
String (JSON)
1小时
节点终止请求
Flow 状态管理
Flow 元数据结构
Key: flow:{flow_id}
类型: Hash
TTL: 永久(或 Flow 删除时清理)
字段说明:
id
string
Flow ID
trading_decision_flow
config
JSON string
Flow 配置(Weather Syntax JSON)
{"nodes":[...],"edges":[...]}
structure
JSON string
DAG 结构分析结果
{"components":{...},"entry_nodes":[...]}
status
string
状态
registered, running, stopped, completed
last_cycle
int
最后执行的 Cycle
0, 1, 2
next_execution
timestamp
下次执行时间戳
1633024800.0
created_at
ISO datetime
创建时间
2025-10-06T10:00:00
Flow 状态转换
registered → running → stopped
↓
completed (interval=0 时)示例数据
# 查看 Flow 元数据
HGETALL flow:trading_decision_flow
1) "id"
2) "trading_decision_flow"
3) "config"
4) "{"nodes":[{"id":"A",...}],"edges":[...]}"
5) "structure"
6) "{"components":{"0":{"nodes":[...]}}}"
7) "status"
8) "running"
9) "last_cycle"
10) "5"
11) "next_execution"
12) "1633024800.0"
13) "created_at"
14) "2025-10-06T10:00:00"Flow 注册流程
# FlowScheduler.register_flow()
flow_data = {
"id": flow_id,
"config": json.dumps(flow_config),
"structure": json.dumps(flow_structure),
"status": "registered",
"last_cycle": -1,
"next_execution": 0,
"created_at": datetime.now().isoformat()
}
await self.redis.hset(f"flow:{flow_id}", mapping=flow_data)Cycle 状态管理
Cycle 元数据结构
Key: flow:{flow_id}:cycle:{cycle}
类型: Hash
TTL: 7 天
字段说明:
flow_id
string
Flow ID
trading_decision_flow
cycle
int
Cycle 号
0, 1, 2
status
string
状态
running, completed, failed
start_time
ISO datetime
开始时间
2025-10-06T10:00:00
end_time
ISO datetime
结束时间
2025-10-06T10:05:00
Cycle 节点集合
Key: flow:{flow_id}:cycle:{cycle}:nodes
类型: Set
TTL: 7 天
成员: Node ID 列表
# 查看 Cycle 中的所有节点
SMEMBERS flow:trading_flow:cycle:0:nodes
1) "binance_price"
2) "ai_model"
3) "buy_node"Cycle 执行流程
# FlowScheduler._execute_flow_cycle()
# 1. 创建 Cycle 记录
cycle_key = f"flow:{flow_id}:cycle:{cycle}"
cycle_data = {
"flow_id": flow_id,
"cycle": str(cycle),
"status": "running",
"start_time": datetime.now().isoformat(),
"end_time": ""
}
await self.redis.hset(cycle_key, mapping=cycle_data)
# 2. 注册节点到 Cycle
nodes_key = f"flow:{flow_id}:cycle:{cycle}:nodes"
for node_id in node_map.keys():
await self.redis.sadd(nodes_key, node_id)
# 3. 执行所有节点...
# 4. 更新 Cycle 状态为完成
await self.redis.hset(
cycle_key,
mapping={"status": "completed", "end_time": datetime.now().isoformat()}
)Component 停止标志
Key: flow:{flow_id}:cycle:{cycle}:component:{component_id}:stop
类型: String
TTL: 1 小时
值: "1" (存在即为停止)
用途: 停止特定连通分量的执行
# 设置停止标志
await self.redis.set(
f"flow:{flow_id}:cycle:{cycle}:component:{component_id}:stop",
"1"
)
# 检查是否停止
stop_flag = await self.redis.get(
f"flow:{flow_id}:cycle:{cycle}:component:{component_id}:stop"
)
if stop_flag:
# 跳过节点执行
return {"status": "skipped", "reason": "component_stopped"}节点任务管理
节点任务详情
Key: node_tasks:{node_task_id}
类型: String (JSON)
TTL: 24 小时
Node Task ID 格式: {flow_id}_{cycle}_{node_id}
数据结构:
{
"node_task_id": "trading_flow_0_binance_price",
"flow_id": "trading_flow",
"cycle": 0,
"node_id": "binance_price",
"node_type": "binance_price_node",
"worker_id": "worker_1",
"status": "running",
"registered_at": "2025-10-06T10:00:00",
"updated_at": "2025-10-06T10:00:05",
"message": "Executing node",
"progress": 50,
"config": { /* 节点配置 */ }
}状态值:
registered: 已注册pending: 等待执行running: 执行中completed: 已完成failed: 失败terminated: 已终止skipped: 已跳过
节点任务列表
Key: node_tasks_list
类型: Set
TTL: 永久
成员: 所有节点任务 ID
SMEMBERS node_tasks_list
1) "trading_flow_0_binance_price"
2) "trading_flow_0_ai_model"
3) "trading_flow_0_buy_node"节点任务操作
注册任务
# NodeTaskManager.register_task()
task_info = {
"node_task_id": node_task_id,
"flow_id": flow_id,
"cycle": cycle,
"node_id": node_id,
"node_type": node_type,
"worker_id": self.worker_id,
"status": "registered",
"registered_at": datetime.now().isoformat()
}
# 保存任务详情
task_key = f"node_tasks:{node_task_id}"
await self.state_store.set_value(task_key, task_info)
# 添加到任务列表
await self.state_store.add_to_set("node_tasks_list", node_task_id)更新状态
# NodeTaskManager.update_task_status()
task_key = f"node_tasks:{node_task_id}"
task_info = await self.state_store.get_value(task_key)
task_info["status"] = status
task_info["updated_at"] = datetime.now().isoformat()
if additional_info:
task_info.update(additional_info)
await self.state_store.set_value(task_key, task_info)查询任务
# NodeTaskManager.get_task()
task_key = f"node_tasks:{node_task_id}"
task_info = await self.state_store.get_value(task_key)
return task_info节点运行时状态
Key: trading_flow:node:{node_task_id}
类型: Hash
TTL: 24 小时
字段:
HGETALL trading_flow:node:trading_flow_0_binance_price
1) "status"
2) "running"
3) "updated_at"
4) "1633024800.0"
5) "error_message"
6) ""节点终止标志
Key: trading_flow:node:{node_task_id}:terminate
类型: String (JSON)
TTL: 1 小时
数据结构:
{
"reason": "Stopped by NodeTaskManager",
"timestamp": "2025-10-06T10:00:00"
}使用场景:
# 设置终止标志
await self.state_store.set_termination_flag(
node_task_id,
reason="User requested stop"
)
# 节点检查终止标志
terminate_flag = await self.state_store.get_termination_flag(node_task_id)
if terminate_flag:
raise NodeStopExecutionException(
reason=terminate_flag["reason"],
source_node="system"
)Worker 管理
Worker 任务列表
Key: worker_tasks:{worker_id}
类型: Set
TTL: 永久
成员: Worker 执行的节点任务 ID
SMEMBERS worker_tasks:worker_1
1) "trading_flow_0_binance_price"
2) "trading_flow_0_ai_model"
3) "trading_flow_1_buy_node"Worker 注册流程
# NodeRegistry.register_worker()
worker_key = f"workers:{worker_id}"
worker_info = {
"id": worker_id,
"api_url": api_url,
"supported_nodes": supported_node_types,
"status": "active",
"last_heartbeat": datetime.now().isoformat()
}
await self.redis.hset(worker_key, mapping=worker_info)
await self.redis.expire(worker_key, heartbeat_ttl) # 默认 60 秒Worker 心跳机制
# 定期更新心跳
while True:
await self.redis.hset(
f"workers:{worker_id}",
"last_heartbeat",
datetime.now().isoformat()
)
await self.redis.expire(f"workers:{worker_id}", 60)
await asyncio.sleep(30)Worker 任务查询
# NodeTaskManager.get_worker_tasks()
worker_key = f"worker_tasks:{worker_id}"
task_ids = await self.state_store.get_set_members(worker_key)
tasks = []
for task_id in task_ids:
task_info = await self.get_task(task_id)
if task_info:
tasks.append(task_info)
return tasks状态查询
查询 Flow 状态
# FlowScheduler.get_flow_status()
flow_data = await self.redis.hgetall(f"flow:{flow_id}")
# 解析 JSON 字段
if "config" in flow_data:
flow_data["config"] = json.loads(flow_data["config"])
if "structure" in flow_data:
flow_data["structure"] = json.loads(flow_data["structure"])
# 获取当前 Cycle 状态
current_cycle = int(flow_data.get("last_cycle", -1))
if current_cycle >= 0:
cycle_status = await self.get_cycle_status(flow_id, current_cycle)
flow_data["current_cycle_status"] = cycle_status
return flow_data查询 Cycle 状态
# FlowScheduler.get_cycle_status()
cycle_key = f"flow:{flow_id}:cycle:{cycle}"
cycle_data = await self.redis.hgetall(cycle_key)
# 获取所有节点状态
nodes_key = f"flow:{flow_id}:cycle:{cycle}:nodes"
node_ids = await self.redis.smembers(nodes_key)
# 从 NodeTaskManager 获取节点详情
task_manager = NodeTaskManager.get_instance()
nodes_status = {}
for node_id in node_ids:
node_task_id = f"{flow_id}_{cycle}_{node_id}"
node_data = await task_manager.get_task(node_task_id)
if node_data:
nodes_status[node_id] = node_data
return {
"cycle": cycle,
"status": cycle_data.get("status"),
"start_time": cycle_data.get("start_time"),
"end_time": cycle_data.get("end_time"),
"nodes": nodes_status,
"node_count": len(node_ids)
}综合状态查询
# FlowScheduler.get_comprehensive_flow_status()
comprehensive_nodes = await node_manager.get_comprehensive_node_status(
flow_id=flow_id,
cycle=cycle
)
# 统计信息
total_nodes = len(comprehensive_nodes)
running_nodes = sum(1 for node in comprehensive_nodes.values() if node['status'] == 'running')
completed_nodes = sum(1 for node in comprehensive_nodes.values() if node['status'] == 'completed')
error_nodes = sum(1 for node in comprehensive_nodes.values() if node['status'] == 'error')
return {
"flow_id": flow_id,
"cycle": cycle,
"flow_status": flow_status,
"nodes": comprehensive_nodes,
"statistics": {
"total_nodes": total_nodes,
"running_nodes": running_nodes,
"completed_nodes": completed_nodes,
"error_nodes": error_nodes
}
}数据清理策略
TTL 策略表
Flow 元数据
永久
手动删除
Cycle 元数据
7 天
自动过期
Cycle 节点集合
7 天
自动过期
Component 停止标志
1 小时
自动过期
节点任务详情
24 小时
自动过期
节点任务列表
永久
定期清理
Worker 任务列表
永久
Worker 下线时清理
节点运行时状态
24 小时
自动过期
节点终止标志
1 小时
自动过期
Worker 信息
60 秒
心跳过期
手动清理操作
清理过期任务
# NodeTaskManager.cleanup_old_tasks()
task_ids = await self.state_store.get_set_members("node_tasks_list")
for task_id in task_ids:
task_key = f"node_tasks:{task_id}"
task_info = await self.state_store.get_value(task_key)
# 如果任务不存在(已过期),从列表中移除
if not task_info:
await self.state_store.remove_from_set("node_tasks_list", task_id)清理 Worker 任务
# NodeTaskManager.cleanup_worker_tasks()
worker_key = f"worker_tasks:{worker_id}"
task_ids = await self.state_store.get_set_members(worker_key)
for task_id in task_ids:
# 从任务列表移除
await self.state_store.remove_from_set("node_tasks_list", task_id)
# 删除任务详情
await self.state_store.delete_value(f"node_tasks:{task_id}")
# 删除 Worker 任务列表
await self.state_store.delete_value(worker_key)清理 Flow 数据
# FlowScheduler.delete_flow()
# 1. 删除 Flow 元数据
await self.redis.delete(f"flow:{flow_id}")
# 2. 删除所有 Cycle 数据
for cycle in range(last_cycle + 1):
await self.redis.delete(f"flow:{flow_id}:cycle:{cycle}")
await self.redis.delete(f"flow:{flow_id}:cycle:{cycle}:nodes")
# 3. 删除节点任务
for cycle in range(last_cycle + 1):
node_ids = await self.redis.smembers(f"flow:{flow_id}:cycle:{cycle}:nodes")
for node_id in node_ids:
node_task_id = f"{flow_id}_{cycle}_{node_id}"
await node_manager.remove_task(node_task_id)定期清理任务
# 建议在 FlowScheduler 中添加定期清理任务
async def _cleanup_expired_data(self):
"""定期清理过期数据"""
while True:
try:
# 1. 清理过期任务列表
await self._cleanup_task_list()
# 2. 清理过期 Cycle 数据
await self._cleanup_expired_cycles()
# 3. 清理无效 Worker
await self._cleanup_dead_workers()
except Exception as e:
logger.error("Error in cleanup task: %s", e)
# 每小时执行一次
await asyncio.sleep(3600)下一步
继续阅读相关文档:
维护者: TradingFlow 开发团队 版本历史:
v1.0.0 (2025-10-06): 初始版本
Last updated