im_wower
·
2026-03-31
graph.py
1"""
2CIE Graph — 图原生拓扑结构
3
4图是纯拓扑,无维度。节点和边的连接关系是唯一本体。
5权重非对称:W_fwd(u,v) != W_bwd(v,u),这是旋度/极限环的来源。
6"""
7
8from collections import defaultdict
9import math
10import random
11
12
13class Node:
14 """图上的一个节点——概念、经验片段、动作原语、感知特征。"""
15 __slots__ = ('id', 'label', 'meta')
16
17 def __init__(self, node_id: str, label: str = '', meta: dict = None):
18 self.id = node_id
19 self.label = label or node_id
20 self.meta = meta or {}
21
22 def __repr__(self):
23 return f'Node({self.id!r})'
24
25 def __hash__(self):
26 return hash(self.id)
27
28 def __eq__(self, other):
29 return isinstance(other, Node) and self.id == other.id
30
31
32class Edge:
33 """
34 有向边。权重非对称是关键——对称矩阵→不动点(知识),
35 非对称矩阵→极限环(技能)。
36 """
37 __slots__ = ('src', 'dst', 'weight', 'edge_type', 'meta')
38
39 def __init__(self, src: str, dst: str, weight: float = 1.0,
40 edge_type: str = 'default', meta: dict = None):
41 self.src = src
42 self.dst = dst
43 self.weight = weight
44 self.edge_type = edge_type
45 self.meta = meta or {}
46
47 def __repr__(self):
48 return f'Edge({self.src}->{self.dst}, w={self.weight:.3f})'
49
50
51class Graph:
52 """
53 图原生拓扑结构。
54
55 - adjacency list 存储
56 - 非对称权重:fwd_edges[src][dst] 和 bwd_edges[dst][src] 独立
57 - 支持节点/边的增删
58 - 提供图拉普拉斯 L_G 计算
59 """
60
61 def __init__(self):
62 self.nodes: dict[str, Node] = {}
63 # fwd_edges[src][dst] = Edge (正向)
64 self.fwd_edges: dict[str, dict[str, Edge]] = defaultdict(dict)
65 # bwd_edges[dst][src] = Edge (反向,权重独立)
66 self.bwd_edges: dict[str, dict[str, Edge]] = defaultdict(dict)
67
68 @property
69 def node_count(self) -> int:
70 return len(self.nodes)
71
72 @property
73 def edge_count(self) -> int:
74 return sum(len(d) for d in self.fwd_edges.values())
75
76 # ── 节点操作 ──
77
78 def add_node(self, node_id: str, label: str = '', meta: dict = None) -> Node:
79 if node_id in self.nodes:
80 return self.nodes[node_id]
81 node = Node(node_id, label, meta)
82 self.nodes[node_id] = node
83 return node
84
85 def has_node(self, node_id: str) -> bool:
86 return node_id in self.nodes
87
88 def get_node(self, node_id: str) -> Node | None:
89 return self.nodes.get(node_id)
90
91 def remove_node(self, node_id: str):
92 if node_id not in self.nodes:
93 return
94 del self.nodes[node_id]
95 # 清理相关边
96 if node_id in self.fwd_edges:
97 for dst in list(self.fwd_edges[node_id]):
98 self.bwd_edges[dst].pop(node_id, None)
99 del self.fwd_edges[node_id]
100 if node_id in self.bwd_edges:
101 for src in list(self.bwd_edges[node_id]):
102 self.fwd_edges[src].pop(node_id, None)
103 del self.bwd_edges[node_id]
104
105 # ── 边操作 ──
106
107 def add_edge(self, src: str, dst: str, weight: float = 1.0,
108 bwd_weight: float | None = None,
109 edge_type: str = 'default', meta: dict = None):
110 """
111 添加有向边。bwd_weight 若为 None 则等于 weight(对称)。
112 非对称权重是产生旋度/极限环的关键。
113 """
114 # 自动创建节点
115 self.add_node(src)
116 self.add_node(dst)
117
118 fwd_edge = Edge(src, dst, weight, edge_type, meta)
119 self.fwd_edges[src][dst] = fwd_edge
120
121 bw = bwd_weight if bwd_weight is not None else weight
122 bwd_edge = Edge(dst, src, bw, edge_type, meta)
123 self.bwd_edges[dst][src] = bwd_edge
124
125 def get_edge_weight(self, src: str, dst: str) -> float:
126 """获取 src->dst 的正向权重"""
127 if src in self.fwd_edges and dst in self.fwd_edges[src]:
128 return self.fwd_edges[src][dst].weight
129 return 0.0
130
131 def get_bwd_weight(self, dst: str, src: str) -> float:
132 """获取 dst<-src 的反向权重"""
133 if dst in self.bwd_edges and src in self.bwd_edges[dst]:
134 return self.bwd_edges[dst][src].weight
135 return 0.0
136
137 def neighbors_fwd(self, node_id: str) -> list[str]:
138 """正向邻居"""
139 return list(self.fwd_edges.get(node_id, {}).keys())
140
141 def neighbors_bwd(self, node_id: str) -> list[str]:
142 """反向邻居(谁指向我)"""
143 return list(self.bwd_edges.get(node_id, {}).keys())
144
145 def neighbors_all(self, node_id: str) -> set[str]:
146 """所有邻居(不分方向)"""
147 fwd = set(self.fwd_edges.get(node_id, {}).keys())
148 bwd = set(self.bwd_edges.get(node_id, {}).keys())
149 return fwd | bwd
150
151 # ── 图拉普拉斯 ──
152
153 def laplacian_at(self, node_id: str, phi: dict[str, float]) -> float:
154 """
155 计算节点 node_id 处的图拉普拉斯 (L_G φ)(v)。
156 L_G φ(v) = Σ_u w(u,v) * (φ(u) - φ(v))
157 只需邻接关系,无维度。
158 """
159 if node_id not in self.nodes:
160 return 0.0
161
162 phi_v = phi.get(node_id, 0.0)
163 result = 0.0
164
165 # 正向邻居贡献
166 for dst, edge in self.fwd_edges.get(node_id, {}).items():
167 result += edge.weight * (phi.get(dst, 0.0) - phi_v)
168
169 # 反向邻居贡献
170 for src, edge in self.bwd_edges.get(node_id, {}).items():
171 result += edge.weight * (phi.get(src, 0.0) - phi_v)
172
173 return result
174
175 def asymmetry_at(self, node_id: str, phi: dict[str, float]) -> float:
176 """
177 计算非对称项 (W_fwd·φ - W_bwd·φ)(v)。
178 这是旋度的来源——如果 fwd 和 bwd 权重一样,此项为零。
179 """
180 if node_id not in self.nodes:
181 return 0.0
182
183 result = 0.0
184 # 对于每条边 (node_id, dst)
185 for dst, fwd_edge in self.fwd_edges.get(node_id, {}).items():
186 bwd_w = self.get_bwd_weight(node_id, dst)
187 result += (fwd_edge.weight - bwd_w) * phi.get(dst, 0.0)
188
189 return result
190
191 # ── 环流/旋度 ──
192
193 def circulation(self, path: list[str]) -> float:
194 """
195 计算闭合路径的环流 Σ J(u,v)。
196 非零环流 = 旋度 ≠ 0 = 技能/极限环。
197 path 应为闭合的:[a, b, c, a]
198 """
199 if len(path) < 2:
200 return 0.0
201 total = 0.0
202 for i in range(len(path) - 1):
203 total += self.get_edge_weight(path[i], path[i + 1])
204 return total
205
206 # ── 路径汇聚度 κ ──
207
208 def convergence(self, node_id: str) -> float:
209 """
210 路径汇聚度 κ(v) = 有多少条不同路径经过此节点。
211 近似为入度+出度的几何均值,避免零值。
212 """
213 in_deg = len(self.bwd_edges.get(node_id, {}))
214 out_deg = len(self.fwd_edges.get(node_id, {}))
215 return math.sqrt(max(in_deg, 1) * max(out_deg, 1))
216
217 # ── 序列化 ──
218
219 def to_dict(self) -> dict:
220 return {
221 'nodes': {nid: {'label': n.label, 'meta': n.meta}
222 for nid, n in self.nodes.items()},
223 'edges': [
224 {'src': e.src, 'dst': e.dst, 'weight': e.weight,
225 'bwd_weight': self.get_bwd_weight(e.src, e.dst),
226 'type': e.edge_type}
227 for src_edges in self.fwd_edges.values()
228 for e in src_edges.values()
229 ]
230 }
231
232 @classmethod
233 def from_dict(cls, data: dict) -> 'Graph':
234 g = cls()
235 for nid, info in data.get('nodes', {}).items():
236 g.add_node(nid, info.get('label', ''), info.get('meta'))
237 for e in data.get('edges', []):
238 g.add_edge(e['src'], e['dst'], e['weight'],
239 e.get('bwd_weight'), e.get('type', 'default'))
240 return g
241
242 def __repr__(self):
243 return f'Graph(nodes={self.node_count}, edges={self.edge_count})'