Restructure directories to fit the new container

import pandas as pd
import numpy as np
import os
import re
from openai import OpenAI
from typing import List, Dict, Any
import csv
import json


class EducationalContentProcessor:
    def __init__(self, api_key: str = None):
        """
        Initialize the processor with an OpenAI API key.

        Args:
            api_key: OpenAI API key. If None, will try to get it from the environment.
        """
        if api_key is None:
            api_key = os.getenv('OPENAI_API_KEY')
        if not api_key:
            raise ValueError("OpenAI API key is required. Set OPENAI_API_KEY environment variable or pass api_key parameter.")
        self.client = OpenAI(api_key=api_key)
        self.embedding_model = "text-embedding-3-small"

    def chunk_text(self, text: str, chunk_size: int = 500, is_arabic: bool = False) -> List[str]:
        """
        Split text into chunks of approximately chunk_size words.

        Args:
            text: Input text to chunk
            chunk_size: Target number of words per chunk
            is_arabic: Whether the text is in Arabic (affects punctuation handling)

        Returns:
            List of text chunks
        """
        if not text or pd.isna(text):
            return [""]

        # Clean the text
        text = str(text).strip()
        if not text:
            return [""]

        # Define sentence-ending punctuation based on language
        if is_arabic:
            # Arabic punctuation marks for sentence endings
            sentence_pattern = r'(?<=[.!?؟۔।])\s+'
        else:
            # English/Latin punctuation marks
            sentence_pattern = r'(?<=[.!?])\s+'

        # Split into sentences to preserve sentence boundaries
        sentences = re.split(sentence_pattern, text)

        chunks = []
        current_chunk = []
        current_word_count = 0

        for sentence in sentences:
            # Count words differently for Arabic vs other languages
            if is_arabic:
                # For Arabic, split on whitespace and filter out empty strings
                sentence_words = len([word for word in sentence.split() if word.strip()])
            else:
                sentence_words = len(sentence.split())

            # If adding this sentence would exceed chunk_size, start a new chunk
            if current_word_count + sentence_words > chunk_size and current_chunk:
                chunks.append(' '.join(current_chunk))
                current_chunk = [sentence]
                current_word_count = sentence_words
            else:
                current_chunk.append(sentence)
                current_word_count += sentence_words

        # Add the last chunk if it has content
        if current_chunk:
            chunks.append(' '.join(current_chunk))

        return chunks if chunks else [""]

    def get_embedding(self, text: str) -> List[float]:
        """
        Generate an embedding for the given text using OpenAI's text-embedding-3-small.

        Args:
            text: Text to embed

        Returns:
            Embedding vector as a list of floats
        """
        try:
            # Clean text for embedding
            text = str(text).strip()
            if not text:
                text = "empty"

            response = self.client.embeddings.create(
                model=self.embedding_model,
                input=text,
                encoding_format="float"
            )
            return response.data[0].embedding
        except Exception as e:
            print(f"Error generating embedding: {str(e)}")
            # Return a zero vector of appropriate dimension (1536 for text-embedding-3-small)
            return [0.0] * 1536

    def detect_arabic_text(self, text: str) -> bool:
        """
        Simple detection of Arabic text based on character ranges.

        Args:
            text: Text to analyze

        Returns:
            True if the text contains significant Arabic content
        """
        if not text or pd.isna(text):
            return False

        text = str(text)
        arabic_chars = 0
        total_chars = 0

        for char in text:
            if char.strip():  # Skip whitespace
                total_chars += 1
                # Arabic Unicode ranges
                if ('\u0600' <= char <= '\u06FF') or \
                   ('\u0750' <= char <= '\u077F') or \
                   ('\u08A0' <= char <= '\u08FF') or \
                   ('\uFB50' <= char <= '\uFDFF') or \
                   ('\uFE70' <= char <= '\uFEFF'):
                    arabic_chars += 1

        # Consider text Arabic if more than 30% of characters are Arabic
        return total_chars > 0 and (arabic_chars / total_chars) > 0.3

    def process_csv(self, input_file: str, output_file: str, chunk_size: int = 500, is_arabic: bool = None, auto_detect_arabic: bool = True):
        """
        Process the input CSV file and create chunked output with embeddings.

        Args:
            input_file: Path to input CSV file
            output_file: Path to output CSV file
            chunk_size: Target words per chunk
            is_arabic: Explicitly set whether content is Arabic. If None, will use auto_detect_arabic
            auto_detect_arabic: Whether to automatically detect Arabic text (ignored if is_arabic is set)
        """
        print(f"Reading CSV file: {input_file}")

        try:
            # Read the input CSV
            df = pd.read_csv(input_file)

            # Verify required columns
            required_columns = ['Unit', 'Concept', 'Lesson', 'From page', 'To page', 'Lesson text']
            missing_columns = [col for col in required_columns if col not in df.columns]
            if missing_columns:
                raise ValueError(f"Missing required columns: {missing_columns}")

            print(f"Found {len(df)} rows in input file")

            # Prepare output data
            output_rows = []

            for idx, row in df.iterrows():
                print(f"Processing row {idx + 1}/{len(df)}: {row['Unit']} - {row['Concept']} - {row['Lesson']}")

                # Extract lesson text
                lesson_text = row['Lesson text']

                # Determine if text is Arabic
                if is_arabic is not None:
                    text_is_arabic = is_arabic
                elif auto_detect_arabic:
                    text_is_arabic = self.detect_arabic_text(lesson_text)
                else:
                    text_is_arabic = False

                if text_is_arabic:
                    print("  Detected Arabic text - using Arabic punctuation rules")

                # Chunk the text
                chunks = self.chunk_text(lesson_text, chunk_size, is_arabic=text_is_arabic)
                print(f"  Created {len(chunks)} chunks")

                # Process each chunk
                for chunk_idx, chunk_text in enumerate(chunks):
                    print(f"  Generating embedding for chunk {chunk_idx + 1}/{len(chunks)}")

                    # Generate embedding
                    embedding = self.get_embedding(chunk_text)

                    # Create output row
                    output_row = {
                        'Unit': row['Unit'],
                        'Concept': row['Concept'],
                        'Lesson': row['Lesson'],
                        'From page': row['From page'],
                        'To page': row['To page'],
                        'Chunk index': chunk_idx,
                        'Chunk text': chunk_text,
                        'Is Arabic': text_is_arabic,
                        'Embedding': json.dumps(embedding)  # Store as JSON string
                    }
                    output_rows.append(output_row)

            # Create output DataFrame and save
            print(f"Saving {len(output_rows)} chunks to {output_file}")
            output_df = pd.DataFrame(output_rows)
            output_df.to_csv(output_file, index=False, quoting=csv.QUOTE_MINIMAL)

            print("Processing complete!")
            print(f"Output saved to: {output_file}")
            print(f"Total chunks created: {len(output_rows)}")

        except Exception as e:
            print(f"Error processing file: {str(e)}")
            raise


def main():
    """
    Example usage of the EducationalContentProcessor.
    """
    # Initialize processor (API key will be read from environment)
    processor = EducationalContentProcessor()

    # Define file paths
    input_file = "educational_content.csv"  # Change this to your input file path
    output_file = "educational_content_chunked_with_embeddings.csv"

    # Process the file
    try:
        # For Arabic content, set is_arabic=True
        processor.process_csv(input_file, output_file, chunk_size=500, is_arabic=False)
    except Exception as e:
        print(f"Failed to process file: {str(e)}")


if __name__ == "__main__":
    main()
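The loader script below inserts rows into a lessons table that it assumes already exists. As a minimal sketch of that assumed schema (the table and column names are taken from the loader's INSERT statement; the surrogate id column and the column types are assumptions, not part of the original code), the table could be created like this:

# Minimal sketch: create the `lessons` table assumed by the loader below.
# Column names come from its INSERT statement; the id column and the
# TEXT/INTEGER types are assumptions.
import os
import psycopg2


def create_lessons_table():
    conn = psycopg2.connect(
        dbname=os.getenv("POSTGRES_DB", "embeddings_db"),
        user=os.getenv("POSTGRES_USER", "db_admin"),
        password=os.getenv("POSTGRES_PASSWORD"),
        host=os.getenv("POSTGRES_HOST", "localhost"),
        port=os.getenv("POSTGRES_PORT", 5432),
    )
    try:
        with conn.cursor() as cur:
            cur.execute("""
                CREATE TABLE IF NOT EXISTS lessons (
                    id SERIAL PRIMARY KEY,  -- assumed surrogate key
                    grade TEXT,
                    subject TEXT,
                    unit TEXT,
                    concept TEXT,
                    lesson TEXT,
                    start_page INTEGER,
                    end_page INTEGER,
                    lesson_text TEXT
                );
            """)
        conn.commit()
    finally:
        conn.close()


if __name__ == "__main__":
    create_lessons_table()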
import os
import psycopg2
import pandas as pd
from psycopg2.extras import execute_values
from dotenv import load_dotenv

# Load database credentials from a .env file if present
load_dotenv()


def get_db_connection():
    return psycopg2.connect(
        dbname=os.getenv("POSTGRES_DB", "embeddings_db"),
        user=os.getenv("POSTGRES_USER", "db_admin"),
        password=os.getenv("POSTGRES_PASSWORD"),
        host=os.getenv("POSTGRES_HOST", "localhost"),
        port=os.getenv("POSTGRES_PORT", 5432)
    )


def insert_lessons_from_csv(file_path, conn, grade, subject):
    df = pd.read_csv(file_path)

    # Map Arabic column headers to the English names expected below
    df.rename(columns={
        "الوحدة": "Unit",
        "المفهوم": "Concept",
        "الدرس": "Lesson",
        "من صفحة": "From page",
        "إلى صفحة": "To page",
        "النص": "Lesson text"
    }, inplace=True)

    required_columns = ["Unit", "Concept", "Lesson", "From page", "To page", "Lesson text"]
    missing = [col for col in required_columns if col not in df.columns]
    if missing:
        print(f"⚠️ Missing columns in {file_path}: {', '.join(missing)}")
        return

    # Build one tuple per lesson row, matching the INSERT column order
    rows = []
    for _, row in df.iterrows():
        rows.append((
            grade,
            subject,
            row["Unit"],
            row["Concept"],
            row["Lesson"],
            row["From page"],
            row["To page"],
            row["Lesson text"]
        ))

    query = """
        INSERT INTO lessons (grade, subject, unit, concept, lesson, start_page, end_page, lesson_text)
        VALUES %s
    """
    with conn.cursor() as cur:
        execute_values(cur, query, rows)
        conn.commit()

    print(f"Inserted {len(rows)} rows from {os.path.basename(file_path)}")


def main():
    folder = input("Enter the path to the folder containing CSV files: ").strip()
    if not os.path.exists(folder):
        print("Folder not found.")
        return

    files = [f for f in os.listdir(folder) if f.endswith(".csv")]
    if not files:
        print("No CSV files found.")
        return

    print("Available files:")
    for i, f in enumerate(files, 1):
        print(f"{i}. {f}")

    selected = input("Enter the numbers of the files you want to import (e.g., 1 3 4): ").split()
    selected_files = [files[int(i) - 1] for i in selected]

    grade = input("Enter grade manually (e.g., Grade 5): ").strip()
    subject = input("Enter subject manually (default: Science): ").strip() or "Science"

    conn = get_db_connection()
    try:
        for f in selected_files:
            file_path = os.path.join(folder, f)
            insert_lessons_from_csv(file_path, conn, grade, subject)

        with conn.cursor() as cur:
            cur.execute("SELECT COUNT(*) FROM lessons;")
            total = cur.fetchone()[0]
            print(f"Total rows in lessons table: {total}")
    finally:
        conn.close()
        print("Connection closed.")


if __name__ == "__main__":
    main()
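Once the processor above has written the chunked CSV, the embeddings stored as JSON strings in its Embedding column can be parsed back and ranked against a query embedding. This is a minimal sketch under stated assumptions, not part of the scripts above: the file name, the rank_chunks helper, and the example query are illustrative, and it assumes OPENAI_API_KEY is set in the environment.

# Minimal sketch: load the chunked CSV produced by EducationalContentProcessor
# and rank chunks by cosine similarity against a query embedding.
# The file name, function name, and example query are illustrative assumptions.
import json
import numpy as np
import pandas as pd
from openai import OpenAI


def rank_chunks(query: str,
                csv_path: str = "educational_content_chunked_with_embeddings.csv",
                top_k: int = 3) -> pd.DataFrame:
    client = OpenAI()  # reads OPENAI_API_KEY from the environment
    df = pd.read_csv(csv_path)

    # Embeddings were stored as JSON strings in the 'Embedding' column
    matrix = np.array([json.loads(e) for e in df["Embedding"]])

    # Embed the query with the same model used for the chunks
    query_vec = np.array(
        client.embeddings.create(model="text-embedding-3-small", input=query).data[0].embedding
    )

    # Cosine similarity between the query and every chunk (epsilon guards zero vectors)
    sims = matrix @ query_vec / (np.linalg.norm(matrix, axis=1) * np.linalg.norm(query_vec) + 1e-10)
    top = np.argsort(sims)[::-1][:top_k]
    return df.iloc[top][["Unit", "Concept", "Lesson", "Chunk text"]].assign(similarity=sims[top])


if __name__ == "__main__":
    print(rank_chunks("photosynthesis"))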