Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
A
AI Tutor
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
Salma Mohammed Hamed
AI Tutor
Commits
396e5e31
Commit
396e5e31
authored
Nov 19, 2025
by
salma
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
refactor main
parent
a2f45412
Changes
8
Hide whitespace changes
Inline
Side-by-side
Showing
8 changed files
with
639 additions
and
745 deletions
+639
-745
main.py
self_hosted_env/voice_agent/main.py
+49
-744
chat.py
self_hosted_env/voice_agent/routers/chat.py
+51
-0
curriculum.py
self_hosted_env/voice_agent/routers/curriculum.py
+56
-0
frontend.py
self_hosted_env/voice_agent/routers/frontend.py
+46
-0
multiplayer.py
self_hosted_env/voice_agent/routers/multiplayer.py
+191
-0
quiz.py
self_hosted_env/voice_agent/routers/quiz.py
+217
-0
system.py
self_hosted_env/voice_agent/routers/system.py
+28
-0
redis_client.py
self_hosted_env/voice_agent/services/redis_client.py
+1
-1
No files found.
self_hosted_env/voice_agent/main.py
View file @
396e5e31
import
os
import
os
import
uuid
import
asyncio
import
asyncio
from
fastapi
import
WebSocket
,
WebSocketDisconnect
,
Depends
import
logging
from
typing
import
List
,
Dict
from
fastapi
import
FastAPI
import
shutil
from
fastapi
import
FastAPI
,
UploadFile
,
File
,
Form
,
HTTPException
,
Request
,
BackgroundTasks
,
logger
from
fastapi.middleware.cors
import
CORSMiddleware
from
fastapi.middleware.cors
import
CORSMiddleware
from
fastapi.responses
import
FileResponse
,
Response
from
fastapi.staticfiles
import
StaticFiles
from
contextlib
import
asynccontextmanager
from
contextlib
import
asynccontextmanager
from
typing
import
Optional
import
uvicorn
import
base64
from
pathlib
import
Path
import
tempfile
import
json
import
pandas
as
pd
import
logging
from
process_pdf_pipline
import
process_pdf_curriculum_in_background
# Import your existing modules
from
core
import
AppConfig
from
repositories
import
MinIOStorageRepository
from
services
import
(
AudioService
,
ChatService
,
HealthService
,
ResponseService
,
ResponseManager
,
OpenAIService
,
AgentService
,
ConnectionPool
,
LanguageSegmentationService
,
DataIngestionService
,
WebSocketManager
,
redis_client
,
redis_listener
,
get_room_key
,
get_room_channel
)
from
utils
import
DateTimeEncoder
from
schemas.mcq
import
QuestionResponse
,
QuizResponse
,
MCQListResponse
,
QuizSubmission
# Instantiate one manager per worker
manager
=
WebSocketManager
()
from
core
import
DIContainer
# Import Core
from
core.container
import
DIContainer
from
services
import
WebSocketManager
,
redis_client
,
redis_listener
# Import Routers
from
routers
import
chat
,
quiz
,
multiplayer
,
curriculum
,
frontend
,
system
logger
=
logging
.
getLogger
(
"uvicorn.error"
)
@
asynccontextmanager
@
asynccontextmanager
async
def
lifespan
(
app
:
FastAPI
):
async
def
lifespan
(
app
:
FastAPI
):
"""
Manages application startup and shutdown events.
We'll start the Redis listener here.
"""
# --- Code to run ON STARTUP ---
print
(
"Application starting up..."
)
print
(
"Application starting up..."
)
# 1. Initialize Container
container
=
DIContainer
()
container
=
DIContainer
()
app
.
state
.
container
=
container
app
.
state
.
container
=
container
print
(
"DIContainer created."
)
# 2. Initialize WebSocket Manager
app
.
state
.
websocket_manager
=
WebSocketManager
()
print
(
"DIContainer and WebSocketManager created."
)
#
Start the background Redis listener task
#
3. Start Redis Listener
if
redis_client
:
if
redis_client
:
listener_task
=
asyncio
.
create_task
(
redis_listener
(
manager
))
print
(
"Starting Redis Listener Task..."
)
# Pass the manager stored in state
listener_task
=
asyncio
.
create_task
(
redis_listener
(
app
.
state
.
websocket_manager
))
# Add a done callback to log immediate failures
def
handle_listener_failure
(
task
):
try
:
task
.
result
()
except
asyncio
.
CancelledError
:
pass
except
Exception
as
e
:
logger
.
error
(
f
"Redis Listener crashed: {e}"
,
exc_info
=
True
)
listener_task
.
add_done_callback
(
handle_listener_failure
)
app
.
state
.
redis_listener_task
=
listener_task
app
.
state
.
redis_listener_task
=
listener_task
else
:
else
:
app
.
state
.
redis_listener_task
=
None
app
.
state
.
redis_listener_task
=
None
print
(
"WARNING: Redis not connected. Live Quiz feature will not work across multiple workers."
)
print
(
"WARNING: Redis not connected. Live Quiz feature will not work across multiple workers."
)
yield
# The application is now running
yield
# --- Code to run ON SHUTDOWN ---
print
(
"Application shutting down..."
)
print
(
"Application shutting down..."
)
if
app
.
state
.
redis_listener_task
:
if
app
.
state
.
redis_listener_task
:
app
.
state
.
redis_listener_task
.
cancel
()
app
.
state
.
redis_listener_task
.
cancel
()
await
app
.
state
.
redis_listener_task
try
:
await
app
.
state
.
redis_listener_task
except
asyncio
.
CancelledError
:
pass
app
.
state
.
container
.
agent_service
.
close
()
if
hasattr
(
app
.
state
.
container
,
'agent_service'
):
print
(
"Database connection pool closed successfully."
)
app
.
state
.
container
.
agent_service
.
close
()
print
(
"Database connection pool closed."
)
def
create_app
()
->
FastAPI
:
def
create_app
()
->
FastAPI
:
# Connect the lifespan manager to your FastAPI app instance
app
=
FastAPI
(
title
=
"Unified Chat API with Local Agent"
,
lifespan
=
lifespan
)
app
=
FastAPI
(
title
=
"Unified Chat API with Local Agent"
,
lifespan
=
lifespan
)
logger
=
logging
.
getLogger
(
"uvicorn.error"
)
#
Fixed CORS configuration for CapRover
#
CORS Config
app
.
add_middleware
(
app
.
add_middleware
(
CORSMiddleware
,
CORSMiddleware
,
allow_origins
=
[
allow_origins
=
[
"*"
],
"https://voice-agent.caprover.al-arcade.com"
,
"http://voice-agent.caprover.al-arcade.com"
,
"http://localhost:8000"
,
# For local development
"http://127.0.0.1:8000"
,
"*"
# Allow all origins for testing - remove in production
],
allow_credentials
=
True
,
allow_credentials
=
True
,
allow_methods
=
[
"GET"
,
"POST"
,
"PUT"
,
"DELETE"
,
"OPTIONS"
],
allow_methods
=
[
"*"
],
allow_headers
=
[
allow_headers
=
[
"*"
],
"Accept"
,
"Accept-Language"
,
"Content-Language"
,
"Content-Type"
,
"Authorization"
,
"X-Response-Text"
],
expose_headers
=
[
"X-Response-Text"
],
expose_headers
=
[
"X-Response-Text"
],
)
)
@
app
.
on_event
(
"startup"
)
@
app
.
on_event
(
"startup"
)
async
def
startup_event
():
async
def
startup_event
():
# Access the container from app state to print config on startup
container
=
app
.
state
.
container
container
=
app
.
state
.
container
print
(
"MinIO Endpoint:"
,
container
.
config
.
minio_endpoint
)
print
(
"MinIO Endpoint:"
,
container
.
config
.
minio_endpoint
)
print
(
"MinIO Bucket:"
,
container
.
config
.
minio_bucket
)
print
(
"OpenAI Service Available:"
,
container
.
openai_service
.
is_available
())
print
(
"OpenAI Service Available:"
,
container
.
openai_service
.
is_available
())
print
(
"Agent Service Available:"
,
container
.
agent_service
.
is_available
())
@
app
.
get
(
"/chat-interface"
)
async
def
serve_audio_recorder
():
"""Serve the audio recorder HTML file"""
try
:
static_file
=
Path
(
"static/audio-recorder.html"
)
if
static_file
.
exists
():
return
FileResponse
(
static_file
)
current_file
=
Path
(
"audio-recorder.html"
)
if
current_file
.
exists
():
return
FileResponse
(
current_file
)
raise
HTTPException
(
status_code
=
404
,
detail
=
"Audio recorder interface not found"
)
except
Exception
as
e
:
print
(
f
"Error serving audio recorder: {e}"
)
raise
HTTPException
(
status_code
=
500
,
detail
=
f
"Error serving interface: {str(e)}"
)
@
app
.
get
(
"/curriculum-upload"
)
# Include Routers
async
def
serve_curriculum_upload
():
app
.
include_router
(
frontend
.
router
)
"""Serve the curriculum upload HTML file"""
app
.
include_router
(
chat
.
router
)
try
:
app
.
include_router
(
quiz
.
router
)
static_file
=
Path
(
"static/curriculum_PDF_uploader.html"
)
app
.
include_router
(
multiplayer
.
router
)
if
static_file
.
exists
():
app
.
include_router
(
curriculum
.
router
)
return
FileResponse
(
static_file
)
app
.
include_router
(
system
.
router
)
current_file
=
Path
(
"curriculum_PDF_uploader.html"
)
if
current_file
.
exists
():
return
FileResponse
(
current_file
)
raise
HTTPException
(
status_code
=
404
,
detail
=
"Curriculum upload interface not found"
)
except
Exception
as
e
:
print
(
f
"Error serving curriculum upload interface: {e}"
)
raise
HTTPException
(
status_code
=
500
,
detail
=
f
"Error serving interface: {str(e)}"
)
@
app
.
post
(
"/chat"
)
async
def
chat_handler
(
request
:
Request
,
file
:
Optional
[
UploadFile
]
=
File
(
None
),
text
:
Optional
[
str
]
=
Form
(
None
),
student_id
:
str
=
Form
(
"student_001"
),
game_context
:
Optional
[
str
]
=
Form
(
None
)
):
"""Handles incoming chat messages using the shared container instance."""
container
=
request
.
app
.
state
.
container
try
:
if
not
student_id
.
strip
():
raise
HTTPException
(
status_code
=
400
,
detail
=
"Student ID is required"
)
result
=
container
.
chat_service
.
process_message
(
student_id
=
student_id
,
file
=
file
,
text
=
text
,
game_context
=
game_context
)
return
result
except
Exception
as
e
:
print
(
f
"Error in chat handler: {str(e)}"
)
raise
HTTPException
(
status_code
=
500
,
detail
=
f
"Chat processing error: {str(e)}"
)
@
app
.
get
(
"/get-audio-response"
)
async
def
get_audio_response
(
request
:
Request
,
student_id
:
str
=
"student_001"
):
"""Fetches the agent's text and audio response using the shared container."""
container
=
request
.
app
.
state
.
container
try
:
result
=
container
.
response_service
.
get_agent_response
(
student_id
=
student_id
)
if
hasattr
(
result
,
'status_code'
):
return
result
# This should be unreachable if response_service always returns a Response object
return
result
except
Exception
as
e
:
print
(
f
"Error getting audio response: {str(e)}"
)
raise
HTTPException
(
status_code
=
500
,
detail
=
f
"Audio response error: {str(e)}"
)
@
app
.
post
(
"/process-curriculum"
,
status_code
=
202
)
async
def
process_curriculum_webhook
(
background_tasks
:
BackgroundTasks
,
grade
:
int
=
Form
(
...
),
subject
:
str
=
Form
(
...
),
file
:
UploadFile
=
File
(
...
)
):
"""
Accepts a PDF and adds a background task to process it.
Returns immediately.
"""
pdf_bytes
=
await
file
.
read
()
background_tasks
.
add_task
(
process_pdf_curriculum_in_background
,
pdf_bytes
,
file
.
filename
,
grade
,
subject
)
# Return immediately to the user
return
{
"status"
:
"processing_started"
,
"message"
:
"The curriculum is being processed in the background."
}
@
app
.
post
(
"/mcq/generate"
)
async
def
generate_mcqs_handler
(
request
:
Request
,
curriculum
:
str
=
Form
(
...
),
grade
:
str
=
Form
(
...
),
subject
:
str
=
Form
(
...
),
unit
:
str
=
Form
(
...
),
concept
:
str
=
Form
(
...
),
count
:
int
=
Form
(
5
),
is_arabic
:
bool
=
Form
(
False
),
):
"""
Generates and stores a new set of MCQs.
NOTE: This endpoint intentionally returns the FULL question object,
including curriculum, grade, etc., as it might be useful for the client
that just initiated the generation. The GET endpoints will be filtered.
"""
container
=
request
.
app
.
state
.
container
try
:
generated_questions
=
container
.
agent_service
.
generate_and_store_mcqs
(
curriculum
=
curriculum
,
grade
=
grade
,
subject
=
subject
,
unit
=
unit
,
concept
=
concept
,
num_questions
=
count
,
is_arabic
=
is_arabic
,
)
return
{
"status"
:
"success"
,
"message"
:
f
"Successfully generated and stored {len(generated_questions)} MCQs."
,
"questions"
:
generated_questions
}
except
HTTPException
as
e
:
raise
e
except
Exception
as
e
:
logger
.
error
(
f
"Error in generate_mcqs_handler: {e}"
)
raise
HTTPException
(
status_code
=
500
,
detail
=
str
(
e
))
# --- STEP 2: UPDATE THE /mcq ENDPOINT SIGNATURE ---
@
app
.
get
(
"/mcq"
,
response_model
=
MCQListResponse
)
async
def
get_mcqs_handler
(
request
:
Request
,
curriculum
:
str
,
grade
:
str
,
subject
:
str
,
unit
:
str
,
concept
:
str
,
is_arabic
:
bool
,
limit
:
Optional
[
int
]
=
None
):
"""
Retrieves existing MCQs, filtered to the 11-field response model.
"""
container
=
request
.
app
.
state
.
container
try
:
# The service layer still returns the full objects from the DB
questions_from_db
=
container
.
agent_service
.
pgvector
.
get_mcqs
(
curriculum
=
curriculum
,
grade
=
grade
,
subject
=
subject
,
unit
=
unit
,
concept
=
concept
,
is_arabic
=
is_arabic
,
limit
=
limit
)
# FastAPI will automatically filter `questions_from_db` to match the model
return
{
"status"
:
"success"
,
"count"
:
len
(
questions_from_db
),
"questions"
:
questions_from_db
}
except
Exception
as
e
:
logger
.
error
(
f
"Error in get_mcqs_handler: {e}"
)
raise
HTTPException
(
status_code
=
500
,
detail
=
str
(
e
))
# --- STEP 3: UPDATE THE /quiz/dynamic ENDPOINT SIGNATURE ---
@
app
.
post
(
"/quiz/dynamic"
,
response_model
=
QuizResponse
)
async
def
get_dynamic_quiz_handler
(
request
:
Request
,
curriculum
:
str
=
Form
(
...
),
grade
:
str
=
Form
(
...
),
subject
:
str
=
Form
(
...
),
unit
:
str
=
Form
(
...
),
concept
:
str
=
Form
(
...
),
is_arabic
:
bool
=
Form
(
...
),
count
:
int
=
Form
(
5
)
):
"""
Generates a dynamic quiz, filtered to the 11-field response model.
"""
container
=
request
.
app
.
state
.
container
try
:
# The service layer still returns the full objects
quiz_questions_full
=
container
.
agent_service
.
get_dynamic_quiz
(
curriculum
=
curriculum
,
grade
=
grade
,
subject
=
subject
,
unit
=
unit
,
concept
=
concept
,
is_arabic
=
is_arabic
,
count
=
count
)
# FastAPI will automatically filter `quiz_questions_full` to match the model
return
{
"status"
:
"success"
,
"message"
:
f
"Successfully generated a dynamic quiz with {len(quiz_questions_full)} questions."
,
"quiz"
:
quiz_questions_full
}
except
HTTPException
as
e
:
raise
e
except
Exception
as
e
:
logger
.
error
(
f
"Error in get_dynamic_quiz_handler: {e}"
)
raise
HTTPException
(
status_code
=
500
,
detail
=
str
(
e
))
@
app
.
get
(
"/quiz-interface"
)
async
def
serve_quiz_interface
():
"""Serve the dynamic quiz generator HTML file"""
try
:
# Check for the file in a 'static' folder first
static_file
=
Path
(
"static/dynamic_quiz_interface.html"
)
if
static_file
.
exists
():
return
FileResponse
(
static_file
)
# Fallback to the root directory
current_file
=
Path
(
"dynamic_quiz_interface.html"
)
if
current_file
.
exists
():
return
FileResponse
(
current_file
)
raise
HTTPException
(
status_code
=
404
,
detail
=
"Dynamic quiz interface not found"
)
except
Exception
as
e
:
print
(
f
"Error serving quiz interface: {e}"
)
raise
HTTPException
(
status_code
=
500
,
detail
=
f
"Error serving interface: {str(e)}"
)
@
app
.
post
(
"/quiz/grade"
)
async
def
grade_quiz_handler
(
submission
:
QuizSubmission
):
"""
Receives a quiz submission, grades it, and returns the results.
"""
correct_answers_count
=
0
results
=
[]
# Create a simple lookup map for correct answers from the full question objects
correct_answer_map
=
{
q
[
'question_text'
]:
q
[
'correct_answer'
]
for
q
in
submission
.
questions
}
for
question_text
,
user_answer
in
submission
.
answers
.
items
():
correct_answer
=
correct_answer_map
.
get
(
question_text
)
is_correct
=
(
user_answer
==
correct_answer
)
if
is_correct
:
correct_answers_count
+=
1
results
.
append
({
"question_text"
:
question_text
,
"user_answer"
:
user_answer
,
"correct_answer"
:
correct_answer
,
"is_correct"
:
is_correct
})
total_questions
=
len
(
submission
.
questions
)
percentage
=
(
correct_answers_count
/
total_questions
)
*
100
if
total_questions
>
0
else
0
return
{
"status"
:
"success"
,
"score"
:
correct_answers_count
,
"total_questions"
:
total_questions
,
"percentage"
:
round
(
percentage
,
2
),
"results"
:
results
}
@
app
.
get
(
"/test-yourself"
)
async
def
serve_test_yourself_interface
():
"""Serve the interactive 'Test Yourself' HTML file"""
try
:
# Check for the file in a 'static' folder first
static_file
=
Path
(
"static/test_yourself_interface.html"
)
if
static_file
.
exists
():
return
FileResponse
(
static_file
)
# Fallback to the root directory
current_file
=
Path
(
"test_yourself_interface.html"
)
if
current_file
.
exists
():
return
FileResponse
(
current_file
)
raise
HTTPException
(
status_code
=
404
,
detail
=
"Interactive quiz interface not found"
)
except
Exception
as
e
:
print
(
f
"Error serving 'Test Yourself' interface: {e}"
)
raise
HTTPException
(
status_code
=
500
,
detail
=
f
"Error serving interface: {str(e)}"
)
@
app
.
get
(
"/quiz/options/curricula"
)
async
def
get_curricula_options
(
request
:
Request
):
container
=
request
.
app
.
state
.
container
options
=
container
.
agent_service
.
pgvector
.
get_distinct_curricula_from_structure
()
return
{
"options"
:
options
}
@
app
.
get
(
"/quiz/options/grades"
)
async
def
get_grades_options
(
request
:
Request
,
curriculum
:
str
):
container
=
request
.
app
.
state
.
container
options
=
container
.
agent_service
.
pgvector
.
get_distinct_grades_from_structure
(
curriculum
)
return
{
"options"
:
options
}
@
app
.
get
(
"/quiz/options/subjects"
)
async
def
get_subjects_options
(
request
:
Request
,
curriculum
:
str
,
grade
:
str
):
container
=
request
.
app
.
state
.
container
options
=
container
.
agent_service
.
pgvector
.
get_distinct_subjects_from_structure
(
curriculum
,
grade
)
return
{
"options"
:
options
}
@
app
.
get
(
"/quiz/options/units"
)
async
def
get_units_options
(
request
:
Request
,
curriculum
:
str
,
grade
:
str
,
subject
:
str
):
container
=
request
.
app
.
state
.
container
options
=
container
.
agent_service
.
pgvector
.
get_distinct_units_from_structure
(
curriculum
,
grade
,
subject
)
return
{
"options"
:
options
}
@
app
.
get
(
"/quiz/options/concepts"
)
async
def
get_concepts_options
(
request
:
Request
,
curriculum
:
str
,
grade
:
str
,
subject
:
str
,
unit
:
str
):
container
=
request
.
app
.
state
.
container
options
=
container
.
agent_service
.
pgvector
.
get_distinct_concepts_from_structure
(
curriculum
,
grade
,
subject
,
unit
)
return
{
"options"
:
options
}
@
app
.
post
(
"/quiz/room/create"
)
async
def
create_quiz_room
(
request
:
Request
,
curriculum
:
str
=
Form
(
...
),
grade
:
str
=
Form
(
...
),
subject
:
str
=
Form
(
...
),
unit
:
str
=
Form
(
...
),
concept
:
str
=
Form
(
...
),
is_arabic
:
bool
=
Form
(
...
),
count
:
int
=
Form
(
5
),
host_id
:
str
=
Form
(
...
)
):
if
not
redis_client
:
raise
HTTPException
(
status_code
=
503
,
detail
=
"Service unavailable: Redis connection is not configured."
)
container
=
request
.
app
.
state
.
container
try
:
quiz_questions
=
container
.
agent_service
.
get_dynamic_quiz
(
curriculum
=
curriculum
,
grade
=
grade
,
subject
=
subject
,
unit
=
unit
,
concept
=
concept
,
is_arabic
=
is_arabic
,
count
=
count
)
if
not
quiz_questions
:
raise
HTTPException
(
status_code
=
404
,
detail
=
"Could not generate questions for this topic."
)
room_id
=
str
(
uuid
.
uuid4
())[:
6
]
.
upper
()
room_key
=
get_room_key
(
room_id
)
print
(
f
"Creating room with ID: {room_id}"
)
print
(
f
"Room key: {room_key}"
)
room_state
=
{
"status"
:
"lobby"
,
"host_id"
:
host_id
,
"quiz_questions"
:
json
.
dumps
(
quiz_questions
,
cls
=
DateTimeEncoder
),
"participants"
:
json
.
dumps
({}),
"results"
:
json
.
dumps
([])
}
redis_client
.
hset
(
room_key
,
mapping
=
room_state
)
redis_client
.
expire
(
room_key
,
7200
)
# VERIFY it was created
verify_exists
=
redis_client
.
exists
(
room_key
)
print
(
f
"Room created and verified: {verify_exists}"
)
return
{
"status"
:
"success"
,
"room_id"
:
room_id
}
except
Exception
as
e
:
logger
.
error
(
f
"Error creating quiz room: {e}"
,
exc_info
=
True
)
raise
HTTPException
(
status_code
=
500
,
detail
=
str
(
e
))
@
app
.
get
(
"/quiz/room/{room_id}"
)
async
def
get_room_status
(
room_id
:
str
):
print
(
f
"=== GET /quiz/room/{room_id} called ==="
)
room_key
=
get_room_key
(
room_id
)
print
(
f
"Room key: {room_key}"
)
if
not
redis_client
:
print
(
"ERROR: Redis client is None"
)
raise
HTTPException
(
status_code
=
503
,
detail
=
"Service unavailable: Redis connection is not configured."
)
try
:
# Try to ping Redis first
redis_client
.
ping
()
print
(
"Redis ping successful"
)
except
Exception
as
e
:
print
(
f
"Redis ping failed: {e}"
)
raise
HTTPException
(
status_code
=
503
,
detail
=
"Redis connection failed"
)
exists
=
redis_client
.
exists
(
room_key
)
print
(
f
"redis_client.exists('{room_key}') returned: {exists}"
)
if
not
exists
:
# Show what keys DO exist
try
:
all_keys
=
redis_client
.
keys
(
"quiz_room:*"
)
print
(
f
"All quiz room keys in Redis: {all_keys}"
)
except
Exception
as
e
:
print
(
f
"Failed to list keys: {e}"
)
raise
HTTPException
(
status_code
=
404
,
detail
=
"Room not found."
)
room_status
=
redis_client
.
hget
(
room_key
,
"status"
)
print
(
f
"Room status retrieved: {room_status}"
)
return
{
"status"
:
"exists"
,
"room_status"
:
room_status
}
@
app
.
websocket
(
"/ws/quiz/room/{room_id}/{student_id}"
)
async
def
websocket_endpoint
(
websocket
:
WebSocket
,
room_id
:
str
,
student_id
:
str
):
room_key
=
get_room_key
(
room_id
)
room_channel
=
get_room_channel
(
room_id
)
print
(
f
"WebSocket connection attempt - Room: {room_id}, Student: {student_id}"
)
# IMPORTANT: Accept connection first
await
manager
.
connect
(
websocket
,
room_id
)
# Then check validity
if
not
redis_client
:
print
(
"ERROR: Redis client not available!"
)
await
websocket
.
close
(
code
=
1003
,
reason
=
"Redis not available"
)
manager
.
disconnect
(
websocket
,
room_id
)
return
room_exists
=
redis_client
.
exists
(
room_key
)
print
(
f
"Room {room_id} exists: {room_exists}"
)
if
not
room_exists
:
print
(
f
"ERROR: Room {room_id} not found in Redis!"
)
await
websocket
.
close
(
code
=
1008
,
reason
=
"Room not found"
)
manager
.
disconnect
(
websocket
,
room_id
)
return
container
=
websocket
.
app
.
state
.
container
pipe
=
redis_client
.
pipeline
()
try
:
# 1. Handle new student joining
student_info
=
container
.
agent_service
.
db_service
.
get_student_info
(
student_id
)
student_name
=
student_info
[
'student_name'
]
if
student_info
else
"Unknown Student"
# Atomically get and update participants
room_data
=
redis_client
.
hgetall
(
room_key
)
participants
=
json
.
loads
(
room_data
.
get
(
"participants"
,
"{}"
))
participants
[
student_id
]
=
{
"name"
:
student_name
,
"status"
:
"connected"
}
pipe
.
hset
(
room_key
,
"participants"
,
json
.
dumps
(
participants
))
pipe
.
execute
()
print
(
f
"Student {student_id} joined room {room_id}. Publishing participant_update..."
)
# Broadcast the update via Redis Pub/Sub
redis_client
.
publish
(
room_channel
,
json
.
dumps
({
"type"
:
"participant_update"
,
"participants"
:
participants
,
"host_id"
:
room_data
.
get
(
"host_id"
)
}))
print
(
f
"Published participant_update to channel: {room_channel}"
)
# Main loop to listen for messages from this client
while
True
:
data
=
await
websocket
.
receive_json
()
message_type
=
data
.
get
(
"type"
)
print
(
f
"Received {message_type} from {student_id} in room {room_id}"
)
# Use HGETALL to get the latest state before updating
current_room_data
=
redis_client
.
hgetall
(
room_key
)
host_id
=
current_room_data
.
get
(
"host_id"
)
if
message_type
==
"start_quiz"
and
student_id
==
host_id
:
pipe
.
hset
(
room_key
,
"status"
,
"in_progress"
)
pipe
.
execute
()
redis_client
.
publish
(
room_channel
,
json
.
dumps
({
"type"
:
"quiz_started"
,
"questions"
:
json
.
loads
(
current_room_data
.
get
(
"quiz_questions"
,
"[]"
))
}))
print
(
f
"Published quiz_started to channel: {room_channel}"
)
elif
message_type
==
"submit_answers"
:
user_answers
=
data
.
get
(
"answers"
,
{})
time_taken
=
data
.
get
(
"time_seconds"
,
0
)
questions
=
json
.
loads
(
current_room_data
.
get
(
"quiz_questions"
,
"[]"
))
results
=
json
.
loads
(
current_room_data
.
get
(
"results"
,
"[]"
))
participants
=
json
.
loads
(
current_room_data
.
get
(
"participants"
,
"{}"
))
score
=
0
correct_answers
=
{
q
[
'question_text'
]:
q
[
'correct_answer'
]
for
q
in
questions
}
for
q_text
,
u_answer
in
user_answers
.
items
():
if
correct_answers
.
get
(
q_text
)
==
u_answer
:
score
+=
1
results
.
append
({
"student_id"
:
student_id
,
"name"
:
student_name
,
"score"
:
score
,
"time_seconds"
:
time_taken
})
results
.
sort
(
key
=
lambda
x
:
(
-
x
[
'score'
],
x
[
'time_seconds'
]))
participants
[
student_id
][
"status"
]
=
"finished"
all_finished
=
all
(
p
[
"status"
]
==
"finished"
for
p
in
participants
.
values
())
if
all_finished
:
pipe
.
hset
(
room_key
,
"status"
,
"finished"
)
pipe
.
hset
(
room_key
,
"results"
,
json
.
dumps
(
results
))
pipe
.
hset
(
room_key
,
"participants"
,
json
.
dumps
(
participants
))
pipe
.
execute
()
redis_client
.
publish
(
room_channel
,
json
.
dumps
({
"type"
:
"results_update"
,
"results"
:
results
,
"is_final"
:
all_finished
}))
except
WebSocketDisconnect
:
print
(
f
"Student {student_id} disconnected from room {room_id}"
)
# Handle student leaving
current_participants
=
json
.
loads
(
redis_client
.
hget
(
room_key
,
"participants"
)
or
"{}"
)
if
student_id
in
current_participants
:
del
current_participants
[
student_id
]
redis_client
.
hset
(
room_key
,
"participants"
,
json
.
dumps
(
current_participants
))
# Get the host_id before broadcasting
room_data
=
redis_client
.
hgetall
(
room_key
)
redis_client
.
publish
(
room_channel
,
json
.
dumps
({
"type"
:
"participant_update"
,
"participants"
:
current_participants
,
"host_id"
:
room_data
.
get
(
"host_id"
)
}))
except
Exception
as
e
:
print
(
f
"WebSocket error for {student_id} in room {room_id}: {e}"
)
finally
:
manager
.
disconnect
(
websocket
,
room_id
)
print
(
f
"Cleaned up connection for {student_id} in room {room_id}"
)
@
app
.
get
(
"/live-quiz"
)
async
def
serve_live_quiz_interface
():
"""Serve the live quiz HTML file"""
try
:
# Check for the file in a 'static' folder first
static_file
=
Path
(
"static/live_quiz_interface.html"
)
if
static_file
.
exists
():
return
FileResponse
(
static_file
)
# Fallback to the root directory
current_file
=
Path
(
"live_quiz_interface.html"
)
if
current_file
.
exists
():
return
FileResponse
(
current_file
)
raise
HTTPException
(
status_code
=
404
,
detail
=
"Live quiz interface not found"
)
except
Exception
as
e
:
logger
.
error
(
f
"Error serving live quiz interface: {e}"
)
raise
HTTPException
(
status_code
=
500
,
detail
=
f
"Error serving interface: {str(e)}"
)
@
app
.
options
(
"/get-audio-response"
)
async
def
audio_response_options
():
"""Handle preflight CORS requests for audio response endpoint"""
return
Response
(
status_code
=
204
,
headers
=
{
"Access-Control-Allow-Origin"
:
"*"
,
"Access-Control-Allow-Methods"
:
"GET, OPTIONS"
,
"Access-Control-Allow-Headers"
:
"*"
,
"Access-Control-Expose-Headers"
:
"X-Response-Text"
})
@
app
.
get
(
"/health"
)
async
def
health_check
(
request
:
Request
):
"""Health check endpoint using the shared container."""
container
=
request
.
app
.
state
.
container
try
:
health_status
=
container
.
health_service
.
get_health_status
()
health_status
.
update
({
"openai_service_status"
:
"available"
if
container
.
openai_service
.
is_available
()
else
"unavailable"
,
"agent_service_status"
:
"available"
if
container
.
agent_service
.
is_available
()
else
"unavailable"
,
"minio_endpoint"
:
container
.
config
.
minio_endpoint
,
"minio_bucket"
:
container
.
config
.
minio_bucket
})
return
health_status
except
Exception
as
e
:
print
(
f
"Health check error: {e}"
)
return
{
"status"
:
"error"
,
"message"
:
str
(
e
)}
# Agent management endpoints
@
app
.
get
(
"/conversation/stats"
)
async
def
get_conversation_stats
(
request
:
Request
,
student_id
:
str
=
"student_001"
):
container
=
request
.
app
.
state
.
container
try
:
return
container
.
chat_service
.
get_agent_stats
(
student_id
)
except
Exception
as
e
:
raise
HTTPException
(
status_code
=
500
,
detail
=
str
(
e
))
@
app
.
post
(
"/conversation/clear"
)
async
def
clear_conversation
(
request
:
Request
,
student_id
:
str
=
Form
(
"student_001"
)):
container
=
request
.
app
.
state
.
container
try
:
return
container
.
chat_service
.
clear_conversation
(
student_id
)
except
Exception
as
e
:
raise
HTTPException
(
status_code
=
500
,
detail
=
str
(
e
))
@
app
.
post
(
"/agent/system-prompt"
)
async
def
set_system_prompt
(
req_body
:
dict
,
request
:
Request
):
container
=
request
.
app
.
state
.
container
try
:
prompt
=
req_body
.
get
(
"prompt"
,
""
)
if
not
prompt
:
raise
HTTPException
(
status_code
=
400
,
detail
=
"System prompt cannot be empty"
)
return
container
.
chat_service
.
set_system_prompt
(
prompt
)
except
Exception
as
e
:
raise
HTTPException
(
status_code
=
500
,
detail
=
str
(
e
))
@
app
.
get
(
"/agent/system-prompt"
)
async
def
get_system_prompt
(
request
:
Request
):
container
=
request
.
app
.
state
.
container
try
:
return
{
"system_prompt"
:
container
.
agent_service
.
system_prompt
,
"status"
:
"success"
}
except
Exception
as
e
:
raise
HTTPException
(
status_code
=
500
,
detail
=
str
(
e
))
@
app
.
get
(
"/conversation/export"
)
async
def
export_conversation
(
request
:
Request
,
student_id
:
str
=
"student_001"
):
container
=
request
.
app
.
state
.
container
try
:
history
=
container
.
agent_service
.
export_conversation
(
student_id
)
return
{
"student_id"
:
student_id
,
"messages"
:
history
,
"total_messages"
:
len
(
history
)
}
except
Exception
as
e
:
raise
HTTPException
(
status_code
=
500
,
detail
=
str
(
e
))
@
app
.
post
(
"/conversation/import"
)
async
def
import_conversation
(
req_body
:
dict
,
request
:
Request
):
container
=
request
.
app
.
state
.
container
try
:
student_id
=
req_body
.
get
(
"student_id"
,
"student_001"
)
messages
=
req_body
.
get
(
"messages"
,
[])
if
not
messages
:
raise
HTTPException
(
status_code
=
400
,
detail
=
"Messages list cannot be empty"
)
container
.
agent_service
.
import_conversation
(
messages
,
student_id
)
return
{
"status"
:
"success"
,
"message"
:
f
"Imported {len(messages)} messages"
}
except
Exception
as
e
:
raise
HTTPException
(
status_code
=
500
,
detail
=
str
(
e
))
@
app
.
get
(
"/debug/test-response"
)
async
def
debug_test_response
():
"""Debug endpoint to test response generation"""
test_text
=
"This is a test response"
encoded_text
=
base64
.
b64encode
(
test_text
.
encode
(
'utf-8'
))
.
decode
(
'utf-8'
)
return
Response
(
content
=
b
"test audio data"
,
media_type
=
"audio/mpeg"
,
headers
=
{
"X-Response-Text"
:
encoded_text
,
"Access-Control-Expose-Headers"
:
"X-Response-Text"
})
@
app
.
get
(
"/"
)
async
def
serve_index
():
"""Serves the main navigation hub HTML file."""
try
:
# Check for the file in a 'static' folder first
static_file
=
Path
(
"static/index.html"
)
if
static_file
.
exists
():
return
FileResponse
(
static_file
)
# Fallback to the root directory
current_file
=
Path
(
"index.html"
)
if
current_file
.
exists
():
return
FileResponse
(
current_file
)
raise
HTTPException
(
status_code
=
404
,
detail
=
"Index page not found"
)
except
Exception
as
e
:
logger
.
error
(
f
"Error serving index page: {e}"
)
raise
HTTPException
(
status_code
=
500
,
detail
=
f
"Error serving interface: {str(e)}"
)
@
app
.
get
(
"/api-info"
)
async
def
root
():
"""Root endpoint with API info"""
return
{
"service"
:
"Unified Chat API with Local Agent"
,
"version"
:
"2.2.0-lifespan"
,
"status"
:
"running"
}
return
app
return
app
# Application entry point
app
=
create_app
()
app
=
create_app
()
if
__name__
==
"__main__"
:
if
__name__
==
"__main__"
:
# For development
import
uvicorn
uvicorn
.
run
(
uvicorn
.
run
(
"main:app"
,
"main:app"
,
host
=
"0.0.0.0"
,
host
=
"0.0.0.0"
,
...
...
self_hosted_env/voice_agent/routers/chat.py
0 → 100644
View file @
396e5e31
from
fastapi
import
APIRouter
,
UploadFile
,
File
,
Form
,
HTTPException
,
Depends
,
Response
from
typing
import
Optional
from
starlette.requests
import
Request
router
=
APIRouter
(
tags
=
[
"Chat"
])
@
router
.
post
(
"/chat"
)
async
def
chat_handler
(
request
:
Request
,
file
:
Optional
[
UploadFile
]
=
File
(
None
),
text
:
Optional
[
str
]
=
Form
(
None
),
student_id
:
str
=
Form
(
"student_001"
),
game_context
:
Optional
[
str
]
=
Form
(
None
)
):
"""Handles incoming chat messages using the shared container instance."""
container
=
request
.
app
.
state
.
container
try
:
if
not
student_id
.
strip
():
raise
HTTPException
(
status_code
=
400
,
detail
=
"Student ID is required"
)
result
=
container
.
chat_service
.
process_message
(
student_id
=
student_id
,
file
=
file
,
text
=
text
,
game_context
=
game_context
)
return
result
except
Exception
as
e
:
print
(
f
"Error in chat handler: {str(e)}"
)
raise
HTTPException
(
status_code
=
500
,
detail
=
f
"Chat processing error: {str(e)}"
)
@
router
.
get
(
"/get-audio-response"
)
async
def
get_audio_response
(
request
:
Request
,
student_id
:
str
=
"student_001"
):
"""Fetches the agent's text and audio response using the shared container."""
container
=
request
.
app
.
state
.
container
try
:
result
=
container
.
response_service
.
get_agent_response
(
student_id
=
student_id
)
if
hasattr
(
result
,
'status_code'
):
return
result
# This should be unreachable if response_service always returns a Response object
return
result
except
Exception
as
e
:
print
(
f
"Error getting audio response: {str(e)}"
)
raise
HTTPException
(
status_code
=
500
,
detail
=
f
"Audio response error: {str(e)}"
)
@
router
.
options
(
"/get-audio-response"
)
async
def
audio_response_options
():
"""Handle preflight CORS requests for audio response endpoint"""
return
Response
(
status_code
=
204
,
headers
=
{
"Access-Control-Allow-Origin"
:
"*"
,
"Access-Control-Allow-Methods"
:
"GET, OPTIONS"
,
"Access-Control-Allow-Headers"
:
"*"
,
"Access-Control-Expose-Headers"
:
"X-Response-Text"
})
self_hosted_env/voice_agent/routers/curriculum.py
0 → 100644
View file @
396e5e31
from
fastapi
import
APIRouter
,
UploadFile
,
File
,
Form
,
BackgroundTasks
from
fastapi.responses
import
FileResponse
from
fastapi
import
HTTPException
import
os
from
pathlib
import
Path
import
sys
sys
.
path
.
append
(
os
.
path
.
abspath
(
os
.
path
.
join
(
os
.
path
.
dirname
(
__file__
),
'..'
)))
from
process_pdf_pipline
import
process_pdf_curriculum_in_background
router
=
APIRouter
(
tags
=
[
"Curriculum"
])
@
router
.
get
(
"/curriculum-upload"
)
async
def
serve_curriculum_upload
():
"""Serve the curriculum upload HTML file"""
try
:
static_file
=
Path
(
"static/curriculum_PDF_uploader.html"
)
if
static_file
.
exists
():
return
FileResponse
(
static_file
)
current_file
=
Path
(
"curriculum_PDF_uploader.html"
)
if
current_file
.
exists
():
return
FileResponse
(
current_file
)
raise
HTTPException
(
status_code
=
404
,
detail
=
"Curriculum upload interface not found"
)
except
Exception
as
e
:
print
(
f
"Error serving curriculum upload interface: {e}"
)
raise
HTTPException
(
status_code
=
500
,
detail
=
f
"Error serving interface: {str(e)}"
)
@
router
.
post
(
"/process-curriculum"
,
status_code
=
202
)
async
def
process_curriculum_webhook
(
background_tasks
:
BackgroundTasks
,
grade
:
int
=
Form
(
...
),
subject
:
str
=
Form
(
...
),
file
:
UploadFile
=
File
(
...
)
):
"""
Accepts a PDF and adds a background task to process it.
Returns immediately.
"""
pdf_bytes
=
await
file
.
read
()
background_tasks
.
add_task
(
process_pdf_curriculum_in_background
,
pdf_bytes
,
file
.
filename
,
grade
,
subject
)
# Return immediately to the user
return
{
"status"
:
"processing_started"
,
"message"
:
"The curriculum is being processed in the background."
}
self_hosted_env/voice_agent/routers/frontend.py
0 → 100644
View file @
396e5e31
from
fastapi
import
APIRouter
,
HTTPException
from
fastapi.responses
import
FileResponse
from
pathlib
import
Path
router
=
APIRouter
(
tags
=
[
"Frontend"
])
def
serve_html
(
filename
:
str
):
try
:
static_file
=
Path
(
f
"static/{filename}"
)
if
static_file
.
exists
():
return
FileResponse
(
static_file
)
current_file
=
Path
(
filename
)
if
current_file
.
exists
():
return
FileResponse
(
current_file
)
raise
HTTPException
(
status_code
=
404
,
detail
=
f
"{filename} not found"
)
except
Exception
as
e
:
raise
HTTPException
(
status_code
=
500
,
detail
=
str
(
e
))
@
router
.
get
(
"/"
)
async
def
serve_homepage
():
"""Serve the main homepage HTML file"""
return
serve_html
(
"index.html"
)
@
router
.
get
(
"/live-quiz"
)
async
def
serve_live_quiz_interface
():
"""Serve the live quiz HTML file"""
return
serve_html
(
"live_quiz_interface.html"
)
@
router
.
get
(
"/quiz-interface"
)
async
def
serve_quiz_interface
():
"""Serve the quiz interface HTML file"""
return
serve_html
(
"dynamic_quiz_interface.html"
)
@
router
.
get
(
"/chat-interface"
)
async
def
serve_chat_interface
():
"""Serve the chat interface HTML file"""
return
serve_html
(
"audio-recorder.html"
)
@
router
.
get
(
"/test-yourself"
)
async
def
serve_test_yourself_interface
():
"""Serve the interactive 'Test Yourself' HTML file"""
return
serve_html
(
"test_yourself_interface.html"
)
@
router
.
get
(
"/curriculum-upload"
)
async
def
serve_curriculum_upload
():
"""Serve the curriculum upload HTML file"""
return
serve_html
(
"curriculum_PDF_uploader.html"
)
\ No newline at end of file
self_hosted_env/voice_agent/routers/multiplayer.py
0 → 100644
View file @
396e5e31
from
fastapi
import
APIRouter
,
WebSocket
,
WebSocketDisconnect
,
Form
,
HTTPException
,
Depends
from
starlette.requests
import
Request
import
json
import
uuid
import
logging
import
os
import
sys
# Ensure path is correct for imports
sys
.
path
.
append
(
os
.
path
.
abspath
(
os
.
path
.
join
(
os
.
path
.
dirname
(
__file__
),
'..'
)))
from
services
import
WebSocketManager
,
redis_client
,
get_room_key
,
get_room_channel
from
utils
import
DateTimeEncoder
router
=
APIRouter
(
tags
=
[
"Live Quiz"
])
logger
=
logging
.
getLogger
(
"uvicorn.error"
)
@
router
.
post
(
"/quiz/room/create"
)
async
def
create_quiz_room
(
request
:
Request
,
curriculum
:
str
=
Form
(
...
),
grade
:
str
=
Form
(
...
),
subject
:
str
=
Form
(
...
),
unit
:
str
=
Form
(
...
),
concept
:
str
=
Form
(
...
),
is_arabic
:
bool
=
Form
(
...
),
count
:
int
=
Form
(
5
),
host_id
:
str
=
Form
(
...
)
):
if
not
redis_client
:
raise
HTTPException
(
status_code
=
503
,
detail
=
"Service unavailable: Redis connection is not configured."
)
container
=
request
.
app
.
state
.
container
try
:
quiz_questions
=
container
.
agent_service
.
get_dynamic_quiz
(
curriculum
=
curriculum
,
grade
=
grade
,
subject
=
subject
,
unit
=
unit
,
concept
=
concept
,
is_arabic
=
is_arabic
,
count
=
count
)
if
not
quiz_questions
:
raise
HTTPException
(
status_code
=
404
,
detail
=
"Could not generate questions for this topic."
)
room_id
=
str
(
uuid
.
uuid4
())[:
6
]
.
upper
()
room_key
=
get_room_key
(
room_id
)
logger
.
info
(
f
"Creating room with ID: {room_id}"
)
room_state
=
{
"status"
:
"lobby"
,
"host_id"
:
host_id
,
"quiz_questions"
:
json
.
dumps
(
quiz_questions
,
cls
=
DateTimeEncoder
),
"participants"
:
json
.
dumps
({}),
"results"
:
json
.
dumps
([])
}
redis_client
.
hset
(
room_key
,
mapping
=
room_state
)
redis_client
.
expire
(
room_key
,
7200
)
return
{
"status"
:
"success"
,
"room_id"
:
room_id
}
except
Exception
as
e
:
logger
.
error
(
f
"Error creating quiz room: {e}"
,
exc_info
=
True
)
raise
HTTPException
(
status_code
=
500
,
detail
=
str
(
e
))
@
router
.
get
(
"/quiz/room/{room_id}"
)
async
def
get_room_status
(
room_id
:
str
):
room_key
=
get_room_key
(
room_id
)
if
not
redis_client
:
raise
HTTPException
(
status_code
=
503
,
detail
=
"Redis connection is not configured."
)
if
not
redis_client
.
exists
(
room_key
):
raise
HTTPException
(
status_code
=
404
,
detail
=
"Room not found."
)
room_status
=
redis_client
.
hget
(
room_key
,
"status"
)
return
{
"status"
:
"exists"
,
"room_status"
:
room_status
}
@
router
.
websocket
(
"/ws/quiz/room/{room_id}/{student_id}"
)
async
def
websocket_endpoint
(
websocket
:
WebSocket
,
room_id
:
str
,
student_id
:
str
):
# Retrieve manager from app state
manager
:
WebSocketManager
=
websocket
.
app
.
state
.
websocket_manager
room_key
=
get_room_key
(
room_id
)
room_channel
=
get_room_channel
(
room_id
)
logger
.
info
(
f
"WebSocket connection attempt - Room: {room_id}, Student: {student_id}"
)
# 1. Accept Connection
await
manager
.
connect
(
websocket
,
room_id
)
# 2. Validate Dependencies
if
not
redis_client
:
logger
.
error
(
"Redis client not available during WebSocket connection"
)
await
websocket
.
close
(
code
=
1003
,
reason
=
"Redis not available"
)
manager
.
disconnect
(
websocket
,
room_id
)
return
if
not
redis_client
.
exists
(
room_key
):
logger
.
warning
(
f
"Room {room_id} not found during WebSocket connection"
)
await
websocket
.
close
(
code
=
1008
,
reason
=
"Room not found"
)
manager
.
disconnect
(
websocket
,
room_id
)
return
container
=
websocket
.
app
.
state
.
container
pipe
=
redis_client
.
pipeline
()
try
:
# 3. Update Participants in DB
logger
.
info
(
f
"Fetching student info for {student_id}"
)
student_info
=
container
.
agent_service
.
db_service
.
get_student_info
(
student_id
)
student_name
=
student_info
[
'student_name'
]
if
student_info
else
"Unknown Student"
room_data
=
redis_client
.
hgetall
(
room_key
)
participants
=
json
.
loads
(
room_data
.
get
(
"participants"
,
"{}"
))
participants
[
student_id
]
=
{
"name"
:
student_name
,
"status"
:
"connected"
}
pipe
.
hset
(
room_key
,
"participants"
,
json
.
dumps
(
participants
))
pipe
.
execute
()
# 4. Broadcast Update
logger
.
info
(
f
"Student {student_id} joined room {room_id}. Publishing update to {room_channel}"
)
redis_client
.
publish
(
room_channel
,
json
.
dumps
({
"type"
:
"participant_update"
,
"participants"
:
participants
,
"host_id"
:
room_data
.
get
(
"host_id"
)
}))
# 5. Message Loop
while
True
:
data
=
await
websocket
.
receive_json
()
message_type
=
data
.
get
(
"type"
)
logger
.
info
(
f
"Received {message_type} from {student_id} in room {room_id}"
)
current_room_data
=
redis_client
.
hgetall
(
room_key
)
host_id
=
current_room_data
.
get
(
"host_id"
)
if
message_type
==
"start_quiz"
and
student_id
==
host_id
:
pipe
.
hset
(
room_key
,
"status"
,
"in_progress"
)
pipe
.
execute
()
redis_client
.
publish
(
room_channel
,
json
.
dumps
({
"type"
:
"quiz_started"
,
"questions"
:
json
.
loads
(
current_room_data
.
get
(
"quiz_questions"
,
"[]"
))
}))
elif
message_type
==
"submit_answers"
:
user_answers
=
data
.
get
(
"answers"
,
{})
time_taken
=
data
.
get
(
"time_seconds"
,
0
)
questions
=
json
.
loads
(
current_room_data
.
get
(
"quiz_questions"
,
"[]"
))
results
=
json
.
loads
(
current_room_data
.
get
(
"results"
,
"[]"
))
participants
=
json
.
loads
(
current_room_data
.
get
(
"participants"
,
"{}"
))
score
=
0
correct_answers
=
{
q
[
'question_text'
]:
q
[
'correct_answer'
]
for
q
in
questions
}
for
q_text
,
u_answer
in
user_answers
.
items
():
if
correct_answers
.
get
(
q_text
)
==
u_answer
:
score
+=
1
results
.
append
({
"student_id"
:
student_id
,
"name"
:
student_name
,
"score"
:
score
,
"time_seconds"
:
time_taken
})
results
.
sort
(
key
=
lambda
x
:
(
-
x
[
'score'
],
x
[
'time_seconds'
]))
participants
[
student_id
][
"status"
]
=
"finished"
all_finished
=
all
(
p
[
"status"
]
==
"finished"
for
p
in
participants
.
values
())
if
all_finished
:
pipe
.
hset
(
room_key
,
"status"
,
"finished"
)
pipe
.
hset
(
room_key
,
"results"
,
json
.
dumps
(
results
))
pipe
.
hset
(
room_key
,
"participants"
,
json
.
dumps
(
participants
))
pipe
.
execute
()
redis_client
.
publish
(
room_channel
,
json
.
dumps
({
"type"
:
"results_update"
,
"results"
:
results
,
"is_final"
:
all_finished
}))
except
WebSocketDisconnect
:
logger
.
info
(
f
"Student {student_id} disconnected from room {room_id}"
)
current_participants
=
json
.
loads
(
redis_client
.
hget
(
room_key
,
"participants"
)
or
"{}"
)
if
student_id
in
current_participants
:
del
current_participants
[
student_id
]
redis_client
.
hset
(
room_key
,
"participants"
,
json
.
dumps
(
current_participants
))
room_data
=
redis_client
.
hgetall
(
room_key
)
redis_client
.
publish
(
room_channel
,
json
.
dumps
({
"type"
:
"participant_update"
,
"participants"
:
current_participants
,
"host_id"
:
room_data
.
get
(
"host_id"
)
}))
except
Exception
as
e
:
logger
.
error
(
f
"WebSocket error for {student_id} in room {room_id}: {e}"
,
exc_info
=
True
)
finally
:
manager
.
disconnect
(
websocket
,
room_id
)
logger
.
info
(
f
"Cleaned up connection for {student_id} in room {room_id}"
)
\ No newline at end of file
self_hosted_env/voice_agent/routers/quiz.py
0 → 100644
View file @
396e5e31
from
fastapi
import
APIRouter
,
Form
,
HTTPException
,
Request
,
Depends
from
fastapi.responses
import
FileResponse
from
typing
import
Optional
import
logging
import
os
from
pathlib
import
Path
import
sys
sys
.
path
.
append
(
os
.
path
.
abspath
(
os
.
path
.
join
(
os
.
path
.
dirname
(
__file__
),
'..'
)))
from
schemas.mcq
import
MCQListResponse
,
QuizResponse
,
QuizSubmission
router
=
APIRouter
(
tags
=
[
"Quiz & MCQ"
])
logger
=
logging
.
getLogger
(
"uvicorn.error"
)
@
router
.
post
(
"/mcq/generate"
)
async
def
generate_mcqs_handler
(
request
:
Request
,
curriculum
:
str
=
Form
(
...
),
grade
:
str
=
Form
(
...
),
subject
:
str
=
Form
(
...
),
unit
:
str
=
Form
(
...
),
concept
:
str
=
Form
(
...
),
count
:
int
=
Form
(
5
),
is_arabic
:
bool
=
Form
(
False
),
):
"""
Generates and stores a new set of MCQs.
NOTE: This endpoint intentionally returns the FULL question object,
including curriculum, grade, etc., as it might be useful for the client
that just initiated the generation. The GET endpoints will be filtered.
"""
container
=
request
.
app
.
state
.
container
try
:
generated_questions
=
container
.
agent_service
.
generate_and_store_mcqs
(
curriculum
=
curriculum
,
grade
=
grade
,
subject
=
subject
,
unit
=
unit
,
concept
=
concept
,
num_questions
=
count
,
is_arabic
=
is_arabic
,
)
return
{
"status"
:
"success"
,
"message"
:
f
"Successfully generated and stored {len(generated_questions)} MCQs."
,
"questions"
:
generated_questions
}
except
HTTPException
as
e
:
raise
e
except
Exception
as
e
:
logger
.
error
(
f
"Error in generate_mcqs_handler: {e}"
)
raise
HTTPException
(
status_code
=
500
,
detail
=
str
(
e
))
# --- STEP 2: UPDATE THE /mcq ENDPOINT SIGNATURE ---
@
router
.
get
(
"/mcq"
,
response_model
=
MCQListResponse
)
async
def
get_mcqs_handler
(
request
:
Request
,
curriculum
:
str
,
grade
:
str
,
subject
:
str
,
unit
:
str
,
concept
:
str
,
is_arabic
:
bool
,
limit
:
Optional
[
int
]
=
None
):
"""
Retrieves existing MCQs, filtered to the 11-field response model.
"""
container
=
request
.
app
.
state
.
container
try
:
# The service layer still returns the full objects from the DB
questions_from_db
=
container
.
agent_service
.
pgvector
.
get_mcqs
(
curriculum
=
curriculum
,
grade
=
grade
,
subject
=
subject
,
unit
=
unit
,
concept
=
concept
,
is_arabic
=
is_arabic
,
limit
=
limit
)
# FastAPI will automatically filter `questions_from_db` to match the model
return
{
"status"
:
"success"
,
"count"
:
len
(
questions_from_db
),
"questions"
:
questions_from_db
}
except
Exception
as
e
:
logger
.
error
(
f
"Error in get_mcqs_handler: {e}"
)
raise
HTTPException
(
status_code
=
500
,
detail
=
str
(
e
))
# --- STEP 3: UPDATE THE /quiz/dynamic ENDPOINT SIGNATURE ---
@
router
.
post
(
"/quiz/dynamic"
,
response_model
=
QuizResponse
)
async
def
get_dynamic_quiz_handler
(
request
:
Request
,
curriculum
:
str
=
Form
(
...
),
grade
:
str
=
Form
(
...
),
subject
:
str
=
Form
(
...
),
unit
:
str
=
Form
(
...
),
concept
:
str
=
Form
(
...
),
is_arabic
:
bool
=
Form
(
...
),
count
:
int
=
Form
(
5
)
):
"""
Generates a dynamic quiz, filtered to the 11-field response model.
"""
container
=
request
.
app
.
state
.
container
try
:
# The service layer still returns the full objects
quiz_questions_full
=
container
.
agent_service
.
get_dynamic_quiz
(
curriculum
=
curriculum
,
grade
=
grade
,
subject
=
subject
,
unit
=
unit
,
concept
=
concept
,
is_arabic
=
is_arabic
,
count
=
count
)
# FastAPI will automatically filter `quiz_questions_full` to match the model
return
{
"status"
:
"success"
,
"message"
:
f
"Successfully generated a dynamic quiz with {len(quiz_questions_full)} questions."
,
"quiz"
:
quiz_questions_full
}
except
HTTPException
as
e
:
raise
e
except
Exception
as
e
:
logger
.
error
(
f
"Error in get_dynamic_quiz_handler: {e}"
)
raise
HTTPException
(
status_code
=
500
,
detail
=
str
(
e
))
@
router
.
post
(
"/quiz/grade"
)
async
def
grade_quiz_handler
(
submission
:
QuizSubmission
):
"""
Receives a quiz submission, grades it, and returns the results.
"""
correct_answers_count
=
0
results
=
[]
# Create a simple lookup map for correct answers from the full question objects
correct_answer_map
=
{
q
[
'question_text'
]:
q
[
'correct_answer'
]
for
q
in
submission
.
questions
}
for
question_text
,
user_answer
in
submission
.
answers
.
items
():
correct_answer
=
correct_answer_map
.
get
(
question_text
)
is_correct
=
(
user_answer
==
correct_answer
)
if
is_correct
:
correct_answers_count
+=
1
results
.
append
({
"question_text"
:
question_text
,
"user_answer"
:
user_answer
,
"correct_answer"
:
correct_answer
,
"is_correct"
:
is_correct
})
total_questions
=
len
(
submission
.
questions
)
percentage
=
(
correct_answers_count
/
total_questions
)
*
100
if
total_questions
>
0
else
0
return
{
"status"
:
"success"
,
"score"
:
correct_answers_count
,
"total_questions"
:
total_questions
,
"percentage"
:
round
(
percentage
,
2
),
"results"
:
results
}
@
router
.
get
(
"/test-yourself"
)
async
def
serve_test_yourself_interface
():
"""Serve the interactive 'Test Yourself' HTML file"""
try
:
# Check for the file in a 'static' folder first
static_file
=
Path
(
"static/test_yourself_interface.html"
)
if
static_file
.
exists
():
return
FileResponse
(
static_file
)
# Fallback to the root directory
current_file
=
Path
(
"test_yourself_interface.html"
)
if
current_file
.
exists
():
return
FileResponse
(
current_file
)
raise
HTTPException
(
status_code
=
404
,
detail
=
"Interactive quiz interface not found"
)
except
Exception
as
e
:
print
(
f
"Error serving 'Test Yourself' interface: {e}"
)
raise
HTTPException
(
status_code
=
500
,
detail
=
f
"Error serving interface: {str(e)}"
)
@
router
.
get
(
"/quiz/options/curricula"
)
async
def
get_curricula_options
(
request
:
Request
):
container
=
request
.
app
.
state
.
container
options
=
container
.
agent_service
.
pgvector
.
get_distinct_curricula_from_structure
()
return
{
"options"
:
options
}
@
router
.
get
(
"/quiz/options/grades"
)
async
def
get_grades_options
(
request
:
Request
,
curriculum
:
str
):
container
=
request
.
app
.
state
.
container
options
=
container
.
agent_service
.
pgvector
.
get_distinct_grades_from_structure
(
curriculum
)
return
{
"options"
:
options
}
@
router
.
get
(
"/quiz/options/subjects"
)
async
def
get_subjects_options
(
request
:
Request
,
curriculum
:
str
,
grade
:
str
):
container
=
request
.
app
.
state
.
container
options
=
container
.
agent_service
.
pgvector
.
get_distinct_subjects_from_structure
(
curriculum
,
grade
)
return
{
"options"
:
options
}
@
router
.
get
(
"/quiz/options/units"
)
async
def
get_units_options
(
request
:
Request
,
curriculum
:
str
,
grade
:
str
,
subject
:
str
):
container
=
request
.
app
.
state
.
container
options
=
container
.
agent_service
.
pgvector
.
get_distinct_units_from_structure
(
curriculum
,
grade
,
subject
)
return
{
"options"
:
options
}
@
router
.
get
(
"/quiz/options/concepts"
)
async
def
get_concepts_options
(
request
:
Request
,
curriculum
:
str
,
grade
:
str
,
subject
:
str
,
unit
:
str
):
container
=
request
.
app
.
state
.
container
options
=
container
.
agent_service
.
pgvector
.
get_distinct_concepts_from_structure
(
curriculum
,
grade
,
subject
,
unit
)
return
{
"options"
:
options
}
\ No newline at end of file
self_hosted_env/voice_agent/routers/system.py
0 → 100644
View file @
396e5e31
from
fastapi
import
Request
from
fastapi
import
APIRouter
import
logging
router
=
APIRouter
(
tags
=
[
"System"
])
logger
=
logging
.
getLogger
(
"uvicorn.error"
)
@
router
.
get
(
"/health"
)
async
def
health_check
(
request
:
Request
):
"""Health check endpoint using the shared container."""
container
=
request
.
app
.
state
.
container
try
:
health_status
=
container
.
health_service
.
get_health_status
()
health_status
.
update
({
"openai_service_status"
:
"available"
if
container
.
openai_service
.
is_available
()
else
"unavailable"
,
"agent_service_status"
:
"available"
if
container
.
agent_service
.
is_available
()
else
"unavailable"
,
"minio_endpoint"
:
container
.
config
.
minio_endpoint
,
"minio_bucket"
:
container
.
config
.
minio_bucket
})
return
health_status
except
Exception
as
e
:
print
(
f
"Health check error: {e}"
)
return
{
"status"
:
"error"
,
"message"
:
str
(
e
)}
@
router
.
get
(
"/api-info"
)
async
def
root
():
"""Root endpoint with API info"""
return
{
"service"
:
"Unified Chat API with Local Agent"
,
"version"
:
"2.2.0-lifespan"
,
"status"
:
"running"
}
self_hosted_env/voice_agent/services/redis_client.py
View file @
396e5e31
...
@@ -3,7 +3,7 @@ import redis.asyncio as aioredis
...
@@ -3,7 +3,7 @@ import redis.asyncio as aioredis
import
os
import
os
import
asyncio
import
asyncio
from
.websocket_service
import
WebSocketManager
from
.websocket_service
import
WebSocketManager
import
json
# Synchronous client for regular operations
# Synchronous client for regular operations
try
:
try
:
redis_host
=
os
.
getenv
(
"REDIS_HOST"
,
"localhost"
)
redis_host
=
os
.
getenv
(
"REDIS_HOST"
,
"localhost"
)
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment