im_wower
·
2026-03-31
dynamics.py
1"""
2CIE Dynamics — 动力学引擎
3
4图上扩散、衰减、归巢、沉积——所有高层行为从底层流动规则自然导出。
5代码里只写流动规则,不写标签。
6"""
7
8import math
9import random
10from .graph import Graph
11from .state import CIEState
12
13
14class Dynamics:
15 """
16 动力学引擎——驱动 (φ, μ, J) 的演化。
17
18 参数设计遵循"少参数、强解释性"原则。
19 """
20
21 def __init__(self, graph: Graph, state: CIEState):
22 self.graph = graph
23 self.state = state
24
25 # ── 系统级参数(约束层) ──
26 self.alpha_0 = 0.01 # 基础衰减率
27 self.beta_decay = 1.5 # 衰减指数(置信度越高衰减越慢)
28 self.diffusion_rate = 0.1 # 图上扩散速率 μ
29 self.asym_lambda = 0.05 # 非对称项系数 λ_dir
30 self.homing_lambda1 = 0.1 # 短程归巢力(→能力核)
31 self.homing_lambda2 = 0.02 # 长程归巢力(→锚点核)
32 self.anchor_epsilon = 0.005 # 锚点核阈值
33 self.sediment_threshold = 10 # 经验沉积阈值(激活次数)
34 self.skill_belt_threshold = 30 # 技能带固化阈值
35 self.merge_threshold = 40 # 能力核合并阈值
36 self.phi_damping = 0.02 # φ 全局阻尼——半杯水,不发散
37
38 # ── 图上扩散 ──
39
40 def diffuse_phi(self):
41 """
42 φ_new(v) = φ(v) + μ · (L_G φ)(v) + λ_dir · (W_fwd·φ - W_bwd·φ)(v) - damping·φ(v)
43
44 L_G 是图拉普拉斯。没有维度,没有向量。
45 非对称项是旋度来源。
46 阻尼项防止 φ 无界增长——半杯水原则。
47 Laplacian 按度归一化,防止高权重边放大信号。
48 """
49 phi = self.state.phi
50 new_phi = {}
51 for node_id in self.graph.nodes:
52 lap = self.graph.laplacian_at(node_id, phi)
53 asym = self.graph.asymmetry_at(node_id, phi)
54 # 按节点度归一化,防止高权重累积放大
55 degree = len(self.graph.neighbors_all(node_id))
56 norm = max(degree, 1)
57 phi_v = phi.get(node_id, 0.0)
58 new_phi[node_id] = (
59 phi_v
60 + self.diffusion_rate * (lap / norm)
61 + self.asym_lambda * (asym / norm)
62 - self.phi_damping * phi_v # 全局阻尼
63 )
64 self.state.phi.update(new_phi)
65 # Soft clamp: prevent phi divergence
66 max_phi = max((abs(v) for v in self.state.phi.values()), default=1.0)
67 if max_phi > 10.0:
68 scale = 10.0 / max_phi
69 for k in self.state.phi:
70 self.state.phi[k] *= scale
71
72 # ── 激活传播 ──
73
74 def propagate_mu(self):
75 """
76 激活沿图上的边传播:μ 从高激活节点流向邻居。
77 同时更新边流 J。
78 """
79 mu = self.state.mu
80 new_mu = dict(mu) # copy
81
82 for node_id in list(self.state.active_region):
83 mu_v = mu.get(node_id, 0.0)
84 if mu_v < 1e-10:
85 continue
86
87 neighbors = self.graph.neighbors_fwd(node_id)
88 if not neighbors:
89 continue
90
91 # 按边权重比例传播一部分激活
92 total_weight = sum(
93 self.graph.get_edge_weight(node_id, nb) for nb in neighbors
94 )
95 if total_weight < 1e-10:
96 continue
97
98 spread_ratio = 0.3 # 每步传播 30% 的激活
99 spread_amount = mu_v * spread_ratio
100
101 for nb in neighbors:
102 w = self.graph.get_edge_weight(node_id, nb)
103 flow = spread_amount * (w / total_weight)
104 new_mu[nb] = new_mu.get(nb, 0.0) + flow
105 # 更新边流 J
106 self.state.J[(node_id, nb)] = (
107 self.state.J.get((node_id, nb), 0.0) * 0.9 + flow
108 )
109 # 记录经验命中
110 self.state.experience_hits[nb] = (
111 self.state.experience_hits.get(nb, 0) + 1
112 )
113 self.state.active_region.add(nb)
114
115 new_mu[node_id] = mu_v - spread_amount
116
117 self.state.mu.update(new_mu)
118 # 清理极低激活
119 dead = [n for n, v in self.state.mu.items() if v < 1e-10]
120 for n in dead:
121 self.state.mu[n] = 0.0
122 self.state.active_region.discard(n)
123
124 # ── 行动释放 ──
125
126 def action_release(self, node_id: str) -> float:
127 """
128 u = o · c · φ(ε)
129 纯乘法,无阈值。
130 o: 能见度(|L_G φ| 的局部值)
131 c: 置信度
132 φ(ε): 残差势场
133 """
134 phi = self.state.phi
135 # o = |L_G φ|(v) 归一化
136 lap = abs(self.graph.laplacian_at(node_id, phi))
137 o = min(lap, 10.0) / 10.0 # 归一化到 [0,1]
138
139 # c = Dirichlet 置信度
140 c = self.state.get_confidence(node_id)
141
142 # φ(ε) = 势场值本身作为残差量度
143 epsilon = abs(phi.get(node_id, 0.0))
144
145 return o * c * epsilon
146
147 # ── 自适应衰减 ──
148
149 def adaptive_decay(self):
150 """
151 α(x) = α₀ · (1 - c(x))^β · (1 / κ(x))
152
153 锚点核不需要显式定义,它自己浮出来:
154 锚点核 = { x : α(x) < ε }
155 """
156 new_anchors = set()
157 decay_rates = []
158
159 for node_id in list(self.graph.nodes):
160 c = self.state.get_confidence(node_id)
161 kappa = self.graph.convergence(node_id)
162
163 # 自适应衰减率
164 alpha = self.alpha_0 * ((1.0 - c) ** self.beta_decay) * (1.0 / kappa)
165
166 # 衰减 φ
167 old_phi = self.state.phi.get(node_id, 0.0)
168 if abs(old_phi) > 1e-10:
169 self.state.phi[node_id] = old_phi * (1.0 - alpha)
170
171 # 衰减 μ(更快)
172 old_mu = self.state.mu.get(node_id, 0.0)
173 if old_mu > 1e-10:
174 decayed = old_mu * alpha * 3.0 # μ 衰减更快
175 self.state.mu[node_id] = max(0.0, old_mu - decayed)
176 if decayed > 1e-6:
177 self.state.attention.release(node_id, decayed)
178 self.state.decay_events.append({
179 'step': self.state.step_count,
180 'node': node_id,
181 'type': 'mu_decay',
182 'amount': decayed,
183 'alpha': alpha,
184 })
185
186 # 记录衰减率用于动态阈值
187 decay_rates.append((node_id, alpha))
188
189 # 动态锚点阈值:取衰减率最低的 10%(最多50个)
190 if decay_rates:
191 decay_rates.sort(key=lambda x: x[1])
192 cutoff = max(1, min(len(decay_rates) // 10, 50))
193 threshold = decay_rates[min(cutoff, len(decay_rates)-1)][1] * 1.1
194 threshold = max(threshold, 1e-6) # 防止零阈值
195 new_anchors = {nid for nid, a in decay_rates if a <= threshold}
196
197 self.state.anchor_nodes = new_anchors
198
199 # ── 三级归巢 ──
200
201 def homing(self):
202 """
203 dx_A/dt = F_task + λ₁·(能力核 - x_A) + λ₂·(锚点核 - x_A)
204
205 无任务时 F_task = 0,激活核先被最近的能力核捕获,
206 最终在锚点核引力场里稳定。
207 """
208 if not self.state.active_region:
209 return
210
211 # 找最近的能力核
212 ability_center_phi = {}
213 for core_id, nodes in self.state.ability_cores.items():
214 if nodes:
215 ability_center_phi[core_id] = (
216 sum(self.state.phi.get(n, 0.0) for n in nodes) / len(nodes)
217 )
218
219 # 锚点核中心
220 anchor_center_phi = 0.0
221 if self.state.anchor_nodes:
222 anchor_center_phi = (
223 sum(self.state.phi.get(n, 0.0) for n in self.state.anchor_nodes)
224 / len(self.state.anchor_nodes)
225 )
226
227 for node_id in list(self.state.active_region):
228 phi_v = self.state.phi.get(node_id, 0.0)
229
230 # 短程归巢——拉向最近的能力核
231 pull1 = 0.0
232 nearest_core = None
233 min_dist = float('inf')
234 for core_id, center in ability_center_phi.items():
235 dist = abs(center - phi_v)
236 if dist < min_dist:
237 min_dist = dist
238 nearest_core = core_id
239 pull1 = self.homing_lambda1 * (center - phi_v)
240
241 # 长程归巢——拉向锚点核
242 pull2 = self.homing_lambda2 * (anchor_center_phi - phi_v)
243
244 # 更新 phi
245 self.state.phi[node_id] = phi_v + pull1 + pull2
246
247 # 归巢也微弱影响 mu:向能力核方向的节点获得微量激活
248 if nearest_core and nearest_core in self.state.ability_cores:
249 for cn in list(self.state.ability_cores[nearest_core])[:3]:
250 if cn != node_id:
251 self.state.mu[cn] = self.state.mu.get(cn, 0.0) + abs(pull1) * 0.01
252
253 if nearest_core:
254 self.state.bound_ability_core = nearest_core
255
256 # ── 经验沉积 ──
257
258 def sediment(self):
259 """
260 沉积路径:记忆层 → 经验层 → 技能带 → 能力核
261
262 使用 experience_hits 检测,但引入"最近沉积步数"避免饱和:
263 只有当 hits 在最近 window 步内增长了才记录新的 trace。
264 """
265 window = 50 # 滑动窗口:最近50步内的新增才算
266 last_sed_hits = getattr(self, "_last_sed_hits", {})
267
268 for node_id, hits in list(self.state.experience_hits.items()):
269 # 检查该节点上次沉积时的 hits
270 prev_hits = last_sed_hits.get(node_id, 0)
271 recent_growth = hits - prev_hits
272
273 # 记忆层 → 经验层
274 if hits >= self.sediment_threshold:
275 if 'experience' not in self.state.experience_regions:
276 self.state.experience_regions['experience'] = set()
277 if node_id not in self.state.experience_regions['experience']:
278 self.state.experience_regions['experience'].add(node_id)
279 self.state.sedimentation_trace.append({
280 'step': self.state.step_count,
281 'node': node_id,
282 'transition': 'memory -> experience',
283 'hits': hits,
284 })
285 last_sed_hits[node_id] = hits
286
287 # 经验层 → 技能带候选(需要持续增长)
288 if hits >= self.skill_belt_threshold:
289 old_score = self.state.skill_belt_candidates.get(node_id, 0.0)
290 new_score = hits / self.merge_threshold
291 if new_score > old_score + 0.05: # 需要显著增长
292 self.state.skill_belt_candidates[node_id] = new_score
293 if old_score == 0.0:
294 self.state.sedimentation_trace.append({
295 'step': self.state.step_count,
296 'node': node_id,
297 'transition': 'experience -> skill_belt',
298 'hits': hits,
299 })
300 last_sed_hits[node_id] = hits
301
302 # 技能带 → 能力核(需要持续增长,且最近有新激活)
303 if hits >= self.merge_threshold and recent_growth >= self.sediment_threshold:
304 merged = False
305 for core_id, core_nodes in self.state.ability_cores.items():
306 for cn in list(core_nodes):
307 if (self.graph.get_edge_weight(node_id, cn) > 0 or
308 self.graph.get_edge_weight(cn, node_id) > 0):
309 core_nodes.add(node_id)
310 self.state.merge_events.append({
311 'step': self.state.step_count,
312 'node': node_id,
313 'core': core_id,
314 'transition': 'skill_belt -> ability_core',
315 })
316 merged = True
317 break
318 if merged:
319 break
320
321 if not merged:
322 core_id = f'core_{len(self.state.ability_cores)}'
323 self.state.ability_cores[core_id] = {node_id}
324 self.state.merge_events.append({
325 'step': self.state.step_count,
326 'node': node_id,
327 'core': core_id,
328 'transition': 'new_ability_core',
329 })
330 last_sed_hits[node_id] = hits
331
332 if not hasattr(self, '_last_sed_hits'):
333 self._last_sed_hits = {}
334 self._last_sed_hits.update(last_sed_hits)
335
336 # ── 边流衰减 ──
337
338 def decay_edges(self):
339 """边流 J 自然衰减"""
340 dead_edges = []
341 for edge_key, flow in self.state.J.items():
342 new_flow = flow * (1.0 - self.alpha_0 * 0.5)
343 if abs(new_flow) < 1e-10:
344 dead_edges.append(edge_key)
345 else:
346 self.state.J[edge_key] = new_flow
347 for k in dead_edges:
348 del self.state.J[k]
349
350 # ── 完整的一步 ──
351
352 def step(self):
353 """
354 一个完整的动力学步骤:
355 1. 图上扩散
356 2. 激活传播
357 3. 自适应衰减
358 4. 归巢
359 5. 沉积检测
360 6. 边流衰减
361 7. 更新输出模式
362 """
363 self.diffuse_phi()
364 self.propagate_mu()
365 self.adaptive_decay()
366 # 置信度自然衰减——遗忘是可塑性的必要条件
367 self.state.decay_all_confidence(rate=0.002)
368 self.homing()
369 self.sediment()
370 self.decay_edges()
371 self.state.update_output_mode()
372 self.state.step_count += 1