# tashkeel fixer under test

parent be1630c3
import logging
from services.agent_helpers.agent_prompts import tashkeel_agent_prompt
logger = logging.getLogger(__name__)
class TashkeelAgent:
    """Agent to apply Arabic (Egyptian) diacritization (tashkeel) on text for TTS."""

    def __init__(self, openai_service):
        # Service wrapper expected to expose `is_available()` and an OpenAI `client`.
        self.openai_service = openai_service

    def apply_tashkeel(self, text: str) -> str:
        """Send *text* to the LLM and return a fully diacritized version.

        Diacritization is best-effort: if *text* is empty, the OpenAI
        service is unavailable, or the request fails, the original text
        is returned unchanged so TTS can still proceed.
        """
        if not text:
            # Nothing to diacritize; skip the API round-trip entirely.
            return text
        try:
            if not self.openai_service.is_available():
                logger.warning("OpenAI service not available for TashkeelAgent")
                return text  # fallback: return original
            messages = [
                {"role": "system", "content": tashkeel_agent_prompt},
                {"role": "user", "content": text},
            ]
            response = self.openai_service.client.chat.completions.create(
                model="gpt-4o-mini",  # or any fast, lightweight model
                messages=messages,
                temperature=0.1,
            )
            return response.choices[0].message.content.strip()
        except Exception:
            # logger.exception records the traceback; errors here must never
            # break the caller, so we fall back to the undiacritized input.
            logger.exception("TashkeelAgent error")
            return text  # fallback
# Hand-curated diacritization overrides applied to generated responses
# before TTS. Keys are undiacritized Arabic surface forms; values are the
# same words with explicit tashkeel marks.
# NOTE(review): "القطط" and "حاسة" map to themselves (no-op replacements) —
# presumably placeholders awaiting correct diacritics; confirm with author.
custom_fixes = {
"التكيف": "التَكَيُّف",
"بقاء": "البَقَّاء",
"قدرة": "القُدرَة",
"النقل": "النَقْل",
"الدب": "الدُبّ",
"النمر": "النَمِر",
"فرو": "فَروُ",
"البني": "البُنّي",
"ملونة": "مِلوِنَةْ",
"قوس قزح": "قُوس قُزَح",
"معينة": "مُعيَّنَة",
"الفنك": "الفنِك",
"الحر": "الحَر",
"الشم": "الَشَمْ",
"البصر": "البَصَر",
"الأذن": "الاُذُن",
"الفم": "الفَم",
"العين": "العِين",
"اللهث": "اللَّهْث",
"القطط": "القطط",
"لنقل": "لنَقْل",
"قدم": "قَدَمْ",
"مية": "مَيَّةْ",
"حاسة": "حاسة",
}
def apply_fixes(text, fixes_dict):
    """Replace each key of *fixes_dict* found in *text* with its value.

    Replacements are applied in the dict's iteration order; later entries
    see the result of earlier substitutions.

    Args:
        text: Input string (may be empty).
        fixes_dict: Mapping of wrong -> corrected substrings.

    Returns:
        The text with all substitutions applied.
    """
    for wrong, fixed in fixes_dict.items():
        # str.replace is already a no-op when `wrong` is absent, so a
        # separate `in` membership test would only rescan the string.
        text = text.replace(wrong, fixed)
    return text
...@@ -14,6 +14,8 @@ from services.connection_pool import ConnectionPool ...@@ -14,6 +14,8 @@ from services.connection_pool import ConnectionPool
from services.agent_helpers.query_handlers import QueryHandler from services.agent_helpers.query_handlers import QueryHandler
from services.agent_helpers.context_generator import ContextGenerator from services.agent_helpers.context_generator import ContextGenerator
from services.agent_helpers.response_generator import ResponseGenerator from services.agent_helpers.response_generator import ResponseGenerator
from services.agent_helpers.tashkeel_agent import TashkeelAgent
from services.agent_helpers.tashkeel_fixer import apply_fixes, custom_fixes
from services.tts.tts_manager import get_tts_service from services.tts.tts_manager import get_tts_service
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
...@@ -64,20 +66,34 @@ class AgentService: ...@@ -64,20 +66,34 @@ class AgentService:
self.query_handler, self.context_generator self.query_handler, self.context_generator
) )
self.tashkeel_agent = TashkeelAgent(self.openai_service)
def is_available(self) -> bool: def is_available(self) -> bool:
return self.openai_service.is_available() return self.openai_service.is_available()
def text_to_speech(self, text: str, language: str) -> bytes: def text_to_speech(self, text: str, language: str) -> bytes:
if not self.tts_service or not self.tts_service.is_available(): if not self.tts_service or not self.tts_service.is_available():
raise HTTPException(status_code=503, detail="TTS service is not available") raise HTTPException(status_code=503, detail="TTS service is not available")
# Step 1: apply tashkeel before sending to TTS
text = self.tashkeel_agent.apply_tashkeel(text)
print(f"Tashkeel applied: {text}")
# Step 2: send to TTS
return self.tts_service.generate_speech(text, language) return self.tts_service.generate_speech(text, language)
def generate_response(self, user_message: str, student_id: str, subject: str = "Science", def generate_response(self, user_message: str, student_id: str, subject: str = "Science",
model: str = Models.chat, temperature: float = 0.3, top_k: int = 3) -> str: model: str = Models.chat, temperature: float = 0.3, top_k: int = 3) -> str:
"""Main response generation method""" """Main response generation method"""
return self.response_generator.generate_response( response = self.response_generator.generate_response(
user_message, student_id, subject, model, temperature, top_k user_message, student_id, subject, model, temperature, top_k
) )
response = apply_fixes(response, custom_fixes)
#response = self.tashkeel_agent.apply_tashkeel(response)
print(f"response: {response}")
return response
def search_similar(self, query_embedding: List[float], student_id: str, def search_similar(self, query_embedding: List[float], student_id: str,
subject: str = "chemistry", top_k: int = 3): subject: str = "chemistry", top_k: int = 3):
......
...@@ -6,33 +6,39 @@ class LanguageSegmentationService: ...@@ -6,33 +6,39 @@ class LanguageSegmentationService:
A service to segment a string of text into a list of dictionaries, A service to segment a string of text into a list of dictionaries,
each tagged with its detected language. each tagged with its detected language.
""" """
def segment_text(self, text: str) -> List[Dict[str, str]]: def segment_text(self, text: str) -> List[Dict[str, str]]:
""" """
Takes a mixed-language string and splits it into segments. Takes a mixed-language string and splits it into segments.
Example: Example:
Input: "هذا هو a test of the system." Input: "هذا هو a test of the system."
Output: [ Output: [
{'text': 'هذا هو', 'language': 'ar'}, {'text': 'هذا هو', 'language': 'ar'},
{'text': 'a test of the system.', 'language': 'en'} {'text': 'a test of the system.', 'language': 'en'}
] ]
""" """
segments = [] segments = []
if not text: if not text:
return segments return segments
words = text.split() words = text.split()
if not words: if not words:
return segments return segments
# Start with the language of the first word # Start with the language of the first word
current_lang = self._detect_word_language(words[0]) current_lang = self._detect_word_language(words[0])
current_segment = [] current_segment = []
for word in words: for word in words:
word_lang = self._detect_word_language(word) word_lang = self._detect_word_language(word)
if word_lang == current_lang: # Check if this is a "neutral" token (numbers, punctuation, special markers)
is_neutral = self._is_neutral_token(word)
if is_neutral:
# Neutral tokens stay with the current segment
current_segment.append(word)
elif word_lang == current_lang:
# If the language is the same, add the word to the current segment # If the language is the same, add the word to the current segment
current_segment.append(word) current_segment.append(word)
else: else:
...@@ -42,26 +48,46 @@ class LanguageSegmentationService: ...@@ -42,26 +48,46 @@ class LanguageSegmentationService:
"text": " ".join(current_segment), "text": " ".join(current_segment),
"language": current_lang "language": current_lang
}) })
# Start a new segment with the new word and language # Start a new segment with the new word and language
current_lang = word_lang current_lang = word_lang
current_segment = [word] current_segment = [word]
# Add the final remaining segment # Add the final remaining segment
if current_segment: if current_segment:
segments.append({ segments.append({
"text": " ".join(current_segment), "text": " ".join(current_segment),
"language": current_lang "language": current_lang
}) })
print(f"Segmented text into {len(segments)} parts.") print(f"Segmented text into {len(segments)} parts.")
return segments return segments
def _is_neutral_token(self, word: str) -> bool:
"""
Check if a token is 'neutral' (numbers, punctuation, special markers).
These should stick with the current segment rather than create a new one.
"""
# Strip common punctuation to check the core content
stripped = word.strip('.,!?;:()[]{}"\'-')
# Empty after stripping (pure punctuation)
if not stripped:
return True
# Pure numbers (with optional punctuation like "1." or "#1")
if stripped.replace('#', '').isdigit():
return True
# Special markdown-like markers (##, ###, etc.)
if all(c == '#' for c in stripped):
return True
return False
def _detect_word_language(self, word: str) -> str: def _detect_word_language(self, word: str) -> str:
"""Detects language of a single word, defaulting to 'en' for ambiguity.""" """Detects language of a single word, defaulting to 'en' for ambiguity."""
# Simple heuristic: if it contains any Arabic characters, it's Arabic. # Simple heuristic: if it contains any Arabic characters, it's Arabic.
if any('\u0600' <= char <= '\u06FF' for char in word): if any('\u0600' <= char <= '\u06FF' for char in word):
return "ar" return "ar"
# For non-Arabic words, we can assume English # For non-Arabic words, we can assume English
return "en" return "en"
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment