Retrieval working

parent 6d1a52f6
FROM python:3.10-slim
WORKDIR /app
# Install Python dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
# Keep the container running idle; work is triggered manually (e.g., via docker exec)
CMD ["sh", "-c", "while :; do sleep 10; done"]
import os
import json

import pandas as pd
import psycopg2
from dotenv import load_dotenv
# Import the pgvector adapter for psycopg2
from pgvector.psycopg2 import register_vector

load_dotenv()


def get_db_connection():
    conn = psycopg2.connect(
        dbname=os.getenv("POSTGRES_DB", "embeddings_db"),
        user=os.getenv("POSTGRES_USER", "db_admin"),
        password=os.getenv("POSTGRES_PASSWORD"),
        host=os.getenv("POSTGRES_HOST", "localhost"),
        port=os.getenv("POSTGRES_PORT", 5432),
    )
    # Register the vector type with the connection
    register_vector(conn)
    return conn
def create_schema_and_table():
    create_extension = "CREATE EXTENSION IF NOT EXISTS vector;"
    drop_table = "DROP TABLE IF EXISTS educational_chunks;"
    create_table = """
    CREATE TABLE IF NOT EXISTS educational_chunks (
        id SERIAL PRIMARY KEY,
        grade TEXT NOT NULL,
        subject TEXT,
        unit TEXT,
        concept TEXT,
        lesson TEXT,
        from_page INT,
        to_page INT,
        chunk_index INT,
        chunk_text TEXT NOT NULL,
        is_arabic BOOLEAN NOT NULL,
        embedding VECTOR(1536) NOT NULL
    );
    """
    # Note: the HNSW index uses vector_cosine_ops, so queries must order by
    # the cosine operator <=> to benefit from it; <-> (L2 distance) will not.
    create_indexes = [
        "CREATE INDEX IF NOT EXISTS idx_embedding ON educational_chunks USING hnsw (embedding vector_cosine_ops);",
        "CREATE INDEX IF NOT EXISTS idx_grade ON educational_chunks (grade);",
        "CREATE INDEX IF NOT EXISTS idx_is_arabic ON educational_chunks (is_arabic);",
        "CREATE INDEX IF NOT EXISTS idx_subject ON educational_chunks (subject);",
        "CREATE INDEX IF NOT EXISTS idx_grade_is_arabic ON educational_chunks (grade, is_arabic);",
    ]
    conn = get_db_connection()
    cur = conn.cursor()
    cur.execute(create_extension)
    print("CREATE EXTENSION vector: done.")
    cur.execute(drop_table)
    print("DROP TABLE educational_chunks: done.")
    cur.execute(create_table)
    print("CREATE TABLE educational_chunks: done.")
    for idx_query in create_indexes:
        cur.execute(idx_query)
        print(f"CREATE INDEX done: {idx_query}")
    conn.commit()
    cur.close()
    conn.close()
def insert_chunks_from_csv(csv_file: str):
    df = pd.read_csv(csv_file)
    required_cols = [
        "Grade", "Subject", "Unit", "Concept", "Lesson",
        "From page", "To page", "Chunk index", "Chunk text",
        "Is Arabic", "Embedding"
    ]
    for col in required_cols:
        if col not in df.columns:
            raise ValueError(f"Missing required column in CSV: {col}")

    conn = get_db_connection()
    cur = conn.cursor()
    insert_query = """
        INSERT INTO educational_chunks
            (grade, subject, unit, concept, lesson,
             from_page, to_page, chunk_index, chunk_text,
             is_arabic, embedding)
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
    """
    batch_size = 50
    buffer = []
    for idx, row in df.iterrows():
        try:
            # Embeddings are stored in the CSV as JSON-encoded lists.
            embedding = json.loads(row["Embedding"])
            buffer.append((
                str(row["Grade"]),
                row["Subject"],
                row.get("Unit"),
                row.get("Concept"),
                row.get("Lesson"),
                int(row["From page"]) if not pd.isna(row["From page"]) else None,
                int(row["To page"]) if not pd.isna(row["To page"]) else None,
                int(row["Chunk index"]),
                row["Chunk text"],
                bool(row["Is Arabic"]),
                embedding,
            ))
        except Exception as e:
            print(f"Skipping row {idx} due to error: {e}")
            continue
        if len(buffer) >= batch_size:
            cur.executemany(insert_query, buffer)
            conn.commit()
            print(f"Inserted {len(buffer)} rows.")
            buffer = []
    if buffer:
        cur.executemany(insert_query, buffer)
        conn.commit()
        print(f"Inserted final {len(buffer)} rows.")
    cur.close()
    conn.close()
    print(f"All data from {csv_file} inserted successfully.")
if __name__ == "__main__":
    create_schema_and_table()
    csv_files = [
        "prime4_ar_embeddings.csv",
        "Prime5_en_chunked_with_embeddings.csv",
        "prime6_ar_embeddings.csv",
        "Prime6_en_chunked_with_embeddings.csv",
    ]
    for file in csv_files:
        if os.path.exists(file):
            print(f"Inserting data from {file}...")
            insert_chunks_from_csv(file)
        else:
            print(f"File not found: {file}")
psycopg2-binary
pandas
python-dotenv
pgvector
@@ -86,6 +86,28 @@ services:
  data-handler:
    build: ./data-handler  # path to your Dockerfile folder
    container_name: data-handler
    restart: always
    environment:
      MINIO_ENDPOINT: "http://minio:9000"
      MINIO_ACCESS_KEY: "${MINIO_ROOT_USER}"
      MINIO_SECRET_KEY: "${MINIO_ROOT_PASSWORD}"
      N8N_WEBHOOK_URL: "${N8N_WEBHOOK_URL}"
      OPENAI_API_KEY: "${OPENAI_API_KEY}"
      MINIO_BUCKET: "${MINIO_BUCKET}"
      POSTGRES_HOST: "postgres"
      POSTGRES_USER: "${POSTGRES_USER}"
      POSTGRES_PASSWORD: "${POSTGRES_PASSWORD}"
      POSTGRES_DB: "${POSTGRES_DB}"
    depends_on:
      - minio
      - postgres
volumes:
  pgdata:
  miniodata:
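
The data-handler reads all of its configuration from this environment block, so a fail-fast check at container startup surfaces missing values before any database work begins. A minimal sketch (variable names match the compose file above; the check itself is an assumption, not part of this commit):

import os

# Hypothetical startup guard for the data-handler entrypoint.
REQUIRED_VARS = ["POSTGRES_HOST", "POSTGRES_USER", "POSTGRES_PASSWORD",
                 "POSTGRES_DB", "MINIO_ENDPOINT", "MINIO_BUCKET"]
missing = [name for name in REQUIRED_VARS if not os.getenv(name)]
if missing:
    raise SystemExit(f"Missing required environment variables: {', '.join(missing)}")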
@@ -7,3 +7,4 @@ uvicorn[standard]
python-multipart
openai
psycopg2-binary
pgvector
@@ -48,7 +48,7 @@ SYSTEM_PROMPTS: Dict[StudentNationality, str] = {

class AgentService:
    """Service class for handling AI agent conversations using database memory"""

-    def __init__(self, use_pgvector: bool = False):
+    def __init__(self, use_pgvector: bool = True):
        self.openai_service = OpenAIService()
        if not self.openai_service.is_available():
            logger.warning("Warning: OPENAI_API_KEY not found. Agent service will be disabled.")
@@ -106,6 +106,13 @@ class AgentService:
        if not student_info:
            raise HTTPException(status_code=404, detail=f"Student with ID {student_id} not found")

        # Print student information
        print("----------------- Student Info Retrieved -----------------")
        print(f"Student ID: {student_id}")
        for key, value in student_info.items():
            print(f"{key.capitalize()}: {value}")
        print("---------------------------------------------------------")

        logger.info(f"Retrieved student info from DB: {student_info} for student: {student_id}")

        # Convert nationality string to StudentNationality enum
@@ -163,6 +170,8 @@ class AgentService:
            )

            if neighbors:
                # Print retrieval results
                print("\n----------------- Retrieval Results -----------------")
                # Arabic header: "Information from the {subject} curriculum for grade {grade}:"
                context_message = f"معلومات من المنهج لمادة {subject} للصف {student_info['grade']}:\n"
                for i, n in enumerate(neighbors, 1):
                    unit_info = f" - الوحدة: {n['unit']}" if n['unit'] else ""  # Arabic: "Unit: ..."
@@ -173,9 +182,21 @@ class AgentService:
                    context_message += f"المحتوى: {n['chunk_text'][:200]}...\n"  # Arabic: "Content: ..."
                    context_message += f"(درجة التشابه: {n['distance']:.3f})\n"  # Arabic: "(similarity score: ...)"

                    print(f"Result {i}:")
                    print(f"  Unit: {n['unit']}")
                    print(f"  Concept: {n['concept']}")
                    print(f"  Lesson: {n['lesson']}")
                    print(f"  Chunk Text: {n['chunk_text']}...")
                    print(f"  Distance: {n['distance']:.3f}")
                    print("-" * 20)
                print("-----------------------------------------------------")

                messages.append({"role": "system", "content": context_message})
                logger.info(f"Added {len(neighbors)} filtered knowledge base results for subject: {subject}")
            else:
                print("\n----------------- Retrieval Results -----------------")
                print(f"No relevant content found for subject: {subject}, grade: {student_info['grade']}, Arabic: {student_info['is_arabic']}")
                print("-----------------------------------------------------")
                logger.info(f"No relevant content found for subject: {subject}, grade: {student_info['grade']}, Arabic: {student_info['is_arabic']}")

        except Exception as e:
@@ -188,6 +209,7 @@ class AgentService:
                temperature=temperature
            )

            ai_response = response.choices[0].message.content.strip()

            if not ai_response:
                raise ValueError("Empty response from AI model")
@@ -2,6 +2,8 @@ import os
import psycopg2
from psycopg2.extras import RealDictCursor
from typing import List, Optional
# Import the pgvector adapter
from pgvector.psycopg2 import register_vector


class PGVectorService:
@@ -14,6 +16,8 @@ class PGVectorService:
            password=os.getenv("POSTGRES_PASSWORD"),
            dbname=os.getenv("POSTGRES_DB"),
        )
        # Register the vector type with the connection
        register_vector(self.conn)

    def insert_embedding(self, id: int, embedding: list):
        """Insert or update an embedding"""
@@ -55,12 +59,12 @@ class PGVectorService:
        cur.execute(
            """
            SELECT id, grade, subject, unit, concept, lesson, chunk_text,
-                  is_arabic, embedding <-> %s AS distance
+                  is_arabic, embedding <-> %s::vector AS distance
            FROM educational_chunks
            WHERE grade = %s
              AND subject ILIKE %s
              AND is_arabic = %s
-           ORDER BY embedding <-> %s
+           ORDER BY embedding <-> %s::vector
            LIMIT %s;
            """,
            (query_embedding, grade, f"%{subject}%", is_arabic, query_embedding, limit),
@@ -103,10 +107,10 @@ class PGVectorService:
        cur.execute(
            f"""
            SELECT id, grade, subject, unit, concept, lesson, chunk_text,
-                  is_arabic, embedding <-> %s AS distance
+                  is_arabic, embedding <-> %s::vector AS distance
            FROM educational_chunks
            {where_clause}
-           ORDER BY embedding <-> %s
+           ORDER BY embedding <-> %s::vector
            LIMIT %s;
            """,
            params
        )
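
A note on the ::vector casts added above: psycopg2 adapts a plain Python list as a Postgres array, which the <-> operator does not accept, so either the parameter is cast explicitly or a numpy array is passed through the registered pgvector adapter. A standalone sketch, with assumed connection settings (not part of this commit):

import numpy as np
import psycopg2
from pgvector.psycopg2 import register_vector

# Assumed local connection settings; adjust to the compose environment.
conn = psycopg2.connect(host="localhost", dbname="embeddings_db",
                        user="db_admin", password="secret")
register_vector(conn)  # adapts numpy arrays to the vector type

vec = np.array([0.0] * 1536, dtype=np.float32)
cur = conn.cursor()
# With register_vector, vec is sent as a vector literal and no cast is
# needed; a raw Python list would require %s::vector as in the diff above.
cur.execute(
    "SELECT id FROM educational_chunks ORDER BY embedding <-> %s LIMIT 3;",
    (vec,),
)
print(cur.fetchall())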