Commit 39ae3ad9 authored by salma

Parallel Speculative Execution

parent aa857505
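
The pattern this commit introduces, sketched in isolation below: instead of classifying the query and only then deciding whether to run the vector search, both are submitted at once, and the search result is consumed if the classifier says "content question" or discarded otherwise. This is a minimal illustration, not code from this repo: classify() and vector_search() are hypothetical stand-ins for QueryHandler.classify_query_type and ContextGenerator.search_enhanced_content, and the sleep timings are made up for the demo.

import time
from concurrent.futures import ThreadPoolExecutor

def classify(message: str) -> str:
    time.sleep(1.5)  # stands in for a slow LLM call that labels the query
    return "specific_content" if "volcano" in message else "general_chat"

def vector_search(message: str) -> list:
    time.sleep(0.2)  # stands in for a cheap embedding + index lookup
    return [f"curriculum chunk related to {message!r}"]

def respond(message: str) -> str:
    with ThreadPoolExecutor(max_workers=2) as executor:
        classification = executor.submit(classify, message)  # the brain
        search = executor.submit(vector_search, message)     # the eyes, speculatively
        query_type = classification.result()  # ~1.5 s; the search is done by now
        if query_type == "specific_content":
            return f"answer grounded in {search.result()}"   # adds no latency
        return "plain chat answer"  # speculative search result is discarded

print(respond("tell me about a volcano"))  # grounded answer
print(respond("good morning!"))            # chat answer; search wasted ~0.2 s
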
@@ -2,6 +2,8 @@ import os
import sys
from typing import Dict
from fastapi import HTTPException
import concurrent.futures
from concurrent.futures import ThreadPoolExecutor
from services.agent_helpers.agent_prompts import SYSTEM_PROMPTS
import logging
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../')))
@@ -73,112 +75,149 @@ class ResponseGenerator:
        temperature: float = 0.3,
        top_k: int = 3
    ) -> str:
        """
        Enhanced response generation with Parallel Speculative Execution:
        classification and vector search run simultaneously.
        """
        if not self.openai_service.is_available():
            raise HTTPException(status_code=500, detail="Agent service not available")
        try:
            # 1. PREPARE (parallel, fast DB lookups)
            # Student info and conversation history feed every later step,
            # so fetch both concurrently rather than sequentially.
            with ThreadPoolExecutor(max_workers=2) as pre_executor:
                future_student = pre_executor.submit(self.db_service.get_student_info, student_id)
                future_history = pre_executor.submit(self.get_conversation_history, student_id)
                student_info = future_student.result()
                conversation_history = future_history.result()
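                # Future.result() re-raises exceptions thrown inside the worker,
                # so a failure in either lookup still propagates to the except
                # handlers below instead of being swallowed by a thread.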

            if not student_info:
                raise HTTPException(status_code=404, detail=f"Student with ID {student_id} not found")

            student_name = student_info.student_name.split()[0]
            study_language = student_info.study_language

            # Save the user message before generating. Strictly speaking we
            # should confirm the write; it can be made non-blocking if the DB
            # layer is trusted (a sketch follows).
            self.add_message_to_history(student_id, user_message, "user")
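            # Non-blocking alternative (sketch only; assumes a long-lived,
            # hypothetical self._io_executor and a thread-safe DB layer):
            #     write_future = self._io_executor.submit(
            #         self.add_message_to_history, student_id, user_message, "user")
            #     ...generate the response...
            #     write_future.result()  # surface write errors before replying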

            # 2. SPECULATIVE PARALLEL EXECUTION
            query_type = "specific_content"  # default fallback
            relevant_results = []            # default: no retrieved chunks

            with ThreadPoolExecutor(max_workers=2) as executor:
                # TASK A: THE BRAIN (classification).
                # Reuse the conversation_history fetched above to save a DB call.
                future_classification = executor.submit(
                    self.query_handler.classify_query_type,
                    user_message,
                    student_info,
                    student_id,
                    conversation_history
                )
logger.info(f"Query type: {query_type} for student {student_name} ({study_language.value}) with conversation context")
# Prepare system prompt
formatted_base_prompt = self.prepare_system_prompt(student_info)
# Build base messages
messages = [{"role": "system", "content": formatted_base_prompt}]
messages.extend(conversation_history)
# TASK B: THE EYES (Vector Search)
# We "Speculate" that the user MIGHT ask a content question.
# We run the search immediately. If it turns out to be "general_chat",
# we just threw away 200ms of compute, but saved 1.5s of latency for real questions.
future_search = executor.submit(
self.context_generator.search_enhanced_content,
user_message,
student_info,
subject,
top_k
)
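                # Editorial note: exiting this `with` block calls
                # executor.shutdown(wait=True), so even a discarded speculative
                # search is waited on before execution continues past the block.
                # If that idle wait matters, a long-lived executor shut down via
                # shutdown(wait=False, cancel_futures=True) (Python 3.9+) avoids it.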
if query_type == "general_chat":
chat_context = self.query_handler.handle_general_chat_query(user_message, student_info)
messages.append({"role": "system", "content": f"سياق المحادثة العامة:\n{chat_context}"})
# TASK C (Optional Check): Rule-based check for game help is fast,
# but we can do it inside the classification logic or separately.
# Here we wait for the classification to finish.
query_type = future_classification.result()
elif query_type == "overview":
overview_response = self.query_handler.handle_overview_query(student_info, subject)
messages.append({"role": "system", "content": f"المنهج الكامل من ملف JSON:\n{overview_response}"})
logger.info(f"Query classified as: {query_type}")
elif query_type == "navigation":
navigation_response = self.query_handler.handle_navigation_query(user_message, student_info, subject)
messages.append({"role": "system", "content": f"تفاصيل الوحدة/المفهوم من JSON:\n{navigation_response}"})

                # 3. SYNCHRONIZATION & DECISION
                system_context_content = ""

                if query_type == "specific_content":
                    # The brain says "this is content", and the eyes (the
                    # speculative search results) are already ready.
                    relevant_results = future_search.result()
                    if relevant_results:
                        system_context_content = self.context_generator.generate_enhanced_context(
                            relevant_results, student_info, query_type
                        )
                        logger.info(f"Using speculative search results: {len(relevant_results)} chunks")
elif query_type == "game_help":
# Handle game help
game_context, user_query = self.query_handler.handle_game_help_query(user_message)
logger.info(f"Handling game_help query. Context: {game_context}")
# Start building a single, comprehensive context string
system_context = f"سياق اللعبة التعليمية اللي هتساعد الطفل فيها:\n{game_context}"
system_context_content = f"سياق اللعبة التعليمية اللي هتساعد الطفل فيها:\n{game_context}"
# Search for and add curriculum context if it exists
relevant_results = self.context_generator.search_enhanced_content(
user_query, student_info, subject, top_k
)
# For game help, we also use the search results we started in Task B!
# (Assuming the game query might need knowledge)
relevant_results = future_search.result()
if relevant_results:
enhanced_context = self.context_generator.generate_enhanced_context(
enhanced_ctx = self.context_generator.generate_enhanced_context(
relevant_results, student_info, query_type
)
# Append the curriculum context to the same string
system_context += f"\n\nمحتوي المنهج اللي ليه علاقة بسؤال الطفل:\n{enhanced_context}"
logger.info(f"Added enhanced context with {len(relevant_results)} chunks for game help.")
system_context_content += f"\n\nمحتوي المنهج اللي ليه علاقة بسؤال الطفل:\n{enhanced_ctx}"
elif query_type == "general_chat":
# The brain says: "Just chatting."
# We IGNORE the search results from Task B.
# (Task B might still be running or finished, we just don't use the data).
chat_context = self.query_handler.handle_general_chat_query(user_message, student_info)
system_context_content = f"سياق المحادثة العامة:\n{chat_context}"
# Now, add only ONE system message with all the context
messages.append({"role": "system", "content": system_context})
elif query_type == "overview":
overview_response = self.query_handler.handle_overview_query(student_info, subject)
system_context_content = f"المنهج الكامل من ملف JSON:\n{overview_response}"
elif query_type == "navigation":
nav_response = self.query_handler.handle_navigation_query(user_message, student_info, subject)
system_context_content = f"تفاصيل الوحدة/المفهوم من JSON:\n{nav_response}"
if query_type == "ask_for_question":
mcq_data = self.agent_service.handle_ask_for_question(student_id)
elif query_type == "ask_for_question":
# Special case, returns dict immediately
return {
"type": "mcq",
"data": mcq_data
"data": self.agent_service.handle_ask_for_question(student_id)
}

            # 4. FINAL GENERATION
            formatted_base_prompt = self.prepare_system_prompt(student_info)

            # Build messages: base system prompt, history, the context decided
            # above (if any), then the user turn.
            messages = [{"role": "system", "content": formatted_base_prompt}]
            messages.extend(conversation_history)
            if system_context_content:
                messages.append({"role": "system", "content": system_context_content})
            messages.append({"role": "user", "content": user_message})

            # Call the AI model
            response = self.openai_service.client.chat.completions.create(
                model=model,
                messages=messages,
                temperature=temperature
            )
            ai_response = response.choices[0].message.content.strip()
            if not ai_response:
                raise ValueError("Empty response from AI model")

            # Save the AI response
            self.add_message_to_history(student_id, ai_response, "assistant")
            logger.info(
                f"Generated {query_type} response for {student_name} "
                f"({study_language.value}): {len(ai_response)} characters"
            )
            return ai_response
        except HTTPException:
......
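
Back-of-the-envelope latency for the numbers the comments assume (illustrative, not measured): run sequentially, a content question pays classification then retrieval, about 1.5 s + 0.2 s = 1.7 s before prompt assembly; run speculatively, the wait is max(1.5 s, 0.2 s) = 1.5 s, so retrieval is effectively free for content questions. The cost of a chat-only query is ~0.2 s of discarded compute on a worker thread, off the response path.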