节点执行流程
文档版本: 1.0.0 最后更新: 2025-10-06 前置阅读: 架构概述, 消息队列详解
目录
节点生命周期
完整生命周期图
┌─────────────────────────────────────────────────────────────┐
│ 1. 创建阶段 (Creation) │
│ NodeRegistry.create_node() │
│ └─> NodeBase.__init__() │
└────────────────────┬────────────────────────────────────────┘
│
↓
┌─────────────────────────────────────────────────────────────┐
│ 2. 初始化阶段 (Initialization) │
│ node.start() │
│ ├─> initialize_state_store() │
│ ├─> initialize_message_queue() │
│ └─> set_status(PENDING) │
└────────────────────┬────────────────────────────────────────┘
│
↓
┌─────────────────────────────────────────────────────────────┐
│ 3. 等待信号阶段 (Waiting for Signals) │
│ ├─> 启动信号消费者 consume() │
│ ├─> 接收上游信号 _on_signal_received() │
│ ├─> 更新输入信号状态 │
│ └─> 检查是否所有信号就绪 can_execute() │
└────────────────────┬────────────────────────────────────────┘
│
↓ (所有信号就绪)
┌─────────────────────────────────────────────────────────────┐
│ 4. 执行阶段 (Execution) │
│ set_status(RUNNING) │
│ └─> execute() [子类实现] │
└────────────────────┬────────────────────────────────────────┘
│
↓
┌─────────────────────────────────────────────────────────────┐
│ 5. 信号发送阶段 (Signal Emission) │
│ send_signal(handle, signal_type, payload) │
│ └─> 下游节点接收信号 │
└────────────────────┬────────────────────────────────────────┘
│
↓
┌─────────────────────────────────────────────────────────────┐
│ 6. 完成阶段 (Completion) │
│ ├─> set_status(COMPLETED / FAILED / TERMINATED) │
│ └─> 更新任务状态到 Redis │
└────────────────────┬────────────────────────────────────────┘
│
↓
┌─────────────────────────────────────────────────────────────┐
│ 7. 清理阶段 (Cleanup) │
│ cleanup() │
│ ├─> close_message_queue() │
│ └─> close_state_store() │
└─────────────────────────────────────────────────────────────┘状态转换图
PENDING (初始状态)
│
↓ (start() 调用)
RUNNING (执行中)
│
├─> COMPLETED (成功完成)
│
├─> FAILED (执行失败)
│
├─> TERMINATED (被终止)
│
└─> SKIPPED (被跳过)节点创建流程
创建入口
节点创建由 NodeExecutor.execute_node_task() 触发:
# tradingflow/station/core/node_executor.py
async def execute_node_task(
node_task_id: str,
flow_id: str,
component_id: int,
cycle: int,
node_id: str,
node_type: str,
node_data: Dict[str, Any]
):
"""执行节点任务的主入口"""
try:
# 1. 更新状态为 PENDING
await _update_node_status(node_task_id, NodeStatus.PENDING, "Task created")
# 2. 创建节点实例
node_instance = await _create_node_instance(
node_type, node_data, flow_id, component_id, cycle, node_id
)
# 3. 启动节点执行
success = await node_instance.start()
# 4. 处理执行结果
if success:
await _update_node_status(node_task_id, NodeStatus.COMPLETED, "Completed")
else:
await _update_node_status(node_task_id, NodeStatus.FAILED, "Failed")
except Exception as e:
# 异常处理
await _update_node_status(node_task_id, NodeStatus.FAILED, str(e))
finally:
# 清理资源
if node_instance:
await node_instance.cleanup()节点实例化
async def _create_node_instance(
node_type: str,
node_data: Dict[str, Any],
flow_id: str,
component_id: int,
cycle: int,
node_id: str
):
"""创建节点实例"""
# 1. 确定节点类型
if node_type == "python":
node_class_type = node_data.get("config", {}).get("node_class_type")
else:
node_class_type = node_type
# 2. 解析输入输出边
input_edges = [Edge.from_dict(edge) for edge in node_data.get("input_edges") or []]
output_edges = [Edge.from_dict(edge) for edge in node_data.get("output_edges") or []]
# 3. 准备配置
config = node_data.get("config", {})
config["state_store"] = node_manager.state_store # 共享状态存储
# 4. 使用 NodeRegistry 创建实例
node_instance = node_registry.create_node(
node_class_type=node_class_type,
flow_id=flow_id,
component_id=component_id,
cycle=cycle,
node_id=node_id,
input_edges=input_edges,
output_edges=output_edges,
config=config
)
return node_instanceNodeBase 构造函数
# tradingflow/station/nodes/node_base.py
class NodeBase(abc.ABC):
def __init__(
self,
flow_id: str,
component_id: int,
cycle: int,
node_id: str,
name: str,
input_edges: List[Edge] = None,
output_edges: List[Edge] = None,
state_store: "StateStore" = None,
**kwargs
):
"""初始化节点"""
# 1. 基本属性
self.logger = logging.getLogger(f"Node.{node_id}")
self.flow_id = flow_id
self.component_id = component_id
self.cycle = cycle
self.node_id = node_id
self.name = name
# 2. 边和句柄
self._input_edges = input_edges or []
self._output_edges = output_edges or []
self._input_signals = {} # 存储接收到的信号
self._input_handles: Dict[str, InputHandle] = {}
# 3. 注册输入句柄(子类实现)
self._register_input_handles()
# 4. 状态管理
self.status = NodeStatus.PENDING
self.state_store = state_store
# 5. 消息队列
if self._input_edges:
self.node_signal_consumer = NodeSignalConsumer(
self.flow_id,
self.component_id,
self.cycle,
self.node_id,
self._input_edges,
on_signal_handler=self._on_signal_received
)
else:
self.node_signal_consumer = None
self.node_signal_publisher = NodeSignalPublisher(
self.flow_id,
self.component_id,
self.cycle,
self.node_id,
self._output_edges
)
# 6. 初始化输入信号状态
for edge in self._input_edges:
if edge.target_node == self.node_id:
edge_key = self._get_edge_key(
edge.source_node,
edge.source_node_handle,
edge.target_node_handle
)
self._input_signals[edge_key] = None
# 7. Signal ready future
self._signal_ready_future = None
# 8. 停止执行标记
self._stop_execution_requested = False信号等待机制
信号就绪检查
def can_execute(self) -> bool:
"""检查是否所有必需的输入信号已就绪"""
# 如果没有输入边,可以立即执行
if not self._input_edges:
return True
# 检查所有输入信号是否已接收
for edge_key, signal in self._input_signals.items():
if signal is None:
self.logger.debug(f"Signal not ready: {edge_key}")
return False
self.logger.debug("All required signals are ready")
return True等待信号流程
async def start(self) -> bool:
"""启动节点执行"""
try:
# 1. 初始化状态存储
if not await self.initialize_state_store():
return False
# 2. 初始化消息队列
if not await self.initialize_message_queue():
return False
# 3. 如果没有输入边,直接执行
if not self._input_edges:
await self.set_status(NodeStatus.RUNNING)
return await self.execute()
# 4. 启动信号消费者
consume_task = asyncio.create_task(self.node_signal_consumer.consume())
# 5. 创建 signal_ready_future
self._signal_ready_future = asyncio.Future()
# 6. 等待所有信号就绪或超时
try:
await asyncio.wait_for(
self._signal_ready_future,
timeout=self.signal_timeout if hasattr(self, 'signal_timeout') else 300
)
except asyncio.TimeoutError:
self.logger.error("Timeout waiting for input signals")
await self.set_status(NodeStatus.FAILED, "Signal timeout")
return False
except asyncio.CancelledError:
# 收到停止信号,正常退出
self.logger.info("Execution cancelled by stop signal")
return False
# 7. 执行节点逻辑
await self.set_status(NodeStatus.RUNNING)
return await self.execute()
except NodeStopExecutionException as e:
# 停止执行异常
await self.set_status(NodeStatus.TERMINATED, f"Stopped: {e.reason}")
return False
except Exception as e:
self.logger.exception("Error in node execution")
await self.set_status(NodeStatus.FAILED, str(e))
return False信号接收处理
async def _on_signal_received(
self,
signal: Signal,
handle: str = None,
signal_context: Dict[str, Any] = None
) -> None:
"""处理接收到的信号"""
try:
# 1. 处理停止执行信号
if handle == "STOP_EXECUTION" or signal.type == SignalType.STOP_EXECUTION:
await self._handle_stop_execution_signal(signal)
return
# 2. 从 context 获取源信息
source_node = signal_context.get("source_node")
source_handle = signal_context.get("source_handle")
# 3. 构建 edge_key
edge_key = self._get_edge_key(source_node, source_handle, handle)
# 4. 更新信号状态
if edge_key in self._input_signals:
self._input_signals[edge_key] = signal
# 附加源信息到信号对象
signal.source_handle = source_handle
signal.source_node = source_node
# 持久化日志
await self.persist_log(
message=f"Signal received at {handle} from {source_node}:{source_handle}",
log_level="INFO",
log_source="node",
log_metadata={
'signal_data': {handle: {...}},
'target_handle': handle,
'source_node': source_node
}
)
# 5. 执行默认的信号处理逻辑(自动更新成员变量)
await self._handle_default_signal_processing(handle, signal)
# 6. 调用自定义处理器(如果存在)
handler_name = self._deduce_input_handler_name(handle)
if hasattr(self, handler_name):
await getattr(self, handler_name)(signal)
# 7. 检查是否所有信号就绪
if self._signal_ready_future and not self._signal_ready_future.done():
if self.can_execute():
self._signal_ready_future.set_result(True)
except Exception as e:
self.logger.error("Error processing signal: %s", str(e))
raise节点执行流程
执行方法签名
@abc.abstractmethod
async def execute(self) -> bool:
"""
执行节点的核心逻辑(子类必须实现)
Returns:
bool: 执行是否成功
"""
pass典型执行模式
模式 1:数据处理节点
class BinancePriceNode(NodeBase):
async def execute(self) -> bool:
"""获取币安价格数据"""
try:
# 1. 准备参数
symbol = self.symbol
interval = self.interval
limit = self.limit
# 2. 调用外部 API
response = await self._fetch_klines(symbol, interval, limit)
# 3. 处理数据
kline_data = self._process_kline_data(response)
# 4. 发送输出信号
await self.send_signal(
source_handle="kline_data",
signal_type=SignalType.PRICE_DATA,
payload=kline_data
)
# 5. 更新状态为完成
await self.set_status(NodeStatus.COMPLETED)
return True
except Exception as e:
self.logger.error(f"Error fetching price: {e}")
await self.set_status(NodeStatus.FAILED, str(e))
return False模式 2:条件判断节点
class ConditionNode(NodeBase):
async def execute(self) -> bool:
"""条件判断节点"""
try:
# 1. 获取输入数据
input_data = await self.get_input_signal("condition_input")
# 2. 执行判断逻辑
condition_met = self._evaluate_condition(input_data.payload)
if condition_met:
# 3a. 条件满足,发送继续信号
await self.send_signal(
source_handle="true_output",
signal_type=SignalType.DATA_READY,
payload={"result": "continue"}
)
else:
# 3b. 条件不满足,发送停止信号
await self.node_signal_publisher.send_stop_execution_signal(
reason="Condition not met"
)
await self.set_status(NodeStatus.COMPLETED)
return True
except Exception as e:
await self.set_status(NodeStatus.FAILED, str(e))
return False模式 3:交易执行节点
class BuyNode(NodeBase):
async def execute(self) -> bool:
"""执行买入交易"""
try:
# 1. 获取交易参数(从输入信号或配置)
chain = self.chain
vault_address = self.vault_address
from_token = self.from_token
to_token = self.to_token
amount = self.amount_in_human_readable
# 2. 选择对应链的服务
if chain == "aptos":
service = AptosVaultService.get_instance()
elif chain == "flow_evm":
service = FlowEvmVaultService.get_instance()
else:
raise ValueError(f"Unsupported chain: {chain}")
# 3. 执行交易
receipt = await service.execute_swap(
vault_address=vault_address,
from_token=from_token,
to_token=to_token,
amount_in=amount,
slippage_tolerance=self.slippery_tolerance
)
# 4. 发送交易收据
await self.send_signal(
source_handle="trade_receipt",
signal_type=SignalType.DEX_TRADE_RECEIPT,
payload=receipt
)
await self.set_status(NodeStatus.COMPLETED)
return True
except Exception as e:
self.logger.error(f"Trade execution failed: {e}")
await self.set_status(NodeStatus.FAILED, str(e))
# 可选:发送错误信号到错误处理句柄
await self.send_signal(
source_handle="error",
signal_type=SignalType.ERROR,
payload={"error": str(e)}
)
return False信号发送机制
发送信号
async def send_signal(
self,
source_handle: str,
signal_type: SignalType,
payload: Dict[str, Any] = None
) -> bool:
"""发送信号"""
try:
# 1. 创建信号对象
signal = Signal(
signal_type=signal_type,
payload=payload,
timestamp=None
)
# 2. 发送到消息队列
await self.node_signal_publisher.send_signal(source_handle, signal)
# 3. 持久化日志
await self.persist_log(
message=f"Signal sent from {source_handle}: {signal_type}",
log_level="INFO",
log_source="node",
log_metadata={
'signal_data': {
source_handle: {
'signal_type': signal_type.value,
'payload': payload or {},
'timestamp': signal.timestamp
}
}
}
)
return True
except Exception as e:
self.logger.error(f"Failed to send signal: {e}")
return False获取输入信号
async def get_input_signal(self, handle_name: str) -> Optional[Signal]:
"""获取指定 handle 的输入信号"""
# 遍历所有输入信号,找到匹配的 handle
for edge_key, signal in self._input_signals.items():
# edge_key 格式: "source_node:source_handle->target_handle"
if edge_key.endswith(f"->{handle_name}"):
return signal
return None异常处理
异常类型
# tradingflow/depot/python/exceptions/tf_exception.py
class NodeExecutionException(Exception):
"""节点执行异常基类"""
pass
class NodeStopExecutionException(NodeExecutionException):
"""节点停止执行异常"""
def __init__(self, reason: str, source_node: str):
self.reason = reason
self.source_node = source_node
class NodeTimeoutException(NodeExecutionException):
"""节点超时异常"""
def __init__(self, message: str, timeout_seconds: int):
self.message = message
self.timeout_seconds = timeout_seconds
class NodeValidationException(NodeExecutionException):
"""节点验证异常"""
def __init__(self, message: str, node_id: str, invalid_params: Dict = None):
self.message = message
self.node_id = node_id
self.invalid_params = invalid_params or {}
class NodeResourceException(NodeExecutionException):
"""节点资源异常"""
def __init__(self, message: str, resource_type: str):
self.message = message
self.resource_type = resource_type异常处理流程
# NodeExecutor.execute_node_task()
try:
# 执行节点
success = await node_instance.start()
except NodeStopExecutionException as e:
# 停止执行(正常中止)
await _update_node_status(
node_task_id,
NodeStatus.TERMINATED,
f"Stopped by signal: {e.reason}",
{"stop_reason": e.reason, "source_node": e.source_node}
)
except NodeTimeoutException as e:
# 超时
await _update_node_status(
node_task_id,
NodeStatus.FAILED,
f"Timeout: {e.message}",
{"timeout_seconds": e.timeout_seconds}
)
except NodeValidationException as e:
# 验证失败
await _update_node_status(
node_task_id,
NodeStatus.FAILED,
f"Validation failed: {e.message}",
{"invalid_params": e.invalid_params}
)
except NodeResourceException as e:
# 资源错误
await _update_node_status(
node_task_id,
NodeStatus.FAILED,
f"Resource error: {e.message}",
{"resource_type": e.resource_type}
)
except Exception as e:
# 未知异常
await _update_node_status(
node_task_id,
NodeStatus.FAILED,
f"Unexpected error: {str(e)}"
)资源清理
清理流程
async def cleanup(self):
"""清理节点资源"""
try:
# 1. 关闭消息队列连接
if self.node_signal_consumer:
await self.node_signal_consumer.close()
if self.node_signal_publisher:
await self.node_signal_publisher.close()
# 2. 关闭状态存储连接(仅当节点创建时关闭)
await self.close_state_store()
# 3. 执行子类自定义清理(如果存在)
if hasattr(self, '_cleanup'):
await self._cleanup()
self.logger.info("Node cleanup completed")
except Exception as e:
self.logger.error(f"Error during cleanup: {e}")强制终止
# 通过 Redis 终止标志实现
async def check_termination_flag(self):
"""检查终止标志"""
terminate_flag = await self.state_store.get_termination_flag(self.node_task_id)
if terminate_flag:
raise NodeStopExecutionException(
reason=terminate_flag["reason"],
source_node="system"
)开发新节点
最小节点模板
from tradingflow.station.nodes.node_base import NodeBase, register_node_type
from tradingflow.station.common.signal_types import SignalType
@register_node_type("my_custom_node", default_params={
"param1": "default_value",
"param2": 100
})
class MyCustomNode(NodeBase):
"""自定义节点"""
def __init__(self, **kwargs):
"""初始化节点"""
super().__init__(**kwargs)
# 获取配置参数
self.param1 = kwargs.get("param1", "default_value")
self.param2 = kwargs.get("param2", 100)
def _register_input_handles(self):
"""注册输入句柄"""
from dataclasses import dataclass
from tradingflow.station.nodes.node_base import InputHandle
self._input_handles = {
"input_data": InputHandle(
name="input_data",
data_type=dict,
description="输入数据",
auto_update_attr="input_data"
)
}
async def execute(self) -> bool:
"""执行节点逻辑"""
try:
# 1. 获取输入数据
input_signal = await self.get_input_signal("input_data")
input_data = input_signal.payload if input_signal else {}
# 2. 执行业务逻辑
result = self._process_data(input_data)
# 3. 发送输出信号
await self.send_signal(
source_handle="output_data",
signal_type=SignalType.DATA_READY,
payload=result
)
# 4. 更新状态
await self.set_status(NodeStatus.COMPLETED)
return True
except Exception as e:
self.logger.error(f"Execution failed: {e}")
await self.set_status(NodeStatus.FAILED, str(e))
return False
def _process_data(self, data: dict) -> dict:
"""处理数据(业务逻辑)"""
# 实现具体的处理逻辑
return {"processed": data}开发清单
✅ 必须实现:
继承
NodeBase使用
@register_node_type装饰器注册实现
execute()方法实现
_register_input_handles()方法(如有输入)
✅ 建议实现: 5. 在 __init__ 中处理配置参数 6. 使用 auto_update_attr 自动更新成员变量 7. 完善错误处理和日志记录 8. 发送有意义的输出信号
✅ 可选实现: 9. 自定义信号处理器 _on_{handle_name}_received() 10. 自定义清理逻辑 _cleanup() 11. 超时控制 signal_timeout 12. 聚合 Handle 支持
下一步
继续阅读相关文档:
维护者: TradingFlow 开发团队 版本历史:
v1.0.0 (2025-10-06): 初始版本
Last updated