Commit 45b0db20 authored by Administrator's avatar Administrator

Update frontend/src/streamManager.js via Son of Anton

parent a313e179
......@@ -4,9 +4,7 @@ const _streams = new Map();
const _listeners = new Map();
let _dispatch = null;
export function setDispatch(dispatch) {
_dispatch = dispatch;
}
export function setDispatch(dispatch) { _dispatch = dispatch; }
export function getStreamData(chatId) {
const s = _streams.get(chatId);
......@@ -14,32 +12,19 @@ export function getStreamData(chatId) {
return { streaming: true, text: s.text, thinking: s.thinking, isThinking: s.isThinking };
}
export function isStreaming(chatId) {
return _streams.has(chatId);
}
export function isStreaming(chatId) { return _streams.has(chatId); }
export function subscribe(chatId, cb) {
if (!_listeners.has(chatId)) _listeners.set(chatId, new Set());
_listeners.get(chatId).add(cb);
return () => {
const s = _listeners.get(chatId);
if (s) { s.delete(cb); if (!s.size) _listeners.delete(chatId); }
};
return () => { const s = _listeners.get(chatId); if (s) { s.delete(cb); if (!s.size) _listeners.delete(chatId); } };
}
function _notify(id) {
const s = _listeners.get(id);
if (s) s.forEach((cb) => cb());
}
function _notify(id) { const s = _listeners.get(id); if (s) s.forEach((cb) => cb()); }
export function abortStream(chatId) {
const s = _streams.get(chatId);
if (s) {
s.abortController.abort();
_streams.delete(chatId);
_notify(chatId);
if (_dispatch) _dispatch({ type: "SET_STREAMING", chatId, streaming: false });
}
if (s) { s.abortController.abort(); _streams.delete(chatId); _notify(chatId); if (_dispatch) _dispatch({ type: "SET_STREAMING", chatId, streaming: false }); }
}
export function startStream({ token, chatId, body }) {
......@@ -52,108 +37,39 @@ export function startStream({ token, chatId, body }) {
(async () => {
const s = _streams.get(chatId);
if (!s) return;
let usage = {};
let msgId = "";
let usage = {}, msgId = "";
try {
for await (const evt of streamMessage(token, chatId, body, ac.signal)) {
if (ac.signal.aborted || !_streams.has(chatId)) break;
_handleEvent(chatId, s, evt, (u) => { usage = u; }, (id) => { msgId = id; });
switch (evt.type) {
case "thinking_start": s.isThinking = true; _notify(chatId); break;
case "thinking_delta": s.thinking += evt.content; _notify(chatId); break;
case "thinking_end": s.isThinking = false; _notify(chatId); break;
case "text_delta": s.text += evt.content; _notify(chatId); break;
case "usage": usage = { input_tokens: evt.input_tokens, output_tokens: evt.output_tokens }; break;
case "title_update": if (_dispatch) _dispatch({ type: "UPDATE_CHAT", chat: { id: chatId, title: evt.title } }); break;
case "done": msgId = evt.message_id; break;
case "status": break; // Web search status — ignore in stream
case "error": s.text += `\n\n**Error:** ${evt.message}`; _notify(chatId); break;
}
}
if (!ac.signal.aborted && _dispatch) {
_dispatch({
type: "ADD_MESSAGE", chatId, message: {
id: msgId || `gen-${Date.now()}`, role: "assistant", content: s.text,
thinking_content: s.thinking || null, input_tokens: usage.input_tokens || 0,
output_tokens: usage.output_tokens || 0, created_at: new Date().toISOString(), attachments: [],
}
});
_dispatch({ type: "ADD_MESSAGE", chatId, message: {
id: msgId || `gen-${Date.now()}`, role: "assistant", content: s.text,
thinking_content: s.thinking || null, input_tokens: usage.input_tokens || 0,
output_tokens: usage.output_tokens || 0, created_at: new Date().toISOString(), attachments: [],
}});
}
} catch (err) {
if (!ac.signal.aborted && _dispatch) {
_dispatch({
type: "ADD_MESSAGE", chatId, message: {
id: `err-${Date.now()}`, role: "assistant", content: `**Error:** ${err.message}`,
created_at: new Date().toISOString(), attachments: [],
}
});
_dispatch({ type: "ADD_MESSAGE", chatId, message: {
id: `err-${Date.now()}`, role: "assistant", content: `**Error:** ${err.message}`,
created_at: new Date().toISOString(), attachments: [],
}});
}
} finally {
_streams.delete(chatId);
_notify(chatId);
_streams.delete(chatId); _notify(chatId);
if (_dispatch) _dispatch({ type: "SET_STREAMING", chatId, streaming: false });
}
})();
}
/**
* Reconnect to an ongoing background generation via GET /stream endpoint.
*/
export function reconnectStream({ token, chatId }) {
if (_streams.has(chatId)) return;
const ac = new AbortController();
_streams.set(chatId, { text: "", thinking: "", isThinking: false, abortController: ac });
if (_dispatch) _dispatch({ type: "SET_STREAMING", chatId, streaming: true });
_notify(chatId);
(async () => {
const s = _streams.get(chatId);
if (!s) return;
let usage = {};
let msgId = "";
try {
const res = await fetch(`/api/chats/${chatId}/stream`, {
headers: { Authorization: `Bearer ${token}` },
signal: ac.signal,
});
if (!res.ok) throw new Error("Reconnect failed");
const reader = res.body.getReader();
const decoder = new TextDecoder();
let buffer = "";
while (true) {
const { done, value } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
const parts = buffer.split("\n\n");
buffer = parts.pop() || "";
for (const part of parts) {
const line = part.trim();
if (line.startsWith("data: ")) {
try {
const evt = JSON.parse(line.slice(6));
if (ac.signal.aborted || !_streams.has(chatId)) break;
_handleEvent(chatId, s, evt, (u) => { usage = u; }, (id) => { msgId = id; });
} catch { /* skip */ }
}
}
}
if (!ac.signal.aborted && s.text && _dispatch) {
_dispatch({
type: "ADD_MESSAGE", chatId, message: {
id: msgId || `gen-${Date.now()}`, role: "assistant", content: s.text,
thinking_content: s.thinking || null, input_tokens: usage.input_tokens || 0,
output_tokens: usage.output_tokens || 0, created_at: new Date().toISOString(), attachments: [],
}
});
}
} catch { /* reconnect failed, generation may be done */ }
finally {
_streams.delete(chatId);
_notify(chatId);
if (_dispatch) _dispatch({ type: "SET_STREAMING", chatId, streaming: false });
}
})();
}
function _handleEvent(chatId, s, evt, setUsage, setMsgId) {
switch (evt.type) {
case "thinking_start": s.isThinking = true; _notify(chatId); break;
case "thinking_delta": s.thinking += evt.content; _notify(chatId); break;
case "thinking_end": s.isThinking = false; _notify(chatId); break;
case "text_delta": s.text += evt.content; _notify(chatId); break;
case "usage": setUsage({ input_tokens: evt.input_tokens, output_tokens: evt.output_tokens }); break;
case "title_update": if (_dispatch) _dispatch({ type: "UPDATE_CHAT", chat: { id: chatId, title: evt.title } }); break;
case "done": setMsgId(evt.message_id); break;
case "error": s.text += `\n\n**Error:** ${evt.message}`; _notify(chatId); break;
}
}
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment