im_wower
·
2026-04-01
runtime.py
1"""
CIE Runtime — unified interface (SPEC §5).

Six methods: ingest, step, emit, commit_feedback, snapshot_state, reset_session.
This is the engineering-robust runtime implementation for Branch B.
6"""
7
8import random
9import math
10from typing import Optional
11from .graph import Graph
12from .state import CIEState, PendingSignal
13from .dynamics import Dynamics
14
15
class CIERuntime:
    """
    CIE runtime — a graph-native cognitive inference engine.

    Branch B positioning: an engineering-robust runtime whose priority is to
    run, be verifiable and produce results first.

    Public interface (SPEC §5): ``ingest``, ``step``, ``emit``,
    ``commit_feedback``, ``snapshot_state`` and ``reset_session``; ``run`` is
    a convenience wrapper. ``ingest``, ``emit`` and ``commit_feedback`` only
    queue ``PendingSignal`` objects; the queue is applied to the graph by the
    next ``step`` call.
    """

    # Maximum number of context tokens kept per ingest() call.
    _CONTEXT_LIMIT = 10
    # Each context token is linked to at most this many leading main tokens.
    _CONTEXT_FANOUT = 5
    # Number of nodes reported by emit() per output mode (any other mode -> 1).
    _EMIT_TOP_K = {'full': 10, 'degraded': 3}
    # Strength of the re-injection signal queued by emit() per output mode
    # (any other mode -> 0.1).
    _EMIT_FEEDBACK_STRENGTH = {"full": 0.5, "degraded": 0.3, "minimal": 0.1}

    def __init__(self, seed: Optional[int] = None):
        """Create an empty runtime.

        Args:
            seed: Seed for the runtime's private ``random.Random`` instance;
                ``None`` lets ``random`` choose its own seed.
        """
        self.graph = Graph()
        self.state = CIEState()
        self.dynamics = Dynamics(self.graph, self.state)
        self.rng = random.Random(seed)

        # Output buffer: every dict returned by emit(), plus the latest one.
        self._output_buffer: list[dict] = []
        self._last_output: Optional[dict] = None

    # ──────────────────────────────────────
    # Internal pure helpers
    # ──────────────────────────────────────

    @staticmethod
    def _as_tokens(input_data) -> list:
        """Normalize main input into a token list.

        A string is split into characters, a list/tuple is copied element by
        element (elements are NOT stringified), and any other value becomes a
        single stringified token.
        """
        if isinstance(input_data, (str, list, tuple)):
            return list(input_data)
        return [str(input_data)]

    @staticmethod
    def _mu_key(node) -> str:
        """Return the metadata key under which emit() records a node's mu.

        Built with an f-string so that non-string node ids (which ingest()
        accepts inside a list/tuple) do not raise ``TypeError``.
        """
        return f"mu_{node}"

    # ──────────────────────────────────────
    # §5.1 ingest — accept input and queue it for injection
    # ──────────────────────────────────────

    def ingest(self, input_data, context=None, anchors=None):
        """
        Accept new input and queue it as a ``PendingSignal``.

        Nothing is written to the graph here; the signal is consumed by the
        next ``step`` call.

        Args:
            input_data: Main input. A string is split into characters, a
                list/tuple is used as the token sequence, anything else
                becomes one stringified token.
            context: Optional str/list/tuple of context tokens; only the
                first 10 are kept. Other types are ignored.
            anchors: Optional anchor(s). A string is treated as ONE anchor
                token (it is not split); a list/tuple is used as-is. Other
                types are ignored.
        """
        tokens = self._as_tokens(input_data)

        if isinstance(anchors, str) and anchors:
            anchor_tokens = [anchors]
        elif isinstance(anchors, (list, tuple)):
            anchor_tokens = list(anchors)
        else:
            anchor_tokens = []

        if isinstance(context, (str, list, tuple)):
            context_tokens = list(context[:self._CONTEXT_LIMIT])
        else:
            context_tokens = []

        self.state.pending_signals.append(
            PendingSignal(
                source="external",
                tokens=tokens,
                context_tokens=context_tokens,
                anchor_tokens=anchor_tokens,
                strength=1.0,
                polarity=1,
            )
        )

    # ──────────────────────────────────────
    # §5.2 step — advance the dynamics
    # ──────────────────────────────────────

    def step(self, n: int = 1):
        """
        Advance the internal dynamics by ``n`` steps.

        All pending signals are consumed once, right before the first
        dynamics step. With ``n <= 0`` nothing is consumed and nothing runs.
        """
        for i in range(n):
            if i == 0:
                self._consume_signals()
            self.dynamics.step()

    def _consume_signals(self):
        """Apply every queued signal (from ingest / emit / commit_feedback)."""
        # Snapshot and clear first, so anything queued while applying would
        # be left for the following step instead of being processed now.
        queued = list(self.state.pending_signals)
        self.state.pending_signals.clear()

        for signal in queued:
            self._apply_signal(signal)

    def _apply_signal(self, signal):
        """
        Apply one signal to the graph and state.

        Signal paths, applied in this order:
          - tokens: main input — nodes, activation, bigram edges and
            confidence categories 0/1
          - context_tokens: weak asymmetric edges to the main tokens, without
            injecting activation
          - anchor_tokens: confidence category 2 plus a phi boost
          - source == "emit": an extra weak re-injection based on recorded mu
          - polarity < 0: phi is halved and confidence weakened instead of
            activating

        Context provides the background in which the main tokens appeared;
        it gives them a directional reference without activating anything.
        """
        tokens = signal.tokens
        if not tokens:
            # NOTE(review): this early return also skips anchor_tokens when
            # the main token list is empty — confirm that is intended.
            return

        self._inject_tokens(signal, tokens)
        self._link_bigrams(signal, tokens)
        self._link_context(signal, tokens)
        self._inject_anchors(signal)
        if signal.source == "emit":
            self._apply_emit_feedback(signal, tokens)

    def _inject_tokens(self, signal, tokens):
        """Create missing token nodes, then activate or dampen each token."""
        # Half of 100.0 split evenly across the tokens, scaled by strength.
        # Presumably 100.0 mirrors the attention total used in
        # reset_session() — TODO confirm.
        inject_amount = (100.0 / max(len(tokens), 1) * 0.5) * signal.strength

        for token in tokens:
            if not self.graph.has_node(token):
                self.graph.add_node(token, label=token)
                self.state.init_node(token, phi_val=self.rng.gauss(0.0, 0.1))

            if signal.polarity >= 0:
                self.state.activate(token, inject_amount)
            else:
                # Negative polarity: halve phi and weaken confidence.
                self.state.phi[token] = self.state.phi.get(token, 0.0) * 0.5
                self.state.weaken_confidence(token, amount=abs(signal.strength))

    def _link_bigrams(self, signal, tokens):
        """Strengthen the asymmetric edge between each consecutive token pair."""
        # NOTE(review): this runs for every signal, including polarity=-1 and
        # feedback/emit signals whose token order is not an input sequence;
        # those also get stronger edges and higher confidence — confirm.
        for src, dst in zip(tokens, tokens[1:]):
            existing_w = self.graph.get_edge_weight(src, dst)
            skew = abs(self.rng.gauss(0.0, 0.1))
            # Diminishing returns: heavier edges grow more slowly.
            increment = 1.0 / (1.0 + existing_w * 0.1) * signal.strength
            self.graph.add_edge(
                src, dst,
                weight=existing_w + increment + skew,
                bwd_weight=existing_w + increment - skew * 0.5,
                edge_type='bigram'
            )
            self.state.update_confidence(src, 0, amount=0.5 * signal.strength)
            self.state.update_confidence(dst, 1, amount=0.5 * signal.strength)

    def _link_context(self, signal, tokens):
        """Link each context token to the leading main tokens with weak edges."""
        # Only the first few main tokens are linked, to bound edge growth.
        heads = tokens[:self._CONTEXT_FANOUT]

        for ctx in signal.context_tokens:
            if not ctx:
                continue
            if not self.graph.has_node(ctx):
                self.graph.add_node(ctx, label=ctx)
                self.state.init_node(ctx, phi_val=self.rng.gauss(0.0, 0.05))

            for token in heads:
                if token and token != ctx:
                    existing = self.graph.get_edge_weight(ctx, token)
                    self.graph.add_edge(
                        ctx, token,
                        weight=existing + 0.3 * signal.strength,
                        bwd_weight=existing + 0.2 * signal.strength,
                        edge_type='context'
                    )

    def _inject_anchors(self, signal):
        """Create missing anchor nodes and raise their confidence and phi."""
        for anchor in signal.anchor_tokens:
            if not self.graph.has_node(anchor):
                self.graph.add_node(anchor, label=anchor)
                self.state.init_node(anchor, phi_val=1.0)
            self.state.update_confidence(anchor, 2, amount=10.0 * signal.strength)
            self.state.phi[anchor] = self.state.phi.get(anchor, 0.0) + 1.0 * signal.strength

    def _apply_emit_feedback(self, signal, tokens):
        """Add the weak extra activation carried by an emit() re-injection."""
        metadata = signal.metadata or {}
        for token in tokens:
            mu_val = metadata.get(self._mu_key(token), 0.0)
            feedback_amount = mu_val * 0.05
            if feedback_amount > 0.001 and self.graph.has_node(token):
                self.state.activate(token, feedback_amount)
                self.state.phi[token] = self.state.phi.get(token, 0.0) + feedback_amount * 0.01

    # ──────────────────────────────────────
    # §5.3 emit — produce output
    # ──────────────────────────────────────

    def emit(self) -> dict:
        """
        Produce the current output; full or degraded output is allowed.

        Active nodes are ranked by their action-release value and the top
        10 / 3 / 1 are reported for mode 'full' / 'degraded' / anything else.
        A half-full cup must still pour: output is produced without waiting
        for a "full" state.

        Side effects: refreshes the output mode, queues an ``"emit"``
        re-injection signal for the next step, and records the output in the
        output buffer.

        Returns:
            dict with keys ``mode``, ``activated`` (list of per-node dicts),
            ``step``, ``attention_free`` and ``active_count``.
        """
        self.state.update_output_mode()

        # Action release of every active node; negligible values are dropped.
        releases = {}
        for node_id in self.state.active_region:
            u = self.dynamics.action_release(node_id)
            if u > 1e-10:
                releases[node_id] = u

        ranked = sorted(releases.items(), key=lambda kv: -kv[1])

        mode = self.state.output_mode
        top_nodes = ranked[:self._EMIT_TOP_K.get(mode, 1)]

        activated = []
        for nid, u in top_nodes:
            node = self.graph.get_node(nid)
            activated.append({
                'node': nid,
                # Fall back to the id itself when the graph has no such node.
                'label': node.label if node else nid,
                'release': u,
                'phi': self.state.phi.get(nid, 0.0),
                'mu': self.state.mu.get(nid, 0.0),
                'confidence': self.state.get_confidence(nid),
            })

        output = {
            'mode': mode,
            'activated': activated,
            'step': self.state.step_count,
            'attention_free': self.state.attention.free,
            'active_count': len(self.state.active_region),
        }

        # Queue the re-injection signal; it carries each reported node's mu.
        metadata = {"mode": mode}
        for item in activated:
            metadata[self._mu_key(item["node"])] = item["mu"]

        self.state.pending_signals.append(
            PendingSignal(
                source="emit",
                tokens=[item["node"] for item in activated[:5]],
                strength=self._EMIT_FEEDBACK_STRENGTH.get(mode, 0.1),
                polarity=1,
                metadata=metadata,
            )
        )

        self._last_output = output
        self._output_buffer.append(output)
        return output

    # ──────────────────────────────────────
    # §5.4 commit_feedback — write feedback back
    # ──────────────────────────────────────

    def commit_feedback(self, feedback: dict):
        """
        Write feedback back into the system through the signal queue.

        Args:
            feedback: dict with optional keys
                ``'correct'`` — tokens to reinforce (strength 2.0, polarity +1),
                ``'wrong'`` — tokens to weaken (strength 3.0, polarity -1),
                ``'reward'`` — a global number applied to up to 10 nodes of
                the current active region, with the sign as polarity and the
                magnitude as strength.

        A summary is stored in ``state.last_feedback_effect``. The token
        sequences are copied, so later mutation by the caller does not alter
        the queued signals.
        """
        effect = {'reinforced': [], 'weakened': [], 'reward': 0.0}

        # Correct nodes -> positive signal.
        correct = feedback.get('correct', [])
        if correct:
            self.state.pending_signals.append(
                PendingSignal(
                    source="feedback",
                    tokens=list(correct),
                    strength=2.0,
                    polarity=1,
                    metadata={"type": "correct"},
                )
            )
            effect['reinforced'] = list(correct)

        # Wrong nodes -> negative signal.
        wrong = feedback.get('wrong', [])
        if wrong:
            self.state.pending_signals.append(
                PendingSignal(
                    source="feedback",
                    tokens=list(wrong),
                    strength=3.0,
                    polarity=-1,
                    metadata={"type": "wrong"},
                )
            )
            effect['weakened'] = list(wrong)

        # Global reward -> applied to the current active region.
        reward = feedback.get('reward', 0.0)
        effect['reward'] = reward
        if reward != 0 and self.state.active_region:
            # NOTE(review): if active_region is unordered, which 10 nodes are
            # picked (and their order) is arbitrary — confirm acceptable.
            self.state.pending_signals.append(
                PendingSignal(
                    source="feedback",
                    tokens=list(self.state.active_region)[:10],
                    strength=abs(reward),
                    polarity=1 if reward > 0 else -1,
                    metadata={"type": "reward", "reward": reward},
                )
            )

        self.state.last_feedback_effect = effect

    # ──────────────────────────────────────
    # §5.5 snapshot_state — export a summary
    # ──────────────────────────────────────

    def snapshot_state(self) -> dict:
        """Export a comparable runtime summary (SPEC §6).

        Returns:
            ``state.snapshot()`` extended with a ``'graph'`` entry holding
            the graph's node and edge counts.
        """
        state_snap = self.state.snapshot()
        state_snap['graph'] = {
            'node_count': self.graph.node_count,
            'edge_count': self.graph.edge_count,
        }
        return state_snap

    # ──────────────────────────────────────
    # §5.6 reset_session — clear session state
    # ──────────────────────────────────────

    def reset_session(self):
        """
        Clear the current session state without touching long-term structure.

        Cleared: mu (activation), active_region, attention, edge flows ``J``,
        the output buffer; the output mode is set back to 'minimal'.
        Left untouched: phi, confidence, and (per the original design notes)
        anchor_nodes, ability_cores, experience_hits, experience_regions and
        skill_belt_candidates.
        """
        # NOTE(review): state.pending_signals is not cleared here, so signals
        # queued before the reset are still applied by the next step()
        # — confirm that is intended.

        # Clear activation.
        for node_id in list(self.state.active_region):
            self.state.deactivate(node_id)
        self.state.mu = {k: 0.0 for k in self.state.mu}
        self.state.active_region.clear()
        # Rebuild attention as a fresh instance of its own type.
        self.state.attention = type(self.state.attention)(total=100.0)

        # Clear edge flows.
        self.state.J.clear()

        # Clear the output buffer.
        self._output_buffer.clear()
        self._last_output = None

        self.state.output_mode = 'minimal'

    # ──────────────────────────────────────
    # Convenience methods
    # ──────────────────────────────────────

    def run(self, input_data, steps: int = 5, context=None, anchors=None) -> dict:
        """Convenience wrapper: ingest, then step ``steps`` times, then emit."""
        self.ingest(input_data, context=context, anchors=anchors)
        self.step(n=steps)
        return self.emit()