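"""Streamlit demo UI for the Manufacturing RAG Agent.

Pages (see RAGDemo.run): document upload and ingestion, question answering
with citations, an analytics dashboard, and system health checks.
"""
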
import streamlit as st
import pandas as pd
import plotly.express as px
from pathlib import Path
import tempfile
import json
from typing import List
import logging
import os
import sys

sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

try:
    from logger.custom_logger import CustomLoggerTracker
    custom_log = CustomLoggerTracker()
    logger = custom_log.get_logger("rag_demo")
except ImportError:
    # Fall back to standard logging if the custom logger is not available
    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger("rag_demo")

# Import RAG components
try:
    from src.config import Config
    from src.ingestion_pipeline import DocumentIngestionPipeline, IngestionResult
    from src.rag_engine import RAGEngine, RAGResponse
    from src.metadata_manager import MetadataManager
    from src.document_processor import ProcessingStatus
except ImportError as e:
    st.error(f"Failed to import RAG components: {e}")
    st.stop()


class RAGDemo:
    def __init__(self):
        """Initialize the RAG demo application."""
        self.config = None
        self.ingestion_pipeline = None
        self.rag_engine = None
        self.metadata_manager = None
        # Initialize session state
        if 'initialized' not in st.session_state:
            st.session_state.initialized = False
            st.session_state.documents = []
            st.session_state.chat_history = []

    def initialize_system(self):
        """Load configuration and construct the pipeline, engine, and metadata manager."""
        try:
            # Load configuration
            self.config = Config("src/config.yaml")
            # Flatten the config sections into a single dict for the components
            config_dict = {
                'siliconflow_api_key': self.config.siliconflow_api_key,
                'groq_api_key': self.config.groq_api_key,
                'qdrant_url': self.config.qdrant_url,
                'qdrant_api_key': self.config.qdrant_api_key,
                **self.config.rag_config,
                **self.config.document_processing_config,
                **self.config.storage_config
            }
            self.ingestion_pipeline = DocumentIngestionPipeline(config_dict)
            self.rag_engine = RAGEngine(config_dict)
            self.metadata_manager = MetadataManager(config_dict)
            st.session_state.initialized = True
            return True
        except Exception as e:
            st.error(f"Failed to initialize RAG system: {e}")
            return False

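    # The Config attributes read above suggest a src/config.yaml shaped roughly
    # like the sketch below. The nesting and key names are assumptions inferred
    # from this file, not a confirmed schema:
    #
    #   siliconflow_api_key: "..."
    #   groq_api_key: "..."
    #   qdrant_url: "http://localhost:6333"
    #   qdrant_api_key: "..."
    #   rag:
    #     similarity_threshold: 0.7
    #     final_top_k: 5
    #   document_processing:
    #     chunk_size: 1000
    #     chunk_overlap: 200
    #   storage: {}
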
    def run(self):
        """Run the Streamlit demo application."""
        st.set_page_config(
            page_title="Manufacturing RAG Agent",
            page_icon="🏭",
            layout="wide",
            initial_sidebar_state="expanded"
        )
        st.title("🏭 Manufacturing RAG Agent")
        st.markdown("*Intelligent document analysis for manufacturing data*")
        # Initialize the system if not already done
        if not st.session_state.initialized:
            with st.spinner("Initializing RAG system..."):
                if not self.initialize_system():
                    st.stop()
        # Sidebar navigation
        page = st.sidebar.selectbox(
            "Navigation",
            ["📄 Document Upload", "❓ Ask Questions", "📊 Analytics", "⚙️ System Status"]
        )
        # Route to the appropriate page
        if page == "📄 Document Upload":
            self.document_upload_page()
        elif page == "❓ Ask Questions":
            self.question_answering_page()
        elif page == "📊 Analytics":
            self.analytics_page()
        elif page == "⚙️ System Status":
            self.system_status_page()

    def document_upload_page(self):
        """Document upload and management page."""
        st.header("📄 Document Upload & Management")
        # File upload section
        st.subheader("Upload Documents")
        uploaded_files = st.file_uploader(
            "Choose files to upload",
            type=['pdf', 'xlsx', 'xls', 'xlsm', 'png', 'jpg', 'jpeg'],
            accept_multiple_files=True,
            help="Supported formats: PDF, Excel (.xlsx, .xls, .xlsm), Images (.png, .jpg, .jpeg)"
        )
        if uploaded_files:
            if st.button("Process Documents", type="primary"):
                self.process_uploaded_files(uploaded_files)
        # Document management section
        st.subheader("Document Library")
        self.display_document_library()

    def process_uploaded_files(self, uploaded_files):
        """Process uploaded files through the ingestion pipeline."""
        progress_bar = st.progress(0)
        status_text = st.empty()
        results_container = st.container()
        results = []
        for i, uploaded_file in enumerate(uploaded_files):
            status_text.text(f"Processing {uploaded_file.name}...")
            progress_bar.progress((i + 1) / len(uploaded_files))
            # Save the uploaded file to a temporary path for the pipeline
            with tempfile.NamedTemporaryFile(delete=False, suffix=Path(uploaded_file.name).suffix) as tmp_file:
                tmp_file.write(uploaded_file.getvalue())
                tmp_file_path = tmp_file.name
            try:
                # Process document
                result = self.ingestion_pipeline.ingest_document(tmp_file_path)
                results.append(result)
            except Exception as e:
                st.error(f"Failed to process {uploaded_file.name}: {e}")
                results.append(IngestionResult(
                    document_id="error",
                    filename=uploaded_file.name,
                    success=False,
                    processing_time=0.0,
                    chunks_created=0,
                    chunks_indexed=0,
                    error_message=str(e)
                ))
            finally:
                # Clean up the temporary file even if processing failed
                Path(tmp_file_path).unlink(missing_ok=True)
        # Display results
        status_text.text("Processing complete!")
        self.display_processing_results(results, results_container)
        # Refresh the document library
        st.rerun()

    def display_processing_results(self, results: List[IngestionResult], container):
        """Display processing results in a formatted way."""
        with container:
            st.subheader("Processing Results")
            # Summary metrics
            successful = sum(1 for r in results if r.success)
            total_chunks = sum(r.chunks_indexed for r in results if r.success)
            col1, col2, col3 = st.columns(3)
            with col1:
                st.metric("Documents Processed", f"{successful}/{len(results)}")
            with col2:
                st.metric("Total Chunks Created", total_chunks)
            with col3:
                avg_time = sum(r.processing_time for r in results) / len(results)
                st.metric("Avg Processing Time", f"{avg_time:.2f}s")
            # Detailed results
            for result in results:
                with st.expander(f"📄 {result.filename} - {'✅ Success' if result.success else '❌ Failed'}"):
                    col1, col2 = st.columns(2)
                    with col1:
                        st.write(f"**Document ID:** {result.document_id}")
                        st.write(f"**Processing Time:** {result.processing_time:.2f}s")
                        st.write(f"**Chunks Created:** {result.chunks_created}")
                        st.write(f"**Chunks Indexed:** {result.chunks_indexed}")
                    with col2:
                        if result.success:
                            st.success("Document processed successfully!")
                        else:
                            st.error(f"Processing failed: {result.error_message}")
                        if result.warnings:
                            for warning in result.warnings:
                                st.warning(warning)

    def display_document_library(self):
        """Display the document library with management options."""
        try:
            # Get document list
            documents = self.metadata_manager.list_documents(limit=100)
            if not documents:
                st.info("No documents uploaded yet. Use the upload section above to add documents.")
                return
            # Create DataFrame for display
            doc_data = []
            for doc in documents:
                doc_data.append({
                    'Filename': doc.filename,
                    'Type': doc.file_type.upper(),
                    'Status': doc.processing_status.value.title(),
                    'Chunks': doc.total_chunks,
                    'Size': self.format_file_size(doc.file_size),
                    'Uploaded': doc.upload_timestamp.strftime('%Y-%m-%d %H:%M'),
                    'Processing Time': f"{doc.processing_time:.2f}s" if doc.processing_time else "N/A",
                    'Document ID': doc.document_id
                })
            df = pd.DataFrame(doc_data)
            # Display with row selection enabled
            selected_indices = st.dataframe(
                df.drop('Document ID', axis=1),
                use_container_width=True,
                selection_mode="multi-row",
                on_select="rerun"
            ).selection.rows
            # Management actions
            if selected_indices:
                col1, col2, col3 = st.columns(3)
                with col1:
                    if st.button("🔄 Reprocess Selected", help="Reprocess selected documents"):
                        self.reprocess_documents([doc_data[i]['Document ID'] for i in selected_indices])
                with col2:
                    if st.button("🗑️ Delete Selected", help="Delete selected documents", type="secondary"):
                        self.delete_documents([doc_data[i]['Document ID'] for i in selected_indices])
                with col3:
                    if st.button("🔍 View Details", help="View detailed information"):
                        self.show_document_details([doc_data[i]['Document ID'] for i in selected_indices])
        except Exception as e:
            st.error(f"Failed to load document library: {e}")

    def question_answering_page(self):
        """Question answering interface."""
        st.header("❓ Ask Questions")
        # Check whether any processed documents are available
        try:
            documents = self.metadata_manager.list_documents(
                status=ProcessingStatus.COMPLETED,
                limit=1
            )
            if not documents:
                st.warning("No processed documents available. Please upload and process documents first.")
                return
        except Exception as e:
            st.error(f"Failed to check document availability: {e}")
            return
        # Question input
        question = st.text_input(
            "Enter your question about the manufacturing documents:",
            placeholder="e.g., What is the average production yield for Q3?",
            help="Ask questions about processes, metrics, specifications, or any content in your uploaded documents."
        )
        # Advanced options
        with st.expander("🔧 Advanced Options"):
            col1, col2 = st.columns(2)
            with col1:
                max_results = st.slider("Max Context Chunks", 1, 10, 5)
                similarity_threshold = st.slider("Similarity Threshold", 0.0, 1.0, 0.7, 0.1)
            with col2:
                document_filter = st.selectbox(
                    "Filter by Document Type",
                    ["All", "PDF", "Excel", "Image"]
                )
                enable_reranking = st.checkbox("Enable Reranking", value=True)
        # Ask question
        if st.button("🔍 Ask Question", type="primary", disabled=not question):
            self.process_question(question, max_results, similarity_threshold, document_filter, enable_reranking)
        # Display chat history
        if st.session_state.chat_history:
            st.subheader("💬 Recent Questions")
            for q, response in reversed(st.session_state.chat_history[-5:]):
                with st.expander(f"Q: {q[:100]}..." if len(q) > 100 else f"Q: {q}"):
                    self.display_rag_response(response)

    def process_question(self, question: str, max_results: int, similarity_threshold: float,
                         document_filter: str, enable_reranking: bool):
        """Process a question through the RAG engine.

        Note: enable_reranking is accepted but not currently applied here.
        """
        with st.spinner("Searching documents and generating answer..."):
            try:
                # Prepare filters
                filters = {}
                if document_filter != "All":
                    filters["document_type"] = document_filter.lower()
                # Temporarily override the RAG engine retrieval settings
                original_config = {
                    'final_top_k': self.rag_engine.final_top_k,
                    'similarity_threshold': self.rag_engine.similarity_threshold
                }
                self.rag_engine.final_top_k = max_results
                self.rag_engine.similarity_threshold = similarity_threshold
                try:
                    # Get response
                    response = self.rag_engine.answer_question(question, filters if filters else None)
                finally:
                    # Restore the original settings even if the query fails
                    self.rag_engine.final_top_k = original_config['final_top_k']
                    self.rag_engine.similarity_threshold = original_config['similarity_threshold']
                # Add to chat history and display
                st.session_state.chat_history.append((question, response))
                self.display_rag_response(response)
            except Exception as e:
                st.error(f"Failed to process question: {e}")

    def display_rag_response(self, response: RAGResponse):
        """Display a RAG response with formatting and citations."""
        if not response.success:
            st.error(f"Failed to generate answer: {response.error_message}")
            return
        # Main answer
        st.markdown("### 📝 Answer")
        st.markdown(response.answer)
        # Metrics
        col1, col2, col3, col4 = st.columns(4)
        with col1:
            st.metric("Confidence", f"{response.confidence_score:.2f}")
        with col2:
            st.metric("Processing Time", f"{response.processing_time:.2f}s")
        with col3:
            st.metric("Sources Used", len(response.citations))
        with col4:
            st.metric("Chunks Retrieved", response.total_chunks_retrieved)
        # Citations
        if response.citations:
            st.markdown("### 📚 Sources & Citations")
            for i, citation in enumerate(response.citations):
                with st.expander(f"Source {i+1}: {citation.source_file} (Confidence: {citation.confidence:.2f})"):
                    col1, col2 = st.columns([2, 1])
                    with col1:
                        st.markdown("**Text Snippet:**")
                        st.markdown(f"*{citation.text_snippet}*")
                    with col2:
                        st.markdown("**Citation Details:**")
                        if citation.page_number:
                            st.write(f"📄 Page: {citation.page_number}")
                        if citation.worksheet_name:
                            st.write(f"📊 Sheet: {citation.worksheet_name}")
                        if citation.cell_range:
                            st.write(f"📍 Range: {citation.cell_range}")
                        if citation.section_title:
                            st.write(f"📑 Section: {citation.section_title}")
        # Performance breakdown
        with st.expander("⚡ Performance Details"):
            perf_data = {
                'Stage': ['Retrieval', 'Reranking', 'Generation', 'Total'],
                'Time (s)': [
                    response.retrieval_time,
                    response.rerank_time,
                    response.generation_time,
                    response.processing_time
                ]
            }
            fig = px.bar(
                perf_data,
                x='Stage',
                y='Time (s)',
                title="Processing Time Breakdown"
            )
            st.plotly_chart(fig, use_container_width=True)

    def analytics_page(self):
        """Analytics and statistics page."""
        st.header("📊 Analytics Dashboard")
        try:
            # Get system statistics
            pipeline_stats = self.ingestion_pipeline.get_pipeline_stats()
            metadata_stats = self.metadata_manager.get_statistics()
            # Overview metrics
            st.subheader("📈 Overview")
            col1, col2, col3, col4 = st.columns(4)
            with col1:
                st.metric("Total Documents", metadata_stats.get('total_documents', 0))
            with col2:
                st.metric("Total Chunks", metadata_stats.get('total_chunks', 0))
            with col3:
                st.metric("Total File Size", self.format_file_size(metadata_stats.get('total_file_size', 0)))
            with col4:
                vector_stats = pipeline_stats.get('vector_store', {})
                st.metric("Vector Points", vector_stats.get('total_points', 0))
            # Document type distribution
            st.subheader("📄 Document Types")
            type_counts = metadata_stats.get('documents_by_type', {})
            if type_counts:
                fig = px.pie(
                    values=list(type_counts.values()),
                    names=list(type_counts.keys()),
                    title="Documents by Type"
                )
                st.plotly_chart(fig, use_container_width=True)
            # Processing status distribution
            st.subheader("⚙️ Processing Status")
            status_counts = metadata_stats.get('documents_by_status', {})
            if status_counts:
                fig = px.bar(
                    x=list(status_counts.keys()),
                    y=list(status_counts.values()),
                    title="Documents by Processing Status"
                )
                st.plotly_chart(fig, use_container_width=True)
            # Recent activity
            st.subheader("📋 Recent Activity")
            recent_docs = self.metadata_manager.list_documents(limit=10)
            if recent_docs:
                activity_data = []
                for doc in recent_docs:
                    activity_data.append({
                        'Document': doc.filename,
                        'Status': doc.processing_status.value.title(),
                        'Chunks': doc.total_chunks,
                        'Upload Time': doc.upload_timestamp.strftime('%Y-%m-%d %H:%M:%S')
                    })
                st.dataframe(pd.DataFrame(activity_data), use_container_width=True)
        except Exception as e:
            st.error(f"Failed to load analytics: {e}")

    def system_status_page(self):
        """System status and health check page."""
        st.header("⚙️ System Status")
        # Health checks
        st.subheader("🏥 Health Checks")
        try:
            # RAG engine health
            rag_health = self.rag_engine.health_check()
            col1, col2, col3 = st.columns(3)
            with col1:
                status = "✅ Healthy" if rag_health.get('vector_store', False) else "❌ Unhealthy"
                st.metric("Vector Store", status)
            with col2:
                status = "✅ Healthy" if rag_health.get('llm_system', False) else "❌ Unhealthy"
                st.metric("LLM System", status)
            with col3:
                status = "✅ Healthy" if rag_health.get('embedding_system', False) else "❌ Unhealthy"
                st.metric("Embedding System", status)
            # Pipeline health
            pipeline_health = self.ingestion_pipeline.health_check()
            st.subheader("🔧 Pipeline Components")
            for component, healthy in pipeline_health.items():
                status = "✅ Healthy" if healthy else "❌ Unhealthy"
                st.write(f"**{component.replace('_', ' ').title()}:** {status}")
            # Configuration display
            st.subheader("⚙️ Configuration")
            with st.expander("View Current Configuration"):
                config_display = {
                    "RAG Settings": {
                        "Max Context Chunks": self.rag_engine.max_context_chunks,
                        "Similarity Threshold": self.rag_engine.similarity_threshold,
                        "Rerank Top K": self.rag_engine.rerank_top_k,
                        "Final Top K": self.rag_engine.final_top_k
                    },
                    "Pipeline Settings": {
                        "Chunk Size": self.ingestion_pipeline.chunk_size,
                        "Chunk Overlap": self.ingestion_pipeline.chunk_overlap,
                        "Batch Size": self.ingestion_pipeline.batch_size,
                        "Max Workers": self.ingestion_pipeline.max_workers
                    }
                }
                st.json(config_display)
        except Exception as e:
            st.error(f"Failed to check system status: {e}")

    def format_file_size(self, size_bytes: int) -> str:
        """Format file size in human readable format."""
        if size_bytes == 0:
            return "0B"
        size_names = ["B", "KB", "MB", "GB", "TB"]
        i = 0
        while size_bytes >= 1024 and i < len(size_names) - 1:
            size_bytes /= 1024.0
            i += 1
        return f"{size_bytes:.1f}{size_names[i]}"

    def reprocess_documents(self, document_ids: List[str]):
        """Reprocess selected documents."""
        with st.spinner(f"Reprocessing {len(document_ids)} documents..."):
            for doc_id in document_ids:
                try:
                    result = self.ingestion_pipeline.reprocess_document(doc_id)
                    if result.success:
                        st.success(f"Successfully reprocessed {result.filename}")
                    else:
                        st.error(f"Failed to reprocess {result.filename}: {result.error_message}")
                except Exception as e:
                    st.error(f"Error reprocessing document {doc_id}: {e}")
        st.rerun()

    def delete_documents(self, document_ids: List[str]):
        """Delete selected documents."""
        # NOTE: Streamlit has no built-in confirmation dialog (the original
        # st.confirm call does not exist in the Streamlit API); see the
        # session-state confirmation sketch below for one way to add one.
        st.warning(f"Deleting {len(document_ids)} documents. This action cannot be undone.")
        with st.spinner(f"Deleting {len(document_ids)} documents..."):
            for doc_id in document_ids:
                try:
                    success = self.ingestion_pipeline.delete_document(doc_id)
                    if success:
                        st.success(f"Successfully deleted document {doc_id}")
                    else:
                        st.error(f"Failed to delete document {doc_id}")
                except Exception as e:
                    st.error(f"Error deleting document {doc_id}: {e}")
        st.rerun()

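    # A minimal two-step confirmation sketch using st.session_state (an
    # assumption about how a caller might wire this up, not part of the
    # original app): the first click arms the deletion, the second performs it.
    #
    #   if st.button("Delete Selected"):
    #       st.session_state.pending_delete = document_ids
    #   if st.session_state.get("pending_delete"):
    #       st.warning("Click 'Confirm Delete' to permanently remove the documents.")
    #       if st.button("Confirm Delete"):
    #           self.delete_documents(st.session_state.pop("pending_delete"))
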
    def show_document_details(self, document_ids: List[str]):
        """Show detailed information for selected documents."""
        for doc_id in document_ids:
            try:
                metadata = self.metadata_manager.get_document_metadata(doc_id)
                if metadata:
                    with st.expander(f"📄 {metadata.filename} Details"):
                        col1, col2 = st.columns(2)
                        with col1:
                            st.write(f"**Document ID:** {metadata.document_id}")
                            st.write(f"**File Type:** {metadata.file_type}")
                            st.write(f"**File Size:** {self.format_file_size(metadata.file_size)}")
                            st.write(f"**Total Chunks:** {metadata.total_chunks}")
                        with col2:
                            st.write(f"**Upload Time:** {metadata.upload_timestamp}")
                            st.write(f"**Processing Status:** {metadata.processing_status.value}")
                            # Keep the label in both branches so "N/A" is not printed bare
                            st.write(f"**Processing Time:** {metadata.processing_time:.2f}s" if metadata.processing_time else "**Processing Time:** N/A")
                            st.write(f"**Checksum:** {metadata.checksum[:16]}..." if metadata.checksum else "**Checksum:** N/A")
                        if metadata.error_message:
                            st.error(f"Error: {metadata.error_message}")
                        if metadata.metadata_json:
                            # Streamlit does not allow nested expanders, so show
                            # the raw metadata inline instead
                            st.markdown("**Raw Metadata:**")
                            try:
                                metadata_dict = json.loads(metadata.metadata_json)
                                st.json(metadata_dict)
                            except json.JSONDecodeError:
                                st.text(metadata.metadata_json)
            except Exception as e:
                st.error(f"Failed to load details for document {doc_id}: {e}")


def main():
    """Main function to run the Streamlit demo."""
    demo = RAGDemo()
    demo.run()


if __name__ == "__main__":
    main()
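
# To launch the demo locally (assuming the src/ package and dependencies such
# as streamlit, pandas, and plotly are installed):
#   streamlit run <path-to-this-file>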