Commit f1823d1f authored by Salma Mohammed Hamed's avatar Salma Mohammed Hamed

Merge branch 'no_n8n' into 'master'

No n8n

See merge request !2
parents 513504cb a58e55a2
......@@ -73,6 +73,11 @@ services:
N8N_WEBHOOK_URL: "${N8N_WEBHOOK_URL}"
OPENAI_API_KEY: "${OPENAI_API_KEY}"
MINIO_BUCKET: "${MINIO_BUCKET}"
POSTGRES_HOST: "postgres"
POSTGRES_USER: "${POSTGRES_USER}"
POSTGRES_PASSWORD: "${POSTGRES_PASSWORD}"
POSTGRES_DB: "${POSTGRES_DB}"
volumes:
- ./uploads:/app/uploads
depends_on:
......
from .enums import MessageType, ResponseStatus
from .enums import MessageType, ResponseStatus, StudentNationality, Models
from .config import AppConfig
\ No newline at end of file
......@@ -5,15 +5,12 @@ from dotenv import load_dotenv
# Load environment variables from .env file
load_dotenv()
# Configuration Management
@dataclass
class AppConfig:
minio_endpoint: str
minio_access_key: str
minio_secret_key: str
minio_bucket: str
n8n_webhook_url: str
openai_api_key: str
@classmethod
......@@ -23,6 +20,5 @@ class AppConfig:
minio_access_key=os.getenv("MINIO_ACCESS_KEY"),
minio_secret_key=os.getenv("MINIO_SECRET_KEY"),
minio_bucket=os.getenv("MINIO_BUCKET"),
n8n_webhook_url=os.getenv("N8N_WEBHOOK_URL"),
openai_api_key=os.getenv("OPENAI_API_KEY")
)
\ No newline at end of file
......@@ -8,4 +8,16 @@ class MessageType(str, Enum):
class ResponseStatus(str, Enum):
SUCCESS = "success"
ERROR = "error"
PROCESSING = "processing"
\ No newline at end of file
PROCESSING = "processing"
class StudentNationality(str, Enum):
EGYPTIAN = "egyptian"
SAUDI = "saudi"
class Models(str, Enum):
chat = "gpt-5-nano"
tts = "gpt-4o-mini-tts"
embedding = "text-embedding-3-small"
transcription = "whisper-1"
from fastapi import UploadFile, HTTPException
from abc import ABC, abstractmethod
import sys
import io
sys.path.append("../")
from repositories import WebhookClient, StorageRepository
import os
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
from core import MessageType, ResponseStatus
from services import OpenAIService
from repositories import StorageRepository
from services.openai_service import OpenAIService
class MessageHandler(ABC):
@abstractmethod
def handle(self, **kwargs) -> dict:
pass
class AudioMessageHandler():
def __init__(self, storage_repo: StorageRepository, webhook_client: WebhookClient,
bucket: str, openai_service: OpenAIService):
class AudioMessageHandler:
def __init__(self, storage_repo: StorageRepository, bucket: str, openai_service: OpenAIService):
self.storage_repo = storage_repo
self.webhook_client = webhook_client
self.bucket = bucket
self.openai_service = openai_service
def handle(self, file: UploadFile, **kwargs) -> dict:
"""Process audio message - transcribe locally using OpenAI Whisper"""
try:
print(f"Processing audio file: {file.filename}")
# Read file content
file.file.seek(0)
file_content = file.file.read()
if not file_content:
raise HTTPException(status_code=400, detail="Empty audio file received")
# Upload original file to MinIO for backup
file_path = f"audio/{file.filename}"
file_stream = io.BytesIO(file_content)
self.storage_repo.upload_file(file_stream, self.bucket, file_path)
print(f"Uploaded {file.filename} to MinIO at {file_path}")
if not self.openai_service.is_available():
raise HTTPException(status_code=500, detail="OpenAI service not available for transcription")
# Try to transcribe the audio using OpenAI service
# Transcribe using OpenAI Whisper
try:
transcribed_text = self.openai_service.transcribe_audio(file_content, file.filename)
# Send transcribed text to n8n
payload = {
"type": MessageType.AUDIO,
"message": transcribed_text,
}
self.webhook_client.send_webhook(payload)
print(f"Sent transcribed text to n8n: {transcribed_text[:100]}...")
print(f"Transcription successful: {transcribed_text}")
return {
"status": ResponseStatus.SUCCESS,
"message": "Audio transcribed and forwarded to n8n.",
"transcription": transcribed_text
"status": ResponseStatus.SUCCESS,
"message": "Audio transcribed successfully",
"transcription": transcribed_text,
"message_type": MessageType.AUDIO
}
except Exception as transcription_error:
print(f"Transcription failed: {transcription_error}")
print(f"Local transcription failed: {transcription_error}")
raise HTTPException(status_code=500, detail=f"Transcription failed: {str(transcription_error)}")
# Fallback: send filename to n8n for processing there
payload = {
"type": MessageType.AUDIO,
"filename": file.filename,
"message": "تم تسجيل رسالة صوتية - فشل في التفريغ المحلي",
"transcription_source": "fallback",
"error": str(transcription_error)
}
self.webhook_client.send_webhook(payload)
print(f"Sent filename to n8n as fallback: {file.filename}")
return {
"status": ResponseStatus.SUCCESS,
"message": "Audio uploaded, transcription failed - sent to n8n for processing.",
"filename": file.filename,
"transcription_error": str(transcription_error)
}
except HTTPException:
# Re-raise HTTP exceptions
raise
except Exception as e:
print(f"Error processing audio message: {e}")
raise HTTPException(status_code=500, detail=f"Failed to process audio message: {str(e)}")
\ No newline at end of file
raise HTTPException(status_code=500, detail=f"Failed to process audio: {str(e)}")
finally:
# Reset file pointer for potential reuse
file.file.seek(0)
\ No newline at end of file
from abc import ABC, abstractmethod
import sys
sys.path.append("../")
from repositories import WebhookClient
import os
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
from core import MessageType, ResponseStatus
class TextMessageHandler():
def __init__(self, webhook_client: WebhookClient):
self.webhook_client = webhook_client
def handle(self, text: str, **kwargs) -> dict:
print(f"Received text message: {text}")
payload = {"type": MessageType.TEXT, "message": text}
self.webhook_client.send_webhook(payload)
return {"status": ResponseStatus.SUCCESS, "message": "Message received and forwarded to n8n."}
class TextMessageHandler:
def __init__(self):
pass # No dependencies needed for text handling
def handle(self, text: str, **kwargs) -> dict:
"""Process text message - simple validation and pass-through"""
try:
if not text or not text.strip():
raise ValueError("Text message cannot be empty")
print(f"Processing text message: {text[:50]}...")
return {
"status": ResponseStatus.SUCCESS,
"message": "Text message processed successfully",
"text": text.strip(),
"message_type": MessageType.TEXT
}
except Exception as e:
print(f"Error processing text message: {e}")
return {
"status": ResponseStatus.ERROR,
"message": f"Failed to process text: {str(e)}",
"message_type": MessageType.TEXT
}
\ No newline at end of file
import os
import boto3
from botocore.client import Config
import requests
import time
from fastapi import FastAPI, UploadFile, File, Form, HTTPException, Request
from fastapi import FastAPI, UploadFile, File, Form, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse
from starlette.background import BackgroundTask
from pydantic import BaseModel
import tempfile
from typing import Optional
import uvicorn
import json
from botocore.exceptions import ClientError
from typing import Optional, Protocol
import base64
from abc import ABC, abstractmethod
from dataclasses import dataclass
from enum import Enum
from core import MessageType, ResponseStatus, AppConfig
from schemas import WebhookResponse, TextMessage
from core import AppConfig
from repositories import StorageRepository, MinIOStorageRepository
from repositories import WebhookClient, N8NWebhookClient
from handlers import AudioMessageHandler, TextMessageHandler
from services import AudioService, ChatService, HealthService, ResponseService, ResponseManager, WebhookService, OpenAIService
from services import (
AudioService, ChatService, HealthService, ResponseService,
ResponseManager, OpenAIService, AgentService
)
class DIContainer:
def __init__(self):
self.config = AppConfig.from_env()
self.storage_repo = MinIOStorageRepository(self.config)
self.webhook_client = N8NWebhookClient(self.config.n8n_webhook_url)
self.response_manager = ResponseManager()
# Initialize OpenAI service
# Initialize OpenAI and Agent services
self.openai_service = OpenAIService()
self.agent_service = AgentService()
# Updated services to use OpenAI service
# Initialize services
self.audio_service = AudioService(self.storage_repo, self.config.minio_bucket)
self.chat_service = ChatService(
self.storage_repo,
self.webhook_client,
self.response_manager,
self.config,
self.openai_service # Pass OpenAI service
)
self.webhook_service = WebhookService(
self.response_manager,
self.storage_repo,
self.config.minio_bucket,
self.openai_service # Pass OpenAI service
self.openai_service,
self.agent_service
)
self.response_service = ResponseService(self.response_manager, self.audio_service)
self.health_service = HealthService(self.storage_repo, self.config)
# FastAPI App Factory
def create_app() -> FastAPI:
app = FastAPI(title="Unified Chat API")
app = FastAPI(title="Unified Chat API with Local Agent")
# Add CORS middleware
app.add_middleware(
......@@ -72,21 +52,20 @@ def create_app() -> FastAPI:
# Print configuration
print("MinIO Endpoint:", container.config.minio_endpoint)
print("MinIO Bucket:", container.config.minio_bucket)
print("n8n Webhook URL:", container.config.n8n_webhook_url)
print("OpenAI Service Available:", container.openai_service.is_available())
print("Agent Service Available:", container.agent_service.is_available())
@app.post("/chat")
async def chat_handler(
file: Optional[UploadFile] = File(None),
text: Optional[str] = Form(None)
):
"""Handles incoming chat messages (either text or audio). Forwards the message to the n8n webhook."""
"""
Handles incoming chat messages (either text or audio).
Generates responses locally using the agent service.
"""
return container.chat_service.process_message(file, text)
@app.post("/n8n_webhook_receiver")
async def n8n_webhook_receiver(response: WebhookResponse):
"""Receives and processes webhook data from n8n."""
return container.webhook_service.process_webhook_response(response)
@app.get("/get-audio-response")
async def get_audio_response():
"""Fetches the agent's text and audio response."""
......@@ -94,26 +73,93 @@ def create_app() -> FastAPI:
@app.get("/health")
async def health_check():
"""Health check endpoint"""
return container.health_service.get_health_status()
"""Health check endpoint with agent service status"""
health_status = container.health_service.get_health_status()
# Add agent service status
health_status.update({
"openai_service_status": "available" if container.openai_service.is_available() else "unavailable",
"agent_service_status": "available" if container.agent_service.is_available() else "unavailable"
})
return health_status
# Agent management endpoints
@app.get("/conversation/stats")
async def get_conversation_stats(conversation_id: str = "default"):
"""Get conversation statistics"""
return container.chat_service.get_agent_stats(conversation_id)
@app.post("/conversation/clear")
async def clear_conversation(conversation_id: str = "default"):
"""Clear conversation history"""
return container.chat_service.clear_conversation(conversation_id)
@app.post("/agent/system-prompt")
async def set_system_prompt(request: dict):
"""Update the agent's system prompt"""
prompt = request.get("prompt", "")
if not prompt:
raise HTTPException(status_code=400, detail="System prompt cannot be empty")
return container.chat_service.set_system_prompt(prompt)
@app.get("/agent/system-prompt")
async def get_system_prompt():
"""Get the current system prompt"""
return {
"system_prompt": container.agent_service.system_prompt,
"status": "success"
}
@app.get("/conversation/export")
async def export_conversation(conversation_id: str = "default"):
"""Export conversation history"""
history = container.agent_service.export_conversation(conversation_id)
return {
"conversation_id": conversation_id,
"messages": history,
"total_messages": len(history)
}
@app.post("/conversation/import")
async def import_conversation(request: dict):
"""Import conversation history"""
conversation_id = request.get("conversation_id", "default")
messages = request.get("messages", [])
if not messages:
raise HTTPException(status_code=400, detail="Messages list cannot be empty")
container.agent_service.import_conversation(messages, conversation_id)
return {
"status": "success",
"message": f"Imported {len(messages)} messages to conversation {conversation_id}"
}
@app.get("/")
async def root():
"""Root endpoint with API info"""
return {
"service": "Unified Chat API",
"version": "1.0.0",
"description": "Unified backend for audio/text chat with an agent, powered by n8n.",
"service": "Unified Chat API with Local Agent",
"version": "2.1.0",
"description": "Unified backend for audio/text chat with a local AI agent.",
"features": [
"Local AI agent responses using OpenAI GPT",
"Audio transcription using OpenAI Whisper",
"Text-to-speech using OpenAI TTS",
"Conversation history management"
],
"endpoints": {
"chat": "/chat (accepts audio or text, forwards to n8n)",
"n8n_webhook_receiver": "/n8n-webhook-receiver (receives agent responses from n8n)",
"chat": "/chat (accepts audio or text, generates local agent response)",
"get_audio_response": "/get-audio-response (fetches agent's audio and text response)",
},
"conversation_stats": "/conversation/stats (get conversation statistics)",
"clear_conversation": "/conversation/clear (clear conversation history)",
"set_system_prompt": "/agent/system-prompt (update agent system prompt)",
"export_conversation": "/conversation/export (export conversation history)",
"health": "/health (service health check)"
}
}
return app
# Application entry point
app = create_app()
......
{
"name": "speech_agent",
"nodes": [
{
"parameters": {
"resource": "audio",
"operation": "transcribe",
"options": {
"language": "ar"
}
},
"type": "@n8n/n8n-nodes-langchain.openAi",
"typeVersion": 1.8,
"position": [
880,
-576
],
"id": "0635df4e-56be-4f44-b620-8f1d3e426208",
"name": "Transcribe a recording",
"credentials": {
"openAiApi": {
"id": "NX7wk8zVYdNiMg98",
"name": "OpenAi account"
}
}
},
{
"parameters": {
"modelId": {
"__rl": true,
"value": "gpt-5-nano",
"mode": "list",
"cachedResultName": "GPT-5-NANO"
},
"messages": {
"values": [
{
"content": "={{ $('set student nationality').item.json.student_nationality === 'Saudi' ? $('dialect based system prompts').item.json.system_prompt_s : $('dialect based system prompts').item.json.system_prompt_eg }}\n",
"role": "system"
},
{
"content": "={{ $('Webhook').item.json.body.type === 'text' ? $('Webhook').item.json.body.message : $json.text }}\n"
}
]
},
"options": {}
},
"type": "@n8n/n8n-nodes-langchain.openAi",
"typeVersion": 1.8,
"position": [
1088,
-384
],
"id": "5143c3bf-58e9-4dbf-9a24-1bf140f0ef21",
"name": "Message a model",
"credentials": {
"openAiApi": {
"id": "NX7wk8zVYdNiMg98",
"name": "OpenAi account"
}
}
},
{
"parameters": {
"bucketName": "coversation",
"fileKey": "question.mp3"
},
"type": "n8n-nodes-base.s3",
"typeVersion": 1,
"position": [
720,
-576
],
"id": "6f77a82f-8a20-4baa-82c7-806a8c7bdcb4",
"name": "Download a file",
"credentials": {
"s3": {
"id": "qI8Btoanv8tZEX9O",
"name": "S3 account"
}
}
},
{
"parameters": {
"httpMethod": "POST",
"path": "b470cc56-de9b-4b92-9c50-39bac33143cc",
"options": {}
},
"type": "n8n-nodes-base.webhook",
"typeVersion": 2.1,
"position": [
16,
-400
],
"id": "fc7dddcc-c0dc-4a19-b3fe-281e36438d2b",
"name": "Webhook",
"webhookId": "b470cc56-de9b-4b92-9c50-39bac33143cc"
},
{
"parameters": {
"conditions": {
"options": {
"caseSensitive": true,
"leftValue": "",
"typeValidation": "strict",
"version": 2
},
"conditions": [
{
"id": "6dfab8d9-7e68-4037-b7d9-c8e374a4052e",
"leftValue": "={{ $('Webhook').item.json.body.type }}",
"rightValue": "audio",
"operator": {
"type": "string",
"operation": "equals",
"name": "filter.operator.equals"
}
}
],
"combinator": "and"
},
"options": {
"ignoreCase": false
}
},
"type": "n8n-nodes-base.if",
"typeVersion": 2.2,
"position": [
576,
-400
],
"id": "ecae597e-b081-4f38-9985-5887518b469a",
"name": "If"
},
{
"parameters": {
"method": "POST",
"url": "http://voice-agent:8000/n8n_webhook_receiver",
"sendBody": true,
"bodyParameters": {
"parameters": [
{
"name": "agent_responded",
"value": "yes"
},
{
"name": "agent_response",
"value": "={{ $json.message.content }}"
}
]
},
"options": {}
},
"type": "n8n-nodes-base.httpRequest",
"typeVersion": 4.2,
"position": [
1408,
-384
],
"id": "5a27f2a7-8903-414b-81fc-eb52600efa0e",
"name": "HTTP Request"
},
{
"parameters": {
"assignments": {
"assignments": [
{
"id": "76dfd79b-fdc0-4706-b804-6e6af453bdb2",
"name": "system_prompt_eg",
"value": "إنت مدرس كيميا لطفل في ابتدائي. رد باللهجة المصريّة السهلة. كلّم الطفل كأنك بتحكي له بصوت طبيعي. خلي الجمل قصيرة وواضحة، وما تحشرش معلومات كتير في جملة واحدة. ما تقولش الحاجات البديهية اللي هو عارفها زي \"المَيَّه بتتشرب\". قول المعلومة مرّة واحدة من غير تكرار. لو هتدي مثال أو تشبيه، يكون حاجة جديدة بتوضّح الفكرة، مش مجرد إعادة. خلّي المثال بسيط زي لعبة، شكل، أو صورة في الخيال. اكتب الكلمات زي ما بتنطق بالتشكيل الصح (زي: مَيَّه، أوكسچين). لو فيه رموز كيميائية زي H2O أو CO2 اكتبها زي ما هي. خلي الشرح شبه حكاية صغيرة أو صورة في دماغ الطفل، مش زي شرح كتاب.",
"type": "string"
},
{
"id": "2264d4cb-b6fa-409d-b5bd-ab70244ac23a",
"name": "system_prompt_s",
"value": "إنت مُعلّم كيميا لطفل في ابتدائي. رد باللهجة السعوديّة الدارجة والبسيطة. كَلّم الطفل كأنك تحاكيه وجهاً لوجه بصوت طبيعي. خل الجمل قصار وواضحة، لا تكدّس معلومات كثير في جملة وحدة. لا تقول أشياء بديهية يعرفها مثل \"المُوَيَّه نشربها\". أعط المعلومة مرّة وحدة بلا تكرار. لو بتضرب مثال أو تشبيه، يكون زاوية جديدة توضّح الفكرة، ما يكون تكرار. خلّ المثال شي بسيط يقرّب المعنى للطفل: زي لعبة، حركة، أو صورة يتخيّلها. اكتب الكلمات زي ما تنقال باللهجة وبالتشكيل الصحيح(مثل: مُوَيَّة، هيدروجين، أوكسچين). لو فيه رموز كيميائية مثل H2O أو CO2 اكتُبها زي ما هي. الشرح يكون كأنه سواليف بسيطة أو حكاية تخلي الطفل يتصوّرها، مو زي كلام كتاب مدرسي.",
"type": "string"
}
]
},
"options": {}
},
"type": "n8n-nodes-base.set",
"typeVersion": 3.4,
"position": [
176,
-400
],
"id": "dd152583-73d3-408c-b2e6-2c1e57959477",
"name": "dialect based system prompts"
},
{
"parameters": {
"assignments": {
"assignments": [
{
"id": "b43a6cb2-2faf-4af1-b464-b27fcdb50e02",
"name": "student_nationality",
"value": "Saudi",
"type": "string"
}
]
},
"options": {}
},
"type": "n8n-nodes-base.set",
"typeVersion": 3.4,
"position": [
368,
-400
],
"id": "dc10c19d-7b94-4212-96eb-c912108004b8",
"name": "set student nationality"
}
],
"pinData": {},
"connections": {
"Transcribe a recording": {
"main": [
[
{
"node": "Message a model",
"type": "main",
"index": 0
}
]
]
},
"Message a model": {
"main": [
[
{
"node": "HTTP Request",
"type": "main",
"index": 0
}
]
]
},
"Download a file": {
"main": [
[
{
"node": "Transcribe a recording",
"type": "main",
"index": 0
}
]
]
},
"Webhook": {
"main": [
[
{
"node": "dialect based system prompts",
"type": "main",
"index": 0
}
]
]
},
"If": {
"main": [
[
{
"node": "Download a file",
"type": "main",
"index": 0
}
],
[
{
"node": "Message a model",
"type": "main",
"index": 0
}
]
]
},
"dialect based system prompts": {
"main": [
[
{
"node": "set student nationality",
"type": "main",
"index": 0
}
]
]
},
"set student nationality": {
"main": [
[
{
"node": "If",
"type": "main",
"index": 0
}
]
]
}
},
"active": true,
"settings": {
"executionOrder": "v1"
},
"versionId": "11e62c7d-f788-4cc6-99e6-f6c682a7ba40",
"meta": {
"templateCredsSetupCompleted": true,
"instanceId": "afbef30b0e6f21971b595b8009fe1ea8e4dce4dda3acc1c3ccc51175e6f32b69"
},
"id": "ycCxjCgSUjDbzQXy",
"tags": []
}
\ No newline at end of file
......@@ -6,3 +6,4 @@ fastapi
uvicorn[standard]
python-multipart
openai
psycopg2-binary
\ No newline at end of file
......@@ -3,5 +3,6 @@ from .chat_service import ChatService
from .health_service import HealthService
from .response_service import ResponseService
from .response_manager import ResponseManager
from .webhook_service import WebhookService
from .openai_service import OpenAIService
from .agent_service import AgentService
from .pgvector_service import PGVectorService
\ No newline at end of file
import logging
import os
from typing import List, Dict
from fastapi import HTTPException
import sys
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
from core import StudentNationality, Models
from services.pgvector_service import PGVectorService
from services.openai_service import OpenAIService
logger = logging.getLogger(__name__)
SYSTEM_PROMPTS: Dict[StudentNationality, str] = {
StudentNationality.EGYPTIAN: """إنت مدرس كيميا لطفل في ابتدائي. رد باللهجة المصريّة السهلة. كلّم الطفل كأنك بتحكي له بصوت طبيعي. خلي الجمل قصيرة وواضحة، وما تحشرش معلومات كتير في جملة واحدة. ما تقولش الحاجات البديهية اللي هو عارفها زي "المَيَّه بتتشرب". قول المعلومة مرّة واحدة من غير تكرار. لو هتدي مثال أو تشبيه، يكون حاجة جديدة بتوضّح الفكرة، مش مجرد إعادة. خلّي المثال بسيط زي لعبة، شكل، أو صورة في الخيال. اكتب الكلمات زي ما بتنطق بالتشكيل الصح (زي: مَيَّه، أوكسچين). لو فيه رموز كيميائية زي H2O أو CO2 اكتبها زي ما هي. خلي الشرح شبه حكاية صغيرة أو صورة في دماغ الطفل، مش زي شرح كتاب.""",
StudentNationality.SAUDI: """إنت مُعلّم كيميا لطفل في ابتدائي. رد باللهجة السعوديّة الدارجة والبسيطة. كَلّم الطفل كأنك تحاكيه وجهاً لوجه بصوت طبيعي. خل الجمل قصار وواضحة، لا تكدّس معلومات كثير في جملة وحدة. لا تقول أشياء بديهية يعرفها مثل "المُوَيَّه نشربها". أعط المعلومة مرّة وحدة بلا تكرار. لو بتضرب مثال أو تشبيه، يكون زاوية جديدة توضّح الفكرة، ما يكون تكرار. خلّ المثال شي بسيط يقرّب المعنى للطفل: زي لعبة، حركة، أو صورة يتخيّلها. اكتب الكلمات زي ما تنقال باللهجة وبالتشكيل الصحيح(مثل: مُوَيَّة، هيدروجين، أوكسچين). لو فيه رموز كيميائية مثل H2O أو CO2 اكتُبها زي ما هي. الشرح يكون كأنه سواليف بسيطة أو حكاية تخلي الطفل يتصوّرها، مو زي كلام كتاب مدرسي."""
}
class AgentService:
"""Service class for handling AI agent conversations using OpenAI GPT and optional PGVector"""
def __init__(self, use_pgvector: bool = False):
self.openai_service = OpenAIService()
if not self.openai_service.is_available():
logger.warning("Warning: OPENAI_API_KEY not found. Agent service will be disabled.")
self.client = None
else:
self.client = self.openai_service.client
self.conversations: Dict[str, List[Dict[str, str]]] = {}
self.pgvector = PGVectorService() if use_pgvector else None
def is_available(self) -> bool:
return self.client is not None
def get_conversation_history(self, conversation_id: str = "default") -> List[Dict[str, str]]:
return self.conversations.get(conversation_id, [])
def add_message_to_history(self, message: str, role: str = "user", conversation_id: str = "default"):
if conversation_id not in self.conversations:
self.conversations[conversation_id] = []
self.conversations[conversation_id].append({"role": role, "content": message})
if len(self.conversations[conversation_id]) > 20:
messages = self.conversations[conversation_id]
if messages[0].get("role") == "system":
self.conversations[conversation_id] = [messages[0]] + messages[-19:]
else:
self.conversations[conversation_id] = messages[-20:]
def generate_response(
self,
user_message: str,
conversation_id: str = "default",
model: str = Models.chat,
temperature: float = 1.0,
nationality: StudentNationality = StudentNationality.EGYPTIAN,
top_k: int = 3
) -> str:
"""Generate a GPT response, optionally enriched with pgvector results"""
if not self.is_available():
raise HTTPException(status_code=500, detail="Agent service not available")
try:
self.add_message_to_history(user_message, "user", conversation_id)
# Pick system prompt
system_prompt = SYSTEM_PROMPTS.get(nationality, SYSTEM_PROMPTS[StudentNationality.EGYPTIAN])
messages = []
conversation_history = self.get_conversation_history(conversation_id)
if not conversation_history or conversation_history[0].get("role") != "system":
messages.append({"role": "system", "content": system_prompt})
self.conversations.setdefault(conversation_id, []).insert(0, {
"role": "system",
"content": system_prompt
})
messages.extend(conversation_history)
# If pgvector is enabled → enrich with nearest neighbors
if self.pgvector:
query_embedding = self.openai_service.generate_embedding(user_message)
neighbors = self.pgvector.search_nearest(query_embedding, limit=top_k)
if neighbors:
context_message = "Knowledge base search results:\n"
for n in neighbors:
context_message += f"- ID {n['id']} (distance {n['distance']:.4f})\n"
messages.append({"role": "system", "content": context_message})
# Generate AI response
response = self.client.chat.completions.create(
model=model,
messages=messages,
temperature=temperature
)
ai_response = response.choices[0].message.content.strip()
if not ai_response:
raise ValueError("Empty response from AI model")
self.add_message_to_history(ai_response, "assistant", conversation_id)
return ai_response
except Exception as e:
logger.error(f"Error generating AI response: {e}")
raise HTTPException(status_code=500, detail=f"AI response generation failed: {str(e)}")
def search_similar(self, query_embedding: List[float], top_k: int = 3):
"""Optional nearest neighbor search if PGVector is enabled"""
if not self.pgvector:
raise HTTPException(status_code=400, detail="PGVector service not enabled")
return self.pgvector.search_nearest(query_embedding, limit=top_k)
# ----------------- Suggested Test -----------------
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
# Agent with pgvector enabled
agent = AgentService(use_pgvector=True)
if agent.is_available():
reply = agent.generate_response("هو يعني إيه ذَرّة؟", model="gpt-5-nano", nationality=StudentNationality.EGYPTIAN)
print("AI:", reply)
else:
print("Agent service not available. Check OPENAI_API_KEY.")
......@@ -3,39 +3,127 @@ from typing import Optional
import sys
import os
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
from core import MessageType, AppConfig
from repositories import StorageRepository, WebhookClient
from core import MessageType, AppConfig, StudentNationality
from repositories import StorageRepository
from services.response_manager import ResponseManager
from services.openai_service import OpenAIService
from services.agent_service import AgentService
class ChatService:
def __init__(self, storage_repo: StorageRepository, webhook_client: WebhookClient,
response_manager: ResponseManager, config: AppConfig, openai_service: OpenAIService):
def __init__(self, storage_repo: StorageRepository, response_manager: ResponseManager,
config: AppConfig, openai_service: OpenAIService, agent_service: AgentService):
from handlers import AudioMessageHandler, TextMessageHandler
self.storage_repo = storage_repo
self.webhook_client = webhook_client
self.response_manager = response_manager
self.config = config
self.openai_service = openai_service
self.agent_service = agent_service
# Message handlers with OpenAI service dependency
# Message handlers (no webhook dependencies)
self.handlers = {
MessageType.AUDIO: AudioMessageHandler(
storage_repo,
webhook_client,
config.minio_bucket,
openai_service # Pass OpenAI service
openai_service
),
MessageType.TEXT: TextMessageHandler(webhook_client)
MessageType.TEXT: TextMessageHandler()
}
def process_message(self, file: Optional[UploadFile] = None, text: Optional[str] = None) -> dict:
"""Process incoming message and generate agent response directly"""
self.response_manager.clear_response()
if file and file.filename:
return self.handlers[MessageType.AUDIO].handle(file=file)
elif text:
return self.handlers[MessageType.TEXT].handle(text=text)
else:
raise HTTPException(status_code=400, detail="No text or audio file provided.")
\ No newline at end of file
try:
# Process the input message first
if file and file.filename:
# Handle audio message - transcribe first
result = self.handlers[MessageType.AUDIO].handle(file=file)
if result.get("status") == "success":
# Get transcribed text from the result
user_message = result.get("transcription", "")
if not user_message:
# Fallback message if transcription failed
user_message = "تم إرسال رسالة صوتية - فشل في التفريغ المحلي"
else:
raise HTTPException(status_code=400, detail="Failed to process audio message")
elif text:
# Handle text message
result = self.handlers[MessageType.TEXT].handle(text=text)
user_message = text
else:
raise HTTPException(status_code=400, detail="No text or audio file provided.")
# Generate agent response using the local agent service
try:
agent_response = self.agent_service.generate_response(user_message, nationality=StudentNationality.EGYPTIAN)
# Generate TTS audio from the response
audio_filename = self._generate_and_upload_audio(agent_response)
# Store response for retrieval
self.response_manager.store_response(agent_response, audio_filename)
print(f"Generated agent response: {agent_response[:100]}...")
return {
"status": "success",
"message": "Message processed and agent response ready",
"agent_response": agent_response,
"audio_filename": audio_filename
}
except Exception as agent_error:
print(f"Agent service error: {agent_error}")
raise HTTPException(status_code=500, detail=f"Agent response generation failed: {str(agent_error)}")
except Exception as e:
print(f"Error processing message: {e}")
raise HTTPException(status_code=500, detail=f"Failed to process message: {str(e)}")
def _generate_and_upload_audio(self, text: str) -> str:
"""Generate audio from text and upload to MinIO, return filename"""
try:
import time
# Generate audio using OpenAI service
temp_file_path = self.openai_service.generate_speech(text)
# Generate unique filename for MinIO
timestamp = int(time.time())
filename = f"agent_response_{timestamp}.mp3"
minio_file_path = f"audio/{filename}"
print(f"Uploading generated audio to MinIO: {minio_file_path}")
# Upload to MinIO
with open(temp_file_path, 'rb') as audio_file:
self.storage_repo.upload_file(audio_file, self.config.minio_bucket, minio_file_path)
# Clean up temporary file
self.openai_service.cleanup_temp_file(temp_file_path)
print(f"Successfully generated and uploaded TTS audio: {filename}")
return filename
except Exception as e:
print(f"Error generating and uploading audio: {e}")
# Don't fail the entire request if TTS fails
return None
def get_agent_stats(self, conversation_id: str = "default") -> dict:
"""Get conversation statistics from agent service"""
return self.agent_service.get_conversation_stats(conversation_id)
def clear_conversation(self, conversation_id: str = "default"):
"""Clear conversation history"""
self.agent_service.clear_conversation(conversation_id)
return {"status": "success", "message": f"Conversation {conversation_id} cleared"}
def set_system_prompt(self, prompt: str):
"""Update the agent's system prompt"""
self.agent_service.set_system_prompt(prompt)
return {"status": "success", "message": "System prompt updated"}
\ No newline at end of file
......@@ -13,12 +13,10 @@ class HealthService:
def get_health_status(self) -> dict:
minio_status = self.storage_repo.check_connection(self.config.minio_bucket)
n8n_status = "configured" if self.config.n8n_webhook_url else "not_configured"
return {
"status": "healthy",
"service": "unified-chat-api",
"service": "unified-chat-api-local-agent",
"minio_status": minio_status,
"n8n_status": n8n_status,
"timestamp": time.time()
}
\ No newline at end of file
......@@ -2,13 +2,16 @@ import os
import time
import tempfile
import io
from typing import Optional
from typing import Optional, List
from fastapi import HTTPException
from openai import OpenAI
import sys
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
from core import Models
class OpenAIService:
"""Service class for handling OpenAI API operations (TTS and Whisper)"""
"""Service class for handling OpenAI API operations (TTS, Whisper, Embeddings)"""
def __init__(self):
self.api_key = os.getenv("OPENAI_API_KEY")
......@@ -22,40 +25,24 @@ class OpenAIService:
"""Check if OpenAI service is available"""
return self.client is not None
# ------------------- Whisper -------------------
def transcribe_audio(self, file_content: bytes, filename: str, language: Optional[str] = "ar") -> str:
"""
Transcribe audio using OpenAI Whisper
Args:
file_content: Audio file content as bytes
filename: Original filename for context
language: Language code (optional, defaults to Arabic)
Returns:
Transcribed text
Raises:
HTTPException: If transcription fails or service unavailable
"""
"""Transcribe audio using OpenAI Whisper"""
if not self.is_available():
raise HTTPException(status_code=500, detail="OpenAI service not available")
try:
# Create file-like object for the API
audio_file = io.BytesIO(file_content)
audio_file.name = filename
print(f"Transcribing audio: {filename}")
# Call Whisper API
transcript = self.client.audio.transcriptions.create(
model="whisper-1",
model=Models.transcription,
file=audio_file,
language=language if language else None # Auto-detect if None
language=language if language else None
)
transcribed_text = transcript.text.strip()
if not transcribed_text:
raise ValueError("Empty transcription result")
......@@ -66,36 +53,22 @@ class OpenAIService:
print(f"Error during transcription: {e}")
raise HTTPException(status_code=500, detail=f"Transcription failed: {str(e)}")
def generate_speech(self, text: str, voice: str = "alloy", model: str = "tts-1") -> str:
"""
Generate speech from text using OpenAI TTS
Args:
text: Text to convert to speech
voice: Voice to use (alloy, echo, fable, onyx, nova, shimmer)
model: TTS model to use (tts-1 or tts-1-hd)
Returns:
Path to temporary file containing the generated audio
Raises:
HTTPException: If TTS generation fails or service unavailable
"""
# ------------------- TTS -------------------
def generate_speech(self, text: str, voice: str = "alloy") -> str:
"""Generate speech from text using OpenAI TTS"""
if not self.is_available():
raise HTTPException(status_code=500, detail="OpenAI service not available")
temp_file_path = None
try:
# Create temporary file
temp_file = tempfile.NamedTemporaryFile(delete=False, suffix=".mp3")
temp_file_path = temp_file.name
temp_file.close()
print(f"Generating TTS audio: {text[:50]}...")
# Generate audio using OpenAI TTS
with self.client.audio.speech.with_streaming_response.create(
model=model,
model=Models.tts,
voice=voice,
input=text,
response_format="mp3"
......@@ -106,13 +79,41 @@ class OpenAIService:
return temp_file_path
except Exception as e:
# Clean up temp file on error
if temp_file_path and os.path.exists(temp_file_path):
os.unlink(temp_file_path)
print(f"Error during TTS generation: {e}")
raise HTTPException(status_code=500, detail=f"TTS generation failed: {str(e)}")
# ------------------- Embeddings -------------------
def generate_embedding(self, text: str) -> List[float]:
"""
Generate an embedding vector for input text.
Args:
text: Input string
model: Embedding model (default: text-embedding-3-small)
Returns:
List[float]: Embedding vector
"""
if not self.is_available():
raise HTTPException(status_code=500, detail="OpenAI service not available")
try:
response = self.client.embeddings.create(
model=Models.embedding,
input=text
)
embedding = response.data[0].embedding
if not embedding:
raise ValueError("Empty embedding generated")
return embedding
except Exception as e:
print(f"Error during embedding generation: {e}")
raise HTTPException(status_code=500, detail=f"Embedding generation failed: {str(e)}")
# ------------------- Utils -------------------
def cleanup_temp_file(self, file_path: str) -> None:
"""Clean up temporary file"""
if file_path and os.path.exists(file_path):
......@@ -120,4 +121,4 @@ class OpenAIService:
os.unlink(file_path)
print(f"Cleaned up temporary file: {file_path}")
except Exception as e:
print(f"Warning: Could not clean up temp file {file_path}: {e}")
\ No newline at end of file
print(f"Warning: Could not clean up temp file {file_path}: {e}")
import os
import psycopg2
from psycopg2.extras import RealDictCursor
class PGVectorService:
"""Service for managing embeddings with PostgreSQL pgvector"""
def __init__(self):
self.conn = psycopg2.connect(
host=os.getenv("POSTGRES_HOST", "postgres"),
user=os.getenv("POSTGRES_USER"),
password=os.getenv("POSTGRES_PASSWORD"),
dbname=os.getenv("POSTGRES_DB"),
)
def insert_embedding(self, id: int, embedding: list):
"""Insert or update an embedding"""
with self.conn.cursor() as cur:
cur.execute(
"""
INSERT INTO embeddings_table (id, embedding)
VALUES (%s, %s)
ON CONFLICT (id) DO UPDATE SET embedding = EXCLUDED.embedding;
""",
(id, embedding),
)
self.conn.commit()
def search_nearest(self, query_embedding: list, limit: int = 3):
"""Search nearest embeddings using cosine distance (<-> operator)"""
with self.conn.cursor(cursor_factory=RealDictCursor) as cur:
cur.execute(
"""
SELECT id, embedding, embedding <-> %s AS distance
FROM embeddings_table
ORDER BY embedding <-> %s
LIMIT %s;
""",
(query_embedding, query_embedding, limit),
)
return cur.fetchall()
def close(self):
if self.conn:
self.conn.close()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment