annan voice V0

parent a16c59a4
......@@ -12,7 +12,7 @@ SYSTEM_PROMPTS: Dict[Tuple[StudentNationality, StudyLanguage], str] = {
فقط لو الطفّل سأل عن هويتك بصراحة ووضح (مثل "إنت مين؟"، "عرّفني بنفسك"، "إنت بتعمل إيه هنا؟")،
رُد بالنصّ الثابت ده:
"أنا عَنان مؤسِّس شارع العلوم، وإنت هنا على مَنَصّة Science Street Lab،
وأنا هنا عشان أساعدك تتعلَّم أي حاجة عايز تتعلَّمها في العلوم."
وأنا هنا عشان أَساعدك تتعلَّم أي حاجة عايز تتعلَّمها في العلوم."
⚠️ مهم: لا تستخدم هذا النص في أي حالة أخرى غير سؤال الهوية.
⚠️ مهم جدًا: مَمنوع تقول أبداً إنك AI أو برنامج أو روبوت. إنت عنان وبس.
......@@ -57,7 +57,7 @@ SYSTEM_PROMPTS: Dict[Tuple[StudentNationality, StudyLanguage], str] = {
فقط لو الطفل سأل عن هويتك بصراحة ووضح (مثل "إنت مين؟"، "عرِّفني بنفسك"، "إنت وش تسوي هنا؟")،
رُد بالنص الثابت هذا:
"أنا عَنان مؤسِّس شارع العلوم، وإنت هنا على مَنَصّة Science Street Lab،
وأنا هنا عشان أساعدك تتعلَّم أي حاجة عايز تتعلَّمها في العلوم."
وأنا هنا عشان أَساعدك تتعلَّم أي حاجة عايز تتعلَّمها في العلوم."
⚠️ مهم: لا تستخدم هذا النص في أي حالة أخرى غير سؤال الهوية.
⚠️ مهم جدًا: مَمنوع تقول أبداً إنك AI أو برنامج أو روبوت. إنت عنان وبس.
......@@ -102,7 +102,7 @@ SYSTEM_PROMPTS: Dict[Tuple[StudentNationality, StudyLanguage], str] = {
إنت مُدرِّس لطفل في ابتدائي اسمه {student_name} في الصف {grade}. لو الطفّل سأل عن هويتك بصراحة (زي "إنت مين؟"، "عرِّفني بنفسك")،
رُد بالنصّ الثابت ده:
"أنا عَنان مؤسس شارع العلوم، وإنت هنا على مَنَصّة Science Street Lab،
وأنا هنا عشان أساعدك تتعلَّم أي حاجة عايز تتعلَّمها في العلوم."
وأنا هنا عشان أَساعدك تتعلَّم أي حاجة عايز تتعلَّمها في العلوم."
⚠️ مهم: لا تستخدم هذا النص في أي حالة أخرى غير سؤال الهوية.
⚠️ مهم جدًا: مَمنوع تقول أبداً إنك AI أو برنامج أو روبوت. إنت عنان وبس.
......@@ -147,7 +147,7 @@ SYSTEM_PROMPTS: Dict[Tuple[StudentNationality, StudyLanguage], str] = {
لو الطفل سأل عن هويتك بصراحة (زي "إنت مين؟"، "عرِّفني بنفسك"، "إنت وش تسوي هنا؟")،
رُد بالنصّ الثابت هذا:
"أنا عَنان مؤسس شارع العلوم، وإنت هنا على مَنَصّة Science Street Lab،
وأنا هنا عشان أساعدك تتعلَّم أي حاجة عايز تتعلَّمها في العلوم."
وأنا هنا عشان أَساعدك تتعلَّم أي حاجة عايز تتعلَّمها في العلوم."
⚠️ مهم: لا تستخدم هذا النص في أي حالة أخرى غير سؤال الهوية.
⚠️ مهم جدًا: مَمنوع تقول أبداً إنك AI أو برنامج أو روبوت. إنت عنان وبس.
......
......@@ -14,6 +14,7 @@ from services.connection_pool import ConnectionPool
from services.agent_helpers.query_handlers import QueryHandler
from services.agent_helpers.context_generator import ContextGenerator
from services.agent_helpers.response_generator import ResponseGenerator
from services.tts.tts_manager import get_tts_service
logger = logging.getLogger(__name__)
......@@ -26,6 +27,10 @@ class AgentService:
if not self.openai_service.is_available():
logger.warning("Warning: OPENAI_API_KEY not found. Agent service will be disabled.")
self.tts_service = get_tts_service(self.openai_service)
if not self.tts_service.is_available():
logger.warning("Warning: No TTS service is available.")
# Database setup
self.pool_handler = pool_handler
if self.pool_handler is None:
......@@ -62,6 +67,11 @@ class AgentService:
def is_available(self) -> bool:
"""Return the availability reported by ``self.openai_service``."""
return self.openai_service.is_available()
def text_to_speech(self, text: str, language: str) -> bytes:
    """Synthesize ``text`` through the TTS backend held in ``self.tts_service``.

    Args:
        text: The text to convert to speech.
        language: Language code forwarded unchanged to the backend.

    Returns:
        The value returned by the backend's ``generate_speech`` call.

    Raises:
        HTTPException: 503 when no TTS backend is set or the backend
            reports itself as unavailable.
    """
    backend = self.tts_service
    if backend and backend.is_available():
        return backend.generate_speech(text, language)
    raise HTTPException(status_code=503, detail="TTS service is not available")
def generate_response(self, user_message: str, student_id: str, subject: str = "Science",
model: str = Models.chat, temperature: float = 0.3, top_k: int = 3) -> str:
"""Main response generation method"""
......
# services/chat_service.py
from fastapi import UploadFile, HTTPException
from typing import Optional
import sys
import os
import time
import io
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
from core import MessageType, AppConfig, StudentNationality
from core import MessageType, AppConfig
from repositories import StorageRepository
from services.response_manager import ResponseManager
from services.openai_service import OpenAIService
from services.agent_service import AgentService
class ChatService:
def __init__(self, storage_repo: StorageRepository, response_manager: ResponseManager,
config: AppConfig, openai_service: OpenAIService, agent_service: AgentService):
......@@ -21,93 +24,84 @@ class ChatService:
self.openai_service = openai_service
self.agent_service = agent_service
# Message handlers
self.handlers = {
MessageType.AUDIO: AudioMessageHandler(
storage_repo,
config.minio_bucket,
openai_service
),
MessageType.AUDIO: AudioMessageHandler(storage_repo, config.minio_bucket, openai_service),
MessageType.TEXT: TextMessageHandler()
}
def process_message(self, student_id: str, file: Optional[UploadFile] = None, text: Optional[str] = None):
    """Process a student's message and prepare the agent's text and audio reply.

    The incoming message is resolved to plain text (an audio upload is
    transcribed by the audio handler, text is used as-is), the agent
    generates a reply, TTS audio is produced on a best-effort basis, and
    the result is stored in the response manager for later retrieval.

    Args:
        student_id: Identifier of the student sending the message.
        file: Optional uploaded audio file; takes precedence over ``text``.
        text: Optional plain-text message.

    Returns:
        A dict with ``status``, ``message``, ``student_id``,
        ``agent_response`` and ``audio_filename`` (``None`` when audio
        generation failed).

    Raises:
        HTTPException: 400 when no usable input was provided or the audio
            handler did not report success; 500 when the agent step or
            any other part of the processing fails.
    """
    self.response_manager.clear_response()
    try:
        # Resolve the incoming message to plain text.
        if file and file.filename:
            result = self.handlers[MessageType.AUDIO].handle(file=file)
            if result.get("status") != "success":
                raise HTTPException(status_code=400, detail="Failed to process audio message")
            user_message = result.get("transcription", "")
            if not user_message:
                # Fixed placeholder used when the transcription came back empty.
                user_message = "تم إرسال رسالة صوتية - فشل في التفريغ المحلي"
        elif text:
            self.handlers[MessageType.TEXT].handle(text=text)
            user_message = text
        else:
            raise HTTPException(status_code=400, detail="No text or audio file provided.")

        try:
            agent_response_text = self.agent_service.generate_response(
                user_message=user_message,
                student_id=student_id,
            )

            # Best-effort: on failure both values in audio_data are None.
            audio_data = self._generate_and_upload_audio(agent_response_text, student_id)

            self.response_manager.store_response(
                text=agent_response_text,
                audio_filename=audio_data.get("filename"),
                audio_bytes=audio_data.get("bytes"),
            )

            print(f"Generated response for student {student_id}: {agent_response_text[:100]}...")

            return {
                "status": "success",
                "message": "Message processed and agent response ready",
                "student_id": student_id,
                "agent_response": agent_response_text,
                "audio_filename": audio_data.get("filename"),
            }
        except Exception as agent_error:
            print(f"Agent error for student {student_id}: {agent_error}")
            raise HTTPException(
                status_code=500,
                detail=f"Agent response failed: {str(agent_error)}",
            ) from agent_error
    except HTTPException:
        # Deliberate HTTP errors (400 for bad input, 500 from the agent
        # step) must reach the client unchanged instead of being re-wrapped
        # as a generic 500 by the handler below.
        raise
    except Exception as e:
        print(f"Error processing message for student {student_id}: {e}")
        raise HTTPException(status_code=500, detail=f"Failed to process message: {str(e)}") from e
def _generate_and_upload_audio(self, text: str, student_id: str) -> dict:
    """Generate TTS audio for ``text`` and upload the bytes to MinIO.

    This is best-effort: any failure is printed and reported through the
    return value instead of being raised, so the caller can still deliver
    a text-only response.

    Args:
        text: The agent response to synthesize.
        student_id: Used to look up the student and pick the TTS language
            ("ar" when the student record has a truthy ``is_arabic``,
            otherwise "en").

    Returns:
        ``{"bytes": <audio bytes>, "filename": <object name>}`` on success,
        ``{"bytes": None, "filename": None}`` on any failure.
    """
    try:
        student_info = self.agent_service.db_service.get_student_info(student_id)
        if not student_info:
            raise ValueError(f"Could not find student {student_id} for TTS.")
        language = "ar" if student_info.get('is_arabic') else "en"

        audio_bytes = self.agent_service.text_to_speech(text, language)
        if not audio_bytes:
            # Do not upload an empty object and advertise it as audio.
            raise ValueError("TTS service returned no audio data.")

        # The extension is derived from the TTS_PROVIDER environment
        # variable: "custom" is stored as .wav, anything else as .mp3.
        provider = os.getenv("TTS_PROVIDER", "openai").lower()
        file_extension = "wav" if provider == "custom" else "mp3"

        timestamp = int(time.time())
        filename = f"agent_response_{timestamp}.{file_extension}"
        minio_file_path = f"audio/{filename}"

        print(f"Uploading audio to MinIO: {minio_file_path}")
        # Upload straight from memory; no temporary file is involved.
        self.storage_repo.upload_file(
            file_obj=io.BytesIO(audio_bytes),
            bucket=self.config.minio_bucket,
            file_path=minio_file_path,
        )

        print(f"Successfully generated and uploaded TTS audio: {filename}")
        return {"bytes": audio_bytes, "filename": filename}
    except Exception as e:
        print(f"Error in _generate_and_upload_audio: {e}")
        return {"bytes": None, "filename": None}
\ No newline at end of file
......@@ -8,9 +8,10 @@ from openai import OpenAI
import sys
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
from core import Models
from services.tts.base_tts_service import BaseTTSService
class OpenAIService:
class OpenAIService(BaseTTSService):
"""Service class for handling OpenAI API operations (TTS, Whisper, Embeddings)"""
def __init__(self):
......@@ -54,35 +55,30 @@ class OpenAIService:
raise HTTPException(status_code=500, detail=f"Transcription failed: {str(e)}")
# ------------------- TTS -------------------
def generate_speech(self, text: str, language: str = "en") -> bytes:
    """Generate speech from text using OpenAI TTS and return the audio bytes.

    Args:
        text: The text to synthesize.
        language: Accepted to match the ``BaseTTSService`` signature; it is
            not used here, the voice is always "alloy".

    Returns:
        The ``content`` of the OpenAI speech response, requested in MP3
        format.

    Raises:
        HTTPException: 500 when the service is not available or the
            OpenAI request fails.
    """
    if not self.is_available():
        raise HTTPException(status_code=500, detail="OpenAI service not available")

    voice = "alloy"
    try:
        print(f"Generating TTS audio with OpenAI: {text[:50]}...")
        response = self.client.audio.speech.create(
            model=Models.tts,
            voice=voice,
            input=text,
            response_format="mp3",
        )
        audio_bytes = response.content
        print("OpenAI TTS generation successful.")
        return audio_bytes
    except Exception as e:
        print(f"Error during OpenAI TTS generation: {e}")
        raise HTTPException(status_code=500, detail=f"OpenAI TTS generation failed: {str(e)}") from e
# ------------------- Embeddings -------------------
def generate_embedding(self, text: str) -> List[float]:
......
# services/response_manager.py
import time
from typing import Optional
class ResponseManager:
def __init__(self):
self._latest_response = {"text": None, "audio_filename": None, "timestamp": 0}
# Initialize with the new 'audio_bytes' key
self._latest_response = {"text": None, "audio_filename": None, "audio_bytes": None, "timestamp": 0}
def store_response(self, text: str, audio_filename: Optional[str] = None, audio_bytes: Optional[bytes] = None) -> None:
    """Store response with text, audio filename, and raw audio bytes.

    Replaces whatever was stored before and stamps the entry with the
    current time.

    Args:
        text: The response text.
        audio_filename: Name of the associated audio file, if any.
        audio_bytes: Raw audio data for the response, if any.
    """
    self._latest_response = {
        "text": text,
        "audio_filename": audio_filename,
        "audio_bytes": audio_bytes,
        "timestamp": time.time(),
    }
......@@ -17,8 +22,10 @@ class ResponseManager:
return self._latest_response.copy()
def clear_response(self) -> None:
    """Reset the stored response to its empty state (all fields None, timestamp 0)."""
    self._latest_response = {"text": None, "audio_filename": None, "audio_bytes": None, "timestamp": 0}
def is_response_fresh(self, max_age_seconds: int = 300) -> bool:
    """Tell whether a non-empty response was stored recently enough.

    Args:
        max_age_seconds: Maximum age, in seconds, for a stored response to
            count as fresh.

    Returns:
        True when the stored text is non-empty and was stored less than
        ``max_age_seconds`` ago; False otherwise. Always a real ``bool``,
        never the stored text value itself.
    """
    latest = self._latest_response
    if not latest["text"]:
        return False
    return time.time() - latest["timestamp"] < max_age_seconds
\ No newline at end of file
# services/response_service.py
import base64
import io
from fastapi import HTTPException
from fastapi.responses import FileResponse
from fastapi.responses import Response, StreamingResponse
from starlette.background import BackgroundTask
import sys
import os
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
from core import ResponseStatus
from services.response_manager import ResponseManager
from services.audio_service import AudioService
from services.audio_service import AudioService # Keep for now if used elsewhere
class ResponseService:
    """Serves the most recent agent response held by a ``ResponseManager``."""

    def __init__(self, response_manager: ResponseManager, audio_service: AudioService):
        self.response_manager = response_manager
        self.audio_service = audio_service  # Keep for now if used elsewhere

    def get_agent_response(self):
        """Return the pending agent response and clear it from the manager.

        The stored audio bytes are sent directly as the response body, so
        no temporary file and no download from MinIO is needed. The
        response text travels Base64-encoded in the ``X-Response-Text``
        header.

        Returns:
            A ``Response`` carrying the audio when audio bytes are stored,
            otherwise a dict with ``status``, ``message`` and ``text``.

        Raises:
            HTTPException: 404 when no fresh response is available.
        """
        if not self.response_manager.is_response_fresh():
            raise HTTPException(status_code=404, detail="Agent response not ready or expired.")

        response_data = self.response_manager.get_response()
        self.response_manager.clear_response()  # Clear after getting it

        text_response = response_data.get("text")
        audio_bytes = response_data.get("audio_bytes")

        if not audio_bytes:
            # Handle text-only response if audio failed
            return {
                "status": ResponseStatus.SUCCESS,
                "message": "Text response available (audio generation failed).",
                "text": text_response,
            }

        # `.get(key, "")` still yields None when the key is present with a
        # None value, so normalise explicitly before calling `.endswith`.
        filename = response_data.get("audio_filename") or ""
        media_type = "audio/wav" if filename.endswith(".wav") else "audio/mpeg"

        # Base64 keeps the header ASCII-only even for non-ASCII text.
        encoded_text = base64.b64encode((text_response or "").encode('utf-8')).decode('ascii')

        return Response(
            content=audio_bytes,
            media_type=media_type,
            headers={
                "X-Response-Text": encoded_text,
                "Access-Control-Expose-Headers": "X-Response-Text",
            },
        )
\ No newline at end of file
from abc import ABC, abstractmethod
class BaseTTSService(ABC):
    """Common interface that every Text-to-Speech backend must implement.

    Subclasses have to provide both methods below before they can be
    instantiated.
    """

    @abstractmethod
    def is_available(self) -> bool:
        """Report whether this TTS backend is configured and usable."""
        ...

    @abstractmethod
    def generate_speech(self, text: str, language: str = "en") -> bytes:
        """Synthesize ``text`` into audio.

        Args:
            text: The text to synthesize.
            language: Language of the text (e.g. "en", "ar").

        Returns:
            The raw audio data of the speech (e.g. in WAV or MP3 format).
        """
        ...
\ No newline at end of file
import os
import httpx
from .base_tts_service import BaseTTSService
class CustomTTSService(BaseTTSService):
    """TTS backend that calls our self-hosted, custom FastAPI model over HTTP."""

    def __init__(self):
        # Read the URL of our FastAPI server from an environment variable.
        self.api_url = os.getenv("CUSTOM_TTS_URL", "http://localhost:5000/synthesize")
        # Only an explicitly empty CUSTOM_TTS_URL disables the service;
        # reachability of the server is not checked here.
        self._is_available = bool(self.api_url)
        print(f"Custom TTS Service initialized. API URL: {self.api_url}")

    def is_available(self) -> bool:
        """Return whether an API URL is configured."""
        return self._is_available

    def generate_speech(self, text: str, language: str = "en") -> bytes:
        """Request synthesized audio from the custom TTS server.

        Sends a blocking HTTP POST with ``{"text", "language"}`` as JSON
        and returns the response body.

        Args:
            text: The text to synthesize.
            language: Language code sent to the server.

        Returns:
            The raw bytes of the HTTP response body.

        Raises:
            ConnectionError: When the service is not configured, the
                request cannot be completed, or the server answers with a
                4xx/5xx status.
        """
        if not self.is_available():
            raise ConnectionError("Custom TTS service is not configured or available.")

        try:
            # Synchronous client; the generous timeout covers long texts.
            with httpx.Client() as client:
                response = client.post(
                    self.api_url,
                    json={"text": text, "language": language},
                    timeout=120.0,
                )
                # Raises httpx.HTTPStatusError for 4xx or 5xx responses.
                response.raise_for_status()
                audio_bytes = response.content
        except httpx.HTTPStatusError as e:
            # Not a RequestError subclass, so it needs its own handler to
            # honour the ConnectionError contract.
            print(f"Error calling custom TTS service: {e}")
            raise ConnectionError(
                f"Custom TTS service at {self.api_url} returned HTTP {e.response.status_code}"
            ) from e
        except httpx.RequestError as e:
            print(f"Error calling custom TTS service: {e}")
            # Re-raise as a standard ConnectionError
            raise ConnectionError(f"Failed to connect to custom TTS service at {self.api_url}") from e

        print(f"Successfully received audio from custom TTS service for language '{language}'.")
        return audio_bytes
\ No newline at end of file
import os
from .base_tts_service import BaseTTSService
from .custom_tts_service import CustomTTSService
from services.openai_service import OpenAIService # We'll modify OpenAI service next
def get_tts_service(openai_service_instance: OpenAIService = None) -> BaseTTSService:
    """Pick the TTS backend named by the ``TTS_PROVIDER`` environment variable.

    Args:
        openai_service_instance: Existing OpenAI service to reuse when the
            provider is "openai"; a new one is created when it is falsy.

    Returns:
        A ``CustomTTSService`` for "custom", or the OpenAI service for
        "openai" (the default when the variable is unset).

    Raises:
        ValueError: When the provider is neither "openai" nor "custom".
    """
    provider = os.getenv("TTS_PROVIDER", "openai").lower()
    print(f"TTS Provider selected: '{provider}'")

    if provider == "custom":
        return CustomTTSService()
    if provider != "openai":
        raise ValueError(f"Unknown TTS provider specified: {provider}. Use 'openai' or 'custom'.")
    # Reuse the caller's instance when one was passed in.
    return openai_service_instance or OpenAIService()
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment