# tradingflow/station/flow/flow_parser.py
from collections import defaultdict, deque
from typing import Any, Dict, List, Set
class FlowParser:
    """Flow parser: parses Weather Syntax and identifies the DAG structure.

    The parser reads the ``"nodes"`` and ``"edges"`` lists of a Flow JSON
    dict, builds a directed adjacency list keyed by node id, and can then
    split the graph into weakly connected components, test each component
    for cycles, and list the entry nodes (nodes without incoming edges) of
    every acyclic component.

    Attributes are populated by ``__init__``:
        flow_json: the dict handed to the constructor.
        nodes: the raw node list taken from ``flow_json["nodes"]``.
        edges: the raw edge list taken from ``flow_json["edges"]``.
        graph: ``defaultdict(list)`` mapping source id -> list of target ids.
        node_map: node id -> index of that node inside ``nodes``.
    """

    def __init__(self, flow_json: dict):
        """Store ``flow_json`` and parse it right away when it is non-empty.

        Args:
            flow_json: Flow description; a falsy value (``None``, ``{}``)
                leaves the parser empty.
        """
        self.flow_json = flow_json
        self.nodes = []
        self.edges = []
        self.graph = defaultdict(list)
        self.node_map = {}
        if self.flow_json:
            self._parse_flow()

    def _parse_flow(self):
        """Extract nodes and edges from the Flow JSON and build the graph."""
        # ``or []`` also covers an explicit JSON ``null`` for either key,
        # which would otherwise fail when iterated below.
        self.nodes = self.flow_json.get("nodes") or []
        self.edges = self.flow_json.get("edges") or []

        # Map every node id to its position in the node list.
        for index, node in enumerate(self.nodes):
            self.node_map[node.get("id")] = index

        # Build the adjacency list; edges touching an unknown node id are
        # ignored.
        for edge in self.edges:
            source = edge.get("source")
            target = edge.get("target")
            if source in self.node_map and target in self.node_map:
                self.graph[source].append(target)

    def _in_document_order(self, node_ids) -> list:
        """Return ``node_ids`` as a list ordered by position in ``node_map``.

        Ids missing from ``node_map`` are placed after all known ids.
        Sorting by index rather than by id keeps the output stable between
        runs and never compares the ids with each other.
        """
        fallback = len(self.node_map)
        return sorted(
            node_ids,
            key=lambda node_id: self.node_map.get(node_id, fallback),
        )

    def _find_connected_components(self) -> List[Set[str]]:
        """Find all connected components, ignoring edge direction.

        Returns:
            One set of node ids per component, in the order in which their
            first node appears in ``node_map``. Nodes without any edge form
            single-element components.
        """
        # 1. Build an undirected view of the directed graph.
        undirected_graph = defaultdict(list)
        for source, targets in self.graph.items():
            for target in targets:
                undirected_graph[source].append(target)
                undirected_graph[target].append(source)

        # 2. Breadth-first search from every node not yet reached.
        visited = set()
        components = []
        for node_id in self.node_map:
            if node_id in visited:
                continue
            # A new component starts here.
            component = {node_id}
            visited.add(node_id)
            queue = deque([node_id])
            while queue:
                current = queue.popleft()
                for neighbor in undirected_graph[current]:
                    if neighbor not in visited:
                        visited.add(neighbor)
                        component.add(neighbor)
                        queue.append(neighbor)
            components.append(component)
        return components

    def _is_dag(self, component: Set[str]) -> bool:
        """Check whether a connected component is a DAG (contains no cycle).

        Uses three-colour depth-first search with an explicit stack, so the
        depth of the graph is not limited by the interpreter recursion
        limit.

        Args:
            component: node ids of the component to inspect; edges leading
                outside this set are ignored.

        Returns:
            ``True`` when no directed cycle exists inside ``component``.
        """
        # Three-colour marking: 0 = unvisited, 1 = in progress, 2 = finished.
        unvisited, in_progress, done = 0, 1, 2
        status = {node_id: unvisited for node_id in component}

        for root in component:
            if status[root] != unvisited:
                continue
            status[root] = in_progress
            # Each frame keeps a live iterator so a node resumes exactly
            # where it stopped after one of its successors is finished.
            # ``.get`` avoids inserting empty entries into the defaultdict.
            stack = [(root, iter(self.graph.get(root, ())))]
            while stack:
                node_id, successors = stack[-1]
                for neighbor in successors:
                    if neighbor not in component:
                        continue
                    state = status[neighbor]
                    if state == in_progress:
                        # Back edge to a node on the current path: cycle.
                        return False
                    if state == unvisited:
                        status[neighbor] = in_progress
                        stack.append(
                            (neighbor, iter(self.graph.get(neighbor, ())))
                        )
                        break
                else:
                    # Every successor handled: this node is finished.
                    status[node_id] = done
                    stack.pop()
        return True

    def _find_entry_nodes(self, component: Set[str]) -> List[str]:
        """Find the entry nodes of a component (nodes without incoming edges).

        Args:
            component: node ids of the component to inspect.

        Returns:
            Entry node ids, ordered by their position in the node list.
        """
        # Collect every node that is the target of an edge inside the
        # component.
        nodes_with_incoming = {
            target
            for source in component
            for target in self.graph.get(source, ())
            if target in component
        }
        # Entry nodes = all nodes - nodes with an incoming edge.
        return self._in_document_order(component - nodes_with_incoming)

    def analyze_flow(self) -> Dict[str, Any]:
        """Analyse the Flow structure.

        Returns:
            A dict with ``"component_count"`` and ``"components"``. The
            latter maps the component index (as a string) to a description:
            acyclic components carry ``"nodes"``, ``"entry_nodes"``,
            ``"node_count"`` and ``"is_dag": True``; cyclic ones carry
            ``"nodes"``, ``"is_dag": False`` and ``"error"``.
        """
        components = self._find_connected_components()
        result = {
            "component_count": len(components),
            "components": {},
        }
        for index, component in enumerate(components):
            ordered_nodes = self._in_document_order(component)
            if self._is_dag(component):
                result["components"][str(index)] = {
                    "nodes": ordered_nodes,
                    "entry_nodes": self._find_entry_nodes(component),
                    "node_count": len(component),
                    "is_dag": True,
                }
            else:
                result["components"][str(index)] = {
                    "nodes": ordered_nodes,
                    "is_dag": False,
                    "error": "Contains cycle",
                }
        return result