# runtime.py — 2026-03-31
1from __future__ import annotations
2
3import re
4from typing import Any, Dict, Iterable, List
5
6from .state import PendingSignal, RuntimeState, SedimentationProfile
7
# Keys every snapshot_state() payload must contain; snapshot_state raises
# RuntimeError if any is missing.
REQUIRED_SNAPSHOT_KEYS = {
    "phi_summary",
    "mu_summary",
    "J_summary",
    "active_region",
    "bound_ability_core",
    "anchor_pull",
    "drift_score",
    "free_capacity",
    "experience_regions",
    "skill_belt_candidates",
    "sedimentation_trace",
    "merge_events",
    "decay_events",
    "output_mode",
    "feedback_effect",
}

# Sedimentation strata from most volatile to most consolidated; stage
# transitions move one position at a time along this tuple.
STAGE_ORDER = ("memory", "experience", "skill_belt", "ability_core")
27
28
29def _round(value: float) -> float:
30 return round(value, 4)
31
32
class CIERuntime:
    """A small graph-native runtime centered on (phi, mu, J).

    phi is per-node potential, mu is per-node activation, and J is
    per-directed-edge flow. Signals are queued by ingest/emit/feedback,
    applied on step(), and consolidated through the STAGE_ORDER strata.
    """

    def __init__(
        self,
        *,
        activation_retention: float = 0.54,  # fraction of mu a node keeps each step
        activation_spread: float = 0.3,  # fraction of mu spread to neighbors each step
        potential_decay: float = 0.97,  # base per-step phi decay factor
        flow_decay: float = 0.94,  # base per-step J decay factor
        capacity_limit: float = 4.5,  # total mu at which free_capacity reaches 0
    ) -> None:
        self.state = RuntimeState()
        self.activation_retention = activation_retention
        self.activation_spread = activation_spread
        self.potential_decay = potential_decay
        self.flow_decay = flow_decay
        self.capacity_limit = capacity_limit
51
52 def ingest(self, input: Any, context: Any = None, anchors: Any = None) -> Dict[str, Any]:
53 tokens = self._tokenize(input)
54 context_tokens = self._tokenize(context)
55 anchor_tokens = self._tokenize(anchors)
56 signal = PendingSignal(
57 source="external",
58 tokens=tokens,
59 context_tokens=context_tokens,
60 anchor_tokens=anchor_tokens,
61 strength=1.0,
62 )
63 self.state.pending_signals.append(signal)
64 return {
65 "queued_tokens": list(tokens),
66 "queued_context": list(context_tokens),
67 "queued_anchors": list(anchor_tokens),
68 }
69
70 def step(self, n: int = 1) -> Dict[str, Any]:
71 steps = max(1, int(n))
72 for _ in range(steps):
73 self._advance_once()
74 return self.snapshot_state()
75
    def emit(self) -> str:
        """Render the current activation as output text and queue self-feedback."""
        # The three most active nodes drive the emitted text.
        active_nodes = self._top_nodes(self.state.mu, limit=3)
        mode = self._choose_output_mode()
        self.state.output_mode = mode
        if not active_nodes:
            output = "minimal: idle"
            feedback_tokens = ["idle"]
        elif mode == "full":
            feedback_tokens = active_nodes[:]
            output = "full: " + " -> ".join(feedback_tokens)
        elif mode == "degraded":
            feedback_tokens = self._degraded_feedback_tokens(active_nodes)
            output = "degraded: " + " / ".join(feedback_tokens)
        else:
            fallback = self.state.bound_ability_core or active_nodes[0]
            feedback_tokens = [fallback]
            output = "minimal: " + fallback

        # The emission is re-queued as a weaker self-signal so the next step
        # reinforces what was just emitted; strength scales with the mode.
        feedback_signal = PendingSignal(
            source="emit",
            tokens=feedback_tokens,
            context_tokens=active_nodes[:2],
            anchor_tokens=self._top_anchor_nodes(limit=1),
            strength={"full": 0.56, "degraded": 0.38, "minimal": 0.22}[mode],
            metadata={
                "mode": mode,
                "emitted_nodes": list(active_nodes),
                "queued_step": self.state.step_index,
            },
        )
        self.state.pending_signals.append(feedback_signal)
        self.state.last_output = output
        self.state.feedback_effect = {
            "source": "emit",
            "mode": mode,
            "queued_tokens": list(feedback_tokens),
            "queued_strength": _round(feedback_signal.strength),
            "confidence_proxy": _round(self.state.confidence_proxy),
            "queued_step": self.state.step_index,
            # Carry forward when feedback last landed so observers can diff.
            "last_applied_step": self.state.feedback_effect.get("last_applied_step"),
        }
        return output
118
119 def commit_feedback(self, feedback: Any) -> Dict[str, Any]:
120 payload = self._normalize_feedback(feedback)
121 signal = PendingSignal(
122 source="feedback",
123 tokens=payload["tokens"],
124 context_tokens=payload["context_tokens"],
125 strength=payload["strength"],
126 polarity=payload["polarity"],
127 metadata={"mode": "feedback", "queued_step": self.state.step_index},
128 )
129 self.state.pending_signals.append(signal)
130 self.state.feedback_effect = {
131 "source": "commit_feedback",
132 "mode": "feedback",
133 "queued_tokens": list(signal.tokens),
134 "queued_strength": _round(signal.strength),
135 "polarity": signal.polarity,
136 "queued_step": self.state.step_index,
137 "last_applied_step": self.state.feedback_effect.get("last_applied_step"),
138 }
139 return dict(self.state.feedback_effect)
140
    def snapshot_state(self) -> Dict[str, Any]:
        """Assemble the full observability snapshot.

        Raises RuntimeError if any REQUIRED_SNAPSHOT_KEYS entry is absent,
        guarding against the schema drifting out of sync with consumers.
        """
        snapshot = {
            "phi_summary": self._phi_summary(),
            "mu_summary": self._mu_summary(),
            "J_summary": self._j_summary(),
            "active_region": list(self.state.active_region),
            "bound_ability_core": self.state.bound_ability_core,
            "anchor_pull": _round(self.state.anchor_pull),
            "drift_score": _round(self.state.drift_score),
            "free_capacity": _round(self.state.free_capacity),
            "experience_regions": self._experience_regions(),
            "skill_belt_candidates": self._skill_belt_candidates(),
            "sedimentation_trace": list(self.state.sedimentation_trace),
            "merge_events": list(self.state.merge_events),
            "decay_events": list(self.state.decay_events),
            "output_mode": self.state.output_mode,
            "feedback_effect": dict(self.state.feedback_effect),
        }
        missing = REQUIRED_SNAPSHOT_KEYS.difference(snapshot)
        if missing:
            raise RuntimeError(f"Missing snapshot keys: {sorted(missing)}")
        return snapshot
163
    def reset_session(self) -> None:
        """Clear per-session volatile state.

        Only activation (mu), queued signals, and session-facing telemetry
        are reset. phi, J, the graph, anchor weights, and sedimentation
        profiles are left untouched — presumably so learned structure
        survives across sessions (TODO confirm intent).
        """
        self.state.mu.clear()
        self.state.pending_signals.clear()
        self.state.active_region.clear()
        self.state.output_mode = "minimal"
        self.state.last_output = ""
        self.state.bound_ability_core = None
        self.state.drift_score = 0.0
        self.state.anchor_pull = 0.0
        self.state.free_capacity = 1.0
        self.state.confidence_proxy = 0.0
        self.state.feedback_effect = {
            "source": "reset_session",
            "mode": "minimal",
            "queued_tokens": [],
            "queued_strength": 0.0,
            "last_applied_step": self.state.step_index,
        }
182
    def _advance_once(self) -> None:
        """Run one tick: drain signals, then propagate, home, decay,
        sediment, and refresh observability — in that fixed order."""
        self.state.step_index += 1
        # Drain the queue up front so signals applied below (emit/feedback
        # re-append to pending_signals) cannot re-enter this same tick.
        pending = list(self.state.pending_signals)
        self.state.pending_signals.clear()
        for signal in pending:
            self._apply_signal(signal)
        self._propagate_activation()
        self._apply_homing()
        self._apply_decay()
        self._refresh_sedimentation()
        self._refresh_observability()
194
    def _apply_signal(self, signal: PendingSignal) -> None:
        """Fold one queued signal into the graph, phi, mu, and J."""
        # Anchors first, then context, then tokens — this order defines the
        # path connected and flowed below.
        combined = self._ordered_unique(signal.anchor_tokens + signal.context_tokens + signal.tokens)
        for node in combined:
            self.state.graph.ensure_node(node)
            self.state.phi.setdefault(node, 0.0)
            self.state.mu.setdefault(node, 0.0)
            self.state.strata.setdefault(node, "memory")
            self._ensure_profile(node)
            self.state.touch_count[node] = self.state.touch_count.get(node, 0) + 1
            self.state.node_last_touched[node] = self.state.step_index
        for anchor in signal.anchor_tokens:
            # Anchors accumulate weight plus a small potential boost.
            self.state.anchor_nodes[anchor] = self.state.anchor_nodes.get(anchor, 0.0) + (0.7 * signal.strength)
            self.state.phi[anchor] = self.state.phi.get(anchor, 0.0) + (0.12 * signal.strength)
        self.state.graph.connect_path(combined, weight=max(0.5, signal.strength))
        # Directed flow along consecutive pairs of the combined path.
        for left, right in zip(combined, combined[1:]):
            key = (left, right)
            self.state.J[key] = self.state.J.get(key, 0.0) + (0.24 * signal.strength)
            self.state.edge_last_touched[key] = self.state.step_index
        for token in signal.tokens:
            activation_gain = 0.78 * signal.strength
            potential_gain = 0.22 * signal.strength
            if signal.polarity >= 0:
                self.state.mu[token] = self.state.mu.get(token, 0.0) + activation_gain
                self.state.phi[token] = self.state.phi.get(token, 0.0) + potential_gain
            else:
                # Negative polarity suppresses (never below zero) and logs it.
                self.state.mu[token] = max(0.0, self.state.mu.get(token, 0.0) - (0.6 * activation_gain))
                self.state.phi[token] = max(0.0, self.state.phi.get(token, 0.0) - (0.4 * potential_gain))
                self._record_decay("feedback_suppression", token, activation_gain * 0.6, age=0)
        if signal.source in {"emit", "feedback"}:
            self._apply_feedback_signal(signal)
225
    def _propagate_activation(self) -> None:
        """Spread mu along weighted edges, reinforcing J and phi en route."""
        next_mu: Dict[str, float] = {}
        incoming: Dict[str, float] = {}
        # Snapshot mu so J/phi mutations below cannot affect this pass.
        current_mu = dict(self.state.mu)
        for node, activation in current_mu.items():
            if activation <= 0.0:
                continue
            retained = activation * self.activation_retention
            next_mu[node] = next_mu.get(node, 0.0) + retained
            neighbors = self.state.graph.neighbors(node)
            if neighbors:
                scored_neighbors = []
                for neighbor, weight in neighbors.items():
                    forward = self.state.J.get((node, neighbor), 0.0)
                    reverse = self.state.J.get((neighbor, node), 0.0)
                    phi_bias = max(self.state.phi.get(neighbor, 0.0), 0.0)
                    anchor_bias = 0.2 * self.state.anchor_nodes.get(neighbor, 0.0)
                    # Edge weight plus net forward flow, potential and anchor
                    # bias; floored at 0.05 so every neighbor gets a share.
                    score = weight + max(0.0, forward - (0.35 * reverse)) + (0.22 * phi_bias) + anchor_bias
                    scored_neighbors.append((neighbor, max(0.05, score)))
                total_weight = sum(score for _, score in scored_neighbors) or 1.0
                spread_budget = activation * self.activation_spread
                for neighbor, score in scored_neighbors:
                    spread = spread_budget * (score / total_weight)
                    next_mu[neighbor] = next_mu.get(neighbor, 0.0) + spread
                    incoming[neighbor] = incoming.get(neighbor, 0.0) + spread
                    # Used edges are reinforced and re-dated.
                    self.state.J[(node, neighbor)] = self.state.J.get((node, neighbor), 0.0) + spread * 0.18
                    self.state.edge_last_touched[(node, neighbor)] = self.state.step_index
            # Retained activation leaves a small potential trace on the node.
            self.state.phi[node] = self.state.phi.get(node, 0.0) + retained * 0.04
        for node, gained in incoming.items():
            self.state.node_last_touched[node] = self.state.step_index
            self._ensure_profile(node)
            # Only meaningful inflow counts as a touch.
            if gained > 0.12:
                self.state.touch_count[node] = self.state.touch_count.get(node, 0) + 1
            recurrence = min(self.state.touch_count.get(node, 0), 4)
            phi_gain = gained * (0.08 + (0.02 * recurrence))
            if node in self.state.anchor_nodes:
                phi_gain += 0.03
            self.state.phi[node] = self.state.phi.get(node, 0.0) + phi_gain
        # Drop tiny residues; only meaningful activation survives the step.
        self.state.mu = {node: value for node, value in next_mu.items() if value >= 0.02}
265
    def _apply_homing(self) -> None:
        """Pull a fraction of activation toward the bound core and top anchor."""
        core = self._select_bound_core()
        self.state.bound_ability_core = core
        if not core or not self.state.mu:
            self.state.anchor_pull = 0.0
            return
        anchors = self._top_anchor_nodes(limit=1)
        moved_to_anchor = 0.0
        moved_to_core = 0.0
        core_neighbors = set(self.state.graph.neighbors(core))
        # Iterate a snapshot: mu[core]/mu[anchor] are increased mid-loop.
        for node, activation in list(self.state.mu.items()):
            if node == core or activation <= 0.0:
                continue
            near_core = node in core_neighbors
            # Distant nodes are pulled harder; young strata and the presence
            # of an anchor add to the pull, capped at 22% per step.
            shift_rate = 0.04 if near_core else 0.08
            if self.state.strata.get(node, "memory") in {"memory", "experience"}:
                shift_rate += 0.03
            if anchors:
                shift_rate += 0.03
            shift = activation * min(0.22, shift_rate)
            if shift <= 0.0:
                continue
            self.state.mu[node] = max(0.0, activation - shift)
            # With an anchor present, 35% of the shift goes to it.
            to_anchor = shift * 0.35 if anchors else 0.0
            to_core = shift - to_anchor
            self.state.mu[core] = self.state.mu.get(core, 0.0) + to_core
            self.state.J[(node, core)] = self.state.J.get((node, core), 0.0) + (to_core * 0.18)
            self.state.edge_last_touched[(node, core)] = self.state.step_index
            self.state.node_last_touched[core] = self.state.step_index
            moved_to_core += to_core
            if anchors and to_anchor > 0.0:
                anchor = anchors[0]
                self.state.mu[anchor] = self.state.mu.get(anchor, 0.0) + to_anchor
                self.state.phi[anchor] = self.state.phi.get(anchor, 0.0) + (to_anchor * 0.08)
                self.state.J[(node, anchor)] = self.state.J.get((node, anchor), 0.0) + (to_anchor * 0.14)
                self.state.edge_last_touched[(node, anchor)] = self.state.step_index
                self.state.node_last_touched[anchor] = self.state.step_index
                moved_to_anchor += to_anchor
        self.state.phi[core] = self.state.phi.get(core, 0.0) + (moved_to_core * 0.05)
        self.state.anchor_pull = moved_to_anchor
306
    def _apply_decay(self) -> None:
        """Age-weighted multiplicative decay of phi, mu, and J, with pruning."""
        for node, value in list(self.state.phi.items()):
            # Untouched nodes decay faster the longer they idle; anchors are
            # protected slightly. Factor is clamped to [0.8, 0.995].
            age = self.state.step_index - self.state.node_last_touched.get(node, self.state.step_index)
            factor = self.potential_decay - min(0.015 * age, 0.1)
            if node in self.state.anchor_nodes:
                factor += 0.03
            factor = max(0.8, min(0.995, factor))
            decayed = value * factor
            if decayed < value - 0.01:
                self._record_decay("phi_decay", node, value - decayed, age=age)
            # Near-zero, stale potential is dropped entirely.
            if abs(decayed) < 0.015 and age >= 2:
                self.state.phi.pop(node, None)
                self._record_decay("phi_prune", node, decayed, age=age)
                continue
            self.state.phi[node] = decayed
        for node, value in list(self.state.mu.items()):
            age = self.state.step_index - self.state.node_last_touched.get(node, self.state.step_index)
            factor = 0.88 - min(0.04 * age, 0.24)
            # The bound core and anchors hold activation a little longer.
            if node == self.state.bound_ability_core:
                factor += 0.05
            if node in self.state.anchor_nodes:
                factor += 0.03
            factor = max(0.52, min(0.96, factor))
            decayed = value * factor
            if decayed < value - 0.01:
                self._record_decay("mu_decay", node, value - decayed, age=age)
            if decayed < 0.05:
                self.state.mu.pop(node, None)
                self._record_decay("mu_prune", node, decayed, age=age)
                continue
            self.state.mu[node] = decayed
        for edge, value in list(self.state.J.items()):
            age = self.state.step_index - self.state.edge_last_touched.get(edge, self.state.step_index)
            factor = self.flow_decay - min(0.03 * age, 0.18)
            # Edges incident to the bound core decay a little slower.
            if self.state.bound_ability_core in edge:
                factor += 0.03
            factor = max(0.58, min(0.98, factor))
            decayed = value * factor
            if decayed < value - 0.01:
                self._record_decay("J_decay", f"{edge[0]}->{edge[1]}", value - decayed, age=age)
            if decayed < 0.03 and age >= 2:
                self.state.J.pop(edge, None)
                self._record_decay("J_prune", f"{edge[0]}->{edge[1]}", decayed, age=age)
                continue
            self.state.J[edge] = decayed
352
    def _refresh_observability(self) -> None:
        """Recompute the derived, reportable fields after a tick."""
        self.state.active_region = self._top_nodes(self.state.mu, limit=4)
        self.state.bound_ability_core = self._select_bound_core()
        self.state.drift_score = self._compute_drift_score()
        self.state.anchor_pull = self._compute_anchor_pull()
        total_activation = sum(self.state.mu.values())
        # Free capacity shrinks linearly until total mu hits capacity_limit.
        self.state.free_capacity = max(0.0, 1.0 - min(total_activation / self.capacity_limit, 1.0))
        self.state.confidence_proxy = self._confidence_proxy()
        self.state.output_mode = self._choose_output_mode()
        # If feedback landed this very step, annotate it with the resulting
        # stages and core so callers can see its effect.
        if self.state.feedback_effect.get("last_applied_step") == self.state.step_index:
            applied = self.state.feedback_effect.get("applied_tokens", [])
            self.state.feedback_effect["stage_after"] = {
                node: self._ensure_profile(node).stage for node in applied
            }
            self.state.feedback_effect["bound_ability_core"] = self.state.bound_ability_core
368
    def _ensure_profile(self, node: str) -> SedimentationProfile:
        """Fetch the node's sedimentation profile, creating one lazily."""
        profile = self.state.sedimentation.get(node)
        if profile is None:
            # New profiles start at the node's recorded stratum (default
            # "memory"); the stratum map is synced to the profile's stage.
            profile = SedimentationProfile(stage=self.state.strata.get(node, "memory"))
            self.state.sedimentation[node] = profile
            self.state.strata[node] = profile.stage
        return profile
376
    def _refresh_sedimentation(self) -> None:
        """Update each node's sedimentation profile and stage for this step."""
        for node in self.state.graph.nodes():
            profile = self._ensure_profile(node)
            support = self._sedimentation_support(node)
            activation = self.state.mu.get(node, 0.0)
            touched_now = self.state.node_last_touched.get(node, -1) == self.state.step_index
            # "Stable" = touched this step, strongly active, or moderately
            # active with enough surrounding support.
            stable_now = touched_now or activation >= 0.16 or (activation >= 0.08 and support >= 0.34)
            if touched_now and profile.last_active_step != self.state.step_index:
                profile.activation_hits += 1
                profile.last_active_step = self.state.step_index
            if stable_now:
                profile.stable_steps += 1
                profile.dormant_steps = 0
                # Resonance is a leaky accumulator of support, capped at 6.
                profile.resonance = min(6.0, (profile.resonance * 0.84) + support)
            else:
                profile.dormant_steps += 1
                profile.stable_steps = max(0, profile.stable_steps - 1)
                profile.resonance = max(0.0, profile.resonance * 0.7)
            profile.candidate_score = self._candidate_score(node, profile)
            self._update_stage_from_profile(node, profile)
397
398 def _sedimentation_support(self, node: str) -> float:
399 activation = self.state.mu.get(node, 0.0)
400 potential = self.state.phi.get(node, 0.0)
401 flow = self._node_flow(node)
402 return activation + (0.35 * potential) + (0.18 * flow)
403
404 def _candidate_score(self, node: str, profile: SedimentationProfile) -> float:
405 touches = min(self.state.touch_count.get(node, 0), 8)
406 flow = min(self._node_flow(node), 3.0)
407 potential = min(self.state.phi.get(node, 0.0), 3.0)
408 score = (
409 (0.12 * touches)
410 + (0.16 * min(profile.activation_hits, 8))
411 + (0.18 * min(profile.stable_steps, 8))
412 + (0.32 * min(profile.resonance, 3.0))
413 + (0.12 * flow)
414 + (0.08 * potential)
415 )
416 if node in self.state.anchor_nodes:
417 score += 0.05
418 return _round(score)
419
420 def _desired_stage(self, node: str, profile: SedimentationProfile) -> str:
421 touches = self.state.touch_count.get(node, 0)
422 flow = self._node_flow(node)
423 score = self._effective_candidate_score(profile)
424 if score >= 2.2 and touches >= 5 and profile.stable_steps >= 4 and flow >= 0.45:
425 return "ability_core"
426 if score >= 1.35 and touches >= 3 and profile.stable_steps >= 2:
427 return "skill_belt"
428 if score >= 0.55 and touches >= 1:
429 return "experience"
430 return "memory"
431
    def _update_stage_from_profile(self, node: str, profile: SedimentationProfile) -> None:
        """Move the node at most one stage toward its desired stage, logging it."""
        current_index = STAGE_ORDER.index(profile.stage)
        target_stage = self._desired_stage(node, profile)
        target_index = STAGE_ORDER.index(target_stage)
        new_stage: str | None = None
        reason = "stability"
        if target_index > current_index:
            # Promote one step at a time, regardless of how far the target is.
            new_stage = STAGE_ORDER[current_index + 1]
            reason = "promotion"
        elif target_index < current_index and self._can_demote(profile):
            new_stage = STAGE_ORDER[current_index - 1]
            reason = "decay"
        if new_stage is None or new_stage == profile.stage:
            self.state.strata[node] = profile.stage
            return
        old_stage = profile.stage
        profile.stage = new_stage
        profile.last_transition_step = self.state.step_index
        if new_stage == "ability_core":
            # Entering the core triggers merge bookkeeping.
            profile.merged_into = self._record_merge_event(node, profile)
        elif STAGE_ORDER.index(new_stage) < STAGE_ORDER.index(old_stage):
            if new_stage != "ability_core":
                profile.merged_into = None
            self._record_decay(
                "sedimentation_demote",
                node,
                max(0.01, profile.candidate_score),
                age=profile.dormant_steps,
            )
        self.state.strata[node] = new_stage
        self.state.sedimentation_trace.append(
            {
                "step": self.state.step_index,
                "node": node,
                "direction": "promote" if reason == "promotion" else "demote",
                "from": old_stage,
                "to": new_stage,
                "touches": self.state.touch_count.get(node, 0),
                "stable_steps": profile.stable_steps,
                "dormant_steps": profile.dormant_steps,
                "candidate_score": profile.candidate_score,
                "resonance": _round(profile.resonance),
                "flow": _round(self._node_flow(node)),
            }
        )
        # Keep the trace bounded to the 20 most recent transitions.
        self.state.sedimentation_trace = self.state.sedimentation_trace[-20:]
478
479 def _can_demote(self, profile: SedimentationProfile) -> bool:
480 stage = profile.stage
481 score = self._effective_candidate_score(profile)
482 if stage == "ability_core":
483 return profile.dormant_steps >= 3 and score < 1.85
484 if stage == "skill_belt":
485 return profile.dormant_steps >= 2 and score < 1.1
486 if stage == "experience":
487 return profile.dormant_steps >= 2 and score < 0.45
488 return False
489
490 def _effective_candidate_score(self, profile: SedimentationProfile) -> float:
491 return max(0.0, profile.candidate_score - (0.35 * profile.dormant_steps))
492
    def _record_merge_event(self, node: str, profile: SedimentationProfile) -> str:
        """Log a promotion into ability_core; return the core merged into."""
        # Already-consolidated neighbors count as supporting evidence.
        support_nodes = []
        for neighbor in self.state.graph.neighbors(node):
            neighbor_profile = self._ensure_profile(neighbor)
            if neighbor_profile.stage in {"skill_belt", "ability_core"}:
                support_nodes.append(neighbor)
        target_core = None
        ability_cores = [
            candidate
            for candidate, candidate_profile in self.state.sedimentation.items()
            if candidate != node and candidate_profile.stage == "ability_core"
        ]
        if ability_cores:
            # Merge into the strongest existing core (score, then phi, then
            # name as a deterministic tiebreaker).
            target_core = max(
                ability_cores,
                key=lambda candidate: (
                    self._ensure_profile(candidate).candidate_score,
                    self.state.phi.get(candidate, 0.0),
                    candidate,
                ),
            )
        else:
            # First core in the system becomes its own target.
            target_core = node
        event = {
            "step": self.state.step_index,
            "event": "skill_belt_merge",
            "node": node,
            "target_core": target_core,
            "support_nodes": sorted(support_nodes)[:4],
            "candidate_score": profile.candidate_score,
            "stable_steps": profile.stable_steps,
        }
        self.state.merge_events.append(event)
        # Keep only the 12 most recent merge events.
        self.state.merge_events = self.state.merge_events[-12:]
        return target_core
528
    def _compute_anchor_pull(self) -> float:
        """Fraction of activation on anchors plus capped core<->anchor flow."""
        if not self.state.anchor_nodes or not self.state.mu:
            return 0.0
        total_activation = sum(self.state.mu.values()) or 1.0
        anchor_mass = sum(self.state.mu.get(anchor, 0.0) for anchor in self.state.anchor_nodes)
        core = self.state.bound_ability_core
        core_anchor_flow = 0.0
        if core:
            # Flow between the core and anchors, in both directions.
            for anchor in self.state.anchor_nodes:
                core_anchor_flow += self.state.J.get((core, anchor), 0.0)
                core_anchor_flow += self.state.J.get((anchor, core), 0.0)
        # Flow contribution is capped so pull stays within [0, 1].
        pull = (anchor_mass / total_activation) + min(core_anchor_flow, 1.0) * 0.2
        return min(1.0, pull)
542
543 def _record_decay(self, kind: str, target: str, amount: float, *, age: int) -> None:
544 self.state.decay_events.append(
545 {
546 "step": self.state.step_index,
547 "kind": kind,
548 "target": target,
549 "amount": _round(amount),
550 "age": age,
551 }
552 )
553 self.state.decay_events = self.state.decay_events[-24:]
554
    def _select_bound_core(self) -> str | None:
        """Pick the node acting as the ability core this step.

        Nodes rank first by stage, then by a blended score (effective
        candidate score, potential, flow, weighted degree, anchor weight),
        then by activation, with the name as a deterministic tiebreaker.
        """
        nodes = self.state.graph.nodes()
        if not nodes:
            return None
        stage_rank = {stage: index for index, stage in enumerate(STAGE_ORDER)}
        return max(
            nodes,
            key=lambda node: (
                stage_rank.get(self._ensure_profile(node).stage, 0),
                self._effective_candidate_score(self._ensure_profile(node))
                + (0.16 * self.state.phi.get(node, 0.0))
                + (0.12 * self._node_flow(node))
                + (0.06 * self.state.graph.weighted_degree(node))
                + (0.08 * self.state.anchor_nodes.get(node, 0.0)),
                self.state.mu.get(node, 0.0),
                node,
            ),
        )
573
    def _compute_drift_score(self) -> float:
        """How far activation has drifted from the bound core, in [0, 1]."""
        active_total = sum(self.state.mu.values())
        if active_total <= 0.0:
            return 0.0
        core = self.state.bound_ability_core
        if not core:
            return 0.0
        attached = self.state.mu.get(core, 0.0)
        core_neighbors = set(self.state.graph.neighbors(core))
        detached_nodes = 0
        for node, activation in self.state.mu.items():
            if node == core:
                continue
            if node in core_neighbors:
                # Core neighbors count mostly as attached; anchors a bit less.
                attached += activation * 0.85
            elif node in self.state.anchor_nodes:
                attached += activation * 0.6
            else:
                detached_nodes += 1
        support_ratio = min(attached / active_total, 1.0)
        # Each fully detached node adds a fixed penalty; thin anchor mass
        # (relative to total activation) adds up to 0.18 more.
        frontier_penalty = 0.08 * detached_nodes
        anchor_penalty = 0.0
        if self.state.anchor_nodes:
            anchor_mass = sum(self.state.mu.get(anchor, 0.0) for anchor in self.state.anchor_nodes)
            anchor_penalty = max(0.0, 0.18 - min(anchor_mass / active_total, 0.18))
        drift = max(0.0, 1.0 - support_ratio + frontier_penalty + anchor_penalty)
        return min(drift, 1.0)
601
602 def _confidence_proxy(self) -> float:
603 if not self.state.mu:
604 return 0.0
605 ordered = sorted(self.state.mu.values(), reverse=True)
606 total_activation = sum(ordered)
607 top = ordered[0]
608 second = ordered[1] if len(ordered) > 1 else 0.0
609 concentration = top / total_activation
610 separation = max(0.0, top - second) / max(top, 1e-9)
611 core = self.state.bound_ability_core or self._select_bound_core()
612 local_flow = self._node_flow(core) if core else top
613 total_flow = sum(self.state.J.values()) or 1.0
614 flow_ratio = min(local_flow / total_flow, 1.0)
615 if self.state.anchor_nodes:
616 anchor_mass = sum(self.state.mu.get(anchor, 0.0) for anchor in self.state.anchor_nodes)
617 anchor_ratio = min(anchor_mass / total_activation, 1.0)
618 else:
619 anchor_ratio = concentration
620 return min(1.0, (0.45 * concentration) + (0.25 * separation) + (0.2 * flow_ratio) + (0.1 * anchor_ratio))
621
622 def _choose_output_mode(self) -> str:
623 total_activation = sum(self.state.mu.values())
624 if total_activation <= 0.0:
625 return "minimal"
626 confidence = self.state.confidence_proxy or self._confidence_proxy()
627 if total_activation < 0.3 or self.state.free_capacity < 0.15 or confidence < 0.32:
628 return "minimal"
629 if self.state.free_capacity < 0.5 or self.state.drift_score > 0.45 or confidence < 0.55:
630 return "degraded"
631 return "full"
632
633 def _phi_summary(self) -> Dict[str, Any]:
634 total = sum(self.state.phi.values())
635 return {
636 "node_count": len(self.state.phi),
637 "total_potential": _round(total),
638 "top_nodes": self._top_scored(self.state.phi),
639 }
640
641 def _mu_summary(self) -> Dict[str, Any]:
642 total = sum(self.state.mu.values())
643 return {
644 "active_count": len(self.state.mu),
645 "total_activation": _round(total),
646 "top_nodes": self._top_scored(self.state.mu),
647 }
648
649 def _j_summary(self) -> Dict[str, Any]:
650 total = sum(self.state.J.values())
651 ordered = sorted(self.state.J.items(), key=lambda item: (-item[1], item[0]))
652 top_flows = [
653 {"edge": f"{source}->{target}", "flow": _round(flow)}
654 for (source, target), flow in ordered[:5]
655 ]
656 return {
657 "edge_count": len(self.state.J),
658 "total_flow": _round(total),
659 "top_flows": top_flows,
660 }
661
    def _experience_regions(self) -> List[Dict[str, Any]]:
        """Group consolidated nodes by their region seed and summarize each."""
        groups: Dict[str, Dict[str, Any]] = {}
        for node, profile in self.state.sedimentation.items():
            # Only nodes that have left the "memory" stratum form regions.
            if profile.stage not in {"experience", "skill_belt", "ability_core"}:
                continue
            region = self._region_seed(node)
            entry = groups.setdefault(
                region,
                {
                    "region": region,
                    "nodes": [],
                    "stage": profile.stage,
                    "activation": 0.0,
                    "potential": 0.0,
                    "candidate_score": 0.0,
                    "stable_steps": 0,
                },
            )
            entry["nodes"].append(node)
            # A region reports its most consolidated member's stage.
            if STAGE_ORDER.index(profile.stage) > STAGE_ORDER.index(entry["stage"]):
                entry["stage"] = profile.stage
            entry["activation"] += self.state.mu.get(node, 0.0)
            entry["potential"] += self.state.phi.get(node, 0.0)
            entry["candidate_score"] += profile.candidate_score
            entry["stable_steps"] = max(entry["stable_steps"], profile.stable_steps)
        items = []
        for entry in groups.values():
            items.append(
                {
                    "region": entry["region"],
                    "nodes": sorted(entry["nodes"]),
                    "stage": entry["stage"],
                    "activation": _round(entry["activation"]),
                    "potential": _round(entry["potential"]),
                    "candidate_score": _round(entry["candidate_score"]),
                    "stable_steps": entry["stable_steps"],
                }
            )
        # Strongest regions first; at most six reported.
        return sorted(items, key=lambda item: (-item["candidate_score"], item["region"]))[:6]
701
702 def _skill_belt_candidates(self) -> List[Dict[str, Any]]:
703 items: List[Dict[str, Any]] = []
704 for node, profile in self.state.sedimentation.items():
705 if profile.stage == "memory" and profile.candidate_score < 0.9:
706 continue
707 items.append(
708 {
709 "node": node,
710 "score": profile.candidate_score,
711 "stage": profile.stage,
712 "flow": _round(self._node_flow(node)),
713 "stable_steps": profile.stable_steps,
714 "touches": self.state.touch_count.get(node, 0),
715 "target_core": profile.merged_into or self._region_seed(node),
716 }
717 )
718 return sorted(items, key=lambda item: (-item["score"], item["node"]))[:6]
719
720 def _region_seed(self, node: str) -> str:
721 profile = self._ensure_profile(node)
722 if profile.merged_into:
723 return profile.merged_into
724 if profile.stage == "ability_core":
725 return node
726 candidates = []
727 for neighbor in self.state.graph.neighbors(node):
728 neighbor_profile = self._ensure_profile(neighbor)
729 if neighbor_profile.stage == "ability_core":
730 candidates.append(
731 (
732 neighbor_profile.candidate_score + self.state.phi.get(neighbor, 0.0),
733 neighbor,
734 )
735 )
736 if candidates:
737 return max(candidates)[1]
738 return self.state.bound_ability_core or node
739
    def _apply_feedback_signal(self, signal: PendingSignal) -> None:
        """Apply an emit/feedback signal's reinforcement and record deltas."""
        effect = dict(self.state.feedback_effect)
        mode = str(signal.metadata.get("mode", "feedback"))
        focus_nodes = self._ordered_unique(signal.metadata.get("emitted_nodes", signal.tokens))
        if not focus_nodes:
            focus_nodes = self._ordered_unique(signal.tokens)
        # Stronger modes reinforce harder; unknown modes use the feedback scale.
        mode_scale = {"full": 1.0, "degraded": 0.72, "minimal": 0.45, "feedback": 0.68}.get(mode, 0.68)
        phi_delta = 0.0
        mu_delta = 0.0
        flow_delta = 0.0
        applied_tokens: List[str] = []
        # At most four focus nodes; rank k gets a 1/(k+1) share of the weight.
        for index, node in enumerate(focus_nodes[:4]):
            weight = (signal.strength * mode_scale) / (index + 1)
            self.state.graph.ensure_node(node)
            self.state.phi.setdefault(node, 0.0)
            self.state.mu.setdefault(node, 0.0)
            self._ensure_profile(node)
            if signal.polarity >= 0:
                phi_gain = 0.08 * weight
                mu_gain = 0.11 * weight
                self.state.phi[node] += phi_gain
                self.state.mu[node] += mu_gain
                phi_delta += phi_gain
                mu_delta += mu_gain
            else:
                # Negative feedback removes at most what the node holds.
                phi_loss = min(self.state.phi[node], 0.06 * weight)
                mu_loss = min(self.state.mu[node], 0.1 * weight)
                self.state.phi[node] -= phi_loss
                self.state.mu[node] = max(0.0, self.state.mu[node] - mu_loss)
                phi_delta -= phi_loss
                mu_delta -= mu_loss
            self.state.node_last_touched[node] = self.state.step_index
            applied_tokens.append(node)
        # Chain consecutive applied nodes with signed flow reinforcement
        # (negative polarity subtracts, floored at zero flow).
        for left, right in zip(applied_tokens, applied_tokens[1:]):
            gain = 0.09 * signal.strength * mode_scale * signal.polarity
            self.state.J[(left, right)] = max(0.0, self.state.J.get((left, right), 0.0) + gain)
            self.state.edge_last_touched[(left, right)] = self.state.step_index
            flow_delta += gain
        effect.update(
            {
                "source": signal.source,
                "mode": mode,
                "last_applied_step": self.state.step_index,
                "applied_tokens": applied_tokens,
                "phi_delta": _round(phi_delta),
                "mu_delta": _round(mu_delta),
                "flow_delta": _round(flow_delta),
            }
        )
        self.state.feedback_effect = effect
790
791 def _degraded_feedback_tokens(self, active_nodes: List[str]) -> List[str]:
792 focus = []
793 if self.state.bound_ability_core:
794 focus.append(self.state.bound_ability_core)
795 for node in active_nodes:
796 if node not in focus:
797 focus.append(node)
798 if len(focus) >= 2:
799 break
800 return focus[:2] or active_nodes[:2]
801
802 def _node_flow(self, node: str | None) -> float:
803 if not node:
804 return 0.0
805 total = 0.0
806 for neighbor in self.state.graph.neighbors(node):
807 total += self.state.J.get((node, neighbor), 0.0)
808 total += self.state.J.get((neighbor, node), 0.0)
809 return total
810
811 def _top_anchor_nodes(self, limit: int = 2) -> List[str]:
812 ordered = sorted(self.state.anchor_nodes.items(), key=lambda item: (-item[1], item[0]))
813 return [node for node, _ in ordered[:limit]]
814
815 def _top_scored(self, values: Dict[str, float], limit: int = 5) -> List[Dict[str, Any]]:
816 ordered = sorted(values.items(), key=lambda item: (-item[1], item[0]))
817 return [{"node": node, "value": _round(value)} for node, value in ordered[:limit]]
818
819 def _top_nodes(self, values: Dict[str, float], limit: int = 5) -> List[str]:
820 ordered = sorted(values.items(), key=lambda item: (-item[1], item[0]))
821 return [node for node, _ in ordered[:limit]]
822
823 def _normalize_feedback(self, feedback: Any) -> Dict[str, Any]:
824 if isinstance(feedback, dict):
825 text = " ".join(
826 str(feedback.get(key, ""))
827 for key in ("text", "label", "note")
828 if feedback.get(key) is not None
829 )
830 polarity = -1 if float(feedback.get("value", 1.0)) < 0 else 1
831 strength = max(0.2, min(1.2, abs(float(feedback.get("value", 1.0)))))
832 context_tokens = self._tokenize(feedback.get("context"))
833 else:
834 text = str(feedback)
835 polarity = 1
836 strength = 0.8
837 context_tokens = []
838 tokens = self._tokenize(text) or ["feedback"]
839 return {
840 "tokens": tokens,
841 "context_tokens": context_tokens,
842 "strength": strength,
843 "polarity": polarity,
844 }
845
846 def _ordered_unique(self, values: Iterable[Any]) -> List[str]:
847 ordered: List[str] = []
848 seen = set()
849 for value in values:
850 token = str(value)
851 if not token or token in seen:
852 continue
853 seen.add(token)
854 ordered.append(token)
855 return ordered
856
857 def _tokenize(self, payload: Any) -> List[str]:
858 if payload is None:
859 return []
860 if isinstance(payload, str):
861 text = payload
862 elif isinstance(payload, dict):
863 text = " ".join(str(value) for value in payload.values() if value is not None)
864 elif isinstance(payload, Iterable):
865 text = " ".join(str(item) for item in payload if item is not None)
866 else:
867 text = str(payload)
868 tokens = [token.lower() for token in re.findall(r"\w+", text, flags=re.UNICODE)]
869 return tokens[:8]