Commit e2d1bde7 authored by salma

RVC server code

parent 6a698cac
import os
import sys
import time
import traceback
import numpy as np
from scipy.io import wavfile
from dotenv import load_dotenv
from openai import OpenAI
# Setup
now_dir = os.getcwd()
sys.path.append(now_dir)
load_dotenv()
os.environ["weight_root"] = "assets/weights"
os.environ["index_root"] = "logs"
from configs.config import Config
from infer.modules.vc.modules import VC
# --- CONFIGURATION ---
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
MODEL_NAME = "anan-40.pth"
INDEX_PATH = "logs/anan-40/added_IVF3961_Flat_nprobe_1_anan-40_v2.index"
RVC_PARAMS = {
    "f0_up_key": 0,
    "f0_method": "rmvpe",
    "index_rate": 0,
    "filter_radius": 3,
    "resample_sr": 0,
    "rms_mix_rate": 0.25,
    "protect": 0.33
}
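# Parameter meanings (assumed, following the usual RVC WebUI conventions; they are
# passed positionally to vc_single below):
#   f0_up_key     - pitch transpose in semitones
#   f0_method     - pitch extraction algorithm ("rmvpe")
#   index_rate    - how strongly the FAISS feature index is mixed in (0 disables retrieval)
#   filter_radius - median-filter radius applied to the pitch curve
#   resample_sr   - output resample rate (0 keeps the model's native rate)
#   rms_mix_rate  - blend between the input and output volume envelopes
#   protect       - protection for voiceless consonants and breaths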
def init_rvc_model():
    print(">>> [1/3] Initializing RVC Configuration...")
    config = Config()
    config.weight_root = "assets/weights"
    config.index_root = "logs"
    vc = VC(config)
    print(f">>> [2/3] Loading Model '{MODEL_NAME}' to GPU...")
    try:
        vc.get_vc(MODEL_NAME)
    except Exception as e:
        print(f"CRITICAL ERROR loading model: {e}")
        sys.exit(1)
    print(">>> [3/3] Warming up (Loading RMVPE & Hubert into VRAM)...")
    perform_warmup(vc)
    return vc
def perform_warmup(vc):
    # /dev/shm is a RAM-backed tmpfs, so the dummy file never touches disk
    dummy_path = "/dev/shm/warmup.wav"
    sr = 16000
    silent_audio = np.zeros(sr, dtype=np.int16)  # one second of silence
    wavfile.write(dummy_path, sr, silent_audio)
    try:
        vc.vc_single(0, dummy_path, 0, None, "rmvpe", INDEX_PATH, "", 0.75, 3, 0, 0.25, 0.33)
        print(">>> Warm-up Complete! System is hot.")
    except Exception as e:
        print(f">>> Warm-up warning: {e}")
    if os.path.exists(dummy_path):
        os.remove(dummy_path)
def generate_openai_audio(client, text, output_path):
    print(f"\n--- Generating OpenAI Audio ({len(text)} chars) ---")
    start = time.time()
    try:
        response = client.audio.speech.create(
            model="gpt-4o-mini-tts",
            voice="alloy",
            input=text,
            response_format="wav"
        )
        with open(output_path, "wb") as f:
            f.write(response.content)
        end = time.time()
        print(f"OpenAI TTS Time: {end - start:.4f} seconds")
        return True
    except Exception as e:
        print(f"OpenAI Error: {e}")
        return False
def main():
    if not OPENAI_API_KEY:
        print("Error: OPENAI_API_KEY not found.")
        return
    vc_instance = init_rvc_model()
    openai_client = OpenAI(api_key=OPENAI_API_KEY)
    print("\n" + "="*60)
    print(" 🚀 AWS SYSTEM READY (HOT-LOADED)")
    print("="*60)
    while True:
        try:
            user_text = input("\n📝 Enter text: ")
            if user_text.lower() in ['exit', 'quit']:
                break
            if not user_text.strip():
                continue
            temp_input = "/dev/shm/temp_openai.wav"
            output_wav = "output_response.wav"
            # 1. OpenAI Generation
            if not generate_openai_audio(openai_client, user_text, temp_input):
                continue
            print("--- Converting Voice (RVC) ---")
            rvc_start = time.time()
            # Positional argument order for vc_single (assumed): speaker id, input path,
            # transpose, f0 file, f0 method, index path, secondary index, index rate,
            # filter radius, resample sr, rms mix rate, protect
            info, audio_tuple = vc_instance.vc_single(
                0,
                temp_input,
                RVC_PARAMS["f0_up_key"],
                None,
                RVC_PARAMS["f0_method"],
                INDEX_PATH,
                "",
                RVC_PARAMS["index_rate"],
                RVC_PARAMS["filter_radius"],
                RVC_PARAMS["resample_sr"],
                RVC_PARAMS["rms_mix_rate"],
                RVC_PARAMS["protect"]
            )
            rvc_end = time.time()
            if audio_tuple is not None:
                tgt_sr, audio_opt = audio_tuple
                wavfile.write(output_wav, tgt_sr, audio_opt)
                print(f"✅ Success! Saved to: {output_wav}")
                print(f"⚡ RVC Inference Time: {rvc_end - rvc_start:.4f} seconds")
            else:
                print(f"❌ RVC Conversion Failed. Info: {info}")
            if os.path.exists(temp_input):
                os.remove(temp_input)
        except KeyboardInterrupt:
            print("\nStopping...")
            break
        except Exception as e:
            print(f"Unexpected Error: {e}")
            traceback.print_exc()
if __name__ == "__main__":
    main()
import os
import sys
import time
import uuid
import numpy as np
from scipy.io import wavfile
from dotenv import load_dotenv
from openai import OpenAI
from fastapi import FastAPI, HTTPException, BackgroundTasks
from fastapi.responses import FileResponse
from pydantic import BaseModel
"""
RVC Voice Agent API
This FastAPI application integrates OpenAI's TTS capabilities with the RVC voice conversion model.
It accepts text input, generates speech using OpenAI, processes it through RVC, and returns the final audio.
"""
# Local text pre-processing helper; falls back to a no-op if the module is missing
try:
    from text_processing import prepare_text_for_audio
except ImportError:
    # Fallback if the file is missing during testing
    def prepare_text_for_audio(text):
        return text
# 1. Setup Environment
now_dir = os.getcwd()
sys.path.append(now_dir)
load_dotenv()
# Set paths for RVC
os.environ["weight_root"] = "assets/weights"
os.environ["index_root"] = "logs"
from configs.config import Config
from infer.modules.vc.modules import VC
# --- CONFIGURATION ---
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
MODEL_NAME = "anan-40.pth"
INDEX_PATH = "logs/anan-40/added_IVF3961_Flat_nprobe_1_anan-40_v2.index"
# Optimized RVC Parameters
RVC_PARAMS = {
    "f0_up_key": -3,
    "f0_method": "rmvpe",
    "index_rate": 0,
    "filter_radius": 3,
    "resample_sr": 0,
    "rms_mix_rate": 0.25,
    "protect": 0.33
}
# --- GLOBAL VARIABLES ---
app = FastAPI(title="RVC Voice Agent API")
vc_instance = None
openai_client = None
# --- DATA MODELS ---
class TextRequest(BaseModel):
    text: str
    speed: float = 1.0  # Optional speech speed; default 1.0, valid range 0.25 - 4.0
# --- HELPER FUNCTIONS ---
def perform_warmup(vc):
    print(">>> [Warmup] Running dummy inference...")
    dummy_path = "/dev/shm/warmup.wav"
    sr = 16000
    silent_audio = np.zeros(sr, dtype=np.int16)
    wavfile.write(dummy_path, sr, silent_audio)
    try:
        vc.vc_single(0, dummy_path, 0, None, "rmvpe", INDEX_PATH, "", 0.75, 3, 0, 0.25, 0.33)
        print(">>> [Warmup] Success! System is ready.")
    except Exception as e:
        print(f">>> [Warmup] Warning: {e}")
    if os.path.exists(dummy_path):
        os.remove(dummy_path)
def cleanup_files(file_paths: list):
    """Background task to delete temp files after the response is sent"""
    for path in file_paths:
        if os.path.exists(path):
            try:
                os.remove(path)
            except OSError:
                pass
# --- LIFECYCLE EVENTS ---
@app.on_event("startup")
async def startup_event():
    global vc_instance, openai_client
    if not OPENAI_API_KEY:
        print("CRITICAL ERROR: OPENAI_API_KEY not found.")
        sys.exit(1)
    openai_client = OpenAI(api_key=OPENAI_API_KEY)
    print(">>> [Startup] Initializing RVC Config...")
    config = Config()
    config.weight_root = "assets/weights"
    config.index_root = "logs"
    vc_instance = VC(config)
    print(f">>> [Startup] Loading Model '{MODEL_NAME}' to GPU...")
    try:
        vc_instance.get_vc(MODEL_NAME)
    except Exception as e:
        print(f"CRITICAL ERROR loading model: {e}")
        sys.exit(1)
    perform_warmup(vc_instance)
# --- API ENDPOINTS ---
@app.get("/")
def root():
    return {"status": "running", "model": MODEL_NAME}
@app.post("/generate_audio")
async def generate_audio(request: TextRequest, background_tasks: BackgroundTasks):
    start_time = time.time()
    unique_id = str(uuid.uuid4())
    temp_openai_path = f"/dev/shm/openai_{unique_id}.wav"
    final_output_path = f"/dev/shm/rvc_{unique_id}.wav"
    # --- STEP 1: PRE-PROCESS TEXT ---
    print(f"Original Text: {request.text[:50]}...")
    clean_text = prepare_text_for_audio(request.text)
    print(f"Cleaned Text: {clean_text[:50]}...")
    # --- STEP 2: OpenAI TTS ---
    try:
        response = openai_client.audio.speech.create(
            model="gpt-4o-mini-tts",
            voice="alloy",
            input=clean_text,
            speed=request.speed,  # speech speed taken from the request
            response_format="wav"
        )
        with open(temp_openai_path, "wb") as f:
            f.write(response.content)
    except Exception as e:
        print(f"OpenAI Error: {e}")
        raise HTTPException(status_code=500, detail=f"OpenAI Error: {str(e)}")
    # --- STEP 3: RVC Inference ---
    try:
        info, audio_tuple = vc_instance.vc_single(
            0,
            temp_openai_path,
            RVC_PARAMS["f0_up_key"],
            None,
            RVC_PARAMS["f0_method"],
            INDEX_PATH,
            "",
            RVC_PARAMS["index_rate"],
            RVC_PARAMS["filter_radius"],
            RVC_PARAMS["resample_sr"],
            RVC_PARAMS["rms_mix_rate"],
            RVC_PARAMS["protect"]
        )
        if audio_tuple is None:
            raise Exception(f"RVC Conversion returned None. Info: {info}")
        tgt_sr, audio_opt = audio_tuple
        wavfile.write(final_output_path, tgt_sr, audio_opt)
    except Exception as e:
        if os.path.exists(temp_openai_path):
            os.remove(temp_openai_path)
        print(f"RVC Error: {e}")
        raise HTTPException(status_code=500, detail=f"RVC Error: {str(e)}")
    # --- STEP 4: Return & Cleanup ---
    background_tasks.add_task(cleanup_files, [temp_openai_path, final_output_path])
    total_time = time.time() - start_time
    print(f"Request processed in {total_time:.4f}s with speed {request.speed}")
    return FileResponse(
        final_output_path,
        media_type="audio/wav",
        filename="response.wav"
    )
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=5000)
import os
import sys
import numpy as np
import faiss
from sklearn.cluster import MiniBatchKMeans
from tqdm import tqdm
"""
This script manually creates a Faiss index for voice conversion features.
It loads pre-extracted features
trains a Faiss index, and saves the populated index to disk.
"""
# --- CONFIGURATION ---
exp_name = "anan-40"
version = "v2"
feature_dim = 768
# ---------------------
root_dir = "/home/ec2-user/RVC/Retrieval-based-Voice-Conversion-WebUI"
exp_dir = os.path.join(root_dir, "logs", exp_name)
feature_dir = os.path.join(exp_dir, f"3_feature{feature_dim}")
print(f"--- Starting Index Training for {exp_name} ---")
# 1. Load Features
if not os.path.exists(feature_dir):
    print(f"Error: Directory not found: {feature_dir}")
    sys.exit(1)
listdir_res = list(os.listdir(feature_dir))
if len(listdir_res) == 0:
    print("Error: Feature directory is empty! You need to run Feature Extraction first.")
    sys.exit(1)
print(f"Found {len(listdir_res)} feature files.")
npys = []
# --- Progress Bar for Loading ---
for name in tqdm(sorted(listdir_res), desc="Loading Features", unit="file"):
    phone = np.load(os.path.join(feature_dir, name))
    npys.append(phone)
big_npy = np.concatenate(npys, 0)
print(f"Total features loaded: {big_npy.shape}")
# 2. Shuffle
print("Shuffling features...")
big_npy_idx = np.arange(big_npy.shape[0])
np.random.shuffle(big_npy_idx)
big_npy = big_npy[big_npy_idx]
# 3. K-Means (Optimization for large datasets)
if big_npy.shape[0] > 2e5:
    # Above ~200k feature rows, compress the data to 10k centroids first so that
    # training the IVF index stays tractable
    print(f"Dataset is large ({big_npy.shape[0]} rows). Applying K-Means clustering...")
    try:
        big_npy = (
            MiniBatchKMeans(
                n_clusters=10000,
                verbose=True,
                batch_size=256 * 8,
                compute_labels=False,
                init="random",
            )
            .fit(big_npy)
            .cluster_centers_
        )
    except Exception as e:
        print(f"K-Means failed: {e}")
# 4. Train Index
print("Training Faiss Index (Log outputs below)...")
n_ivf = min(int(16 * np.sqrt(big_npy.shape[0])), big_npy.shape[0] // 39)
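# Illustrative arithmetic (assumed row count): for 200,000 feature rows,
# min(int(16 * sqrt(200000)), 200000 // 39) = min(7155, 5128) = 5128,
# so the factory string below would be "IVF5128,Flat".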
index = faiss.index_factory(feature_dim, f"IVF{n_ivf},Flat")
index_ivf = faiss.extract_index_ivf(index)
index_ivf.nprobe = 1
# --- ENABLE VERBOSE MODE (Shows internal progress) ---
index.verbose = True
# ---------------------------------------------------
index.train(big_npy)
# 5. Add Data and Save
index_name = f"added_IVF{n_ivf}_Flat_nprobe_1_{exp_name}_{version}.index"
save_path = os.path.join(exp_dir, index_name)
print(f"Adding data to index...")
batch_size_add = 8192
# --- Progress Bar for Indexing ---
total_batches = (big_npy.shape[0] + batch_size_add - 1) // batch_size_add
for i in tqdm(range(0, big_npy.shape[0], batch_size_add), desc="Populating Index", total=total_batches, unit="batch"):
    index.add(big_npy[i : i + batch_size_add])
faiss.write_index(index, save_path)
print(f"--------------------------------------------------")
print(f"SUCCESS! Index saved to: {save_path}")
print(f"--------------------------------------------------")
# Steps to install RVC
## 1. Clone the repo
```bash
git clone https://github.com/RVC-Project/Retrieval-based-Voice-Conversion-WebUI.git
```
## 2. Install Dependencies
Follow the upstream repository's README to install the dependencies. A typical environment setup is sketched below.
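The exact steps vary by platform and GPU; the commands below are only a minimal sketch, assuming a Python virtual environment and the project's `requirements.txt`. Pretrained models and other assets still need to be downloaded as described in the upstream README.
```bash
cd Retrieval-based-Voice-Conversion-WebUI

# Create and activate an isolated environment
python3 -m venv .venv
source .venv/bin/activate

# Install the Python dependencies declared by the project
pip install -r requirements.txt
```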
import re
from num2words import num2words
"""
This module processes text containing LaTeX equations and mathematical symbols,
translating them into spoken forms suitable for text-to-speech synthesis in both
English and Arabic. It includes smart language detection based on surrounding text context.
"""
# --- 1. Symbols Dictionary ---
SYMBOLS = {
    # 1. LaTeX Commands
    r"\\rightarrow": {"en": " yields ", "ar": " ينتج "},
    r"\\leftrightarrow": {"en": " in equilibrium with ", "ar": " في حالة اتزان مع "},
    r"\\cdot": {"en": " times ", "ar": " في "},
    r"\\times": {"en": " times ", "ar": " في "},
    r"\\div": {"en": " divided by ", "ar": " على "},
    # 2. Raw Unicode arrows
    r"→": {"en": " yields ", "ar": " ينتج "},
    r"↔": {"en": " in equilibrium with ", "ar": " في حالة اتزان مع "},
    r"=>": {"en": " yields ", "ar": " ينتج "},  # ASCII arrow, just in case
    # 3. Basic Math Symbols
    r"\+": {"en": " plus ", "ar": " زائد "},
    r"-": {"en": " minus ", "ar": " ناقص "},
    r"\*": {"en": " times ", "ar": " في "},
    r"×": {"en": " times ", "ar": " في "},
    r"/": {"en": " divided by ", "ar": " على "},
    r"÷": {"en": " divided by ", "ar": " على "},
    r"=": {"en": " equals ", "ar": " يساوي "},
}
# --- 2. Smart Language Detection ---
def detect_context_language(text: str, fallback="en") -> str:
    # Remove non-letters for accurate counting
    clean_text = re.sub(r'[0-9\s\W]', '', text)
    arabic_count = len(re.findall(r'[\u0600-\u06FF]', clean_text))
    english_count = len(re.findall(r'[a-zA-Z]', clean_text))
    if arabic_count == 0 and english_count == 0:
        return fallback
    if english_count >= arabic_count:
        return "en"
    else:
        return "ar"
# --- 3. Processing Functions ---
def process_latex_match(match):
    """Handles LaTeX blocks like $...$"""
    content = match.group(1) or match.group(2)
    if not content:
        return match.group(0)
    lang = detect_context_language(content, fallback="en")
    content = content.replace('{', ' ').replace('}', ' ')
    over_word = " over " if lang == "en" else " على "
    content = re.sub(r"\\frac\s*(\S+)\s*(\S+)", f"\\1{over_word}\\2", content)
    for pattern, replacement in SYMBOLS.items():
        clean_pat = pattern.replace("\\\\", "\\")
        if clean_pat in content:
            content = content.replace(clean_pat, replacement[lang])
    return content
def process_raw_symbols(text: str) -> str:
    """
    Finds math symbols outside of LaTeX and replaces them.
    """
    # Loop through the symbol dictionary, skipping LaTeX-style patterns
    for pattern, replacement in SYMBOLS.items():
        if pattern.startswith(r"\\"):
            continue
        regex = re.compile(f"({pattern})")
        parts = regex.split(text)
        output = []
        for i, part in enumerate(parts):
            if re.match(regex, part):
                # Use a 50-character window on each side so the surrounding words
                # (e.g. "معادلة أكسيد") are still visible even when the equation is long
                prev_chunk = parts[i-1][-50:] if i > 0 else ""
                next_chunk = parts[i+1][:50] if i < len(parts)-1 else ""
                context = prev_chunk + next_chunk
                # Default to Arabic (ar) if unsure, since the content is mostly Arabic
                lang = detect_context_language(context, fallback="ar")
                output.append(replacement[lang])
            else:
                output.append(part)
        text = "".join(output)
    return text
def sanitize_text(text: str) -> str:
    text = re.sub(r'^#{1,6}\s+', '', text, flags=re.MULTILINE)  # markdown headings
    text = re.sub(r'(\*\*|__)(.*?)(\*\*|__)', r'\2', text)       # bold markers
    text = re.sub(r'`([^`]+)`', r'\1', text)                     # inline code ticks
    text = re.sub(r'\[([^\]]+)\]\(.*?\)', r'\1', text)           # markdown links -> link text
    text = re.sub(r'[\[\]\(\){}]', ' ', text)                    # stray brackets and braces
    text = text.replace('_', ' ')
    # Hyphen logic: replace "-" with an Arabic comma when not preceded by a letter/digit
    text = re.sub(r'(?<![a-zA-Z0-9])\s*-\s*', '، ', text)
    text = re.sub(r'\s+', ' ', text).strip()
    return text
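# Example:
#   sanitize_text("## **Bold** [link](https://example.com)")  -> "Bold link"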
# --- MASTER FUNCTION ---
def prepare_text_for_audio(text: str) -> str:
    # 1. Sanitize
    text = sanitize_text(text)
    # 2. Handle LaTeX Equations
    text = re.sub(r"\$([^$]+)\$|\\\[([^]]+)\\\]", process_latex_match, text)
    # 3. Handle Raw Symbols (Now includes Unicode →)
    text = process_raw_symbols(text)
    return text