Commit 13e4cefa authored by Administrator's avatar Administrator

Update 2 files via Son of Anton

parent 2a5ea07a
"""
GitLab CE integration routes — superadmin + user-facing.
Son of Anton v4.2.0
"""
import asyncio
import json
from datetime import datetime
from typing import Optional
from pydantic import BaseModel
from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy.orm import Session
from backend.database import get_db
from backend.models import User, GitLabSettings, LinkedRepo, PendingAction, KnowledgeBase, KnowledgeDocument
from backend.auth import require_superadmin, get_current_user, check_feature
from backend.services import gitlab_service, code_analyzer, rag_service
router = APIRouter()
# ═══════════════════════════════════════════════════
# USER-FACING ENDPOINTS (non-superadmin, permission-gated)
# These let regular users with can_use_gitlab pick repos/branches in chat
# Request Bodies
# ═══════════════════════════════════════════════════
from backend.auth import get_current_user, check_feature
from backend.models import KnowledgeBase, KnowledgeDocument
from backend.services import rag_service
class SettingsBody(BaseModel):
gitlab_url: str
gitlab_token: str
class CreateProjectBody(BaseModel):
name: str
description: str = ""
visibility: str = "private"
class LinkRepoBody(BaseModel):
gitlab_project_id: int
class CommitBody(BaseModel):
branch: str
commit_message: str
actions: list[dict]
class SingleCommitBody(BaseModel):
branch: str
file_path: str
content: str
commit_message: str
action: str = "auto"
class BranchBody(BaseModel):
branch_name: str
ref: str = "main"
class MergeRequestBody(BaseModel):
source_branch: str
target_branch: str
title: str
description: str = ""
class ActionBody(BaseModel):
linked_repo_id: str
action_type: str
title: str = ""
payload: str
class CreateKBFromRepoBody(BaseModel):
branch: Optional[str] = None
name: Optional[str] = None
def _get_settings(db: Session) -> GitLabSettings:
s = db.query(GitLabSettings).first()
if not s or not s.is_active or not s.gitlab_url or not s.gitlab_token:
raise HTTPException(400, "GitLab not configured. Set up connection in GitLab Command Center.")
return s
def _get_repo(repo_id: str, db: Session) -> LinkedRepo:
repo = db.query(LinkedRepo).filter(LinkedRepo.id == repo_id).first()
if not repo:
raise HTTPException(404, "Linked repo not found")
return repo
# ═══════════════════════════════════════════════════
# Connection Settings
# ═══════════════════════════════════════════════════
@router.get("/settings")
def get_settings(admin: User = Depends(require_superadmin), db: Session = Depends(get_db)):
s = db.query(GitLabSettings).first()
if not s:
return {"gitlab_url": "", "gitlab_token_set": False, "is_active": False}
return {
"gitlab_url": s.gitlab_url,
"gitlab_token_set": bool(s.gitlab_token),
"is_active": s.is_active,
"updated_at": str(s.updated_at) if s.updated_at else None,
}
@router.put("/settings")
def update_settings(body: SettingsBody, admin: User = Depends(require_superadmin), db: Session = Depends(get_db)):
s = db.query(GitLabSettings).first()
if not s:
s = GitLabSettings()
db.add(s)
s.gitlab_url = body.gitlab_url.rstrip("/")
s.gitlab_token = body.gitlab_token
s.is_active = True
db.commit()
return {"ok": True}
@router.post("/test-connection")
async def test_connection(admin: User = Depends(require_superadmin), db: Session = Depends(get_db)):
s = db.query(GitLabSettings).first()
if not s or not s.gitlab_url or not s.gitlab_token:
raise HTTPException(400, "GitLab URL and token not configured")
try:
result = await gitlab_service.test_connection(s.gitlab_url, s.gitlab_token)
return result
except gitlab_service.GitLabError as e:
raise HTTPException(e.status_code, f"Connection failed: {e.detail}")
except Exception as e:
raise HTTPException(502, f"Cannot reach GitLab: {str(e)}")
# ═══════════════════════════════════════════════════
# GitLab Projects (remote)
# ═══════════════════════════════════════════════════
@router.get("/projects")
async def search_projects(
search: Optional[str] = Query(None),
owned: bool = Query(False),
admin: User = Depends(require_superadmin),
db: Session = Depends(get_db),
):
s = _get_settings(db)
try:
projects = await gitlab_service.list_projects(s.gitlab_url, s.gitlab_token, search=search, owned=owned)
return projects
except gitlab_service.GitLabError as e:
raise HTTPException(e.status_code, e.detail)
@router.post("/projects")
async def create_project(body: CreateProjectBody, admin: User = Depends(require_superadmin), db: Session = Depends(get_db)):
s = _get_settings(db)
try:
project = await gitlab_service.create_project(
s.gitlab_url, s.gitlab_token,
name=body.name, description=body.description, visibility=body.visibility,
)
return project
except gitlab_service.GitLabError as e:
raise HTTPException(e.status_code, e.detail)
# ═══════════════════════════════════════════════════
# Linked Repos (local)
# ═══════════════════════════════════════════════════
@router.get("/repos")
def list_repos(admin: User = Depends(require_superadmin), db: Session = Depends(get_db)):
repos = db.query(LinkedRepo).order_by(LinkedRepo.created_at.desc()).all()
return [_repo_dict(r) for r in repos]
@router.post("/repos")
async def link_repo(body: LinkRepoBody, admin: User = Depends(require_superadmin), db: Session = Depends(get_db)):
existing = db.query(LinkedRepo).filter(LinkedRepo.gitlab_project_id == body.gitlab_project_id).first()
if existing:
return _repo_dict(existing)
s = _get_settings(db)
try:
project = await gitlab_service.get_project(s.gitlab_url, s.gitlab_token, body.gitlab_project_id)
except gitlab_service.GitLabError as e:
raise HTTPException(e.status_code, f"Cannot fetch project: {e.detail}")
repo = LinkedRepo(
gitlab_project_id=project["id"],
name=project["name"],
path_with_namespace=project["path_with_namespace"],
default_branch=project.get("default_branch", "main"),
web_url=project.get("web_url", ""),
description=project.get("description", ""),
map_status="analyzing",
)
db.add(repo)
db.commit()
db.refresh(repo)
asyncio.create_task(_analyze_repo_background(
repo.id, s.gitlab_url, s.gitlab_token,
project["id"], project.get("default_branch", "main"),
))
return _repo_dict(repo)
@router.delete("/repos/{repo_id}")
def unlink_repo(repo_id: str, admin: User = Depends(require_superadmin), db: Session = Depends(get_db)):
repo = _get_repo(repo_id, db)
db.delete(repo)
db.commit()
return {"ok": True}
# ═══════════════════════════════════════════════════
# Architecture Map
# ═══════════════════════════════════════════════════
async def _analyze_repo_background(repo_id: str, gitlab_url: str, gitlab_token: str, project_id: int, branch: str):
"""Background task: load all files and generate architecture map."""
from backend.database import SessionLocal as BgSession
db = BgSession()
try:
repo = db.query(LinkedRepo).filter(LinkedRepo.id == repo_id).first()
if not repo:
return
repo.map_status = "analyzing"
db.commit()
result = await gitlab_service.load_project_files(
gitlab_url, gitlab_token, project_id, ref=branch,
)
files = result.get("files", [])
if not files:
repo.map_status = "failed"
repo.architecture_map = "[No files could be loaded for analysis]"
db.commit()
return
architecture_map = code_analyzer.analyze_codebase(files)
repo.architecture_map = architecture_map
repo.map_status = "ready"
repo.map_generated_at = datetime.utcnow()
db.commit()
print(f" ✅ Architecture map generated for {repo.name} ({len(architecture_map)} chars)")
except Exception as e:
try:
repo = db.query(LinkedRepo).filter(LinkedRepo.id == repo_id).first()
if repo:
repo.map_status = "failed"
repo.architecture_map = f"[Analysis failed: {str(e)[:200]}]"
db.commit()
except Exception:
pass
print(f" ❌ Architecture analysis failed for repo {repo_id}: {e}")
finally:
db.close()
@router.post("/repos/{repo_id}/analyze")
async def reanalyze_repo(repo_id: str, admin: User = Depends(require_superadmin), db: Session = Depends(get_db)):
"""Re-generate the architecture map for a linked repo."""
s = _get_settings(db)
repo = _get_repo(repo_id, db)
repo.map_status = "analyzing"
db.commit()
asyncio.create_task(_analyze_repo_background(
repo.id, s.gitlab_url, s.gitlab_token,
repo.gitlab_project_id, repo.default_branch,
))
return {"ok": True, "status": "analyzing"}
@router.get("/repos/{repo_id}/map")
def get_repo_map(repo_id: str, admin: User = Depends(require_superadmin), db: Session = Depends(get_db)):
"""Get the architecture map for a linked repo."""
repo = _get_repo(repo_id, db)
return {
"map_status": repo.map_status or "none",
"map_generated_at": str(repo.map_generated_at) if repo.map_generated_at else None,
"architecture_map": repo.architecture_map or "",
"map_size": len(repo.architecture_map or ""),
}
# ═══════════════════════════════════════════════════
# Repository Operations
# ═══════════════════════════════════════════════════
@router.get("/repos/{repo_id}/tree")
async def get_tree(
repo_id: str,
path: str = Query(""),
ref: Optional[str] = Query(None),
admin: User = Depends(require_superadmin),
db: Session = Depends(get_db),
):
s = _get_settings(db)
repo = _get_repo(repo_id, db)
branch = ref or repo.default_branch
try:
tree = await gitlab_service.get_tree(s.gitlab_url, s.gitlab_token, repo.gitlab_project_id, path=path, ref=branch)
return {"branch": branch, "path": path, "items": tree}
except gitlab_service.GitLabError as e:
raise HTTPException(e.status_code, e.detail)
@router.get("/repos/{repo_id}/file")
async def get_file(
repo_id: str,
path: str = Query(...),
ref: Optional[str] = Query(None),
admin: User = Depends(require_superadmin),
db: Session = Depends(get_db),
):
s = _get_settings(db)
repo = _get_repo(repo_id, db)
branch = ref or repo.default_branch
try:
file_data = await gitlab_service.get_file_content(s.gitlab_url, s.gitlab_token, repo.gitlab_project_id, path, ref=branch)
return file_data
except gitlab_service.GitLabError as e:
raise HTTPException(e.status_code, e.detail)
@router.get("/repos/{repo_id}/branches")
async def get_branches(repo_id: str, admin: User = Depends(require_superadmin), db: Session = Depends(get_db)):
s = _get_settings(db)
repo = _get_repo(repo_id, db)
try:
branches = await gitlab_service.list_branches(s.gitlab_url, s.gitlab_token, repo.gitlab_project_id)
return branches
except gitlab_service.GitLabError as e:
raise HTTPException(e.status_code, e.detail)
@router.post("/repos/{repo_id}/branches")
async def create_branch(repo_id: str, body: BranchBody, admin: User = Depends(require_superadmin), db: Session = Depends(get_db)):
s = _get_settings(db)
repo = _get_repo(repo_id, db)
try:
result = await gitlab_service.create_branch(s.gitlab_url, s.gitlab_token, repo.gitlab_project_id, body.branch_name, body.ref)
return result
except gitlab_service.GitLabError as e:
raise HTTPException(e.status_code, e.detail)
@router.post("/repos/{repo_id}/commit")
async def commit_code(repo_id: str, body: CommitBody, admin: User = Depends(require_superadmin), db: Session = Depends(get_db)):
"""Commit multiple files. Auto-detects create vs update per file."""
s = _get_settings(db)
repo = _get_repo(repo_id, db)
existing_paths = set()
try:
tree = await gitlab_service.get_tree(
s.gitlab_url, s.gitlab_token, repo.gitlab_project_id,
ref=body.branch, recursive=True,
)
existing_paths = {item["path"] for item in tree if item["type"] == "blob"}
except Exception:
pass
resolved_actions = []
for a in body.actions:
file_path = a.get("file_path", "")
content = a.get("content", "")
requested = a.get("action", "auto")
if not file_path:
continue
file_exists = file_path in existing_paths
if requested in ("auto", "upsert"):
actual = "update" if file_exists else "create"
elif requested == "update" and not file_exists:
actual = "create"
elif requested == "create" and file_exists:
actual = "update"
else:
actual = requested
resolved_actions.append({"action": actual, "file_path": file_path, "content": content})
if not resolved_actions:
raise HTTPException(400, "No valid files to commit")
try:
result = await gitlab_service.commit_files(
s.gitlab_url, s.gitlab_token, repo.gitlab_project_id,
body.branch, body.commit_message, resolved_actions,
)
return result
except gitlab_service.GitLabError as e:
raise HTTPException(e.status_code, e.detail)
@router.post("/repos/{repo_id}/commit-single")
async def commit_single(
repo_id: str,
body: SingleCommitBody,
admin: User = Depends(require_superadmin),
db: Session = Depends(get_db),
):
"""Commit a single file. Auto-detects create vs update."""
s = _get_settings(db)
repo = _get_repo(repo_id, db)
action = body.action
if action in ("update", "create", "auto"):
try:
await gitlab_service.get_file_content(
s.gitlab_url, s.gitlab_token,
repo.gitlab_project_id, body.file_path, ref=body.branch,
)
file_exists = True
except gitlab_service.GitLabError:
file_exists = False
if action == "auto":
action = "update" if file_exists else "create"
elif action == "update" and not file_exists:
action = "create"
elif action == "create" and file_exists:
action = "update"
try:
result = await gitlab_service.commit_single_file(
s.gitlab_url, s.gitlab_token, repo.gitlab_project_id,
body.branch, body.file_path, body.content, body.commit_message, action,
)
return result
except gitlab_service.GitLabError as e:
raise HTTPException(e.status_code, e.detail)
@router.post("/repos/{repo_id}/merge-request")
async def create_mr(repo_id: str, body: MergeRequestBody, admin: User = Depends(require_superadmin), db: Session = Depends(get_db)):
s = _get_settings(db)
repo = _get_repo(repo_id, db)
try:
result = await gitlab_service.create_merge_request(
s.gitlab_url, s.gitlab_token, repo.gitlab_project_id,
body.source_branch, body.target_branch, body.title, body.description,
)
return result
except gitlab_service.GitLabError as e:
raise HTTPException(e.status_code, e.detail)
# ═══════════════════════════════════════════════════
# Pending Actions
# ═══════════════════════════════════════════════════
@router.get("/actions")
def list_actions(
status: str = Query("pending"),
admin: User = Depends(require_superadmin),
db: Session = Depends(get_db),
):
q = db.query(PendingAction).filter(PendingAction.status == status)
actions = q.order_by(PendingAction.created_at.desc()).limit(100).all()
return [_action_dict(a, db) for a in actions]
@router.post("/actions")
def create_action(body: ActionBody, admin: User = Depends(require_superadmin), db: Session = Depends(get_db)):
repo = _get_repo(body.linked_repo_id, db)
action = PendingAction(
linked_repo_id=repo.id,
action_type=body.action_type,
title=body.title,
payload=body.payload,
)
db.add(action)
db.commit()
db.refresh(action)
return _action_dict(action, db)
@router.post("/actions/{action_id}/approve")
async def approve_action(action_id: str, admin: User = Depends(require_superadmin), db: Session = Depends(get_db)):
action = db.query(PendingAction).filter(PendingAction.id == action_id).first()
if not action:
raise HTTPException(404)
if action.status != "pending":
raise HTTPException(400, f"Action already {action.status}")
s = _get_settings(db)
repo = _get_repo(action.linked_repo_id, db)
payload = json.loads(action.payload)
try:
if action.action_type == "commit":
result = await gitlab_service.commit_files(
s.gitlab_url, s.gitlab_token, repo.gitlab_project_id,
payload["branch"], payload["commit_message"], payload["actions"],
)
action.result_message = json.dumps(result)
elif action.action_type == "create_branch":
result = await gitlab_service.create_branch(
s.gitlab_url, s.gitlab_token, repo.gitlab_project_id,
payload["branch_name"], payload.get("ref", repo.default_branch),
)
action.result_message = json.dumps(result)
elif action.action_type == "create_mr":
result = await gitlab_service.create_merge_request(
s.gitlab_url, s.gitlab_token, repo.gitlab_project_id,
payload["source_branch"], payload["target_branch"],
payload["title"], payload.get("description", ""),
)
action.result_message = json.dumps(result)
else:
raise HTTPException(400, f"Unknown action type: {action.action_type}")
action.status = "approved"
action.resolved_at = datetime.utcnow()
db.commit()
return {"ok": True, "result": json.loads(action.result_message)}
except gitlab_service.GitLabError as e:
action.status = "rejected"
action.result_message = f"GitLab error: {e.detail}"
action.resolved_at = datetime.utcnow()
db.commit()
raise HTTPException(e.status_code, e.detail)
@router.post("/actions/{action_id}/reject")
def reject_action(action_id: str, admin: User = Depends(require_superadmin), db: Session = Depends(get_db)):
action = db.query(PendingAction).filter(PendingAction.id == action_id).first()
if not action:
raise HTTPException(404)
action.status = "rejected"
action.resolved_at = datetime.utcnow()
db.commit()
return {"ok": True}
# ═══════════════════════════════════════════════════
# USER-FACING ENDPOINTS (permission-gated, not superadmin-only)
# ═══════════════════════════════════════════════════
@router.get("/user/repos")
def user_list_repos(user: User = Depends(get_current_user), db: Session = Depends(get_db)):
"""List linked repos — available to any user with can_use_gitlab."""
......@@ -153,4 +681,39 @@ def _chunk_for_kb(text, chunk_size=3000, overlap=300):
if chunk:
chunks.append(chunk)
start = end - overlap if end < len(text) else end
return chunks
\ No newline at end of file
return chunks
# ═══════════════════════════════════════════════════
# Helpers
# ═══════════════════════════════════════════════════
def _repo_dict(r: LinkedRepo) -> dict:
return {
"id": r.id,
"gitlab_project_id": r.gitlab_project_id,
"name": r.name,
"path_with_namespace": r.path_with_namespace,
"default_branch": r.default_branch,
"web_url": r.web_url,
"description": r.description,
"map_status": r.map_status or "none",
"map_generated_at": str(r.map_generated_at) if r.map_generated_at else None,
"created_at": str(r.created_at),
}
def _action_dict(a: PendingAction, db: Session) -> dict:
repo = db.query(LinkedRepo).filter(LinkedRepo.id == a.linked_repo_id).first()
return {
"id": a.id,
"linked_repo_id": a.linked_repo_id,
"repo_name": repo.name if repo else "?",
"action_type": a.action_type,
"title": a.title,
"payload": a.payload,
"status": a.status,
"result_message": a.result_message,
"created_at": str(a.created_at),
"resolved_at": str(a.resolved_at) if a.resolved_at else None,
}
\ No newline at end of file
async def _build_repo_context(self, db, chat, user_query):
"""
Background generation manager — v4.2.0 with web search + branch-aware repo context.
"""
import asyncio
import time
from datetime import datetime
from typing import Optional
from dataclasses import dataclass, field
from backend.database import SessionLocal
from backend.models import User, Chat, Message, ChatAttachment, GitLabSettings, LinkedRepo
from backend.system_prompt import build_full_prompt
from backend.services import bedrock_service, memory_service, rag_service, attachment_service, gitlab_service
_tree_cache: dict[str, tuple[float, list[dict]]] = {}
TREE_CACHE_TTL = 600
_chat_file_history: dict[str, set[str]] = {}
def _get_tree_cache(repo_id, branch):
key = f"{repo_id}:{branch}"
if key in _tree_cache:
ts, tree = _tree_cache[key]
if time.time() - ts < TREE_CACHE_TTL:
return tree
return None
def _set_tree_cache(repo_id, branch, tree):
_tree_cache[f"{repo_id}:{branch}"] = (time.time(), tree)
@dataclass
class GenerationState:
events: list = field(default_factory=list)
done: asyncio.Event = field(default_factory=asyncio.Event)
message_id: str = ""
error: str = ""
class GenerationManager:
def __init__(self):
self._active: dict[str, GenerationState] = {}
def is_active(self, chat_id: str) -> bool:
state = self._active.get(chat_id)
return state is not None and not state.done.is_set()
def get_state(self, chat_id: str) -> Optional[GenerationState]:
return self._active.get(chat_id)
def start(self, chat_id, user_id, content, model, max_tokens, reasoning_budget, knowledge_base_id, attachment_ids, web_search=False):
old = self._active.get(chat_id)
if old and not old.done.is_set():
old.done.set()
state = GenerationState()
self._active[chat_id] = state
asyncio.create_task(self._run(state, chat_id, user_id, content, model, max_tokens, reasoning_budget, knowledge_base_id, attachment_ids, web_search))
return state
async def stream_events(self, chat_id):
state = self._active.get(chat_id)
if not state: return
idx = 0
while True:
while idx < len(state.events):
yield state.events[idx]; idx += 1
if state.done.is_set():
while idx < len(state.events):
yield state.events[idx]; idx += 1
break
await asyncio.sleep(0.02)
def invalidate_repo_cache(self, repo_id):
for k in [k for k in _tree_cache if k.startswith(f"{repo_id}:")]:
_tree_cache.pop(k, None)
async def _build_repo_context(self, db, chat, user_query):
if not chat.linked_repo_id: return None
repo = db.query(LinkedRepo).filter(LinkedRepo.id == chat.linked_repo_id).first()
if not repo: return None
......@@ -34,4 +112,117 @@ async def _build_repo_context(self, db, chat, user_query):
lines.append(f"\n━━━ {f['path']} ━━━"); lines.append(f["content"]); lines.append(f"━━━ end ━━━")
for f in result["query_files"]:
lines.append(f"\n━━━ {f['path']} ━━━"); lines.append(f["content"]); lines.append(f"━━━ end ━━━")
return "\n".join(lines)
\ No newline at end of file
return "\n".join(lines)
async def _run(self, state, chat_id, user_id, content, model_id, max_tokens, reasoning_budget, knowledge_base_id, attachment_ids, web_search=False):
db = SessionLocal()
try:
chat = db.query(Chat).filter(Chat.id == chat_id, Chat.user_id == user_id).first()
if not chat:
state.events.append({"type": "error", "message": "Chat not found"}); return
db_user = db.query(User).filter(User.id == user_id).first()
now = datetime.utcnow()
if db_user.quota_reset_date and now >= db_user.quota_reset_date:
db_user.tokens_used_this_month = 0
db_user.quota_reset_date = datetime(now.year + 1, 1, 1) if now.month == 12 else datetime(now.year, now.month + 1, 1)
db.commit()
if db_user.tokens_used_this_month >= db_user.quota_tokens_monthly:
state.events.append({"type": "error", "message": "Monthly quota exceeded."}); return
attachments = []
if attachment_ids:
attachments = db.query(ChatAttachment).filter(ChatAttachment.id.in_(attachment_ids), ChatAttachment.chat_id == chat_id).all()
stored_content = content
if attachments:
labels = {"image": "Image", "video": "Video", "document": "Document", "text": "File"}
notes = [f"[{labels.get(a.file_type, 'File')}: {a.original_filename}]" for a in attachments]
stored_content = "\n".join(notes) + "\n" + content
user_msg = Message(chat_id=chat_id, role="user", content=stored_content)
db.add(user_msg); db.commit(); db.refresh(user_msg)
for att in attachments: att.message_id = user_msg.id
if attachments: db.commit()
kb_id = knowledge_base_id or chat.knowledge_base_id
rag_context = None
if kb_id:
try: rag_context = rag_service.query(kb_id, content, n_results=8)
except Exception: pass
repo_context = await self._build_repo_context(db, chat, content)
attachment_context = memory_service.gather_attachment_context(chat_id, db)
# Web search
web_context = None
if web_search:
try:
from backend.services.web_search_service import search_web
state.events.append({"type": "status", "message": "Searching the web..."})
web_context = await search_web(content, num_results=8, fetch_pages=3)
except Exception as e:
web_context = f"[Web search failed: {str(e)[:100]}]"
system_prompt = build_full_prompt(rag_context=rag_context, repo_context=repo_context, attachment_context=attachment_context, web_search_context=web_context)
messages = memory_service.build_messages(chat, db)
if attachments and messages and messages[-1]["role"] == "user":
content_blocks = attachment_service.build_claude_content_blocks(attachments)
content_blocks.append({"type": "text", "text": content})
messages[-1]["content"] = content_blocks
effective_max = max_tokens
thinking_config = None
if reasoning_budget > 0:
thinking_config = {"enabled": True, "budget_tokens": reasoning_budget}
effective_max = max_tokens + reasoning_budget
full_text = ""; full_thinking = ""; input_tokens = 0; output_tokens = 0; current_block_type = "text"
async for event in bedrock_service.stream_response(messages=messages, system_prompt=system_prompt, model_id=model_id, max_tokens=min(effective_max, 65536), thinking_config=thinking_config):
if state.done.is_set(): break
evt_type = event.get("type", "")
if evt_type == "message_start":
usage = event.get("message", {}).get("usage", {}); input_tokens = usage.get("input_tokens", 0)
elif evt_type == "content_block_start":
current_block_type = event.get("content_block", {}).get("type", "text")
if current_block_type == "thinking": state.events.append({"type": "thinking_start"})
elif evt_type == "content_block_delta":
delta = event.get("delta", {}); dt = delta.get("type", "")
if dt == "thinking_delta":
t = delta.get("thinking", ""); full_thinking += t; state.events.append({"type": "thinking_delta", "content": t})
elif dt == "text_delta":
t = delta.get("text", ""); full_text += t; state.events.append({"type": "text_delta", "content": t})
elif evt_type == "content_block_stop":
if current_block_type == "thinking": state.events.append({"type": "thinking_end"})
elif evt_type == "message_delta":
output_tokens = event.get("usage", {}).get("output_tokens", 0)
assistant_msg = Message(chat_id=chat_id, role="assistant", content=full_text, thinking_content=full_thinking or None, input_tokens=input_tokens, output_tokens=output_tokens)
db.add(assistant_msg)
db_user.tokens_used_this_month += input_tokens + output_tokens
chat.model = model_id; chat.max_tokens = max_tokens; chat.reasoning_budget = reasoning_budget
chat.knowledge_base_id = knowledge_base_id or None; chat.updated_at = datetime.utcnow()
db.commit()
state.message_id = assistant_msg.id
msg_count = db.query(Message).filter(Message.chat_id == chat_id).count()
if msg_count <= 2 and chat.title == "New Chat":
try:
title = await self._generate_title(content, full_text[:300])
chat.title = title[:120]; db.commit()
state.events.append({"type": "title_update", "title": chat.title})
except Exception: pass
state.events.append({"type": "usage", "input_tokens": input_tokens, "output_tokens": output_tokens})
state.events.append({"type": "done", "message_id": assistant_msg.id})
except Exception as exc:
state.events.append({"type": "error", "message": str(exc)}); state.error = str(exc)
finally:
state.done.set(); db.close()
await asyncio.sleep(120); self._active.pop(chat_id, None)
async def _generate_title(self, user_msg, ai_msg):
from backend.config import FAST_MODEL
result = await bedrock_service.invoke_model_simple(model_id=FAST_MODEL, prompt=f"Generate a concise title (max 6 words):\nUser: {user_msg[:200]}\nAssistant: {ai_msg[:200]}\nRespond ONLY with the title.", max_tokens=30)
return result.strip().strip('"').strip("'")
manager = GenerationManager()
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment