add lifespan manager to guarantee graceful db shutdown

parent 5b55ec50
......@@ -268,4 +268,4 @@ if __name__ == "__main__":
print("🔍 Verifying Setup")
# Verify the setup
verify_curriculum_structure()
\ No newline at end of file
verify_curriculum_structure()
import os
from fastapi import FastAPI, UploadFile, File, Form, HTTPException
from fastapi import FastAPI, UploadFile, File, Form, HTTPException, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse, Response
from fastapi.staticfiles import StaticFiles
from contextlib import asynccontextmanager
from typing import Optional
import uvicorn
import base64
from pathlib import Path
# Import your existing modules
from core import AppConfig, StudentNationality
from repositories import StorageRepository, MinIOStorageRepository
from handlers import AudioMessageHandler, TextMessageHandler
from core import AppConfig
from repositories import MinIOStorageRepository
from services import (
AudioService, ChatService, HealthService, ResponseService,
ResponseManager, OpenAIService, AgentService, ConnectionPool, PGVectorService, ChatDatabaseService, LanguageSegmentationService
ResponseManager, OpenAIService, AgentService, ConnectionPool, LanguageSegmentationService
)
class DIContainer:
......@@ -30,7 +30,7 @@ class DIContainer:
dbname=os.getenv("POSTGRES_DB"),
user=os.getenv("POSTGRES_USER"),
password=os.getenv("POSTGRES_PASSWORD"),
host=os.getenv("DB_HOST"), # This is the crucial part
host=os.getenv("DB_HOST"),
port=int(os.getenv("DB_PORT"))
)
print(os.getenv("DB_HOST"), os.getenv("POSTGRES_DB"), os.getenv("POSTGRES_USER"))
......@@ -50,8 +50,30 @@ class DIContainer:
self.response_service = ResponseService(self.response_manager, self.audio_service)
self.health_service = HealthService(self.storage_repo, self.config)
@asynccontextmanager
async def lifespan(app: FastAPI):
"""
Manages application startup and shutdown events for resource safety.
"""
# --- Code to run ON STARTUP ---
print("Application starting up...")
container = DIContainer()
app.state.container = container
print("DIContainer created and database pool initialized.")
yield # The application is now running and handling requests
# --- Code to run ON SHUTDOWN ---
print("Application shutting down...")
# This is the guaranteed, graceful shutdown call
app.state.container.agent_service.close()
print("Database connection pool closed successfully.")
def create_app() -> FastAPI:
app = FastAPI(title="Unified Chat API with Local Agent")
# Connect the lifespan manager to your FastAPI app instance
app = FastAPI(title="Unified Chat API with Local Agent", lifespan=lifespan)
# Fixed CORS configuration for CapRover
app.add_middleware(
......@@ -76,35 +98,33 @@ def create_app() -> FastAPI:
expose_headers=["X-Response-Text"],
)
# Initialize dependencies
container = DIContainer()
# Print configuration
print("MinIO Endpoint:", container.config.minio_endpoint)
print("MinIO Bucket:", container.config.minio_bucket)
print("OpenAI Service Available:", container.openai_service.is_available())
print("Agent Service Available:", container.agent_service.is_available())
# NOTE: The container is now created and managed by the 'lifespan' function.
# No need to create it here.
# Serve static files if the directory exists
static_path = Path("static")
if static_path.exists():
app.mount("/static", StaticFiles(directory=static_path), name="static")
@app.on_event("startup")
async def startup_event():
# Access the container from app state to print config on startup
container = app.state.container
print("MinIO Endpoint:", container.config.minio_endpoint)
print("MinIO Bucket:", container.config.minio_bucket)
print("OpenAI Service Available:", container.openai_service.is_available())
print("Agent Service Available:", container.agent_service.is_available())
@app.get("/chat-interface")
async def serve_audio_recorder():
"""Serve the audio recorder HTML file"""
try:
# Try to serve from static directory first
static_file = Path("static/audio-recorder.html")
if static_file.exists():
return FileResponse(static_file)
# Fallback to current directory
current_file = Path("audio-recorder.html")
if current_file.exists():
return FileResponse(current_file)
# If no file found, return an error
raise HTTPException(status_code=404, detail="Audio recorder interface not found")
except Exception as e:
print(f"Error serving audio recorder: {e}")
......@@ -112,57 +132,39 @@ def create_app() -> FastAPI:
@app.post("/chat")
async def chat_handler(
request: Request,
file: Optional[UploadFile] = File(None),
text: Optional[str] = Form(None),
student_id: str = Form("student_001"),
game_context: Optional[str] = Form(None)
):
"""
Handles incoming chat messages (either text or audio).
Generates responses locally using the agent service.
"""
"""Handles incoming chat messages using the shared container instance."""
container = request.app.state.container
try:
if not student_id.strip():
raise HTTPException(status_code=400, detail="Student ID is required")
print(f"Processing message for student: {student_id}")
print(f"Text: {text}")
print(f"File: {file.filename if file else 'None'}")
result = container.chat_service.process_message(
student_id=student_id,
file=file,
text=text,
game_context=game_context
)
print(f"Chat service result: {result}")
return result
except Exception as e:
print(f"Error in chat handler: {str(e)}")
raise HTTPException(status_code=500, detail=f"Chat processing error: {str(e)}")
@app.get("/get-audio-response")
async def get_audio_response(student_id: str = "student_001"):
"""Fetches the agent's text and audio response with proper CORS headers."""
async def get_audio_response(request: Request, student_id: str = "student_001"):
"""Fetches the agent's text and audio response using the shared container."""
container = request.app.state.container
try:
print("Getting audio response...")
result = container.response_service.get_agent_response(student_id=student_id)
if hasattr(result, 'status_code'):
# This is already a Response object from response_service
print(f"Response headers: {dict(result.headers)}")
print(f"Response audio raw bytes size: {len(result.body) if result.body else 'N/A'}")
print(f"Response audio first 20 bytes: {result.body[:20] if result.body else 'N/A'}")
return result
print(f"Created response with headers: {dict(response.headers)}")
return response
# This should be unreachable if response_service always returns a Response object
return result
except Exception as e:
print(f"Error getting audio response: {str(e)}")
raise HTTPException(status_code=500, detail=f"Audio response error: {str(e)}")
......@@ -170,34 +172,19 @@ def create_app() -> FastAPI:
@app.options("/chat")
async def chat_options():
"""Handle preflight CORS requests for chat endpoint"""
return Response(
headers={
"Access-Control-Allow-Origin": "*",
"Access-Control-Allow-Methods": "GET, POST, OPTIONS",
"Access-Control-Allow-Headers": "*",
"Access-Control-Max-Age": "86400"
}
)
return Response(status_code=204, headers={"Access-Control-Allow-Origin": "*", "Access-Control-Allow-Methods": "POST, OPTIONS", "Access-Control-Allow-Headers": "*"})
@app.options("/get-audio-response")
async def audio_response_options():
"""Handle preflight CORS requests for audio response endpoint"""
return Response(
headers={
"Access-Control-Allow-Origin": "*",
"Access-Control-Allow-Methods": "GET, POST, OPTIONS",
"Access-Control-Allow-Headers": "*",
"Access-Control-Expose-Headers": "X-Response-Text",
"Access-Control-Max-Age": "86400"
}
)
return Response(status_code=204, headers={"Access-Control-Allow-Origin": "*", "Access-Control-Allow-Methods": "GET, OPTIONS", "Access-Control-Allow-Headers": "*", "Access-Control-Expose-Headers": "X-Response-Text"})
@app.get("/health")
async def health_check():
"""Health check endpoint with agent service status"""
async def health_check(request: Request):
"""Health check endpoint using the shared container."""
container = request.app.state.container
try:
health_status = container.health_service.get_health_status()
# Add agent service status
health_status.update({
"openai_service_status": "available" if container.openai_service.is_available() else "unavailable",
"agent_service_status": "available" if container.agent_service.is_available() else "unavailable",
......@@ -211,50 +198,46 @@ def create_app() -> FastAPI:
# Agent management endpoints
@app.get("/conversation/stats")
async def get_conversation_stats(student_id: str = "student_001"):
"""Get conversation statistics"""
async def get_conversation_stats(request: Request, student_id: str = "student_001"):
container = request.app.state.container
try:
return container.chat_service.get_agent_stats(student_id)
except Exception as e:
print(f"Stats error: {e}")
raise HTTPException(status_code=500, detail=str(e))
@app.post("/conversation/clear")
async def clear_conversation(student_id: str = Form("student_001")):
"""Clear conversation history"""
async def clear_conversation(request: Request, student_id: str = Form("student_001")):
container = request.app.state.container
try:
return container.chat_service.clear_conversation(student_id)
except Exception as e:
print(f"Clear conversation error: {e}")
raise HTTPException(status_code=500, detail=str(e))
@app.post("/agent/system-prompt")
async def set_system_prompt(request: dict):
"""Update the agent's system prompt"""
async def set_system_prompt(req_body: dict, request: Request):
container = request.app.state.container
try:
prompt = request.get("prompt", "")
prompt = req_body.get("prompt", "")
if not prompt:
raise HTTPException(status_code=400, detail="System prompt cannot be empty")
return container.chat_service.set_system_prompt(prompt)
except Exception as e:
print(f"Set system prompt error: {e}")
raise HTTPException(status_code=500, detail=str(e))
@app.get("/agent/system-prompt")
async def get_system_prompt():
"""Get the current system prompt"""
async def get_system_prompt(request: Request):
container = request.app.state.container
try:
return {
"system_prompt": container.agent_service.system_prompt,
"status": "success"
}
except Exception as e:
print(f"Get system prompt error: {e}")
raise HTTPException(status_code=500, detail=str(e))
@app.get("/conversation/export")
async def export_conversation(student_id: str = "student_001"):
"""Export conversation history"""
async def export_conversation(request: Request, student_id: str = "student_001"):
container = request.app.state.container
try:
history = container.agent_service.export_conversation(student_id)
return {
......@@ -263,73 +246,32 @@ def create_app() -> FastAPI:
"total_messages": len(history)
}
except Exception as e:
print(f"Export conversation error: {e}")
raise HTTPException(status_code=500, detail=str(e))
@app.post("/conversation/import")
async def import_conversation(request: dict):
"""Import conversation history"""
async def import_conversation(req_body: dict, request: Request):
container = request.app.state.container
try:
student_id = request.get("student_id", "student_001")
messages = request.get("messages", [])
student_id = req_body.get("student_id", "student_001")
messages = req_body.get("messages", [])
if not messages:
raise HTTPException(status_code=400, detail="Messages list cannot be empty")
container.agent_service.import_conversation(messages, student_id)
return {
"status": "success",
"message": f"Imported {len(messages)} messages to conversation {student_id}"
}
return {"status": "success", "message": f"Imported {len(messages)} messages"}
except Exception as e:
print(f"Import conversation error: {e}")
raise HTTPException(status_code=500, detail=str(e))
@app.get("/debug/test-response")
async def debug_test_response():
"""Debug endpoint to test response generation"""
try:
# Test basic response
test_text = "This is a test response"
encoded_text = base64.b64encode(test_text.encode('utf-8')).decode('utf-8')
return Response(
content=b"test audio data",
media_type="audio/mpeg",
headers={
"X-Response-Text": encoded_text,
"Access-Control-Expose-Headers": "X-Response-Text"
}
)
except Exception as e:
print(f"Debug test error: {e}")
raise HTTPException(status_code=500, detail=str(e))
test_text = "This is a test response"
encoded_text = base64.b64encode(test_text.encode('utf-8')).decode('utf-8')
return Response(content=b"test audio data", media_type="audio/mpeg", headers={"X-Response-Text": encoded_text, "Access-Control-Expose-Headers": "X-Response-Text"})
@app.get("/")
async def root():
"""Root endpoint with API info"""
return {
"service": "Unified Chat API with Local Agent",
"version": "2.1.0",
"description": "Unified backend for audio/text chat with a local AI agent.",
"status": "running",
"deployment": "CapRover",
"features": [
"Local AI agent responses using OpenAI GPT",
"Audio transcription using OpenAI Whisper",
"Text-to-speech using OpenAI TTS",
"Conversation history management",
"Student-specific conversations",
"CORS enabled for cross-origin requests"
],
"endpoints": {
"chat_interface": "/chat-interface (HTML interface)",
"chat": "/chat (accepts audio or text with student_id)",
"get_audio_response": "/get-audio-response (fetches agent's audio and text)",
"health": "/health (service health check)",
"debug": "/debug/test-response (test response generation)"
}
}
return {"service": "Unified Chat API with Local Agent", "version": "2.2.0-lifespan", "status": "running"}
return app
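Side note, not part of the commit: FastAPI's TestClient runs the lifespan startup and shutdown hooks when used as a context manager, so the container creation and pool teardown above can be smoke-tested locally. This is only a sketch; it assumes the app factory above lives in a module importable as `main`, and that the environment variables and backing services the DIContainer needs are reachable.

# sketch: exercising the new lifespan wiring (assumes this module is importable as `main`)
from fastapi.testclient import TestClient

from main import create_app  # hypothetical import path for the app factory shown above

app = create_app()

# Entering the context manager triggers the lifespan startup (DIContainer is built and
# stored on app.state); leaving it runs the code after `yield`, which closes the
# database connection pool via agent_service.close().
with TestClient(app) as client:
    health = client.get("/health")
    print(health.status_code, health.json().get("agent_service_status"))
# At this point the shutdown branch has run and the pool is closed.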
......
......@@ -293,6 +293,3 @@ class AgentService:
except Exception as e:
logger.error(f"Error closing connection pools: {e}")
def __del__(self):
"""Destructor to ensure connection pools are closed"""
self.close()
\ No newline at end of file
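Dropping __del__ means the pool is now closed exactly once, from the lifespan shutdown branch, instead of whenever the garbage collector happens to run. The repo's ConnectionPool and AgentService internals are not shown in this diff; the following is only an illustrative sketch, assuming the pool wraps psycopg2's SimpleConnectionPool, of what an idempotent close() can look like so a repeated call during shutdown stays harmless.

# illustrative only: an idempotent close() over a psycopg2 pool (assumed internals)
import logging
from psycopg2.pool import SimpleConnectionPool

logger = logging.getLogger(__name__)

class ConnectionPool:
    def __init__(self, **conn_kwargs):
        # minconn/maxconn values are arbitrary for this sketch
        self._pool = SimpleConnectionPool(minconn=1, maxconn=10, **conn_kwargs)

    def close(self) -> None:
        """Close all pooled connections; safe to call more than once."""
        try:
            if not self._pool.closed:
                self._pool.closeall()
                logger.info("Connection pool closed")
        except Exception as e:
            logger.error(f"Error closing connection pools: {e}")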
......@@ -40,43 +40,44 @@ class ResponseManager:
def get_response(self, student_id: str) -> Dict:
"""
Atomically gets the response for a student and removes it from Redis
to ensure it's claimed only once.
Gets the response for a student without deleting it.
This allows the client to safely retry the request if it fails.
The key will be cleaned up automatically by Redis when its TTL expires.
"""
key = self._get_key(student_id)
# Use a pipeline to get and delete the key in a single, atomic operation
pipe = self.redis.pipeline()
pipe.get(key)
pipe.delete(key)
results = pipe.execute()
json_value = results[0]
# 1. Use a simple, non-destructive GET command. No pipeline needed.
json_value = self.redis.get(key)
if not json_value:
# If nothing was found, return the same empty structure as the old class
return {"text": None, "audio_filename": None, "audio_bytes": None}
return {"text": None, "audio_filepath": None, "audio_bytes": None}
# If data was found, decode it
# 2. Decode the payload as before.
payload = json.loads(json_value)
# Decode the Base64 string back into binary audio data
if payload.get("audio_bytes_b64"):
payload["audio_bytes"] = base64.b64decode(payload["audio_bytes_b64"])
else:
payload["audio_bytes"] = None
# Remove the temporary key before returning
del payload["audio_bytes_b64"]
return payload
def clear_response(self, student_id: str) -> None:
"""Clears a response for a specific student from Redis."""
"""
Clears a response for a specific student from Redis.
This is still important to call at the *beginning* of a new /chat request
to ensure old data is invalidated immediately.
"""
key = self._get_key(student_id)
self.redis.delete(key)
def is_response_fresh(self, student_id: str, max_age_seconds: int = 300) -> bool:
"""Checks if a response exists in Redis for the given student."""
def is_response_fresh(self, student_id: str) -> bool:
"""
Checks if a response exists in Redis for the given student.
This is much simpler and more reliable now.
"""
key = self._get_key(student_id)
# redis.exists() is the direct equivalent of checking if the key is present
# redis.exists() returns the number of keys that exist (0 or 1 in this case).
return self.redis.exists(key) > 0
\ No newline at end of file
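The non-destructive GET above only works because the write path gives each key a TTL; that side is not part of this hunk. For reference, a sketch of a matching set_response is below; the key scheme, TTL value, and field names are assumptions inferred from the read path, not code from the repo.

# illustrative only: the write path implied by get_response() above (not part of this diff)
import base64
import json
import redis

class ResponseManager:
    RESPONSE_TTL_SECONDS = 300  # assumed TTL; the real value is not shown in this hunk

    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client

    def _get_key(self, student_id: str) -> str:
        return f"response:{student_id}"  # assumed key scheme

    def set_response(self, student_id: str, text: str, audio_filepath: str, audio_bytes: bytes) -> None:
        payload = {
            "text": text,
            "audio_filepath": audio_filepath,
            # Binary audio is Base64-encoded so the whole payload can be stored as JSON,
            # matching the b64decode step in get_response().
            "audio_bytes_b64": base64.b64encode(audio_bytes).decode("ascii") if audio_bytes else None,
        }
        # SETEX gives the key a TTL, so Redis expires unclaimed responses automatically.
        self.redis.setex(self._get_key(student_id), self.RESPONSE_TTL_SECONDS, json.dumps(payload))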