better db connectin hadeling for long lasting deplyment

parent a4501a9c
{
"schemaVersion": 2,
"dockerfilePath": "./Dockerfile",
"containerHttpPort": "8000",
"env": {
"POSTGRES_HOST": "srv-captain--postgres",
"MINIO_HOST": "srv-captain--minio"
}
}
...@@ -17,7 +17,7 @@ class StudentNationality(str, Enum): ...@@ -17,7 +17,7 @@ class StudentNationality(str, Enum):
class Models(str, Enum): class Models(str, Enum):
chat = "gpt-5" chat = "gpt-4o-mini"
tts = "gpt-4o-mini-tts" tts = "gpt-4o-mini-tts"
embedding = "text-embedding-3-small" embedding = "text-embedding-3-small"
transcription = "gpt-4o-transcribe" transcription = "gpt-4o-transcribe"
This diff is collapsed.
import os import os
import psycopg2 import psycopg2
from psycopg2.extras import RealDictCursor from psycopg2.extras import RealDictCursor
from psycopg2.pool import ThreadedConnectionPool
from typing import List, Dict, Optional, Tuple from typing import List, Dict, Optional, Tuple
import logging import logging
...@@ -8,10 +9,12 @@ logger = logging.getLogger(__name__) ...@@ -8,10 +9,12 @@ logger = logging.getLogger(__name__)
class ChatDatabaseService: class ChatDatabaseService:
"""Simple service for managing chat history in PostgreSQL""" """Simple service for managing chat history in PostgreSQL with connection pooling"""
def __init__(self): def __init__(self):
self.conn = psycopg2.connect( self.pool = ThreadedConnectionPool(
minconn=1,
maxconn=20,
host=os.getenv("POSTGRES_HOST", "postgres"), host=os.getenv("POSTGRES_HOST", "postgres"),
user=os.getenv("POSTGRES_USER"), user=os.getenv("POSTGRES_USER"),
password=os.getenv("POSTGRES_PASSWORD"), password=os.getenv("POSTGRES_PASSWORD"),
...@@ -20,17 +23,23 @@ class ChatDatabaseService: ...@@ -20,17 +23,23 @@ class ChatDatabaseService:
def get_student_nationality(self, student_id: str) -> Optional[str]: def get_student_nationality(self, student_id: str) -> Optional[str]:
"""Get student nationality from database""" """Get student nationality from database"""
with self.conn.cursor(cursor_factory=RealDictCursor) as cur: conn = self.pool.getconn()
try:
with conn.cursor(cursor_factory=RealDictCursor) as cur:
cur.execute( cur.execute(
"SELECT nationality FROM students WHERE student_id = %s", "SELECT nationality FROM students WHERE student_id = %s",
(student_id,) (student_id,)
) )
result = cur.fetchone() result = cur.fetchone()
return result["nationality"] if result else None return result["nationality"] if result else None
finally:
self.pool.putconn(conn)
def get_student_info(self, student_id: str) -> Optional[Dict]: def get_student_info(self, student_id: str) -> Optional[Dict]:
"""Get complete student information from database""" """Get complete student information from database"""
with self.conn.cursor(cursor_factory=RealDictCursor) as cur: conn = self.pool.getconn()
try:
with conn.cursor(cursor_factory=RealDictCursor) as cur:
cur.execute( cur.execute(
""" """
SELECT student_id, student_name, grade, language, nationality SELECT student_id, student_name, grade, language, nationality
...@@ -49,10 +58,14 @@ class ChatDatabaseService: ...@@ -49,10 +58,14 @@ class ChatDatabaseService:
'nationality': result['nationality'] 'nationality': result['nationality']
} }
return None return None
finally:
self.pool.putconn(conn)
def get_student_grade_and_language(self, student_id: str) -> Optional[Tuple[int, bool]]: def get_student_grade_and_language(self, student_id: str) -> Optional[Tuple[int, bool]]:
"""Get student grade and language preference""" """Get student grade and language preference"""
with self.conn.cursor(cursor_factory=RealDictCursor) as cur: conn = self.pool.getconn()
try:
with conn.cursor(cursor_factory=RealDictCursor) as cur:
cur.execute( cur.execute(
"SELECT grade, language FROM students WHERE student_id = %s", "SELECT grade, language FROM students WHERE student_id = %s",
(student_id,) (student_id,)
...@@ -61,10 +74,14 @@ class ChatDatabaseService: ...@@ -61,10 +74,14 @@ class ChatDatabaseService:
if result: if result:
return (result["grade"], result["language"]) return (result["grade"], result["language"])
return None return None
finally:
self.pool.putconn(conn)
def get_chat_history(self, student_id: str, limit: int = 20) -> List[Dict[str, str]]: def get_chat_history(self, student_id: str, limit: int = 20) -> List[Dict[str, str]]:
"""Get chat history for a student, returns in chronological order""" """Get chat history for a student, returns in chronological order"""
with self.conn.cursor(cursor_factory=RealDictCursor) as cur: conn = self.pool.getconn()
try:
with conn.cursor(cursor_factory=RealDictCursor) as cur:
cur.execute( cur.execute(
""" """
SELECT role, content SELECT role, content
...@@ -78,10 +95,14 @@ class ChatDatabaseService: ...@@ -78,10 +95,14 @@ class ChatDatabaseService:
results = cur.fetchall() results = cur.fetchall()
# Return in chronological order (oldest first) # Return in chronological order (oldest first)
return [{"role": row["role"], "content": row["content"]} for row in reversed(results)] return [{"role": row["role"], "content": row["content"]} for row in reversed(results)]
finally:
self.pool.putconn(conn)
def add_message(self, student_id: str, role: str, content: str): def add_message(self, student_id: str, role: str, content: str):
"""Add a message to chat history""" """Add a message to chat history"""
with self.conn.cursor() as cur: conn = self.pool.getconn()
try:
with conn.cursor() as cur:
cur.execute( cur.execute(
""" """
INSERT INTO chat_history (student_id, role, content) INSERT INTO chat_history (student_id, role, content)
...@@ -89,20 +110,28 @@ class ChatDatabaseService: ...@@ -89,20 +110,28 @@ class ChatDatabaseService:
""", """,
(student_id, role, content) (student_id, role, content)
) )
self.conn.commit() conn.commit()
finally:
self.pool.putconn(conn)
def clear_history(self, student_id: str): def clear_history(self, student_id: str):
"""Clear chat history for a student""" """Clear chat history for a student"""
with self.conn.cursor() as cur: conn = self.pool.getconn()
try:
with conn.cursor() as cur:
cur.execute( cur.execute(
"DELETE FROM chat_history WHERE student_id = %s", "DELETE FROM chat_history WHERE student_id = %s",
(student_id,) (student_id,)
) )
self.conn.commit() conn.commit()
finally:
self.pool.putconn(conn)
def limit_history(self, student_id: str, max_messages: int = 40): def limit_history(self, student_id: str, max_messages: int = 40):
"""Keep only recent messages for a student""" """Keep only recent messages for a student"""
with self.conn.cursor() as cur: conn = self.pool.getconn()
try:
with conn.cursor() as cur:
cur.execute( cur.execute(
""" """
DELETE FROM chat_history DELETE FROM chat_history
...@@ -117,7 +146,9 @@ class ChatDatabaseService: ...@@ -117,7 +146,9 @@ class ChatDatabaseService:
""", """,
(student_id, student_id, max_messages) (student_id, student_id, max_messages)
) )
self.conn.commit() conn.commit()
finally:
self.pool.putconn(conn)
def update_student_info(self, student_id: str, grade: Optional[int] = None, def update_student_info(self, student_id: str, grade: Optional[int] = None,
language: Optional[bool] = None, nationality: Optional[str] = None): language: Optional[bool] = None, nationality: Optional[str] = None):
...@@ -139,7 +170,9 @@ class ChatDatabaseService: ...@@ -139,7 +170,9 @@ class ChatDatabaseService:
if updates: if updates:
params.append(student_id) params.append(student_id)
with self.conn.cursor() as cur: conn = self.pool.getconn()
try:
with conn.cursor() as cur:
cur.execute( cur.execute(
f""" f"""
UPDATE students UPDATE students
...@@ -148,12 +181,16 @@ class ChatDatabaseService: ...@@ -148,12 +181,16 @@ class ChatDatabaseService:
""", """,
params params
) )
self.conn.commit() conn.commit()
finally:
self.pool.putconn(conn)
def create_student(self, student_id: str, student_name: str, grade: int, def create_student(self, student_id: str, student_name: str, grade: int,
language: bool, nationality: str = 'EGYPTIAN'): language: bool, nationality: str = 'EGYPTIAN'):
"""Create a new student record""" """Create a new student record"""
with self.conn.cursor() as cur: conn = self.pool.getconn()
try:
with conn.cursor() as cur:
cur.execute( cur.execute(
""" """
INSERT INTO students (student_id, student_name, grade, language, nationality) INSERT INTO students (student_id, student_name, grade, language, nationality)
...@@ -162,8 +199,10 @@ class ChatDatabaseService: ...@@ -162,8 +199,10 @@ class ChatDatabaseService:
""", """,
(student_id, student_name, grade, language, nationality) (student_id, student_name, grade, language, nationality)
) )
self.conn.commit() conn.commit()
finally:
self.pool.putconn(conn)
def close(self): def close_pool(self):
if self.conn: if self.pool:
self.conn.close() self.pool.closeall()
\ No newline at end of file \ No newline at end of file
import os import os
import psycopg2 import psycopg2
from psycopg2.extras import RealDictCursor from psycopg2.extras import RealDictCursor
from psycopg2.pool import ThreadedConnectionPool
from typing import List, Optional from typing import List, Optional
# Import the pgvector adapter # Import the pgvector adapter
from pgvector.psycopg2 import register_vector from pgvector.psycopg2 import register_vector
class PGVectorService: class PGVectorService:
"""Service for managing embeddings with PostgreSQL pgvector""" """Service for managing embeddings with PostgreSQL pgvector using connection pooling"""
def __init__(self): def __init__(self):
self.conn = psycopg2.connect( self.pool = ThreadedConnectionPool(
minconn=1,
maxconn=20,
host=os.getenv("POSTGRES_HOST", "postgres"), host=os.getenv("POSTGRES_HOST", "postgres"),
user=os.getenv("POSTGRES_USER"), user=os.getenv("POSTGRES_USER"),
password=os.getenv("POSTGRES_PASSWORD"), password=os.getenv("POSTGRES_PASSWORD"),
dbname=os.getenv("POSTGRES_DB"), dbname=os.getenv("POSTGRES_DB"),
) )
# Register the vector type with the connection # Test connection and register vector type to ensure the pool works
register_vector(self.conn) conn = self.pool.getconn()
try:
register_vector(conn)
finally:
self.pool.putconn(conn)
def _get_conn_with_vector(self):
"""Get a connection from the pool and register vector type"""
conn = self.pool.getconn()
register_vector(conn)
return conn
def insert_embedding(self, id: int, embedding: list): def insert_embedding(self, id: int, embedding: list):
"""Insert or update an embedding""" """Insert or update an embedding"""
with self.conn.cursor() as cur: conn = self._get_conn_with_vector()
try:
with conn.cursor() as cur:
cur.execute( cur.execute(
""" """
INSERT INTO embeddings_table (id, embedding) INSERT INTO embeddings_table (id, embedding)
...@@ -30,11 +45,15 @@ class PGVectorService: ...@@ -30,11 +45,15 @@ class PGVectorService:
""", """,
(id, embedding), (id, embedding),
) )
self.conn.commit() conn.commit()
finally:
self.pool.putconn(conn)
def search_nearest(self, query_embedding: list, limit: int = 3): def search_nearest(self, query_embedding: list, limit: int = 3):
"""Search nearest embeddings using cosine distance (<-> operator)""" """Search nearest embeddings using cosine distance (<-> operator)"""
with self.conn.cursor(cursor_factory=RealDictCursor) as cur: conn = self._get_conn_with_vector()
try:
with conn.cursor(cursor_factory=RealDictCursor) as cur:
cur.execute( cur.execute(
""" """
SELECT id, embedding, embedding <-> %s AS distance SELECT id, embedding, embedding <-> %s AS distance
...@@ -45,6 +64,8 @@ class PGVectorService: ...@@ -45,6 +64,8 @@ class PGVectorService:
(query_embedding, query_embedding, limit), (query_embedding, query_embedding, limit),
) )
return cur.fetchall() return cur.fetchall()
finally:
self.pool.putconn(conn)
def search_filtered_nearest( def search_filtered_nearest(
self, self,
...@@ -55,7 +76,9 @@ class PGVectorService: ...@@ -55,7 +76,9 @@ class PGVectorService:
limit: int = 3 limit: int = 3
): ):
"""Search nearest embeddings with filtering by grade, subject, and language""" """Search nearest embeddings with filtering by grade, subject, and language"""
with self.conn.cursor(cursor_factory=RealDictCursor) as cur: conn = self._get_conn_with_vector()
try:
with conn.cursor(cursor_factory=RealDictCursor) as cur:
cur.execute( cur.execute(
""" """
SELECT id, grade, subject, unit, concept, lesson, chunk_text, SELECT id, grade, subject, unit, concept, lesson, chunk_text,
...@@ -70,6 +93,8 @@ class PGVectorService: ...@@ -70,6 +93,8 @@ class PGVectorService:
(query_embedding, grade, f"%{subject}%", is_arabic, query_embedding, limit), (query_embedding, grade, f"%{subject}%", is_arabic, query_embedding, limit),
) )
return cur.fetchall() return cur.fetchall()
finally:
self.pool.putconn(conn)
def search_flexible_filtered_nearest( def search_flexible_filtered_nearest(
self, self,
...@@ -103,7 +128,9 @@ class PGVectorService: ...@@ -103,7 +128,9 @@ class PGVectorService:
params.append(query_embedding) params.append(query_embedding)
params.append(limit) params.append(limit)
with self.conn.cursor(cursor_factory=RealDictCursor) as cur: conn = self._get_conn_with_vector()
try:
with conn.cursor(cursor_factory=RealDictCursor) as cur:
cur.execute( cur.execute(
f""" f"""
SELECT id, grade, subject, unit, concept, lesson, chunk_text, SELECT id, grade, subject, unit, concept, lesson, chunk_text,
...@@ -116,10 +143,14 @@ class PGVectorService: ...@@ -116,10 +143,14 @@ class PGVectorService:
params params
) )
return cur.fetchall() return cur.fetchall()
finally:
self.pool.putconn(conn)
def get_subjects_by_grade_and_language(self, grade: int, is_arabic: bool) -> List[str]: def get_subjects_by_grade_and_language(self, grade: int, is_arabic: bool) -> List[str]:
"""Get available subjects for a specific grade and language""" """Get available subjects for a specific grade and language"""
with self.conn.cursor(cursor_factory=RealDictCursor) as cur: conn = self._get_conn_with_vector()
try:
with conn.cursor(cursor_factory=RealDictCursor) as cur:
cur.execute( cur.execute(
""" """
SELECT DISTINCT subject SELECT DISTINCT subject
...@@ -130,7 +161,9 @@ class PGVectorService: ...@@ -130,7 +161,9 @@ class PGVectorService:
(grade, is_arabic) (grade, is_arabic)
) )
return [row['subject'] for row in cur.fetchall()] return [row['subject'] for row in cur.fetchall()]
finally:
self.pool.putconn(conn)
def close(self): def close_pool(self):
if self.conn: if self.pool:
self.conn.close() self.pool.closeall()
\ No newline at end of file \ No newline at end of file
#!/bin/bash #!/bin/bash
set -e set -e
host="postgres" # Use the environment variables set in CapRover for the host and user
host="${POSTGRES_HOST}"
user="${POSTGRES_USER}" user="${POSTGRES_USER}"
db="${POSTGRES_DB}"
password="${POSTGRES_PASSWORD}"
echo "Waiting for PostgreSQL database to be ready..." echo "Waiting for PostgreSQL database at $host to be ready..."
# Wait for the database server to be available # Wait for the database server to be available. This is sufficient.
until PGPASSWORD="$password" pg_isready -h "$host" -U "$user"; do until PGPASSWORD="${POSTGRES_PASSWORD}" pg_isready -h "$host" -U "$user"; do
echo "PostgreSQL server is unavailable - sleeping" echo "PostgreSQL server is unavailable - sleeping"
sleep 1 sleep 1
done done
# Wait for the specific database to be available
until PGPASSWORD="$password" psql -h "$host" -U "$user" -d "$db" -c '\q'; do
echo "Database '$db' is not yet available - sleeping"
sleep 1
done
echo "PostgreSQL database is up and ready - executing command" echo "PostgreSQL database is up and ready - executing command"
exec "$@" exec "$@"
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment