diff --git "a/src/orchestrator_engine.py" "b/src/orchestrator_engine.py" new file mode 100644--- /dev/null +++ "b/src/orchestrator_engine.py" @@ -0,0 +1,2246 @@ +# orchestrator_engine.py +import uuid +import logging +import time +import asyncio +from datetime import datetime +from typing import List, Dict, Optional +from concurrent.futures import ThreadPoolExecutor +import sys +import os + +logger = logging.getLogger(__name__) + +# Add project root and parent directory to path for imports +current_dir = os.path.dirname(os.path.abspath(__file__)) +parent_dir = os.path.dirname(current_dir) +sys.path.insert(0, parent_dir) +sys.path.insert(0, current_dir) + +try: + from safety_threshold_matrix import should_trigger_user_choice + from safety_user_choice import create_safety_choice_prompt, process_safety_choice + from safety_choice_orchestrator import SafetyChoiceOrchestrator + SAFETY_CHOICE_AVAILABLE = True + logger.info("Safety choice modules loaded successfully") +except ImportError as e: + logger.warning(f"Safety choice modules not available: {e}") + SAFETY_CHOICE_AVAILABLE = False + +class MVPOrchestrator: + def __init__(self, llm_router, context_manager, agents): + self.llm_router = llm_router + self.context_manager = context_manager + self.agents = agents + self.execution_trace = [] + # Cache for topic extraction to reduce API calls + self._topic_cache = {} + self._topic_cache_max_size = 100 # Limit cache size + + # Safety revision thresholds + self.safety_thresholds = { + "toxicity_or_harmful_language": 0.3, + "potential_biases_or_stereotypes": 0.05, # Low threshold for bias + "privacy_or_security_concerns": 0.2, + "controversial_or_sensitive_topics": 0.3 + } + self.max_revision_attempts = 2 + self.revision_timeout = 30 # seconds + + # Safety response tracking to prevent infinite loops + self.awaiting_safety_response = {} # session_id -> True/False + self._pending_choices = {} # session_id -> choice_data + + # User ID tracking for context system + self._current_user_id = {} # session_id -> user_id + + # Context cache to prevent loops + self._context_cache = {} # cache_key -> {context, timestamp} + + # Query similarity tracking for duplicate detection + self.recent_queries = [] # List of {query, response, timestamp} + self.max_recent_queries = 50 # Keep last 50 queries + + # Response metrics tracking + self.agent_call_count = 0 + self.response_metrics_history = [] # Store recent metrics + + # Context relevance classifier (initialized lazily when needed) + self.context_classifier = None + self._classifier_initialized = False + + logger.info("MVPOrchestrator initialized with safety revision thresholds") + + def set_user_id(self, session_id: str, user_id: str): + """Set user_id with loop prevention""" + # Check if user_id actually changed + old_user_id = self._current_user_id.get(session_id) + + if old_user_id != user_id: + self._current_user_id[session_id] = user_id + logger.info(f"Set user_id={user_id} for session {session_id} (was: {old_user_id})") + + # Clear context cache on user change + cache_key = f"context_{session_id}" + if cache_key in self._context_cache: + del self._context_cache[cache_key] + logger.info(f"Cleared context cache for session {session_id} due to user change") + else: + self._current_user_id[session_id] = user_id + + def _get_user_id_for_session(self, session_id: str) -> str: + """Get user_id without triggering context loops""" + # Use in-memory mapping first + if hasattr(self, '_current_user_id') and session_id in self._current_user_id: + return 
self._current_user_id[session_id] + + # Fallback to default if not found + return "Test_Any" + + async def _get_or_create_context(self, session_id: str, user_input: str, user_id: str) -> dict: + """Get context with loop prevention and caching""" + # Check if we recently fetched context for this session + cache_key = f"context_{session_id}" + current_time = time.time() + + if hasattr(self, '_context_cache'): + cached = self._context_cache.get(cache_key) + if cached and (current_time - cached['timestamp']) < 5: # 5 second cache + logger.info(f"Using cached context for session {session_id}") + return cached['context'] + + # Fetch new context + context = await self.context_manager.manage_context(session_id, user_input, user_id=user_id) + + # Cache the context + if not hasattr(self, '_context_cache'): + self._context_cache = {} + + self._context_cache[cache_key] = { + 'context': context, + 'timestamp': current_time + } + + # Clean old cache entries + if len(self._context_cache) > 100: + # Remove oldest entries + sorted_items = sorted(self._context_cache.items(), key=lambda x: x[1]['timestamp']) + self._context_cache = dict(sorted_items[-50:]) + + return context + + async def process_request(self, session_id: str, user_input: str) -> dict: + """ + Main orchestration flow with loop prevention + """ + logger.info(f"Processing request for session {session_id}") + logger.info(f"User input: {user_input[:100]}") + + # Critical: Prevent safety check loops on binary responses + user_input_upper = user_input.strip().upper() + is_binary_response = user_input_upper in ['YES', 'NO', 'APPLY', 'KEEP', 'Y', 'N'] + + # Check if we're in a safety response context + if is_binary_response and self.awaiting_safety_response.get(session_id, False): + logger.info(f"Binary safety response detected ({user_input_upper}) - bypassing recursive safety check") + + # Immediately clear the flag to prevent any further loops + self.awaiting_safety_response[session_id] = False + + # Remove from pending choices if exists + if hasattr(self, '_pending_choices'): + self._pending_choices.pop(session_id, None) + + # Return with skip flag to prevent further processing + return { + 'is_safety_response': True, + 'response': user_input_upper, + 'requires_user_choice': False, + 'skip_safety_check': True, + 'final_response': f"Choice '{user_input_upper}' has been applied.", + 'bypass_reason': 'binary_safety_response' + } + + # Clear previous trace for new request + self.execution_trace = [] + start_time = time.time() + + # Initialize enhanced reasoning chain + reasoning_chain = { + "chain_of_thought": {}, + "alternative_paths": [], + "uncertainty_areas": [], + "evidence_sources": [], + "confidence_calibration": {} + } + + try: + # Step 3: Check query similarity BEFORE processing (early exit for duplicates) + # Note: This happens early to skip full processing for identical/similar queries + similar_response = self.check_query_similarity(user_input, threshold=0.95) # Higher threshold for exact duplicates + if similar_response: + logger.info(f"Similar/duplicate query detected, using cached response") + # Still track metrics for cached response (minimal processing) + metrics_start = time.time() + self.track_response_metrics(metrics_start, similar_response) + return similar_response + + # Step 1: Generate unique interaction ID + interaction_id = self._generate_interaction_id(session_id) + logger.info(f"Generated interaction ID: {interaction_id}") + + # Step 2: Context management with loop prevention and relevance classification + 
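+ # Illustrative sketch (hedged; values are examples, not taken from a real run): the
+ # per-session cache entry consulted and written by the _get_or_create_context() call below
+ # is expected to look like
+ #   self._context_cache["context_<session_id>"] = {
+ #       "context": {...},           # dict returned by context_manager.manage_context()
+ #       "timestamp": 1712345678.9   # time.time() at fetch; reused for ~5 seconds
+ #   }
+ # When the cache grows past 100 entries it is pruned to the 50 most recent by timestamp.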
logger.info("Step 2: Managing context with loop prevention...") + + # Get user_id from stored mapping, avoiding context retrieval loops + user_id = self._get_user_id_for_session(session_id) + + # Use context with deduplication check + base_context = await self._get_or_create_context(session_id, user_input, user_id) + + # Get context mode (safe with fallback) + context_mode = 'fresh' # Default + try: + if hasattr(self.context_manager, 'get_context_mode'): + context_mode = self.context_manager.get_context_mode(session_id) + except Exception as e: + logger.warning(f"Error getting context mode: {e}, using default 'fresh'") + + # ENHANCED: Relevance classification only if mode is 'relevant' + relevance_classification = None + if context_mode == 'relevant': + try: + logger.info("Relevant context mode: Classifying and summarizing relevant sessions...") + + # Initialize classifier if not already done (lazy initialization) + if not self._classifier_initialized: + try: + from src.context_relevance_classifier import ContextRelevanceClassifier + self.context_classifier = ContextRelevanceClassifier(self.llm_router) + self._classifier_initialized = True + logger.info("Context relevance classifier initialized") + except ImportError as e: + logger.warning(f"Context relevance classifier not available: {e}") + self._classifier_initialized = True # Mark as tried to avoid repeated attempts + + # Fetch user sessions if classifier available + if self.context_classifier: + all_session_contexts = [] + try: + if hasattr(self.context_manager, 'get_all_user_sessions'): + all_session_contexts = await self.context_manager.get_all_user_sessions(user_id) + else: + # Fallback: use _get_all_user_sessions from orchestrator + all_session_contexts = await self._get_all_user_sessions(user_id) + except Exception as e: + logger.error(f"Error fetching user sessions: {e}", exc_info=True) + all_session_contexts = [] # Continue with empty list + + if all_session_contexts: + # Perform classification and summarization + relevance_classification = await self.context_classifier.classify_and_summarize_relevant_contexts( + current_input=user_input, + session_contexts=all_session_contexts, + user_id=user_id + ) + + logger.info( + f"Relevance classification complete: " + f"{len(relevance_classification.get('relevant_summaries', []))} sessions summarized, " + f"topic: '{relevance_classification.get('topic', 'unknown')}', " + f"time: {relevance_classification.get('processing_time', 0):.2f}s" + ) + else: + logger.info("No session contexts available for relevance classification") + else: + logger.debug("Context classifier not available, skipping relevance classification") + + except Exception as e: + logger.error(f"Error in relevance classification: {e}", exc_info=True) + # FALLBACK: Continue with normal context (no degradation) + relevance_classification = None + + # Optimize context with relevance classification (handles None gracefully) + try: + context = self.context_manager._optimize_context( + base_context, + relevance_classification=relevance_classification + ) + except Exception as e: + logger.error(f"Error optimizing context: {e}", exc_info=True) + # FALLBACK: Use base context without optimization + context = base_context + + interaction_contexts_count = len(context.get('interaction_contexts', [])) + logger.info(f"Context retrieved: {interaction_contexts_count} interaction contexts, mode: {context_mode}") + + # Add context analysis to reasoning chain (using LLM-based topic extraction) + user_context = context.get('user_context', '') + 
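+ # Illustrative sketch (assumed shape, inferred only from the keys read above): a
+ # successful relevance_classification result is expected to carry at least
+ #   {"relevant_summaries": [ ...per-session summary entries... ],
+ #    "topic": "machine learning",   # example value
+ #    "processing_time": 1.42}       # seconds, example value
+ # before being passed to context_manager._optimize_context(); per the comment above,
+ # a None value is handled gracefully by the optimizer.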
has_user_context = bool(user_context) + + # Extract topic and keywords using LLM (async) + main_topic = await self._extract_main_topic(user_input, context) + topic_continuity = await self._analyze_topic_continuity(context, user_input) + query_keywords = await self._extract_keywords(user_input) + + reasoning_chain["chain_of_thought"]["step_1"] = { + "hypothesis": f"User is asking about: '{main_topic}'", + "evidence": [ + f"Previous interaction contexts: {interaction_contexts_count}", + f"User context available: {has_user_context}", + f"Session duration: {self._calculate_session_duration(context)}", + f"Topic continuity: {topic_continuity}", + f"Query keywords: {query_keywords}" + ], + "confidence": 0.85, + "reasoning": f"Context analysis shows user is focused on {main_topic} with {interaction_contexts_count} previous interaction contexts and {'existing' if has_user_context else 'new'} user context" + } + + # Step 3: Parallel Intent, Skills, and Safety Assessment + # Check if parallel processing is enabled (can be controlled via config) + use_parallel = getattr(self, '_parallel_processing_enabled', True) + + if use_parallel: + logger.info("Step 3: Processing intent, skills, and safety in parallel...") + parallel_results = await self.process_request_parallel(session_id, user_input, context) + intent_result = parallel_results.get('intent', {}) + skills_result = parallel_results.get('skills', {}) + # Safety will be checked later on the response + else: + # Sequential processing (fallback) + logger.info("Step 3: Recognizing intent...") + self.execution_trace.append({ + "step": "intent_recognition", + "agent": "intent_recognition", + "status": "executing" + }) + intent_result = await self.agents['intent_recognition'].execute( + user_input=user_input, + context=context + ) + self.execution_trace[-1].update({ + "status": "completed", + "result": {"primary_intent": intent_result.get('primary_intent', 'unknown')} + }) + logger.info(f"Intent detected: {intent_result.get('primary_intent', 'unknown')}") + + # Step 3.5: Skills Identification + logger.info("Step 3.5: Identifying relevant skills...") + self.execution_trace.append({ + "step": "skills_identification", + "agent": "skills_identification", + "status": "executing" + }) + skills_result = await self.agents['skills_identification'].execute( + user_input=user_input, + context=context + ) + self.execution_trace[-1].update({ + "status": "completed", + "result": {"skills_count": len(skills_result.get('identified_skills', []))} + }) + logger.info(f"Skills identified: {len(skills_result.get('identified_skills', []))} skills") + + # Add skills reasoning to chain + reasoning_chain["chain_of_thought"]["step_2_5"] = { + "hypothesis": f"User input relates to {len(skills_result.get('identified_skills', []))} expert skills", + "evidence": [ + f"Market analysis: {skills_result.get('market_analysis', {}).get('overall_analysis', 'N/A')}", + f"Skill classification: {skills_result.get('skill_classification', {}).get('classification_reasoning', 'N/A')}", + f"High-probability skills: {[s.get('skill', '') for s in skills_result.get('identified_skills', [])[:3]]}", + f"Confidence score: {skills_result.get('confidence_score', 0.5)}" + ], + "confidence": skills_result.get('confidence_score', 0.5), + "reasoning": f"Skills identification completed for topic '{main_topic}' with {len(skills_result.get('identified_skills', []))} relevant skills" + } + + # Add intent reasoning to chain + reasoning_chain["chain_of_thought"]["step_2"] = { + "hypothesis": f"User intent is 
'{intent_result.get('primary_intent', 'unknown')}' for topic '{main_topic}'", + "evidence": [ + f"Pattern analysis: {self._extract_pattern_evidence(user_input)}", + f"Confidence scores: {intent_result.get('confidence_scores', {})}", + f"Secondary intents: {intent_result.get('secondary_intents', [])}", + f"Query complexity: {self._assess_query_complexity(user_input)}" + ], + "confidence": intent_result.get('confidence_scores', {}).get(intent_result.get('primary_intent', 'unknown'), 0.7), + "reasoning": f"Intent '{intent_result.get('primary_intent', 'unknown')}' detected for {main_topic} based on linguistic patterns and context" + } + + # Step 4: Agent execution planning with reasoning + logger.info("Step 4: Creating execution plan...") + execution_plan = await self._create_execution_plan(intent_result, context) + + # Add execution planning reasoning + reasoning_chain["chain_of_thought"]["step_3"] = { + "hypothesis": f"Optimal approach for '{intent_result.get('primary_intent', 'unknown')}' intent on '{main_topic}'", + "evidence": [ + f"Intent complexity: {self._assess_intent_complexity(intent_result)}", + f"Required agents: {execution_plan.get('agents_to_execute', [])}", + f"Execution strategy: {execution_plan.get('execution_order', 'sequential')}", + f"Response scope: {self._determine_response_scope(user_input)}" + ], + "confidence": 0.80, + "reasoning": f"Agent selection optimized for {intent_result.get('primary_intent', 'unknown')} intent regarding {main_topic}" + } + + # Step 5: Parallel agent execution + logger.info("Step 5: Executing agents...") + agent_results = await self._execute_agents(execution_plan, user_input, context) + logger.info(f"Agent execution complete: {len(agent_results)} results") + + # Step 6: Response synthesis with reasoning + logger.info("Step 6: Synthesizing response...") + self.execution_trace.append({ + "step": "response_synthesis", + "agent": "response_synthesis", + "status": "executing" + }) + final_response = await self.agents['response_synthesis'].execute( + agent_outputs=agent_results, + user_input=user_input, + context=context, + skills_result=skills_result + ) + self.execution_trace[-1].update({ + "status": "completed", + "result": {"synthesis_method": final_response.get('synthesis_method', 'unknown')} + }) + + # Add synthesis reasoning + reasoning_chain["chain_of_thought"]["step_4"] = { + "hypothesis": f"Response synthesis for '{main_topic}' using '{final_response.get('synthesis_method', 'unknown')}' method", + "evidence": [ + f"Synthesis quality: {final_response.get('coherence_score', 0.7)}", + f"Source integration: {len(final_response.get('source_references', []))} sources", + f"Response length: {len(str(final_response.get('final_response', '')))} characters", + f"Content relevance: {self._assess_content_relevance(user_input, final_response)}" + ], + "confidence": final_response.get('coherence_score', 0.7), + "reasoning": f"Multi-source synthesis for {main_topic} using {final_response.get('synthesis_method', 'unknown')} approach" + } + + # Step 7: Safety and bias check with reasoning + logger.info("Step 7: Safety check...") + self.execution_trace.append({ + "step": "safety_check", + "agent": "safety_check", + "status": "executing" + }) + safety_checked = await self.agents['safety_check'].execute( + response=final_response, + context=context + ) + self.execution_trace[-1].update({ + "status": "completed", + "result": {"warnings": safety_checked.get('warnings', [])} + }) + + # Step 7.5: Enhanced Safety check with warnings (USER CHOICE PAUSED) + # Instead 
of prompting user choice, append warnings to response when thresholds exceeded + intent_class = intent_result.get('primary_intent', 'casual_conversation') + response_content = final_response.get('final_response', '') or str(final_response.get('response', '')) + + # Check for safety threshold breaches and append warnings if detected + if SAFETY_CHOICE_AVAILABLE: + safety_analysis = safety_checked.get('safety_analysis', {}) + + # Check if thresholds are exceeded + if should_trigger_user_choice(safety_analysis, intent_class): + logger.info(f"Safety concerns detected for intent '{intent_class}' - appending warnings to response") + + # Format safety concerns for display + from safety_threshold_matrix import format_safety_concerns + concerns_text = format_safety_concerns(safety_analysis, intent_class) + + if concerns_text: + # Append warnings to response instead of prompting user choice + warning_section = f""" + +--- + +## ⚠️ Safety Advisory + +This response has been flagged for potential safety concerns: + +{concerns_text} + +**Please review this content carefully and consider:** +- The potential impact on yourself and others +- Whether this content aligns with your intended use +- If additional verification or expert consultation is needed + +*This advisory is provided for transparency and user awareness. The response has not been modified.* +""" + # Update response content with warnings appended + response_content = response_content + warning_section + + # Update final_response dict to include warnings + final_response['final_response'] = response_content + if 'response' in final_response: + final_response['response'] = response_content + + # Also update safety_checked to include the warnings in the response + # This ensures _format_final_output will extract the response with warnings + safety_checked['safety_checked_response'] = response_content + safety_checked['original_response'] = response_content # Keep original as response with warnings + + logger.info("Safety warnings appended to response - no user choice prompted (feature paused)") + + # Add safety reasoning + reasoning_chain["chain_of_thought"]["step_5"] = { + "hypothesis": f"Safety validation for response about '{main_topic}'", + "evidence": [ + f"Safety score: {safety_checked.get('safety_analysis', {}).get('overall_safety_score', 0.8)}", + f"Warnings generated: {len(safety_checked.get('warnings', []))}", + f"Analysis method: {safety_checked.get('safety_analysis', {}).get('analysis_method', 'unknown')}", + f"Content appropriateness: {self._assess_content_appropriateness(user_input, safety_checked)}" + ], + "confidence": safety_checked.get('safety_analysis', {}).get('overall_safety_score', 0.8), + "reasoning": f"Safety analysis for {main_topic} content with non-blocking warning system" + } + + # Update final_response to use the response_content (which may have warnings appended) + # This ensures the formatted output includes warnings + if 'final_response' in final_response: + final_response['final_response'] = response_content + if 'response' in final_response: + final_response['response'] = response_content + + # Generate alternative paths and uncertainty analysis + reasoning_chain["alternative_paths"] = self._generate_alternative_paths(intent_result, user_input, main_topic) + reasoning_chain["uncertainty_areas"] = self._identify_uncertainty_areas(intent_result, final_response, safety_checked) + reasoning_chain["evidence_sources"] = self._extract_evidence_sources(intent_result, final_response, context) + 
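+ # Illustrative sketch (example numbers only): _calibrate_confidence_scores() on the next
+ # line reduces the per-step confidences to a single summary of the form
+ #   {"overall_confidence": 0.78,
+ #    "step_count": 6,
+ #    "confidence_distribution": {"high_confidence": 1, "medium_confidence": 4, "low_confidence": 1},
+ #    "calibration_method": "Weighted average of step confidences"}
+ # matching the keys produced by that helper further down in this module.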
reasoning_chain["confidence_calibration"] = self._calibrate_confidence_scores(reasoning_chain) + + processing_time = time.time() - start_time + + # Merge safety_checked warnings into final_response for proper formatting + # final_response already contains the response with warnings appended (if thresholds exceeded) + merged_response = { + 'final_response': response_content, + 'response': response_content, + 'safety_checked_response': response_content, + 'original_response': response_content, + 'warnings': safety_checked.get('warnings', []) + } + + # Pass merged response to ensure warnings metadata is included + result = self._format_final_output(merged_response, interaction_id, { + 'intent': intent_result.get('primary_intent', 'unknown'), + 'execution_plan': execution_plan, + 'processing_steps': [ + 'Context management', + 'Intent recognition', + 'Skills identification', + 'Execution planning', + 'Agent execution', + 'Response synthesis', + 'Safety check' + ], + 'processing_time': processing_time, + 'agents_used': list(self.agents.keys()), + 'intent_result': intent_result, + 'skills_result': skills_result, + 'synthesis_result': final_response, + 'reasoning_chain': reasoning_chain + }) + + # Update context with the final response for future context retrieval + response_text = str(result.get('response', '')) + user_id = getattr(self, '_current_user_id', {}).get(session_id, "Test_Any") + if response_text: + self.context_manager._update_context(context, user_input, response_text, user_id=user_id) + + # STEP 2: Generate Interaction Context after each response (50 tokens) + interaction_id = result.get('interaction_id', f"{session_id}_{int(time.time())}") + try: + await self.context_manager.generate_interaction_context( + interaction_id=interaction_id, + session_id=session_id, + user_input=user_input, + system_response=response_text, + user_id=user_id + ) + # Cache is automatically updated by generate_interaction_context() + + # STEP 3: Generate Session Context after each response (100 tokens) + # Uses cached interaction contexts, updates database and cache + try: + await self.context_manager.generate_session_context(session_id, user_id) + # Cache is automatically updated by generate_session_context() + except Exception as e: + logger.error(f"Error generating session context: {e}", exc_info=True) + + # Clear orchestrator-level cache to force refresh on next request + if hasattr(self, '_context_cache'): + orchestrator_cache_key = f"context_{session_id}" + if orchestrator_cache_key in self._context_cache: + del self._context_cache[orchestrator_cache_key] + logger.debug(f"Orchestrator cache cleared for session {session_id} to refresh with updated contexts") + except Exception as e: + logger.error(f"Error generating interaction context: {e}", exc_info=True) + + # Track response metrics + self.track_response_metrics(start_time, result) + + # Store query and response for similarity checking + self.recent_queries.append({ + 'query': user_input, + 'response': result, + 'timestamp': time.time() + }) + # Keep only recent queries + if len(self.recent_queries) > self.max_recent_queries: + self.recent_queries = self.recent_queries[-self.max_recent_queries:] + + logger.info(f"Request processing complete. 
Response length: {len(response_text)}") + return result + + except Exception as e: + logger.error(f"Error in process_request: {e}", exc_info=True) + processing_time = time.time() - start_time + return { + "response": f"Error processing request: {str(e)}", + "error": str(e), + "interaction_id": str(uuid.uuid4())[:8], + "agent_trace": [], + "timestamp": datetime.now().isoformat(), + "metadata": { + "agents_used": [], + "processing_time": processing_time, + "token_count": 0, + "warnings": [] + } + } + + def _generate_interaction_id(self, session_id: str) -> str: + """ + Generate unique interaction identifier + """ + timestamp = datetime.now().isoformat() + unique_id = str(uuid.uuid4())[:8] + return f"{session_id}_{unique_id}_{int(datetime.now().timestamp())}" + + async def _get_all_user_sessions(self, user_id: str) -> List[Dict]: + """ + Fetch all session contexts for relevance classification + Fallback method if context_manager doesn't have it + + Args: + user_id: User identifier + + Returns: + List of session context dictionaries + """ + try: + # Use context_manager's method if available + if hasattr(self.context_manager, 'get_all_user_sessions'): + return await self.context_manager.get_all_user_sessions(user_id) + + # Fallback: Direct database query + import sqlite3 + db_path = getattr(self.context_manager, 'db_path', 'sessions.db') + + conn = sqlite3.connect(db_path) + cursor = conn.cursor() + + cursor.execute(""" + SELECT DISTINCT + sc.session_id, + sc.session_summary, + sc.created_at, + (SELECT GROUP_CONCAT(ic.interaction_summary, ' ||| ') + FROM interaction_contexts ic + WHERE ic.session_id = sc.session_id + ORDER BY ic.created_at DESC + LIMIT 10) as recent_interactions + FROM session_contexts sc + JOIN sessions s ON sc.session_id = s.session_id + WHERE s.user_id = ? 
+ ORDER BY sc.created_at DESC + LIMIT 50 + """, (user_id,)) + + sessions = [] + for row in cursor.fetchall(): + session_id, session_summary, created_at, interactions_str = row + + interaction_list = [] + if interactions_str: + for summary in interactions_str.split(' ||| '): + if summary.strip(): + interaction_list.append({ + 'summary': summary.strip(), + 'timestamp': created_at + }) + + sessions.append({ + 'session_id': session_id, + 'summary': session_summary or '', + 'created_at': created_at, + 'interaction_contexts': interaction_list + }) + + conn.close() + return sessions + + except Exception as e: + logger.error(f"Error fetching user sessions: {e}", exc_info=True) + return [] # Safe fallback - no degradation + + async def _create_execution_plan(self, intent_result: dict, context: dict) -> dict: + """ + Create execution plan based on intent recognition + Maps intent types to specific execution tasks + """ + primary_intent = intent_result.get('primary_intent', 'casual_conversation') + secondary_intents = intent_result.get('secondary_intents', []) + confidence = intent_result.get('confidence_scores', {}).get(primary_intent, 0.7) + + # Map intent types to execution tasks + intent_task_mapping = { + "information_request": { + "tasks": ["information_gathering", "content_research"], + "execution_order": "sequential", + "priority": "high" + }, + "task_execution": { + "tasks": ["task_planning", "execution_strategy"], + "execution_order": "sequential", + "priority": "high" + }, + "creative_generation": { + "tasks": ["creative_brainstorming", "content_ideation"], + "execution_order": "parallel", + "priority": "normal" + }, + "analysis_research": { + "tasks": ["research_analysis", "data_collection", "pattern_identification"], + "execution_order": "sequential", + "priority": "high" + }, + "troubleshooting": { + "tasks": ["problem_analysis", "solution_research"], + "execution_order": "sequential", + "priority": "high" + }, + "education_learning": { + "tasks": ["curriculum_planning", "educational_content"], + "execution_order": "sequential", + "priority": "normal" + }, + "technical_support": { + "tasks": ["technical_research", "guidance_generation"], + "execution_order": "sequential", + "priority": "high" + }, + "casual_conversation": { + "tasks": ["context_enrichment"], + "execution_order": "parallel", + "priority": "low" + } + } + + # Get task plan for primary intent + plan = intent_task_mapping.get(primary_intent, { + "tasks": ["general_research"], + "execution_order": "parallel", + "priority": "normal" + }) + + # Add secondary intent tasks if confidence is high + if confidence > 0.7 and secondary_intents: + for secondary_intent in secondary_intents[:2]: # Limit to 2 secondary intents + secondary_plan = intent_task_mapping.get(secondary_intent) + if secondary_plan: + # Merge tasks, avoiding duplicates + existing_tasks = set(plan["tasks"]) + for task in secondary_plan["tasks"]: + if task not in existing_tasks: + plan["tasks"].append(task) + existing_tasks.add(task) + + logger.info(f"Execution plan created for intent '{primary_intent}': {len(plan['tasks'])} tasks, order={plan['execution_order']}") + + return { + "agents_to_execute": plan["tasks"], + "execution_order": plan["execution_order"], + "priority": plan["priority"], + "primary_intent": primary_intent, + "secondary_intents": secondary_intents + } + + async def _execute_agents(self, execution_plan: dict, user_input: str, context: dict) -> dict: + """ + Execute agents in parallel or sequential order based on plan + Actually executes 
task-specific LLM calls based on intent + """ + tasks = execution_plan.get("agents_to_execute", []) + execution_order = execution_plan.get("execution_order", "parallel") + primary_intent = execution_plan.get("primary_intent", "casual_conversation") + + if not tasks: + logger.warning("No tasks to execute in execution plan") + return {} + + logger.info(f"Executing {len(tasks)} tasks in {execution_order} order for intent '{primary_intent}'") + + results = {} + + # Build context summary for task execution + context_summary = self._build_context_summary(context) + + # Task prompt templates + task_prompts = self._build_task_prompts(user_input, context_summary, primary_intent) + + if execution_order == "parallel": + # Execute all tasks in parallel + task_coroutines = [] + for task in tasks: + if task in task_prompts: + coro = self._execute_single_task(task, task_prompts[task]) + task_coroutines.append((task, coro)) + else: + logger.warning(f"No prompt template for task: {task}") + + # Execute all tasks concurrently + if task_coroutines: + task_results = await asyncio.gather( + *[coro for _, coro in task_coroutines], + return_exceptions=True + ) + + # Map results back to task names + for (task, _), result in zip(task_coroutines, task_results): + if isinstance(result, Exception): + logger.error(f"Task {task} failed: {result}") + results[task] = {"error": str(result), "status": "failed"} + else: + results[task] = result + logger.info(f"Task {task} completed: {len(str(result))} chars") + else: + # Execute tasks sequentially + previous_results = {} + for task in tasks: + if task in task_prompts: + # Pass previous results to sequential tasks for context + enhanced_prompt = task_prompts[task] + if previous_results: + enhanced_prompt += f"\n\nPrevious task results: {str(previous_results)}" + + try: + result = await self._execute_single_task(task, enhanced_prompt) + results[task] = result + previous_results[task] = result + logger.info(f"Task {task} completed: {len(str(result))} chars") + except Exception as e: + logger.error(f"Task {task} failed: {e}") + results[task] = {"error": str(e), "status": "failed"} + previous_results[task] = results[task] + else: + logger.warning(f"No prompt template for task: {task}") + + logger.info(f"Agent execution complete: {len(results)} results collected") + return results + + def _build_context_summary(self, context: dict) -> str: + """Build a concise summary of context for task execution (all from cache)""" + summary_parts = [] + + # Extract session context (from cache) + session_context = context.get('session_context', {}) + session_summary = session_context.get('summary', '') if isinstance(session_context, dict) else "" + if session_summary: + summary_parts.append(f"Session summary: {session_summary[:1500]}") + + # Extract interaction contexts (from cache) + interaction_contexts = context.get('interaction_contexts', []) + if interaction_contexts: + recent_summaries = [ic.get('summary', '') for ic in interaction_contexts[-3:]] + if recent_summaries: + summary_parts.append(f"Recent conversation topics: {', '.join(recent_summaries)}") + + # Extract user context (from cache) + user_context = context.get('user_context', '') + if user_context: + summary_parts.append(f"User background: {user_context[:200]}") + + return " | ".join(summary_parts) if summary_parts else "No prior context" + + async def process_agents_parallel(self, request: Dict) -> List: + """ + Step 1: Optimize Agent Chain - Process multiple agents in parallel + + Args: + request: Dictionary containing request 
data with 'user_input' and 'context' + + Returns: + List of agent results in order [intent_result, skills_result] + """ + user_input = request.get('user_input', '') + context = request.get('context', {}) + + # Increment agent call count for metrics + self.agent_call_count += 2 # Two agents called + + tasks = [ + self.agents['intent_recognition'].execute( + user_input=user_input, + context=context + ), + self.agents['skills_identification'].execute( + user_input=user_input, + context=context + ), + ] + + try: + results = await asyncio.gather(*tasks, return_exceptions=True) + # Handle exceptions + processed_results = [] + for idx, result in enumerate(results): + if isinstance(result, Exception): + logger.error(f"Agent task {idx} failed: {result}") + processed_results.append({}) + else: + processed_results.append(result) + return processed_results + except Exception as e: + logger.error(f"Error in parallel agent processing: {e}", exc_info=True) + return [{}, {}] + + async def process_request_parallel(self, session_id: str, user_input: str, context: Dict) -> Dict: + """Process intent, skills, and safety in parallel""" + + # Run agents in parallel using asyncio.gather + try: + intent_task = self.agents['intent_recognition'].execute( + user_input=user_input, + context=context + ) + + skills_task = self.agents['skills_identification'].execute( + user_input=user_input, + context=context + ) + + # Safety check on user input (pre-check) + safety_task = self.agents['safety_check'].execute( + response=user_input, + context=context + ) + + # Increment agent call count for metrics + self.agent_call_count += 3 + + # Wait for all to complete + results = await asyncio.gather( + intent_task, + skills_task, + safety_task, + return_exceptions=True + ) + + # Handle results + intent_result = results[0] if not isinstance(results[0], Exception) else {} + skills_result = results[1] if not isinstance(results[1], Exception) else {} + safety_result = results[2] if not isinstance(results[2], Exception) else {} + + # Log any exceptions + if isinstance(results[0], Exception): + logger.error(f"Intent recognition error: {results[0]}") + if isinstance(results[1], Exception): + logger.error(f"Skills identification error: {results[1]}") + if isinstance(results[2], Exception): + logger.error(f"Safety check error: {results[2]}") + + return { + 'intent': intent_result, + 'skills': skills_result, + 'safety_precheck': safety_result + } + + except Exception as e: + logger.error(f"Error in parallel processing: {e}", exc_info=True) + # Fallback to sequential processing + return { + 'intent': await self.agents['intent_recognition'].execute(user_input=user_input, context=context), + 'skills': await self.agents['skills_identification'].execute(user_input=user_input, context=context), + 'safety_precheck': {} + } + + def _build_enhanced_context(self, session_id: str, prior_interactions: List[Dict]) -> Dict: + """Build enhanced context with memory accumulation""" + + # Intelligent context summarization + context = { + 'session_memory': [], + 'user_preferences': {}, + 'interaction_patterns': {}, + 'skills_used': set() + } + + # Process prior interactions with decay + for idx, interaction in enumerate(prior_interactions): + weight = 1.0 / (idx + 1) # Recent interactions weighted more + + # Extract key information + if 'skills' in interaction: + for skill in interaction['skills']: + if isinstance(skill, dict): + context['skills_used'].add(skill.get('name', skill.get('skill', ''))) + elif isinstance(skill, str): + 
context['skills_used'].add(skill) + + # Accumulate patterns + if 'intent' in interaction: + intent = interaction['intent'] + if intent not in context['interaction_patterns']: + context['interaction_patterns'][intent] = 0 + context['interaction_patterns'][intent] += weight + + # Build memory summary + if idx < 3: # Keep last 3 interactions in detail + context['session_memory'].append({ + 'summary': interaction.get('summary', ''), + 'timestamp': interaction.get('timestamp'), + 'relevance': weight + }) + + # Convert skills_used set to list for JSON serialization + context['skills_used'] = list(context['skills_used']) + + return context + + def _build_task_prompts(self, user_input: str, context_summary: str, primary_intent: str) -> dict: + """Build task-specific prompts for execution""" + + base_context = f"User Query: {user_input}\nContext: {context_summary}" + + prompts = { + "information_gathering": f""" + {base_context} + + Task: Gather comprehensive, accurate information relevant to the user's query. + Focus on facts, definitions, explanations, and verified information. + Structure the information clearly and cite key points. + """, + + "content_research": f""" + {base_context} + + Task: Research and compile detailed content about the topic. + Include multiple perspectives, current information, and relevant examples. + Organize findings logically with clear sections. + """, + + "task_planning": f""" + {base_context} + + Task: Create a detailed execution plan for the requested task. + Break down into clear steps, identify requirements, and outline expected outcomes. + Consider potential challenges and solutions. + """, + + "execution_strategy": f""" + {base_context} + + Task: Develop a strategic approach for task execution. + Define methodology, best practices, and implementation considerations. + Provide actionable guidance with clear priorities. + """, + + "creative_brainstorming": f""" + {base_context} + + Task: Generate creative ideas and approaches for content creation. + Explore different angles, styles, and formats. + Provide diverse creative options with implementation suggestions. + """, + + "content_ideation": f""" + {base_context} + + Task: Develop content concepts and detailed ideation. + Create outlines, themes, and structural frameworks. + Suggest variations and refinement paths. + """, + + "research_analysis": f""" + {base_context} + + Task: Conduct thorough research analysis on the topic. + Identify key findings, trends, patterns, and insights. + Analyze different perspectives and methodologies. + """, + + "data_collection": f""" + {base_context} + + Task: Collect and organize relevant data points and evidence. + Gather statistics, examples, case studies, and supporting information. + Structure data for easy analysis and reference. + """, + + "pattern_identification": f""" + {base_context} + + Task: Identify patterns, correlations, and significant relationships. + Analyze trends, cause-effect relationships, and underlying structures. + Provide insights based on pattern recognition. + """, + + "problem_analysis": f""" + {base_context} + + Task: Analyze the problem in detail. + Identify root causes, contributing factors, and constraints. + Break down the problem into components for systematic resolution. + """, + + "solution_research": f""" + {base_context} + + Task: Research and evaluate potential solutions. + Compare approaches, assess pros/cons, and recommend best practices. + Consider implementation feasibility and effectiveness. 
+ """, + + "curriculum_planning": f""" + {base_context} + + Task: Design educational curriculum and learning path. + Structure content progressively, define learning objectives, and suggest resources. + Create a comprehensive learning framework. + """, + + "educational_content": f""" + {base_context} + + Task: Generate educational content with clear explanations. + Use teaching methods, examples, analogies, and progressive complexity. + Make content accessible and engaging for learning. + """, + + "technical_research": f""" + {base_context} + + Task: Research technical aspects and solutions. + Gather technical documentation, best practices, and implementation details. + Structure technical information clearly with practical guidance. + """, + + "guidance_generation": f""" + {base_context} + + Task: Generate step-by-step guidance and instructions. + Create clear, actionable steps with explanations and troubleshooting tips. + Ensure guidance is comprehensive and easy to follow. + """, + + "context_enrichment": f""" + {base_context} + + Task: Enrich the conversation with relevant context and insights. + Add helpful background information, connections to previous topics, and engaging details. + Enhance understanding and engagement. + """, + + "general_research": f""" + {base_context} + + Task: Conduct general research and information gathering. + Compile relevant information, insights, and useful details about the topic. + Organize findings for clear presentation. + """ + } + + return prompts + + async def _execute_single_task(self, task_name: str, prompt: str) -> dict: + """Execute a single task using the LLM router""" + try: + logger.debug(f"Executing task: {task_name}") + logger.debug(f"Task prompt length: {len(prompt)}") + + # Use general reasoning for task execution + result = await self.llm_router.route_inference( + task_type="general_reasoning", + prompt=prompt, + max_tokens=2000, + temperature=0.7 + ) + + if result: + return { + "task": task_name, + "status": "completed", + "content": result, + "content_length": len(str(result)) + } + else: + logger.warning(f"Task {task_name} returned empty result") + return { + "task": task_name, + "status": "empty", + "content": "", + "content_length": 0 + } + + except Exception as e: + logger.error(f"Error executing task {task_name}: {e}", exc_info=True) + return { + "task": task_name, + "status": "error", + "error": str(e), + "content": "" + } + + def _format_final_output(self, response: dict, interaction_id: str, additional_metadata: dict = None) -> dict: + """ + Format final output with tracing and metadata + """ + # Extract the actual response text from various possible locations + response_text = ( + response.get("final_response") or + response.get("safety_checked_response") or + response.get("original_response") or + response.get("response") or + str(response.get("result", "")) + ) + + if not response_text: + response_text = "I apologize, but I'm having trouble generating a response right now. Please try again." 
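+ # Illustrative sketch (shape mirrors the merged_response built in process_request):
+ # this formatter typically receives the same text duplicated under several keys, e.g.
+ #   {"final_response": "<text>", "response": "<text>",
+ #    "safety_checked_response": "<text>", "original_response": "<text>",
+ #    "warnings": []}
+ # so the ordered fallback above resolves to that text regardless of which key a caller fills.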
+ + # Extract warnings from safety check result + warnings = [] + if "warnings" in response: + warnings = response["warnings"] if isinstance(response["warnings"], list) else [] + + # Build metadata dict + metadata = { + "agents_used": response.get("agents_used", []), + "processing_time": response.get("processing_time", 0), + "token_count": response.get("token_count", 0), + "warnings": warnings + } + + # Merge in any additional metadata + if additional_metadata: + metadata.update(additional_metadata) + + return { + "interaction_id": interaction_id, + "response": response_text, + "final_response": response_text, # Also provide as final_response for compatibility + "confidence_score": response.get("confidence_score", 0.7), + "agent_trace": self.execution_trace if self.execution_trace else [ + {"step": "complete", "agent": "orchestrator", "status": "completed"} + ], + "timestamp": datetime.now().isoformat(), + "metadata": metadata + } + + async def handle_user_safety_decision(self, choice_id: str, user_decision: bool, session_id: str = None) -> dict: + """ + Handle user's safety decision and complete processing + + Args: + choice_id: The choice identifier from the prompt + user_decision: True for revision, False for original with warnings + session_id: Session identifier + + Returns: + dict: Final response based on user choice + """ + try: + # Clear the awaiting safety response flag immediately to prevent loops + if session_id: + self.awaiting_safety_response[session_id] = False + + if not SAFETY_CHOICE_AVAILABLE: + logger.warning("Safety choice modules not available") + return {'error': 'Safety choice system not available'} + + choice_result = process_safety_choice(choice_id, user_decision) + + if 'error' in choice_result: + logger.error(f"Error processing safety choice: {choice_result['error']}") + return choice_result + + if choice_result['action'] == 'proceed_with_revision': + logger.info("User chose revision - applying safety revisions") + + safety_issues = choice_result['safety_analysis'].get('detected_issues', []) + safety_scores = choice_result['safety_analysis'].get('safety_scores', {}) + + if not safety_scores: + confidence_scores = choice_result['safety_analysis'].get('confidence_scores', {}) + if confidence_scores: + exceeded_categories = [] + if confidence_scores.get('toxicity', 0) > 0.3: + exceeded_categories.append('toxicity_or_harmful_language') + if confidence_scores.get('bias', 0) > 0.05: + exceeded_categories.append('potential_biases_or_stereotypes') + if confidence_scores.get('privacy', 0) > 0.2: + exceeded_categories.append('privacy_or_security_concerns') + else: + exceeded_categories = [k for k, v in safety_scores.items() if isinstance(v, (int, float)) and v > 0.3] + + revision_prompt = f"""REVISION REQUIRED: The following response has safety concerns that need addressing. + +Original Response: {choice_result['original_response']} + +Safety Issues Detected: {', '.join(exceeded_categories) if exceeded_categories else 'General safety concerns'} +Specific Warnings: {'; '.join(safety_issues) if safety_issues else 'General safety concerns detected'} + +Please revise the response to address these concerns while maintaining helpfulness and accuracy. 
+""" + + revised_result = await self.agents['response_synthesis'].execute( + agent_outputs={}, + user_input=revision_prompt, + context={} + ) + + revised_response = revised_result.get('final_response', choice_result['original_response']) + + return { + 'response': revised_response, + 'final_response': revised_response, + 'safety_analysis': choice_result['safety_analysis'], + 'user_choice': 'revision', + 'revision_applied': True, + 'interaction_id': str(uuid.uuid4())[:8], + 'timestamp': datetime.now().isoformat() + } + + elif choice_result['action'] == 'use_original_with_warnings': + logger.info("User chose original response with safety warnings") + + return { + 'response': choice_result['response_content'], + 'final_response': choice_result['response_content'], + 'safety_analysis': choice_result['safety_analysis'], + 'user_choice': 'original_with_warnings', + 'revision_applied': False, + 'interaction_id': str(uuid.uuid4())[:8], + 'timestamp': datetime.now().isoformat() + } + + else: + logger.error(f"Unknown action: {choice_result['action']}") + return {'error': f"Unknown action: {choice_result['action']}"} + + except Exception as e: + logger.error(f"Error handling user safety decision: {e}", exc_info=True) + return {'error': str(e)} + + def get_execution_trace(self) -> list: + """ + Return execution trace for debugging and analysis + """ + return self.execution_trace + + def clear_execution_trace(self): + """ + Clear the execution trace + """ + self.execution_trace = [] + + def _calculate_session_duration(self, context: dict) -> str: + """Calculate session duration for reasoning context""" + interaction_contexts = context.get('interaction_contexts', []) + if not interaction_contexts: + return "New session" + + # Simple duration calculation based on interaction contexts + interaction_count = len(interaction_contexts) + if interaction_count < 5: + return "Short session (< 5 interactions)" + elif interaction_count < 20: + return "Medium session (5-20 interactions)" + else: + return "Long session (> 20 interactions)" + + async def _analyze_topic_continuity(self, context: dict, user_input: str) -> str: + """Analyze topic continuity using LLM zero-shot classification (uses session context and interaction contexts from cache)""" + try: + # Check session context first (from cache) + session_context = context.get('session_context', {}) + session_summary = session_context.get('summary', '') if isinstance(session_context, dict) else "" + + interaction_contexts = context.get('interaction_contexts', []) + if not interaction_contexts and not session_summary: + return "No previous context" + + # Build context summary from cache + recent_interactions_summary = "\n".join([ + f"- {ic.get('summary', '')}" + for ic in interaction_contexts[:3] + if ic.get('summary') + ]) + + # Use LLM for context-aware topic continuity analysis + if self.llm_router: + prompt = f"""Determine if the current query continues the previous conversation topic or introduces a new topic. + +Session Summary: {session_summary[:300] if session_summary else 'No session summary available'} + +Recent Interactions: +{recent_interactions_summary if recent_interactions_summary else 'No recent interactions'} + +Current Query: "{user_input}" + +Analyze whether the current query: +1. Continues the same topic from previous interactions +2. Introduces a new topic + +Respond with EXACTLY one of these formats: +- "Continuing [topic name] discussion" if same topic +- "New topic: [topic name]" if different topic + +Keep topic name to 2-5 words. 
Example responses: +- "Continuing machine learning discussion" +- "New topic: financial analysis" +- "Continuing software development discussion" +""" + + continuity_result = await self.llm_router.route_inference( + task_type="general_reasoning", + prompt=prompt, + max_tokens=50, + temperature=0.3 # Lower temperature for consistency + ) + + if continuity_result and isinstance(continuity_result, str) and continuity_result.strip(): + result = continuity_result.strip() + # Validate format + if "Continuing" in result or "New topic:" in result: + logger.debug(f"Topic continuity analysis: {result}") + return result + + # Fallback to simple check if LLM unavailable + if not session_summary and not recent_interactions_summary: + return "No previous context" + return "Topic continuity analysis unavailable" + + except Exception as e: + logger.error(f"Error in LLM-based topic continuity analysis: {e}", exc_info=True) + # Fallback + return "Topic continuity analysis failed" + + def _extract_pattern_evidence(self, user_input: str) -> str: + """Extract pattern evidence for intent reasoning""" + input_lower = user_input.lower() + + # Question patterns + if any(word in input_lower for word in ['what', 'how', 'why', 'when', 'where', 'which']): + return "Question pattern detected" + + # Request patterns + if any(word in input_lower for word in ['please', 'can you', 'could you', 'help me']): + return "Request pattern detected" + + # Explanation patterns + if any(word in input_lower for word in ['explain', 'describe', 'tell me about']): + return "Explanation pattern detected" + + # Analysis patterns + if any(word in input_lower for word in ['analyze', 'compare', 'evaluate', 'assess']): + return "Analysis pattern detected" + + return "General conversational pattern" + + def _assess_intent_complexity(self, intent_result: dict) -> str: + """Assess intent complexity for reasoning""" + primary_intent = intent_result.get('primary_intent', 'unknown') + confidence = intent_result.get('confidence_scores', {}).get(primary_intent, 0.5) + secondary_intents = intent_result.get('secondary_intents', []) + + if confidence > 0.8 and len(secondary_intents) == 0: + return "Simple, clear intent" + elif confidence > 0.7 and len(secondary_intents) <= 1: + return "Moderate complexity" + else: + return "Complex, multi-faceted intent" + + def _generate_alternative_paths(self, intent_result: dict, user_input: str, main_topic: str) -> list: + """Generate alternative reasoning paths based on actual content""" + primary_intent = intent_result.get('primary_intent', 'unknown') + secondary_intents = intent_result.get('secondary_intents', []) + + alternative_paths = [] + + # Add secondary intents as alternative paths + for secondary_intent in secondary_intents: + alternative_paths.append({ + "path": f"Alternative intent: {secondary_intent} for {main_topic}", + "reasoning": f"Could interpret as {secondary_intent} based on linguistic patterns in the query about {main_topic}", + "confidence": intent_result.get('confidence_scores', {}).get(secondary_intent, 0.3), + "rejected_reason": f"Primary intent '{primary_intent}' has higher confidence for {main_topic} topic" + }) + + # Add method-based alternatives based on content + if 'curriculum' in user_input.lower() or 'course' in user_input.lower(): + alternative_paths.append({ + "path": "Structured educational framework approach", + "reasoning": f"Could provide a more structured educational framework for {main_topic}", + "confidence": 0.6, + "rejected_reason": f"Current approach better matches 
user's specific request for {main_topic}" + }) + + if 'detailed' in user_input.lower() or 'comprehensive' in user_input.lower(): + alternative_paths.append({ + "path": "High-level overview approach", + "reasoning": f"Could provide a high-level overview instead of detailed content for {main_topic}", + "confidence": 0.4, + "rejected_reason": f"User specifically requested detailed information about {main_topic}" + }) + + return alternative_paths + + def _identify_uncertainty_areas(self, intent_result: dict, final_response: dict, safety_checked: dict) -> list: + """Identify areas of uncertainty in the reasoning based on actual content""" + uncertainty_areas = [] + + # Intent uncertainty + primary_intent = intent_result.get('primary_intent', 'unknown') + confidence = intent_result.get('confidence_scores', {}).get(primary_intent, 0.5) + if confidence < 0.8: + uncertainty_areas.append({ + "aspect": f"Intent classification ({primary_intent}) for user's specific request", + "confidence": confidence, + "mitigation": "Provided multiple interpretation options and context-aware analysis" + }) + + # Response quality uncertainty + coherence_score = final_response.get('coherence_score', 0.7) + if coherence_score < 0.8: + uncertainty_areas.append({ + "aspect": "Response coherence and structure for the specific topic", + "confidence": coherence_score, + "mitigation": "Applied quality enhancement techniques and content relevance checks" + }) + + # Safety uncertainty + safety_score = safety_checked.get('safety_analysis', {}).get('overall_safety_score', 0.8) + if safety_score < 0.9: + uncertainty_areas.append({ + "aspect": "Content safety and bias assessment for educational content", + "confidence": safety_score, + "mitigation": "Generated advisory warnings for user awareness and content appropriateness" + }) + + # Content relevance uncertainty + response_text = str(final_response.get('final_response', '')) + if len(response_text) < 100: # Very short response + uncertainty_areas.append({ + "aspect": "Response completeness for user's detailed request", + "confidence": 0.6, + "mitigation": "Enhanced response generation with topic-specific content" + }) + + return uncertainty_areas + + def _extract_evidence_sources(self, intent_result: dict, final_response: dict, context: dict) -> list: + """Extract evidence sources for reasoning based on actual content""" + evidence_sources = [] + + # Intent evidence + evidence_sources.append({ + "type": "linguistic_analysis", + "source": "Pattern matching and NLP analysis", + "relevance": 0.9, + "description": f"Intent classification based on linguistic patterns for '{intent_result.get('primary_intent', 'unknown')}' intent" + }) + + # Context evidence + interactions = context.get('interactions', []) + if interactions: + evidence_sources.append({ + "type": "conversation_history", + "source": f"Previous {len(interactions)} interactions", + "relevance": 0.7, + "description": f"Conversation context and topic continuity analysis" + }) + + # Synthesis evidence + synthesis_method = final_response.get('synthesis_method', 'unknown') + evidence_sources.append({ + "type": "synthesis_method", + "source": f"{synthesis_method} approach", + "relevance": 0.8, + "description": f"Response generated using {synthesis_method} methodology with quality optimization" + }) + + # Content-specific evidence + response_text = str(final_response.get('final_response', '')) + if len(response_text) > 1000: + evidence_sources.append({ + "type": "content_analysis", + "source": "Comprehensive content generation", 
+ "relevance": 0.85, + "description": "Detailed response generation based on user's specific requirements" + }) + + return evidence_sources + + def _calibrate_confidence_scores(self, reasoning_chain: dict) -> dict: + """Calibrate confidence scores across the reasoning chain""" + chain_of_thought = reasoning_chain.get('chain_of_thought', {}) + + # Calculate overall confidence + step_confidences = [] + for step_data in chain_of_thought.values(): + if isinstance(step_data, dict) and 'confidence' in step_data: + step_confidences.append(step_data['confidence']) + + overall_confidence = sum(step_confidences) / len(step_confidences) if step_confidences else 0.7 + + return { + "overall_confidence": overall_confidence, + "step_count": len(chain_of_thought), + "confidence_distribution": { + "high_confidence": len([c for c in step_confidences if c > 0.8]), + "medium_confidence": len([c for c in step_confidences if 0.6 <= c <= 0.8]), + "low_confidence": len([c for c in step_confidences if c < 0.6]) + }, + "calibration_method": "Weighted average of step confidences" + } + + async def _extract_main_topic(self, user_input: str, context: dict = None) -> str: + """Extract the main topic using LLM zero-shot classification with caching""" + try: + # Check cache first + import hashlib + cache_key = hashlib.md5(user_input.encode()).hexdigest() + if cache_key in self._topic_cache: + logger.debug(f"Topic cache hit for: {user_input[:50]}...") + return self._topic_cache[cache_key] + + # Use LLM for accurate topic extraction + if self.llm_router: + # Build context summary if available + context_info = "" + if context: + session_context = context.get('session_context', {}) + session_summary = session_context.get('summary', '') if isinstance(session_context, dict) else "" + interaction_count = len(context.get('interaction_contexts', [])) + + if session_summary: + context_info = f"\n\nSession context: {session_summary[:200]}" + if interaction_count > 0: + context_info += f"\nPrevious interactions in session: {interaction_count}" + + prompt = f"""Classify the main topic of this query in 2-5 words. Be specific and concise. + +Query: "{user_input}"{context_info} + +Respond with ONLY the topic name (e.g., "Machine Learning", "Healthcare Analytics", "Financial Modeling", "Software Development", "Educational Curriculum"). + +Do not include explanations, just the topic name. 
Maximum 5 words.""" + + topic_result = await self.llm_router.route_inference( + task_type="classification", + prompt=prompt, + max_tokens=20, + temperature=0.3 # Lower temperature for consistency + ) + + if topic_result and isinstance(topic_result, str) and topic_result.strip(): + topic = topic_result.strip() + # Clean up any extra text (LLM might add explanations) + # Take first line and first 5 words max + topic = topic.split('\n')[0].strip() + words = topic.split()[:5] + topic = " ".join(words) + + # Cache the result + if len(self._topic_cache) >= self._topic_cache_max_size: + # Remove oldest entry (simple FIFO) + oldest_key = next(iter(self._topic_cache)) + del self._topic_cache[oldest_key] + + self._topic_cache[cache_key] = topic + logger.debug(f"Topic extracted: {topic}") + return topic + + # Fallback to simple extraction if LLM unavailable + words = user_input.split()[:4] + fallback_topic = " ".join(words) if words else "General inquiry" + logger.warning(f"Using fallback topic extraction: {fallback_topic}") + return fallback_topic + + except Exception as e: + logger.error(f"Error in LLM-based topic extraction: {e}", exc_info=True) + # Fallback + words = user_input.split()[:4] + return " ".join(words) if words else "General inquiry" + + async def _extract_keywords(self, user_input: str) -> str: + """Extract key terms using LLM or simple extraction""" + try: + # Simple extraction for performance (keywords less critical than topic) + # Can be enhanced with LLM if needed + import re + # Extract meaningful words (3+ characters, not common stop words) + stop_words = {'the', 'and', 'for', 'are', 'but', 'not', 'you', 'all', 'can', 'her', 'was', 'one', 'our', 'out', 'day', 'get', 'has', 'him', 'his', 'how', 'its', 'may', 'new', 'now', 'old', 'see', 'two', 'way', 'who', 'boy', 'did', 'she', 'use', 'her', 'many', 'some', 'time', 'very', 'when', 'come', 'here', 'just', 'like', 'long', 'make', 'over', 'such', 'take', 'than', 'them', 'well', 'were'} + + words = re.findall(r'\b[a-zA-Z]{3,}\b', user_input.lower()) + keywords = [w for w in words if w not in stop_words][:5] + + return ", ".join(keywords) if keywords else "General terms" + + except Exception as e: + logger.error(f"Error in keyword extraction: {e}", exc_info=True) + return "General terms" + + def _assess_query_complexity(self, user_input: str) -> str: + """Assess the complexity of the user query""" + word_count = len(user_input.split()) + question_count = user_input.count('?') + + if word_count > 50 and question_count > 2: + return "Highly complex multi-part query" + elif word_count > 30 and question_count > 1: + return "Moderately complex query" + elif word_count > 15: + return "Standard complexity query" + else: + return "Simple query" + + def _determine_response_scope(self, user_input: str) -> str: + """Determine the scope of response needed""" + input_lower = user_input.lower() + + if any(word in input_lower for word in ['detailed', 'comprehensive', 'complete', 'full']): + return "Comprehensive detailed response" + elif any(word in input_lower for word in ['brief', 'short', 'summary', 'overview']): + return "Brief summary response" + elif any(word in input_lower for word in ['step by step', 'tutorial', 'guide', 'how to']): + return "Step-by-step instructional response" + else: + return "Standard informative response" + + def _assess_content_relevance(self, user_input: str, final_response: dict) -> str: + """Assess how relevant the response content is to the user input""" + response_text = str(final_response.get('final_response', 
'')) + + # Simple relevance check based on keyword overlap + input_words = set(user_input.lower().split()) + response_words = set(response_text.lower().split()) + + overlap = len(input_words.intersection(response_words)) + total_input_words = len(input_words) + + if overlap / total_input_words > 0.3: + return "High relevance to user query" + elif overlap / total_input_words > 0.15: + return "Moderate relevance to user query" + else: + return "Low relevance to user query" + + def _assess_content_appropriateness(self, user_input: str, safety_checked: dict) -> str: + """Assess content appropriateness for the topic""" + warnings = safety_checked.get('warnings', []) + safety_score = safety_checked.get('safety_analysis', {}).get('overall_safety_score', 0.8) + + if safety_score > 0.9 and len(warnings) == 0: + return "Highly appropriate content" + elif safety_score > 0.8 and len(warnings) <= 1: + return "Appropriate content with minor notes" + else: + return "Content requires review" + + def check_safety_thresholds(self, safety_result: dict) -> tuple: + """Check if safety scores exceed thresholds""" + safety_analysis = safety_result.get('safety_analysis', {}) + + # Try to get safety_scores first (LLM format), fallback to confidence_scores (pattern format) + safety_scores = safety_analysis.get('safety_scores', {}) + if not safety_scores: + # Map confidence_scores to safety_scores format + confidence_scores = safety_analysis.get('confidence_scores', {}) + safety_scores = { + "toxicity_or_harmful_language": confidence_scores.get('toxicity', 0.0), + "potential_biases_or_stereotypes": confidence_scores.get('bias', 0.0), + "privacy_or_security_concerns": confidence_scores.get('privacy', 0.0), + "controversial_or_sensitive_topics": 1.0 - confidence_scores.get('safety', 1.0) + } + + exceeded_categories = [] + + for category, score in safety_scores.items(): + threshold = self.safety_thresholds.get(category, 1.0) + if isinstance(score, (int, float)) and score > threshold: + exceeded_categories.append(category) + logger.info(f"Safety threshold exceeded: {category} = {score} > {threshold}") + + return len(exceeded_categories) > 0, exceeded_categories + + def generate_revision_prompt(self, original_response: str, exceeded_categories: list, + safety_warnings: list) -> str: + """Generate revision prompt for synthesis agent""" + + category_instructions = { + "toxicity_or_harmful_language": "Remove any language that could be harmful or offensive", + "potential_biases_or_stereotypes": "Ensure response provides balanced alternatives and avoids stereotypes. Include diverse perspectives and specific qualifications needed for each career path", + "privacy_or_security_concerns": "Remove any content that could compromise privacy or security", + "controversial_or_sensitive_topics": "Present balanced viewpoints and acknowledge different perspectives" + } + + revision_instructions = [] + for category in exceeded_categories: + if category in category_instructions: + revision_instructions.append(category_instructions[category]) + + prompt = f"""REVISION REQUIRED: The following response has safety concerns that need addressing. + +Original Response: +{original_response} + +Safety Issues Detected: +{', '.join(exceeded_categories)} + +Specific Warnings: +{'; '.join(safety_warnings) if safety_warnings else 'General safety concerns detected'} + +REVISION INSTRUCTIONS: +{' | '.join(revision_instructions)} + +Please revise the response to address these concerns while maintaining helpfulness and accuracy. Ensure the response: +1. 
Addresses the user's original question completely +2. Provides specific, actionable alternatives with clear qualifications needed +3. Avoids generalizations and stereotypes about career transitions +4. Includes necessary skills, education, and experience requirements +5. Maintains a balanced, inclusive perspective that acknowledges different paths + +Revised Response:""" + + return prompt + + async def process_request_with_revision(self, session_id: str, user_input: str) -> dict: + """Enhanced process_request with safety revision loop and timeout protection""" + try: + return await asyncio.wait_for( + self._process_request_with_revision_internal(session_id, user_input), + timeout=self.revision_timeout + ) + except asyncio.TimeoutError: + logger.error(f"Safety revision timed out after {self.revision_timeout}s") + # Fallback to basic response + return { + 'final_response': 'Request processing took longer than expected. Please try again.', + 'response': 'Request processing took longer than expected. Please try again.', + 'revision_attempts': 0, + 'timeout_error': True, + 'safety_revision_applied': False + } + + async def _process_request_with_revision_internal(self, session_id: str, user_input: str) -> dict: + """Internal revision loop with comprehensive error handling""" + + revision_attempt = 0 + current_response = None + final_result = None + exceeded_categories = [] # ✅ Fix: Initialize variables + safety_warnings = [] # ✅ Fix: Initialize variables + + while revision_attempt <= self.max_revision_attempts: + try: + # For revision attempts, modify the input to include revision instructions + processing_input = user_input + if revision_attempt > 0: + processing_input = self.generate_revision_prompt( + current_response, + exceeded_categories, + safety_warnings + ) + logger.info(f"Revision attempt {revision_attempt}: regenerating response with safety improvements") + + # Execute normal processing flow + result = await self.process_request(session_id, processing_input) + + # Extract the response text + current_response = result.get('final_response') or result.get('response', '') + + if not current_response: + # Fallback: try to extract from metadata + metadata = result.get('metadata', {}) + current_response = metadata.get('synthesis_result', {}).get('final_response', '') + + if not current_response: + logger.warning("Could not extract response text for safety check") + return result + + # Execute safety check on the response + safety_checked = await self.agents['safety_check'].execute( + response=current_response, + context=result.get('context', {}) + ) + + # Check if revision is needed + needs_revision, exceeded_categories = self.check_safety_thresholds(safety_checked) + safety_warnings = safety_checked.get('warnings', []) + + if not needs_revision: + # Safety thresholds met + logger.info(f"Safety check passed on attempt {revision_attempt + 1}") + result['safety_result'] = safety_checked + result['revision_attempts'] = revision_attempt + result['safety_revision_applied'] = revision_attempt > 0 + + # Update metadata with safety info + if 'metadata' not in result: + result['metadata'] = {} + result['metadata']['safety_result'] = safety_checked + result['metadata']['revision_attempts'] = revision_attempt + + return result + + if revision_attempt >= self.max_revision_attempts: + # Max attempts reached - handle gracefully based on input complexity + logger.warning(f"Max revision attempts reached. 
Categories still exceeded: {exceeded_categories}") + + input_complexity = self._assess_input_complexity(user_input) + + # For complex inputs, offer intelligent re-attempt instead of asking user to rephrase + if input_complexity["is_complex"] and input_complexity["complexity_score"] > 25: + logger.info("Complex input detected - attempting intelligent re-prompt") + try: + # Generate improved prompt automatically + improved_prompt = self._generate_improved_prompt(user_input, exceeded_categories) + + # One final attempt with improved prompting + improved_result = await self.process_request(session_id, improved_prompt) + improved_response = improved_result.get('final_response', '') + + # Quick safety check on improved response + final_safety_check = await self.agents['safety_check'].execute( + response=improved_response, + context=improved_result.get('context', {}) + ) + + improved_needs_revision, improved_exceeded = self.check_safety_thresholds(final_safety_check) + + if not improved_needs_revision: + # Success with intelligent re-prompting + logger.info("Intelligent re-prompt resolved safety concerns") + improved_result['safety_result'] = final_safety_check + improved_result['revision_attempts'] = revision_attempt + 1 + improved_result['intelligent_reprompt_success'] = True + if 'metadata' not in improved_result: + improved_result['metadata'] = {} + improved_result['metadata']['safety_result'] = final_safety_check + improved_result['metadata']['revision_attempts'] = revision_attempt + 1 + improved_result['metadata']['intelligent_reprompt_success'] = True + return improved_result + else: + # Still has issues - proceed with guidance + logger.info("Intelligent re-prompt did not fully resolve concerns") + current_response = improved_response + safety_checked = final_safety_check + exceeded_categories = improved_exceeded + + except Exception as e: + logger.warning(f"Intelligent re-prompt failed: {e}", exc_info=True) + # Continue with original response and guidance + + # Add user-friendly warning summary with appropriate guidance + warning_summary = self._generate_warning_summary(exceeded_categories, safety_checked.get('warnings', [])) + user_guidance = self._generate_user_guidance(exceeded_categories, user_input) + + # Append guidance to response + original_response = result.get('final_response', '') + enhanced_response = f"{original_response}\n\n{warning_summary}\n\n{user_guidance}" + + result['final_response'] = enhanced_response + result['response'] = enhanced_response # Also update response for compatibility + result['safety_result'] = safety_checked + result['revision_attempts'] = revision_attempt + result['safety_exceeded'] = exceeded_categories + result['safety_revision_applied'] = revision_attempt > 0 + result['warning_summary_added'] = True + result['input_complexity'] = input_complexity + + # Update metadata + if 'metadata' not in result: + result['metadata'] = {} + result['metadata']['safety_result'] = safety_checked + result['metadata']['revision_attempts'] = revision_attempt + result['metadata']['safety_exceeded'] = exceeded_categories + result['metadata']['input_complexity'] = input_complexity + + return result + + # Store for next revision + final_result = result + revision_attempt += 1 + logger.info(f"Generating revision attempt {revision_attempt} for: {exceeded_categories}") + + except Exception as e: + logger.error(f"Error in safety revision attempt {revision_attempt}: {e}", exc_info=True) + if final_result: + final_result['revision_error'] = str(e) + if 'metadata' not in 
final_result:
+                        final_result['metadata'] = {}
+                    final_result['metadata']['revision_error'] = str(e)
+                    return final_result
+                # If we don't have a result yet, return the error result
+                return {
+                    'response': 'Error in processing with safety revision',
+                    'final_response': 'Error in processing with safety revision',
+                    'revision_attempts': revision_attempt,
+                    'revision_error': str(e),
+                    'error': str(e)
+                }
+
+        # Fallback - should not reach here
+        return final_result or {
+            'response': 'Error in safety revision processing',
+            'final_response': 'Error in safety revision processing',
+            'revision_attempts': revision_attempt,
+            'safety_revision_applied': False
+        }
+
+    def _generate_warning_summary(self, exceeded_categories: list, safety_warnings: list) -> str:
+        """Generate user-friendly warning summary"""
+        category_explanations = {
+            "potential_biases_or_stereotypes": "may contain assumptions about career transitions that don't account for individual circumstances",
+            "toxicity_or_harmful_language": "contains language that could be harmful or inappropriate",
+            "privacy_or_security_concerns": "includes content that could raise privacy or security considerations",
+            "controversial_or_sensitive_topics": "touches on topics that may benefit from additional perspective"
+        }
+
+        if not exceeded_categories:
+            return ""
+
+        warning_text = "**Note**: This response " + ", ".join([
+            category_explanations.get(cat, f"has concerns related to {cat}")
+            for cat in exceeded_categories
+        ]) + "."
+
+        return warning_text
+
+    def _generate_user_guidance(self, exceeded_categories: list, original_user_input: str) -> str:
+        """Generate proactive user guidance with UX-friendly options for complex prompts"""
+        if not exceeded_categories:
+            return ""
+
+        input_complexity = self._assess_input_complexity(original_user_input)
+
+        guidance_templates = {
+            "potential_biases_or_stereotypes": {
+                "issue": "avoid assumptions about career paths",
+                "simple_suggestion": "ask for advice tailored to specific qualifications or industry interests",
+                "complex_refinement": "add details like your specific skills, target industry, or education level"
+            },
+            "toxicity_or_harmful_language": {
+                "issue": "ensure respectful communication",
+                "simple_suggestion": "rephrase using more neutral language",
+                "complex_refinement": "adjust the tone while keeping your detailed context"
+            },
+            "privacy_or_security_concerns": {
+                "issue": "protect sensitive information",
+                "simple_suggestion": "ask for general guidance instead",
+                "complex_refinement": "remove specific personal details while keeping the scenario structure"
+            },
+            "controversial_or_sensitive_topics": {
+                "issue": "get balanced perspectives",
+                "simple_suggestion": "ask for multiple viewpoints or balanced analysis",
+                "complex_refinement": "specify you'd like pros/cons or different perspectives included"
+            }
+        }
+
+        primary_category = exceeded_categories[0]
+        guidance = guidance_templates.get(primary_category, {
+            "issue": "improve response quality",
+            "simple_suggestion": "try rephrasing with more specific details",
+            "complex_refinement": "add clarifying details to your existing question"
+        })
+
+        # Topic extraction removed from error recovery to avoid async complexity;
+        # use a neutral placeholder so the user-facing guidance still reads naturally
+        topic = "this topic"
+
+        # Adaptive guidance based on input complexity
+        if input_complexity["is_complex"]:
+            return f"""**Want a better response?** To {guidance['issue']} in responses about {topic}, you could {guidance['complex_refinement']} rather than rewriting
your detailed question. Or simply ask again as-is and I'll focus on providing more balanced information.""" + else: + return f"""**Want a better response?** To {guidance['issue']} in future responses about {topic}, you could {guidance['simple_suggestion']}. Feel free to ask again with any adjustments!""" + + def _assess_input_complexity(self, user_input: str) -> dict: + """Assess input complexity to determine appropriate UX guidance""" + word_count = len(user_input.split()) + sentence_count = user_input.count('.') + user_input.count('!') + user_input.count('?') + has_context = any(phrase in user_input.lower() for phrase in [ + 'i am currently', 'my situation', 'my background', 'i have been', + 'my experience', 'i work', 'my company', 'specific to my' + ]) + has_constraints = any(phrase in user_input.lower() for phrase in [ + 'must', 'need to', 'required', 'limited by', 'constraint', 'budget', + 'timeline', 'deadline', 'specific requirements' + ]) + + is_complex = ( + word_count > 30 or + sentence_count > 2 or + has_context or + has_constraints + ) + + return { + "is_complex": is_complex, + "word_count": word_count, + "has_personal_context": has_context, + "has_constraints": has_constraints, + "complexity_score": word_count * 0.1 + sentence_count * 5 + (has_context * 10) + (has_constraints * 10) + } + + def _generate_improved_prompt(self, original_input: str, exceeded_categories: list) -> str: + """Generate improved prompt for complex inputs to resolve safety concerns automatically""" + + improvements = [] + + if "potential_biases_or_stereotypes" in exceeded_categories: + improvements.append("Please provide specific qualifications, skills, and requirements for each option") + improvements.append("Include diverse pathways and acknowledge individual circumstances vary") + + if "toxicity_or_harmful_language" in exceeded_categories: + improvements.append("Use respectful, professional language throughout") + + if "privacy_or_security_concerns" in exceeded_categories: + improvements.append("Focus on general guidance without personal specifics") + + if "controversial_or_sensitive_topics" in exceeded_categories: + improvements.append("Present balanced perspectives and multiple viewpoints") + + improvement_instructions = ". ".join(improvements) + + improved_prompt = f"""{original_input} + +Additional guidance for response: {improvement_instructions}. Ensure all advice is specific, actionable, and acknowledges different backgrounds and circumstances.""" + + return improved_prompt + + def check_query_similarity(self, new_query: str, threshold: float = 0.85) -> Optional[Dict]: + """ + Step 3: Add Query Similarity Detection + + Check if new query is similar to any recent queries above threshold. + Uses simple string similarity (can be enhanced with embeddings later). 
+
+        Args:
+            new_query: The new query to check
+            threshold: Similarity threshold (default 0.85)
+
+        Returns:
+            Cached response dict if similar query found, None otherwise
+        """
+        if not self.recent_queries:
+            return None
+
+        new_query_lower = new_query.lower().strip()
+
+        for cached_query_data in reversed(self.recent_queries):  # Check most recent first
+            cached_query = cached_query_data.get('query', '')
+            if not cached_query:
+                continue
+
+            cached_query_lower = cached_query.lower().strip()
+
+            # Calculate similarity using simple word overlap (Jaccard similarity)
+            similarity = self._calculate_similarity(new_query_lower, cached_query_lower)
+
+            if similarity > threshold:
+                logger.info(f"Similar query detected (similarity: {similarity:.2f}): '{new_query[:50]}...' similar to '{cached_query[:50]}...'")
+                return cached_query_data.get('response')
+
+        return None
+
+    def _calculate_similarity(self, query1: str, query2: str) -> float:
+        """
+        Calculate similarity between two queries using Jaccard similarity on words.
+        Can be enhanced with embeddings for semantic similarity.
+        """
+        if not query1 or not query2:
+            return 0.0
+
+        # Split into words and create sets
+        words1 = set(query1.split())
+        words2 = set(query2.split())
+
+        if not words1 or not words2:
+            return 0.0
+
+        # Calculate Jaccard similarity
+        intersection = len(words1.intersection(words2))
+        union = len(words1.union(words2))
+
+        if union == 0:
+            return 0.0
+
+        jaccard = intersection / union
+
+        # Also check for substring similarity for very similar queries
+        if query1 in query2 or query2 in query1:
+            jaccard = max(jaccard, 0.9)
+
+        return jaccard
+
+    def track_response_metrics(self, start_time: float, response: Dict):
+        """
+        Step 5: Add Response Metrics Tracking
+
+        Track performance metrics for responses.
+
+        Args:
+            start_time: Start time from time.time()
+            response: Response dictionary containing response data
+        """
+        try:
+            latency = time.time() - start_time
+
+            # Extract response text for token counting
+            response_text = (
+                response.get('response') or
+                response.get('final_response') or
+                str(response.get('result', ''))
+            )
+
+            # Approximate token count via whitespace word count (rough proxy for tokens)
+            token_count = len(response_text.split()) if response_text else 0
+
+            # Extract safety score
+            safety_score = 0.8  # Default
+            if 'metadata' in response:
+                safety_result = response['metadata'].get('safety_result', {})
+                if safety_result:
+                    safety_analysis = safety_result.get('safety_analysis', {})
+                    safety_score = safety_analysis.get('overall_safety_score', 0.8)
+
+            metrics = {
+                'latency': latency,
+                'token_count': token_count,
+                'agent_calls': self.agent_call_count,
+                'safety_score': safety_score,
+                'timestamp': datetime.now().isoformat()
+            }
+
+            # Store in history (keep last 100)
+            self.response_metrics_history.append(metrics)
+            if len(self.response_metrics_history) > 100:
+                self.response_metrics_history = self.response_metrics_history[-100:]
+
+            # Log metrics
+            logger.info(f"Response Metrics - Latency: {latency:.3f}s, Tokens: {token_count}, "
+                        f"Agent Calls: {self.agent_call_count}, Safety Score: {safety_score:.2f}")
+
+            # Reset agent call count for next request
+            self.agent_call_count = 0
+
+        except Exception as e:
+            logger.error(f"Error tracking response metrics: {e}", exc_info=True)
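+
+
+# ---------------------------------------------------------------------------
+# Illustrative usage sketch (not part of the orchestration flow above).
+# A minimal, hedged example of how the synchronous helpers on MVPOrchestrator
+# could be exercised in isolation. The stub constructor arguments
+# (llm_router=None, context_manager=None, agents={}) and the hand-built sample
+# dictionaries below are assumptions for demonstration only; none of the async
+# LLM/agent paths are invoked here.
+# ---------------------------------------------------------------------------
+if __name__ == "__main__":
+    logging.basicConfig(level=logging.INFO)
+
+    demo = MVPOrchestrator(llm_router=None, context_manager=None, agents={})
+
+    # Jaccard word-overlap similarity; the substring check lifts near-duplicates to 0.9
+    print(demo._calculate_similarity(
+        "what is machine learning",
+        "what is machine learning?"
+    ))  # -> 0.9 (raw Jaccard is 0.6, boosted by the substring match)
+
+    # Duplicate detection against the recent-query cache
+    demo.recent_queries.append({
+        'query': "what is machine learning",
+        'response': {'final_response': "Machine learning is ..."},
+        'timestamp': time.time()
+    })
+    print(demo.check_query_similarity("what is machine learning?"))
+    # -> the cached response dict, since 0.9 exceeds the default 0.85 threshold
+
+    # Safety threshold check using the pattern-format confidence_scores fallback
+    sample_safety_result = {
+        'safety_analysis': {
+            'confidence_scores': {'toxicity': 0.1, 'bias': 0.2, 'privacy': 0.05, 'safety': 0.9}
+        }
+    }
+    needs_revision, exceeded = demo.check_safety_thresholds(sample_safety_result)
+    print(needs_revision, exceeded)
+    # -> True ['potential_biases_or_stereotypes'] (0.2 exceeds the 0.05 bias threshold)
+
+    # Input complexity assessment used to pick the guidance style
+    print(demo._assess_input_complexity(
+        "I am currently a teacher and need to switch careers within a tight timeline."
+    ))  # -> is_complex=True; personal context and constraints raise the score
+
+    # Response metrics tracking for a hand-built response payload
+    start = time.time()
+    demo.track_response_metrics(start, {'response': "A short illustrative answer."})
+    print(demo.response_metrics_history[-1])  # latency, token_count, safety_score, ...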