CIE-Unified / tests

word_emergence.py

  1"""
  2Phase 8: 词结构涌现验证
  3========================
  4核心命题验证:纯字符 bigram 注入后,图上能否涌现出词级结构?
  5
  6方法:
  7  1. 把课本文本按字符灌入 CIE
  8  2. 分析哪些 bigram 的非对称比(fwd/bwd)最高——这些是"词内部"连接
  9  3. 哪些 bigram 的非对称比最接近 1——这些是"词边界"
 10  4. 用非对称比自动切分文本,与已知词边界对比
 11
 12这是 CIE-Ref 中"no-segmentation verification"的 runtime 版本。
 13"""

import sys
import os

# Make the repo root importable when running this file directly.
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from cie import CIERuntime

DATA_DIR = "/Users/george/code/china-text-book-md"


def is_cjk(ch):
    """True if ch is a CJK unified ideograph (U+4E00..U+9FFF)."""
    return '\u4e00' <= ch <= '\u9fff'

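
# A minimal, self-contained sketch (not part of the pipeline) of the asymmetry
# heuristic this file relies on throughout: the forward/backward weight ratio
# of an edge. The numbers in the examples are made up for illustration. Note
# that bigram_asymmetry_analysis and word_boundary_detection below use slightly
# different fallbacks when bwd ~ 0 (x100 vs. x10); this sketch mirrors the former.
def asymmetry_ratio(fwd, bwd, eps=0.01):
    """fwd/bwd when bwd is non-negligible; a large proxy value otherwise."""
    if bwd > eps:
        return fwd / bwd
    return fwd * 100  # bwd ~ 0: treat as strongly asymmetric

# e.g. asymmetry_ratio(12.0, 0.5) == 24.0  -> strongly directional (word-internal)
#      asymmetry_ratio(3.0, 2.8)  ~= 1.07  -> nearly symmetric (word boundary)
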
def load_clean_text(filename, max_chars=50000):
    """Load a textbook .md file and return up to max_chars characters of
    reasonably clean Chinese text (markdown markup and junk lines dropped)."""
    path = os.path.join(DATA_DIR, filename)
    with open(path, "r", encoding="utf-8") as f:
        raw = f.read()
    chars = []
    for line in raw.split("\n"):
        line = line.strip()
        # Skip markdown headings, bold/rule markup, and image lines.
        if not line or line.startswith("#") or line.startswith("**") or line.startswith("---"):
            continue
        if line.startswith("!["):
            continue
        # Skip lines that are mostly control characters (OCR/encoding junk).
        ctrl = sum(1 for c in line if ord(c) < 32 and c not in '\n\t')
        if ctrl > len(line) * 0.3:
            continue
        # Keep only lines with at least two Chinese characters.
        cn = sum(1 for c in line if is_cjk(c))
        if cn >= 2:
            chars.extend(line)
            chars.append('\n')
        if len(chars) >= max_chars:
            break
    return chars[:max_chars]

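
# Usage sketch (the filename is the textbook used in run_experiment below;
# output shape only, not real data):
#
#   chars = load_clean_text("小学_语文_统编版_义务教育教科书·语文一年级上册.md",
#                           max_chars=1000)
#   # chars is a flat list of single characters, e.g. ['天', '地', ..., '\n', ...]
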
def bigram_asymmetry_analysis(rt):
    """Compute the asymmetry ratio fwd/bwd for every bigram edge."""
    g = rt.graph
    results = []

    for src_edges in g.fwd_edges.values():
        for dst, edge in src_edges.items():
            fwd = edge.weight
            bwd = g.get_bwd_weight(edge.src, dst)
            if bwd > 0.01:
                ratio = fwd / bwd
            else:
                ratio = fwd * 100  # bwd ~ 0: treat as strongly asymmetric

            results.append({
                'src': edge.src,
                'dst': dst,
                'fwd': fwd,
                'bwd': bwd,
                'ratio': ratio,
                'bigram': edge.src + dst,
            })

    results.sort(key=lambda x: -x['ratio'])
    return results

def word_boundary_detection(rt, text_chars):
    """
    Detect word boundaries via the asymmetry ratio:
    - high ratio -> word-internal (strongly directional)
    - ratio near 1 -> word boundary (no directional preference)

    Returns the resulting segments.
    """
    g = rt.graph
    segments = []
    current_word = []

    threshold = 1.3  # ratio < 1.3 is treated as a boundary

    for i in range(len(text_chars) - 1):
        a, b = text_chars[i], text_chars[i + 1]
        current_word.append(a)

        # Flush at any non-Chinese character.
        if not is_cjk(a) or not is_cjk(b):
            if current_word:
                segments.append(''.join(current_word))
                current_word = []
            continue

        fwd = g.get_edge_weight(a, b)
        bwd = g.get_bwd_weight(a, b)

        if bwd > 0.01:
            ratio = fwd / bwd
        else:
            ratio = fwd * 10 if fwd > 0 else 1.0

        if ratio < threshold:
            # Word boundary: flush the current word.
            if current_word:
                segments.append(''.join(current_word))
                current_word = []

    # The loop above never appends the final character, so flush it here
    # (even if the last boundary decision already emptied current_word).
    if text_chars:
        current_word.append(text_chars[-1])
        segments.append(''.join(current_word))

    return segments

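
# Usage sketch (hypothetical output; assumes a runtime already fed with text,
# as run_experiment does below):
#
#   rt = CIERuntime(seed=42)
#   rt.ingest(list("我们的学校"))
#   rt.step(n=2)
#   word_boundary_detection(rt, list("我们的学校"))
#   # -> e.g. ['我们', '的', '学校'] if the learned ratios mark those boundaries
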
def evaluate_segmentation(segments, known_words):
    """Score segmentation quality: the share of known words hit by segments."""
    seg_set = set(segments)
    found = sum(1 for w in known_words if w in seg_set)
    total_known = len(known_words)

    # Also count how many segments are plausible Chinese words (2-4 chars).
    cn_segs = [s for s in segments if all(is_cjk(c) for c in s) and 2 <= len(s) <= 4]

    return {
        'total_segments': len(segments),
        'cn_word_segments': len(cn_segs),
        'known_word_hits': found,
        'known_word_total': total_known,
        'hit_rate': round(found / max(total_known, 1) * 100, 1),
        'sample_cn_segs': cn_segs[:30],
    }

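
# Worked example of the metric (hand-computed, not real output):
#
#   evaluate_segmentation(['我们', '去', '学校', '读书'], ['学校', '读书', '老师'])
#   # hits '学校' and '读书' but misses '老师'
#   # -> known_word_hits=2, known_word_total=3, hit_rate=round(2/3*100, 1)=66.7
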
def run_experiment():
    """Main experiment: grade-1 Chinese textbook, volume 1."""
    print("=" * 60)
    print("  Experiment: can word structure emerge from pure bigram flow?")
    print("=" * 60)

    # ── Prepare data ──
    filename = "小学_语文_统编版_义务教育教科书·语文一年级上册.md"
    chars = load_clean_text(filename, max_chars=30000)
    print(f"\nData: grade-1 Chinese (vol. 1), {len(chars)} characters")

    # ── Feed into CIE ──
    rt = CIERuntime(seed=42)

    # Ingest in batches, stepping the dynamics after each batch.
    batch_size = 100
    for i in range(0, len(chars), batch_size):
        batch = chars[i:i + batch_size]
        rt.ingest(batch)
        rt.step(n=2)

    print(f"Graph size: {rt.graph.node_count} nodes, {rt.graph.edge_count} edges")

    # ── Analyze asymmetry ratios ──
    bigrams = bigram_asymmetry_analysis(rt)

    print(f"\n{'='*60}")
    print("  1. Bigrams with the highest asymmetry ratio (word-internal links)")
    print(f"{'='*60}")

    # Keep only Chinese-Chinese bigrams.
    cn_bigrams = [b for b in bigrams if is_cjk(b['src']) and is_cjk(b['dst'])]

    print("\nTop 20 by asymmetry ratio (strongly directional = word-internal):")
    for b in cn_bigrams[:20]:
        print(f"  {b['src']}{b['dst']}: ratio={b['ratio']:.2f} (fwd={b['fwd']:.1f}, bwd={b['bwd']:.1f})")

    print("\nBottom 20 by asymmetry ratio (weakly directional = word boundary):")
    cn_bigrams_low = sorted(cn_bigrams, key=lambda x: x['ratio'])
    for b in cn_bigrams_low[:20]:
        print(f"  {b['src']}{b['dst']}: ratio={b['ratio']:.2f} (fwd={b['fwd']:.1f}, bwd={b['bwd']:.1f})")

    # ── Word-boundary detection ──
    print(f"\n{'='*60}")
    print("  2. Automatic segmentation from the asymmetry ratio")
    print(f"{'='*60}")

    # Extract runs of Chinese text (>= 4 chars) to use as test sentences.
    test_sentences = []
    cn_text = []
    for c in chars:
        if is_cjk(c):
            cn_text.append(c)
        elif cn_text:
            if len(cn_text) >= 4:
                test_sentences.append(''.join(cn_text))
            cn_text = []
    if len(cn_text) >= 4:  # flush a trailing run, which the loop above misses
        test_sentences.append(''.join(cn_text))

    # Common words expected in a grade-1 Chinese reader.
    known_words_grade1 = [
        "老师", "学生", "同学", "小鸟", "大小", "上下", "左右",
        "学校", "花朵", "天地", "日月", "我们", "你们", "他们",
        "爸爸", "妈妈", "太阳", "月亮", "星星", "白云",
        "小朋友", "春天", "秋天", "冬天", "夏天",
        "美丽", "快乐", "高兴", "漂亮", "可爱",
        "什么", "为什", "这个", "那个", "不是",
        "语文", "数学", "读书", "写字", "画画",
        "自己", "大家", "小学", "中国", "北京",
    ]

    all_segments = []
    for sent in test_sentences[:50]:
        segs = word_boundary_detection(rt, list(sent))
        all_segments.extend(segs)

    eval_result = evaluate_segmentation(all_segments, known_words_grade1)

    print("\nSegmentation stats:")
    print(f"  total segments: {eval_result['total_segments']}")
    print(f"  Chinese word segments (2-4 chars): {eval_result['cn_word_segments']}")
    print(f"  known-word hits: {eval_result['known_word_hits']}/{eval_result['known_word_total']} ({eval_result['hit_rate']}%)")

    print("\nEmergent Chinese segments (first 30):")
    for seg in eval_result['sample_cn_segs']:
        print(f"「{seg}」", end="")
    print()

    # ── Hub analysis: which characters are connective hubs ──
    print(f"\n{'='*60}")
    print("  3. Hub node analysis (high κ = linguistic backbone)")
    print(f"{'='*60}")

    kappas = {}
    for nid in rt.graph.nodes:
        if is_cjk(nid):
            kappas[nid] = rt.graph.convergence(nid)

    top_hubs = sorted(kappas.items(), key=lambda x: -x[1])[:30]
    print("\nTop 30 Chinese hub characters:")
    for nid, k in top_hubs:
        hits = rt.state.experience_hits.get(nid, 0)
        c = rt.state.get_confidence(nid)
        print(f"  '{nid}': κ={k:.2f}, hits={hits}, c={c:.3f}")

    # ── Closed-loop analysis: which characters form skill loops ──
    print(f"\n{'='*60}")
    print("  4. Closed-loop analysis (asymmetric circulation = solidified patterns)")
    print(f"{'='*60}")

    # Enumerate 3-cycles a -> b -> c -> a among Chinese nodes. NB: a cycle may
    # be reported up to three times, once per rotation whose start node falls
    # inside cn_nodes[:30].
    circuits = []
    cn_nodes = [n for n in rt.graph.nodes if is_cjk(n)]
    for a in cn_nodes[:30]:
        for b in rt.graph.neighbors_fwd(a):
            if not is_cjk(b):
                continue
            for c in rt.graph.neighbors_fwd(b):
                if not is_cjk(c):
                    continue
                if a in rt.graph.neighbors_fwd(c):
                    circ_fwd = rt.graph.circulation([a, b, c, a])
                    circ_rev = rt.graph.circulation([a, c, b, a])
                    asym = abs(circ_fwd - circ_rev)
                    if asym > 0.5:
                        circuits.append((f"{a}{b}{c}", circ_fwd, circ_rev, asym))

    circuits.sort(key=lambda x: -x[3])
    print("\nTop 20 three-character Chinese loops:")
    for tri, fwd, rev, asym in circuits[:20]:
        print(f"  「{tri}」: fwd={fwd:.1f}, rev={rev:.1f}, asym={asym:.1f}")

    # ── Final snapshot ──
    snap = rt.snapshot_state()
    print(f"\n{'='*60}")
    print("  Final state")
    print(f"{'='*60}")
    print(f"  nodes: {snap['phi_summary']['count']}")
    print(f"  edges: {snap['graph']['edge_count']}")
    print(f"  anchors: {len(rt.state.anchor_nodes)}")
    print(f"  ability_cores: {len(rt.state.ability_cores)}")
    print(f"  experience: {len(rt.state.experience_regions.get('experience', set()))}")
    print(f"  skill_belts: {len(rt.state.skill_belt_candidates)}")
    print(f"  phi_range: [{snap['phi_summary']['min']:.3f}, {snap['phi_summary']['max']:.3f}]")
    print(f"  attention: {snap['attention']['used']:.1f}/{snap['attention']['total']:.0f}")


if __name__ == '__main__':
    run_experiment()