im_wower
·
2026-04-01
dynamics.py
"""
CIE Dynamics — 动力学引擎

图上扩散、衰减、归巢、沉积——所有高层行为从底层流动规则自然导出。
代码里只写流动规则,不写标签。
"""
7
8import math
9import random
10from .graph import Graph
11from .state import CIEState
12
13
class Dynamics:
    """
    Dynamics engine -- drives the evolution of (phi, mu, J).

    Diffusion on the graph, decay, homing, sedimentation: every
    high-level behaviour is derived from the low-level flow rules below.
    Only flow rules are written in code -- no behavioural labels.

    Parameter design follows the "few parameters, strong
    interpretability" principle.
    """

    def __init__(self, graph: "Graph", state: "CIEState"):
        self.graph = graph
        self.state = state

        # -- System-level parameters (constraint layer) --
        self.alpha_0 = 0.01             # base decay rate
        self.beta_decay = 1.5           # decay exponent (higher confidence decays slower)
        self.diffusion_rate = 0.1       # graph diffusion rate for phi
        self.asym_lambda = 0.05         # asymmetric-term coefficient lambda_dir
        self.homing_lambda1 = 0.1       # short-range homing force (-> ability cores)
        self.homing_lambda2 = 0.02      # long-range homing force (-> anchor core)
        self.anchor_epsilon = 0.005     # anchor-core threshold
        self.sediment_threshold = 10    # experience sedimentation threshold (activation count)
        self.skill_belt_threshold = 30  # skill-belt consolidation threshold
        self.merge_threshold = 40       # ability-core merge threshold
        self.phi_damping = 0.02         # global phi damping -- keeps phi bounded

        # Per-node hit counts recorded at each node's last sedimentation
        # event; sediment() uses this to require *recent* growth before
        # promoting a node further along the sedimentation path.
        self._last_sed_hits = {}

    # -- Diffusion on the graph --

    def diffuse_phi(self):
        """
        phi_new(v) = phi(v) + mu * (L_G phi)(v)
                     + lambda_dir * (W_fwd*phi - W_bwd*phi)(v)
                     - damping * phi(v)

        L_G is the graph Laplacian -- no dimensions, no vectors.
        The asymmetric term is the source of curl.
        The damping term keeps phi from growing without bound.
        The Laplacian is degree-normalised so high-weight edges do not
        amplify the signal.
        """
        phi = self.state.phi
        new_phi = {}
        for node_id in self.graph.nodes:
            lap = self.graph.laplacian_at(node_id, phi)
            asym = self.graph.asymmetry_at(node_id, phi)
            # Normalise by weighted degree: the sum of |weight| over in-
            # and out-edges, not the topological neighbour count.
            fwd_w = sum(abs(e.weight) for e in self.graph.fwd_edges.get(node_id, {}).values())
            bwd_w = sum(abs(e.weight) for e in self.graph.bwd_edges.get(node_id, {}).values())
            norm = max(fwd_w + bwd_w, 1e-10)
            phi_v = phi.get(node_id, 0.0)
            new_phi[node_id] = (
                phi_v
                + self.diffusion_rate * (lap / norm)
                + self.asym_lambda * (asym / norm)
                - self.phi_damping * phi_v  # global damping
            )
        self.state.phi.update(new_phi)
        # Soft clamp: rescale the whole field if phi threatens to diverge.
        max_phi = max((abs(v) for v in self.state.phi.values()), default=1.0)
        if max_phi > 10.0:
            scale = 10.0 / max_phi
            for k in self.state.phi:
                self.state.phi[k] *= scale

    # -- Activation propagation --

    def propagate_mu(self):
        """
        Activation flows along graph edges: mu spreads from activated
        nodes to their forward neighbours, proportionally to edge
        weight. Edge flows J are updated along the way.
        """
        mu = self.state.mu
        new_mu = dict(mu)  # working copy: accumulates inflow and outflow

        for node_id in list(self.state.active_region):
            mu_v = mu.get(node_id, 0.0)
            if mu_v < 1e-10:
                continue

            neighbors = self.graph.neighbors_fwd(node_id)
            if not neighbors:
                continue

            # Spread part of the activation proportionally to edge weight.
            total_weight = sum(
                self.graph.get_edge_weight(node_id, nb) for nb in neighbors
            )
            if total_weight < 1e-10:
                continue

            spread_ratio = 0.3  # spread 30% of the activation per step
            spread_amount = mu_v * spread_ratio

            for nb in neighbors:
                w = self.graph.get_edge_weight(node_id, nb)
                flow = spread_amount * (w / total_weight)
                new_mu[nb] = new_mu.get(nb, 0.0) + flow
                # Update edge flow J (exponential moving trace).
                self.state.J[(node_id, nb)] = (
                    self.state.J.get((node_id, nb), 0.0) * 0.9 + flow
                )
                # Attention accounting: book the inflow at the neighbour.
                self.state.attention.allocate(nb, flow)
                # Record an experience hit.
                self.state.experience_hits[nb] = (
                    self.state.experience_hits.get(nb, 0) + 1
                )
                self.state.active_region.add(nb)

            # BUG FIX: subtract the outflow from the accumulated entry
            # instead of overwriting it with (mu_v - spread_amount).
            # Overwriting discarded any inflow this node had already
            # received from a node processed earlier in the same step,
            # breaking mass conservation and making the result depend on
            # set-iteration order.
            new_mu[node_id] -= spread_amount
            # Attention accounting: release the activation leaving the source.
            self.state.attention.release(node_id, spread_amount)

        self.state.mu.update(new_mu)
        # Zero out negligible activations and retire them from the active region.
        dead = [n for n, v in self.state.mu.items() if v < 1e-10]
        for n in dead:
            self.state.mu[n] = 0.0
            self.state.active_region.discard(n)

    # -- Action release --

    def action_release(self, node_id: str) -> float:
        """
        u = o * c * phi(eps) -- pure multiplication, no thresholds.

        o:        visibility (local magnitude of |L_G phi|)
        c:        confidence
        phi(eps): residual potential field value
        """
        phi = self.state.phi
        # o = |L_G phi|(v), normalised into [0, 1].
        lap = abs(self.graph.laplacian_at(node_id, phi))
        o = min(lap, 10.0) / 10.0

        # c = Dirichlet confidence.
        c = self.state.get_confidence(node_id)

        # phi(eps): the potential itself, used as a residual measure.
        epsilon = abs(phi.get(node_id, 0.0))

        return o * c * epsilon

    # -- Adaptive decay --

    def adaptive_decay(self):
        """
        alpha(x) = alpha_0 * (1 - c(x))^beta * (1 / kappa(x))

        The anchor core needs no explicit definition -- it floats up on
        its own:  anchor core = { x : alpha(x) < eps }.
        """
        new_anchors = set()
        decay_rates = []

        for node_id in list(self.graph.nodes):
            c = self.state.get_confidence(node_id)
            kappa = self.graph.convergence(node_id)

            # Adaptive decay rate.  BUG FIX: guard kappa == 0 (previously
            # raised ZeroDivisionError) and clamp the base at 0 (a
            # negative base with a fractional exponent yields a complex
            # number in Python 3).  alpha is capped at 1.0 so a single
            # step never over-decays.
            base = max(1.0 - c, 0.0)
            alpha = self.alpha_0 * (base ** self.beta_decay) / max(kappa, 1e-10)
            alpha = min(alpha, 1.0)

            # Decay phi.
            old_phi = self.state.phi.get(node_id, 0.0)
            if abs(old_phi) > 1e-10:
                self.state.phi[node_id] = old_phi * (1.0 - alpha)

            # Decay mu (faster).
            old_mu = self.state.mu.get(node_id, 0.0)
            if old_mu > 1e-10:
                # BUG FIX: never remove more than is present, so the
                # attention released below matches the amount actually
                # drained (the old max(0, ...) clamp let the released
                # amount exceed the real decrement).
                decayed = min(old_mu * alpha * 3.0, old_mu)  # mu decays 3x faster
                self.state.mu[node_id] = old_mu - decayed
                if decayed > 1e-6:
                    self.state.attention.release(node_id, decayed)
                    self.state.decay_events.append({
                        'step': self.state.step_count,
                        'node': node_id,
                        'type': 'mu_decay',
                        'amount': decayed,
                        'alpha': alpha,
                    })

            # Record the decay rate for the dynamic threshold below.
            decay_rates.append((node_id, alpha))

        # Dynamic anchor threshold: the slowest-decaying ~10% (at most 50).
        if decay_rates:
            decay_rates.sort(key=lambda x: x[1])
            cutoff = max(1, min(len(decay_rates) // 10, 50))
            threshold = decay_rates[min(cutoff, len(decay_rates) - 1)][1] * 1.1
            threshold = max(threshold, 1e-6)  # avoid a zero threshold
            new_anchors = {nid for nid, a in decay_rates if a <= threshold}

        self.state.anchor_nodes = new_anchors

    # -- Three-level homing --

    def homing(self):
        """
        dx_A/dt = F_task + lambda1*(ability core - x_A) + lambda2*(anchor core - x_A)

        With no task, F_task = 0: the activation core is first captured
        by the nearest ability core, then settles in the anchor core's
        gravity field.
        """
        if not self.state.active_region:
            return

        # Mean phi of each ability core.
        ability_center_phi = {}
        for core_id, nodes in self.state.ability_cores.items():
            if nodes:
                ability_center_phi[core_id] = (
                    sum(self.state.phi.get(n, 0.0) for n in nodes) / len(nodes)
                )

        # Anchor-core centre.
        anchor_center_phi = 0.0
        if self.state.anchor_nodes:
            anchor_center_phi = (
                sum(self.state.phi.get(n, 0.0) for n in self.state.anchor_nodes)
                / len(self.state.anchor_nodes)
            )

        for node_id in list(self.state.active_region):
            phi_v = self.state.phi.get(node_id, 0.0)

            # Short-range homing -- pull towards the nearest ability core
            # (nearest in phi value, not graph distance).
            pull1 = 0.0
            nearest_core = None
            min_dist = float('inf')
            for core_id, center in ability_center_phi.items():
                dist = abs(center - phi_v)
                if dist < min_dist:
                    min_dist = dist
                    nearest_core = core_id
                    pull1 = self.homing_lambda1 * (center - phi_v)

            # Long-range homing -- pull towards the anchor core.
            pull2 = self.homing_lambda2 * (anchor_center_phi - phi_v)

            # Update phi.
            self.state.phi[node_id] = phi_v + pull1 + pull2

            # Homing also weakly affects mu: a few nodes of the captured
            # core receive a tiny amount of activation.
            if nearest_core and nearest_core in self.state.ability_cores:
                for cn in list(self.state.ability_cores[nearest_core])[:3]:
                    if cn != node_id:
                        self.state.mu[cn] = self.state.mu.get(cn, 0.0) + abs(pull1) * 0.01

            if nearest_core:
                self.state.bound_ability_core = nearest_core

    # -- Experience sedimentation --

    def sediment(self):
        """
        Sedimentation path: memory layer -> experience layer -> skill
        belt -> ability core.

        Promotion is driven by experience_hits, but each transition is
        gated on growth since the node's last recorded sedimentation
        event (tracked in self._last_sed_hits), so saturated counters do
        not keep emitting new traces.
        """
        last_sed_hits = self._last_sed_hits

        for node_id, hits in list(self.state.experience_hits.items()):
            # Hits recorded at this node's previous sedimentation event.
            prev_hits = last_sed_hits.get(node_id, 0)
            recent_growth = hits - prev_hits

            # Memory layer -> experience layer.
            if hits >= self.sediment_threshold:
                if 'experience' not in self.state.experience_regions:
                    self.state.experience_regions['experience'] = set()
                if node_id not in self.state.experience_regions['experience']:
                    self.state.experience_regions['experience'].add(node_id)
                    self.state.sedimentation_trace.append({
                        'step': self.state.step_count,
                        'node': node_id,
                        'transition': 'memory -> experience',
                        'hits': hits,
                    })
                    last_sed_hits[node_id] = hits

            # Experience layer -> skill-belt candidate (needs sustained growth).
            if hits >= self.skill_belt_threshold:
                old_score = self.state.skill_belt_candidates.get(node_id, 0.0)
                new_score = hits / self.merge_threshold
                if new_score > old_score + 0.05:  # require a noticeable increase
                    self.state.skill_belt_candidates[node_id] = new_score
                    if old_score == 0.0:
                        self.state.sedimentation_trace.append({
                            'step': self.state.step_count,
                            'node': node_id,
                            'transition': 'experience -> skill_belt',
                            'hits': hits,
                        })
                    last_sed_hits[node_id] = hits

            # Skill belt -> ability core (needs sustained growth AND
            # recent activation).
            if hits >= self.merge_threshold and recent_growth >= self.sediment_threshold:
                merged = False
                # Attach to the first core that shares an edge (in either
                # direction) with this node.
                for core_id, core_nodes in self.state.ability_cores.items():
                    for cn in list(core_nodes):
                        if (self.graph.get_edge_weight(node_id, cn) > 0 or
                                self.graph.get_edge_weight(cn, node_id) > 0):
                            core_nodes.add(node_id)
                            self.state.merge_events.append({
                                'step': self.state.step_count,
                                'node': node_id,
                                'core': core_id,
                                'transition': 'skill_belt -> ability_core',
                            })
                            merged = True
                            break
                    if merged:
                        break

                if not merged:
                    # No connected core: seed a brand-new ability core.
                    core_id = f'core_{len(self.state.ability_cores)}'
                    self.state.ability_cores[core_id] = {node_id}
                    self.state.merge_events.append({
                        'step': self.state.step_count,
                        'node': node_id,
                        'core': core_id,
                        'transition': 'new_ability_core',
                    })
                last_sed_hits[node_id] = hits

    # -- Edge-flow decay --

    def decay_edges(self):
        """Edge flows J decay naturally; negligible flows are removed."""
        dead_edges = []
        for edge_key, flow in self.state.J.items():
            new_flow = flow * (1.0 - self.alpha_0 * 0.5)
            if abs(new_flow) < 1e-10:
                dead_edges.append(edge_key)
            else:
                self.state.J[edge_key] = new_flow
        for k in dead_edges:
            del self.state.J[k]

    # -- One full step --

    def step(self):
        """
        One complete dynamics step:
          1. diffusion on the graph
          2. activation propagation
          3. adaptive decay
          4. homing
          5. sedimentation detection
          6. edge-flow decay
          7. output-mode update
        """
        self.diffuse_phi()
        self.propagate_mu()
        self.adaptive_decay()
        # Confidence decays naturally -- forgetting is a prerequisite of plasticity.
        self.state.decay_all_confidence(rate=0.002)
        self.homing()
        self.sediment()
        self.decay_edges()
        self.state.update_output_mode()
        self.state.step_count += 1