CIE-Unified

git clone 

CIE-Unified / cie
im_wower  ·  2026-04-01

graph.py

  1"""
  2CIE Graph — 图原生拓扑结构
  3
  4图是纯拓扑,无维度。节点和边的连接关系是唯一本体。
  5权重非对称:W_fwd(u,v) != W_bwd(v,u),这是旋度/极限环的来源。
  6"""
  7
  8from collections import defaultdict
  9import math
 10import random
 11
 12
 13class Node:
 14    """图上的一个节点——概念、经验片段、动作原语、感知特征。"""
 15    __slots__ = ('id', 'label', 'meta')
 16
 17    def __init__(self, node_id: str, label: str = '', meta: dict = None):
 18        self.id = node_id
 19        self.label = label or node_id
 20        self.meta = meta or {}
 21
 22    def __repr__(self):
 23        return f'Node({self.id!r})'
 24
 25    def __hash__(self):
 26        return hash(self.id)
 27
 28    def __eq__(self, other):
 29        return isinstance(other, Node) and self.id == other.id
 30
 31
 32class Edge:
 33    """
 34    有向边。权重非对称是关键——对称矩阵→不动点(知识),
 35    非对称矩阵→极限环(技能)。
 36    """
 37    __slots__ = ('src', 'dst', 'weight', 'edge_type', 'meta')
 38
 39    def __init__(self, src: str, dst: str, weight: float = 1.0,
 40                 edge_type: str = 'default', meta: dict = None):
 41        self.src = src
 42        self.dst = dst
 43        self.weight = weight
 44        self.edge_type = edge_type
 45        self.meta = meta or {}
 46
 47    def __repr__(self):
 48        return f'Edge({self.src}->{self.dst}, w={self.weight:.3f})'
 49
 50
 51class Graph:
 52    """
 53    图原生拓扑结构。
 54    
 55    - adjacency list 存储
 56    - 非对称权重:fwd_edges[src][dst] 和 bwd_edges[dst][src] 独立
 57    - 支持节点/边的增删
 58    - 提供图拉普拉斯 L_G 计算
 59    """
 60
 61    def __init__(self):
 62        self.nodes: dict[str, Node] = {}
 63        # fwd_edges[src][dst] = Edge (正向)
 64        self.fwd_edges: dict[str, dict[str, Edge]] = defaultdict(dict)
 65        # bwd_edges[dst][src] = Edge (反向,权重独立)
 66        self.bwd_edges: dict[str, dict[str, Edge]] = defaultdict(dict)
 67
 68    @property
 69    def node_count(self) -> int:
 70        return len(self.nodes)
 71
 72    @property
 73    def edge_count(self) -> int:
 74        return sum(len(d) for d in self.fwd_edges.values())
 75
 76    # ── 节点操作 ──
 77
 78    def add_node(self, node_id: str, label: str = '', meta: dict = None) -> Node:
 79        if node_id in self.nodes:
 80            return self.nodes[node_id]
 81        node = Node(node_id, label, meta)
 82        self.nodes[node_id] = node
 83        return node
 84
 85    def has_node(self, node_id: str) -> bool:
 86        return node_id in self.nodes
 87
 88    def get_node(self, node_id: str) -> Node | None:
 89        return self.nodes.get(node_id)
 90
 91    def remove_node(self, node_id: str):
 92        if node_id not in self.nodes:
 93            return
 94        del self.nodes[node_id]
 95        # 清理相关边
 96        if node_id in self.fwd_edges:
 97            for dst in list(self.fwd_edges[node_id]):
 98                self.bwd_edges[dst].pop(node_id, None)
 99            del self.fwd_edges[node_id]
100        if node_id in self.bwd_edges:
101            for src in list(self.bwd_edges[node_id]):
102                self.fwd_edges[src].pop(node_id, None)
103            del self.bwd_edges[node_id]
104
105    # ── 边操作 ──
106
107    def add_edge(self, src: str, dst: str, weight: float = 1.0,
108                 bwd_weight: float | None = None,
109                 edge_type: str = 'default', meta: dict = None):
110        """
111        添加有向边。bwd_weight 若为 None 则等于 weight(对称)。
112        非对称权重是产生旋度/极限环的关键。
113        """
114        # 自动创建节点
115        self.add_node(src)
116        self.add_node(dst)
117
118        fwd_edge = Edge(src, dst, weight, edge_type, meta)
119        self.fwd_edges[src][dst] = fwd_edge
120
121        bw = bwd_weight if bwd_weight is not None else weight
122        bwd_edge = Edge(dst, src, bw, edge_type, meta)
123        self.bwd_edges[dst][src] = bwd_edge
124
125    def get_edge_weight(self, src: str, dst: str) -> float:
126        """获取 src->dst 的正向权重"""
127        if src in self.fwd_edges and dst in self.fwd_edges[src]:
128            return self.fwd_edges[src][dst].weight
129        return 0.0
130
131    def get_bwd_weight(self, dst: str, src: str) -> float:
132        """获取 dst<-src 的反向权重"""
133        if dst in self.bwd_edges and src in self.bwd_edges[dst]:
134            return self.bwd_edges[dst][src].weight
135        return 0.0
136
137    def neighbors_fwd(self, node_id: str) -> list[str]:
138        """正向邻居"""
139        return list(self.fwd_edges.get(node_id, {}).keys())
140
141    def neighbors_bwd(self, node_id: str) -> list[str]:
142        """反向邻居(谁指向我)"""
143        return list(self.bwd_edges.get(node_id, {}).keys())
144
145    def neighbors_all(self, node_id: str) -> set[str]:
146        """所有邻居(不分方向)"""
147        fwd = set(self.fwd_edges.get(node_id, {}).keys())
148        bwd = set(self.bwd_edges.get(node_id, {}).keys())
149        return fwd | bwd
150
151    # ── 图拉普拉斯 ──
152
153    def laplacian_at(self, node_id: str, phi: dict[str, float]) -> float:
154        """
155        计算节点 node_id 处的图拉普拉斯 (L_G φ)(v)。
156        L_G φ(v) = Σ_u w(u,v) * (φ(u) - φ(v))
157        只需邻接关系,无维度。
158        """
159        if node_id not in self.nodes:
160            return 0.0
161
162        phi_v = phi.get(node_id, 0.0)
163        result = 0.0
164
165        # 正向邻居贡献
166        for dst, edge in self.fwd_edges.get(node_id, {}).items():
167            result += edge.weight * (phi.get(dst, 0.0) - phi_v)
168
169        # 反向邻居贡献
170        for src, edge in self.bwd_edges.get(node_id, {}).items():
171            result += edge.weight * (phi.get(src, 0.0) - phi_v)
172
173        return result
174
175    def asymmetry_at(self, node_id: str, phi: dict[str, float]) -> float:
176        """
177        计算非对称项 (W_fwd·φ - W_bwd·φ)(v)。
178        这是旋度的来源——如果 fwd 和 bwd 权重一样,此项为零。
179        """
180        if node_id not in self.nodes:
181            return 0.0
182
183        result = 0.0
184        # 对于每条边 (node_id, dst)
185        for dst, fwd_edge in self.fwd_edges.get(node_id, {}).items():
186            bwd_w = self.get_bwd_weight(dst, node_id)  # dst←node_id 的反向权重
187            result += (fwd_edge.weight - bwd_w) * phi.get(dst, 0.0)
188
189        return result
190
191    # ── 环流/旋度 ──
192
193    def circulation(self, path: list[str]) -> float:
194        """
195        计算闭合路径的净环流 Σ (W_fwd - W_bwd)。
196        净流 ≠ 0 = 旋度 ≠ 0 = 技能/极限环。
197        对称图上净环流恒为零——只有非对称权重才产生旋度。
198        path 应为闭合的:[a, b, c, a]
199        """
200        if len(path) < 2:
201            return 0.0
202        total = 0.0
203        for i in range(len(path) - 1):
204            fwd = self.get_edge_weight(path[i], path[i + 1])  # A→B 正向权重
205            bwd = self.get_bwd_weight(path[i + 1], path[i])   # A→B 反向权重(bwd_edges[B][A])
206            total += fwd - bwd  # 净流:正向减反向
207        return total
208
209    # ── 路径汇聚度 κ ──
210
211    def convergence(self, node_id: str) -> float:
212        """
213        路径汇聚度 κ(v) = 有多少条不同路径经过此节点。
214        近似为入度+出度的几何均值,避免零值。
215        """
216        in_deg = len(self.bwd_edges.get(node_id, {}))
217        out_deg = len(self.fwd_edges.get(node_id, {}))
218        return math.sqrt(max(in_deg, 1) * max(out_deg, 1))
219
220    # ── 序列化 ──
221
222    def to_dict(self) -> dict:
223        return {
224            'nodes': {nid: {'label': n.label, 'meta': n.meta}
225                      for nid, n in self.nodes.items()},
226            'edges': [
227                {'src': e.src, 'dst': e.dst, 'weight': e.weight,
228                 'bwd_weight': self.get_bwd_weight(e.dst, e.src),  # dst←src 的反向权重
229                 'type': e.edge_type}
230                for src_edges in self.fwd_edges.values()
231                for e in src_edges.values()
232            ]
233        }
234
235    @classmethod
236    def from_dict(cls, data: dict) -> 'Graph':
237        g = cls()
238        for nid, info in data.get('nodes', {}).items():
239            g.add_node(nid, info.get('label', ''), info.get('meta'))
240        for e in data.get('edges', []):
241            g.add_edge(e['src'], e['dst'], e['weight'],
242                       e.get('bwd_weight'), e.get('type', 'default'))
243        return g
244
245    def __repr__(self):
246        return f'Graph(nodes={self.node_count}, edges={self.edge_count})'