Commit 5be82ab8 authored by salma's avatar salma

test script TTS with the new RVC pipeline

parent 8b8c37b8
import requests
import concurrent.futures
import time
# Configuration
URL = "http://ec2-18-193-226-85.eu-central-1.compute.amazonaws.com:5000/generate_audio"
WORKERS = 6
# We define 6 different payloads to ensure the model isn't just caching the result.
# We keep the mix of Arabic, English, and LaTeX to stress test the tokenizer.
test_inputs = [
{
"id": 1,
"text": r"TEST 1: عملية الاحتراق combustion process بتوصف تفاعل الميثان. Equation: $CH_4 + 2O_2 \rightarrow CO_2 + 2H_2O$."
},
{
"id": 2,
"text": r"TEST 2: التفاعل ده بيطلع طاقة كبيرة جدا. This results in heat energy release. $E = mc^2$ is not used here."
},
{
"id": 3,
"text": r"TEST 3: النتيجة النهائية هي بخار الماء وغاز ثاني أكسيد الكربون. Output: $CO_2$ and $H_2O$ vapor."
},
{
"id": 4,
"text": r"TEST 4: لو الأكسجين قليل، الاحتراق مش هيكون كامل. Incomplete combustion produces $CO$ (Carbon Monoxide)."
},
{
"id": 5,
"text": r"TEST 5: الميثان هو أبسط مركب هيدروكربوني. Methane is the simplest hydrocarbon with formula $CH_4$."
},
{
"id": 6,
"text": r"TEST 6: كيمياء الحرارة Thermochemistry بتدرس التغيرات دي. $\Delta H$ is negative for this reaction."
}
]
def send_request(data):
"""
Function to send a single request.
This will be run by multiple threads simultaneously.
"""
idx = data['id']
text_content = data['text']
print(f"Worker {idx}: Sending Request...")
payload = {"text": text_content}
headers = {"Content-Type": "application/json"}
try:
start_time = time.time()
response = requests.post(URL, json=payload, headers=headers, timeout=60)
duration = time.time() - start_time
if response.status_code == 200:
filename = f"test_result_{idx}.wav"
with open(filename, "wb") as f:
f.write(response.content)
return f"Worker {idx}: Success! Saved {filename} (Time: {duration:.2f}s)"
else:
return f"Worker {idx}: Failed (Status: {response.status_code})"
except requests.exceptions.RequestException as e:
return f"Worker {idx}: Error - {e}"
# Main Execution
if __name__ == "__main__":
print(f"--- Starting Load Test with {WORKERS} Concurrent Workers ---")
start_global = time.time()
# ThreadPoolExecutor manages the threads for us
with concurrent.futures.ThreadPoolExecutor(max_workers=WORKERS) as executor:
# This maps the function 'send_request' to every item in 'test_inputs'
# and starts them all immediately.
results = list(executor.map(send_request, test_inputs))
print("\n--- Test Completed ---")
print(f"Total Time Taken: {time.time() - start_global:.2f} seconds\n")
# Print results sorted
for result in sorted(results):
print(result)
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment