machine code arabic english clean text

parent 7aa52e77
[config.py]
@@ -4,7 +4,7 @@ class TTSConfig(BaseModel):
     """Holds configuration for a single TTS model."""
     language: str
     model_name_or_path: str
-    speaker_wav: str
+    speaker_directory: str
     config_path: str | None = None
     vocab_path: str | None = None
@@ -12,7 +12,7 @@ class TTSConfig(BaseModel):
 ARABIC_MODEL_CONFIG = TTSConfig(
     language="ar",
     model_name_or_path="./model/EGTTS-V0.1/",
-    speaker_wav="calm_anan_1.wav",
+    speaker_directory="salma",
     config_path="./model/EGTTS-V0.1/config.json",
     vocab_path="./model/EGTTS-V0.1/vocab.json"
 )
@@ -20,7 +20,7 @@ ARABIC_MODEL_CONFIG = TTSConfig(
 ENGLISH_MODEL_CONFIG = TTSConfig(
     language="en",
     model_name_or_path="tts_models/multilingual/multi-dataset/xtts_v2",
-    speaker_wav="calm_anan_1.wav"
+    speaker_directory="anan"
 )
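The application code below imports SUPPORTED_MODELS from this module. The mapping itself is not shown in the diff, but from the way the startup hook iterates it, it is presumably something like this sketch:

# Assumed shape of SUPPORTED_MODELS (not shown in this diff); the startup hook
# iterates it as `for lang, config in SUPPORTED_MODELS.items()`.
SUPPORTED_MODELS = {
    "ar": ARABIC_MODEL_CONFIG,
    "en": ENGLISH_MODEL_CONFIG,
}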
[FastAPI app (new file)]
import torch
import soundfile as sf
import io
import warnings
import logging
import numpy as np
from fastapi import FastAPI
from fastapi.responses import StreamingResponse, JSONResponse

from config import SUPPORTED_MODELS
from schemas import SynthesisRequest
from tts_service import TTSModel
from utils import split_text_into_chunks

# --- Suppress Warnings ---
warnings.filterwarnings('ignore', category=UserWarning)
warnings.filterwarnings('ignore', category=FutureWarning)
logging.getLogger("transformers").setLevel(logging.ERROR)

# --- Application Setup ---
app = FastAPI()

# Dictionary for the application's state (the loaded models)
models = {}

# --- Model Loading on Startup ---
@app.on_event("startup")
def load_all_models():
    use_gpu = torch.cuda.is_available()
    print(f"GPU Available: {use_gpu}")
    for lang, config in SUPPORTED_MODELS.items():
        model = TTSModel(config, use_gpu=use_gpu)
        model.load()
        models[lang] = model

# --- API Endpoint ---
@app.post("/synthesize")
async def synthesize(request: SynthesisRequest):
    # Select the correct model from the state dictionary
    model = models.get(request.language)
    if not model or not model.is_loaded:
        return JSONResponse(content={"error": f"The model for language '{request.language}' is not available."}, status_code=503)
    try:
        # Set character limits with a safety buffer
        char_limit = 140 if request.language == "ar" else 220
        text_chunks = split_text_into_chunks(request.text, char_limit)
        print(f"Text split into {len(text_chunks)} chunks.")
        all_audio_chunks = []
        # 300 ms of silence at the 24 kHz XTTS output rate, inserted between chunks
        silence_samples = np.zeros(int(24000 * 300 / 1000), dtype=np.float32)
        for i, chunk in enumerate(text_chunks):
            print(f"Synthesizing chunk {i+1}/{len(text_chunks)}: '{chunk}'")
            # Synthesize this chunk with the selected model object
            audio_chunk = model.synthesize_chunk(chunk)
            all_audio_chunks.append(audio_chunk)
            if i < len(text_chunks) - 1:
                all_audio_chunks.append(silence_samples)
        final_audio = np.concatenate(all_audio_chunks)
        buffer = io.BytesIO()
        sf.write(buffer, final_audio, 24000, format='WAV')
        buffer.seek(0)
        return StreamingResponse(buffer, media_type="audio/wav")
    except Exception as e:
        print(f"An error occurred during audio generation: {e}")
        return JSONResponse(content={"error": "Failed to generate audio"}, status_code=500)
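A minimal client call against this endpoint, as a sketch: the host and port are assumptions (not part of the diff), and the `requests` package is assumed to be installed.

import requests

# Sketch of a client call; http://localhost:8000 is an assumed address.
resp = requests.post(
    "http://localhost:8000/synthesize",
    json={"text": "Hello world", "language": "en"},
)
resp.raise_for_status()
with open("out.wav", "wb") as f:
    f.write(resp.content)  # the endpoint returns a complete WAV stream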
[schemas.py]
 from pydantic import BaseModel
-from typing import Literal
+from typing import Literal, List

 class SynthesisRequest(BaseModel):
     text: str
     language: Literal["ar", "en"]
+
+class Segment(BaseModel):
+    """Defines a single language-tagged text segment."""
+    text: str
+    language: Literal["ar", "en"]
+
+class SequenceSynthesisRequest(BaseModel):
+    """Defines the request body for the new endpoint: a list of segments."""
+    segments: List[Segment]
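For reference, a request matching SequenceSynthesisRequest could look like this (illustrative values only):

from schemas import Segment, SequenceSynthesisRequest

# Illustrative payload mixing the two supported languages.
req = SequenceSynthesisRequest(segments=[
    Segment(text="مرحبا بكم", language="ar"),   # "Welcome"
    Segment(text="Let's begin lesson one.", language="en"),
])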
[standalone XTTS inference script (new file)]
import torch
import soundfile as sf
from TTS.tts.configs.xtts_config import XttsConfig
from TTS.tts.models.xtts import Xtts

CONFIG_FILE_PATH = './model/EGTTS-V0.1/config.json'
VOCAB_FILE_PATH = './model/EGTTS-V0.1/vocab.json'
MODEL_PATH = './model/EGTTS-V0.1/'

print("Loading model...")
config = XttsConfig()
config.load_json(CONFIG_FILE_PATH)
model = Xtts.init_from_config(config)
model.load_checkpoint(config, checkpoint_dir=MODEL_PATH, use_deepspeed=False, vocab_path=VOCAB_FILE_PATH)

# Move model to GPU if available
if torch.cuda.is_available():
    model.cuda()
    print("Model moved to GPU.")

# Compute speaker latents from a single reference recording
SPEAKER_AUDIO_PATH = 'calm_anan_1.wav'
print("Computing speaker latents...")
gpt_cond_latent, speaker_embedding = model.get_conditioning_latents(audio_path=[SPEAKER_AUDIO_PATH])

# Egyptian Arabic sample text. Rough English gloss: "I'm Anan, founder of
# Shara' El-Oloum, and I'm here to help you learn anything you want to learn
# in the sciences. Shara' El-Oloum is an educational site offering free
# courses in many fields such as programming, design, and marketing. All you
# have to do is visit the site, pick the course that suits you, and start
# learning right away at no cost. What would you like to learn today? Any
# questions you have, I'm here to help. Any inquiry, at your service. Ask
# whatever you like. Which field would you like to learn more about?"
text = """
انا عنان مؤسس شرع العلوم وانا هنا عشان اساعدك تتعلم اي حاجة عايز تتعلمها فالعلوم
انا شرع العلوم موقع تعليمي بيقدم كورسات مجانية في مجالات متعددة زي البرمجة، التصميم، التسويق، وغيرها
كل اللي عليك تعمله تزور الموقع وتختار الكورس اللي يناسبك وتبدأ تتعلم على طول من غير اي تكلفة
تحب تتعلم ايه النهاردة؟
اي اسئلة عندك انا هنا عشان اساعدك
اي استفسار انا تحت امرك
اسال زي ما انت عايز
في اي مجال تحب تتعلم اكتر؟
"""

print("Inference...")
out = model.inference(
    text,
    "ar",
    gpt_cond_latent,
    speaker_embedding,
    temperature=0.1,
)

AUDIO_OUTPUT_PATH = "output_audio.wav"
sf.write(AUDIO_OUTPUT_PATH, out["wav"], 24000)
[synthesizer service (new file)]
import numpy as np
from typing import Dict, List

from schemas import Segment
from tts_service import TTSModel
from utils import split_text_into_chunks, sanitize_text, translate_equations_in_text

class SynthesizerService:
    """
    Orchestrates the entire synthesis process, from chunking to batching and
    stitching. It holds the application's state (the loaded models).
    """
    def __init__(self, models: Dict[str, TTSModel]):
        self.models = models

    def synthesize_simple(self, text: str, language: str) -> np.ndarray:
        """Handles the logic for the simple /synthesize endpoint."""
        model = self.models.get(language)
        if not model or not model.is_loaded:
            raise ValueError(f"Model for language '{language}' is not available.")
        char_limit = 140 if language == "ar" else 220
        text = translate_equations_in_text(text, language)
        text = sanitize_text(text)
        text_chunks = split_text_into_chunks(text, char_limit)
        print(f"Text split into {len(text_chunks)} chunks.")
        audio_chunks = model.synthesize_batch(text_chunks)
        # 300 ms of silence at 24 kHz, inserted between chunks
        silence_samples = np.zeros(int(24000 * 300 / 1000), dtype=np.float32)
        final_audio_pieces = []
        for i, audio in enumerate(audio_chunks):
            final_audio_pieces.append(audio)
            if i < len(audio_chunks) - 1:
                final_audio_pieces.append(silence_samples)
        return np.concatenate(final_audio_pieces)

    def synthesize_sequence(self, segments: List[Segment]) -> np.ndarray:
        """Handles the complex logic for the /synthesize_sequence endpoint."""
        silence_samples = np.zeros(int(24000 * 300 / 1000), dtype=np.float32)
        # Flatten every segment into per-chunk metadata so chunks can be
        # regrouped by language for batched inference.
        chunk_metadata = []
        for seg_idx, segment in enumerate(segments):
            lang = segment.language
            if lang not in self.models or not self.models[lang].is_loaded:
                raise ValueError(f"Model for language '{lang}' is not available.")
            char_limit = 140 if lang == "ar" else 200
            segment.text = translate_equations_in_text(segment.text, lang)
            segment.text = sanitize_text(segment.text)
            print(f"Segment {seg_idx} ({lang}) text: {segment.text}")
            text_chunks = split_text_into_chunks(segment.text, char_limit)
            print(f"chunks: {text_chunks}")
            for text in text_chunks:
                chunk_metadata.append({
                    'segment_idx': seg_idx,
                    'lang': lang,
                    'text': text
                })
        # Group chunk indices by language so each model runs one batched pass.
        lang_groups = {}
        for idx, meta in enumerate(chunk_metadata):
            lang_groups.setdefault(meta['lang'], []).append((idx, meta['text']))
        audio_results = [None] * len(chunk_metadata)
        for lang, chunks in lang_groups.items():
            model = self.models[lang]
            indices = [idx for idx, _ in chunks]
            texts = [text for _, text in chunks]
            print(f"Processing {len(texts)} {lang} chunks in batches...")
            audio_chunks = model.synthesize_batch(texts)
            for idx, audio in zip(indices, audio_chunks):
                audio_results[idx] = audio
        # Reassemble the audio in the original segment order.
        segments_audio = {}
        for idx, meta in enumerate(chunk_metadata):
            segments_audio.setdefault(meta['segment_idx'], []).append(audio_results[idx])
        final_audio_pieces = []
        for seg_idx in sorted(segments_audio.keys()):
            final_audio_pieces.append(np.concatenate(segments_audio[seg_idx]))
            if seg_idx < len(segments) - 1:
                final_audio_pieces.append(silence_samples)
        if not final_audio_pieces:
            raise ValueError("Audio generation resulted in empty output.")
        return np.concatenate(final_audio_pieces)
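A minimal wiring sketch for the service: it assumes `models` is the dict built at startup (as in the FastAPI hook above); the segment texts are illustrative.

from schemas import Segment

# Sketch: stitch an Arabic and an English segment into one waveform.
service = SynthesizerService(models)
wav = service.synthesize_sequence([
    Segment(text="الدرس الأول", language="ar"),   # "Lesson one"
    Segment(text="Lesson one", language="en"),
])
# `wav` is a float numpy array at 24 kHz, ready for soundfile.write().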
[tts_service.py]
@@ -2,29 +2,41 @@ import torch
 from TTS.api import TTS
 from TTS.tts.configs.xtts_config import XttsConfig
 from TTS.tts.models.xtts import Xtts
+from typing import List
+import numpy as np
+import os
 from config import TTSConfig

 class TTSModel:
     """
-    A class that encapsulates a Coqui TTS model, handling loading,
-    speaker latent calculation, and inference. This is the core OOP abstraction.
+    A class that encapsulates a Coqui TTS model with batch processing support.
     """
-    def __init__(self, config: TTSConfig, use_gpu: bool = False):
+    def __init__(self, config: TTSConfig, use_gpu: bool = False, batch_size: int = 4):
         self.config = config
         self.use_gpu = use_gpu
+        self.batch_size = batch_size
         self.model = None
         self.gpt_cond_latent = None
         self.speaker_embedding = None
         self.is_loaded = False

     def load(self):
-        """Loads the model and computes speaker latents."""
+        """Loads the model and computes speaker latents from a directory of WAV files."""
         print(f"Loading model for language: '{self.config.language}'...")
         try:
-            # Handle the two different ways of loading models
+            speaker_dir = self.config.speaker_directory
+            if not os.path.isdir(speaker_dir):
+                raise ValueError(f"Speaker directory not found: {speaker_dir}")
+            speaker_wav_paths = [os.path.join(speaker_dir, f) for f in os.listdir(speaker_dir) if f.endswith('.wav')]
+            if not speaker_wav_paths:
+                raise ValueError(f"No .wav files found in directory: {speaker_dir}")
+            print(f"Found {len(speaker_wav_paths)} reference audio files for voice cloning.")
+            # Load the base model (logic is the same)
             if self.config.language == "ar":
+                # Local, fine-tuned model
                 conf = XttsConfig()
                 conf.load_json(self.config.config_path)
                 self.model = Xtts.init_from_config(conf)
@@ -36,38 +48,47 @@ class TTSModel:
                 )
                 if self.use_gpu:
                     self.model.cuda()
-                # Calculate latents using the model's method
-                self.gpt_cond_latent, self.speaker_embedding = self.model.get_conditioning_latents(
-                    audio_path=[self.config.speaker_wav]
-                )
             else:
+                # High-level API model
                 api_model = TTS(model_name=self.config.model_name_or_path, gpu=self.use_gpu)
                 self.model = api_model.synthesizer.tts_model
-                # Calculate latents using the API model's method
-                self.gpt_cond_latent, self.speaker_embedding = self.model.get_conditioning_latents(
-                    audio_path=[self.config.speaker_wav]
-                )
+            print(f"Computing speaker characteristics from {len(speaker_wav_paths)} files...")
+            self.gpt_cond_latent, self.speaker_embedding = self.model.get_conditioning_latents(
+                audio_path=speaker_wav_paths
+            )
             self.is_loaded = True
-            print(f"Model for '{self.config.language}' loaded successfully.")
+            print(f"Model for '{self.config.language}' loaded successfully with batch size {self.batch_size}.")
         except Exception as e:
             print(f"FATAL ERROR: Could not load model for '{self.config.language}'. Error: {e}")
             self.is_loaded = False

     def synthesize_chunk(self, text: str):
-        """Runs inference on a single text chunk."""
         if not self.is_loaded:
-            raise RuntimeError(f"Model for language '{self.config.language}' is not loaded.")
-        out = self.model.inference(
-            text=text,
-            language=self.config.language,
-            speaker_embedding=self.speaker_embedding,
-            gpt_cond_latent=self.gpt_cond_latent,
-            temperature=0.1
-        )
+            raise RuntimeError(f"Model for '{self.config.language}' is not loaded.")
+        out = self.model.inference(text=text, language=self.config.language, speaker_embedding=self.speaker_embedding, gpt_cond_latent=self.gpt_cond_latent, temperature=0.1)
         return out["wav"]
+
+    def synthesize_batch(self, texts: List[str]) -> List[np.ndarray]:
+        if not self.is_loaded:
+            raise RuntimeError(f"Model for '{self.config.language}' is not loaded.")
+        if not texts:
+            return []
+        all_audio = []
+        for i in range(0, len(texts), self.batch_size):
+            batch_texts = texts[i:i + self.batch_size]
+            print(f"Processing batch {i//self.batch_size + 1}: {len(batch_texts)} chunks")
+            batch_audio = []
+            try:
+                with torch.no_grad():
+                    for text in batch_texts:
+                        out = self.model.inference(text=text, language=self.config.language, speaker_embedding=self.speaker_embedding, gpt_cond_latent=self.gpt_cond_latent, temperature=0.1)
+                        batch_audio.append(out["wav"])
+                all_audio.extend(batch_audio)
+                if self.use_gpu:
+                    torch.cuda.empty_cache()
+            except RuntimeError as e:
+                if "out of memory" in str(e):
+                    print("GPU OOM error. Falling back to sequential processing for this batch.")
+                    for text in batch_texts:
+                        out = self.model.inference(text=text, language=self.config.language, speaker_embedding=self.speaker_embedding, gpt_cond_latent=self.gpt_cond_latent, temperature=0.1)
+                        all_audio.append(out["wav"])
+                    if self.use_gpu:
+                        torch.cuda.empty_cache()
+                else:
+                    raise
+        return all_audio
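As a quick sanity check, the batch API can be exercised directly. A sketch, assuming ARABIC_MODEL_CONFIG from config.py and a valid speaker directory on disk:

from config import ARABIC_MODEL_CONFIG
from tts_service import TTSModel

# Sketch: load the Arabic model and synthesize two chunks in one batch call.
model = TTSModel(ARABIC_MODEL_CONFIG, use_gpu=False, batch_size=2)
model.load()
if model.is_loaded:
    wavs = model.synthesize_batch(["النص الأول", "النص الثاني"])
    print(len(wavs))  # -> 2, one waveform per chunk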
[utils.py]
 import nltk
+import re
+from num2words import num2words

 def split_text_into_chunks(text: str, max_chars: int):
     """
@@ -33,4 +35,171 @@ def split_text_into_chunks(text: str, max_chars: int):
     if current_chunk:
         chunks.append(current_chunk.strip())
     return chunks
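The body of split_text_into_chunks is collapsed in this diff; judging from its callers, it splits text on sentence boundaries (via nltk) into stripped chunks of at most max_chars characters. An illustrative call under that assumption:

# Assumed behavior; the implementation itself is collapsed above.
chunks = split_text_into_chunks("First sentence. Second sentence. Third one.", 32)
# plausibly -> ["First sentence. Second sentence.", "Third one."]

The functions below are appended to utils.py by this commit: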
def sanitize_text(text: str) -> str:
    """
    Cleans a string of text by removing Markdown, list markers, and other
    symbols that are not meant to be pronounced by a TTS model.
    """
    # 1. Remove Markdown headers (##, ###, etc.)
    text = re.sub(r'^#{1,6}\s+', '', text, flags=re.MULTILINE)
    # 2. Remove images. This must run before link handling below, which would
    #    otherwise consume the bracketed part of ![alt](url) and leave "!alt".
    text = re.sub(r'!\[([^\]]*)\]\([^\)]+\)', '', text)  # ![alt](url)
    # 3. Remove Markdown bold/italic markers (asterisks and underscores)
    text = re.sub(r'(\*\*|__)(.*?)(\*\*|__)', r'\2', text)  # **bold** or __bold__
    text = re.sub(r'(\*|_)(.*?)(\*|_)', r'\2', text)        # *italic* or _italic_
    # 4. Remove list item markers (-, *, +, or numbered lists) at the start of a line
    text = re.sub(r'^\s*[-*+]\s+', '', text, flags=re.MULTILINE)  # Unordered lists
    text = re.sub(r'^\s*\d+\.\s+', '', text, flags=re.MULTILINE)  # Numbered lists (1. 2. 3.)
    # 5. Remove inline code markers (backticks)
    text = re.sub(r'`([^`]+)`', r'\1', text)                             # `code`
    text = re.sub(r'```[^\n]*\n(.*?)```', r'\1', text, flags=re.DOTALL)  # ```code blocks```
    # 6. Remove links but keep the text
    text = re.sub(r'\[([^\]]+)\]\([^\)]+\)', r'\1', text)  # [text](url) -> text
    text = re.sub(r'<([^>]+)>', r'\1', text)               # <url> -> url
    # 7. Remove horizontal rules
    text = re.sub(r'^[-*_]{3,}\s*$', '', text, flags=re.MULTILINE)
    # 8. Remove blockquote markers
    text = re.sub(r'^\s*>\s+', '', text, flags=re.MULTILINE)
    # 9. Handle colons: replace with a period for a natural pause
    text = text.replace(':', '.')
    # 10. Remove brackets, parentheses, and braces
    text = re.sub(r'[\[\]\(\){}]', '', text)
    # 11. Remove any remaining hash symbols or special Markdown characters
    text = re.sub(r'#+', '', text)   # Remove any # characters
    text = re.sub(r'[~^]', '', text) # Remove strikethrough and other markers
    # 12. Collapse repeated punctuation
    text = re.sub(r'\.{2,}', '.', text)    # Multiple periods -> single period
    text = re.sub(r'[.!?]{2,}', '.', text) # Multiple punctuation -> single period
    # 13. Normalize whitespace to a single space
    text = re.sub(r'\s+', ' ', text).strip()
    # 14. Remove leading/trailing periods left over from the cleaning
    text = re.sub(r'^\.\s*|\s*\.$', '', text).strip()
    return text
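# A quick check of the cleaning rules (expected output worked out by hand):
#   sanitize_text("## Intro\n- **Bold** item: done")
#   -> "Intro Bold item. done"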
SYMBOLS = {
    r"\rightarrow": {"en": ", yields, ", "ar": "، ينتج، "},
    r"\leftrightarrow": {"en": ", in equilibrium with, ", "ar": "، في حالة اتزان مع، "},
    r"\cdot": {"en": " times ", "ar": " في "},
    "+": {"en": " plus ", "ar": " زائد "},
    "-": {"en": " minus ", "ar": " ناقص "},
    "=": {"en": ", equals, ", "ar": "، يساوي، "},
}
def expand_element(element: str, lang: str = "en") -> str:
    """
    Takes a chemical element string (e.g., 'CO2', 'H2O') and expands it
    with spaces, converting numbers to words.
    - 'CO2' becomes 'C O two'
    - 'H2O' becomes 'H two O'
    """
    # Only process strings that look like chemical formulas (i.e., contain letters)
    if not re.search(r'[A-Za-z]', element):
        return element
    # 1. Add spaces between adjacent letters and between letters/numbers.
    spaced_element = re.sub(r"([A-Za-z])(?=[A-Za-z\d])", r"\1 ", element)
    spaced_element = re.sub(r"(\d)(?=[A-Za-z])", r"\1 ", spaced_element)
    # 2. Convert all numbers in the resulting string to words.
    spoken_element = re.sub(r"(\d+)", lambda m: num2words(int(m.group(1)), lang=lang), spaced_element)
    return spoken_element
def equation_to_speech_single(equation: str, lang: str = "en") -> str:
    """Converts a single LaTeX or plain-text equation into a pronounceable string."""
    # Debug: print the equation and check for backslashes
    print(f"DEBUG equation_to_speech_single input: '{equation}'")
    print(f"DEBUG equation bytes: {equation.encode('unicode_escape').decode('ascii')}")
    # Escape sequences may have been interpreted upstream (e.g. the \r in
    # \rightarrow becoming a carriage return); restore literal backslashes.
    equation = equation.replace('\r', '\\r')  # carriage return back to \r
    equation = equation.replace('\n', '\\n')  # newline back to \n
    equation = equation.replace('\t', '\\t')  # tab back to \t
    print(f"DEBUG after escape fix: {repr(equation)}")
    # 1. FIRST: replace LaTeX symbols with pronounceable text (before any other processing)
    for symbol, replacement in SYMBOLS.items():
        if symbol in equation:
            print(f"DEBUG: Found and replacing symbol: {symbol}")
            equation = equation.replace(symbol, replacement[lang])
    # 2. Clean up LaTeX structural commands
    equation = re.sub(r"\\frac{([^}]+)}{([^}]+)}", lambda m: f"{m.group(1)} over {m.group(2)}", equation)
    equation = re.sub(r"_\{([^}]+)\}", r"\1", equation)   # Remove subscripts, e.g., H_{2} -> H2
    equation = re.sub(r"\^\{([^}]+)\}", r"\1", equation)  # Remove superscripts
    equation = equation.replace("{", "").replace("}", "")
    # 3. Split on the replacement phrases so operators are preserved and
    #    chemical formulas are isolated. The phrases are taken from SYMBOLS for
    #    the requested language (the original hard-coded the English phrases,
    #    so Arabic operator phrases were never recognized here).
    operator_phrases = sorted((rep[lang] for rep in SYMBOLS.values()), key=len, reverse=True)
    parts = []
    current = ""
    i = 0
    while i < len(equation):
        # Check if we're at the start of an operator phrase
        found_operator = False
        for symbol_text in operator_phrases:
            if equation[i:].startswith(symbol_text):
                if current.strip():
                    parts.append(("formula", current.strip()))
                    current = ""
                parts.append(("operator", symbol_text.strip()))
                i += len(symbol_text)
                found_operator = True
                break
        if not found_operator:
            current += equation[i]
            i += 1
    if current.strip():
        parts.append(("formula", current.strip()))
    # 4. Process each part appropriately
    result = []
    for part_type, content in parts:
        if part_type == "operator":
            result.append(content)
        else:
            # This is a chemical formula or number; expand it
            result.append(expand_element(content, lang))
    return " ".join(result)
def translate_equations_in_text(text: str, lang: str = "en") -> str:
    """Finds all LaTeX equations in a text block and replaces them with speakable text."""
    # This regex finds content inside $...$ or \[...\]
    pattern = re.compile(r"\$([^$]+)\$|\\\[([^]]+)\\\]")
    def replacer(match):
        # The content is in either the first or second capture group
        content = match.group(1) or match.group(2)
        if content:
            return equation_to_speech_single(content, lang)
        return match.group(0)
    return pattern.sub(replacer, text)
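An end-to-end check of the equation pipeline, with the output worked out by hand (assuming the language-aware operator list above):

text = "Burn it: $2H2 + O2 \\rightarrow 2H2O$"
print(translate_equations_in_text(text, "en"))
# -> "Burn it: two H two plus O two yields two H two O"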