db init in voice agent

parent 9fff7202
Dockerfile
@@ -21,4 +21,4 @@ RUN chmod +x wait-for-postgres.sh
 ENTRYPOINT ["/app/wait-for-postgres.sh"]
 # This is your application's original startup command
-CMD ["python", "main.py"]
+CMD ["/bin/bash", "-c", "python apply_test_schema.py && python insert_csv_embeddings.py && python main.py"]
\ No newline at end of file
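The container now runs both init scripts before `main.py`, gated on the database being up. `wait-for-postgres.sh` itself is not part of this diff; as a rough illustration of what such a gate typically does (the timeout and retry interval here are assumptions, and the real script is a shell file), a Python equivalent might look like:

```python
# Hypothetical Python equivalent of wait-for-postgres.sh (not the actual script).
import os
import time

import psycopg2


def wait_for_postgres(timeout: int = 60) -> None:
    """Block until Postgres accepts connections, or raise after `timeout` seconds."""
    deadline = time.time() + timeout
    while True:
        try:
            psycopg2.connect(
                host=os.getenv("POSTGRES_HOST", "localhost"),
                port=os.getenv("POSTGRES_PORT", "5432"),
                user=os.getenv("POSTGRES_USER"),
                password=os.getenv("POSTGRES_PASSWORD"),
                dbname=os.getenv("POSTGRES_DB"),
            ).close()
            return  # connection succeeded, database is ready
        except psycopg2.OperationalError:
            if time.time() > deadline:
                raise
            time.sleep(1)  # retry until the deadline passes
```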
apply_test_schema.py (new file)

import psycopg2
import os

schema_sql = """
-- Create students table
CREATE TABLE IF NOT EXISTS students (
id SERIAL PRIMARY KEY,
student_id VARCHAR(50) UNIQUE NOT NULL,
student_name VARCHAR(100),
grade VARCHAR(20),
language BOOLEAN,
nationality VARCHAR(20) NOT NULL DEFAULT 'EGYPTIAN'
);
-- Create chat_history table
CREATE TABLE IF NOT EXISTS chat_history (
id SERIAL PRIMARY KEY,
student_id VARCHAR(50) NOT NULL,
role VARCHAR(20) NOT NULL CHECK (role IN ('user', 'assistant', 'system')),
content TEXT NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (student_id) REFERENCES students(student_id) ON DELETE CASCADE
);
-- Create indexes for better performance
CREATE INDEX IF NOT EXISTS idx_chat_history_student_id ON chat_history(student_id);
CREATE INDEX IF NOT EXISTS idx_chat_history_created_at ON chat_history(created_at);
CREATE INDEX IF NOT EXISTS idx_students_nationality ON students(nationality);
-- Insert dummy data for testing
INSERT INTO students (student_id, student_name, grade, language, nationality) VALUES
('student_001', 'Ahmed Ali', 'prime4', TRUE, 'EGYPTIAN'),
('student_002', 'Sara Hassan', 'prime6', FALSE, 'SAUDI'),
('student_003', 'Mona Adel', 'prime5', TRUE, 'EGYPTIAN'),
('student_004', 'Omar Youssef', 'prime6', FALSE, 'SAUDI')
ON CONFLICT (student_id) DO NOTHING;
"""
drop_all_tables_sql = """
DO $$
DECLARE
rec RECORD;
BEGIN
-- drop all tables in public schema
FOR rec IN (SELECT tablename FROM pg_tables WHERE schemaname = 'public') LOOP
EXECUTE 'DROP TABLE IF EXISTS "' || rec.tablename || '" CASCADE';
END LOOP;
END $$;
"""
def setup_database(drop_existing_tables: bool = False):
    """
    Sets up the database schema and tables.

    Args:
        drop_existing_tables: If True, drops all existing tables before creating them.
    """
    try:
        conn = psycopg2.connect(
            host=os.getenv("POSTGRES_HOST", "localhost"),
            port=os.getenv("POSTGRES_PORT", "5432"),
            user=os.getenv("POSTGRES_USER"),
            password=os.getenv("POSTGRES_PASSWORD"),
            dbname=os.getenv("POSTGRES_DB")
        )
        conn.autocommit = True
        with conn.cursor() as cur:
            if drop_existing_tables:
                print("Dropping all existing tables...")
                cur.execute(drop_all_tables_sql)
                print("All tables dropped.")
            print("Setting up schema and inserting data...")
            cur.execute(schema_sql)
            print("Database setup complete. Verifying data...")

            # Verification: select from the students and chat_history tables
            print("\nStudents table rows:")
            cur.execute("SELECT * FROM students ORDER BY id;")
            for row in cur.fetchall():
                print(row)

            print("\nChat_history table rows:")
            cur.execute("SELECT * FROM chat_history ORDER BY id;")
            for row in cur.fetchall():
                print(row)
    except psycopg2.OperationalError as e:
        print(f"Database connection failed: {e}")
    except Exception as e:
        print(f"An error occurred: {e}")
    finally:
        if 'conn' in locals() and conn:
            conn.close()
            print("Database connection closed.")


if __name__ == "__main__":
    # To run with a clean slate, pass drop_existing_tables=True:
    # setup_database(drop_existing_tables=True)
    setup_database()
\ No newline at end of file
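For context, a minimal sketch of how the voice agent might write to this schema at runtime. The function name and message values here are illustrative, not taken from the repo; connection settings mirror apply_test_schema.py:

```python
# Hypothetical usage sketch: log one exchange for an existing student.
import os

import psycopg2


def log_message(student_id: str, role: str, content: str) -> None:
    conn = psycopg2.connect(
        host=os.getenv("POSTGRES_HOST", "localhost"),
        port=os.getenv("POSTGRES_PORT", "5432"),
        user=os.getenv("POSTGRES_USER"),
        password=os.getenv("POSTGRES_PASSWORD"),
        dbname=os.getenv("POSTGRES_DB"),
    )
    try:
        with conn, conn.cursor() as cur:
            # role must be 'user', 'assistant', or 'system' per the CHECK constraint,
            # and student_id must exist in students or the FK rejects the row.
            cur.execute(
                "INSERT INTO chat_history (student_id, role, content) VALUES (%s, %s, %s)",
                (student_id, role, content),
            )
    finally:
        conn.close()


if __name__ == "__main__":
    log_message("student_001", "user", "What is photosynthesis?")
```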
(Five large file diffs could not be displayed inline — presumably the embeddings CSV files loaded by the script below; view the blobs instead.)
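Those CSVs are expected to carry the columns checked by `required_cols` in insert_csv_embeddings.py below, with the Embedding column stored as a JSON-encoded list so the loader can `json.loads()` it back. A hypothetical sketch of producing one such row (all values illustrative; the 1536-dim embedding source is an assumption):

```python
# Hypothetical sketch of writing one row of an embeddings CSV.
import json

import pandas as pd

row = {
    "Grade": "prime4",
    "Subject": "Science",
    "Unit": "Unit 1",
    "Concept": "Plants",
    "Lesson": "Photosynthesis",
    "From page": 12,
    "To page": 14,
    "Chunk index": 0,
    "Chunk text": "Photosynthesis is the process...",
    "Is Arabic": False,
    # JSON array string; insert_csv_embeddings.py parses this with json.loads()
    "Embedding": json.dumps([0.0] * 1536),
}
# Drop the output into the embeddings/ directory for the loader to pick up
pd.DataFrame([row]).to_csv("sample.csv", index=False)
```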
insert_csv_embeddings.py (new file)

import os
import psycopg2
import pandas as pd
import json
from dotenv import load_dotenv
# Import the pgvector adapter for psycopg2
from pgvector.psycopg2 import register_vector

load_dotenv()


def get_db_connection():
    conn = psycopg2.connect(
        dbname=os.getenv("POSTGRES_DB", "embeddings_db"),
        user=os.getenv("POSTGRES_USER", "db_admin"),
        password=os.getenv("POSTGRES_PASSWORD"),
        host=os.getenv("POSTGRES_HOST", "localhost"),
        port=os.getenv("POSTGRES_PORT", "5432")
    )
    # Register the vector type with the connection
    register_vector(conn)
    return conn
def create_schema_and_table(conn, drop_existing_table: bool):
    create_extension = "CREATE EXTENSION IF NOT EXISTS vector;"
    create_table = """
    CREATE TABLE IF NOT EXISTS educational_chunks (
        id SERIAL PRIMARY KEY,
        grade TEXT NOT NULL,
        subject TEXT,
        unit TEXT,
        concept TEXT,
        lesson TEXT,
        from_page INT,
        to_page INT,
        chunk_index INT,
        chunk_text TEXT NOT NULL,
        is_arabic BOOLEAN NOT NULL,
        embedding VECTOR(1536) NOT NULL
    );
    """
    create_indexes = [
        "CREATE INDEX IF NOT EXISTS idx_embedding ON educational_chunks USING hnsw (embedding vector_cosine_ops);",
        "CREATE INDEX IF NOT EXISTS idx_grade ON educational_chunks (grade);",
        "CREATE INDEX IF NOT EXISTS idx_is_arabic ON educational_chunks (is_arabic);",
        "CREATE INDEX IF NOT EXISTS idx_subject ON educational_chunks (subject);",
        "CREATE INDEX IF NOT EXISTS idx_grade_is_arabic ON educational_chunks (grade, is_arabic);"
    ]
    cur = conn.cursor()
    cur.execute(create_extension)
    print("CREATE EXTENSION vector: OK.")
    if drop_existing_table:
        cur.execute("DROP TABLE IF EXISTS educational_chunks;")
        print("DROP TABLE educational_chunks: OK.")
    cur.execute(create_table)
    print("CREATE TABLE educational_chunks: OK.")
    for idx_query in create_indexes:
        cur.execute(idx_query)
        print(f"CREATE INDEX OK for: {idx_query}")
    conn.commit()
    cur.close()
def insert_chunks_from_csv(csv_file: str):
    df = pd.read_csv(csv_file)
    required_cols = [
        "Grade", "Subject", "Unit", "Concept", "Lesson",
        "From page", "To page", "Chunk index", "Chunk text",
        "Is Arabic", "Embedding"
    ]
    for col in required_cols:
        if col not in df.columns:
            raise ValueError(f"Missing required column in CSV: {col}")

    conn = get_db_connection()
    cur = conn.cursor()
    insert_query = """
        INSERT INTO educational_chunks
            (grade, subject, unit, concept, lesson,
             from_page, to_page, chunk_index, chunk_text,
             is_arabic, embedding)
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
    """
    batch_size = 50
    buffer = []
    for idx, row in df.iterrows():
        try:
            # The Embedding column holds a JSON-encoded list of floats
            embedding = json.loads(row["Embedding"])
            buffer.append((
                str(row["Grade"]),
                row["Subject"],
                row.get("Unit"),
                row.get("Concept"),
                row.get("Lesson"),
                int(row["From page"]) if not pd.isna(row["From page"]) else None,
                int(row["To page"]) if not pd.isna(row["To page"]) else None,
                int(row["Chunk index"]),
                row["Chunk text"],
                bool(row["Is Arabic"]),
                embedding
            ))
        except Exception as e:
            print(f"Skipping row {idx} due to error: {e}")
            continue
        # Flush in batches to keep memory use and transaction size bounded
        if len(buffer) >= batch_size:
            cur.executemany(insert_query, buffer)
            conn.commit()
            print(f"Inserted {len(buffer)} rows.")
            buffer = []
    if buffer:
        cur.executemany(insert_query, buffer)
        conn.commit()
        print(f"Inserted final {len(buffer)} rows.")
    cur.close()
    conn.close()
    print("All data inserted successfully.")
def setup_embeddings_database(drop_existing_tables: bool = False):
    """
    Sets up the educational chunks table and populates it with embeddings from CSV files.

    Args:
        drop_existing_tables: If True, drops the existing table before creating it.
    """
    try:
        conn = get_db_connection()
        create_schema_and_table(conn, drop_existing_tables)
        csv_dir = os.path.join(os.path.dirname(__file__), "embeddings")
        csv_files = [
            os.path.join(csv_dir, f)
            for f in os.listdir(csv_dir)
            if f.endswith(".csv")
        ]
        for file in csv_files:
            print(f"Inserting data from {file}...")
            insert_chunks_from_csv(file)
    except psycopg2.OperationalError as e:
        print(f"Database connection failed: {e}")
    except Exception as e:
        print(f"An error occurred: {e}")
    finally:
        if 'conn' in locals() and conn:
            conn.close()
            print("Database connection closed.")


if __name__ == "__main__":
    # To run with a clean slate, pass drop_existing_tables=True:
    # setup_embeddings_database(drop_existing_tables=True)
    setup_embeddings_database()
\ No newline at end of file
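With the HNSW index in place, retrieval is a cosine-distance ORDER BY; `<=>` is pgvector's cosine-distance operator, which is what lets the `vector_cosine_ops` index apply. A minimal sketch (function name, filter, and top-k are illustrative, not part of this commit):

```python
# Hypothetical retrieval sketch against educational_chunks.
# Assumes get_db_connection() from insert_csv_embeddings.py and a
# 1536-dim query vector from the same embedding model used at insert time.
import numpy as np

from insert_csv_embeddings import get_db_connection


def top_chunks(query_vec, grade: str, k: int = 5):
    conn = get_db_connection()
    try:
        with conn.cursor() as cur:
            cur.execute(
                """
                SELECT chunk_text, embedding <=> %s AS distance
                FROM educational_chunks
                WHERE grade = %s
                ORDER BY embedding <=> %s
                LIMIT %s
                """,
                (np.array(query_vec), grade, np.array(query_vec), k),
            )
            return cur.fetchall()
    finally:
        conn.close()
```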
requirements.txt
@@ -8,3 +8,5 @@ python-multipart
 openai
 psycopg2-binary
 pgvector
+pandas
+python-dotenv