handle multi workers

parent 5c555257
......@@ -33,6 +33,16 @@ services:
timeout: 20s
retries: 3
redis:
image: redis:latest
ports:
- "6379:6379"
healthcheck:
test: ["CMD", "redis-cli", "ping"]
interval: 30s
timeout: 10s
retries: 5
voice-agent:
build: ./voice_agent
ports:
......@@ -51,10 +61,14 @@ services:
DB_HOST: "${DB_HOST}"
TTS_PROVIDER: "${TTS_PROVIDER}"
CUSTOM_TTS_URL: "${CUSTOM_TTS_URL}"
REDIS_HOST: "redis"
REDIS_PORT: "6379"
depends_on:
- minio
- postgres
- redis
volumes:
pgdata:
miniodata:
\ No newline at end of file
miniodata:
redisdata:
\ No newline at end of file
......@@ -144,12 +144,12 @@ def create_app() -> FastAPI:
raise HTTPException(status_code=500, detail=f"Chat processing error: {str(e)}")
@app.get("/get-audio-response")
async def get_audio_response():
async def get_audio_response(student_id: str = "student_001"):
"""Fetches the agent's text and audio response with proper CORS headers."""
try:
print("Getting audio response...")
result = container.response_service.get_agent_response()
result = container.response_service.get_agent_response(student_id=student_id)
if hasattr(result, 'status_code'):
# This is already a Response object from response_service
print(f"Response headers: {dict(result.headers)}")
......
......@@ -12,3 +12,4 @@ pandas
python-dotenv
httpx
langdetect
redis
\ No newline at end of file
......@@ -25,6 +25,7 @@ GENERAL_CHAT_CONTEXTS: Dict[StudentNationality, str] = {
- لو الطِّفل سأل: "أنا مين؟" أو "إنت عارف أنا مين؟" → رد باستخدام بيانات الطالب اللي فوق (الاسم + الصف)، مثلاً:
"أيوه طبعًا، إنت (اسم الطالب بالعربي) في سنة (سنة الطالب بالعربي). عايز نكمّل النهارده في موضوع معين في العلوم؟"
- لو السُّؤال له علاقة بالعلوم أو بالمنهج → جاوب عليه.
- لو الطفل سلم عليك او رحب بيك باي طريقة قوله اهلا بيك يا (اسم الطالب بالعربي) انا هنا عشان اساعدك في العلوم
- لو السُّؤال دردشة عامة أو خارج المنهج والعلوم وملوش علاقة بهوية الطالب أو هويتك → متردش على الكلام نهائيًا، وقوله الرد دا:
"الوقت دا للمذاكرة في العلوم، أنا هنا عشان أساعدك في العلوم وبس."
وبعدها اسأله بطريقة ودودة لو يحب يختار موضوع في العلوم تتكلموا فيه.
......@@ -39,12 +40,14 @@ GENERAL_CHAT_CONTEXTS: Dict[StudentNationality, str] = {
السؤال: "{query}"
رد باللهجة السعوديّة الطبيعية، خلّ الرد بسيط وودود.
- إذا سأل الطالب: "إنت مين؟" → رد بالهوية المخصصة لك (أنا عَنان...).
- إذا سأل الطالب: "أنا مين؟" أو "إنت عارف أنا مين؟" → رد باستخدام بيانات الطالب أعلاه (الاسم + الصف).
- إذا كان السؤال له علاقة بالعلوم أو بالمنهج → جاوب عليه.
- إذا كان السؤال دردشة عامة أو خارج المنهج → قوله الوقت هذا مخصّص للمذاكرة في العلوم، واسأله بطريقة ودودة لو يحب يختار موضوع في العلوم.
- لو الطِّفل سأل: "إنت مين؟" → رد بالهوية المخصصة ليك (أنا عَنان...).
- لو الطِّفل سأل: "أنا مين؟" أو "إنت عارف أنا مين؟" → رد باستخدام بيانات الطالب اللي فوق (الاسم + الصف)، مثلاً:
"أيوه طبعًا، إنت (اسم الطالب بالعربي) في سنة (سنة الطالب بالعربي). عايز نكمّل النهارده في موضوع معين في العلوم؟"
- لو السُّؤال له علاقة بالعلوم أو بالمنهج → جاوب عليه.
- لو الطفل سلم عليك او رحب بيك باي طريقة قوله اهلا بيك يا (اسم الطالب بالعربي) انا هنا عشان اساعدك في العلوم
- لو السُّؤال دردشة عامة أو خارج المنهج والعلوم وملوش علاقة بهوية الطالب أو هويتك → متردش على الكلام نهائيًا، وقوله الرد دا:
"الوقت دا للمذاكرة في العلوم، أنا هنا عشان أساعدك في العلوم وبس."
وبعدها اسأله بطريقة ودودة لو يحب يختار موضوع في العلوم تتكلموا فيه.
"""
}
......
......@@ -35,7 +35,7 @@ class ChatService:
def process_message(self, student_id: str, file: Optional[UploadFile] = None, text: Optional[str] = None, game_context: Optional[str] = None):
"""Process message and generate text and audio response."""
self.response_manager.clear_response()
self.response_manager.clear_response(student_id) # Clear any existing response
try:
if file and file.filename:
result = self.handlers[MessageType.AUDIO].handle(file=file)
......@@ -58,6 +58,7 @@ class ChatService:
audio_data = self._generate_and_upload_audio(agent_response_text, student_id)
self.response_manager.store_response(
student_id = student_id,
text=agent_response_text,
audio_filename=audio_data.get("filename"),
audio_bytes=audio_data.get("bytes")
......
import redis
import os
try:
redis_host = os.getenv("REDIS_HOST", "localhost")
redis_port = int(os.getenv("REDIS_PORT", 6379))
# decode_responses=True makes the client return strings instead of bytes
redis_client = redis.Redis(host=redis_host, port=redis_port, db=0, decode_responses=True)
redis_client.ping()
print(f"Successfully connected to Redis at {redis_host}:{redis_port}")
except redis.exceptions.ConnectionError as e:
print(f"FATAL: Could not connect to Redis: {e}")
redis_client = None
\ No newline at end of file
# services/response_manager.py
import json
import base64
from typing import Optional, Dict
import time
from typing import Optional
# Import the Redis client that all workers will share
from .redis_client import redis_client
class ResponseManager:
"""
Manages response state in a central Redis store, keyed by student_id.
This solution is safe for multiple workers.
"""
def __init__(self):
# Initialize with the new 'audio_bytes' key
self._latest_response = {"text": None, "audio_filename": None, "audio_bytes": None, "timestamp": 0}
"""Initializes by connecting to the shared Redis client."""
if redis_client is None:
raise ConnectionError("ResponseManager requires a valid Redis connection. Check your REDIS_HOST/PORT environment variables.")
self.redis = redis_client
self.ttl_seconds = 600 # Responses will expire after 10 minutes
def _get_key(self, student_id: str) -> str:
"""Creates a consistent key for Redis to avoid conflicts."""
return f"student_response:{student_id}"
# --- MODIFIED: Added the 'audio_bytes' parameter ---
def store_response(self, text: str, audio_filename: Optional[str] = None, audio_bytes: Optional[bytes] = None) -> None:
"""Store response with text, audio filename, and raw audio bytes."""
self._latest_response = {
def store_response(self, student_id: str, text: str, audio_filename: Optional[str] = None, audio_bytes: Optional[bytes] = None) -> None:
"""Stores a response for a specific student_id in Redis."""
key = self._get_key(student_id)
# Encode binary audio data into a string (Base64) to store it in JSON
encoded_audio = base64.b64encode(audio_bytes).decode('utf-8') if audio_bytes else None
payload = {
"text": text,
"audio_filename": audio_filename,
"audio_bytes": audio_bytes, # <-- Store the bytes
"timestamp": time.time()
"audio_bytes_b64": encoded_audio
}
# Convert the dictionary to a JSON string and store it in Redis with an expiration time
self.redis.setex(key, self.ttl_seconds, json.dumps(payload))
def get_response(self, student_id: str) -> Dict:
"""
Atomically gets the response for a student and removes it from Redis
to ensure it's claimed only once.
"""
key = self._get_key(student_id)
# Use a pipeline to get and delete the key in a single, atomic operation
pipe = self.redis.pipeline()
pipe.get(key)
pipe.delete(key)
results = pipe.execute()
json_value = results[0]
if not json_value:
# If nothing was found, return the same empty structure as the old class
return {"text": None, "audio_filename": None, "audio_bytes": None}
def get_response(self) -> dict:
return self._latest_response.copy()
# If data was found, decode it
payload = json.loads(json_value)
# Decode the Base64 string back into binary audio data
if payload.get("audio_bytes_b64"):
payload["audio_bytes"] = base64.b64decode(payload["audio_bytes_b64"])
else:
payload["audio_bytes"] = None
# Remove the temporary key before returning
del payload["audio_bytes_b64"]
return payload
def clear_response(self) -> None:
# Clear all fields
self._latest_response = {"text": None, "audio_filename": None, "audio_bytes": None, "timestamp": 0}
def clear_response(self, student_id: str) -> None:
"""Clears a response for a specific student from Redis."""
key = self._get_key(student_id)
self.redis.delete(key)
def is_response_fresh(self, max_age_seconds: int = 300) -> bool:
# The logic remains the same
return (self._latest_response["text"] and
(time.time() - self._latest_response["timestamp"] < max_age_seconds))
\ No newline at end of file
def is_response_fresh(self, student_id: str, max_age_seconds: int = 300) -> bool:
"""Checks if a response exists in Redis for the given student."""
key = self._get_key(student_id)
# redis.exists() is the direct equivalent of checking if the key is present
return self.redis.exists(key) > 0
\ No newline at end of file
......@@ -19,16 +19,16 @@ class ResponseService:
self.audio_service = audio_service # Keep for now if used elsewhere
# --- REWRITTEN and IMPROVED ---
def get_agent_response(self):
def get_agent_response(self, student_id: str):
"""
Gets the agent response from the manager and streams the raw audio bytes
directly, avoiding temporary files and re-downloading from MinIO.
"""
if not self.response_manager.is_response_fresh():
if not self.response_manager.is_response_fresh(student_id):
raise HTTPException(status_code=404, detail="Agent response not ready or expired.")
response_data = self.response_manager.get_response()
self.response_manager.clear_response() # Clear after getting it
response_data = self.response_manager.get_response(student_id)
self.response_manager.clear_response(student_id) # Clear after getting it
text_response = response_data.get("text")
audio_bytes = response_data.get("audio_bytes")
......
import redis
import json
import base64
import time
from typing import Optional, Dict
class SessionResponseManager:
"""
Manages response state in Redis, keyed by a session ID via cookies.
Implements the same interface as the old in-memory ResponseManager.
"""
def __init__(self, redis_client):
if redis_client is None:
raise ConnectionError("SessionResponseManager requires a valid Redis client.")
self.redis = redis_client
# Redis TTL serves the same purpose as checking timestamps
self.default_ttl = 600 # 10 minutes
def _get_key(self, session_id: str) -> str:
"""Helper to generate the Redis key."""
return f"session_response:{session_id}"
def store_response(self, session_id: str, text: str, audio_filename: Optional[str] = None, audio_bytes: Optional[bytes] = None) -> None:
"""Store response in Redis with TTL."""
key = self._get_key(session_id)
# Base64 encode binary data for JSON storage
encoded_audio = base64.b64encode(audio_bytes).decode('utf-8') if audio_bytes else None
payload = {
"text": text,
"audio_filename": audio_filename,
"audio_bytes_b64": encoded_audio,
# We store the timestamp to perfectly match the old object structure,
# even though Redis handles expiration automatically.
"timestamp": time.time()
}
value = json.dumps(payload)
# setex sets the value and the expiration (TTL) atomically
self.redis.setex(key, self.default_ttl, value)
def get_response(self, session_id: str) -> Dict:
"""
Atomically retrieves and deletes ('pops') the response from Redis.
Returns the dictionary structure expected by the service layer.
"""
key = self._get_key(session_id)
# Use a pipeline to get and delete atomically
pipe = self.redis.pipeline()
pipe.get(key)
pipe.delete(key) # Ensure it's only read once
results = pipe.execute()
json_value = results[0]
# Default empty structure if nothing found (matches old manager behavior)
empty_response = {"text": None, "audio_filename": None, "audio_bytes": None, "timestamp": 0}
if not json_value:
return empty_response
try:
payload = json.loads(json_value)
# Decode Base64 audio back to raw bytes
if payload.get("audio_bytes_b64"):
payload["audio_bytes"] = base64.b64decode(payload["audio_bytes_b64"])
else:
payload["audio_bytes"] = None
# Remove internal base64 key before returning
payload.pop("audio_bytes_b64", None)
return payload
except (TypeError, json.JSONDecodeError):
return empty_response
def clear_response(self, session_id: str) -> None:
"""Manually deletes the response key from Redis."""
key = self._get_key(session_id)
self.redis.delete(key)
def is_response_fresh(self, session_id: str, max_age_seconds: int = 300) -> bool:
"""
Checks if data exists in Redis.
Note: max_age_seconds is ignored because Redis handles TTL automatically,
but kept in signature for compatibility with the old interface.
"""
key = self._get_key(session_id)
# redis.exists returns > 0 if key exists
return self.redis.exists(key) > 0
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment