CIE-Unified

git clone 

CIE-Unified / cie
im_wower  ·  2026-03-31

dynamics.py

"""
CIE Dynamics — the dynamics engine.

Diffusion on the graph, decay, homing, sedimentation — all high-level
behaviour is derived naturally from low-level flow rules.  The code only
encodes flow rules, never explicit labels.
"""

import math
import random
from .graph import Graph
from .state import CIEState
 13
class Dynamics:
    """
    Dynamics engine — drives the evolution of (φ, μ, J).

    Parameter design follows the principle of "few parameters, strong
    interpretability".
    """

    def __init__(self, graph: Graph, state: CIEState):
        self.graph = graph
        self.state = state

        # ── System-level parameters (constraint layer) ──
        self.alpha_0 = 0.01       # base decay rate
        self.beta_decay = 1.5     # decay exponent (higher confidence -> slower decay)
        self.diffusion_rate = 0.1 # diffusion rate μ on the graph
        self.asym_lambda = 0.05   # asymmetric-term coefficient λ_dir
        self.homing_lambda1 = 0.1 # short-range homing force (→ ability cores)
        self.homing_lambda2 = 0.02  # long-range homing force (→ anchor core)
        self.anchor_epsilon = 0.005  # anchor-core threshold
        self.sediment_threshold = 10  # experience sedimentation threshold (activation hits)
        self.skill_belt_threshold = 30  # skill-belt consolidation threshold
        self.merge_threshold = 40   # ability-core merge threshold
        self.phi_damping = 0.02     # global damping on φ — "half a glass of water", no divergence
 37
 38    # ── 图上扩散 ──
 39
 40    def diffuse_phi(self):
 41        """
 42        φ_new(v) = φ(v) + μ · (L_G φ)(v) + λ_dir · (W_fwd·φ - W_bwd·φ)(v) - damping·φ(v)
 43        
 44        L_G 是图拉普拉斯。没有维度,没有向量。
 45        非对称项是旋度来源。
 46        阻尼项防止 φ 无界增长——半杯水原则。
 47        Laplacian 按度归一化,防止高权重边放大信号。
 48        """
 49        phi = self.state.phi
 50        new_phi = {}
 51        for node_id in self.graph.nodes:
 52            lap = self.graph.laplacian_at(node_id, phi)
 53            asym = self.graph.asymmetry_at(node_id, phi)
 54            # 按节点度归一化,防止高权重累积放大
 55            degree = len(self.graph.neighbors_all(node_id))
 56            norm = max(degree, 1)
 57            phi_v = phi.get(node_id, 0.0)
 58            new_phi[node_id] = (
 59                phi_v
 60                + self.diffusion_rate * (lap / norm)
 61                + self.asym_lambda * (asym / norm)
 62                - self.phi_damping * phi_v  # 全局阻尼
 63            )
 64        self.state.phi.update(new_phi)
 65        # Soft clamp: prevent phi divergence
 66        max_phi = max((abs(v) for v in self.state.phi.values()), default=1.0)
 67        if max_phi > 10.0:
 68            scale = 10.0 / max_phi
 69            for k in self.state.phi:
 70                self.state.phi[k] *= scale
 71
 72    # ── 激活传播 ──
 73
 74    def propagate_mu(self):
 75        """
 76        激活沿图上的边传播:μ 从高激活节点流向邻居。
 77        同时更新边流 J。
 78        """
 79        mu = self.state.mu
 80        new_mu = dict(mu)  # copy
 81        
 82        for node_id in list(self.state.active_region):
 83            mu_v = mu.get(node_id, 0.0)
 84            if mu_v < 1e-10:
 85                continue
 86            
 87            neighbors = self.graph.neighbors_fwd(node_id)
 88            if not neighbors:
 89                continue
 90            
 91            # 按边权重比例传播一部分激活
 92            total_weight = sum(
 93                self.graph.get_edge_weight(node_id, nb) for nb in neighbors
 94            )
 95            if total_weight < 1e-10:
 96                continue
 97
 98            spread_ratio = 0.3  # 每步传播 30% 的激活
 99            spread_amount = mu_v * spread_ratio
100
101            for nb in neighbors:
102                w = self.graph.get_edge_weight(node_id, nb)
103                flow = spread_amount * (w / total_weight)
104                new_mu[nb] = new_mu.get(nb, 0.0) + flow
105                # 更新边流 J
106                self.state.J[(node_id, nb)] = (
107                    self.state.J.get((node_id, nb), 0.0) * 0.9 + flow
108                )
109                # 记录经验命中
110                self.state.experience_hits[nb] = (
111                    self.state.experience_hits.get(nb, 0) + 1
112                )
113                self.state.active_region.add(nb)
114
115            new_mu[node_id] = mu_v - spread_amount
116
117        self.state.mu.update(new_mu)
118        # 清理极低激活
119        dead = [n for n, v in self.state.mu.items() if v < 1e-10]
120        for n in dead:
121            self.state.mu[n] = 0.0
122            self.state.active_region.discard(n)
123
124    # ── 行动释放 ──
125
126    def action_release(self, node_id: str) -> float:
127        """
128        u = o · c · φ(ε)
129        纯乘法,无阈值。
130        o: 能见度(|L_G φ| 的局部值)
131        c: 置信度
132        φ(ε): 残差势场
133        """
134        phi = self.state.phi
135        # o = |L_G φ|(v) 归一化
136        lap = abs(self.graph.laplacian_at(node_id, phi))
137        o = min(lap, 10.0) / 10.0  # 归一化到 [0,1]
138
139        # c = Dirichlet 置信度
140        c = self.state.get_confidence(node_id)
141
142        # φ(ε) = 势场值本身作为残差量度
143        epsilon = abs(phi.get(node_id, 0.0))
144
145        return o * c * epsilon
146
147    # ── 自适应衰减 ──
148
149    def adaptive_decay(self):
150        """
151        α(x) = α₀ · (1 - c(x))^β · (1 / κ(x))
152        
153        锚点核不需要显式定义,它自己浮出来:
154        锚点核 = { x : α(x) < ε }
155        """
156        new_anchors = set()
157        decay_rates = []
158        
159        for node_id in list(self.graph.nodes):
160            c = self.state.get_confidence(node_id)
161            kappa = self.graph.convergence(node_id)
162            
163            # 自适应衰减率
164            alpha = self.alpha_0 * ((1.0 - c) ** self.beta_decay) * (1.0 / kappa)
165            
166            # 衰减 φ
167            old_phi = self.state.phi.get(node_id, 0.0)
168            if abs(old_phi) > 1e-10:
169                self.state.phi[node_id] = old_phi * (1.0 - alpha)
170
171            # 衰减 μ(更快)
172            old_mu = self.state.mu.get(node_id, 0.0)
173            if old_mu > 1e-10:
174                decayed = old_mu * alpha * 3.0  # μ 衰减更快
175                self.state.mu[node_id] = max(0.0, old_mu - decayed)
176                if decayed > 1e-6:
177                    self.state.attention.release(node_id, decayed)
178                    self.state.decay_events.append({
179                        'step': self.state.step_count,
180                        'node': node_id,
181                        'type': 'mu_decay',
182                        'amount': decayed,
183                        'alpha': alpha,
184                    })
185
186            # 记录衰减率用于动态阈值
187            decay_rates.append((node_id, alpha))
188
189        # 动态锚点阈值:取衰减率最低的 10%(最多50个)
190        if decay_rates:
191            decay_rates.sort(key=lambda x: x[1])
192            cutoff = max(1, min(len(decay_rates) // 10, 50))
193            threshold = decay_rates[min(cutoff, len(decay_rates)-1)][1] * 1.1
194            threshold = max(threshold, 1e-6)  # 防止零阈值
195            new_anchors = {nid for nid, a in decay_rates if a <= threshold}
196
197        self.state.anchor_nodes = new_anchors
198
199    # ── 三级归巢 ──
200
201    def homing(self):
202        """
203        dx_A/dt = F_task + λ₁·(能力核 - x_A) + λ₂·(锚点核 - x_A)
204        
205        无任务时 F_task = 0,激活核先被最近的能力核捕获,
206        最终在锚点核引力场里稳定。
207        """
208        if not self.state.active_region:
209            return
210
211        # 找最近的能力核
212        ability_center_phi = {}
213        for core_id, nodes in self.state.ability_cores.items():
214            if nodes:
215                ability_center_phi[core_id] = (
216                    sum(self.state.phi.get(n, 0.0) for n in nodes) / len(nodes)
217                )
218
219        # 锚点核中心
220        anchor_center_phi = 0.0
221        if self.state.anchor_nodes:
222            anchor_center_phi = (
223                sum(self.state.phi.get(n, 0.0) for n in self.state.anchor_nodes)
224                / len(self.state.anchor_nodes)
225            )
226
227        for node_id in list(self.state.active_region):
228            phi_v = self.state.phi.get(node_id, 0.0)
229            
230            # 短程归巢——拉向最近的能力核
231            pull1 = 0.0
232            nearest_core = None
233            min_dist = float('inf')
234            for core_id, center in ability_center_phi.items():
235                dist = abs(center - phi_v)
236                if dist < min_dist:
237                    min_dist = dist
238                    nearest_core = core_id
239                    pull1 = self.homing_lambda1 * (center - phi_v)
240
241            # 长程归巢——拉向锚点核
242            pull2 = self.homing_lambda2 * (anchor_center_phi - phi_v)
243
244            # 更新 phi
245            self.state.phi[node_id] = phi_v + pull1 + pull2
246
247            # 归巢也微弱影响 mu:向能力核方向的节点获得微量激活
248            if nearest_core and nearest_core in self.state.ability_cores:
249                for cn in list(self.state.ability_cores[nearest_core])[:3]:
250                    if cn != node_id:
251                        self.state.mu[cn] = self.state.mu.get(cn, 0.0) + abs(pull1) * 0.01
252
253            if nearest_core:
254                self.state.bound_ability_core = nearest_core
255
256    # ── 经验沉积 ──
257
258    def sediment(self):
259        """
260        沉积路径:记忆层 → 经验层 → 技能带 → 能力核
261        
262        使用 experience_hits 检测,但引入"最近沉积步数"避免饱和:
263        只有当 hits 在最近 window 步内增长了才记录新的 trace。
264        """
265        window = 50  # 滑动窗口:最近50步内的新增才算
266        last_sed_hits = getattr(self, "_last_sed_hits", {})
267        
268        for node_id, hits in list(self.state.experience_hits.items()):
269            # 检查该节点上次沉积时的 hits
270            prev_hits = last_sed_hits.get(node_id, 0)
271            recent_growth = hits - prev_hits
272            
273            # 记忆层 → 经验层
274            if hits >= self.sediment_threshold:
275                if 'experience' not in self.state.experience_regions:
276                    self.state.experience_regions['experience'] = set()
277                if node_id not in self.state.experience_regions['experience']:
278                    self.state.experience_regions['experience'].add(node_id)
279                    self.state.sedimentation_trace.append({
280                        'step': self.state.step_count,
281                        'node': node_id,
282                        'transition': 'memory -> experience',
283                        'hits': hits,
284                    })
285                    last_sed_hits[node_id] = hits
286
287            # 经验层 → 技能带候选(需要持续增长)
288            if hits >= self.skill_belt_threshold:
289                old_score = self.state.skill_belt_candidates.get(node_id, 0.0)
290                new_score = hits / self.merge_threshold
291                if new_score > old_score + 0.05:  # 需要显著增长
292                    self.state.skill_belt_candidates[node_id] = new_score
293                    if old_score == 0.0:
294                        self.state.sedimentation_trace.append({
295                            'step': self.state.step_count,
296                            'node': node_id,
297                            'transition': 'experience -> skill_belt',
298                            'hits': hits,
299                        })
300                    last_sed_hits[node_id] = hits
301
302            # 技能带 → 能力核(需要持续增长,且最近有新激活)
303            if hits >= self.merge_threshold and recent_growth >= self.sediment_threshold:
304                merged = False
305                for core_id, core_nodes in self.state.ability_cores.items():
306                    for cn in list(core_nodes):
307                        if (self.graph.get_edge_weight(node_id, cn) > 0 or
308                            self.graph.get_edge_weight(cn, node_id) > 0):
309                            core_nodes.add(node_id)
310                            self.state.merge_events.append({
311                                'step': self.state.step_count,
312                                'node': node_id,
313                                'core': core_id,
314                                'transition': 'skill_belt -> ability_core',
315                            })
316                            merged = True
317                            break
318                    if merged:
319                        break
320
321                if not merged:
322                    core_id = f'core_{len(self.state.ability_cores)}'
323                    self.state.ability_cores[core_id] = {node_id}
324                    self.state.merge_events.append({
325                        'step': self.state.step_count,
326                        'node': node_id,
327                        'core': core_id,
328                        'transition': 'new_ability_core',
329                    })
330                last_sed_hits[node_id] = hits
331        
332        if not hasattr(self, '_last_sed_hits'):
333            self._last_sed_hits = {}
334        self._last_sed_hits.update(last_sed_hits)
335
336    # ── 边流衰减 ──
337
338    def decay_edges(self):
339        """边流 J 自然衰减"""
340        dead_edges = []
341        for edge_key, flow in self.state.J.items():
342            new_flow = flow * (1.0 - self.alpha_0 * 0.5)
343            if abs(new_flow) < 1e-10:
344                dead_edges.append(edge_key)
345            else:
346                self.state.J[edge_key] = new_flow
347        for k in dead_edges:
348            del self.state.J[k]
349
    # ── One full step ──

    def step(self):
        """
        One complete dynamics step:
        1. diffusion on the graph
        2. activation propagation
        3. adaptive decay
        4. homing
        5. sedimentation detection
        6. edge-flow decay
        7. output-mode update
        """
        self.diffuse_phi()
        self.propagate_mu()
        self.adaptive_decay()
        # Confidence decays naturally — forgetting is a precondition of plasticity.
        self.state.decay_all_confidence(rate=0.002)
        self.homing()
        self.sediment()
        self.decay_edges()
        self.state.update_output_mode()
        self.state.step_count += 1