CIE-Unified

git clone 

CIE-Unified / tests
im_wower  ·  2026-03-31

analyze_emergent.py

  1"""
  2CIE 涌现结构分析
  3=================
  4用真实课本数据跑 runtime,分析图上涌现了什么结构。
  5
  6验证核心命题:
  7  - 词/属性/概念边界不是预制的,是流动反复重叠后涌现的稳定结构
  8  - 对称→不动点(知识),非对称→极限环(技能)
  9  - 高频字成为 hub(路径汇聚度 κ 高)
 10  - 锚点核自动浮出
 11"""
 12
 13import sys
 14import os
 15import json
 16import math
 17from collections import Counter
 18
 19sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
 20from cie import CIERuntime
 21
 22DATA_DIR = "/Users/george/code/china-text-book-md"
 23
 24
 25def load_textbook_clean(filename):
 26    """加载课本,提取纯中文段落"""
 27    path = os.path.join(DATA_DIR, filename)
 28    with open(path, "r", encoding="utf-8") as f:
 29        raw = f.read()
 30    paragraphs = []
 31    for line in raw.split("\n"):
 32        line = line.strip()
 33        if not line or line.startswith("#") or line.startswith("**") or line.startswith("---"):
 34            continue
 35        if line.startswith("!["):
 36            continue
 37        ctrl = sum(1 for c in line if ord(c) < 32 and c not in '\n\t')
 38        if ctrl > len(line) * 0.3:
 39            continue
 40        cn = sum(1 for c in line if '\u4e00' <= c <= '\u9fff')
 41        if cn >= 2:
 42            paragraphs.append(line)
 43    return paragraphs
 44
 45
 46def analyze_graph(rt, title):
 47    """分析图的涌现结构"""
 48    g = rt.graph
 49    s = rt.state
 50    
 51    result = {"title": title}
 52    
 53    # ── 基本统计 ──
 54    result["nodes"] = g.node_count
 55    result["edges"] = g.edge_count
 56    
 57    # ── Hub 节点:路径汇聚度 κ 最高的节点 ──
 58    kappas = {}
 59    for nid in g.nodes:
 60        kappas[nid] = g.convergence(nid)
 61    top_hubs = sorted(kappas.items(), key=lambda x: -x[1])[:20]
 62    result["top_hubs"] = [(nid, round(k, 2)) for nid, k in top_hubs]
 63    
 64    # ── 高频字 vs Hub 的一致性 ──
 65    # 高频字应该成为 hub(κ 高)
 66    hits = sorted(s.experience_hits.items(), key=lambda x: -x[1])[:20]
 67    result["top_hits"] = [(nid, h) for nid, h in hits]
 68    
 69    # hub 和高频字的重叠度
 70    hub_set = set(nid for nid, _ in top_hubs[:10])
 71    hit_set = set(nid for nid, _ in hits[:10])
 72    overlap = hub_set & hit_set
 73    result["hub_hit_overlap"] = len(overlap)
 74    result["hub_hit_overlap_pct"] = round(len(overlap) / max(len(hub_set), 1) * 100)
 75    
 76    # ── 非对称度分布 ──
 77    asymmetries = []
 78    for src_edges in g.fwd_edges.values():
 79        for dst, edge in src_edges.items():
 80            bwd = g.get_bwd_weight(edge.src, dst)
 81            asym = abs(edge.weight - bwd)
 82            asymmetries.append(asym)
 83    if asymmetries:
 84        result["asym_mean"] = round(sum(asymmetries) / len(asymmetries), 4)
 85        result["asym_max"] = round(max(asymmetries), 4)
 86        result["asym_nonzero_pct"] = round(sum(1 for a in asymmetries if a > 0.01) / len(asymmetries) * 100)
 87    
 88    # ── 闭环检测:找长度3-4的环并计算环流 ──
 89    circuits = []
 90    nodes_list = list(g.nodes.keys())
 91    checked = 0
 92    for i, a in enumerate(nodes_list[:50]):  # 只检查前50个节点
 93        for b in g.neighbors_fwd(a)[:10]:
 94            for c in g.neighbors_fwd(b)[:10]:
 95                if a in g.neighbors_fwd(c):
 96                    circ = g.circulation([a, b, c, a])
 97                    rev_circ = g.circulation([a, c, b, a])
 98                    if abs(circ - rev_circ) > 0.1:  # 非对称环流
 99                        circuits.append({
100                            'path': f"{a}{b}{c}{a}",
101                            'circulation': round(circ, 2),
102                            'reverse': round(rev_circ, 2),
103                            'asymmetry': round(abs(circ - rev_circ), 2)
104                        })
105                        checked += 1
106            if checked > 30:
107                break
108        if checked > 30:
109            break
110    
111    circuits.sort(key=lambda x: -x['asymmetry'])
112    result["top_circuits"] = circuits[:10]
113    result["total_circuits_found"] = len(circuits)
114    
115    # ── 锚点核 ──
116    result["anchor_count"] = len(s.anchor_nodes)
117    result["anchor_nodes"] = list(s.anchor_nodes)[:20]
118    
119    # ── 能力核 ──
120    result["ability_cores"] = {
121        cid: list(nodes)[:10] for cid, nodes in s.ability_cores.items()
122    }
123    result["ability_core_count"] = len(s.ability_cores)
124    result["total_core_nodes"] = sum(len(n) for n in s.ability_cores.values())
125    
126    # ── 经验层 / 技能带 ──
127    exp = s.experience_regions.get('experience', set())
128    result["experience_count"] = len(exp)
129    result["skill_belt_count"] = len(s.skill_belt_candidates)
130    result["top_skill_belts"] = sorted(
131        s.skill_belt_candidates.items(), key=lambda x: -x[1])[:10]
132    
133    # ── 置信度分布 ──
134    confidences = [(nid, s.get_confidence(nid)) for nid in g.nodes]
135    confidences.sort(key=lambda x: -x[1])
136    result["top_confidence"] = [(nid, round(c, 3)) for nid, c in confidences[:10]]
137    c_vals = [c for _, c in confidences]
138    if c_vals:
139        result["confidence_mean"] = round(sum(c_vals) / len(c_vals), 3)
140        result["confidence_high_pct"] = round(sum(1 for c in c_vals if c > 0.5) / len(c_vals) * 100)
141    
142    # ── φ 分布 ──
143    phi_vals = sorted(s.phi.items(), key=lambda x: -abs(x[1]))
144    result["top_phi"] = [(nid, round(v, 4)) for nid, v in phi_vals[:10]]
145    result["phi_positive"] = sum(1 for _, v in phi_vals if v > 0.01)
146    result["phi_negative"] = sum(1 for _, v in phi_vals if v < -0.01)
147    
148    # ── 沉积统计 ──
149    transitions = Counter(t['transition'] for t in s.sedimentation_trace)
150    result["sedimentation_transitions"] = dict(transitions)
151    result["total_sed_events"] = len(s.sedimentation_trace)
152    result["total_merge_events"] = len(s.merge_events)
153    result["total_decay_events"] = len(s.decay_events)
154    
155    return result
156
157
158def run_analysis():
159    """对每本课本单独分析 + 全部灌入分析"""
160    
161    textbooks = [
162        ("小学语文一上", "小学_语文_统编版_义务教育教科书·语文一年级上册.md"),
163        ("小学数学一上", "小学_数学_人教版_义务教育教科书 · 数学一年级上册.md"),
164        ("初中语文七上", "初中_语文_统编版-人民教育出版社_七年级_义务教育教科书·语文七年级上册.md"),
165        ("初中数学七上", "初中_数学_人教版-人民教育出版社_七年级_义务教育教科书·数学七年级上册.md"),
166        ("高中语文必修上", "高中_语文_统编版-人民教育出版社_普通高中教科书·语文必修 上册.md"),
167    ]
168    
169    all_results = []
170    
171    # ── 单本分析 ──
172    for name, filename in textbooks:
173        print(f"\n{'='*60}")
174        print(f"  分析: {name}")
175        print(f"{'='*60}")
176        
177        rt = CIERuntime(seed=42)
178        paras = load_textbook_clean(filename)
179        print(f"  段落数: {len(paras)}")
180        
181        # 喂入(前100段,每段截60字)
182        feed_count = min(100, len(paras))
183        for p in paras[:feed_count]:
184            rt.ingest(p[:60])
185            rt.step(n=2)
186        
187        result = analyze_graph(rt, name)
188        result["paragraphs_fed"] = feed_count
189        all_results.append(result)
190        
191        print_result(result)
192    
193    # ── 全灌分析 ──
194    print(f"\n{'='*60}")
195    print(f"  分析: 全5本课本灌入同一 runtime")
196    print(f"{'='*60}")
197    
198    rt_all = CIERuntime(seed=42)
199    total_fed = 0
200    for name, filename in textbooks:
201        paras = load_textbook_clean(filename)
202        for p in paras[:60]:
203            rt_all.ingest(p[:50])
204            rt_all.step(n=2)
205            total_fed += 1
206    
207    result_all = analyze_graph(rt_all, "全5本课本")
208    result_all["paragraphs_fed"] = total_fed
209    all_results.append(result_all)
210    print_result(result_all)
211    
212    # ── 输出 JSON ──
213    with open("/tmp/cie_analysis.json", "w") as f:
214        json.dump(all_results, f, ensure_ascii=False, indent=2, default=str)
215    print(f"\n\n完整数据已写入 /tmp/cie_analysis.json")
216    
217    # ── 核心结论 ──
218    print(f"\n{'='*60}")
219    print(f"  核心结论")
220    print(f"{'='*60}")
221    
222    for r in all_results:
223        hub_pct = r.get("hub_hit_overlap_pct", 0)
224        asym_pct = r.get("asym_nonzero_pct", 0)
225        circuits = r.get("total_circuits_found", 0)
226        anchors = r.get("anchor_count", 0)
227        cores = r.get("ability_core_count", 0)
228        
229        print(f"\n  {r['title']}:")
230        print(f"    Hub=高频字一致性: {hub_pct}%")
231        print(f"    非对称边比例: {asym_pct}%")
232        print(f"    闭环数: {circuits}")
233        print(f"    锚点核: {anchors}")
234        print(f"    能力核: {cores}")
235    
236    return all_results
237
238
239def print_result(r):
240    print(f"\n  图规模: {r['nodes']} nodes, {r['edges']} edges")
241    
242    print(f"\n  Top Hub 节点 (κ 最高):")
243    for nid, k in r['top_hubs'][:10]:
244        h = dict(r['top_hits']).get(nid, 0)
245        print(f"    '{nid}': κ={k}, hits={h}")
246    
247    print(f"\n  Hub-高频字重叠: {r['hub_hit_overlap']}/10 ({r['hub_hit_overlap_pct']}%)")
248    
249    if r.get('asym_nonzero_pct') is not None:
250        print(f"  非对称边: {r['asym_nonzero_pct']}%, mean={r['asym_mean']}, max={r['asym_max']}")
251    
252    if r['top_circuits']:
253        print(f"\n  Top 闭环 (非对称环流):")
254        for c in r['top_circuits'][:5]:
255            print(f"    {c['path']}: fwd={c['circulation']}, rev={c['reverse']}, asym={c['asymmetry']}")
256    
257    print(f"  闭环总数: {r['total_circuits_found']}")
258    print(f"  锚点核: {r['anchor_count']} 个 — {r['anchor_nodes'][:10]}")
259    print(f"  能力核: {r['ability_core_count']} 个, 含 {r['total_core_nodes']} 节点")
260    
261    print(f"\n  沉积统计: {r['sedimentation_transitions']}")
262    print(f"  总沉积事件: {r['total_sed_events']}, 合并: {r['total_merge_events']}, 衰减: {r['total_decay_events']}")
263    
264    print(f"\n  Top 置信度:")
265    for nid, c in r['top_confidence'][:5]:
266        print(f"    '{nid}': c={c}")
267    print(f"  高置信度(>0.5)占比: {r.get('confidence_high_pct', 0)}%")
268
269
270if __name__ == '__main__':
271    run_analysis()