use posrgress for chat history

parent 425d7b68
...@@ -7,5 +7,8 @@ COPY requirements.txt . ...@@ -7,5 +7,8 @@ COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt RUN pip install --no-cache-dir -r requirements.txt
COPY . . COPY . .
#keep the container running always #just keep the container running without doing anything
CMD ["python", "main.py"] CMD ["sh", "-c", "while :; do sleep 10; done"]
#run the app automatically when the container starts
#CMD ["python", "main.py"]
...@@ -3,7 +3,7 @@ from fastapi import FastAPI, UploadFile, File, Form, HTTPException ...@@ -3,7 +3,7 @@ from fastapi import FastAPI, UploadFile, File, Form, HTTPException
from fastapi.middleware.cors import CORSMiddleware from fastapi.middleware.cors import CORSMiddleware
from typing import Optional from typing import Optional
import uvicorn import uvicorn
from core import AppConfig from core import AppConfig, StudentNationality
from repositories import StorageRepository, MinIOStorageRepository from repositories import StorageRepository, MinIOStorageRepository
from handlers import AudioMessageHandler, TextMessageHandler from handlers import AudioMessageHandler, TextMessageHandler
from services import ( from services import (
...@@ -64,7 +64,7 @@ def create_app() -> FastAPI: ...@@ -64,7 +64,7 @@ def create_app() -> FastAPI:
Handles incoming chat messages (either text or audio). Handles incoming chat messages (either text or audio).
Generates responses locally using the agent service. Generates responses locally using the agent service.
""" """
return container.chat_service.process_message(file, text) return container.chat_service.process_message(student_id="student_001", file=file, text=text, nationality=StudentNationality.EGYPTIAN)
@app.get("/get-audio-response") @app.get("/get-audio-response")
async def get_audio_response(): async def get_audio_response():
......
...@@ -8,20 +8,21 @@ sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) ...@@ -8,20 +8,21 @@ sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
from core import StudentNationality, Models from core import StudentNationality, Models
from services.pgvector_service import PGVectorService from services.pgvector_service import PGVectorService
from services.openai_service import OpenAIService from services.openai_service import OpenAIService
from services.chat_database_service import ChatDatabaseService
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
SYSTEM_PROMPTS: Dict[StudentNationality, str] = { SYSTEM_PROMPTS: Dict[StudentNationality, str] = {
StudentNationality.EGYPTIAN: """إنت مدرس كيميا لطفل في ابتدائي. رد باللهجة المصريّة السهلة. كلّم الطفل كأنك بتحكي له بصوت طبيعي. خلي الجمل قصيرة وواضحة، وما تحشرش معلومات كتير في جملة واحدة. ما تقولش الحاجات البديهية اللي هو عارفها زي "المَيَّه بتتشرب". قول المعلومة مرّة واحدة من غير تكرار. لو هتدي مثال أو تشبيه، يكون حاجة جديدة بتوضّح الفكرة، مش مجرد إعادة. خلّي المثال بسيط زي لعبة، شكل، أو صورة في الخيال. اكتب الكلمات زي ما بتنطق بالتشكيل الصح (زي: مَيَّه، أوكسچين). لو فيه رموز كيميائية زي H2O أو CO2 اكتبها زي ما هي. خلي الشرح شبه حكاية صغيرة أو صورة في دماغ الطفل، مش زي شرح كتاب.""", StudentNationality.EGYPTIAN: """إنت مدرس كيميا لطفل في ابتدائي. رد باللهجة المصريّة السهلة. كلّم الطفل كأنك بتحكي له بصوت طبيعي. خلي الجمل قصيرة وواضحة، وما تحشرش معلومات كتير في جملة واحدة. ما تقولش الحاجات البديهية اللي هو عارفها زي "المَيَّه بتتشرب". قول المعلومة مرّة واحدة من غير تكرار. لو هتدي مثال أو تشبيه، يكون حاجة جديدة بتوضّح الفكرة، مش مجرد إعادة. خلّي المثال بسيط زي لعبة، شكل، أو صورة في الخيال. اكتب الكلمات زي ما بتنطق بالتشكيل الصح (زي: مَيَّه، أوكسچين). لو فيه رموز كيميائية زي H2O أو CO2 اكتبها زي ما هي. خلي الشرح شبه حكاية صغيرة أو صورة في دماغ الطفل، مش زي شرح كتاب.""",
StudentNationality.SAUDI: """إنت مُعلّم كيميا لطفل في ابتدائي. رد باللهجة السعوديّة الدارجة والبسيطة. كَلّم الطفل كأنك تحاكيه وجهاً لوجه بصوت طبيعي. خل الجمل قصار وواضحة، لا تكدّس معلومات كثير في جملة وحدة. لا تقول أشياء بديهية يعرفها مثل "المُوَيَّه نشربها". أعط المعلومة مرّة وحدة بلا تكرار. لو بتضرب مثال أو تشبيه، يكون زاوية جديدة توضّح الفكرة، ما يكون تكرار. خلّ المثال شي بسيط يقرّب المعنى للطفل: زي لعبة، حركة، أو صورة يتخيّلها. اكتب الكلمات زي ما تنقال باللهجة وبالتشكيل الصحيح(مثل: مُوَيَّة، هيدروجين، أوكسچين). لو فيه رموز كيميائية مثل H2O أو CO2 اكتُبها زي ما هي. الشرح يكون كأنه سواليف بسيطة أو حكاية تخلي الطفل يتصوّرها، مو زي كلام كتاب مدرسي.""" StudentNationality.SAUDI: """إنت مُعلّم كيميا لطفل في ابتدائي. رد باللهجة السعوديّة الدارجة والبسيطة. كَلّم الطفل كأنك تحاكيه وجهاً لوجه بصوت طبيعي. خل الجمل قصار وواضحة، لا تكدّس معلومات كثير في جملة وحدة. لا تقول أشياء بديهية يعرفها مثل "المُوَيَّه نشربها". أعط المعلومة مرّة وحدة بلا تكرار. لو بتضرب مثال أو تشبيه، يكون زاوية جديدة توضّح الفكرة، ما يكون تكرار. خلّ المثال شي بسيط يقرّب المعنى للطفل: زي لعبة، حركة، أو صورة يتخيّلها. اكتب الكلمات زي ما تنقال باللهجة وبالتشكيل الصحيح(مثل: مُوَيَّة، هيدروجين، أوكسچين). لو فيه رموز كيميائية مثل H2O أو CO2 اكتُبها زي ما هي. الشرح يكون كأنه سواليف بسيطة أو حكاية تخلي الطفل يتصوّرها، مو زي كلام كتاب مدرسي."""
} }
class AgentService: class AgentService:
"""Service class for handling AI agent conversations using OpenAI GPT and optional PGVector""" """Service class for handling AI agent conversations using database memory"""
def __init__(self, use_pgvector: bool = False): def __init__(self, use_pgvector: bool = False):
self.openai_service = OpenAIService() self.openai_service = OpenAIService()
...@@ -31,57 +32,63 @@ class AgentService: ...@@ -31,57 +32,63 @@ class AgentService:
else: else:
self.client = self.openai_service.client self.client = self.openai_service.client
self.conversations: Dict[str, List[Dict[str, str]]] = {} # Use database for conversation memory
self.db_service = ChatDatabaseService()
self.pgvector = PGVectorService() if use_pgvector else None self.pgvector = PGVectorService() if use_pgvector else None
def is_available(self) -> bool: def is_available(self) -> bool:
return self.client is not None return self.client is not None
def get_conversation_history(self, conversation_id: str = "default") -> List[Dict[str, str]]: def get_conversation_history(self, student_id: str) -> List[Dict[str, str]]:
return self.conversations.get(conversation_id, []) """Get conversation history from database"""
return self.db_service.get_chat_history(student_id)
def add_message_to_history(self, message: str, role: str = "user", conversation_id: str = "default"):
if conversation_id not in self.conversations: def add_message_to_history(self, student_id: str, message: str, role: str = "user"):
self.conversations[conversation_id] = [] """Add message to database"""
self.conversations[conversation_id].append({"role": role, "content": message}) self.db_service.add_message(student_id, role, message)
if len(self.conversations[conversation_id]) > 20: # Limit history to prevent growth
messages = self.conversations[conversation_id] self.db_service.limit_history(student_id, max_messages=38)
if messages[0].get("role") == "system":
self.conversations[conversation_id] = [messages[0]] + messages[-19:]
else:
self.conversations[conversation_id] = messages[-20:]
def generate_response( def generate_response(
self, self,
user_message: str, user_message: str,
conversation_id: str = "default", student_id: str,
nationality: StudentNationality = StudentNationality.EGYPTIAN,
model: str = Models.chat, model: str = Models.chat,
temperature: float = 1.0, temperature: float = 1.0,
nationality: StudentNationality = StudentNationality.EGYPTIAN,
top_k: int = 3 top_k: int = 3
) -> str: ) -> str:
"""Generate a GPT response, optionally enriched with pgvector results""" """Generate AI response using database memory"""
if not self.is_available(): if not self.is_available():
raise HTTPException(status_code=500, detail="Agent service not available") raise HTTPException(status_code=500, detail="Agent service not available")
try: try:
self.add_message_to_history(user_message, "user", conversation_id) # Add user message to database
self.add_message_to_history(student_id, user_message, "user")
# Get conversation history from database
conversation_history = self.get_conversation_history(student_id)
# Pick system prompt # Pick system prompt
system_prompt = SYSTEM_PROMPTS.get(nationality, SYSTEM_PROMPTS[StudentNationality.EGYPTIAN]) system_prompt = SYSTEM_PROMPTS.get(nationality, SYSTEM_PROMPTS[StudentNationality.EGYPTIAN])
# Prepare messages
messages = [] messages = []
conversation_history = self.get_conversation_history(conversation_id)
if not conversation_history or conversation_history[0].get("role") != "system": # Check if system message exists
has_system_message = conversation_history and conversation_history[0].get("role") == "system"
if not has_system_message:
messages.append({"role": "system", "content": system_prompt}) messages.append({"role": "system", "content": system_prompt})
self.conversations.setdefault(conversation_id, []).insert(0, { # Add system message to database
"role": "system", self.add_message_to_history(student_id, system_prompt, "system")
"content": system_prompt
}) # Add conversation history
messages.extend(conversation_history) messages.extend(conversation_history)
# If pgvector is enabled → enrich with nearest neighbors # Optional pgvector enrichment
if self.pgvector: if self.pgvector:
try:
query_embedding = self.openai_service.generate_embedding(user_message) query_embedding = self.openai_service.generate_embedding(user_message)
neighbors = self.pgvector.search_nearest(query_embedding, limit=top_k) neighbors = self.pgvector.search_nearest(query_embedding, limit=top_k)
...@@ -90,6 +97,8 @@ class AgentService: ...@@ -90,6 +97,8 @@ class AgentService:
for n in neighbors: for n in neighbors:
context_message += f"- ID {n['id']} (distance {n['distance']:.4f})\n" context_message += f"- ID {n['id']} (distance {n['distance']:.4f})\n"
messages.append({"role": "system", "content": context_message}) messages.append({"role": "system", "content": context_message})
except Exception as e:
logger.warning(f"Error using pgvector: {e}")
# Generate AI response # Generate AI response
response = self.client.chat.completions.create( response = self.client.chat.completions.create(
...@@ -102,7 +111,9 @@ class AgentService: ...@@ -102,7 +111,9 @@ class AgentService:
if not ai_response: if not ai_response:
raise ValueError("Empty response from AI model") raise ValueError("Empty response from AI model")
self.add_message_to_history(ai_response, "assistant", conversation_id) # Add AI response to database
self.add_message_to_history(student_id, ai_response, "assistant")
return ai_response return ai_response
except Exception as e: except Exception as e:
...@@ -110,21 +121,34 @@ class AgentService: ...@@ -110,21 +121,34 @@ class AgentService:
raise HTTPException(status_code=500, detail=f"AI response generation failed: {str(e)}") raise HTTPException(status_code=500, detail=f"AI response generation failed: {str(e)}")
def search_similar(self, query_embedding: List[float], top_k: int = 3): def search_similar(self, query_embedding: List[float], top_k: int = 3):
"""Optional nearest neighbor search if PGVector is enabled""" """Optional pgvector search"""
if not self.pgvector: if not self.pgvector:
raise HTTPException(status_code=400, detail="PGVector service not enabled") raise HTTPException(status_code=400, detail="PGVector service not enabled")
return self.pgvector.search_nearest(query_embedding, limit=top_k) return self.pgvector.search_nearest(query_embedding, limit=top_k)
def close(self):
"""Close database connection"""
if self.db_service:
self.db_service.close()
# ----------------- Suggested Test ----------------- # ----------------- Test -----------------
if __name__ == "__main__": if __name__ == "__main__":
logging.basicConfig(level=logging.INFO) logging.basicConfig(level=logging.INFO)
# Agent with pgvector enabled
agent = AgentService(use_pgvector=True) agent = AgentService(use_pgvector=True)
if agent.is_available(): if agent.is_available():
reply = agent.generate_response("هو يعني إيه ذَرّة؟", model="gpt-5-nano", nationality=StudentNationality.EGYPTIAN) try:
reply = agent.generate_response(
"هو يعني إيه ذَرّة؟",
student_id="student_001",
nationality=StudentNationality.EGYPTIAN
)
print("AI:", reply) print("AI:", reply)
except Exception as e:
print(f"Test failed: {e}")
finally:
agent.close()
else: else:
print("Agent service not available. Check OPENAI_API_KEY.") print("Agent service not available. Check OPENAI_API_KEY.")
\ No newline at end of file
import os
import psycopg2
from psycopg2.extras import RealDictCursor
from typing import List, Dict
import logging
logger = logging.getLogger(__name__)
class ChatDatabaseService:
"""Simple service for managing chat history in PostgreSQL"""
def __init__(self):
self.conn = psycopg2.connect(
host=os.getenv("POSTGRES_HOST", "postgres"),
user=os.getenv("POSTGRES_USER"),
password=os.getenv("POSTGRES_PASSWORD"),
dbname=os.getenv("POSTGRES_DB"),
)
def get_chat_history(self, student_id: str, limit: int = 20) -> List[Dict[str, str]]:
"""Get chat history for a student, returns in chronological order"""
with self.conn.cursor(cursor_factory=RealDictCursor) as cur:
cur.execute(
"""
SELECT role, content
FROM chat_history
WHERE student_id = %s
ORDER BY created_at DESC
LIMIT %s;
""",
(student_id, limit)
)
results = cur.fetchall()
# Return in chronological order (oldest first)
return [{"role": row["role"], "content": row["content"]} for row in reversed(results)]
def add_message(self, student_id: str, role: str, content: str):
"""Add a message to chat history"""
with self.conn.cursor() as cur:
cur.execute(
"""
INSERT INTO chat_history (student_id, role, content)
VALUES (%s, %s, %s);
""",
(student_id, role, content)
)
self.conn.commit()
def clear_history(self, student_id: str):
"""Clear chat history for a student"""
with self.conn.cursor() as cur:
cur.execute(
"DELETE FROM chat_history WHERE student_id = %s",
(student_id,)
)
self.conn.commit()
def limit_history(self, student_id: str, max_messages: int = 40):
"""Keep only recent messages for a student"""
with self.conn.cursor() as cur:
cur.execute(
"""
DELETE FROM chat_history
WHERE student_id = %s
AND role != 'system'
AND id NOT IN (
SELECT id FROM chat_history
WHERE student_id = %s AND role != 'system'
ORDER BY created_at DESC
LIMIT %s
);
""",
(student_id, student_id, max_messages)
)
self.conn.commit()
def close(self):
if self.conn:
self.conn.close()
\ No newline at end of file
...@@ -21,7 +21,7 @@ class ChatService: ...@@ -21,7 +21,7 @@ class ChatService:
self.openai_service = openai_service self.openai_service = openai_service
self.agent_service = agent_service self.agent_service = agent_service
# Message handlers (no webhook dependencies) # Message handlers
self.handlers = { self.handlers = {
MessageType.AUDIO: AudioMessageHandler( MessageType.AUDIO: AudioMessageHandler(
storage_repo, storage_repo,
...@@ -31,99 +31,85 @@ class ChatService: ...@@ -31,99 +31,85 @@ class ChatService:
MessageType.TEXT: TextMessageHandler() MessageType.TEXT: TextMessageHandler()
} }
def process_message(self, file: Optional[UploadFile] = None, text: Optional[str] = None) -> dict: def process_message(self,
"""Process incoming message and generate agent response directly""" student_id: str,
file: Optional[UploadFile] = None,
text: Optional[str] = None,
nationality: StudentNationality = StudentNationality.EGYPTIAN) -> dict:
"""Process message for student using database memory"""
self.response_manager.clear_response() self.response_manager.clear_response()
try: try:
# Process the input message first # Process the input message
if file and file.filename: if file and file.filename:
# Handle audio message - transcribe first
result = self.handlers[MessageType.AUDIO].handle(file=file) result = self.handlers[MessageType.AUDIO].handle(file=file)
if result.get("status") == "success": if result.get("status") == "success":
# Get transcribed text from the result
user_message = result.get("transcription", "") user_message = result.get("transcription", "")
if not user_message: if not user_message:
# Fallback message if transcription failed
user_message = "تم إرسال رسالة صوتية - فشل في التفريغ المحلي" user_message = "تم إرسال رسالة صوتية - فشل في التفريغ المحلي"
else: else:
raise HTTPException(status_code=400, detail="Failed to process audio message") raise HTTPException(status_code=400, detail="Failed to process audio message")
elif text: elif text:
# Handle text message
result = self.handlers[MessageType.TEXT].handle(text=text) result = self.handlers[MessageType.TEXT].handle(text=text)
user_message = text user_message = text
else: else:
raise HTTPException(status_code=400, detail="No text or audio file provided.") raise HTTPException(status_code=400, detail="No text or audio file provided.")
# Generate agent response using the local agent service # Generate agent response using database
try: try:
agent_response = self.agent_service.generate_response(user_message, nationality=StudentNationality.EGYPTIAN) agent_response = self.agent_service.generate_response(
user_message=user_message,
student_id=student_id,
nationality=nationality
)
# Generate TTS audio from the response # Generate TTS audio
audio_filename = self._generate_and_upload_audio(agent_response) audio_filename = self._generate_and_upload_audio(agent_response)
# Store response for retrieval # Store response for retrieval
self.response_manager.store_response(agent_response, audio_filename) self.response_manager.store_response(agent_response, audio_filename)
print(f"Generated agent response: {agent_response[:100]}...") print(f"Generated response for student {student_id}: {agent_response[:100]}...")
return { return {
"status": "success", "status": "success",
"message": "Message processed and agent response ready", "message": "Message processed and agent response ready",
"student_id": student_id,
"agent_response": agent_response, "agent_response": agent_response,
"audio_filename": audio_filename "audio_filename": audio_filename
} }
except Exception as agent_error: except Exception as agent_error:
print(f"Agent service error: {agent_error}") print(f"Agent error for student {student_id}: {agent_error}")
raise HTTPException(status_code=500, detail=f"Agent response generation failed: {str(agent_error)}") raise HTTPException(status_code=500, detail=f"Agent response failed: {str(agent_error)}")
except Exception as e: except Exception as e:
print(f"Error processing message: {e}") print(f"Error processing message for student {student_id}: {e}")
raise HTTPException(status_code=500, detail=f"Failed to process message: {str(e)}") raise HTTPException(status_code=500, detail=f"Failed to process message: {str(e)}")
def _generate_and_upload_audio(self, text: str) -> str: def _generate_and_upload_audio(self, text: str) -> str:
"""Generate audio from text and upload to MinIO, return filename""" """Generate and upload TTS audio"""
try: try:
import time import time
# Generate audio using OpenAI service
temp_file_path = self.openai_service.generate_speech(text) temp_file_path = self.openai_service.generate_speech(text)
# Generate unique filename for MinIO
timestamp = int(time.time()) timestamp = int(time.time())
filename = f"agent_response_{timestamp}.mp3" filename = f"agent_response_{timestamp}.mp3"
minio_file_path = f"audio/{filename}" minio_file_path = f"audio/{filename}"
print(f"Uploading generated audio to MinIO: {minio_file_path}") print(f"Uploading audio to MinIO: {minio_file_path}")
# Upload to MinIO
with open(temp_file_path, 'rb') as audio_file: with open(temp_file_path, 'rb') as audio_file:
self.storage_repo.upload_file(audio_file, self.config.minio_bucket, minio_file_path) self.storage_repo.upload_file(audio_file, self.config.minio_bucket, minio_file_path)
# Clean up temporary file
self.openai_service.cleanup_temp_file(temp_file_path) self.openai_service.cleanup_temp_file(temp_file_path)
print(f"Successfully generated and uploaded TTS audio: {filename}") print(f"Successfully generated TTS audio: {filename}")
return filename return filename
except Exception as e: except Exception as e:
print(f"Error generating and uploading audio: {e}") print(f"Error generating audio: {e}")
# Don't fail the entire request if TTS fails
return None return None
\ No newline at end of file
def get_agent_stats(self, conversation_id: str = "default") -> dict:
"""Get conversation statistics from agent service"""
return self.agent_service.get_conversation_stats(conversation_id)
def clear_conversation(self, conversation_id: str = "default"):
"""Clear conversation history"""
self.agent_service.clear_conversation(conversation_id)
return {"status": "success", "message": f"Conversation {conversation_id} cleared"}
def set_system_prompt(self, prompt: str):
"""Update the agent's system prompt"""
self.agent_service.set_system_prompt(prompt)
return {"status": "success", "message": "System prompt updated"}
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment