CIE-Unified

git clone 

CIE-Unified / cie
im_wower  ·  2026-03-31

state.py

  1"""
  2CIE State — 三元组状态 (φ, μ, J) + 注意力池
  3
  4φ(v) = 节点势场(慢变)— 知识/地形
  5μ(v) = 激活分布(快变)— 注意力/激活核位置  
  6J(u,v) = 边流(中速)— 技能/流动偏置
  7
  8注意力池总量 100 点守恒。
  9"""
 10
 11from collections import defaultdict
 12import math
 13import random
 14from typing import Optional
 15
 16
 17class AttentionPool:
 18    """
 19    注意力池——总量守恒,半杯水原则。
 20    
 21    总量 100 点。某区域要喷涌,必须从其他区域借调。
 22    最优工作区间 40-60%
 23    """
 24
 25    def __init__(self, total: float = 100.0):
 26        self.total = total
 27        self.allocated: dict[str, float] = {}  # region_id -> allocated amount
 28
 29    @property
 30    def used(self) -> float:
 31        return sum(self.allocated.values())
 32
 33    @property
 34    def free(self) -> float:
 35        return max(0.0, self.total - self.used)
 36
 37    @property
 38    def utilization(self) -> float:
 39        return self.used / self.total if self.total > 0 else 0.0
 40
 41    def allocate(self, region_id: str, amount: float) -> float:
 42        """分配注意力,返回实际分配量(可能因容量不足而减少)"""
 43        actual = min(amount, self.free)
 44        if actual > 0:
 45            self.allocated[region_id] = self.allocated.get(region_id, 0.0) + actual
 46        return actual
 47
 48    def release(self, region_id: str, amount: float = None) -> float:
 49        """释放注意力,返回实际释放量"""
 50        if region_id not in self.allocated:
 51            return 0.0
 52        current = self.allocated[region_id]
 53        release_amt = min(amount, current) if amount is not None else current
 54        self.allocated[region_id] = current - release_amt
 55        if self.allocated[region_id] <= 1e-10:
 56            del self.allocated[region_id]
 57        return release_amt
 58
 59    def to_dict(self) -> dict:
 60        return {
 61            'total': self.total,
 62            'used': self.used,
 63            'free': self.free,
 64            'utilization': self.utilization,
 65            'allocated': dict(self.allocated)
 66        }
 67
 68
 69class CIEState:
 70    """
 71    CIE 运行时状态——(φ, μ, J) 三元组 + 注意力池。
 72    
 73    三核(锚点核、能力核、激活核)存在于同一张图上,
 74    通过 φ、μ、J 的不同更新速率自然分化。
 75    """
 76
 77    def __init__(self):
 78        # ── 核心三元组 ──
 79        self.phi: dict[str, float] = defaultdict(float)   # 节点势场(慢变)
 80        self.mu: dict[str, float] = defaultdict(float)     # 激活分布(快变)
 81        self.J: dict[tuple[str, str], float] = defaultdict(float)  # 边流(中速)
 82
 83        # ── 注意力池 ──
 84        self.attention = AttentionPool(total=100.0)
 85
 86        # ── 置信度(Dirichlet) ──
 87        # c[node_id] = list of K floats (Dirichlet alpha parameters)
 88        self.confidence: dict[str, list[float]] = {}
 89        self.default_K = 3  # 默认分量数
 90
 91        # ── 沉积追踪 ──
 92        self.sedimentation_trace: list[dict] = []  # 最近N步沉积记录
 93        self.merge_events: list[dict] = []          # 经验→能力核合并事件
 94        self.decay_events: list[dict] = []          # 衰减/遗忘事件
 95
 96        # ── 三核标记 ──
 97        # 锚点核:α(x) < ε 的节点集(自动浮出,不手动指定)
 98        # 能力核:长期稳定的高 κ 区域
 99        # 激活核:当前 μ 值最高的活跃区域
100        self.anchor_nodes: set[str] = set()
101        self.ability_cores: dict[str, set[str]] = {}  # core_id -> node set
102        self.active_region: set[str] = set()
103        self.bound_ability_core: Optional[str] = None  # 当前归属的能力核
104
105        # ── 经验层与技能带 ──
106        self.experience_hits: dict[str, int] = defaultdict(int)  # 节点被激活次数
107        self.experience_regions: dict[str, set[str]] = {}        # region_id -> nodes
108        self.skill_belt_candidates: dict[str, float] = {}        # node -> stability score
109
110        # ── 输出模式 ──
111        self.output_mode: str = 'full'  # full / degraded / minimal
112
113        # ── 步计数 ──
114        self.step_count: int = 0
115
116        # ── 反馈效果 ──
117        self.last_feedback_effect: dict = {}
118
119    # ── 置信度操作 ──
120
121    def get_confidence(self, node_id: str) -> float:
122        """
123        返回节点的置信度 c ∈ [0, 1]。
124        用 Dirichlet 的集中度衡量:max(alpha) / sum(alpha)。
125        分类已在 ingest 中区分(cat0=左, cat1=右, cat2=锚点)。
126        """
127        if node_id not in self.confidence:
128            return 0.0
129        alphas = self.confidence[node_id]
130        total = sum(alphas)
131        if total <= 0:
132            return 0.0
133        return max(alphas) / total
134
135    def init_confidence(self, node_id: str, K: int = None):
136        """初始化节点的 Dirichlet 先验——均匀的弱先验"""
137        k = K or self.default_K
138        self.confidence[node_id] = [1.0] * k  # 均匀先验
139
140    def update_confidence(self, node_id: str, category: int, amount: float = 1.0):
141        """更新 Dirichlet:观察到 category,增加对应 alpha"""
142        if node_id not in self.confidence:
143            self.init_confidence(node_id)
144        alphas = self.confidence[node_id]
145        if 0 <= category < len(alphas):
146            alphas[category] += amount
147
148    def weaken_confidence(self, node_id: str, amount: float = 1.0):
149        """
150        削弱置信度——把 alpha 朝均匀先验回退。
151        负反馈不是简单减少某个 alpha,而是让分布回归不确定。
152        """
153        if node_id not in self.confidence:
154            return
155        alphas = self.confidence[node_id]
156        k = len(alphas)
157        if k == 0:
158            return
159        mean_alpha = sum(alphas) / k
160        for i in range(k):
161            # 朝均值回退
162            alphas[i] = alphas[i] + amount * (mean_alpha - alphas[i]) * 0.3
163            # 不低于 1.0(先验下限)
164            alphas[i] = max(1.0, alphas[i])
165
166    def decay_all_confidence(self, rate: float = 0.001):
167        """
168        全局置信度自然衰减——所有 alpha 缓慢回退向先验。
169        遗忘不是 bug,是地形可塑性的必要条件。
170        """
171        for node_id, alphas in self.confidence.items():
172            k = len(alphas)
173            if k == 0:
174                continue
175            for i in range(k):
176                # 缓慢回退向 1.0(先验)
177                alphas[i] = alphas[i] * (1.0 - rate) + 1.0 * rate
178
179    # ── 势场操作 ──
180
181    def init_node(self, node_id: str, phi_val: float = 0.0):
182        """初始化节点势场和置信度"""
183        self.phi[node_id] = phi_val
184        self.mu[node_id] = 0.0
185        if node_id not in self.confidence:
186            self.init_confidence(node_id)
187
188    # ── 激活操作 ──
189
190    def activate(self, node_id: str, amount: float):
191        """注入激活到节点,消耗注意力池"""
192        actual = self.attention.allocate(node_id, amount)
193        self.mu[node_id] = self.mu.get(node_id, 0.0) + actual
194        self.active_region.add(node_id)
195        self.experience_hits[node_id] = self.experience_hits.get(node_id, 0) + 1
196        return actual
197
198    def deactivate(self, node_id: str):
199        """去激活节点,释放注意力"""
200        mu_val = self.mu.get(node_id, 0.0)
201        if mu_val > 0:
202            self.attention.release(node_id, mu_val)
203            self.mu[node_id] = 0.0
204        self.active_region.discard(node_id)
205
206    # ── 输出模式判断 ──
207
208    def update_output_mode(self):
209        """根据注意力利用率决定输出模式"""
210        util = self.attention.utilization
211        active_count = len(self.active_region)
212        if util > 0.3 and active_count >= 3:
213            self.output_mode = 'full'
214        elif util > 0.1 or active_count >= 1:
215            self.output_mode = 'degraded'
216        else:
217            self.output_mode = 'minimal'
218
219    # ── snapshot ──
220
221    def snapshot(self) -> dict:
222        """
223        导出可比较的运行时摘要(SPEC §6)。
224        """
225        # 计算 drift_score: 激活核偏离能力核/锚点核的程度
226        drift = 0.0
227        if self.active_region and self.anchor_nodes:
228            # 简化:激活区域中不在锚点核附近的比例
229            overlap = self.active_region & self.anchor_nodes
230            drift = 1.0 - (len(overlap) / max(len(self.active_region), 1))
231
232        # anchor_pull: 锚点核对当前状态的回拉强度
233        anchor_pull = 0.0
234        if self.anchor_nodes:
235            anchor_phi = sum(self.phi.get(n, 0.0) for n in self.anchor_nodes)
236            active_phi = sum(self.phi.get(n, 0.0) for n in self.active_region) if self.active_region else 0.0
237            anchor_pull = abs(anchor_phi - active_phi) / max(len(self.anchor_nodes), 1)
238
239        return {
240            'step_count': self.step_count,
241            'phi_summary': {
242                'count': len(self.phi),
243                'mean': sum(self.phi.values()) / max(len(self.phi), 1),
244                'max': max(self.phi.values()) if self.phi else 0.0,
245                'min': min(self.phi.values()) if self.phi else 0.0,
246                'top5': sorted(self.phi.items(), key=lambda x: -x[1])[:5],
247            },
248            'mu_summary': {
249                'count': sum(1 for v in self.mu.values() if v > 0),
250                'total': sum(self.mu.values()),
251                'top5': sorted(self.mu.items(), key=lambda x: -x[1])[:5],
252            },
253            'J_summary': {
254                'count': sum(1 for v in self.J.values() if abs(v) > 1e-10),
255                'total_flow': sum(abs(v) for v in self.J.values()),
256            },
257            'active_region': list(self.active_region)[:20],
258            'bound_ability_core': self.bound_ability_core,
259            'anchor_pull': anchor_pull,
260            'drift_score': drift,
261            'free_capacity': self.attention.free,
262            'experience_regions': {k: list(v)[:10] for k, v in self.experience_regions.items()},
263            'skill_belt_candidates': dict(sorted(
264                self.skill_belt_candidates.items(), key=lambda x: -x[1])[:10]),
265            'sedimentation_trace': self.sedimentation_trace[-20:],
266            'merge_events': self.merge_events[-10:],
267            'decay_events': self.decay_events[-10:],
268            'output_mode': self.output_mode,
269            'feedback_effect': self.last_feedback_effect,
270            'attention': self.attention.to_dict(),
271        }