CIE-Unified

git clone 

CIE-Unified / cie
im_wower  ·  2026-04-01

dynamics.py

  1"""
  2CIE Dynamics — 动力学引擎
  3
  4图上扩散、衰减、归巢、沉积——所有高层行为从底层流动规则自然导出。
  5代码里只写流动规则,不写标签。
  6"""
  7
  8import math
  9import random
 10from .graph import Graph
 11from .state import CIEState
 12
 13
 14class Dynamics:
 15    """
 16    动力学引擎——驱动 (φ, μ, J) 的演化。
 17    
 18    参数设计遵循"少参数、强解释性"原则。
 19    """
 20
 21    def __init__(self, graph: Graph, state: CIEState):
 22        self.graph = graph
 23        self.state = state
 24
 25        # ── 系统级参数(约束层) ──
 26        self.alpha_0 = 0.01       # 基础衰减率
 27        self.beta_decay = 1.5     # 衰减指数(置信度越高衰减越慢)
 28        self.diffusion_rate = 0.1 # 图上扩散速率 μ
 29        self.asym_lambda = 0.05   # 非对称项系数 λ_dir
 30        self.homing_lambda1 = 0.1 # 短程归巢力(→能力核)
 31        self.homing_lambda2 = 0.02  # 长程归巢力(→锚点核)
 32        self.anchor_epsilon = 0.005  # 锚点核阈值
 33        self.sediment_threshold = 10  # 经验沉积阈值(激活次数)
 34        self.skill_belt_threshold = 30  # 技能带固化阈值
 35        self.merge_threshold = 40   # 能力核合并阈值
 36        self.phi_damping = 0.02     # φ 全局阻尼——半杯水,不发散
 37
 38    # ── 图上扩散 ──
 39
 40    def diffuse_phi(self):
 41        """
 42        φ_new(v) = φ(v) + μ · (L_G φ)(v) + λ_dir · (W_fwd·φ - W_bwd·φ)(v) - damping·φ(v)
 43        
 44        L_G 是图拉普拉斯。没有维度,没有向量。
 45        非对称项是旋度来源。
 46        阻尼项防止 φ 无界增长——半杯水原则。
 47        Laplacian 按度归一化,防止高权重边放大信号。
 48        """
 49        phi = self.state.phi
 50        new_phi = {}
 51        for node_id in self.graph.nodes:
 52            lap = self.graph.laplacian_at(node_id, phi)
 53            asym = self.graph.asymmetry_at(node_id, phi)
 54            # 按加权度归一化(Gemini MATH-001 修复)
 55            # 用出入边权重绝对值之和,而非拓扑连接数
 56            fwd_w = sum(abs(e.weight) for e in self.graph.fwd_edges.get(node_id, {}).values())
 57            bwd_w = sum(abs(e.weight) for e in self.graph.bwd_edges.get(node_id, {}).values())
 58            norm = max(fwd_w + bwd_w, 1e-10)
 59            phi_v = phi.get(node_id, 0.0)
 60            new_phi[node_id] = (
 61                phi_v
 62                + self.diffusion_rate * (lap / norm)
 63                + self.asym_lambda * (asym / norm)
 64                - self.phi_damping * phi_v  # 全局阻尼
 65            )
 66        self.state.phi.update(new_phi)
 67        # Soft clamp: prevent phi divergence
 68        max_phi = max((abs(v) for v in self.state.phi.values()), default=1.0)
 69        if max_phi > 10.0:
 70            scale = 10.0 / max_phi
 71            for k in self.state.phi:
 72                self.state.phi[k] *= scale
 73
 74    # ── 激活传播 ──
 75
 76    def propagate_mu(self):
 77        """
 78        激活沿图上的边传播:μ 从高激活节点流向邻居。
 79        同时更新边流 J。
 80        """
 81        mu = self.state.mu
 82        new_mu = dict(mu)  # copy
 83        
 84        for node_id in list(self.state.active_region):
 85            mu_v = mu.get(node_id, 0.0)
 86            if mu_v < 1e-10:
 87                continue
 88            
 89            neighbors = self.graph.neighbors_fwd(node_id)
 90            if not neighbors:
 91                continue
 92            
 93            # 按边权重比例传播一部分激活
 94            total_weight = sum(
 95                self.graph.get_edge_weight(node_id, nb) for nb in neighbors
 96            )
 97            if total_weight < 1e-10:
 98                continue
 99
100            spread_ratio = 0.3  # 每步传播 30% 的激活
101            spread_amount = mu_v * spread_ratio
102
103            for nb in neighbors:
104                w = self.graph.get_edge_weight(node_id, nb)
105                flow = spread_amount * (w / total_weight)
106                new_mu[nb] = new_mu.get(nb, 0.0) + flow
107                # 更新边流 J
108                self.state.J[(node_id, nb)] = (
109                    self.state.J.get((node_id, nb), 0.0) * 0.9 + flow
110                )
111                # attention accounting: 传播到邻居的激活也要记账
112                self.state.attention.allocate(nb, flow)
113                # 记录经验命中
114                self.state.experience_hits[nb] = (
115                    self.state.experience_hits.get(nb, 0) + 1
116                )
117                self.state.active_region.add(nb)
118
119            new_mu[node_id] = mu_v - spread_amount
120            # attention accounting: 释放从源节点流出的激活
121            self.state.attention.release(node_id, spread_amount)
122
123        self.state.mu.update(new_mu)
124        # 清理极低激活
125        dead = [n for n, v in self.state.mu.items() if v < 1e-10]
126        for n in dead:
127            self.state.mu[n] = 0.0
128            self.state.active_region.discard(n)
129
130    # ── 行动释放 ──
131
132    def action_release(self, node_id: str) -> float:
133        """
134        u = o · c · φ(ε)
135        纯乘法,无阈值。
136        o: 能见度(|L_G φ| 的局部值)
137        c: 置信度
138        φ(ε): 残差势场
139        """
140        phi = self.state.phi
141        # o = |L_G φ|(v) 归一化
142        lap = abs(self.graph.laplacian_at(node_id, phi))
143        o = min(lap, 10.0) / 10.0  # 归一化到 [0,1]
144
145        # c = Dirichlet 置信度
146        c = self.state.get_confidence(node_id)
147
148        # φ(ε) = 势场值本身作为残差量度
149        epsilon = abs(phi.get(node_id, 0.0))
150
151        return o * c * epsilon
152
153    # ── 自适应衰减 ──
154
155    def adaptive_decay(self):
156        """
157        α(x) = α₀ · (1 - c(x))^β · (1 / κ(x))
158        
159        锚点核不需要显式定义,它自己浮出来:
160        锚点核 = { x : α(x) < ε }
161        """
162        new_anchors = set()
163        decay_rates = []
164        
165        for node_id in list(self.graph.nodes):
166            c = self.state.get_confidence(node_id)
167            kappa = self.graph.convergence(node_id)
168            
169            # 自适应衰减率
170            alpha = self.alpha_0 * ((1.0 - c) ** self.beta_decay) * (1.0 / kappa)
171            
172            # 衰减 φ
173            old_phi = self.state.phi.get(node_id, 0.0)
174            if abs(old_phi) > 1e-10:
175                self.state.phi[node_id] = old_phi * (1.0 - alpha)
176
177            # 衰减 μ(更快)
178            old_mu = self.state.mu.get(node_id, 0.0)
179            if old_mu > 1e-10:
180                decayed = old_mu * alpha * 3.0  # μ 衰减更快
181                self.state.mu[node_id] = max(0.0, old_mu - decayed)
182                if decayed > 1e-6:
183                    self.state.attention.release(node_id, decayed)
184                    self.state.decay_events.append({
185                        'step': self.state.step_count,
186                        'node': node_id,
187                        'type': 'mu_decay',
188                        'amount': decayed,
189                        'alpha': alpha,
190                    })
191
192            # 记录衰减率用于动态阈值
193            decay_rates.append((node_id, alpha))
194
195        # 动态锚点阈值:取衰减率最低的 10%(最多50个)
196        if decay_rates:
197            decay_rates.sort(key=lambda x: x[1])
198            cutoff = max(1, min(len(decay_rates) // 10, 50))
199            threshold = decay_rates[min(cutoff, len(decay_rates)-1)][1] * 1.1
200            threshold = max(threshold, 1e-6)  # 防止零阈值
201            new_anchors = {nid for nid, a in decay_rates if a <= threshold}
202
203        self.state.anchor_nodes = new_anchors
204
205    # ── 三级归巢 ──
206
207    def homing(self):
208        """
209        dx_A/dt = F_task + λ₁·(能力核 - x_A) + λ₂·(锚点核 - x_A)
210        
211        无任务时 F_task = 0,激活核先被最近的能力核捕获,
212        最终在锚点核引力场里稳定。
213        """
214        if not self.state.active_region:
215            return
216
217        # 找最近的能力核
218        ability_center_phi = {}
219        for core_id, nodes in self.state.ability_cores.items():
220            if nodes:
221                ability_center_phi[core_id] = (
222                    sum(self.state.phi.get(n, 0.0) for n in nodes) / len(nodes)
223                )
224
225        # 锚点核中心
226        anchor_center_phi = 0.0
227        if self.state.anchor_nodes:
228            anchor_center_phi = (
229                sum(self.state.phi.get(n, 0.0) for n in self.state.anchor_nodes)
230                / len(self.state.anchor_nodes)
231            )
232
233        for node_id in list(self.state.active_region):
234            phi_v = self.state.phi.get(node_id, 0.0)
235            
236            # 短程归巢——拉向最近的能力核
237            pull1 = 0.0
238            nearest_core = None
239            min_dist = float('inf')
240            for core_id, center in ability_center_phi.items():
241                dist = abs(center - phi_v)
242                if dist < min_dist:
243                    min_dist = dist
244                    nearest_core = core_id
245                    pull1 = self.homing_lambda1 * (center - phi_v)
246
247            # 长程归巢——拉向锚点核
248            pull2 = self.homing_lambda2 * (anchor_center_phi - phi_v)
249
250            # 更新 phi
251            self.state.phi[node_id] = phi_v + pull1 + pull2
252
253            # 归巢也微弱影响 mu:向能力核方向的节点获得微量激活
254            if nearest_core and nearest_core in self.state.ability_cores:
255                for cn in list(self.state.ability_cores[nearest_core])[:3]:
256                    if cn != node_id:
257                        self.state.mu[cn] = self.state.mu.get(cn, 0.0) + abs(pull1) * 0.01
258
259            if nearest_core:
260                self.state.bound_ability_core = nearest_core
261
262    # ── 经验沉积 ──
263
264    def sediment(self):
265        """
266        沉积路径:记忆层 → 经验层 → 技能带 → 能力核
267        
268        使用 experience_hits 检测,但引入"最近沉积步数"避免饱和:
269        只有当 hits 在最近 window 步内增长了才记录新的 trace。
270        """
271        window = 50  # 滑动窗口:最近50步内的新增才算
272        last_sed_hits = getattr(self, "_last_sed_hits", {})
273        
274        for node_id, hits in list(self.state.experience_hits.items()):
275            # 检查该节点上次沉积时的 hits
276            prev_hits = last_sed_hits.get(node_id, 0)
277            recent_growth = hits - prev_hits
278            
279            # 记忆层 → 经验层
280            if hits >= self.sediment_threshold:
281                if 'experience' not in self.state.experience_regions:
282                    self.state.experience_regions['experience'] = set()
283                if node_id not in self.state.experience_regions['experience']:
284                    self.state.experience_regions['experience'].add(node_id)
285                    self.state.sedimentation_trace.append({
286                        'step': self.state.step_count,
287                        'node': node_id,
288                        'transition': 'memory -> experience',
289                        'hits': hits,
290                    })
291                    last_sed_hits[node_id] = hits
292
293            # 经验层 → 技能带候选(需要持续增长)
294            if hits >= self.skill_belt_threshold:
295                old_score = self.state.skill_belt_candidates.get(node_id, 0.0)
296                new_score = hits / self.merge_threshold
297                if new_score > old_score + 0.05:  # 需要显著增长
298                    self.state.skill_belt_candidates[node_id] = new_score
299                    if old_score == 0.0:
300                        self.state.sedimentation_trace.append({
301                            'step': self.state.step_count,
302                            'node': node_id,
303                            'transition': 'experience -> skill_belt',
304                            'hits': hits,
305                        })
306                    last_sed_hits[node_id] = hits
307
308            # 技能带 → 能力核(需要持续增长,且最近有新激活)
309            if hits >= self.merge_threshold and recent_growth >= self.sediment_threshold:
310                merged = False
311                for core_id, core_nodes in self.state.ability_cores.items():
312                    for cn in list(core_nodes):
313                        if (self.graph.get_edge_weight(node_id, cn) > 0 or
314                            self.graph.get_edge_weight(cn, node_id) > 0):
315                            core_nodes.add(node_id)
316                            self.state.merge_events.append({
317                                'step': self.state.step_count,
318                                'node': node_id,
319                                'core': core_id,
320                                'transition': 'skill_belt -> ability_core',
321                            })
322                            merged = True
323                            break
324                    if merged:
325                        break
326
327                if not merged:
328                    core_id = f'core_{len(self.state.ability_cores)}'
329                    self.state.ability_cores[core_id] = {node_id}
330                    self.state.merge_events.append({
331                        'step': self.state.step_count,
332                        'node': node_id,
333                        'core': core_id,
334                        'transition': 'new_ability_core',
335                    })
336                last_sed_hits[node_id] = hits
337        
338        if not hasattr(self, '_last_sed_hits'):
339            self._last_sed_hits = {}
340        self._last_sed_hits.update(last_sed_hits)
341
342    # ── 边流衰减 ──
343
344    def decay_edges(self):
345        """边流 J 自然衰减"""
346        dead_edges = []
347        for edge_key, flow in self.state.J.items():
348            new_flow = flow * (1.0 - self.alpha_0 * 0.5)
349            if abs(new_flow) < 1e-10:
350                dead_edges.append(edge_key)
351            else:
352                self.state.J[edge_key] = new_flow
353        for k in dead_edges:
354            del self.state.J[k]
355
356    # ── 完整的一步 ──
357
358    def step(self):
359        """
360        一个完整的动力学步骤:
361        1. 图上扩散
362        2. 激活传播
363        3. 自适应衰减
364        4. 归巢
365        5. 沉积检测
366        6. 边流衰减
367        7. 更新输出模式
368        """
369        self.diffuse_phi()
370        self.propagate_mu()
371        self.adaptive_decay()
372        # 置信度自然衰减——遗忘是可塑性的必要条件
373        self.state.decay_all_confidence(rate=0.002)
374        self.homing()
375        self.sediment()
376        self.decay_edges()
377        self.state.update_output_mode()
378        self.state.step_count += 1