CIE-Unified

git clone 

CIE-Unified / cie
im_wower  ·  2026-04-01

runtime.py

  1"""
  2CIE Runtime — 统一接口(SPEC §5)
  3
  4六个方法:ingest, step, emit, commit_feedback, snapshot_state, reset_session
  5这是 Branch B 的工程稳健 runtime 实现。
  6"""
  7
  8import random
  9import math
 10from typing import Optional
 11from .graph import Graph
 12from .state import CIEState, PendingSignal
 13from .dynamics import Dynamics
 14
 15
 16class CIERuntime:
 17    """
 18    CIE 运行时——图原生认知推理引擎。
 19    
 20    Branch B 定位:工程稳健增强 runtime。
 21    先能跑、先能验证、先能出结果。
 22    """
 23
 24    def __init__(self, seed: int = None):
 25        self.graph = Graph()
 26        self.state = CIEState()
 27        self.dynamics = Dynamics(self.graph, self.state)
 28        self.rng = random.Random(seed)
 29
 30        # ── 输出缓冲 ──
 31        self._output_buffer: list[dict] = []
 32        self._last_output: Optional[dict] = None
 33
 34
 35    # ──────────────────────────────────────
 36    # §5.1 ingest — 接收输入,注入图中
 37    # ──────────────────────────────────────
 38
 39    def ingest(self, input_data, context=None, anchors=None):
 40        """
 41        接收新的输入。创建 PendingSignal 排队,在下一步 step 时消费。
 42        PendingSignal 是唯一的状态写入入口。
 43        """
 44        if isinstance(input_data, str):
 45            tokens = list(input_data)
 46        elif isinstance(input_data, (list, tuple)):
 47            tokens = list(input_data)
 48        else:
 49            tokens = [str(input_data)]
 50
 51        anchor_tokens = []
 52        if anchors:
 53            if isinstance(anchors, str):
 54                anchor_tokens = [anchors]
 55            elif isinstance(anchors, (list, tuple)):
 56                anchor_tokens = list(anchors)
 57
 58        context_tokens = []
 59        if context:
 60            if isinstance(context, str):
 61                context_tokens = list(context)[:10]
 62            elif isinstance(context, (list, tuple)):
 63                context_tokens = list(context)[:10]
 64
 65        signal = PendingSignal(
 66            source="external",
 67            tokens=tokens,
 68            context_tokens=context_tokens,
 69            anchor_tokens=anchor_tokens,
 70            strength=1.0,
 71            polarity=1,
 72        )
 73        self.state.pending_signals.append(signal)
 74
 75    def step(self, n: int = 1):
 76        """
 77        推进 n 步内部动力学演化。
 78        第一步先消费所有 pending_signals,然后跑动力学。
 79        """
 80        for i in range(n):
 81            # 第一步消费所有信号
 82            if i == 0:
 83                self._consume_signals()
 84            self.dynamics.step()
 85
 86    def _consume_signals(self):
 87        """消费所有排队的信号——来自 ingest/emit/feedback"""
 88        signals = list(self.state.pending_signals)
 89        self.state.pending_signals.clear()
 90
 91        for signal in signals:
 92            self._apply_signal(signal)
 93
 94    def _apply_signal(self, signal):
 95        """
 96        应用一个信号到图上。
 97        
 98        信号路径:
 99          - tokens: 主输入,注入节点+激活+bigram 边+Dirichlet cat0/cat1
100          - context_tokens: 上下文,与主 tokens 建立弱关联边(不注入激活)
101          - anchor_tokens: 锚点,高置信度 cat2 + 高势场
102          - source="emit": 回灌信号,弱激活
103          - polarity=-1: 负向信号,衰减+削弱置信度
104        
105        context 语义:context 提供"这段输入是在什么背景下出现的"
106        和主 token 建立弱非对称边但不直接注入激活。
107        这比完全忽略 context 更忠于 README 的"并行归位"概念——
108        context 字符落在图的不同层级,为主 token 提供方向参照。
109        """
110        tokens = signal.tokens
111        if not tokens:
112            return
113
114        # ── 注入节点和激活 ──
115        for token in tokens:
116            if not self.graph.has_node(token):
117                self.graph.add_node(token, label=token)
118                self.state.init_node(token, phi_val=self.rng.gauss(0.0, 0.1))
119
120            inject_amount = (100.0 / max(len(tokens), 1) * 0.5) * signal.strength
121            if signal.polarity >= 0:
122                self.state.activate(token, inject_amount)
123            else:
124                # 负极性:衰减
125                self.state.phi[token] = self.state.phi.get(token, 0.0) * 0.5
126                self.state.weaken_confidence(token, amount=abs(signal.strength))
127
128        # ── 建立 bigram 边(非对称) ──
129        for i in range(len(tokens) - 1):
130            src, dst = tokens[i], tokens[i + 1]
131            existing_w = self.graph.get_edge_weight(src, dst)
132            asym = self.rng.gauss(0.0, 0.1)
133            increment = 1.0 / (1.0 + existing_w * 0.1) * signal.strength
134            self.graph.add_edge(
135                src, dst,
136                weight=existing_w + increment + abs(asym),
137                bwd_weight=existing_w + increment - abs(asym) * 0.5,
138                edge_type='bigram'
139            )
140            self.state.update_confidence(src, 0, amount=0.5 * signal.strength)
141            self.state.update_confidence(dst, 1, amount=0.5 * signal.strength)
142
143        # ── context 消费:context tokens 与主 tokens 建立弱关联 ──
144        for ctx in signal.context_tokens:
145            if not ctx:
146                continue
147            if not self.graph.has_node(ctx):
148                self.graph.add_node(ctx, label=ctx)
149                self.state.init_node(ctx, phi_val=self.rng.gauss(0.0, 0.05))
150            # context 与每个主 token 建立弱边
151            for token in tokens[:5]:  # 只取前 5 个避免爆炸
152                if token and token != ctx:
153                    existing = self.graph.get_edge_weight(ctx, token)
154                    self.graph.add_edge(
155                        ctx, token,
156                        weight=existing + 0.3 * signal.strength,
157                        bwd_weight=existing + 0.2 * signal.strength,
158                        edge_type='context'
159                    )
160
161        # ── 锚点注入 ──
162        for anchor in signal.anchor_tokens:
163            if not self.graph.has_node(anchor):
164                self.graph.add_node(anchor, label=anchor)
165                self.state.init_node(anchor, phi_val=1.0)
166            self.state.update_confidence(anchor, 2, amount=10.0 * signal.strength)
167            self.state.phi[anchor] = self.state.phi.get(anchor, 0.0) + 1.0 * signal.strength
168
169        # ── 回灌信号额外处理 ──
170        if signal.source == "emit":
171            # 回灌的激活量较弱
172            for token in tokens:
173                mu_val = signal.metadata.get("mu_" + token, 0.0)
174                feedback_amount = mu_val * 0.05
175                if feedback_amount > 0.001 and self.graph.has_node(token):
176                    self.state.activate(token, feedback_amount)
177                    self.state.phi[token] = self.state.phi.get(token, 0.0) + feedback_amount * 0.01
178
179    # ──────────────────────────────────────
180    # §5.3 emit — 生成输出
181    # ──────────────────────────────────────
182
183    def emit(self) -> dict:
184        """
185        生成当前输出。允许完整输出或降级输出。
186        
187        输出基于当前激活区域的行动释放值排序。
188        半杯水也要能流,不等满了再倒。
189        """
190        self.state.update_output_mode()
191
192        # 计算每个活跃节点的行动释放
193        releases = {}
194        for node_id in self.state.active_region:
195            u = self.dynamics.action_release(node_id)
196            if u > 1e-10:
197                releases[node_id] = u
198
199        # 按释放值排序
200        sorted_nodes = sorted(releases.items(), key=lambda x: -x[1])
201
202        # 根据输出模式决定输出多少
203        mode = self.state.output_mode
204        if mode == 'full':
205            top_nodes = sorted_nodes[:10]
206        elif mode == 'degraded':
207            top_nodes = sorted_nodes[:3]
208        else:  # minimal
209            top_nodes = sorted_nodes[:1]
210
211        output = {
212            'mode': mode,
213            'activated': [
214                {
215                    'node': nid,
216                    'label': self.graph.get_node(nid).label if self.graph.get_node(nid) else nid,
217                    'release': u,
218                    'phi': self.state.phi.get(nid, 0.0),
219                    'mu': self.state.mu.get(nid, 0.0),
220                    'confidence': self.state.get_confidence(nid),
221                }
222                for nid, u in top_nodes
223            ],
224            'step': self.state.step_count,
225            'attention_free': self.state.attention.free,
226            'active_count': len(self.state.active_region),
227        }
228
229        # 创建回灌信号排队
230        metadata = {"mode": output["mode"]}
231        for item in output.get("activated", []):
232            metadata["mu_" + item["node"]] = item.get("mu", 0.0)
233
234        feedback_signal = PendingSignal(
235            source="emit",
236            tokens=[item["node"] for item in output.get("activated", [])[:5]],
237            strength={"full": 0.5, "degraded": 0.3, "minimal": 0.1}.get(output["mode"], 0.1),
238            polarity=1,
239            metadata=metadata,
240        )
241        self.state.pending_signals.append(feedback_signal)
242
243        self._last_output = output
244        self._output_buffer.append(output)
245        return output
246
247    # ──────────────────────────────────────
248    # §5.4 commit_feedback — 反馈写回
249    # ──────────────────────────────────────
250
251    def commit_feedback(self, feedback: dict):
252        """
253        把反馈写回系统——通过信号队列。
254        """
255        effect = {'reinforced': [], 'weakened': [], 'reward': 0.0}
256
257        # 正确的节点——创建正向信号
258        correct = feedback.get('correct', [])
259        if correct:
260            signal = PendingSignal(
261                source="feedback",
262                tokens=correct,
263                strength=2.0,
264                polarity=1,
265                metadata={"type": "correct"},
266            )
267            self.state.pending_signals.append(signal)
268            effect['reinforced'] = list(correct)
269
270        # 错误的节点——创建负向信号
271        wrong = feedback.get('wrong', [])
272        if wrong:
273            signal = PendingSignal(
274                source="feedback",
275                tokens=wrong,
276                strength=3.0,
277                polarity=-1,
278                metadata={"type": "wrong"},
279            )
280            self.state.pending_signals.append(signal)
281            effect['weakened'] = list(wrong)
282
283        # 全局奖励——对当前活跃区域
284        reward = feedback.get('reward', 0.0)
285        effect['reward'] = reward
286        if reward != 0 and self.state.active_region:
287            signal = PendingSignal(
288                source="feedback",
289                tokens=list(self.state.active_region)[:10],
290                strength=abs(reward),
291                polarity=1 if reward > 0 else -1,
292                metadata={"type": "reward", "reward": reward},
293            )
294            self.state.pending_signals.append(signal)
295
296        self.state.last_feedback_effect = effect
297
298    # ──────────────────────────────────────
299    # §5.5 snapshot_state — 导出摘要
300    # ──────────────────────────────────────
301
302    def snapshot_state(self) -> dict:
303        """导出可比较的运行时摘要(SPEC §6)。"""
304        state_snap = self.state.snapshot()
305        state_snap['graph'] = {
306            'node_count': self.graph.node_count,
307            'edge_count': self.graph.edge_count,
308        }
309        return state_snap
310
311    # ──────────────────────────────────────
312    # §5.6 reset_session — 清理会话态
313    # ──────────────────────────────────────
314
315    def reset_session(self):
316        """
317        清理当前会话态,但不破坏长期结构层数据。
318        保留:φ(地形)、ability_cores、anchor_nodes
319        清理:μ(激活)、active_region、attention、output_buffer
320        """
321        # 清理激活
322        for node_id in list(self.state.active_region):
323            self.state.deactivate(node_id)
324        self.state.mu = {k: 0.0 for k in self.state.mu}
325        self.state.active_region.clear()
326        self.state.attention = type(self.state.attention)(total=100.0)
327
328        # 清理边流
329        self.state.J.clear()
330
331        # 清理输出缓冲
332        self._output_buffer.clear()
333        self._last_output = None
334
335        # 保留:phi, confidence, anchor_nodes, ability_cores,
336        #        experience_hits, experience_regions, skill_belt_candidates
337
338        self.state.output_mode = 'minimal'
339
340
341    # ──────────────────────────────────────
342    # 便利方法
343    # ──────────────────────────────────────
344
345    def run(self, input_data, steps: int = 5, context=None, anchors=None) -> dict:
346        """便利方法:ingest + step + emit 一条龙"""
347        self.ingest(input_data, context=context, anchors=anchors)
348        self.step(n=steps)
349        return self.emit()