anan character v0 and perfectly long-lasting deployment

parent 12111380
version: "3.8"
services:
postgres:
# Use the new custom image from Docker Hub
image: salmamohammedhamedmustafa/postgres:latest
build: ./postgres
environment:
POSTGRES_USER: ${POSTGRES_USER}
POSTGRES_PASSWORD: ${POSTGRES_PASSWORD}
......@@ -35,7 +34,7 @@ services:
retries: 3
voice-agent:
image: salmamohammedhamedmustafa/voice-agent:latest
build: ./voice_agent
ports:
- "8000:8000"
environment:
......@@ -48,6 +47,8 @@ services:
POSTGRES_USER: "${POSTGRES_USER}"
POSTGRES_PASSWORD: "${POSTGRES_PASSWORD}"
POSTGRES_DB: "${POSTGRES_DB}"
DB_PORT: "${DB_PORT}"
DB_HOST: "${DB_HOST}"
depends_on:
- minio
- postgres
......
# Base image: the minio/minio image, `latest` tag.
# NOTE(review): `latest` is unpinned, so a rebuild may pull a different MinIO release - consider pinning a release tag.
FROM minio/minio:latest
# Default container command: run `server` on the /data directory and bind the
# console to port 9001 via --console-address.
CMD ["server", "/data", "--console-address", ":9001"]
\ No newline at end of file
{
"schemaVersion": 2,
"dockerfilePath": "./Dockerfile",
"containerHttpPort": "9001",
"ports": [
"9000:9000",
"9001:9001"
],
"volumes": [
"/data"
]
}
{
"schemaVersion": 2,
"dockerfilePath": "./Dockerfile",
"containerHttpPort": "5432",
"ports": [
"5432:5432"
],
"volumes": [
"/var/lib/postgresql/data"
]
}
-- Create the main database if it does not exist yet.
--
-- CREATE DATABASE cannot be executed from inside a DO block or function, and
-- routing it through dblink_exec() would require the dblink extension, which
-- this script does not install. Instead, build the statement as a query result
-- and let psql's \gexec run it: when the database already exists the SELECT
-- returns no rows and nothing is executed, so the script stays idempotent.
-- (This file is already psql-only because of the \c meta-command below.)
SELECT 'CREATE DATABASE embeddings_db'
WHERE NOT EXISTS (
    SELECT FROM pg_database WHERE datname = 'embeddings_db'
)\gexec

-- Connect to the database (newly created or pre-existing)
\c embeddings_db

-- Enable the pgvector extension inside embeddings_db
CREATE EXTENSION IF NOT EXISTS vector;
\ No newline at end of file
......@@ -14,7 +14,7 @@ from repositories import StorageRepository, MinIOStorageRepository
from handlers import AudioMessageHandler, TextMessageHandler
from services import (
AudioService, ChatService, HealthService, ResponseService,
ResponseManager, OpenAIService, AgentService
ResponseManager, OpenAIService, AgentService, ConnectionPool, PGVectorService, ChatDatabaseService
)
class DIContainer:
......@@ -25,8 +25,17 @@ class DIContainer:
# Initialize OpenAI and Agent services
self.openai_service = OpenAIService()
self.agent_service = AgentService()
self.pool_handler = ConnectionPool(
dbname=os.getenv("POSTGRES_DB"),
user=os.getenv("POSTGRES_USER"),
password=os.getenv("POSTGRES_PASSWORD"),
host=os.getenv("DB_HOST"), # This is the crucial part
port=int(os.getenv("DB_PORT"))
)
print(os.getenv("DB_HOST"), os.getenv("POSTGRES_DB"), os.getenv("POSTGRES_USER"))
self.agent_service = AgentService(pool_handler=self.pool_handler)
# Initialize services
self.audio_service = AudioService(self.storage_repo, self.config.minio_bucket)
self.chat_service = ChatService(
......
......@@ -5,4 +5,7 @@ from .response_service import ResponseService
from .response_manager import ResponseManager
from .openai_service import OpenAIService
from .agent_service import AgentService
from .pgvector_service import PGVectorService
\ No newline at end of file
from .pgvector_service import PGVectorService
from .chat_database_service import ChatDatabaseService
from .connection_pool import ConnectionPool
from .pedagogy_service import PedagogyService
import os
import psycopg2
from psycopg2.extras import RealDictCursor
from psycopg2.pool import ThreadedConnectionPool
from typing import List, Dict, Optional, Tuple
import logging
from services.connection_pool import ConnectionPool
logger = logging.getLogger(__name__)
class ChatDatabaseService:
"""Simple service for managing chat history in PostgreSQL with connection pooling"""
"""Service for managing chat history using a shared, robust connection pool"""
def __init__(self):
self.pool = ThreadedConnectionPool(
minconn=1,
maxconn=20,
host=os.getenv("POSTGRES_HOST", "postgres"),
user=os.getenv("POSTGRES_USER"),
password=os.getenv("POSTGRES_PASSWORD"),
dbname=os.getenv("POSTGRES_DB"),
)
def __init__(self, pool_handler: 'ConnectionPoolHandler'):
self.pool_handler = pool_handler
def get_student_nationality(self, student_id: str) -> Optional[str]:
"""Get student nationality from database"""
conn = self.pool.getconn()
try:
with self.pool_handler.get_connection() as conn:
with conn.cursor(cursor_factory=RealDictCursor) as cur:
cur.execute(
"SELECT nationality FROM students WHERE student_id = %s",
......@@ -32,13 +25,10 @@ class ChatDatabaseService:
)
result = cur.fetchone()
return result["nationality"] if result else None
finally:
self.pool.putconn(conn)
def get_student_info(self, student_id: str) -> Optional[Dict]:
"""Get complete student information from database"""
conn = self.pool.getconn()
try:
with self.pool_handler.get_connection() as conn:
with conn.cursor(cursor_factory=RealDictCursor) as cur:
cur.execute(
"""
......@@ -53,18 +43,15 @@ class ChatDatabaseService:
return {
'student_id': result['student_id'],
'student_name': result['student_name'],
'grade': result['grade'], # This is now an integer
'is_arabic': result['language'], # Convert language boolean to is_arabic
'grade': result['grade'],
'is_arabic': result['language'],
'nationality': result['nationality']
}
return None
finally:
self.pool.putconn(conn)
def get_student_grade_and_language(self, student_id: str) -> Optional[Tuple[int, bool]]:
"""Get student grade and language preference"""
conn = self.pool.getconn()
try:
with self.pool_handler.get_connection() as conn:
with conn.cursor(cursor_factory=RealDictCursor) as cur:
cur.execute(
"SELECT grade, language FROM students WHERE student_id = %s",
......@@ -74,13 +61,10 @@ class ChatDatabaseService:
if result:
return (result["grade"], result["language"])
return None
finally:
self.pool.putconn(conn)
def get_chat_history(self, student_id: str, limit: int = 20) -> List[Dict[str, str]]:
"""Get chat history for a student, returns in chronological order"""
conn = self.pool.getconn()
try:
with self.pool_handler.get_connection() as conn:
with conn.cursor(cursor_factory=RealDictCursor) as cur:
cur.execute(
"""
......@@ -93,15 +77,11 @@ class ChatDatabaseService:
(student_id, limit)
)
results = cur.fetchall()
# Return in chronological order (oldest first)
return [{"role": row["role"], "content": row["content"]} for row in reversed(results)]
finally:
self.pool.putconn(conn)
def add_message(self, student_id: str, role: str, content: str):
"""Add a message to chat history"""
conn = self.pool.getconn()
try:
with self.pool_handler.get_connection() as conn:
with conn.cursor() as cur:
cur.execute(
"""
......@@ -111,26 +91,20 @@ class ChatDatabaseService:
(student_id, role, content)
)
conn.commit()
finally:
self.pool.putconn(conn)
def clear_history(self, student_id: str):
"""Clear chat history for a student"""
conn = self.pool.getconn()
try:
with self.pool_handler.get_connection() as conn:
with conn.cursor() as cur:
cur.execute(
"DELETE FROM chat_history WHERE student_id = %s",
(student_id,)
)
conn.commit()
finally:
self.pool.putconn(conn)
def limit_history(self, student_id: str, max_messages: int = 40):
"""Keep only recent messages for a student"""
conn = self.pool.getconn()
try:
with self.pool_handler.get_connection() as conn:
with conn.cursor() as cur:
cur.execute(
"""
......@@ -147,8 +121,6 @@ class ChatDatabaseService:
(student_id, student_id, max_messages)
)
conn.commit()
finally:
self.pool.putconn(conn)
def update_student_info(self, student_id: str, grade: Optional[int] = None,
language: Optional[bool] = None, nationality: Optional[str] = None):
......@@ -170,8 +142,7 @@ class ChatDatabaseService:
if updates:
params.append(student_id)
conn = self.pool.getconn()
try:
with self.pool_handler.get_connection() as conn:
with conn.cursor() as cur:
cur.execute(
f"""
......@@ -182,14 +153,11 @@ class ChatDatabaseService:
params
)
conn.commit()
finally:
self.pool.putconn(conn)
def create_student(self, student_id: str, student_name: str, grade: int,
language: bool, nationality: str = 'EGYPTIAN'):
"""Create a new student record"""
conn = self.pool.getconn()
try:
with self.pool_handler.get_connection() as conn:
with conn.cursor() as cur:
cur.execute(
"""
......@@ -200,9 +168,3 @@ class ChatDatabaseService:
(student_id, student_name, grade, language, nationality)
)
conn.commit()
finally:
self.pool.putconn(conn)
def close_pool(self):
if self.pool:
self.pool.closeall()
\ No newline at end of file
import os
import psycopg2
from psycopg2.pool import ThreadedConnectionPool
import logging
import time
import threading
from contextlib import contextmanager
logger = logging.getLogger(__name__)
class ConnectionPool:
    """Wrapper around psycopg2's ThreadedConnectionPool with validation and recovery.

    Connections handed out by this class are validated with ``SELECT 1`` first.
    Acquisition is retried with exponential backoff, and the underlying pool is
    recreated when a periodic health check or the final retry fails.

    Args:
        minconn: Minimum number of connections kept by the underlying pool.
        maxconn: Maximum number of connections the underlying pool may open.
        **db_params: Connection keyword arguments forwarded to
            ``ThreadedConnectionPool`` (e.g. dbname, user, password, host, port).
    """

    def __init__(self, minconn=2, maxconn=20, **db_params):
        self.db_params = db_params
        self.minconn = minconn
        self.maxconn = maxconn
        self.pool = None
        # Re-entrant so _health_check can call _recreate_pool while holding it.
        self._pool_lock = threading.RLock()
        self._last_health_check = 0
        self._health_check_interval = 300  # 5 minutes
        self._connection_timeout = 30
        self._idle_timeout = 7200  # 2 hours
        self._initialize_pool()

    def _initialize_pool(self):
        """Create the underlying ThreadedConnectionPool.

        Raises:
            Exception: whatever the pool constructor raises, after logging it.
        """
        try:
            # Add connection parameters to handle idle connections
            pool_params = {
                **self.db_params,
                'connect_timeout': self._connection_timeout,
                # These parameters help with connection management
                'keepalives_idle': 600,  # Start keepalives after 10 min idle
                'keepalives_interval': 30,  # Send keepalive every 30 seconds
                'keepalives_count': 3,  # Close connection after 3 failed keepalives
            }
            self.pool = ThreadedConnectionPool(
                minconn=self.minconn,
                maxconn=self.maxconn,
                **pool_params
            )
            logger.info(f"Connection pool initialized with {self.minconn}-{self.maxconn} connections")
        except Exception as e:
            logger.error(f"Failed to initialize connection pool: {e}")
            raise

    def _recreate_pool(self):
        """Close the current pool (best effort) and build a fresh one."""
        with self._pool_lock:
            if self.pool:
                try:
                    self.pool.closeall()
                except Exception as e:
                    # Best effort: the old pool is being thrown away anyway.
                    logger.debug(f"Ignoring error while closing old pool: {e}")
            logger.warning("Recreating connection pool...")
            self._initialize_pool()

    def _validate_connection(self, conn):
        """Return True if ``conn`` answers ``SELECT 1`` and is left idle.

        The probe query opens a transaction on non-autocommit connections, so
        any non-idle transaction state is rolled back before returning.
        """
        try:
            # Check if connection is alive
            with conn.cursor() as cur:
                cur.execute("SELECT 1")
                cur.fetchone()
            # Check connection status
            if conn.closed != 0:
                return False
            # Check for any pending transactions
            if conn.info.transaction_status != psycopg2.extensions.TRANSACTION_STATUS_IDLE:
                try:
                    conn.rollback()
                except Exception:
                    return False
            return True
        except (psycopg2.OperationalError, psycopg2.InterfaceError,
                psycopg2.DatabaseError) as e:
            logger.debug(f"Connection validation failed: {e}")
            return False
        except Exception as e:
            logger.warning(f"Unexpected error during connection validation: {e}")
            return False

    def _discard(self, conn):
        """Close a broken connection and free its slot in the pool (best effort).

        Returning it with ``close=True`` lets the pool drop it from its
        bookkeeping; merely calling ``conn.close()`` would leave the slot
        marked as in use. Falls back to a plain close when the pool refuses it.
        """
        if conn is None:
            return
        try:
            if self.pool is not None:
                self.pool.putconn(conn, close=True)
                return
        except Exception as e:
            logger.debug(f"Could not hand connection back to pool for closing: {e}")
        try:
            conn.close()
        except Exception as e:
            logger.debug(f"Ignoring error while closing connection: {e}")

    def _acquire(self, max_retries=3):
        """Fetch a validated connection, retrying with exponential backoff.

        Raises:
            ConnectionError: if no valid connection was obtained after
                ``max_retries`` attempts (the pool is recreated first).
        """
        for attempt in range(max_retries):
            conn = None  # reset so the except branch never sees a stale/unbound name
            try:
                with self._pool_lock:
                    if not self.pool:
                        self._initialize_pool()
                    conn = self.pool.getconn()
                if conn and self._validate_connection(conn):
                    return conn
                # Bad connection: close it and release its pool slot
                self._discard(conn)
            except Exception as e:
                logger.warning(f"Connection attempt {attempt + 1} failed: {e}")
                self._discard(conn)
            if attempt == max_retries - 1:
                # Last attempt, try to recreate pool so later callers may succeed
                try:
                    self._recreate_pool()
                except Exception as recreate_error:
                    logger.error(f"Failed to recreate pool: {recreate_error}")
                raise ConnectionError(f"Failed to get valid connection after {max_retries} attempts")
            # Wait before retry with exponential backoff
            time.sleep(min(2 ** attempt, 10))
        # Only reachable when max_retries <= 0
        raise ConnectionError(f"Failed to get valid connection after {max_retries} attempts")

    def _health_check(self):
        """Probe the pool at most once per ``_health_check_interval`` seconds.

        A failed probe recreates the pool.
        NOTE(review): a getconn() failure caused by pool exhaustion also lands
        in the recreate path, which closes connections other threads may still
        be using - confirm this is acceptable under load.
        """
        current_time = time.time()
        if current_time - self._last_health_check < self._health_check_interval:
            return
        try:
            with self._pool_lock:
                if self.pool:
                    # Try to get a connection to test pool health
                    test_conn = self.pool.getconn()
                    if test_conn and self._validate_connection(test_conn):
                        self.pool.putconn(test_conn)
                        self._last_health_check = current_time
                        return
                    # Connection is bad, get rid of it
                    self._discard(test_conn)
                # Pool seems unhealthy, recreate it
                logger.warning("Pool health check failed, recreating pool")
                self._recreate_pool()
                self._last_health_check = current_time
        except Exception as e:
            logger.error(f"Health check failed: {e}")
            try:
                self._recreate_pool()
            except Exception as recreate_error:
                logger.error(f"Failed to recreate pool during health check: {recreate_error}")

    @contextmanager
    def get_connection(self, max_retries=3):
        """Context manager yielding a validated connection.

        Acquisition (with retries) happens before the ``yield``. An exception
        raised inside the caller's ``with`` body therefore propagates unchanged
        instead of being treated as a failed connection attempt. On exit the
        connection is rolled back if a transaction is still open and returned
        to the pool, so callers must ``commit()`` work they want to keep.

        Raises:
            ConnectionError: if no valid connection could be obtained.
        """
        self._health_check()
        conn = self._acquire(max_retries)
        try:
            yield conn
        finally:
            self.put_conn(conn)

    def get_valid_conn(self):
        """Legacy method for backward compatibility - get a validated connection.

        The caller is responsible for returning it with ``put_conn``.

        Raises:
            ConnectionError: if no valid connection could be obtained.
        """
        return self._acquire(3)

    def put_conn(self, conn):
        """Return connection to pool - legacy method for backward compatibility.

        Any open transaction is rolled back first. If the hand-back fails the
        connection is closed instead; nothing is raised to the caller.
        """
        if not conn:
            return
        try:
            # Ensure connection is in a clean state
            if conn.info.transaction_status != psycopg2.extensions.TRANSACTION_STATUS_IDLE:
                conn.rollback()
            self.pool.putconn(conn)
        except Exception as e:
            logger.warning(f"Error returning connection to pool: {e}")
            self._discard(conn)

    def close_all(self):
        """Close all connections in the pool and drop the pool reference."""
        with self._pool_lock:
            if self.pool:
                try:
                    self.pool.closeall()
                finally:
                    self.pool = None
\ No newline at end of file
import os
import psycopg2
from psycopg2.extras import RealDictCursor
from psycopg2.pool import ThreadedConnectionPool
from typing import List, Optional
# Import the pgvector adapter
import logging
from pgvector.psycopg2 import register_vector
from services.connection_pool import ConnectionPool
logger = logging.getLogger(__name__)
class PGVectorService:
"""Service for managing embeddings with PostgreSQL pgvector using connection pooling"""
"""Service for managing embeddings with PostgreSQL pgvector using a shared, robust connection pool"""
def __init__(self):
self.pool = ThreadedConnectionPool(
minconn=1,
maxconn=20,
host=os.getenv("POSTGRES_HOST", "postgres"),
user=os.getenv("POSTGRES_USER"),
password=os.getenv("POSTGRES_PASSWORD"),
dbname=os.getenv("POSTGRES_DB"),
)
# Test connection and register vector type to ensure the pool works
conn = self.pool.getconn()
try:
def __init__(self, pool_handler: 'ConnectionPool'):
self.pool_handler = pool_handler
# Test connection and register vector type
with self.pool_handler.get_connection() as conn:
register_vector(conn)
finally:
self.pool.putconn(conn)
def _get_conn_with_vector(self):
"""Get a connection from the pool and register vector type"""
conn = self.pool.getconn()
register_vector(conn)
return conn
def insert_embedding(self, id: int, embedding: list):
"""Insert or update an embedding"""
conn = self._get_conn_with_vector()
try:
with self.pool_handler.get_connection() as conn:
with conn.cursor() as cur:
cur.execute(
"""
......@@ -46,13 +31,10 @@ class PGVectorService:
(id, embedding),
)
conn.commit()
finally:
self.pool.putconn(conn)
def search_nearest(self, query_embedding: list, limit: int = 3):
"""Search nearest embeddings using cosine distance (<-> operator)"""
conn = self._get_conn_with_vector()
try:
with self.pool_handler.get_connection() as conn:
with conn.cursor(cursor_factory=RealDictCursor) as cur:
cur.execute(
"""
......@@ -64,8 +46,6 @@ class PGVectorService:
(query_embedding, query_embedding, limit),
)
return cur.fetchall()
finally:
self.pool.putconn(conn)
def search_filtered_nearest(
self,
......@@ -76,8 +56,7 @@ class PGVectorService:
limit: int = 3
):
"""Search nearest embeddings with filtering by grade, subject, and language"""
conn = self._get_conn_with_vector()
try:
with self.pool_handler.get_connection() as conn:
with conn.cursor(cursor_factory=RealDictCursor) as cur:
cur.execute(
"""
......@@ -93,8 +72,6 @@ class PGVectorService:
(query_embedding, grade, f"%{subject}%", is_arabic, query_embedding, limit),
)
return cur.fetchall()
finally:
self.pool.putconn(conn)
def search_flexible_filtered_nearest(
self,
......@@ -128,8 +105,7 @@ class PGVectorService:
params.append(query_embedding)
params.append(limit)
conn = self._get_conn_with_vector()
try:
with self.pool_handler.get_connection() as conn:
with conn.cursor(cursor_factory=RealDictCursor) as cur:
cur.execute(
f"""
......@@ -143,13 +119,10 @@ class PGVectorService:
params
)
return cur.fetchall()
finally:
self.pool.putconn(conn)
def get_subjects_by_grade_and_language(self, grade: int, is_arabic: bool) -> List[str]:
"""Get available subjects for a specific grade and language"""
conn = self._get_conn_with_vector()
try:
with self.pool_handler.get_connection() as conn:
with conn.cursor(cursor_factory=RealDictCursor) as cur:
cur.execute(
"""
......@@ -161,9 +134,3 @@ class PGVectorService:
(grade, is_arabic)
)
return [row['subject'] for row in cur.fetchall()]
finally:
self.pool.putconn(conn)
def close_pool(self):
if self.pool:
self.pool.closeall()
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment