im_wower
·
2026-03-31
analyze_emergent.py
1"""
2CIE 涌现结构分析
3=================
4用真实课本数据跑 runtime,分析图上涌现了什么结构。
5
6验证核心命题:
7 - 词/属性/概念边界不是预制的,是流动反复重叠后涌现的稳定结构
8 - 对称→不动点(知识),非对称→极限环(技能)
9 - 高频字成为 hub(路径汇聚度 κ 高)
10 - 锚点核自动浮出
11"""
12
13import sys
14import os
15import json
16import math
17from collections import Counter
18
19sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
20from cie import CIERuntime
21
22DATA_DIR = "/Users/george/code/china-text-book-md"
23
24
def load_textbook_clean(filename):
    """Load one textbook markdown file and return its Chinese paragraphs.

    Reads ``filename`` from ``DATA_DIR`` and keeps only lines that look
    like real body text: markdown structure lines (headings, bold markers,
    horizontal rules, images) are dropped, as are lines dominated by
    control characters and lines with fewer than two CJK characters.

    Args:
        filename: File name inside ``DATA_DIR``.

    Returns:
        list[str]: Cleaned paragraph strings, in file order.
    """
    path = os.path.join(DATA_DIR, filename)
    with open(path, "r", encoding="utf-8") as f:
        raw = f.read()
    paragraphs = []
    for line in raw.split("\n"):
        line = line.strip()
        # str.startswith accepts a tuple — one call instead of a chained `or`.
        if not line or line.startswith(("#", "**", "---", "![")):
            continue
        # Drop lines that are >30% control characters (extraction junk).
        ctrl = sum(1 for c in line if ord(c) < 32 and c not in '\n\t')
        if ctrl > len(line) * 0.3:
            continue
        # Require at least two CJK (U+4E00..U+9FFF) characters.
        cn = sum(1 for c in line if '\u4e00' <= c <= '\u9fff')
        if cn >= 2:
            paragraphs.append(line)
    return paragraphs
44
45
def analyze_graph(rt, title):
    """Collect emergent-structure statistics from a runtime's graph and state.

    Read-only inspection: nothing on ``rt`` is mutated.

    Args:
        rt: CIERuntime whose ``graph`` and ``state`` are analyzed.
        title: Label stored under ``result["title"]``.

    Returns:
        dict: Flat metrics dict, consumed by ``print_result`` and the JSON dump.
    """
    g = rt.graph
    s = rt.state

    result = {"title": title}

    # ── Basic size ──
    result["nodes"] = g.node_count
    result["edges"] = g.edge_count

    # ── Hub nodes: highest path-convergence κ ──
    kappas = {nid: g.convergence(nid) for nid in g.nodes}
    top_hubs = sorted(kappas.items(), key=lambda x: -x[1])[:20]
    result["top_hubs"] = [(nid, round(k, 2)) for nid, k in top_hubs]

    # ── High-frequency characters vs hubs ──
    # Frequently-hit nodes are expected to become hubs (high κ).
    hits = sorted(s.experience_hits.items(), key=lambda x: -x[1])[:20]
    result["top_hits"] = [(nid, h) for nid, h in hits]

    # Overlap between the top-10 hubs and the top-10 most-hit nodes.
    hub_set = set(nid for nid, _ in top_hubs[:10])
    hit_set = set(nid for nid, _ in hits[:10])
    overlap = hub_set & hit_set
    result["hub_hit_overlap"] = len(overlap)
    result["hub_hit_overlap_pct"] = round(len(overlap) / max(len(hub_set), 1) * 100)

    # ── Edge-asymmetry distribution (|forward weight - backward weight|) ──
    asymmetries = []
    for src_edges in g.fwd_edges.values():
        for dst, edge in src_edges.items():
            bwd = g.get_bwd_weight(edge.src, dst)
            asymmetries.append(abs(edge.weight - bwd))
    if asymmetries:
        result["asym_mean"] = round(sum(asymmetries) / len(asymmetries), 4)
        result["asym_max"] = round(max(asymmetries), 4)
        result["asym_nonzero_pct"] = round(sum(1 for a in asymmetries if a > 0.01) / len(asymmetries) * 100)

    # ── Cycle detection: 3-node cycles with asymmetric circulation ──
    circuits = []
    nodes_list = list(g.nodes.keys())
    checked = 0
    for a in nodes_list[:50]:  # cost cap: only scan the first 50 nodes
        for b in g.neighbors_fwd(a)[:10]:
            for c in g.neighbors_fwd(b)[:10]:
                if a in g.neighbors_fwd(c):
                    circ = g.circulation([a, b, c, a])
                    rev_circ = g.circulation([a, c, b, a])
                    if abs(circ - rev_circ) > 0.1:  # asymmetric circulation only
                        circuits.append({
                            'path': f"{a}→{b}→{c}→{a}",
                            'circulation': round(circ, 2),
                            'reverse': round(rev_circ, 2),
                            'asymmetry': round(abs(circ - rev_circ), 2)
                        })
            # `checked` counts (a, b) edge probes, not cycles found.
            checked += 1
            if checked > 30:
                break
        if checked > 30:
            break

    circuits.sort(key=lambda x: -x['asymmetry'])
    result["top_circuits"] = circuits[:10]
    result["total_circuits_found"] = len(circuits)

    # ── Anchor core ──
    result["anchor_count"] = len(s.anchor_nodes)
    result["anchor_nodes"] = list(s.anchor_nodes)[:20]

    # ── Ability cores ──
    result["ability_cores"] = {
        cid: list(nodes)[:10] for cid, nodes in s.ability_cores.items()
    }
    result["ability_core_count"] = len(s.ability_cores)
    result["total_core_nodes"] = sum(len(n) for n in s.ability_cores.values())

    # ── Experience layer / skill belts ──
    exp = s.experience_regions.get('experience', set())
    result["experience_count"] = len(exp)
    result["skill_belt_count"] = len(s.skill_belt_candidates)
    result["top_skill_belts"] = sorted(
        s.skill_belt_candidates.items(), key=lambda x: -x[1])[:10]

    # ── Confidence distribution ──
    confidences = [(nid, s.get_confidence(nid)) for nid in g.nodes]
    confidences.sort(key=lambda x: -x[1])
    result["top_confidence"] = [(nid, round(c, 3)) for nid, c in confidences[:10]]
    c_vals = [c for _, c in confidences]
    if c_vals:
        result["confidence_mean"] = round(sum(c_vals) / len(c_vals), 3)
        result["confidence_high_pct"] = round(sum(1 for c in c_vals if c > 0.5) / len(c_vals) * 100)

    # ── φ distribution (sorted by magnitude) ──
    phi_vals = sorted(s.phi.items(), key=lambda x: -abs(x[1]))
    result["top_phi"] = [(nid, round(v, 4)) for nid, v in phi_vals[:10]]
    result["phi_positive"] = sum(1 for _, v in phi_vals if v > 0.01)
    result["phi_negative"] = sum(1 for _, v in phi_vals if v < -0.01)

    # ── Sedimentation statistics ──
    transitions = Counter(t['transition'] for t in s.sedimentation_trace)
    result["sedimentation_transitions"] = dict(transitions)
    result["total_sed_events"] = len(s.sedimentation_trace)
    result["total_merge_events"] = len(s.merge_events)
    result["total_decay_events"] = len(s.decay_events)

    return result
156
157
def run_analysis():
    """Analyze each textbook separately, then all five pooled into one runtime.

    Side effects: prints a report to stdout and writes the collected metrics
    to /tmp/cie_analysis.json.

    Returns:
        list[dict]: One ``analyze_graph`` result per run (5 single + 1 pooled).
    """

    textbooks = [
        ("小学语文一上", "小学_语文_统编版_义务教育教科书·语文一年级上册.md"),
        ("小学数学一上", "小学_数学_人教版_义务教育教科书 · 数学一年级上册.md"),
        ("初中语文七上", "初中_语文_统编版-人民教育出版社_七年级_义务教育教科书·语文七年级上册.md"),
        ("初中数学七上", "初中_数学_人教版-人民教育出版社_七年级_义务教育教科书·数学七年级上册.md"),
        ("高中语文必修上", "高中_语文_统编版-人民教育出版社_普通高中教科书·语文必修 上册.md"),
    ]

    all_results = []

    # ── Per-book analysis ──
    for name, filename in textbooks:
        print(f"\n{'='*60}")
        print(f" 分析: {name}")
        print(f"{'='*60}")

        rt = CIERuntime(seed=42)
        paras = load_textbook_clean(filename)
        print(f" 段落数: {len(paras)}")

        # Feed the first 100 paragraphs, each truncated to 60 characters.
        feed_count = min(100, len(paras))
        for p in paras[:feed_count]:
            rt.ingest(p[:60])
            rt.step(n=2)

        result = analyze_graph(rt, name)
        result["paragraphs_fed"] = feed_count
        all_results.append(result)

        print_result(result)

    # ── Pooled analysis: all five books into a single runtime ──
    print(f"\n{'='*60}")
    print(" 分析: 全5本课本灌入同一 runtime")
    print(f"{'='*60}")

    rt_all = CIERuntime(seed=42)
    total_fed = 0
    for _name, filename in textbooks:  # book title unused in this pass
        paras = load_textbook_clean(filename)
        for p in paras[:60]:
            rt_all.ingest(p[:50])
            rt_all.step(n=2)
            total_fed += 1

    result_all = analyze_graph(rt_all, "全5本课本")
    result_all["paragraphs_fed"] = total_fed
    all_results.append(result_all)
    print_result(result_all)

    # ── JSON dump (default=str stringifies sets and other non-JSON types) ──
    with open("/tmp/cie_analysis.json", "w") as f:
        json.dump(all_results, f, ensure_ascii=False, indent=2, default=str)
    print("\n\n完整数据已写入 /tmp/cie_analysis.json")

    # ── Headline conclusions ──
    print(f"\n{'='*60}")
    print(" 核心结论")
    print(f"{'='*60}")

    for r in all_results:
        hub_pct = r.get("hub_hit_overlap_pct", 0)
        asym_pct = r.get("asym_nonzero_pct", 0)
        circuits = r.get("total_circuits_found", 0)
        anchors = r.get("anchor_count", 0)
        cores = r.get("ability_core_count", 0)

        print(f"\n {r['title']}:")
        print(f" Hub=高频字一致性: {hub_pct}%")
        print(f" 非对称边比例: {asym_pct}%")
        print(f" 闭环数: {circuits}")
        print(f" 锚点核: {anchors} 个")
        print(f" 能力核: {cores} 个")

    return all_results
237
238
def print_result(r):
    """Pretty-print one metrics dict produced by ``analyze_graph``."""
    print(f"\n 图规模: {r['nodes']} nodes, {r['edges']} edges")

    print(f"\n Top Hub 节点 (κ 最高):")
    # Hoisted: the original rebuilt this dict on every loop iteration.
    hits_by_node = dict(r['top_hits'])
    for nid, k in r['top_hubs'][:10]:
        h = hits_by_node.get(nid, 0)
        print(f" '{nid}': κ={k}, hits={h}")

    print(f"\n Hub-高频字重叠: {r['hub_hit_overlap']}/10 ({r['hub_hit_overlap_pct']}%)")

    # Asymmetry stats are only present when the graph had edges.
    if r.get('asym_nonzero_pct') is not None:
        print(f" 非对称边: {r['asym_nonzero_pct']}%, mean={r['asym_mean']}, max={r['asym_max']}")

    if r['top_circuits']:
        print(f"\n Top 闭环 (非对称环流):")
        for c in r['top_circuits'][:5]:
            print(f" {c['path']}: fwd={c['circulation']}, rev={c['reverse']}, asym={c['asymmetry']}")

    print(f" 闭环总数: {r['total_circuits_found']}")
    print(f" 锚点核: {r['anchor_count']} 个 — {r['anchor_nodes'][:10]}")
    print(f" 能力核: {r['ability_core_count']} 个, 含 {r['total_core_nodes']} 节点")

    print(f"\n 沉积统计: {r['sedimentation_transitions']}")
    print(f" 总沉积事件: {r['total_sed_events']}, 合并: {r['total_merge_events']}, 衰减: {r['total_decay_events']}")

    print(f"\n Top 置信度:")
    for nid, c in r['top_confidence'][:5]:
        print(f" '{nid}': c={c}")
    print(f" 高置信度(>0.5)占比: {r.get('confidence_high_pct', 0)}%")
268
269
# Script entry point: run the full per-book + pooled analysis.
if __name__ == '__main__':
    run_analysis()