integrate pdf upload

parent a558073c
@@ -61,8 +61,13 @@ services:
DB_HOST: "${DB_HOST}"
TTS_PROVIDER: "${TTS_PROVIDER}"
CUSTOM_TTS_URL: "${CUSTOM_TTS_URL}"
GEMINI_API_KEY: "${GEMINI_API_KEY}"
REDIS_HOST: "redis"
REDIS_PORT: "6379"
volumes:
- ./voice_agent/embeddings:/app/embeddings
- ./voice_agent/All_Curriculums_grouped.json:/app/All_Curriculums_grouped.json
depends_on:
- minio
- postgres
...
@@ -3,8 +3,12 @@ FROM python:3.10-slim
WORKDIR /app
# Install postgresql-client for pg_isready
RUN apt-get update && apt-get install -y --no-install-recommends \
postgresql-client \
tesseract-ocr \
tesseract-ocr-ara \
libtesseract-dev \
poppler-utils \
&& rm -rf /var/lib/apt/lists/*
# Install Python dependencies
...
import os
import shutil
from fastapi import FastAPI, UploadFile, File, Form, HTTPException, Request, BackgroundTasks
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse, Response
from fastapi.staticfiles import StaticFiles
@@ -8,15 +9,23 @@ from typing import Optional
import uvicorn
import base64
from pathlib import Path
import tempfile
import json
import pandas as pd
from curriculum_structure import convert_json_to_db_format
from process_pdf_pipline import run_full_pipeline
# Import your existing modules
from core import AppConfig
from repositories import MinIOStorageRepository
from services import (
AudioService, ChatService, HealthService, ResponseService,
ResponseManager, OpenAIService, AgentService, ConnectionPool, LanguageSegmentationService,
DataIngestionService
)
class DIContainer:
def __init__(self):
self.config = AppConfig.from_env()
@@ -36,6 +45,9 @@ class DIContainer:
print(os.getenv("DB_HOST"), os.getenv("POSTGRES_DB"), os.getenv("POSTGRES_USER"))
self.agent_service = AgentService(pool_handler=self.pool_handler)
self.data_ingestion_service = DataIngestionService(pool_handler=self.pool_handler)
# Initialize services
self.audio_service = AudioService(self.storage_repo, self.config.minio_bucket)
self.segmentation_service = LanguageSegmentationService()
@@ -51,6 +63,16 @@ class DIContainer:
self.health_service = HealthService(self.storage_repo, self.config)
def run_processing_pipeline(pdf_path: str, grade: int, subject: str) -> tuple[str, str]:
"""
Runs the full PDF processing pipeline and returns paths to the generated CSV and JSON files.
"""
temp_json_path = "temp_json.json"
temp_csv_path = "temp_embeddings.csv"
run_full_pipeline(pdf_path, grade, subject, temp_json_path, temp_csv_path, remove_lessons=True)
return temp_csv_path, temp_json_path
@asynccontextmanager
async def lifespan(app: FastAPI):
"""
@@ -99,10 +121,113 @@ def create_app() -> FastAPI:
)
# Serve static files if the directory exists
static_path = Path("static")
if static_path.exists():
app.mount("/static", StaticFiles(directory=static_path), name="static")
def process_pdf_curriculum_in_background(pdf_bytes: bytes, original_filename: str, grade: int, subject: str):
"""
Background task to process uploaded curriculum PDF.
This function runs asynchronously and won't block the API response.
"""
print(f"--- Background task started: Processing PDF '{original_filename}'. ---", flush=True)
pool_handler = None
try:
# --- Setup Paths ---
project_root = Path(__file__).parent
embeddings_dir = project_root / "embeddings"
main_json_path = project_root / "All_Curriculums_grouped.json"
embeddings_dir.mkdir(exist_ok=True)
# --- Create Dependencies ---
pool_handler = ConnectionPool(
dbname=os.getenv("POSTGRES_DB"),
user=os.getenv("POSTGRES_USER"),
password=os.getenv("POSTGRES_PASSWORD"),
host=os.getenv("DB_HOST", "postgres"),
port=int(os.getenv("DB_PORT", 5432))
)
ingestion_service = DataIngestionService(pool_handler=pool_handler)
# --- 1. Save and Run Pipeline ---
with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as temp_pdf:
temp_pdf.write(pdf_bytes)
temp_pdf_path = temp_pdf.name
print(f"--- Background task: Saved temp PDF to {temp_pdf_path} ---", flush=True)
temp_csv_path, temp_json_path = run_processing_pipeline(temp_pdf_path, grade, subject)
# --- 2. Save the generated CSV ---
csv_filename = Path(temp_csv_path).name
csv_dest_path = embeddings_dir / csv_filename
shutil.move(temp_csv_path, csv_dest_path)
print(f"--- Background task: Saved new embeddings to '{csv_dest_path}' ---", flush=True)
# --- Append the new curriculum to the existing main JSON ---
# --- 3. Read both JSON files ---
print("--- Background task: Reading generated JSON structure... ---", flush=True)
with open(temp_json_path, 'r', encoding='utf-8') as f:
new_structure_data = json.load(f)
print(f"--- Background task: New structure contains keys: {list(new_structure_data.keys())} ---", flush=True)
# Load existing main JSON or start with empty dict
try:
with open(main_json_path, 'r', encoding='utf-8') as f:
existing_structure_data = json.load(f)
print(f"--- Background task: Loaded existing structure with {len(existing_structure_data)} curricula ---", flush=True)
except FileNotFoundError:
print("--- Background task: Main JSON file not found. Creating new one. ---", flush=True)
existing_structure_data = {}
except json.JSONDecodeError:
print("--- Background task: Main JSON file corrupted. Starting fresh. ---", flush=True)
existing_structure_data = {}
# Append new curriculum keys to the existing structure
for curriculum_key, curriculum_content in new_structure_data.items():
if curriculum_key in existing_structure_data:
print(f"--- WARNING: Key '{curriculum_key}' already exists. Overwriting. ---", flush=True)
else:
print(f"--- Background task: Adding new curriculum '{curriculum_key}' to main JSON. ---", flush=True)
existing_structure_data[curriculum_key] = curriculum_content
# Write the updated data back to the file
with open(main_json_path, 'w', encoding='utf-8') as f:
json.dump(existing_structure_data, f, indent=2, ensure_ascii=False)
print(f"--- Background task: Main JSON now contains {len(existing_structure_data)} curricula ---", flush=True)
# ==========================================================
# --- 4. Ingest structure into DB ---
print("--- Background task: Ingesting new structure into DB... ---", flush=True)
db_formatted_structure = convert_json_to_db_format(new_structure_data)
ingestion_service.ingest_curriculum_structure(db_formatted_structure)
# --- 5. Ingest embeddings into DB ---
print("--- Background task: Ingesting new embeddings into DB... ---", flush=True)
embeddings_df = pd.read_csv(csv_dest_path)
ingestion_service.ingest_embeddings_from_csv(embeddings_df)
print("--- Background task: Verifying database insertions... ---", flush=True)
from services.pgvector_service import PGVectorService
pgvector_service = PGVectorService(pool_handler)
pgvector_service.verify_recent_insertions()
# --- 6. Cleanup ---
os.unlink(temp_pdf_path)
os.unlink(temp_json_path)
print("--- Background task: Cleaned up temporary files ---", flush=True)
print("--- ✅ Background task completed successfully. ---", flush=True)
except Exception as e:
import traceback
print(f"--- ❌ FATAL ERROR in background task: {e} ---", flush=True)
print(f"--- Traceback: {traceback.format_exc()} ---", flush=True)
finally:
if pool_handler:
pool_handler.close_all()
print("--- Background task: Database connection pool closed. ---", flush=True)
@app.on_event("startup") @app.on_event("startup")
async def startup_event(): async def startup_event():
...@@ -128,6 +253,23 @@ def create_app() -> FastAPI: ...@@ -128,6 +253,23 @@ def create_app() -> FastAPI:
print(f"Error serving audio recorder: {e}") print(f"Error serving audio recorder: {e}")
raise HTTPException(status_code=500, detail=f"Error serving interface: {str(e)}") raise HTTPException(status_code=500, detail=f"Error serving interface: {str(e)}")
@app.get("/curriculum-upload")
async def serve_curriculum_upload():
"""Serve the curriculum upload HTML file"""
try:
static_file = Path("static/curriculum_PDF_uploader.html")
if static_file.exists():
return FileResponse(static_file)
current_file = Path("curriculum_PDF_uploader.html")
if current_file.exists():
return FileResponse(current_file)
raise HTTPException(status_code=404, detail="Curriculum upload interface not found")
except Exception as e:
print(f"Error serving curriculum upload interface: {e}")
raise HTTPException(status_code=500, detail=f"Error serving interface: {str(e)}")
@app.post("/chat") @app.post("/chat")
async def chat_handler( async def chat_handler(
request: Request, request: Request,
...@@ -167,10 +309,32 @@ def create_app() -> FastAPI: ...@@ -167,10 +309,32 @@ def create_app() -> FastAPI:
print(f"Error getting audio response: {str(e)}") print(f"Error getting audio response: {str(e)}")
raise HTTPException(status_code=500, detail=f"Audio response error: {str(e)}") raise HTTPException(status_code=500, detail=f"Audio response error: {str(e)}")
@app.options("/chat") @app.post("/process-curriculum", status_code=202)
async def chat_options(): async def process_curriculum_webhook(
"""Handle preflight CORS requests for chat endpoint""" background_tasks: BackgroundTasks,
return Response(status_code=204, headers={"Access-Control-Allow-Origin": "*", "Access-Control-Allow-Methods": "POST, OPTIONS", "Access-Control-Allow-Headers": "*"}) grade: int = Form(...),
subject: str = Form(...),
file: UploadFile = File(...)
):
"""
Accepts a PDF and adds a background task to process it.
Returns immediately.
"""
pdf_bytes = await file.read()
# Pass the PDF bytes, original filename, grade, and subject to the background task.
background_tasks.add_task(
process_pdf_curriculum_in_background,
pdf_bytes,
file.filename,
grade,
subject
)
# Return immediately to the user
return {"status": "processing_started", "message": "The curriculum is being processed in the background."}
@app.options("/get-audio-response") @app.options("/get-audio-response")
async def audio_response_options(): async def audio_response_options():
......
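For quick manual testing of the new endpoint, a client call along these lines should work (a sketch; the base URL mirrors the bundled uploader page, and sample.pdf is a placeholder):
# Illustrative client call for POST /process-curriculum; file name and host are assumptions
import requests
with open("sample.pdf", "rb") as f:
    response = requests.post(
        "http://localhost:8000/process-curriculum",
        data={"grade": 4, "subject": "Science"},
        files={"file": ("sample.pdf", f, "application/pdf")},
    )
print(response.status_code)  # expected: 202
print(response.json())       # {"status": "processing_started", ...}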
"""
!apt-get install -y tesseract-ocr
!apt-get install -y tesseract-ocr-ara
!apt-get install -y libtesseract-dev
!apt-get install -y poppler-utils
!pip install pdf2image pytesseract pandas fuzzywuzzy python-Levenshtein tqdm
!apt-get install -y poppler-utils tesseract-ocr tesseract-ocr-ara
!pip install pdf2image pytesseract google-genai pydantic opencv-python-headless
"""
import os
import cv2
import pytesseract
import pandas as pd
from pdf2image import convert_from_path, pdfinfo_from_path
import re
from tqdm import tqdm
import json
import numpy as np
from openai import OpenAI
from typing import List, Dict, Union, Any, Optional, Tuple
from pydantic import BaseModel, Field, ValidationError
import google.generativeai as genai
from google.generativeai import types
import csv
import logging
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# You can change level=logging.DEBUG for more verbose output during debugging
# =========================
# 1. Initialization and Setup
# =========================
class Config:
GEMINI_MODEL = "gemini-2.5-flash"
OPENAI_EMBEDDING_MODEL = "text-embedding-3-small"
OPENAI_EMBEDDING_DIMENSION = 1536 # Dimension for text-embedding-3-small
CHUNK_SIZE = 500
OCR_DPI = 300
BATCH_SIZE = 50 # For OCR processing (not used in current logic but good to keep)
ARABIC_CHAR_THRESHOLD = 0.3 # Percentage of Arabic chars to consider text Arabic
TOC_DETECTION_PAGES = 10 # Number of initial pages to scan for TOC
HEADER_FOOTER_CROP_PERCENT = 0.05 # Crop 5% from top/bottom to remove headers/footers for better OCR on main content
MAX_SAMPLE_PAGES_FOR_GEMINI = 40 # Max pages to send to Gemini for structure inference
MAX_CHARS_FOR_GEMINI_SAMPLE = 20000 # Max characters for sample pages in Gemini prompt
# Initialize Gemini Client
gemini_client = None
try:
# Read Gemini API key from environment
gemini_api_key = os.getenv("GEMINI_API_KEY")
if gemini_api_key:
gemini_api_key = gemini_api_key.strip()
if not gemini_api_key:
raise ValueError("GEMINI_API_KEY not set. Please set env var GIMINI_API_KEY or GEMINI_API_KEY.")
genai.configure(api_key=gemini_api_key)
test_model = genai.GenerativeModel(Config.GEMINI_MODEL)
test_response = test_model.generate_content("test")
if test_response.text:
gemini_client = test_model
logging.info("✅ Gemini client initialized successfully.")
else:
raise ValueError("Gemini API key initialized but failed a simple test.")
except Exception as e:
logging.error(f"❌ Failed to initialize Gemini client: {e}")
# Initialize OpenAI Client
openai_client = None
try:
# Read OpenAI API key from environment
openai_api_key = os.getenv("OPENAI_API_KEY") or os.getenv("OPENAI_API")
if openai_api_key:
openai_api_key = openai_api_key.strip()
if not openai_api_key:
raise ValueError("OPENAI_API_KEY not set. Please set the OPENAI_API_KEY environment variable.")
openai_client = OpenAI(api_key=openai_api_key)
test_embedding_response = openai_client.embeddings.create(
model=Config.OPENAI_EMBEDDING_MODEL,
input="test",
dimensions=Config.OPENAI_EMBEDDING_DIMENSION,
encoding_format="float"
)
if test_embedding_response.data and test_embedding_response.data[0].embedding:
logging.info("✅ OpenAI client initialized successfully and API key is valid for embeddings.")
else:
raise ValueError("OpenAI API key initialized but failed a simple embedding test.")
except Exception as e:
logging.error(f"❌ Failed to initialize OpenAI client or API key is invalid: {e}")
if not gemini_client or not openai_client:
logging.warning("🚨 Warning: One or more AI clients failed to initialize. Some pipeline steps may not work.")
# =========================
# 2. Text Cleaning Functions
# =========================
def clean_text_for_ai(text: str) -> str:
"""Removes non-alphanumeric, extra spaces, and specific Unicode for general OCR output."""
text = str(text)
# Allow Arabic, English, numbers, and basic punctuation
text = re.sub(r"[^ \u0600-\u06FFa-zA-Z0-9\.\,\?\!\:\;\-\(\)\/%]", " ", text)
text = re.sub(r"\s+", " ", text)
return text.strip()
def deep_clean_text_arabic(text: str) -> str:
"""Further cleans Arabic text, removing long numbers, character repetitions, and short lines."""
text = str(text)
text = re.sub(r"\b\d{4,}\b", " ", text) # Remove numbers >= 4 digits
text = re.sub(r"(.)\1{2,}", r"\1\1", text) # Reduce >2 repetitions (e.g., ااالله -> الله)
lines = text.split('\n')
# Remove lines with only numbers/symbols or very short lines (less than 3 words)
lines = [line.strip() for line in lines if not re.fullmatch(r"[\d\W]+", line.strip()) and len(line.strip().split()) > 2]
return " ".join(lines).strip()
def super_clean_text_en(text: str) -> str:
"""Further cleans English text, removing non-ASCII, repeated words, long words, and duplicate sentences."""
text = str(text)
text = re.sub(r"[^a-zA-Z0-9\s\.\,\?\!\:\;\-\(\)\/%]", " ", text) # Keep only standard English chars
text = re.sub(r"\b(\w+)( \1){2,}\b", r"\1", text) # Remove repeated words (e.g., the the the -> the)
text = re.sub(r"(.)\1{2,}", r"\1\1", text) # Reduce >2 char repetitions (e.g., helllo -> hello)
text = re.sub(r"(\d)\1{2,}", r"\1\1\1", text) # Allow up to 3 digit repetitions
words = text.split()
cleaned_words = [w for w in words if len(w) < 15] # Remove very long words (often OCR errors)
text = " ".join(cleaned_words)
text = re.sub(r"\s+", " ", text) # Normalize whitespace
# Remove duplicate sentences (basic approach)
lines = text.split(". ")
seen = set()
cleaned_lines = []
for line in lines:
l = line.strip()
if l and l not in seen:
cleaned_lines.append(l)
seen.add(l)
text = ". ".join(cleaned_lines)
return text.strip()
# =========================
# 3. OCR and Language/Grade Detection
# =========================
def detect_language_from_text(text: str) -> str:
"""Detects primary language (Arabic or English) based on character presence."""
# Count Arabic characters
arabic_chars = len(re.findall(r"[\u0600-\u06FF]", text))
# Count English/Latin characters
english_chars = len(re.findall(r"[a-zA-Z]", text))
if arabic_chars > english_chars * 0.5: # If Arabic chars are significantly more
return "arabic"
elif english_chars > arabic_chars * 0.5: # If English chars are significantly more
return "english"
return "unknown"
def detect_grade_from_text(text: str) -> str:
"""Extracts grade information from text (e.g., 'Grade 4', 'الصف الرابع')."""
# Expanded regex for more patterns
grade_patterns = [
r"(الصف\s+(?:الأول|الثاني|الثالث|الرابع|الخامس|السادس|السابع|الثامن|التاسع|العاشر|الحادي عشر|الثاني عشر))", # الصف الأول، الثاني،...
r"(Grade\s+\d+)",
r"(Primary\s+\d+)",
r"(prim\s*\d+)",
r"(K\d)", # K5, K6 (Kindergarten)
r"(Pre-?\s*K)", # Pre-K
r"(Year\s+\d+)" # Year 1, Year 2
]
for pattern in grade_patterns:
grade_match = re.search(pattern, text, re.IGNORECASE)
if grade_match:
detected = grade_match.group(0).strip()
# Normalize common patterns
if "prim" in detected.lower():
return "Grade " + re.search(r'\d+', detected).group(0)
if "primary" in detected.lower():
return detected.replace("Primary", "Grade")
if "year" in detected.lower():
return detected.replace("Year", "Grade")
return detected
return "unknown"
def ocr_image_with_cropping(image_np: np.ndarray, lang_code: str) -> str:
"""Performs OCR on an image, optionally cropping headers/footers."""
h, w = image_np.shape[:2]
crop_h = int(h * Config.HEADER_FOOTER_CROP_PERCENT)
cropped_img = image_np[crop_h : h - crop_h, :] # Crop top and bottom
gray = cv2.cvtColor(cropped_img, cv2.COLOR_RGB2GRAY)
_, thresh = cv2.threshold(gray, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)
text = pytesseract.image_to_string(thresh, lang=lang_code)
return text
def process_pdf_to_text(pdf_path: str) -> Tuple[Dict[int, str], str, str, Dict[str, int]]:
"""
OPTIMIZED VERSION: Converts PDF and performs OCR page by page to save memory.
"""
if not os.path.exists(pdf_path):
logging.error(f"PDF file not found at: {pdf_path}")
raise FileNotFoundError(f"PDF file not found at: {pdf_path}")
logging.info("📄 Getting PDF info...")
try:
# Get total page count without loading the whole file
info = pdfinfo_from_path(pdf_path)
total_pages = info['Pages']
except Exception as e:
logging.critical(f"❌ Failed to get PDF info. Error: {e}")
raise
logging.info(f"📄 PDF has {total_pages} pages. Starting page-by-page OCR...")
page_texts_cleaned = {}
initial_language = "unknown"
initial_grade = "unknown"
tracked_titles_with_pages: Dict[str, int] = {}
# Iterate through the PDF one page at a time
for i in tqdm(range(1, total_pages + 1), desc="Page-by-Page OCR & Analysis"):
try:
# convert_from_path can process a single page
page_image = convert_from_path(pdf_path, dpi=Config.OCR_DPI, first_page=i, last_page=i)[0]
except Exception as e:
logging.warning(f"⚠️ Could not process page {i}. Skipping. Error: {e}")
continue
img_np = np.array(page_image)
# Use a broad language pack for initial detection passes
text = ocr_image_with_cropping(img_np, lang_code="ara+eng")
cleaned_text = clean_text_for_ai(text)
# On the first few pages, perform detection
if i <= Config.TOC_DETECTION_PAGES:
if initial_language == "unknown" and cleaned_text:
initial_language = detect_language_from_text(cleaned_text)
if initial_language != "unknown":
logging.info(f"🌐 Initial Language Detected: {initial_language} on page {i}")
if initial_grade == "unknown" and cleaned_text:
initial_grade = detect_grade_from_text(cleaned_text)
if initial_grade != "unknown":
logging.info(f"🎓 Initial Grade Detected: {initial_grade} on page {i}")
# --- Title Tracking Logic (Unchanged) ---
if initial_language == "arabic":
title_patterns = [r"(الوحدة\s+...)", r"(المفهوم\s+...)", r"(الدرس\s+...)", r"(مقدمة|...)"]
else:
title_patterns = [r"(Unit\s+\d+)", r"(Concept\s+\d+)", r"(Lesson\s+...)", r"(Introduction|...)"]
text_lines = text.split('\n')
for line in text_lines[:20]:
for pattern in title_patterns:
match = re.search(pattern, line, re.IGNORECASE)
if match:
title = match.group(0).strip()
if title not in tracked_titles_with_pages:
tracked_titles_with_pages[title] = i
logging.debug(f"Tracked title '{title}' on page {i}")
break
# Apply deep cleaning based on the detected language
final_cleaned_text = cleaned_text
lang_for_deep_clean = initial_language if initial_language != "unknown" else "english" # Default
if lang_for_deep_clean == "arabic":
final_cleaned_text = deep_clean_text_arabic(cleaned_text)
elif lang_for_deep_clean == "english":
final_cleaned_text = super_clean_text_en(cleaned_text)
if final_cleaned_text:
page_texts_cleaned[i] = final_cleaned_text
# The page_image and img_np are automatically discarded at the end of the loop, saving memory.
# Final fallback for language detection
if initial_language == "unknown":
initial_language = "english"
logging.warning("⚠️ Language could not be detected, defaulting to English.")
logging.info(f"✅ OCR and cleaning complete. Processed {len(page_texts_cleaned)} pages.")
logging.info(f"✨ Tracked titles: {tracked_titles_with_pages}")
return page_texts_cleaned, initial_language, initial_grade, tracked_titles_with_pages
# =========================
# 4. Table of Contents (TOC) Extraction
# =========================
def extract_toc_pages_from_first_n(page_texts: Dict[int, str], language: str, n_pages: int = Config.TOC_DETECTION_PAGES) -> Dict[int, str]:
"""
Identifies TOC pages within the first N pages and extracts their content.
Returns a dictionary of {page_number: toc_text}.
"""
toc_content = {}
toc_keywords = {
'arabic': ["قائمة المحتوى", "المحتويات", "فهرس", "محور", "جدول المحتويات"],
'english': ["Table of Contents", "Contents", "Index", "Overview"]
}
for page_num in range(1, min(n_pages + 1, len(page_texts) + 1)):
text = page_texts.get(page_num, "")
is_toc_page = False
if language in toc_keywords:
if any(keyword in text for keyword in toc_keywords[language]):
is_toc_page = True
if is_toc_page:
toc_content[page_num] = text
if toc_content:
logging.info(f"📚 TOC pages detected: {list(toc_content.keys())}")
else:
logging.warning(f"⚠️ No TOC pages explicitly detected within the first {n_pages} pages.")
return toc_content
# =========================
# 5. Gemini-powered Structure Extraction (Pydantic Models)
# =========================
class PageRange(BaseModel):
start_page: int = 0
end_page: int = 0
class Lesson(BaseModel):
lesson_name: str
pages: Optional[PageRange] = None
class Concept(BaseModel):
concept_name: str
pages: Optional[PageRange] = None
lessons: List[Lesson] = Field(default_factory=list)
class Unit(BaseModel):
unit_name: str
pages: Optional[PageRange] = None
concepts: List[Concept] = Field(default_factory=list)
other_sections: Dict[str, Optional[PageRange]] = Field(default_factory=dict)
class BookStructure(BaseModel):
units: List[Unit]
grade: str
language: str
def extract_structure_with_gemini(
toc_texts: Dict[int, str],
full_page_texts: Dict[int, str],
lang: str,
grade: str,
tracked_titles_with_pages: Dict[str, int],
pdf_total_pages: int
) -> Dict[str, Any]:
"""
Asks Gemini to extract the book's structure based on TOC, sample pages, and tracked titles.
Returns a dictionary conforming to the Pydantic BookStructure.
"""
if not gemini_client:
logging.error("❌ Gemini client (model) not initialized. Cannot extract structure.")
return None
toc_prompt_text = "\n\n".join([f"📄 Page {k}\n{text}" for k, text in toc_texts.items()])
if not toc_prompt_text:
toc_prompt_text = "No explicit Table of Contents found. Infer structure from content."
# Prepare tracked titles for Gemini
tracked_titles_str = "\n".join([f"- '{title}': Page {page_num}" for title, page_num in tracked_titles_with_pages.items()])
if not tracked_titles_str:
tracked_titles_str = "No specific section titles were pre-identified with page numbers."
sample_pages_list = []
current_char_count = 0
# Provide a more extensive sample of pages to Gemini for better context
for page_num in sorted(full_page_texts.keys()):
text = full_page_texts[page_num]
if current_char_count + len(text) <= Config.MAX_CHARS_FOR_GEMINI_SAMPLE and page_num <= Config.MAX_SAMPLE_PAGES_FOR_GEMINI:
sample_pages_list.append(f"📄 Page {page_num}\n{text}")
current_char_count += len(text)
else:
break
sample_pages_prompt_text = "\n\n".join(sample_pages_list)
if not sample_pages_prompt_text:
logging.warning("No sample page text available for Gemini context.")
# --- Step 1: Initial Structure Extraction ---
# We ask Gemini to generate an initial structure.
initial_prompt = f"""
You are an extremely meticulous expert at analyzing {lang} school textbooks to extract their hierarchical structure.
Your task is to identify Units, Concepts, Lessons, and other sections, along with their exact start and end page numbers.
**Crucially, assign page ranges for UNITS and CONCEPTS by determining the start of the first lesson/concept and the end of the last lesson/concept within them, if no explicit range is given for the parent.**
Pay extremely close attention to titles, headings, and page number references in the provided text.
Infer page ranges for all elements as accurately as possible.
**Definitions and Page Range Rules:**
- **Unit:** The highest level (e.g., "Unit 1", "الوحدة الأولى"). `start_page` is where the unit title first appears. `end_page` is the page *before* the next unit starts, or the last page of the book if it's the final unit ({pdf_total_pages}).
- **Concept:** A sub-division within a Unit (e.g., "Concept 1", "المفهوم الأول"). If it has lessons, its `start_page` is the `start_page` of its first lesson, and its `end_page` is the `end_page` of its last lesson. If no lessons, its `start_page` is where its title appears, and `end_page` is the page *before* the next section.
- **Lesson:** A specific topic within a Concept (e.g., "Lesson 1.1", "الدرس 1.1"). `start_page` is where its title appears. `end_page` is the page *before* the next lesson or section.
- **Other Sections:** Introductory/concluding elements within a Unit (e.g., "Get Started", "مقدمة", "Unit Project", "مشروع الوحدة"). `start_page` where its title appears, `end_page` is page *before* next section.
Output must be ONLY a valid JSON object (no markdown, no extra text) conforming to this exact Pydantic structure definition:
{json.dumps(BookStructure.model_json_schema(), indent=2)}
--- Input Context ---
- **Total Pages in PDF:** {pdf_total_pages}
- **Grade:** {grade}
- **Language:** {lang}
- **Pre-identified Section Titles with Pages (strong hints, but verify):**
{tracked_titles_str}
--- Table of Contents (TOC) for primary clues ---
{toc_prompt_text}
--- Sample Pages (for content context and page number validation) ---
{sample_pages_prompt_text}
"""
logging.info("🧠 Asking Gemini for initial structure extraction...")
try:
resp_initial = gemini_client.generate_content(
contents=initial_prompt,
generation_config=types.GenerationConfig(response_mime_type="application/json")
)
initial_json_string = resp_initial.text.strip()
initial_data = json.loads(initial_json_string)
# Add internal fields
initial_data['_page_count'] = pdf_total_pages
initial_data['_source_file'] = "N/A" # Placeholder
book_structure = BookStructure(**initial_data)
logging.info("✅ Gemini initial structure extraction successful.")
# --- Step 2: Refinement/Correction Prompt ---
# Now, we give Gemini the initial structure and ask it to refine it
# using the TOC and page tracking as explicit correction tools.
refinement_prompt = f"""
You are an expert at validating and correcting hierarchical book structures.
I have an initial structure extracted from a {lang} textbook (Grade: {grade}), but it needs careful review and correction, especially for page ranges and the completeness of sections.
**Instructions for Refinement:**
1. **Strictly adhere to the Pydantic schema.** Ensure all fields are present and types are correct.
2. **Verify all page ranges.** Ensure `start_page` is always less than or equal to `end_page`. If `start_page` is 0, attempt to find a valid page. If `end_page` is missing or invalid, infer it as the page *before* the next section, or the total pages of the book ({pdf_total_pages}) if it's the last section.
3. **Cross-reference with TOC and Tracked Titles:**
* If the `start_page` for a Unit, Concept, or Lesson in the initial structure conflicts with a `start_page` explicitly mentioned in the provided TOC or `Pre-identified Section Titles`, **prioritize the TOC/Pre-identified value**.
* Use the TOC content to identify any *missing* Units, Concepts, or Lessons that should be included. If found, add them with inferred page ranges.
4. **Sequential Page Range Logic:**
* `end_page` of a section should ideally be `start_page - 1` of the *next* logical section (at the same or higher hierarchical level).
* For parent elements (Units, Concepts), if their `pages` are not explicitly defined, ensure they span from the `start_page` of their first child to the `end_page` of their last child.
5. **Completeness:** Ensure all significant Units, Concepts, Lessons, and 'other_sections' that logically exist in the book are present in the final structure. Use the TOC and tracked titles as primary guides for completeness.
6. **Do NOT include any markdown or extra text outside the JSON.**
--- Input for Refinement ---
- **Total Pages in PDF:** {pdf_total_pages}
- **Grade:** {grade}
- **Language:** {lang}
- **Initial Structure (to be refined):**
{json.dumps(book_structure.model_dump(by_alias=True), indent=2, ensure_ascii=False)}
- **Table of Contents (Primary Reference):**
{toc_prompt_text}
- **Pre-identified Section Titles with Pages (Strong Hints):**
{tracked_titles_str}
- **Sample Pages (for contextual validation):**
{sample_pages_prompt_text}
"""
logging.info("🧠 Asking Gemini for structure refinement...")
resp_refined = gemini_client.generate_content(
contents=refinement_prompt,
generation_config=types.GenerationConfig(response_mime_type="application/json")
)
refined_json_string = resp_refined.text.strip()
refined_data = json.loads(refined_json_string)
book_structure_refined = BookStructure(**refined_data)
logging.info("✅ Gemini refined structure extraction successful.")
return book_structure_refined.model_dump(by_alias=True)
except json.JSONDecodeError as e:
logging.error(f"❌ Gemini structure extraction/refinement failed due to JSON decoding error: {e}")
if 'resp_initial' in locals() and hasattr(resp_initial, 'text'):
logging.error(f"Gemini initial raw response (non-JSON): {resp_initial.text}")
if 'resp_refined' in locals() and hasattr(resp_refined, 'text'):
logging.error(f"Gemini refined raw response (non-JSON): {resp_refined.text}")
return None
except Exception as e:
logging.error(f"❌ Gemini structure extraction/refinement failed: {e}")
if 'resp_initial' in locals() and hasattr(resp_initial, 'text'):
logging.error(f"Gemini initial raw response: {resp_initial.text}")
if 'resp_refined' in locals() and hasattr(resp_refined, 'text'):
logging.error(f"Gemini refined raw response: {e.response.text}") # type: ignore
return None
# =========================
# 6. Transform Structure to Desired JSON Format
# =========================
def transform_to_desired_json(structured_data: Dict[str, Any]) -> Dict[str, Any]:
"""
Transforms the structured data from BookStructure Pydantic model
into the specific JSON format requested by the user.
"""
if not structured_data:
logging.warning("No structured data provided for transformation.")
return {}
output_json = {}
grade_key = structured_data.get("grade", "unknown") + " " + structured_data.get("language", "curriculum")
curriculum_data = {}
for unit_data in structured_data.get("units", []):
unit_name = unit_data.get("unit_name", "Unknown Unit")
unit_output = {}
# Handle 'other_sections' first
for section_name, pages in unit_data.get("other_sections", {}).items():
if pages:
unit_output[section_name] = [pages.get("start_page", 0), pages.get("end_page", 0)]
# Handle Concepts
concepts_output = {}
for concept_data in unit_data.get("concepts", []):
concept_name = concept_data.get("concept_name", "Unknown Concept")
concept_pages = concept_data.get("pages", {})
if concept_data.get("lessons"):
lessons_output = {}
for lesson_data in concept_data["lessons"]:
lesson_name = lesson_data.get("lesson_name", "Unknown Lesson")
pages = lesson_data.get("pages", {})
if pages:
lessons_output[lesson_name] = [pages.get("start_page", 0), pages.get("end_page", 0)]
concepts_output[concept_name] = lessons_output
else:
# If a concept has no lessons, it might have its own page range
if concept_pages:
concepts_output[concept_name] = [concept_pages.get("start_page", 0), concept_pages.get("end_page", 0)]
else:
concepts_output[concept_name] = {}
if concepts_output:
concepts_key = "Concepts" if structured_data.get("language") == "english" else "المفاهيم"
unit_output[concepts_key] = concepts_output
# Final order adjustment for Arabic: "مقدمة" (introduction) usually comes before "المفاهيم"
# and "مشروع الوحدة" (unit project) or "المشروع بيني التخصصات" (interdisciplinary project) comes after.
ordered_unit_output = {}
# Add 'مقدمة' or 'Introduction' first if present
intro_key_en = "Introduction"
intro_key_ar = "مقدمة"
if intro_key_en in unit_output:
ordered_unit_output[intro_key_en] = unit_output.pop(intro_key_en)
elif intro_key_ar in unit_output:
ordered_unit_output[intro_key_ar] = unit_output.pop(intro_key_ar)
elif "Get Started" in unit_output: # Also common for intro
ordered_unit_output["Get Started"] = unit_output.pop("Get Started")
# Then add 'المفاهيم' or 'Concepts'
concepts_key = "Concepts" if structured_data.get("language") == "english" else "المفاهيم"
if concepts_key in unit_output:
ordered_unit_output[concepts_key] = unit_output.pop(concepts_key)
# Add remaining 'other_sections' in a potentially sorted manner (e.g., projects last)
remaining_keys = [k for k in unit_output.keys()]
project_like_keys = []
other_non_project_keys = []
for key in remaining_keys:
if "مشروع" in key.lower() or "project" in key.lower() or "assessment" in key.lower() or "تقييم" in key.lower() or "review" in key.lower():
project_like_keys.append(key)
else:
other_non_project_keys.append(key)
# Sort non-project keys alphabetically
for key in sorted(other_non_project_keys):
ordered_unit_output[key] = unit_output[key]
# Sort project keys alphabetically
for key in sorted(project_like_keys):
ordered_unit_output[key] = unit_output[key]
curriculum_data[unit_name] = ordered_unit_output
output_json[grade_key] = curriculum_data
return output_json
def remove_lessons_from_json_structure(structured_data: Dict[str, Any]) -> Dict[str, Any]:
if not structured_data:
logging.warning("No structured data provided for lesson removal.")
return {}
cleaned = {}
for grade_key, units in structured_data.items():
cleaned[grade_key] = {}
for unit_name, unit_content in units.items():
new_unit = {}
for section, content in unit_content.items():
if ("المفاهيم" in section or "Concepts" in section) and isinstance(content, dict):
new_concepts = {}
for concept_name, concept_details in content.items():
# If concept_details is a list (meaning it has its own page range directly)
if isinstance(concept_details, list):
new_concepts[concept_name] = concept_details
else:
new_concepts[concept_name] = {} # Remove lesson details (or set to the concept's own page range if available)
new_unit[section] = new_concepts
else:
new_unit[section] = content
cleaned[grade_key][unit_name] = new_unit
logging.info("✅ Lessons removed from the transformed JSON structure.")
return cleaned
# =========================
# 7. Embedding Generation (from your previous script)
# =========================
class EmbeddingProcessor:
def __init__(self, client: OpenAI = None):
if client is None:
raise ValueError("OpenAI client must be initialized and passed to EmbeddingProcessor.")
self.client = client
self.embedding_model = Config.OPENAI_EMBEDDING_MODEL
def chunk_text(self, text: str, is_arabic: bool = False) -> List[str]:
"""Split text into chunks of approximately Config.CHUNK_SIZE words, preserving sentence boundaries."""
if not text or pd.isna(text):
return [""]
text = str(text).strip()
if not text:
return [""]
sentence_pattern = r'(?<=[.!?؟،؛:؛.«»“”‘’\'`"\-])\s+' if is_arabic else r'(?<=[.!?])\s+'
sentences = re.split(sentence_pattern, text)
sentences = [s.strip() for s in sentences if s.strip()]
chunks = []
current_chunk = []
current_word_count = 0
for sentence in sentences:
sentence_words = len(sentence.split())
if current_word_count + sentence_words > Config.CHUNK_SIZE and current_chunk:
chunks.append(' '.join(current_chunk))
current_chunk = [sentence]
current_word_count = sentence_words
else:
current_chunk.append(sentence)
current_word_count += sentence_words
if current_chunk:
chunks.append(' '.join(current_chunk))
return chunks if chunks else [""]
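# Illustrative behaviour (hypothetical numbers): with CHUNK_SIZE = 500, a 1,200-word lesson text
# is split at sentence boundaries into three chunks, the last one shorter than the others.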
def get_embedding(self, text: str) -> List[float]:
"""Generate embedding for the given text using OpenAI."""
if not self.client:
logging.error("❌ OpenAI client not initialized. Cannot generate embeddings.")
return [0.0] * Config.OPENAI_EMBEDDING_DIMENSION
try:
text = str(text).strip()
if not text:
text = "empty"
logging.debug(f"Sending text for embedding: '{text[:100]}...'")
response = self.client.embeddings.create(
model=self.embedding_model,
input=text,
dimensions=Config.OPENAI_EMBEDDING_DIMENSION,
encoding_format="float"
)
if response.data and response.data[0].embedding:
embedding = response.data[0].embedding
if all(val == 0.0 for val in embedding):
logging.warning(f"⚠️ Received all-zero embedding for text: '{text[:100]}...'")
return embedding
else:
logging.error(f"❌ OpenAI API returned no embedding data for text: '{text[:100]}...'")
return [0.0] * Config.OPENAI_EMBEDDING_DIMENSION
except Exception as e:
logging.error(f"❌ Error generating embedding for text: '{text[:100]}...' - {str(e)}", exc_info=True)
return [0.0] * Config.OPENAI_EMBEDDING_DIMENSION
def detect_arabic_text(self, text: str) -> bool:
"""Simple detection of Arabic text based on character ranges."""
if not text or pd.isna(text):
return False
text = str(text)
arabic_chars = 0
total_chars = 0
for char in text:
if char.strip():
total_chars += 1
if ('\u0600' <= char <= '\u06FF') or ('\u0750' <= char <= '\u077F') or \
('\u08A0' <= char <= '\u08FF') or ('\uFB50' <= char <= '\uFDFF') or \
('\uFE70' <= char <= '\uFEFF'):
arabic_chars += 1
return total_chars > 0 and (arabic_chars / total_chars) > Config.ARABIC_CHAR_THRESHOLD
def process_structured_data_for_embeddings(
self,
structured_data: Dict[str, Any],
page_texts: Dict[int, str],
lang: str,
grade: int,
subject: str,
output_csv_path: str
):
"""
Takes Gemini-extracted data and generates embeddings.
This version has a more robust repair logic for missing page numbers.
"""
if not structured_data or not structured_data.get("units"):
logging.warning("❌ No structured units found for embedding.")
return
logging.info(f"Generating robust embeddings for grade {grade}, {lang} content...")
output_rows = []
is_arabic = (lang == "arabic")
total_pages = max(page_texts.keys()) if page_texts else 0
last_known_page = 0
for unit_data in tqdm(structured_data.get("units", []), desc="Processing Units for Embeddings"):
unit_name = unit_data.get("unit_name", "Unknown Unit")
for concept_data in unit_data.get("concepts", []):
concept_name = concept_data.get("concept_name", "Unknown Concept")
for lesson_data in concept_data.get("lessons", []):
lesson_name = lesson_data.get("lesson_name", "Unknown Lesson")
pages = lesson_data.get("pages")
start_page, end_page = 0, 0
if pages and pages.get('start_page', 0) > 0 and pages.get('end_page', 0) > 0:
# This is the "happy path": Gemini gave us valid pages
start_page, end_page = pages['start_page'], pages['end_page']
else:
# --- THIS IS THE NEW, SMARTER REPAIR LOGIC ---
# Suggest a start page based on the last known page
suggested_start = last_known_page + 1
# If our suggestion is already past the end of the book, we can't continue.
if suggested_start > total_pages:
logging.warning(f" -> Skipping Lesson '{lesson_name}': Suggested start ({suggested_start}) is beyond total pages ({total_pages}).")
continue # Move to the next lesson
start_page = suggested_start
# Ensure the end page is at least the start page, and not past the end of the book.
end_page = min(start_page + 4, total_pages)
logging.warning(f" -> Repairing Lesson '{lesson_name}': Applying default pages [{start_page}-{end_page}]")
# Final safety check, although the logic above should prevent this.
if start_page > end_page:
logging.error(f" -> CRITICAL SKIP for Lesson '{lesson_name}': Invalid final page range [{start_page}-{end_page}]")
continue
# Update the tracker for the next iteration with a valid page number
last_known_page = end_page
lesson_full_text = " ".join([page_texts.get(p, "") for p in range(start_page, end_page + 1)])
for chunk_idx, chunk_text in enumerate(self.chunk_text(lesson_full_text, is_arabic)):
if not chunk_text: continue
output_rows.append({
'Grade': grade, 'Subject': subject, 'Unit': unit_name, 'Concept': concept_name,
'Lesson': lesson_name, 'From page': start_page, 'To page': end_page,
'Chunk index': chunk_idx, 'Chunk text': chunk_text, 'Is Arabic': is_arabic,
'Embedding': json.dumps(self.get_embedding(chunk_text))
})
df = pd.DataFrame(output_rows)
df.to_csv(output_csv_path, index=False, quoting=csv.QUOTE_MINIMAL, encoding="utf-8-sig")
logging.info(f"✅ Embeddings saved to: {output_csv_path} ({len(output_rows)} chunks generated)")
# process_pdf_pipline.py
def repair_and_enrich_structure(gemini_output: Dict, lang: str, grade: str, total_pages: int) -> Dict[str, Any]:
logging.warning("🔧 Sanitizing and repairing Gemini's output...")
gemini_output.setdefault('units', [])
gemini_output['language'] = lang
gemini_output['grade'] = grade
last_known_page = 0
for unit in gemini_output.get('units', []):
unit.setdefault('concepts', [])
unit.setdefault('other_sections', {})
# Repair nulls inside page objects
for item_with_pages in [unit] + unit.get('concepts', []) + [lesson for concept in unit.get('concepts', []) for lesson in concept.get('lessons', [])]:
if item_with_pages.get('pages') and isinstance(item_with_pages['pages'], dict):
if item_with_pages['pages'].get('start_page') is None: item_with_pages['pages']['start_page'] = 0
if item_with_pages['pages'].get('end_page') is None: item_with_pages['pages']['end_page'] = 0
# Repair completely missing page objects
for concept in unit.get('concepts', []):
concept.setdefault('lessons', [])
for lesson in concept.get('lessons', []):
if not lesson.get('pages'):
start_page = max(last_known_page + 1, 1)
end_page = min(start_page + 4, total_pages)
lesson['pages'] = {"start_page": start_page, "end_page": end_page}
logging.info(f" -> Repaired Lesson '{lesson.get('lesson_name')}': Set default pages [{start_page}-{end_page}]")
last_known_page = lesson.get('pages', {}).get('end_page', last_known_page)
logging.info("🔧 Sanitization complete.")
return gemini_output
# =========================
# 8. Main Pipeline Function
# =========================
def run_full_pipeline(pdf_path: str, grade: int, subject: str, output_json_path: str, output_embeddings_csv_path: str, remove_lessons: bool = False):
logging.info(f"\n--- Starting Pipeline for {pdf_path} (Grade: {grade}, Subject: {subject}) ---")
gemini_raw_output = {}
try:
page_texts, lang,_, tracked_titles = process_pdf_to_text(pdf_path)
if not page_texts: return
pdf_total_pages = max(page_texts.keys())
toc_contents = extract_toc_pages_from_first_n(page_texts, lang)
gemini_raw_output = extract_structure_with_gemini(
toc_contents, page_texts, lang, str(grade), tracked_titles, pdf_total_pages
)
if not gemini_raw_output:
logging.critical("❌ CRITICAL: Gemini returned no data. Aborting.")
return
sanitized_data = repair_and_enrich_structure(gemini_raw_output, lang, str(grade), pdf_total_pages)
gemini_structured_data = None
try:
gemini_structured_data = BookStructure(**sanitized_data).model_dump()
logging.info("✅ Gemini output successfully sanitized and validated.")
except ValidationError as e:
logging.critical(f"❌ CRITICAL: Could not validate structure even after repair. Error: {e}. Aborting.")
return
final_json_structure = transform_to_desired_json(gemini_structured_data)
if remove_lessons:
final_json_structure = remove_lessons_from_json_structure(final_json_structure)
with open(output_json_path, 'w', encoding='utf-8') as f:
json.dump(final_json_structure, f, ensure_ascii=False, indent=2)
logging.info(f"✅ Final JSON saved to: {output_json_path}")
if openai_client:
embedding_processor = EmbeddingProcessor(client=openai_client)
embedding_processor.process_structured_data_for_embeddings(
gemini_structured_data,
page_texts,
lang,
grade,
subject,
output_embeddings_csv_path
)
except Exception as e:
logging.critical(f"Pipeline error: {e}", exc_info=True)
logging.info(f"\n--- Pipeline finished for {pdf_path} ---")
@@ -13,3 +13,13 @@ python-dotenv
httpx
langdetect
redis
pdf2image
pytesseract
fuzzywuzzy
python-Levenshtein
tqdm
google-generativeai
pydantic
opencv-python-headless
numpy
Pillow
\ No newline at end of file
@@ -10,3 +10,4 @@ from .chat_database_service import ChatDatabaseService
from .connection_pool import ConnectionPool
from .pedagogy_service import PedagogyService
from .segmentation_service import LanguageSegmentationService
from .data_ingestion_service import DataIngestionService
\ No newline at end of file
import psycopg2
import psycopg2.extras
import pandas as pd
import json
from pgvector.psycopg2 import register_vector
from typing import Dict
class DataIngestionService:
"""A service dedicated to inserting new curriculum data into the database."""
def __init__(self, pool_handler):
self.pool_handler = pool_handler
def ingest_curriculum_structure(self, curriculum_json_data: Dict):
"""
Takes parsed JSON data for curriculum structure and inserts it into the DB.
This logic is adapted from your curriculum_structure.py script.
"""
print("Inserting curriculum structure data...")
# Use the connection pool for thread safety
with self.pool_handler.get_connection() as conn:
with conn.cursor() as cur:
for (grade, is_arabic, subject), curriculum in curriculum_json_data.items():
try:
cur.execute(
"""
INSERT INTO curriculum_structure (grade, is_arabic, subject, curriculum_data)
VALUES (%s, %s, %s, %s)
ON CONFLICT (grade, is_arabic, subject)
DO UPDATE SET curriculum_data = EXCLUDED.curriculum_data;
""",
(grade, is_arabic, subject, json.dumps(curriculum))
)
print(f"✅ Ingested structure for Grade {grade} ({'Arabic' if is_arabic else 'English'})")
except Exception as e:
print(f"❌ Error ingesting structure for Grade {grade}: {e}")
conn.rollback() # Rollback on error for this item
conn.commit()
print("Curriculum structure ingestion complete.")
def ingest_embeddings_from_csv(self, df: pd.DataFrame):
"""
Takes a pandas DataFrame of embeddings and inserts it into the DB.
This logic is adapted from your insert_csv_embeddings.py script.
"""
print("Inserting embeddings from CSV data...")
insert_query = """
INSERT INTO educational_chunks
(grade, subject, unit, concept, lesson, from_page, to_page, chunk_index, chunk_text, is_arabic, embedding)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
"""
records_to_insert = []
for _, row in df.iterrows():
try:
# The pipeline provides the embedding either as a JSON string or as a list
embedding = json.loads(row["Embedding"]) if isinstance(row["Embedding"], str) else row["Embedding"]
records_to_insert.append((
int(row["Grade"]), row["Subject"], row.get("Unit"), row.get("Concept"),
row.get("Lesson"), int(row["From page"]), int(row["To page"]),
int(row["Chunk index"]), row["Chunk text"], bool(row["Is Arabic"]),
embedding
))
except Exception as e:
print(f"Skipping row due to malformed data: {e}")
if not records_to_insert:
print("No valid records to insert.")
return
with self.pool_handler.get_connection() as conn:
with conn.cursor() as cur:
# Use execute_batch for efficient insertion
psycopg2.extras.execute_batch(cur, insert_query, records_to_insert)
conn.commit()
print(f"✅ Ingested {len(records_to_insert)} embedding chunks successfully.")
\ No newline at end of file
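Outside the background task, the ingestion service can be wired up directly, mirroring the setup in the app's background worker (a sketch; the CSV path and env-based credentials are assumptions):
# Illustrative standalone use of DataIngestionService; CSV path and credentials are placeholders
import os
import pandas as pd
from services import ConnectionPool, DataIngestionService
pool = ConnectionPool(
    dbname=os.getenv("POSTGRES_DB"),
    user=os.getenv("POSTGRES_USER"),
    password=os.getenv("POSTGRES_PASSWORD"),
    host=os.getenv("DB_HOST", "postgres"),
    port=int(os.getenv("DB_PORT", 5432)),
)
service = DataIngestionService(pool_handler=pool)
service.ingest_embeddings_from_csv(pd.read_csv("embeddings/temp_embeddings.csv"))
pool.close_all()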
@@ -484,3 +484,43 @@ class PGVectorService:
ORDER BY grade, is_arabic, subject;
""")
return cur.fetchall()
def verify_recent_insertions(self, limit: int = 5):
"""
Fetches and prints the most recently added educational chunks
to verify a successful ingestion.
"""
print("\n" + "="*50)
print("🔍 Verifying recent embeddings in the database...")
print("="*50)
try:
with self.pool_handler.get_connection() as conn:
with conn.cursor(cursor_factory=RealDictCursor) as cur:
# Fetches the 5 rows with the highest 'id' (most recent)
cur.execute(
"""
SELECT id, grade, subject, unit, concept, chunk_text, is_arabic
FROM educational_chunks
ORDER BY id DESC
LIMIT %s;
""",
(limit,)
)
results = cur.fetchall()
if not results:
print("❌ No data found in the 'educational_chunks' table.")
return
print(f"✅ Found {len(results)} recent records. Here they are:\n")
for row in results:
print(f" - ID: {row['id']}, Grade: {row['grade']}, Arabic: {row['is_arabic']}")
print(f" Unit: {row['unit']}")
print(f" Concept: {row['concept']}")
print(f" Text: '{row['chunk_text'][:80]}...'\n")
print("="*50)
except Exception as e:
print(f"❌ Database verification failed: {e}")
\ No newline at end of file
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Curriculum PDF Uploader</title>
<style>
body {
font-family: -apple-system, BlinkMacSystemFont, "Segoe UI", Roboto, sans-serif;
max-width: 700px;
margin: 40px auto;
padding: 20px;
background-color: #f9f9f9;
color: #333;
line-height: 1.6;
}
.container {
background: white;
padding: 30px;
border-radius: 8px;
box-shadow: 0 4px 15px rgba(0,0,0,0.1);
}
h1 {
text-align: center;
color: #2c3e50;
}
input[type="file"] {
display: block;
margin-bottom: 20px;
border: 2px dashed #ccc;
padding: 20px;
border-radius: 5px;
width: 95%;
text-align: center;
cursor: pointer;
}
input[type="file"]::file-selector-button {
padding: 10px 15px;
border-radius: 5px;
border: none;
background-color: #3498db;
color: white;
cursor: pointer;
transition: background-color 0.2s;
}
input[type="file"]::file-selector-button:hover {
background-color: #2980b9;
}
button {
display: block;
width: 100%;
padding: 12px;
font-size: 16px;
font-weight: bold;
background: #27ae60;
color: white;
border: none;
border-radius: 5px;
cursor: pointer;
transition: background 0.2s;
}
button:hover {
background: #229954;
}
button:disabled {
background: #95a5a6;
cursor: not-allowed;
}
.status {
margin-top: 20px;
padding: 15px;
border-radius: 5px;
font-weight: bold;
display: none; /* Hidden by default */
}
.status.success { background-color: #d4edda; color: #155724; border: 1px solid #c3e6cb; }
.status.error { background-color: #f8d7da; color: #721c24; border: 1px solid #f5c6cb; }
.status.processing { background-color: #e7f3ff; color: #004085; border: 1px solid #b3d9ff; }
pre {
background-color: #ecf0f1;
padding: 15px;
border-radius: 5px;
white-space: pre-wrap;
word-wrap: break-word;
}
</style>
</head>
<body>
<div class="container">
<h1>Curriculum PDF Uploader</h1>
<div class="form-group" style="margin-bottom: 20px;">
<label for="gradeInput" style="display: block; margin-bottom: 5px; font-weight: bold;">Grade:</label>
<input type="number" id="gradeInput" value="4" style="width: 98%; padding: 10px; border: 1px solid #ccc; border-radius: 5px;">
</div>
<div class="form-group" style="margin-bottom: 20px;">
<label for="subjectInput" style="display: block; margin-bottom: 5px; font-weight: bold;">Subject:</label>
<input type="text" id="subjectInput" value="Science" style="width: 98%; padding: 10px; border: 1px solid #ccc; border-radius: 5px;">
</div>
<input type="file" id="pdfFile" accept=".pdf">
<button id="uploadButton">Upload and Process Curriculum</button>
<div id="status"></div>
<pre id="response" style="display:none;"></pre>
</div>
<script>
const API_URL = 'http://localhost:8000/process-curriculum';
const pdfFileInput = document.getElementById('pdfFile');
const uploadButton = document.getElementById('uploadButton');
const statusDiv = document.getElementById('status');
const responsePre = document.getElementById('response');
const gradeInput = document.getElementById('gradeInput');
const subjectInput = document.getElementById('subjectInput'); // <-- Get the new subject field
uploadButton.addEventListener('click', async () => {
const selectedFile = pdfFileInput.files[0];
const grade = gradeInput.value;
const subject = subjectInput.value; // <-- Get the subject value
// --- Update validation ---
if (!selectedFile) { showStatus('Please select a PDF file first.', 'error'); return; }
if (!grade) { showStatus('Please enter a grade.', 'error'); return; }
if (!subject) { showStatus('Please enter a subject.', 'error'); return; }
const formData = new FormData();
formData.append('file', selectedFile);
formData.append('grade', grade);
formData.append('subject', subject);
// 3. Update UI to show processing state
showStatus('Uploading and starting background processing...', 'processing');
uploadButton.disabled = true;
responsePre.style.display = 'none';
try {
// 4. Send the file AND grade to the API
const response = await fetch(API_URL, {
method: 'POST',
body: formData,
});
const responseData = await response.json();
// 5. Handle the server's response
if (!response.ok) {
throw new Error(responseData.detail || `Server error: ${response.statusText}`);
}
showStatus('Success! The server has started processing your file in the background.', 'success');
responsePre.textContent = JSON.stringify(responseData, null, 2);
responsePre.style.display = 'block';
} catch (error) {
showStatus(`An error occurred: ${error.message}`, 'error');
} finally {
// 6. Re-enable the button
uploadButton.disabled = false;
}
});
// Helper function to show status messages
function showStatus(message, type) {
statusDiv.textContent = message;
statusDiv.className = `status ${type}`;
statusDiv.style.display = 'block';
}
</script>
</body>
</html>
\ No newline at end of file