finalize mcq and add test cases

parent 6c10aac1
......@@ -405,6 +405,44 @@ def create_app() -> FastAPI:
except Exception as e:
logger.error(f"Error in get_mcqs_handler: {e}")
raise HTTPException(status_code=500, detail=str(e))
@app.post("/quiz/dynamic")
async def get_dynamic_quiz_handler(
request: Request,
grade: int = Form(...),
subject: str = Form(...),
unit: str = Form(...),
concept: str = Form(...),
is_arabic: bool = Form(...),
count: int = Form(5)
):
"""
Generates a dynamic quiz for a topic.
This endpoint ensures freshness by generating a few new questions
and then randomly selects the total requested 'count' from the
entire pool of available questions (new and old).
"""
container = request.app.state.container
try:
quiz_questions = container.agent_service.get_dynamic_quiz(
grade=grade,
subject=subject,
unit=unit,
concept=concept,
is_arabic=is_arabic,
count=count
)
return {
"status": "success",
"message": f"Successfully generated a dynamic quiz with {len(quiz_questions)} questions.",
"quiz": quiz_questions
}
except HTTPException as e:
raise e # Re-raise FastAPI specific exceptions
except Exception as e:
logger.error(f"Error in get_dynamic_quiz_handler: {e}")
raise HTTPException(status_code=500, detail=str(e))
@app.options("/get-audio-response")
async def audio_response_options():
......
......@@ -121,6 +121,8 @@ class QueryHandler:
2. "overview" - أسئلة عن نظرة عامة على المنهج أو المحتوى الكامل
3. "navigation" - أسئلة عن وحدة أو مفهوم معين
4. "specific_content" - أسئلة محددة عن موضوع علمي معين
5. "ask_for_question" - إذا كان الطالب يطلب أن تسأله سؤالاً أو يطلب اختبارًا (مثل "اسألني سؤال", "اختبرني", "quiz me", "ask me a question", "اسالني سؤال تاني" , "عايز سؤال).
{conversation_context}
......@@ -147,7 +149,7 @@ class QueryHandler:
classification: str = response.choices[0].message.content.strip().lower().strip('"').strip("'")
valid_classes = {
"general_chat", "overview", "navigation", "specific_content"
"general_chat", "overview", "navigation", "specific_content", "ask_for_question"
}
if classification in valid_classes:
......
......@@ -11,13 +11,14 @@ logger = logging.getLogger(__name__)
class ResponseGenerator:
"""Handles AI response generation and conversation management"""
def __init__(self, openai_service, db_service, pedagogy_service, query_handler, context_generator):
def __init__(self, openai_service, db_service, pedagogy_service, query_handler, context_generator, agent_service):
self.openai_service = openai_service
self.db_service = db_service
self.pedagogy_service = pedagogy_service
self.query_handler = query_handler
self.context_generator = context_generator
self.agent_service = agent_service
def get_conversation_history(self, student_id: str) -> list[Dict[str, str]]:
"""Get conversation history from database"""
......@@ -150,6 +151,13 @@ class ResponseGenerator:
# Now, add only ONE system message with all the context
messages.append({"role": "system", "content": system_context})
if query_type == "ask_for_question":
mcq_data = self.agent_service.handle_ask_for_question(student_id)
return {
"type": "mcq",
"data": mcq_data
}
# Finally add user message
messages.append({"role": "user", "content": user_message})
......
# services/chat_service.py
from fastapi import UploadFile, HTTPException
from typing import Optional
from typing import Optional, Dict
import sys
import os
import time
import io
import random
import json
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
from core import MessageType, AppConfig
from repositories import StorageRepository
......@@ -31,79 +34,114 @@ class ChatService:
MessageType.TEXT: TextMessageHandler()
}
def _format_mcq_for_tts(self, mcq_data: Dict, is_arabic: bool) -> str:
""" Formats a structured MCQ dictionary into a natural, speakable string for TTS. """
question_text = mcq_data.get("question_text", "")
options = [
mcq_data.get("correct_answer"), mcq_data.get("wrong_answer_1"),
mcq_data.get("wrong_answer_2"), mcq_data.get("wrong_answer_3")
]
valid_options = [opt for opt in options if opt]
random.shuffle(valid_options)
spoken_text = f"{question_text}\n\n"
spoken_text += "والاختيارات هي:\n" if is_arabic else "The options are:\n"
for option in valid_options:
spoken_text += f"{option}, \n"
return spoken_text.strip()
# In chat_service.py
# REPLACE the entire process_message method with this one.
def process_message(self, student_id: str, file: Optional[UploadFile] = None, text: Optional[str] = None, game_context: Optional[str] = None):
"""Process message and generate text and audio response."""
"""
Processes a message, stores the full response in Redis, and returns a
detailed confirmation payload that differs for text vs. MCQ responses.
"""
try:
if file and file.filename:
# Assuming handle method reads the file content
file_content = file.file.read()
user_message = self.handlers[MessageType.AUDIO].openai_service.transcribe_audio(file_content, file.filename)
user_message = self.handlers[MessageType.AUDIO].openai_service.transcribe_audio(file.file.read(), file.filename)
elif text:
user_message = text
else:
raise HTTPException(status_code=400, detail="No text or audio file provided.")
final_message_for_agent = user_message
if game_context:
print(f"Game context provided: {game_context}")
final_message_for_agent = f"game context: {game_context}\nuser query: {user_message}"
final_message_for_agent = f"game context: {game_context}\nuser query: {user_message}" if game_context else user_message
agent_response_text = self.agent_service.generate_response(
agent_response = self.agent_service.generate_response(
user_message=final_message_for_agent,
student_id=student_id,
)
audio_data = self._generate_and_upload_audio(agent_response_text, student_id)
response_payload_for_redis = None
text_for_audio = ""
agent_response_for_confirmation = ""
# This block determines what to store in Redis and what text to generate audio from.
# The logic here remains the same.
if isinstance(agent_response, dict) and agent_response.get("type") == "mcq":
mcq_data = agent_response.get("data")
response_payload_for_redis = mcq_data
student_info = self.agent_service.db_service.get_student_info(student_id)
is_arabic = student_info.get('is_arabic', True) if student_info else True
text_for_audio = self._format_mcq_for_tts(mcq_data, is_arabic)
agent_response_for_confirmation = text_for_audio
else:
agent_response_text = str(agent_response)
response_payload_for_redis = agent_response_text
text_for_audio = agent_response_text
agent_response_for_confirmation = agent_response_text
# Generate audio for the prepared text
audio_data = self._generate_and_upload_audio(text_for_audio, student_id)
# Store the full payload (dict or string) and audio bytes in Redis for the polling endpoint
self.response_manager.store_response(
student_id=student_id,
text=agent_response_text,
text=response_payload_for_redis,
audio_filepath=audio_data.get("filepath"),
audio_bytes=audio_data.get("bytes")
)
print(f"Generated response for student {student_id}: {agent_response_text[:100]}...")
return {
"status": "success",
"message": "Message processed and agent response ready",
"student_id": student_id,
"agent_response": agent_response_text,
"audio_filepath": audio_data.get("filepath")
}
# Case 1: The response was an MCQ. Return the special structure.
if isinstance(agent_response, dict) and agent_response.get("type") == "mcq":
return {
"status": "success",
"message": "Message processed and MCQ response ready",
"student_id": student_id,
"response_type": "mcq",
"agent_response": agent_response_for_confirmation, # The speakable version
"question": agent_response.get("data"), # The NEW structured data field
"audio_filepath": audio_data.get("filepath")
}
# Case 2: The response was normal text. Return the standard structure.
else:
return {
"status": "success",
"message": "Message processed and agent response ready",
"student_id": student_id,
"response_type": "text",
"agent_response": agent_response_for_confirmation,
"audio_filepath": audio_data.get("filepath")
}
except Exception as e:
print(f"Error processing message for student {student_id}: {e}")
raise HTTPException(status_code=500, detail=f"Failed to process message: {str(e)}")
def _generate_and_upload_audio(self, text: str, student_id: str) -> dict:
"""
Segments mixed-language text and generates TTS audio, then uploads to MinIO.
"""
""" Segments text, generates TTS audio, and uploads to MinIO. """
try:
segments = self.segmentation_service.segment_text(text)
audio_bytes = self.agent_service.tts_service.generate_speech_from_sequence(segments)
file_extension = "wav"
timestamp = int(time.time())
filename = f"agent_response_{timestamp}_{student_id}.{file_extension}"
filename = f"agent_response_{timestamp}_{student_id}.wav"
minio_file_path = f"audio/{filename}"
self.storage_repo.upload_file(
file_obj=io.BytesIO(audio_bytes),
bucket=self.config.minio_bucket,
file_path=minio_file_path
)
full_url = self.storage_repo.get_file_url(
bucket=self.config.minio_bucket,
file_path=minio_file_path,
expires=3600 # 1 hour
)
self.storage_repo.upload_file(io.BytesIO(audio_bytes), self.config.minio_bucket, minio_file_path)
full_url = self.storage_repo.get_file_url(self.config.minio_bucket, minio_file_path, expires=3600)
print(f"Successfully generated and uploaded TTS audio: {filename}")
return {"bytes": audio_bytes, "filepath": full_url}
except Exception as e:
print(f"Error in _generate_and_upload_audio: {e}")
return {"bytes": None, "filepath": None}
\ No newline at end of file
# services/response_manager.py
import json
import base64
from typing import Optional, Dict
from typing import Optional, Dict, Union
from .redis_client import redis_client
class ResponseManager:
......@@ -21,20 +21,24 @@ class ResponseManager:
"""Creates a consistent key for the student's queue."""
return f"student_queue:{student_id}"
def store_response(self, student_id: str, text: str, audio_filepath: Optional[str] = None, audio_bytes: Optional[bytes] = None) -> None:
"""Adds a new response to the END of the queue for a specific student."""
def store_response(self, student_id: str, text: Union[str, Dict], audio_filepath: Optional[str] = None, audio_bytes: Optional[bytes] = None) -> None:
"""
Adds a new response to the queue. The 'text' can be a string or a dictionary.
"""
key = self._get_key(student_id)
encoded_audio = base64.b64encode(audio_bytes).decode('utf-8') if audio_bytes else None
# This payload now flexibly stores either a string or a dict in the 'text' field.
payload = {
"text": text,
"audio_filepath": audio_filepath,
"audio_bytes_b64": encoded_audio
}
# RPUSH adds the new item to the right (end) of the list.
self.redis.rpush(key, json.dumps(payload))
# Reset the expiration time for the whole queue each time a new item is added.
self.redis.expire(key, self.ttl_seconds)
def get_response(self, student_id: str) -> Dict:
"""Atomically retrieves and removes the OLDEST response from the front of the queue."""
key = self._get_key(student_id)
......
......@@ -7,6 +7,7 @@ from fastapi.responses import Response, StreamingResponse
from starlette.background import BackgroundTask
import sys
import os
import json
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
from core import ResponseStatus
from services.response_manager import ResponseManager
......@@ -20,42 +21,46 @@ class ResponseService:
def get_agent_response(self, student_id: str):
"""
Gets the agent response from the manager and streams the raw audio bytes
directly, avoiding temporary files and re-downloading from MinIO.
Gets the agent response from the manager and streams the audio.
It intelligently handles both text and structured MCQ data by encoding
the payload in the 'X-Response-Text' header and signaling the type
in the 'X-Response-Type' header.
"""
if not self.response_manager.is_response_fresh(student_id):
raise HTTPException(status_code=404, detail="Agent response not ready or expired.")
response_data = self.response_manager.get_response(student_id)
text_response = response_data.get("text")
payload_data = response_data.get("text") # This can be a string or a dict
audio_bytes = response_data.get("audio_bytes")
if not audio_bytes:
# Handle text-only response if audio failed
return {
"status": ResponseStatus.SUCCESS,
"message": "Text response available (audio generation failed).",
"text": text_response
}
if not response_data or not response_data.get("text"):
raise HTTPException(status_code=404, detail=f"Response for student {student_id} was already claimed or expired.")
if not payload_data or not audio_bytes:
raise HTTPException(status_code=404, detail=f"Response for {student_id} was incomplete, claimed, or expired.")
response_type = "text"
encoded_text = ""
# Check the type of the payload to decide how to encode it
if isinstance(payload_data, dict):
# It's an MCQ
response_type = "mcq"
# Serialize the dictionary to a JSON string
json_string = json.dumps(payload_data, ensure_ascii=False)
# Base64-encode the JSON string
encoded_text = base64.b64encode(json_string.encode('utf-8')).decode('ascii')
else:
# It's a normal text string
response_type = "text"
# Base64-encode the string directly
encoded_text = base64.b64encode(str(payload_data).encode('utf-8')).decode('ascii')
# Determine content type based on filename extension
filename = response_data.get("audio_filename", "")
media_type = "audio/wav" if filename.endswith(".wav") else "audio/mpeg"
# Encode the text in Base64 for the header
encoded_text = base64.b64encode(text_response.encode('utf-8')).decode('ascii')
# Stream the raw bytes directly
# Stream the raw audio bytes
return Response(
content=audio_bytes,
media_type=media_type,
media_type="audio/wav",
headers={
"X-Response-Type": response_type, # Signal the payload type
"X-Response-Text": encoded_text,
"Access-Control-Expose-Headers": "X-Response-Text"
"Access-Control-Expose-Headers": "X-Response-Text, X-Response-Type" # Expose the new header
}
)
\ No newline at end of file
import requests
import json
# The base URL of your locally running FastAPI application
BASE_URL = "https://voice-agent.caprover.al-arcade.com"
# --- Test Configuration ---
# Use a student ID that has some recent chat history on any topic.
TEST_STUDENT_ID = "student_001"
# Change this to a normal phrase or the trigger phrase to test different paths
TRIGGER_PHRASE = "اسألني سؤال" # "Ask me a question"
# TRIGGER_PHRASE = "ما هو التكيف؟" # "What is adaptation?"
def test_chat_endpoint(student_id: str, message: str):
"""
Sends a single request to the /chat endpoint and prints the full response.
"""
endpoint = f"{BASE_URL}/chat"
payload = {
"student_id": student_id,
"text": message,
}
print(f"▶️ Sending message to /chat for student '{student_id}'...")
print(f" Message: '{message}'")
try:
# Make the POST request
response = requests.post(endpoint, data=payload, timeout=120)
# Print the HTTP status code and headers for context
print(f"\n--- API Response from /chat ---")
print(f"Status Code: {response.status_code}")
print("Headers:")
for key, value in response.headers.items():
print(f" {key}: {value}")
# Try to parse and pretty-print the JSON response body
try:
response_data = response.json()
print("\nResponse Body (JSON):")
print(json.dumps(response_data, indent=2, ensure_ascii=False))
except json.JSONDecodeError:
print("\nResponse Body (Not JSON):")
print(response.text)
except requests.exceptions.RequestException as e:
print(f"\n❌ FAILED: An error occurred while making the request: {e}")
if __name__ == "__main__":
print("="*60)
print(" SIMPLE /chat ENDPOINT TEST")
print(" This script sends one message and prints the immediate response.")
print("="*60)
test_chat_endpoint(TEST_STUDENT_ID, TRIGGER_PHRASE)
print("\n" + "="*60)
print(" Test complete.")
print("="*60)
\ No newline at end of file
"""
======================================================================
Dynamic Quiz API Cookbook & Test Script
======================================================================
Purpose:
This script serves as a live integration test and a practical guide ("cookbook")
for using the new Dynamic Quiz API endpoint.
It demonstrates how to request a quiz of a specific size ('n') for a given topic.
----------------------------------------------------------------------
API Endpoint Guide
----------------------------------------------------------------------
Generate a Dynamic Quiz (POST /quiz/dynamic)
---------------------------------------------
This is the primary endpoint for creating quizzes for students. It's designed
to be both fresh and comprehensive.
How it Works:
1. It intelligently calculates a small number of *new* questions to generate based on
the requested quiz size ('count'). This ensures the question bank is always growing.
2. It calls the AI to generate these new, unique questions and saves them to the database.
3. It retrieves ALL available questions for the topic (both old and new).
4. It randomly shuffles this complete list and returns the number of questions the user asked for.
This provides a dynamic, varied quiz experience every time while efficiently expanding
your question database.
- Method: POST
- URL: [BASE_URL]/quiz/dynamic
- Data Format: Must be sent as `application/x-www-form-urlencoded` (form data).
Parameters (Form Data):
- grade (int, required): The grade level of the curriculum (e.g., 4).
- subject (str, required): The subject of the curriculum (e.g., "Science").
- unit (str, required): The exact name of the unit.
- concept (str, required): The exact name of the concept.
- is_arabic (bool, required): Set to `true` for Arabic curriculum, `false` for English.
- count (int, optional, default=5): The total number of questions you want in the final quiz.
Example Usage (using cURL):
# Request a quiz of 10 random questions for the topic.
# This will generate ~3 new questions and then pick 10 from the whole pool.
curl -X POST [BASE_URL]/quiz/dynamic \
-F "grade=4" \
-F "subject=Science" \
-F "unit=الوحدة الأولى: الأنظمة الحية" \
-F "concept=المفهوم الأول: التكيف والبقاء" \
-F "is_arabic=true" \
-F "count=10"
----------------------------------------------------------------------
How to Run This Script
----------------------------------------------------------------------
1. Ensure your FastAPI server is running.
2. Make sure the BASE_URL variable below is set to your server's address.
3. Run the script from your terminal: python3 simple_dynamic_quiz_test.py
"""
import requests
import json
# The base URL of your API server.
# Change this to "http://localhost:8000" if you are testing locally.
BASE_URL = "https://voice-agent.caprover.al-arcade.com"
def test_dynamic_quiz(grade: int, subject: str, unit: str, concept: str, is_arabic: bool, count: int):
"""
Calls the /quiz/dynamic endpoint and prints the raw JSON response.
"""
endpoint = f"{BASE_URL}/quiz/dynamic"
payload = {
"grade": grade,
"subject": subject,
"unit": unit,
"concept": concept,
"is_arabic": is_arabic,
"count": count,
}
print(f">> Requesting a dynamic quiz of {count} questions for:")
print(f" Topic: Grade {grade} {subject} -> {unit} -> {concept}")
print(f" Language: {'Arabic' if is_arabic else 'English'}")
try:
# Make the POST request with a long timeout to allow for new question generation
response = requests.post(endpoint, data=payload, timeout=180)
print(f"\n--- API Response ---")
print(f"Status Code: {response.status_code}")
# Try to parse and pretty-print the JSON response
try:
response_data = response.json()
print("\nResponse Body (JSON):")
print(json.dumps(response_data, indent=2, ensure_ascii=False))
except json.JSONDecodeError:
print("\nResponse Body (Not JSON):")
print(response.text)
except requests.exceptions.RequestException as e:
print(f"\nFAILED: An error occurred while making the request: {e}")
if __name__ == "__main__":
print("\n" + "="*50)
print("STARTING TEST 1: ARABIC DYNAMIC QUIZ")
print("="*50)
# Updated test data as requested
arabic_test_data = {
"grade": 4,
"subject": "Science",
"unit": "الوحدة الأولى: الأنظمة الحية",
"concept": "المفهوم الأول: التكيف والبقاء",
"is_arabic": True,
"count": 3
}
test_dynamic_quiz(**arabic_test_data)
print("\n" + "="*50)
print("STARTING TEST 2: ENGLISH DYNAMIC QUIZ")
print("="*50)
# Updated test data as requested
english_test_data = {
"grade": 5,
"subject": "Science",
"unit": "Unit 1: Matter and Energy in Ecosystems",
"concept": "Concept 1.1: Properties of Matter",
"is_arabic": False,
"count": 2
}
test_dynamic_quiz(**english_test_data)
print("\n" + "="*50)
print("All tests complete.")
print("="*50)
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment