# AI-Digital-Library-Assistant / services/elevenlabs_service.py
# Last commit by Nihal2000: ffc9670 "fixed all bugs"
import logging
import asyncio
from typing import Optional, Dict, Any, List
import json

logger = logging.getLogger(__name__)

# The ElevenLabs SDK is optional: when it cannot be imported the service is
# constructed in a disabled state (see ElevenLabsService.is_available()).
try:
    from elevenlabs.client import ElevenLabs
    from elevenlabs.conversational_ai.conversation import Conversation, ClientTools
    from elevenlabs.conversational_ai.default_audio_interface import DefaultAudioInterface
    ELEVENLABS_AVAILABLE = True
except ImportError:
    ELEVENLABS_AVAILABLE = False
    # Placeholders keep module-level references to these names resolvable
    # (e.g. annotations evaluated at class-definition time) when the SDK is
    # absent; the service checks ELEVENLABS_AVAILABLE before using them.
    ElevenLabs = Conversation = ClientTools = DefaultAudioInterface = None
    logger.warning("ElevenLabs SDK not available. Install: pip install elevenlabs")

import config
from services.llamaindex_service import LlamaIndexService
class ElevenLabsService:
    """
    ElevenLabs Conversational AI integration backed by a LlamaIndex RAG service.

    - Creates ElevenLabs ``Conversation`` objects (with or without an audio
      interface) and tracks them by session ID.
    - Registers a ``query_documents`` client tool so the agent can call back
      into the LlamaIndex service to search the user's documents.
    - Offers a text-only chat path (``send_text_message``) that queries the
      RAG service directly, without going through the ElevenLabs agent.
    - Keeps an in-memory, per-session list of user/assistant messages.

    Setup failures (SDK not installed, API key not configured, client
    construction errors) are logged rather than raised; in those cases
    ``is_available()`` returns ``False``.
    """

    def __init__(self, llamaindex_service: "LlamaIndexService"):
        """
        Create the ElevenLabs client and register the RAG client tool.

        Args:
            llamaindex_service: Service whose async ``query(str)`` method is
                used to answer document questions.
        """
        self.config = config.config
        self.llamaindex_service = llamaindex_service
        self.client = None
        self.client_tools = None
        # session_id -> Conversation created by create_conversation()
        self.active_conversations: Dict[str, "Conversation"] = {}
        # session_id -> list of {"role": ..., "content": ...} messages
        self.conversation_history: Dict[str, List[Dict]] = {}

        if not ELEVENLABS_AVAILABLE:
            logger.error("ElevenLabs SDK not installed. Run: pip install elevenlabs")
            return

        if not self.config.ELEVENLABS_API_KEY:
            logger.warning("ELEVENLABS_API_KEY not configured.")
            return

        try:
            self.client = ElevenLabs(api_key=self.config.ELEVENLABS_API_KEY)
            logger.info("ElevenLabs client initialized successfully")

            # Synchronous setup; _init_client_tools copes with SDK versions
            # whose ClientTools constructor needs an explicit event loop.
            self._init_client_tools()
            logger.info("ElevenLabs service initialized")
        except Exception as e:
            logger.error(f"Error initializing ElevenLabs service: {str(e)}")

    def _init_client_tools(self):
        """
        Create the ClientTools registry and register ``query_documents``.

        On any failure the error is logged and ``self.client_tools`` is reset
        to ``None`` so callers can tell the tools are unavailable.
        """
        try:
            try:
                # Newer SDKs: ClientTools takes no arguments.
                self.client_tools = ClientTools()
            except TypeError:
                # Fallback for older SDKs that require an event loop.
                try:
                    loop = asyncio.get_event_loop()
                except RuntimeError:
                    loop = asyncio.new_event_loop()
                    asyncio.set_event_loop(loop)
                self.client_tools = ClientTools(loop=loop)

            self._register_rag_tool()
            logger.info("Client tools registered: query_documents")
        except Exception as e:
            logger.error(f"Error initializing client tools: {str(e)}")
            # Keep client_tools as None so we know it failed
            self.client_tools = None

    def _register_rag_tool(self):
        """
        Register ``_rag_query_handler`` as the async ``query_documents`` tool.

        Some ElevenLabs SDK versions' ``ClientTools.register`` accept only the
        tool name, handler and ``is_async`` flag; the tool description and
        parameter schema are then configured on the agent itself. Passing the
        extra keyword arguments to such a version raises ``TypeError``, so the
        metadata-bearing call is tried first and the bare call is the fallback.
        """
        try:
            self.client_tools.register(
                "query_documents",
                handler=self._rag_query_handler,
                description="Search through the user's uploaded documents. Use this tool whenever the user asks questions about their documents, files, or content in their library.",
                parameters={
                    "query": {
                        "type": "string",
                        "description": "The search query or question to find information in the documents"
                    }
                },
                is_async=True,
            )
        except TypeError:
            self.client_tools.register(
                "query_documents",
                self._rag_query_handler,
                is_async=True,
            )

    async def _rag_query_handler(self, params: Dict[str, Any]) -> Dict[str, Any]:
        """
        Answer a document question through the LlamaIndex service.

        Used as the ``query_documents`` client tool and by
        ``send_text_message``. It never raises: empty input, timeouts and
        errors are all reported to the user through the ``"answer"`` text.

        Args:
            params: Tool-call parameters; the question is read from
                ``params["query"]``. ``None`` or a missing key is treated as
                an empty question.

        Returns:
            A dict that always has an ``"answer"`` string. Only a successful
            search also sets ``"confidence"`` and ``"source"``.
        """
        try:
            query = (params or {}).get("query") or ""
            if not isinstance(query, str):
                query = str(query)
            if not query.strip():
                return {
                    "answer": "I didn't receive a question to search for. Could you please ask again?"
                }

            logger.info(f"RAG query: {query}")

            timeout = getattr(self.config, "CONVERSATION_TIMEOUT", 30)
            try:
                result = await asyncio.wait_for(
                    self.llamaindex_service.query(query),
                    timeout=timeout,
                )
            except asyncio.TimeoutError:
                logger.error("RAG query timeout")
                return {
                    "answer": "The search is taking longer than expected. Could you try rephrasing your question?"
                }

            # The answer is spoken by the agent / stored as chat text, so make
            # sure it is a string before measuring or returning it.
            answer = result if isinstance(result, str) else str(result)
            logger.info(f"RAG query successful: {len(answer)} chars")
            return {
                "answer": answer,
                "confidence": "high",
                "source": "document_library",
            }
        except Exception as e:
            logger.error(f"RAG query error: {str(e)}", exc_info=True)
            return {
                "answer": f"I encountered an error while searching: {str(e)}. Please try again."
            }

    def create_conversation(
        self,
        agent_id: Optional[str] = None,
        session_id: Optional[str] = None,
        use_audio: bool = True,
    ) -> Optional["Conversation"]:
        """
        Build an ElevenLabs ``Conversation`` wired to the RAG client tool.

        The session is only constructed here, not started. When ``session_id``
        is given, the conversation is stored under it and that session's
        history is reset to an empty list.

        Args:
            agent_id: ElevenLabs agent ID; defaults to
                ``config.ELEVENLABS_AGENT_ID``.
            session_id: Optional key for tracking the conversation/history.
            use_audio: If True, attach a ``DefaultAudioInterface``; otherwise
                ``audio_interface=None`` is passed.
                NOTE(review): confirm the installed SDK accepts ``None`` there.

        Returns:
            The ``Conversation``, or ``None`` if the client is not initialized,
            no agent ID is available, or construction fails (errors are logged).
        """
        if not self.client:
            logger.error("ElevenLabs client not initialized")
            return None

        try:
            agent_id = agent_id or self.config.ELEVENLABS_AGENT_ID
            if not agent_id:
                logger.error("No agent ID provided or configured")
                return None

            audio_interface = DefaultAudioInterface() if use_audio else None

            conversation = Conversation(
                client=self.client,
                agent_id=agent_id,
                requires_auth=True,
                audio_interface=audio_interface,
                client_tools=self.client_tools,
                # Record both sides of the conversation in the session history.
                callback_agent_response=lambda response: self._on_agent_response(session_id, response),
                callback_user_transcript=lambda transcript: self._on_user_message(session_id, transcript),
            )

            if session_id:
                self.active_conversations[session_id] = conversation
                self.conversation_history[session_id] = []

            logger.info(f"Created conversation for agent: {agent_id}")
            return conversation
        except Exception as e:
            logger.error(f"Error creating conversation: {str(e)}")
            return None

    def _append_history(self, session_id: Optional[str], role: str, content: Any) -> None:
        """Append a ``{"role", "content"}`` message if ``session_id`` is tracked."""
        if session_id and session_id in self.conversation_history:
            self.conversation_history[session_id].append({
                "role": role,
                "content": content,
            })

    def _on_agent_response(self, session_id: Optional[str], response: str):
        """Conversation callback: record an agent response in the session history."""
        self._append_history(session_id, "assistant", response)
        logger.debug(f"Agent response: {response[:100]}...")

    def _on_user_message(self, session_id: Optional[str], message: str):
        """Conversation callback: record a user transcript in the session history."""
        self._append_history(session_id, "user", message)
        logger.debug(f"User message: {message[:100]}...")

    async def start_conversation(self, session_id: Optional[str] = None) -> Dict[str, Any]:
        """
        Create a text-mode (no audio interface) conversation for a session.

        Args:
            session_id: Optional key for tracking the conversation/history.

        Returns:
            ``{"success": True, "session_id": ..., "message": ...}`` on success,
            otherwise ``{"success": False, "error": ...}``.
        """
        try:
            conversation = self.create_conversation(session_id=session_id, use_audio=False)
            if conversation:
                return {
                    "success": True,
                    "session_id": session_id,
                    "message": "Voice assistant ready. Ask me anything about your documents!"
                }
            return {
                "success": False,
                "error": "Failed to create conversation. Check API configuration."
            }
        except Exception as e:
            logger.error(f"Error starting conversation: {str(e)}")
            return {
                "success": False,
                "error": str(e)
            }

    async def send_text_message(
        self,
        message: str,
        session_id: str
    ) -> Dict[str, Any]:
        """
        Answer a text chat message by querying the RAG service directly.

        The ElevenLabs agent is not involved on this path. Both the user
        message and the answer are appended to the session history when the
        session is being tracked.

        Args:
            message: User's text message.
            session_id: Session identifier.

        Returns:
            ``{"success": True, "answer": ..., "session_id": ...}`` or
            ``{"success": False, "error": ...}``.
        """
        try:
            if not message or not message.strip():
                return {
                    "success": False,
                    "error": "Empty message"
                }

            self._append_history(session_id, "user", message)

            response = await self._rag_query_handler({"query": message})
            answer = response["answer"]

            self._append_history(session_id, "assistant", answer)

            return {
                "success": True,
                "answer": answer,
                "session_id": session_id
            }
        except Exception as e:
            logger.error(f"Error sending message: {str(e)}")
            return {
                "success": False,
                "error": str(e)
            }

    async def end_conversation(self, session_id: str) -> bool:
        """
        Stop tracking a conversation and try to end its session.

        Errors raised by ``end_session`` are logged as warnings; the
        conversation is removed either way. The session's message history is
        kept so it can still be read with ``get_conversation_history``.

        Args:
            session_id: Session identifier.

        Returns:
            True if a tracked conversation was removed, False otherwise.
        """
        try:
            conversation = self.active_conversations.pop(session_id, None)
            if conversation is None:
                return False

            try:
                if hasattr(conversation, "end_session"):
                    conversation.end_session()
            except Exception as e:
                logger.warning(f"Error during session cleanup: {str(e)}")

            logger.info(f"Ended conversation: {session_id}")
            return True
        except Exception as e:
            logger.error(f"Error ending conversation: {str(e)}")
            return False

    def get_conversation_history(self, session_id: str) -> List[Dict]:
        """Return the stored message list for ``session_id`` (empty if unknown)."""
        return self.conversation_history.get(session_id, [])

    def get_available_voices(self) -> List[Dict[str, str]]:
        """
        List the voices returned by the ElevenLabs voices API.

        Returns:
            One dict per voice with ``voice_id``, ``name`` and ``category``
            (``"general"`` when the voice object has no ``category``
            attribute). An empty list if the client is not initialized or the
            API call fails (the error is logged).
        """
        if not self.client:
            return []
        try:
            voices = self.client.voices.get_all()
            return [
                {
                    "voice_id": voice.voice_id,
                    "name": voice.name,
                    "category": getattr(voice, "category", "general"),
                }
                for voice in voices.voices
            ]
        except Exception as e:
            logger.error(f"Error getting voices: {str(e)}")
            return []

    def is_available(self) -> bool:
        """Return True if the SDK imported and the client was created."""
        return ELEVENLABS_AVAILABLE and self.client is not None

    async def test_connection(self) -> Dict[str, Any]:
        """
        Check the ElevenLabs API connection and the RAG tool.

        Returns:
            On success: ``status="success"``, the number of voices returned by
            the API, whether a sample RAG query succeeded, and whether client
            tools were registered. Otherwise ``status="error"`` with a message.
        """
        if not self.client:
            return {
                "status": "error",
                "message": "Client not initialized"
            }
        try:
            # Call the API directly: get_available_voices() swallows errors and
            # would report a failed connection as "success" with 0 voices.
            voices = self.client.voices.get_all()
            voice_count = len(voices.voices)

            # _rag_query_handler always returns an "answer" (errors are
            # reported as text), so success is detected via the "source" key
            # that only its successful path sets.
            test_result = await self._rag_query_handler({"query": "test"})

            return {
                "status": "success",
                "message": "ElevenLabs API connected",
                "voices_available": voice_count,
                "rag_tool_working": test_result.get("source") == "document_library",
                "client_tools_registered": self.client_tools is not None
            }
        except Exception as e:
            logger.error(f"Connection test failed: {str(e)}")
            return {
                "status": "error",
                "message": str(e)
            }