Commit bde969b1 authored by Mahmoud Aglan's avatar Mahmoud Aglan

jj

parent 18d64a46
......@@ -49,12 +49,13 @@ export const deleteChat = (token, chatId) =>
export const getMessages = (token, chatId) =>
request("GET", `/chats/${chatId}/messages`, token);
/* ── Streaming message ─────────────────────── */
export async function* streamMessage(token, chatId, body) {
/* ── Streaming message (accepts AbortSignal) ─ */
export async function* streamMessage(token, chatId, body, signal) {
const res = await fetch(`${BASE}/chats/${chatId}/messages`, {
method: "POST",
headers: headers(token),
body: JSON.stringify(body),
signal,
});
if (!res.ok) {
......
This diff is collapsed.
This diff is collapsed.
/**
* Global state via React Context + useReducer
* Global state via React Context + useReducer.
*
* Holds chat messages and streaming flags so they persist
* across chat switches (background streams keep running).
*/
import React, { createContext, useContext, useReducer, useEffect } from "react";
import { setDispatch } from "./streamManager";
const AppContext = createContext();
......@@ -12,6 +16,8 @@ const initialState = {
chats: [],
activeChatId: null,
sidebarOpen: true,
chatMessages: {}, // { [chatId]: Message[] }
activeStreams: {}, // { [chatId]: true } — which chats are currently streaming
};
function reducer(state, action) {
......@@ -24,7 +30,13 @@ function reducer(state, action) {
case "LOGOUT":
localStorage.removeItem("soa_token");
localStorage.removeItem("soa_user");
return { ...initialState, token: null, user: null };
return {
...initialState,
token: null,
user: null,
chatMessages: {},
activeStreams: {},
};
case "SET_CHATS":
return { ...state, chats: action.chats };
......@@ -45,9 +57,15 @@ function reducer(state, action) {
case "REMOVE_CHAT": {
const filtered = state.chats.filter((c) => c.id !== action.chatId);
const newMessages = { ...state.chatMessages };
delete newMessages[action.chatId];
const newStreams = { ...state.activeStreams };
delete newStreams[action.chatId];
return {
...state,
chats: filtered,
chatMessages: newMessages,
activeStreams: newStreams,
activeChatId:
state.activeChatId === action.chatId
? filtered[0]?.id || null
......@@ -61,6 +79,41 @@ function reducer(state, action) {
case "TOGGLE_SIDEBAR":
return { ...state, sidebarOpen: !state.sidebarOpen };
// ── Per-chat message management ──────────────
case "SET_MESSAGES":
return {
...state,
chatMessages: {
...state.chatMessages,
[action.chatId]: action.messages,
},
};
case "ADD_MESSAGE":
return {
...state,
chatMessages: {
...state.chatMessages,
[action.chatId]: [
...(state.chatMessages[action.chatId] || []),
action.message,
],
},
};
// ── Background streaming flags ───────────────
case "SET_STREAMING": {
if (action.streaming) {
return {
...state,
activeStreams: { ...state.activeStreams, [action.chatId]: true },
};
}
const next = { ...state.activeStreams };
delete next[action.chatId];
return { ...state, activeStreams: next };
}
default:
return state;
}
......@@ -68,6 +121,12 @@ function reducer(state, action) {
export function AppProvider({ children }) {
const [state, dispatch] = useReducer(reducer, initialState);
// Give the background stream manager access to dispatch
useEffect(() => {
setDispatch(dispatch);
}, [dispatch]);
return (
<AppContext.Provider value={{ state, dispatch }}>
{children}
......
/**
* Son of Anton — Background Stream Manager
*
* Runs AI streams outside React component lifecycle.
* Switching chats does NOT abort streams. Multiple chats
* can stream simultaneously. Components subscribe to
* per-chat updates via subscribe().
*/
import { streamMessage } from "./api";
// chatId -> { text, thinking, isThinking, abortController }
const _streams = new Map();
// chatId -> Set<() => void>
const _listeners = new Map();
// Store dispatch reference, set once from AppProvider
let _dispatch = null;
export function setDispatch(dispatch) {
_dispatch = dispatch;
}
/** Read current stream data for a chat (non-reactive, call from inside subscriber) */
export function getStreamData(chatId) {
const s = _streams.get(chatId);
if (!s) return { streaming: false, text: "", thinking: "", isThinking: false };
return {
streaming: true,
text: s.text,
thinking: s.thinking,
isThinking: s.isThinking,
};
}
/** Is this chat currently streaming? */
export function isStreaming(chatId) {
return _streams.has(chatId);
}
/** Subscribe to stream data changes for a specific chat. Returns unsubscribe fn. */
export function subscribe(chatId, callback) {
if (!_listeners.has(chatId)) _listeners.set(chatId, new Set());
_listeners.get(chatId).add(callback);
return () => {
const set = _listeners.get(chatId);
if (set) {
set.delete(callback);
if (set.size === 0) _listeners.delete(chatId);
}
};
}
function _notify(chatId) {
const set = _listeners.get(chatId);
if (set) set.forEach((cb) => cb());
}
/** Abort a running stream for a chat */
export function abortStream(chatId) {
const s = _streams.get(chatId);
if (s) {
s.abortController.abort();
_streams.delete(chatId);
_notify(chatId);
if (_dispatch) _dispatch({ type: "SET_STREAMING", chatId, streaming: false });
}
}
/**
* Start a background stream for a chat.
* Does nothing if that chat is already streaming.
*
* @param {object} opts
* @param {string} opts.token - JWT
* @param {string} opts.chatId - chat UUID
* @param {object} opts.body - SendMessageBody
*/
export function startStream({ token, chatId, body }) {
if (_streams.has(chatId)) return;
const ac = new AbortController();
_streams.set(chatId, {
text: "",
thinking: "",
isThinking: false,
abortController: ac,
});
if (_dispatch) _dispatch({ type: "SET_STREAMING", chatId, streaming: true });
_notify(chatId);
// Fire-and-forget async IIFE — runs entirely in the background
(async () => {
const s = _streams.get(chatId);
if (!s) return;
let usage = {};
let msgId = "";
try {
for await (const evt of streamMessage(token, chatId, body, ac.signal)) {
if (ac.signal.aborted) break;
if (!_streams.has(chatId)) break;
switch (evt.type) {
case "thinking_start":
s.isThinking = true;
_notify(chatId);
break;
case "thinking_delta":
s.thinking += evt.content;
_notify(chatId);
break;
case "thinking_end":
s.isThinking = false;
_notify(chatId);
break;
case "text_delta":
s.text += evt.content;
_notify(chatId);
break;
case "usage":
usage = {
input_tokens: evt.input_tokens,
output_tokens: evt.output_tokens,
};
break;
case "title_update":
if (_dispatch)
_dispatch({
type: "UPDATE_CHAT",
chat: { id: chatId, title: evt.title },
});
break;
case "done":
msgId = evt.message_id;
break;
case "error":
s.text += `\n\n**Error:** ${evt.message}`;
_notify(chatId);
break;
}
}
// Stream finished normally — persist the final assistant message
if (!ac.signal.aborted && _dispatch) {
const assistantMsg = {
id: msgId || `gen-${Date.now()}`,
role: "assistant",
content: s.text,
thinking_content: s.thinking || null,
input_tokens: usage.input_tokens || 0,
output_tokens: usage.output_tokens || 0,
created_at: new Date().toISOString(),
};
_dispatch({ type: "ADD_MESSAGE", chatId, message: assistantMsg });
}
} catch (err) {
// Only surface errors that aren't deliberate aborts
if (!ac.signal.aborted && _dispatch) {
const errMsg = {
id: `err-${Date.now()}`,
role: "assistant",
content: `**Error:** ${err.message}`,
created_at: new Date().toISOString(),
};
_dispatch({ type: "ADD_MESSAGE", chatId, message: errMsg });
}
} finally {
_streams.delete(chatId);
_notify(chatId);
if (_dispatch) _dispatch({ type: "SET_STREAMING", chatId, streaming: false });
}
})();
}
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment