Commit 25a6e3a4 authored by Mahmoud Aglan's avatar Mahmoud Aglan

ykrftjdgdhmdhdgh ety d jd j

parent d55ca0c4
# === Node ===
frontend/node_modules
frontend/dist
frontend/.vite
# === Python ===
**/__pycache__
**/*.pyc
**/*.pyo
*.egg-info
# === Git ===
.git
.gitignore
# === Dev files ===
.env
.env.local
*.md
create-project.ps1
main.py
**/.DS_Store
# === IDE ===
.idea
.vscode
*.swp
*.swo
......@@ -4,10 +4,23 @@
FROM node:20-alpine AS frontend-build
WORKDIR /build/frontend
# Nuke NODE_ENV — CapRover/Docker can inject production
# which prevents devDependencies (vite, tailwind) from installing
ENV NODE_ENV=
COPY frontend/package.json frontend/package-lock.json* ./
RUN npm install --legacy-peer-deps
# Force ALL deps including dev
RUN npm install --legacy-peer-deps --include=dev && \
echo "=== vite check ===" && \
npx vite --version
COPY frontend/ ./
RUN npm run build
RUN NODE_ENV=production npx vite build && \
echo "=== dist ===" && \
ls -la dist/
# ============================================
# Stage 2: Python Backend + Serve Frontend
......@@ -15,9 +28,9 @@ RUN npm run build
FROM python:3.11-slim
RUN apt-get update && apt-get install -y --no-install-recommends \
build-essential \
ffmpeg \
&& rm -rf /var/lib/apt/lists/*
build-essential \
ffmpeg \
&& rm -rf /var/lib/apt/lists/*
WORKDIR /app
......@@ -28,11 +41,12 @@ COPY backend/ ./backend/
COPY --from=frontend-build /build/frontend/dist ./frontend/dist
# Warm up the ChromaDB embedding model so first request is fast
RUN echo "=== Frontend ===" && ls -la frontend/dist/ && \
echo "=== Assets ===" && ls -la frontend/dist/assets/ || true
COPY warmup.py /tmp/warmup.py
RUN python /tmp/warmup.py && rm /tmp/warmup.py
# Create persistent data directories
RUN mkdir -p /data/chromadb /data/uploads /data/uploads/chat_attachments
ENV PYTHONUNBUFFERED=1
......
This diff is collapsed.
......@@ -23,10 +23,15 @@ DATABASE_URL: str = os.getenv("DATABASE_URL", "sqlite:////data/sonofanton.db")
CHROMADB_PATH: str = os.getenv("CHROMADB_PATH", "/data/chromadb")
UPLOAD_PATH: str = os.getenv("UPLOAD_PATH", "/data/uploads")
ATTACHMENT_PATH: str = os.getenv("ATTACHMENT_PATH", "/data/uploads/chat_attachments")
DEFAULT_QUOTA: int = int(os.getenv("DEFAULT_QUOTA", "2000000"))
MAX_UPLOAD_BYTES: int = int(os.getenv("MAX_UPLOAD_MB", "50")) * 1024 * 1024
MAX_ATTACHMENT_BYTES: int = int(os.getenv("MAX_ATTACHMENT_MB", "25")) * 1024 * 1024
MAX_IMAGE_DIMENSION: int = 1568
MAX_VIDEO_FRAMES: int = 6
BEDROCK_ENDPOINT: str = (
f"https://bedrock-runtime.{AWS_REGION}.amazonaws.com"
)
\ No newline at end of file
)
......@@ -18,37 +18,44 @@ from backend.routes.chat_routes import router as chat_router
from backend.routes.admin_routes import router as admin_router
from backend.routes.knowledge_routes import router as knowledge_router
from backend.routes.files_routes import router as files_router
from backend.routes.attachment_routes import router as attachment_router
from backend.services.bedrock_service import close_http_client
def _run_migrations():
"""Add new columns to existing tables if they're missing (lightweight migration)."""
"""Add new columns/tables to existing DB if they're missing."""
from sqlalchemy import inspect, text
try:
inspector = inspect(engine)
if "chats" in inspector.get_table_names():
existing_tables = inspector.get_table_names()
if "chats" in existing_tables:
columns = {c["name"] for c in inspector.get_columns("chats")}
with engine.connect() as conn:
if "max_tokens" not in columns:
conn.execute(text("ALTER TABLE chats ADD COLUMN max_tokens INTEGER DEFAULT 4096"))
print(" Added chats.max_tokens column")
print(" Added chats.max_tokens column")
if "reasoning_budget" not in columns:
conn.execute(text("ALTER TABLE chats ADD COLUMN reasoning_budget INTEGER DEFAULT 0"))
print(" Added chats.reasoning_budget column")
print(" Added chats.reasoning_budget column")
conn.commit()
if "chat_attachments" not in existing_tables:
from backend.models import ChatAttachment
ChatAttachment.__table__.create(bind=engine, checkfirst=True)
print(" Created chat_attachments table")
except Exception as e:
print(f" ⚠️ Migration note: {e}")
print(f" Migration note: {e}")
@asynccontextmanager
async def lifespan(app: FastAPI):
# --- Startup ---
Base.metadata.create_all(bind=engine)
_run_migrations()
seed_superadmin()
print("🔥 Son of Anton is online.")
print("Son of Anton is online.")
yield
# --- Shutdown ---
await close_http_client()
print("Son of Anton shutting down.")
......@@ -56,7 +63,7 @@ async def lifespan(app: FastAPI):
app = FastAPI(
title="Son of Anton",
description="Avatar of All Elements of Code",
version="1.0.0",
version="2.0.0",
lifespan=lifespan,
)
......@@ -68,14 +75,13 @@ app.add_middleware(
allow_headers=["*"],
)
# ── API Routes ────────────────────────────────────
app.include_router(auth_router, prefix="/api/auth", tags=["Auth"])
app.include_router(chat_router, prefix="/api/chats", tags=["Chats"])
app.include_router(admin_router, prefix="/api/admin", tags=["Admin"])
app.include_router(knowledge_router, prefix="/api/knowledge", tags=["Knowledge"])
app.include_router(files_router, prefix="/api/files", tags=["Files"])
app.include_router(attachment_router, prefix="/api", tags=["Attachments"])
# ── Serve Frontend ────────────────────────────────
FRONTEND_DIR = Path(__file__).parent.parent / "frontend" / "dist"
if (FRONTEND_DIR / "assets").exists():
......@@ -96,4 +102,4 @@ async def serve_frontend(full_path: str):
index = FRONTEND_DIR / "index.html"
if index.is_file():
return FileResponse(str(index))
return {"message": "Son of Anton API is running. Frontend not built."}
\ No newline at end of file
return {"message": "Son of Anton API is running. Frontend not built."}
......@@ -59,6 +59,10 @@ class Chat(Base):
"Message", back_populates="chat",
cascade="all,delete-orphan", order_by="Message.created_at",
)
attachments = relationship(
"ChatAttachment", back_populates="chat",
cascade="all,delete-orphan",
)
class Message(Base):
......@@ -74,6 +78,26 @@ class Message(Base):
created_at = Column(DateTime, default=datetime.utcnow)
chat = relationship("Chat", back_populates="messages")
attachments = relationship("ChatAttachment", back_populates="message")
class ChatAttachment(Base):
__tablename__ = "chat_attachments"
id = Column(String(36), primary_key=True, default=new_id)
chat_id = Column(String(36), ForeignKey("chats.id", ondelete="CASCADE"), nullable=False)
message_id = Column(String(36), ForeignKey("messages.id", ondelete="SET NULL"), nullable=True)
filename = Column(String(200), nullable=False)
original_filename = Column(String(200), nullable=False)
mime_type = Column(String(100), nullable=False)
file_type = Column(String(20), nullable=False)
file_size = Column(Integer, default=0)
storage_path = Column(String(500), nullable=False)
text_extract = Column(Text, nullable=True)
created_at = Column(DateTime, default=datetime.utcnow)
chat = relationship("Chat", back_populates="attachments")
message = relationship("Message", back_populates="attachments")
class KnowledgeBase(Base):
......@@ -99,4 +123,4 @@ class KnowledgeDocument(Base):
filename = Column(String(200), nullable=False)
file_size = Column(Integer, default=0)
chunk_count = Column(Integer, default=0)
created_at = Column(DateTime, default=datetime.utcnow)
\ No newline at end of file
created_at = Column(DateTime, default=datetime.utcnow)
......@@ -3,19 +3,37 @@ Chat attachment upload, serve, and delete routes.
"""
import os
from fastapi import APIRouter, Depends, HTTPException, UploadFile, File
from typing import Optional
from fastapi import APIRouter, Depends, HTTPException, UploadFile, File, Query, Request
from fastapi.responses import FileResponse
from sqlalchemy.orm import Session
from backend.database import get_db
from backend.models import User, Chat, ChatAttachment
from backend.auth import get_current_user
from backend.auth import get_current_user, decode_token
from backend.services import attachment_service
from backend.config import MAX_ATTACHMENT_BYTES
router = APIRouter()
def _get_user_flexible(request: Request, db: Session, token_param: Optional[str] = None) -> User:
raw_token = None
auth_header = request.headers.get("authorization", "")
if auth_header.startswith("Bearer "):
raw_token = auth_header[7:]
if not raw_token and token_param:
raw_token = token_param
if not raw_token:
raise HTTPException(401, "Authentication required")
payload = decode_token(raw_token)
user = db.query(User).filter(User.id == payload["sub"]).first()
if not user or not user.is_active:
raise HTTPException(401, "User not found or inactive")
return user
@router.post("/chats/{chat_id}/attachments")
async def upload_attachments(
chat_id: str,
......@@ -23,7 +41,6 @@ async def upload_attachments(
user: User = Depends(get_current_user),
db: Session = Depends(get_db),
):
"""Upload one or more files as chat attachments. Returns attachment metadata."""
chat = db.query(Chat).filter(Chat.id == chat_id, Chat.user_id == user.id).first()
if not chat:
raise HTTPException(404, "Chat not found")
......@@ -34,21 +51,16 @@ async def upload_attachments(
try:
content = await file.read()
if len(content) > MAX_ATTACHMENT_BYTES:
results.append({
"error": f"File too large: {filename} ({len(content) // 1024 // 1024}MB). Max {MAX_ATTACHMENT_BYTES // 1024 // 1024}MB.",
})
results.append({"error": f"Too large: {filename}"})
continue
meta = attachment_service.save_attachment(
chat_id=chat_id,
filename=filename,
content=content,
content_type=file.content_type,
chat_id=chat_id, filename=filename,
content=content, content_type=file.content_type,
)
att = ChatAttachment(
id=meta["id"],
chat_id=chat_id,
id=meta["id"], chat_id=chat_id,
filename=meta["filename"],
original_filename=meta["original_filename"],
mime_type=meta["mime_type"],
......@@ -60,11 +72,9 @@ async def upload_attachments(
db.add(att)
db.commit()
db.refresh(att)
results.append(_att_dict(att))
except Exception as e:
results.append({"error": f"Failed to upload {filename}: {str(e)}"})
results.append({"error": f"Failed: {filename}: {str(e)}"})
return {"attachments": results}
......@@ -72,26 +82,20 @@ async def upload_attachments(
@router.get("/attachments/{attachment_id}/file")
def serve_attachment(
attachment_id: str,
user: User = Depends(get_current_user),
request: Request,
token: Optional[str] = Query(None),
db: Session = Depends(get_db),
):
"""Serve an attachment file. Validates user owns the chat."""
user = _get_user_flexible(request, db, token)
att = db.query(ChatAttachment).filter(ChatAttachment.id == attachment_id).first()
if not att:
raise HTTPException(404, "Attachment not found")
chat = db.query(Chat).filter(Chat.id == att.chat_id).first()
if not chat or (chat.user_id != user.id and user.role != "superadmin"):
raise HTTPException(403, "Access denied")
if not os.path.exists(att.storage_path):
raise HTTPException(404, "File not found on disk")
return FileResponse(
att.storage_path,
media_type=att.mime_type,
filename=att.original_filename,
)
return FileResponse(att.storage_path, media_type=att.mime_type, filename=att.original_filename)
@router.delete("/attachments/{attachment_id}")
......@@ -100,30 +104,22 @@ def delete_attachment(
user: User = Depends(get_current_user),
db: Session = Depends(get_db),
):
"""Delete a single attachment."""
att = db.query(ChatAttachment).filter(ChatAttachment.id == attachment_id).first()
if not att:
raise HTTPException(404)
chat = db.query(Chat).filter(Chat.id == att.chat_id).first()
if not chat or (chat.user_id != user.id and user.role != "superadmin"):
raise HTTPException(403)
attachment_service.delete_attachment_file(att.storage_path)
db.delete(att)
db.commit()
return {"ok": True}
def _att_dict(att: ChatAttachment) -> dict:
def _att_dict(att):
return {
"id": att.id,
"chat_id": att.chat_id,
"message_id": att.message_id,
"filename": att.filename,
"original_filename": att.original_filename,
"mime_type": att.mime_type,
"file_type": att.file_type,
"file_size": att.file_size,
"created_at": str(att.created_at),
}
\ No newline at end of file
"id": att.id, "chat_id": att.chat_id, "message_id": att.message_id,
"filename": att.filename, "original_filename": att.original_filename,
"mime_type": att.mime_type, "file_type": att.file_type,
"file_size": att.file_size, "created_at": str(att.created_at),
}
This diff is collapsed.
This diff is collapsed.
"""
Build the `messages` list for the Bedrock/Anthropic API from chat history.
Keeps the most recent messages that fit within a character budget
(rough proxy for tokens — 1 token ≈ 4 chars).
Limits the total number of DB rows loaded to prevent hangs on very long chats.
Build the messages list for the Bedrock/Anthropic API from chat history.
"""
from sqlalchemy.orm import Session
from backend.models import Chat, Message
# ~100 000 tokens budget → ~400 000 characters
MAX_CONTEXT_CHARS = 400_000
# Hard cap: never load more than this many messages from the DB
MAX_MESSAGES = 80
def build_messages(chat: Chat, db: Session) -> list[dict]:
"""
Return a list of {"role": ..., "content": ...} ready for the API.
Messages are oldest-first (chronological).
Only the most recent MAX_MESSAGES are considered, then trimmed by char budget.
"""
# Fetch the most recent N messages (descending) then reverse to chronological
rows: list[Message] = (
db.query(Message)
.filter(Message.chat_id == chat.id)
......@@ -36,7 +22,6 @@ def build_messages(chat: Chat, db: Session) -> list[dict]:
if not rows:
return []
# --- trim from the oldest to fit character budget ---
total_chars = sum(len(m.content or "") for m in rows)
idx = 0
while total_chars > MAX_CONTEXT_CHARS and idx < len(rows) - 2:
......@@ -45,26 +30,20 @@ def build_messages(chat: Chat, db: Session) -> list[dict]:
trimmed = rows[idx:]
# Anthropic requires the first message to be role=user.
# If trimming left an assistant message at the front, skip it.
while trimmed and trimmed[0].role != "user":
trimmed = trimmed[1:]
# Build the final list — collapse consecutive same-role messages
result: list[dict] = []
for m in trimmed:
content = m.content or ""
if not content.strip():
continue
role = m.role
if role not in ("user", "assistant"):
continue
if result and result[-1]["role"] == role:
# Merge into previous
result[-1]["content"] += "\n" + content
else:
result.append({"role": role, "content": content})
return result
\ No newline at end of file
return result
This diff is collapsed.
This diff is collapsed.
......@@ -6,19 +6,21 @@ function headers(token) {
return h;
}
function authHeader(token) {
return token ? { Authorization: `Bearer ${token}` } : {};
}
async function request(method, path, token, body) {
const opts = { method, headers: headers(token) };
if (body) opts.body = JSON.stringify(body);
const res = await fetch(`${BASE}${path}`, opts);
if (!res.ok) {
const err = await res.json().catch(() => ({ detail: res.statusText }));
throw new Error(err.detail || `Request failed: ${res.status}`);
throw new Error(err.detail || err.message || "Request failed");
}
if (res.status === 204) return null;
return res.json();
}
/* ── Auth ─────────────────────────────────── */
export const login = (username, password) =>
request("POST", "/auth/login", null, { username, password });
......@@ -27,12 +29,9 @@ export const register = (username, email, password) =>
export const getMe = (token) => request("GET", "/auth/me", token);
/* ── Chats ─────────────────────────────────── */
export const listChats = (token) =>
request("GET", "/chats", token);
export const listChats = (token) => request("GET", "/chats", token);
export const createChat = (token, data = {}) =>
request("POST", "/chats", token, data);
export const createChat = (token, data = {}) => request("POST", "/chats", token, data);
export const updateChat = (token, chatId, data) =>
request("PUT", `/chats/${chatId}`, token, data);
......@@ -46,88 +45,57 @@ export const deleteChat = (token, chatId) =>
export const getMessages = (token, chatId) =>
request("GET", `/chats/${chatId}/messages`, token);
/* ── Streaming message (accepts AbortSignal) ─ */
export async function* streamMessage(token, chatId, body, signal) {
const res = await fetch(`${BASE}/chats/${chatId}/messages`, {
method: "POST",
headers: headers(token),
body: JSON.stringify(body),
signal,
method: "POST", headers: headers(token),
body: JSON.stringify(body), signal,
});
if (!res.ok) {
const err = await res.json().catch(() => ({ detail: res.statusText }));
throw new Error(err.detail || "Stream failed");
}
const reader = res.body.getReader();
const decoder = new TextDecoder();
let buffer = "";
while (true) {
const { done, value } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
const parts = buffer.split("\n\n");
buffer = parts.pop() || "";
for (const part of parts) {
const line = part.trim();
if (line.startsWith("data: ")) {
try {
yield JSON.parse(line.slice(6));
} catch {
/* skip malformed */
}
try { yield JSON.parse(line.slice(6)); } catch { }
}
}
}
// flush
if (buffer.trim().startsWith("data: ")) {
try {
yield JSON.parse(buffer.trim().slice(6));
} catch {
/* skip */
}
try { yield JSON.parse(buffer.trim().slice(6)); } catch { }
}
}
/* ── File Uploads (Attachments) ───────────── */
export async function uploadAttachments(token, chatId, files) {
const formData = new FormData();
for (const file of files) {
formData.append("files", file);
}
const form = new FormData();
for (const file of files) form.append("files", file);
const res = await fetch(`${BASE}/chats/${chatId}/attachments`, {
method: "POST",
headers: {
Authorization: `Bearer ${token}`,
// Do NOT set Content-Type — browser sets it with boundary for multipart
},
body: formData,
method: "POST", headers: authHeader(token), body: form,
});
if (!res.ok) {
const err = await res.json().catch(() => ({ detail: res.statusText }));
const err = await res.json().catch(() => ({}));
throw new Error(err.detail || "Upload failed");
}
return res.json();
}
export const listAttachments = (token, chatId) =>
request("GET", `/chats/${chatId}/attachments`, token);
export function getAttachmentPreviewUrl(chatId, attachmentId) {
return `${BASE}/chats/${chatId}/attachments/${attachmentId}/preview`;
export function getAttachmentUrl(attachmentId) {
return `${BASE}/attachments/${attachmentId}/file`;
}
/* ── Knowledge Bases ───────────────────────── */
export const listKnowledgeBases = (token) =>
request("GET", "/knowledge", token);
export const deleteAttachment = (token, attachmentId) =>
request("DELETE", `/attachments/${attachmentId}`, token);
export const listKnowledgeBases = (token) => request("GET", "/knowledge", token);
export const createKnowledgeBase = (token, name, description = "") =>
request("POST", "/knowledge", token, { name, description });
......@@ -139,50 +107,53 @@ export const deleteKnowledgeBase = (token, kbId) =>
request("DELETE", `/knowledge/${kbId}`, token);
export async function uploadDocuments(token, kbId, files) {
const formData = new FormData();
for (const file of files) {
formData.append("files", file);
}
const res = await fetch(`${BASE}/knowledge/${kbId}/documents`, {
method: "POST",
headers: { Authorization: `Bearer ${token}` },
body: formData,
const form = new FormData();
for (const file of files) form.append("files", file);
const res = await fetch(`${BASE}/knowledge/${kbId}/upload`, {
method: "POST", headers: authHeader(token), body: form,
});
if (!res.ok) {
const err = await res.json().catch(() => ({ detail: res.statusText }));
const err = await res.json().catch(() => ({}));
throw new Error(err.detail || "Upload failed");
}
return res.json();
}
export const deleteDocument = (token, kbId, docId) =>
request("DELETE", `/knowledge/${kbId}/documents/${docId}`, token);
export const uploadDocument = (token, kbId, file) =>
uploadDocuments(token, kbId, [file]);
export const adminStats = (token) => request("GET", "/admin/stats", token);
/* ── Admin ─────────────────────────────────── */
export const adminListUsers = (token) =>
request("GET", "/admin/users", token);
export const adminListUsers = (token) => request("GET", "/admin/users", token);
export const adminCreateUser = (token, data) =>
request("POST", "/admin/users", token, data);
export const adminUpdateUser = (token, userId, data) =>
request("PUT", `/admin/users/${userId}`, token, data);
/* ── Download Zip ──────────────────────────── */
export async function downloadZip(token, content) {
const res = await fetch(`${BASE}/download-zip`, {
method: "POST",
headers: headers(token),
body: JSON.stringify({ content }),
});
export const adminDeleteUser = (token, userId) =>
request("DELETE", `/admin/users/${userId}`, token);
if (!res.ok) throw new Error("Download failed");
export const adminListChats = (token) => request("GET", "/admin/chats", token);
const blob = await res.blob();
const url = URL.createObjectURL(blob);
const a = document.createElement("a");
a.href = url;
a.download = "code-files.zip";
a.click();
URL.revokeObjectURL(url);
}
\ No newline at end of file
export async function downloadZip(token, markdown) {
const res = await fetch(`${BASE}/files/download-zip`, {
method: "POST", headers: headers(token),
body: JSON.stringify({ markdown }),
});
if (!res.ok) throw new Error("Download failed");
const ct = res.headers.get("content-type") || "";
if (ct.includes("application/zip")) {
const blob = await res.blob();
const url = URL.createObjectURL(blob);
const a = document.createElement("a");
a.href = url;
a.download = "son-of-anton-code.zip";
a.click();
URL.revokeObjectURL(url);
} else {
const data = await res.json();
if (data.error) throw new Error(data.error);
}
}
This diff is collapsed.
This diff is collapsed.
/**
* Son of Anton — Background Stream Manager
*
* Runs AI streams outside React component lifecycle.
* Switching chats does NOT abort streams. Multiple chats
* can stream simultaneously. Components subscribe to
* per-chat updates via subscribe().
*/
import { streamMessage } from "./api";
// chatId -> { text, thinking, isThinking, abortController }
const _streams = new Map();
// chatId -> Set<() => void>
const _listeners = new Map();
// Store dispatch reference, set once from AppProvider
let _dispatch = null;
export function setDispatch(dispatch) {
_dispatch = dispatch;
}
export function setDispatch(dispatch) { _dispatch = dispatch; }
/** Read current stream data for a chat (non-reactive, call from inside subscriber) */
export function getStreamData(chatId) {
const s = _streams.get(chatId);
if (!s) return { streaming: false, text: "", thinking: "", isThinking: false };
return {
streaming: true,
text: s.text,
thinking: s.thinking,
isThinking: s.isThinking,
};
return { streaming: true, text: s.text, thinking: s.thinking, isThinking: s.isThinking };
}
/** Is this chat currently streaming? */
export function isStreaming(chatId) {
return _streams.has(chatId);
}
export function isStreaming(chatId) { return _streams.has(chatId); }
/** Is ANY chat currently streaming? (for UI indicators, NOT for blocking) */
export function isAnyStreaming() {
return _streams.size > 0;
}
/** Subscribe to stream data changes for a specific chat. Returns unsubscribe fn. */
export function subscribe(chatId, callback) {
export function subscribe(chatId, cb) {
if (!_listeners.has(chatId)) _listeners.set(chatId, new Set());
_listeners.get(chatId).add(callback);
return () => {
const set = _listeners.get(chatId);
if (set) {
set.delete(callback);
if (set.size === 0) _listeners.delete(chatId);
}
};
_listeners.get(chatId).add(cb);
return () => { const s = _listeners.get(chatId); if (s) { s.delete(cb); if (!s.size) _listeners.delete(chatId); } };
}
function _notify(chatId) {
const set = _listeners.get(chatId);
if (set) set.forEach((cb) => cb());
}
function _notify(id) { const s = _listeners.get(id); if (s) s.forEach((cb) => cb()); }
/** Abort a running stream for a chat */
export function abortStream(chatId) {
const s = _streams.get(chatId);
if (s) {
s.abortController.abort();
_streams.delete(chatId);
_notify(chatId);
if (_dispatch) _dispatch({ type: "SET_STREAMING", chatId, streaming: false });
}
if (s) { s.abortController.abort(); _streams.delete(chatId); _notify(chatId); if (_dispatch) _dispatch({ type: "SET_STREAMING", chatId, streaming: false }); }
}
/**
* Start a background stream for a chat.
* If a stream already exists for this chat, it is aborted first.
*/
export function startStream({ token, chatId, body }) {
// Abort existing stream for THIS chat only (other chats keep streaming)
if (_streams.has(chatId)) {
abortStream(chatId);
}
const abortController = new AbortController();
const streamState = {
text: "",
thinking: "",
isThinking: false,
abortController,
};
_streams.set(chatId, streamState);
if (_streams.has(chatId)) return;
const ac = new AbortController();
_streams.set(chatId, { text: "", thinking: "", isThinking: false, abortController: ac });
if (_dispatch) _dispatch({ type: "SET_STREAMING", chatId, streaming: true });
_notify(chatId);
// Fire and forget — runs independently of React
(async () => {
const s = _streams.get(chatId);
if (!s) return;
let usage = {}, msgId = "";
try {
const gen = streamMessage(token, chatId, body, abortController.signal);
for await (const event of gen) {
const s = _streams.get(chatId);
if (!s) break; // stream was aborted
if (event.type === "thinking") {
s.thinking += event.content || "";
s.isThinking = true;
} else if (event.type === "text") {
s.text += event.content || "";
s.isThinking = false;
} else if (event.type === "done") {
// Final message — add to store
if (_dispatch && event.message) {
_dispatch({
type: "ADD_MESSAGE",
chatId,
message: event.message,
});
}
// Auto-title
if (_dispatch && event.title) {
_dispatch({
type: "UPDATE_CHAT",
chat: { id: chatId, title: event.title },
});
}
} else if (event.type === "error") {
s.text += `\n\n**Error:** ${event.content || "Unknown error"}`;
for await (const evt of streamMessage(token, chatId, body, ac.signal)) {
if (ac.signal.aborted || !_streams.has(chatId)) break;
switch (evt.type) {
case "thinking_start": s.isThinking = true; _notify(chatId); break;
case "thinking_delta": s.thinking += evt.content; _notify(chatId); break;
case "thinking_end": s.isThinking = false; _notify(chatId); break;
case "text_delta": s.text += evt.content; _notify(chatId); break;
case "usage": usage = { input_tokens: evt.input_tokens, output_tokens: evt.output_tokens }; break;
case "title_update": if (_dispatch) _dispatch({ type: "UPDATE_CHAT", chat: { id: chatId, title: evt.title } }); break;
case "done": msgId = evt.message_id; break;
case "error": s.text += `\n\n**Error:** ${evt.message}`; _notify(chatId); break;
}
_notify(chatId);
}
if (!ac.signal.aborted && _dispatch) {
_dispatch({ type: "ADD_MESSAGE", chatId, message: {
id: msgId || `gen-${Date.now()}`, role: "assistant", content: s.text,
thinking_content: s.thinking || null, input_tokens: usage.input_tokens || 0,
output_tokens: usage.output_tokens || 0, created_at: new Date().toISOString(), attachments: [],
}});
}
} catch (err) {
if (err.name !== "AbortError") {
const s = _streams.get(chatId);
if (s) {
s.text += `\n\n**Stream error:** ${err.message}`;
_notify(chatId);
}
if (!ac.signal.aborted && _dispatch) {
_dispatch({ type: "ADD_MESSAGE", chatId, message: {
id: `err-${Date.now()}`, role: "assistant", content: `**Error:** ${err.message}`,
created_at: new Date().toISOString(), attachments: [],
}});
}
} finally {
_streams.delete(chatId);
_notify(chatId);
_streams.delete(chatId); _notify(chatId);
if (_dispatch) _dispatch({ type: "SET_STREAMING", chatId, streaming: false });
}
})();
}
\ No newline at end of file
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment