|
|
import logging |
|
|
import asyncio |
|
|
from typing import Optional, Dict, Any, List |
|
|
import json |
|
|
|
|
|
try:
    from elevenlabs.client import ElevenLabs
    from elevenlabs.conversational_ai.conversation import Conversation, ClientTools
    from elevenlabs.conversational_ai.default_audio_interface import DefaultAudioInterface

    ELEVENLABS_AVAILABLE = True
except ImportError:
    # The SDK is an optional dependency: keep this module importable without it.
    # Bind the SDK names to None so module-level references to them (e.g. the
    # ``Optional[Conversation]`` return annotation, which is evaluated when the
    # class body runs) do not raise NameError.
    ElevenLabs = None
    Conversation = None
    ClientTools = None
    DefaultAudioInterface = None
    ELEVENLABS_AVAILABLE = False

    logger = logging.getLogger(__name__)
    logger.warning("ElevenLabs SDK not available. Install: pip install elevenlabs")
|
|
|
|
|
import config |
|
|
from services.llamaindex_service import LlamaIndexService |
|
|
|
|
|
# Module-level logger. The import guard above only binds `logger` on its
# ImportError path, so it is (re)bound here unconditionally.
logger = logging.getLogger(__name__)
|
|
|
|
|
class ElevenLabsService:
    """
    Service for ElevenLabs Conversational AI with document (RAG) lookup.

    - Registers a ``query_documents`` client tool whose handler answers
      questions from the LlamaIndex-backed document library.
    - Creates voice (audio) or text-only ``Conversation`` objects.
    - Offers a text chat path (``send_text_message``) that queries the
      document library directly.
    - Tracks active conversations and a per-session message history.
    """

    def __init__(self, llamaindex_service: "LlamaIndexService"):
        """
        Initialize the ElevenLabs client and register client tools.

        ``self.client`` stays ``None`` (see ``is_available``) when the SDK is
        missing, no API key is configured, or client construction fails.

        Args:
            llamaindex_service: LlamaIndex service used for document queries.
        """
        self.config = config.config
        self.llamaindex_service = llamaindex_service
        self.client = None
        self.client_tools = None
        # session_id -> Conversation created by create_conversation()
        self.active_conversations: Dict[str, "Conversation"] = {}
        # session_id -> list of {"role": ..., "content": ...} messages
        self.conversation_history: Dict[str, List[Dict]] = {}

        if not ELEVENLABS_AVAILABLE:
            logger.error("ElevenLabs SDK not installed. Run: pip install elevenlabs")
            return

        if not self.config.ELEVENLABS_API_KEY:
            logger.warning("ELEVENLABS_API_KEY not configured.")
            return

        try:
            self.client = ElevenLabs(api_key=self.config.ELEVENLABS_API_KEY)
            logger.info("ElevenLabs client initialized successfully")

            self._init_client_tools()

            logger.info("ElevenLabs service initialized")
        except Exception as e:
            logger.error(f"Error initializing ElevenLabs service: {str(e)}")

    def _init_client_tools(self):
        """
        Create the ClientTools registry and register the RAG tool.

        On any failure the error is logged and ``self.client_tools`` is set
        to ``None``.
        """
        try:
            try:
                self.client_tools = ClientTools()
            except TypeError:
                # Fallback: retry with an explicit event loop if the no-argument
                # constructor is rejected.
                try:
                    loop = asyncio.get_event_loop()
                except RuntimeError:
                    loop = asyncio.new_event_loop()
                    asyncio.set_event_loop(loop)
                self.client_tools = ClientTools(loop=loop)

            # The SDK's ClientTools.register() takes (tool_name, handler, is_async).
            # It has no description/parameters arguments. Passing them raised a
            # TypeError, which the except below swallowed, so no tool was ever
            # registered. The tool's description ("Search through the user's
            # uploaded documents...") and its `query` string parameter must be
            # declared on the ElevenLabs agent's configuration instead.
            self.client_tools.register(
                "query_documents",
                handler=self._rag_query_handler,
                is_async=True,
            )

            logger.info("Client tools registered: query_documents")
        except Exception as e:
            logger.error(f"Error initializing client tools: {str(e)}")
            self.client_tools = None

    async def _rag_query_handler(self, params: Dict[str, Any]) -> Dict[str, Any]:
        """
        Answer a question from the document library.

        Registered as the ``query_documents`` client tool and also called
        directly by ``send_text_message`` and ``test_connection``.

        Args:
            params: Tool parameters; the question is read from ``params["query"]``.

        Returns:
            Dict with an ``answer`` string. Successful lookups also carry
            ``"confidence": "high"`` and ``"source": "document_library"``;
            empty-query, timeout and error replies carry only ``answer``.
        """
        try:
            query = params.get("query", "")

            if not query or not query.strip():
                return {
                    "answer": "I didn't receive a question to search for. Could you please ask again?"
                }

            logger.info(f"RAG query: {query}")

            timeout = getattr(self.config, "CONVERSATION_TIMEOUT", 30)
            try:
                result = await asyncio.wait_for(
                    self.llamaindex_service.query(query),
                    timeout=timeout,
                )
            except asyncio.TimeoutError:
                logger.error("RAG query timeout")
                return {
                    "answer": "The search is taking longer than expected. Could you try rephrasing your question?"
                }

            if result is None:
                # Routed to the generic error reply below.
                raise ValueError("document search returned no result")

            # Normalise to str. The answer is returned as the tool result and its
            # length is logged; a non-str result would otherwise fail len() and be
            # reported as an error even though the lookup succeeded.
            answer = result if isinstance(result, str) else str(result)

            logger.info(f"RAG query successful: {len(answer)} chars")

            return {
                "answer": answer,
                "confidence": "high",
                "source": "document_library",
            }
        except Exception as e:
            logger.error(f"RAG query error: {str(e)}", exc_info=True)
            return {
                "answer": f"I encountered an error while searching: {str(e)}. Please try again."
            }

    def create_conversation(
        self,
        agent_id: Optional[str] = None,
        session_id: Optional[str] = None,
        use_audio: bool = True,
    ) -> Optional["Conversation"]:
        """
        Create (but do not start) a new conversation.

        Args:
            agent_id: ElevenLabs agent ID; defaults to ``config.ELEVENLABS_AGENT_ID``.
            session_id: Optional tracking key. When given, the conversation is
                stored in ``active_conversations`` and its history is reset.
            use_audio: If True, attach a ``DefaultAudioInterface``; otherwise
                pass ``None`` as the audio interface (text-only use).

        Returns:
            The ``Conversation``, or ``None`` if the client is not initialized,
            no agent ID is available, or construction fails.
        """
        if not self.client:
            logger.error("ElevenLabs client not initialized")
            return None

        try:
            agent_id = agent_id or self.config.ELEVENLABS_AGENT_ID

            if not agent_id:
                logger.error("No agent ID provided or configured")
                return None

            audio_interface = DefaultAudioInterface() if use_audio else None

            conversation = Conversation(
                client=self.client,
                agent_id=agent_id,
                requires_auth=True,
                audio_interface=audio_interface,
                client_tools=self.client_tools,
                # Route transcripts into this session's history.
                callback_agent_response=lambda response: self._on_agent_response(session_id, response),
                callback_user_transcript=lambda transcript: self._on_user_message(session_id, transcript),
            )

            if session_id:
                self.active_conversations[session_id] = conversation
                self.conversation_history[session_id] = []

            logger.info(f"Created conversation for agent: {agent_id}")
            return conversation
        except Exception as e:
            logger.error(f"Error creating conversation: {str(e)}")
            return None

    def _on_agent_response(self, session_id: Optional[str], response: str):
        """Append an agent response to the session history, if the session is tracked."""
        if session_id and session_id in self.conversation_history:
            self.conversation_history[session_id].append({
                "role": "assistant",
                "content": response,
            })
        logger.debug(f"Agent response: {response[:100]}...")

    def _on_user_message(self, session_id: Optional[str], message: str):
        """Append a user transcript to the session history, if the session is tracked."""
        if session_id and session_id in self.conversation_history:
            self.conversation_history[session_id].append({
                "role": "user",
                "content": message,
            })
        logger.debug(f"User message: {message[:100]}...")

    async def start_conversation(self, session_id: Optional[str] = None) -> Dict[str, Any]:
        """
        Create a text-only conversation for a session.

        Args:
            session_id: Optional session ID for tracking.

        Returns:
            ``{"success": True, "session_id", "message"}`` on success, otherwise
            ``{"success": False, "error"}``.
        """
        try:
            conversation = self.create_conversation(session_id=session_id, use_audio=False)

            if conversation:
                return {
                    "success": True,
                    "session_id": session_id,
                    "message": "Voice assistant ready. Ask me anything about your documents!",
                }
            return {
                "success": False,
                "error": "Failed to create conversation. Check API configuration.",
            }
        except Exception as e:
            logger.error(f"Error starting conversation: {str(e)}")
            return {
                "success": False,
                "error": str(e),
            }

    async def send_text_message(
        self,
        message: str,
        session_id: str,
    ) -> Dict[str, Any]:
        """
        Answer a text message (no audio) from the document library.

        This does not go through the ElevenLabs agent: it calls the RAG handler
        directly. Both turns are recorded when ``session_id`` has a history
        (i.e. it was created via ``create_conversation``).

        Args:
            message: User's text message.
            session_id: Session identifier.

        Returns:
            ``{"success": True, "answer", "session_id"}``, or
            ``{"success": False, "error"}`` for an empty message or failure.
        """
        try:
            if not message or not message.strip():
                return {
                    "success": False,
                    "error": "Empty message",
                }

            if session_id in self.conversation_history:
                self.conversation_history[session_id].append({
                    "role": "user",
                    "content": message,
                })

            response = await self._rag_query_handler({"query": message})

            if session_id in self.conversation_history:
                self.conversation_history[session_id].append({
                    "role": "assistant",
                    "content": response["answer"],
                })

            return {
                "success": True,
                "answer": response["answer"],
                "session_id": session_id,
            }
        except Exception as e:
            logger.error(f"Error sending message: {str(e)}")
            return {
                "success": False,
                "error": str(e),
            }

    async def end_conversation(self, session_id: str) -> bool:
        """
        End and forget an active conversation.

        The session's message history is kept.

        Args:
            session_id: Session identifier.

        Returns:
            True if a tracked conversation was removed, False otherwise.
        """
        try:
            if session_id not in self.active_conversations:
                return False

            conversation = self.active_conversations.pop(session_id)

            # Best-effort cleanup; a failure here must not keep the session tracked.
            try:
                if hasattr(conversation, "end_session"):
                    conversation.end_session()
            except Exception as e:
                logger.warning(f"Error during session cleanup: {str(e)}")

            logger.info(f"Ended conversation: {session_id}")
            return True
        except Exception as e:
            logger.error(f"Error ending conversation: {str(e)}")
            return False

    def get_conversation_history(self, session_id: str) -> List[Dict]:
        """Return the message history for a session, or an empty list if unknown."""
        return self.conversation_history.get(session_id, [])

    def _fetch_voices(self) -> List[Dict[str, str]]:
        """Fetch voices from the API (blocking). Exceptions propagate to the caller."""
        voices = self.client.voices.get_all()
        return [
            {
                "voice_id": voice.voice_id,
                "name": voice.name,
                # Missing or None/empty category falls back to "general".
                "category": getattr(voice, "category", None) or "general",
            }
            for voice in voices.voices
        ]

    def get_available_voices(self) -> List[Dict[str, str]]:
        """
        Get the list of available voices.

        Returns:
            List of ``{"voice_id", "name", "category"}`` dicts; empty if the
            client is not initialized or the request fails (errors are logged).
        """
        try:
            if not self.client:
                return []
            return self._fetch_voices()
        except Exception as e:
            logger.error(f"Error getting voices: {str(e)}")
            return []

    def is_available(self) -> bool:
        """Check if the ElevenLabs SDK is installed and the client was created."""
        return ELEVENLABS_AVAILABLE and self.client is not None

    async def test_connection(self) -> Dict[str, Any]:
        """
        Test the ElevenLabs API connection and the RAG tool.

        Returns:
            ``{"status": "success", "message", "voices_available",
            "rag_tool_working", "client_tools_registered"}``, or
            ``{"status": "error", "message"}`` if the client is missing or the
            API call fails.
        """
        try:
            if not self.client:
                return {
                    "status": "error",
                    "message": "Client not initialized",
                }

            # Use the raising helper so an invalid key or network failure is
            # reported as an error rather than as "success" with zero voices.
            # The SDK call is blocking, so run it off the event loop.
            loop = asyncio.get_running_loop()
            voices = await loop.run_in_executor(None, self._fetch_voices)

            test_result = await self._rag_query_handler({"query": "test"})

            return {
                "status": "success",
                "message": "ElevenLabs API connected",
                "voices_available": len(voices),
                # Error and timeout replies also contain "answer"; only a
                # successful lookup is tagged with this source.
                "rag_tool_working": test_result.get("source") == "document_library",
                "client_tools_registered": self.client_tools is not None,
            }
        except Exception as e:
            logger.error(f"Connection test failed: {str(e)}")
            return {
                "status": "error",
                "message": str(e),
            }