baa-conductor

git clone 

commit
76aa1aa
parent
7c7e4c5
author
im_wower
date
2026-03-29 04:02:52 +0800 CST
fix: recover chatgpt final-message after aborted sse
6 files changed,  +393, -44
M plugins/baa-firefox/controller.js
+3, -3
 1@@ -5823,7 +5823,7 @@ function observeFinalMessageFromPageSse(data, sender, context = null) {
 2   const observer = platform ? state.finalMessageRelayObservers[platform] : null;
 3   if (!observer) {
 4     try {
 5-      if (data.done === true) {
 6+      if (data.done === true || data.error) {
 7         addLog("debug", `[FM-SSE] ${platform || "?"} 无 observer,跳过`, false);
 8       }
 9     } catch (_) {}
10@@ -5836,10 +5836,10 @@ function observeFinalMessageFromPageSse(data, sender, context = null) {
11   });
12 
13   try {
14-    if (data.done === true) {
15+    if (data.done === true || data.error) {
16       const relevant = typeof FINAL_MESSAGE_HELPERS.isRelevantStreamUrl === "function"
17         ? FINAL_MESSAGE_HELPERS.isRelevantStreamUrl(platform, data.url) : false;
18-      addLog("debug", `[FM-SSE] ${platformLabel(platform)} done url_relevant=${relevant} relay=${relay ? "产出" : "null"} assistant=${relay?.payload?.assistant_message_id || "-"}`, false);
19+      addLog("debug", `[FM-SSE] ${platformLabel(platform)} ${data.done === true ? "done" : "error"} url_relevant=${relevant} relay=${relay ? "产出" : "null"} assistant=${relay?.payload?.assistant_message_id || "-"}`, false);
20     }
21   } catch (_) {}
22 
M plugins/baa-firefox/final-message.js
+67, -24
  1@@ -53,6 +53,17 @@
  2     }
  3   }
  4 
  5+  function extractUrlPathname(url) {
  6+    const raw = trimToNull(url);
  7+    if (!raw) return "";
  8+
  9+    try {
 10+      return (new URL(raw, "https://platform.invalid/").pathname || "/").toLowerCase();
 11+    } catch (_) {
 12+      return raw.toLowerCase().split(/[?#]/u)[0] || "";
 13+    }
 14+  }
 15+
 16   function normalizeMessageText(value) {
 17     if (typeof value !== "string") return null;
 18 
 19@@ -753,9 +764,12 @@
 20 
 21   function isRelevantStreamUrl(platform, url) {
 22     const lower = String(url || "").toLowerCase();
 23+    const pathname = extractUrlPathname(url);
 24 
 25     if (platform === "chatgpt") {
 26-      return lower.includes("/conversation");
 27+      return /^\/conversation\/?$/iu.test(pathname)
 28+        || /^\/(?:backend-api|backend-anon|public-api)\/conversation\/?$/iu.test(pathname)
 29+        || /^\/(?:backend-api|backend-anon|public-api)\/f\/conversation\/?$/iu.test(pathname);
 30     }
 31 
 32     if (platform === "claude") {
 33@@ -798,6 +812,47 @@
 34     return state.activeStream;
 35   }
 36 
 37+  function extractSseCandidateFromText(platform, text, context) {
 38+    if (platform === "chatgpt") {
 39+      return extractChatgptCandidateFromText(text, context);
 40+    }
 41+
 42+    if (platform === "claude") {
 43+      return extractClaudeCandidateFromText(text, context);
 44+    }
 45+
 46+    return extractGeminiCandidateFromText(text, context);
 47+  }
 48+
 49+  function finalizeObservedSseRelay(state, stream, meta = {}) {
 50+    if (!state || !stream) {
 51+      return null;
 52+    }
 53+
 54+    const context = {
 55+      pageUrl: stream.pageUrl,
 56+      reqBody: stream.reqBody,
 57+      url: stream.url
 58+    };
 59+    const finalCandidate = extractSseCandidateFromText(
 60+      state.platform,
 61+      stream.chunks.join("\n\n"),
 62+      context
 63+    );
 64+    const relay = buildRelayEnvelope(
 65+      state.platform,
 66+      mergeCandidates(stream.latestCandidate, finalCandidate),
 67+      meta.observedAt
 68+    );
 69+
 70+    state.activeStream = null;
 71+    if (!relay || hasSeenRelay(state, relay)) {
 72+      return null;
 73+    }
 74+
 75+    return relay;
 76+  }
 77+
 78   function buildRelayEnvelope(platform, candidate, observedAt) {
 79     const rawText = normalizeMessageText(candidate?.rawText);
 80     if (!rawText) return null;
 81@@ -843,11 +898,6 @@
 82       return null;
 83     }
 84 
 85-    if (detail.error) {
 86-      state.activeStream = null;
 87-      return null;
 88-    }
 89-
 90     const stream = ensureActiveStream(state, detail, meta);
 91     const context = {
 92       pageUrl: stream.pageUrl,
 93@@ -871,31 +921,24 @@
 94       }
 95     }
 96 
 97-    if (detail.done !== true) {
 98-      return null;
 99+    if (detail.done === true) {
100+      return finalizeObservedSseRelay(state, stream, meta);
101     }
102 
103-    const fullText = stream.chunks.join("\n\n");
104-    let finalCandidate = null;
105-    if (state.platform === "chatgpt") {
106-      finalCandidate = extractChatgptCandidateFromText(fullText, context);
107-    } else if (state.platform === "claude") {
108-      finalCandidate = extractClaudeCandidateFromText(fullText, context);
109-    } else {
110-      finalCandidate = extractGeminiCandidateFromText(fullText, context);
111+    if (detail.error) {
112+      if (!stream.latestCandidate && stream.chunks.length === 0) {
113+        state.activeStream = null;
114+        return null;
115+      }
116+
117+      return finalizeObservedSseRelay(state, stream, meta);
118     }
119-    const relay = buildRelayEnvelope(
120-      state.platform,
121-      mergeCandidates(stream.latestCandidate, finalCandidate),
122-      meta.observedAt
123-    );
124 
125-    state.activeStream = null;
126-    if (!relay || hasSeenRelay(state, relay)) {
127+    if (detail.done !== true) {
128       return null;
129     }
130 
131-    return relay;
132+    return null;
133   }
134 
135   function observeNetwork(state, detail, meta = {}) {
A plugins/baa-firefox/final-message.test.cjs
+96, -0
 1@@ -0,0 +1,96 @@
 2+const assert = require("node:assert/strict");
 3+const test = require("node:test");
 4+
 5+const {
 6+  createRelayState,
 7+  isRelevantStreamUrl,
 8+  observeSse
 9+} = require("./final-message.js");
10+
11+function buildChatgptChunk({
12+  assistantMessageId = "msg_abort",
13+  conversationId = "conv_abort",
14+  text = "@conductor::status"
15+} = {}) {
16+  return `data: ${JSON.stringify({
17+    conversation_id: conversationId,
18+    message: {
19+      author: {
20+        role: "assistant"
21+      },
22+      content: {
23+        parts: [text]
24+      },
25+      end_turn: true,
26+      id: assistantMessageId
27+    }
28+  })}`;
29+}
30+
31+test("isRelevantStreamUrl only accepts ChatGPT root conversation streams", () => {
32+  assert.equal(
33+    isRelevantStreamUrl("chatgpt", "https://chatgpt.com/backend-api/conversation"),
34+    true
35+  );
36+  assert.equal(
37+    isRelevantStreamUrl("chatgpt", "https://chatgpt.com/backend-api/f/conversation?oai-device-id=test"),
38+    true
39+  );
40+  assert.equal(
41+    isRelevantStreamUrl("chatgpt", "https://chatgpt.com/backend-api/conversation/implicit_message_feedback"),
42+    false
43+  );
44+  assert.equal(
45+    isRelevantStreamUrl("chatgpt", "https://chatgpt.com/backend-api/f/conversation/prepare"),
46+    false
47+  );
48+});
49+
50+test("observeSse recovers a ChatGPT final message when the SSE stream aborts after chunks arrived", () => {
51+  const state = createRelayState("chatgpt");
52+  const meta = {
53+    observedAt: 1743206400000,
54+    pageUrl: "https://chatgpt.com/c/conv_abort"
55+  };
56+  const url = "https://chatgpt.com/backend-api/f/conversation";
57+  const reqBody = JSON.stringify({
58+    conversation_id: "conv_abort"
59+  });
60+
61+  assert.equal(
62+    observeSse(state, {
63+      chunk: buildChatgptChunk(),
64+      reqBody,
65+      url
66+    }, meta),
67+    null
68+  );
69+
70+  const relay = observeSse(state, {
71+    error: "The operation was aborted.",
72+    reqBody,
73+    url
74+  }, meta);
75+
76+  assert.ok(relay);
77+  assert.equal(relay.payload.platform, "chatgpt");
78+  assert.equal(relay.payload.conversation_id, "conv_abort");
79+  assert.equal(relay.payload.assistant_message_id, "msg_abort");
80+  assert.equal(relay.payload.raw_text, "@conductor::status");
81+  assert.equal(relay.payload.observed_at, 1743206400000);
82+  assert.equal(state.activeStream, null);
83+});
84+
85+test("observeSse ignores ChatGPT auxiliary conversation subpaths", () => {
86+  const state = createRelayState("chatgpt");
87+  const relay = observeSse(state, {
88+    chunk: buildChatgptChunk(),
89+    url: "https://chatgpt.com/backend-api/conversation/implicit_message_feedback"
90+  }, {
91+    observedAt: 1743206400000,
92+    pageUrl: "https://chatgpt.com/c/conv_abort"
93+  });
94+
95+  assert.equal(relay, null);
96+  assert.equal(state.activeStream, null);
97+});
M plugins/baa-firefox/page-interceptor.js
+32, -13
 1@@ -174,6 +174,14 @@
 2     return normalized ? normalized : null;
 3   }
 4 
 5+  function describeError(error) {
 6+    if (typeof error === "string") {
 7+      return trimToNull(error) || "unknown_error";
 8+    }
 9+
10+    return trimToNull(error?.message) || String(error || "unknown_error");
11+  }
12+
13   function emit(type, detail, rule = pageRule) {
14     window.dispatchEvent(new CustomEvent(type, {
15       detail: {
16@@ -271,13 +279,27 @@
17       source: "page-interceptor",
18       url
19     }, rule);
20+    let buffer = "";
21+    const decoder = new TextDecoder();
22+    const emitBufferedChunk = () => {
23+      if (!buffer.trim()) return false;
24+
25+      emitSse({
26+        url,
27+        method,
28+        reqBody: requestBody,
29+        chunk: buffer,
30+        ts: Date.now()
31+      }, rule);
32+      buffer = "";
33+      return true;
34+    };
35+
36     try {
37       const clone = response.clone();
38       if (!clone.body) return;
39 
40       const reader = clone.body.getReader();
41-      const decoder = new TextDecoder();
42-      let buffer = "";
43 
44       while (true) {
45         const { done, value } = await reader.read();
46@@ -299,15 +321,8 @@
47         }
48       }
49 
50-      if (buffer.trim()) {
51-        emitSse({
52-          url,
53-          method,
54-          reqBody: requestBody,
55-          chunk: buffer,
56-          ts: Date.now()
57-        }, rule);
58-      }
59+      buffer += decoder.decode();
60+      emitBufferedChunk();
61 
62       const duration = Date.now() - startedAt;
63       try { console.log("[BAA]", "sse_stream_done", method, url.slice(0, 120), "duration=" + duration + "ms"); } catch (_) {}
64@@ -326,12 +341,16 @@
65         duration
66       }, rule);
67     } catch (error) {
68+      buffer += decoder.decode();
69+      emitBufferedChunk();
70+
71       emitSse({
72         url,
73         method,
74         reqBody: requestBody,
75-        error: error.message,
76-        ts: Date.now()
77+        error: describeError(error),
78+        ts: Date.now(),
79+        duration: Date.now() - startedAt
80       }, rule);
81     }
82   }
A plugins/baa-firefox/page-interceptor.test.cjs
+173, -0
  1@@ -0,0 +1,173 @@
  2+const assert = require("node:assert/strict");
  3+const fs = require("node:fs");
  4+const path = require("node:path");
  5+const test = require("node:test");
  6+const vm = require("node:vm");
  7+const { TextDecoder, TextEncoder } = require("node:util");
  8+
  9+const PAGE_INTERCEPTOR_SOURCE = fs.readFileSync(
 10+  path.join(__dirname, "page-interceptor.js"),
 11+  "utf8"
 12+);
 13+
 14+function createAbortingSseResponse(chunks, errorMessage = "The operation was aborted.") {
 15+  const headers = new Headers({
 16+    "content-type": "text/event-stream"
 17+  });
 18+  const encoder = new TextEncoder();
 19+
 20+  return {
 21+    headers,
 22+    status: 200,
 23+    clone() {
 24+      let index = 0;
 25+      return {
 26+        body: {
 27+          getReader() {
 28+            return {
 29+              async read() {
 30+                if (index < chunks.length) {
 31+                  return {
 32+                    done: false,
 33+                    value: encoder.encode(chunks[index++])
 34+                  };
 35+                }
 36+
 37+                throw new Error(errorMessage);
 38+              }
 39+            };
 40+          }
 41+        }
 42+      };
 43+    }
 44+  };
 45+}
 46+
 47+function createHarness(fetchImpl) {
 48+  const events = [];
 49+  const listeners = new Map();
 50+  const location = {
 51+    hostname: "chatgpt.com",
 52+    href: "https://chatgpt.com/c/conv_abort",
 53+    origin: "https://chatgpt.com"
 54+  };
 55+
 56+  function addEventListener(type, listener) {
 57+    if (!listeners.has(type)) {
 58+      listeners.set(type, new Set());
 59+    }
 60+    listeners.get(type).add(listener);
 61+  }
 62+
 63+  function removeEventListener(type, listener) {
 64+    listeners.get(type)?.delete(listener);
 65+  }
 66+
 67+  const window = {
 68+    __baaFirefoxIntercepted__: undefined,
 69+    addEventListener,
 70+    dispatchEvent(event) {
 71+      events.push({
 72+        detail: event.detail,
 73+        type: event.type
 74+      });
 75+
 76+      for (const listener of listeners.get(event.type) || []) {
 77+        listener.call(window, event);
 78+      }
 79+
 80+      return true;
 81+    },
 82+    fetch: fetchImpl,
 83+    removeEventListener
 84+  };
 85+  window.location = location;
 86+
 87+  function CustomEvent(type, init = {}) {
 88+    this.detail = init.detail;
 89+    this.type = type;
 90+  }
 91+
 92+  function FakeXMLHttpRequest() {}
 93+  FakeXMLHttpRequest.prototype.addEventListener = function addEventListenerNoop() {};
 94+  FakeXMLHttpRequest.prototype.getAllResponseHeaders = function getAllResponseHeaders() {
 95+    return "";
 96+  };
 97+  FakeXMLHttpRequest.prototype.getResponseHeader = function getResponseHeader() {
 98+    return "";
 99+  };
100+  FakeXMLHttpRequest.prototype.open = function open() {};
101+  FakeXMLHttpRequest.prototype.send = function send() {};
102+  FakeXMLHttpRequest.prototype.setRequestHeader = function setRequestHeader() {};
103+
104+  const context = vm.createContext({
105+    AbortController,
106+    ArrayBuffer,
107+    Blob,
108+    CustomEvent,
109+    FormData,
110+    Headers,
111+    Request,
112+    Response,
113+    TextDecoder,
114+    URL,
115+    URLSearchParams,
116+    XMLHttpRequest: FakeXMLHttpRequest,
117+    clearTimeout,
118+    console: {
119+      log() {}
120+    },
121+    location,
122+    setTimeout,
123+    window
124+  });
125+
126+  vm.runInContext(PAGE_INTERCEPTOR_SOURCE, context, {
127+    filename: "page-interceptor.js"
128+  });
129+
130+  return {
131+    events,
132+    window
133+  };
134+}
135+
136+async function waitFor(predicate, timeoutMs = 200) {
137+  const deadline = Date.now() + timeoutMs;
138+
139+  while (Date.now() < deadline) {
140+    if (predicate()) {
141+      return;
142+    }
143+
144+    await new Promise((resolve) => setTimeout(resolve, 0));
145+  }
146+
147+  throw new Error("Timed out waiting for page-interceptor events.");
148+}
149+
150+test("page-interceptor flushes the buffered SSE tail before emitting the abort event", async () => {
151+  const response = createAbortingSseResponse([
152+    'data: {"message":{"id":"msg_abort","author":{"role":"assistant"},"content":{"parts":["partial"]}}}\n\n',
153+    'data: {"message":{"author":{"role":"assistant"},"content":{"parts":["tail"]}}'
154+  ]);
155+  const { events, window } = createHarness(async () => response);
156+
157+  await window.fetch("https://chatgpt.com/backend-api/f/conversation", {
158+    body: JSON.stringify({
159+      conversation_id: "conv_abort"
160+    }),
161+    method: "POST"
162+  });
163+
164+  await waitFor(() => events.some((event) => event.type === "__baa_sse__" && event.detail?.error));
165+
166+  const sseEvents = events.filter((event) => event.type === "__baa_sse__");
167+  const errorEvent = sseEvents.find((event) => event.detail?.error);
168+
169+  assert.ok(
170+    sseEvents.some((event) => event.detail?.chunk === 'data: {"message":{"author":{"role":"assistant"},"content":{"parts":["tail"]}}'),
171+    "expected the unflushed tail chunk to be emitted before the error event"
172+  );
173+  assert.equal(errorEvent?.detail?.error, "The operation was aborted.");
174+});
M tasks/T-BUG-029.md
+22, -4
 1@@ -2,7 +2,7 @@
 2 
 3 ## 状态
 4 
 5-- 当前状态:`待开始`
 6+- 当前状态:`已完成`
 7 - 规模预估:`M`
 8 - 依赖任务:`T-S046`(需要日志辅助排查 ChatGPT SSE 原始数据)
 9 - 建议执行者:`Claude`(需要理解 ChatGPT SSE 流结构和 final-message 提取逻辑)
10@@ -164,21 +164,39 @@ if (platform === "chatgpt") {
11 
12 ### 开始执行
13 
14-- 执行者:
15-- 开始时间:
16+- 执行者:`Codex (GPT-5)`
17+- 开始时间:`2026-03-29 03:45:00 CST(约)`
18 - 状态变更:`待开始` → `进行中`
19 
20 ### 完成摘要
21 
22-- 完成时间:
23+- 完成时间:`2026-03-29 04:02:01 CST`
24 - 状态变更:`进行中` → `已完成`
25 - 修改了哪些文件:
26+  - `plugins/baa-firefox/final-message.js`
27+  - `plugins/baa-firefox/page-interceptor.js`
28+  - `plugins/baa-firefox/controller.js`
29+  - `plugins/baa-firefox/final-message.test.cjs`
30+  - `plugins/baa-firefox/page-interceptor.test.cjs`
31+  - `tasks/T-BUG-029.md`
32 - 核心实现思路:
33+  - 收紧 ChatGPT final-message 观察 URL,只接受根对话流 `/conversation`、`/backend-api/conversation`、`/backend-api/f/conversation`,排除 `implicit_message_feedback`、`prepare` 等辅助流。
34+  - `observeSse` 在 `detail.error` 终止时不再直接丢弃已缓存 chunks,而是复用正常收尾逻辑,从已有 SSE 文本中提取 candidate 并生成 relay。
35+  - `page-interceptor` 的 `streamSse` 在 abort/error 收尾前先 flush 尚未按 `\n\n` 切出的尾部 buffer,再发出带 `error` 的 `__baa_sse__` 事件,避免最后一段数据静默丢失。
36+  - `controller.js` 的 `[FM-SSE]` 调试日志扩展到 `done` 和 `error` 两种收尾,便于确认 abort 修复后的 relay 产出。
37 - 跑了哪些测试:
38+  - `node --test plugins/baa-firefox/final-message.test.cjs`
39+  - `node --test plugins/baa-firefox/page-interceptor.test.cjs`
40+  - `git diff --check`
41+  - `pnpm build`(失败:worktree 缺少依赖,`tsc` 不存在,见下方问题记录)
42 
43 ### 执行过程中遇到的问题
44 
45 > 记录执行过程中遇到的阻塞、环境问题、临时绕过方案等。合并时由合并者判断是否需要修复或建新任务。
46 
47+- worktree 当前未安装依赖,执行 `pnpm build` 时多个 workspace 包直接报 `Command "tsc" not found`,属于环境缺失而非本次改动引入的编译错误。
48+
49 ### 剩余风险
50 
51+- 尚未在真实 ChatGPT 页面做一轮 `@conductor::status` 闭环验证,因此 `logs/baa-plugin/` 与 `logs/baa-ingest/` 的最终验收还需要人工浏览器联调确认。
52+- Claude 与 Gemini 仅做了代码路径上的隔离确认,未在浏览器端做回归实测。