Spaces:
Sleeping
Sleeping
| import logging | |
| from pathlib import Path | |
| from typing import Dict, List, Any, Optional, Tuple | |
| import openpyxl | |
| from openpyxl.worksheet.worksheet import Worksheet | |
| from openpyxl.drawing.image import Image as OpenpyxlImage | |
| from dataclasses import dataclass | |
| import io | |
| from PIL import Image as PILImage | |
| import sys | |
| import os | |
| sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) | |
| from src.document_processor import ( | |
| DocumentProcessor, | |
| ProcessedDocument, | |
| DocumentType, | |
| ProcessingStatus, | |
| DocumentProcessingError, | |
| ExtractedImage, | |
| ExtractedTable, | |
| DocumentProcessorFactory | |
| ) | |
# Use the project's custom logger when its package can be imported; otherwise
# fall back to a standard-library logger registered under the same name.
try:
    from logger.custom_logger import CustomLoggerTracker

    custom_log = CustomLoggerTracker()
    logger = custom_log.get_logger("excel_processor")
except ImportError:
    logger = logging.getLogger("excel_processor")
@dataclass
class ExcelWorksheetInfo:
    """Summary information about a single Excel worksheet.

    Declared as a dataclass so that instances can be created with keyword
    arguments; a plain class carrying only annotations has no such
    constructor and raises ``TypeError`` when called with arguments.
    """

    name: str          # worksheet title
    max_row: int       # highest row index reported for the sheet (0 if unknown)
    max_column: int    # highest column index reported for the sheet (0 if unknown)
    cell_count: int    # number of non-empty cells
    image_count: int   # number of images extracted from the sheet
    table_count: int   # number of tables extracted from the sheet
    has_data: bool     # True when the sheet produced non-blank text content
@dataclass
class CellRange:
    """Rectangular range of cells, described by 1-based row/column indices.

    Declared as a dataclass so that ``CellRange(start_row, start_col,
    end_row, end_col)`` works; without the decorator the class has no
    constructor accepting those values.
    """

    start_row: int
    start_col: int
    end_row: int
    end_col: int

    def to_excel_range(self) -> str:
        """Convert to Excel range notation (e.g., 'A1:C5')."""
        start_col_letter = openpyxl.utils.get_column_letter(self.start_col)
        end_col_letter = openpyxl.utils.get_column_letter(self.end_col)
        return f"{start_col_letter}{self.start_row}:{end_col_letter}{self.end_row}"
| class ExcelProcessor(DocumentProcessor): | |
| """ | |
| Excel document processor using openpyxl. | |
| This processor extracts data from Excel worksheets, embedded images, | |
| and maintains proper citations with worksheet names and cell references. | |
| """ | |
def __init__(self, config: Dict[str, Any]):
    """
    Set up the Excel processor from a configuration mapping.

    Args:
        config: Configuration dictionary. Keys read here (with defaults):
            'image_processing' (True), 'table_extraction' (True),
            'min_table_rows' (2), 'min_table_cols' (2),
            'max_empty_rows' (5), 'max_empty_cols' (5).
    """
    super().__init__(config)
    read = config.get

    # Feature switches
    self.extract_images = read('image_processing', True)
    self.extract_tables = read('table_extraction', True)

    # Thresholds used when detecting tables from raw cell data
    self.min_table_rows = read('min_table_rows', 2)
    self.min_table_cols = read('min_table_cols', 2)
    self.max_empty_rows = read('max_empty_rows', 5)
    self.max_empty_cols = read('max_empty_cols', 5)

    logger.info(
        f"Excel processor initialized with image_processing={self.extract_images}, "
        f"table_extraction={self.extract_tables}"
    )
def _get_supported_extensions(self) -> List[str]:
    """Return the file suffixes this processor declares as supported."""
    # NOTE(review): openpyxl documents itself as an xlsx/xlsm library; the
    # legacy '.xls' format is presumably rejected by load_workbook - confirm.
    extensions = ['.xlsx', '.xls', '.xlsm']
    return extensions
def process_document(self, file_path: str) -> ProcessedDocument:
    """
    Process an Excel workbook and collect its text, images and tables.

    Every worksheet contributes a text section introduced by a
    "[Worksheet: NAME]" line; images and tables are collected only when the
    corresponding feature is enabled. Any exception raised along the way is
    logged and turned into a FAILED ProcessedDocument instead of propagating.

    Args:
        file_path: Path to the Excel file
    Returns:
        ProcessedDocument with status COMPLETED on success, or with status
        FAILED, empty content/metadata and error_message set on failure
    """
    try:
        # Validation and ID generation happen before the workbook is opened
        self.validate_file(file_path)
        document_id = self._generate_document_id(file_path)
        logger.info(f"Processing Excel document: {file_path}")

        workbook = openpyxl.load_workbook(file_path, data_only=True)
        try:
            metadata = self._extract_metadata(workbook, file_path)

            text_sections = []
            collected_images = []
            collected_tables = []
            sheet_summaries = []

            for sheet_name in workbook.sheetnames:
                worksheet = workbook[sheet_name]
                logger.debug(f"Processing worksheet: {sheet_name}")

                # Text content
                sheet_text = self._extract_worksheet_content(worksheet, sheet_name)
                sheet_has_text = bool(sheet_text.strip())
                if sheet_has_text:
                    text_sections.append(f"[Worksheet: {sheet_name}]\n{sheet_text}")

                # Images (optional)
                sheet_images = []
                if self.extract_images:
                    sheet_images = self._extract_worksheet_images(worksheet, sheet_name, document_id)
                    collected_images.extend(sheet_images)

                # Tables (optional)
                sheet_tables = []
                if self.extract_tables:
                    sheet_tables = self._extract_worksheet_tables(worksheet, sheet_name)
                    collected_tables.extend(sheet_tables)

                # Per-sheet statistics; counts are zero when a feature is off
                sheet_summaries.append(ExcelWorksheetInfo(
                    name=sheet_name,
                    max_row=worksheet.max_row or 0,
                    max_column=worksheet.max_column or 0,
                    cell_count=self._count_non_empty_cells(worksheet),
                    image_count=len(sheet_images),
                    table_count=len(sheet_tables),
                    has_data=sheet_has_text
                ))

            full_content = "\n\n".join(text_sections)

            # Flatten the per-sheet summaries into plain dictionaries
            summary_dicts = []
            for info in sheet_summaries:
                summary_dicts.append({
                    'name': info.name,
                    'max_row': info.max_row,
                    'max_column': info.max_column,
                    'cell_count': info.cell_count,
                    'image_count': info.image_count,
                    'table_count': info.table_count,
                    'has_data': info.has_data
                })

            metadata.update({
                'total_worksheets': len(workbook.sheetnames),
                'worksheet_names': workbook.sheetnames,
                'total_images': len(collected_images),
                'total_tables': len(collected_tables),
                'total_content_length': len(full_content),
                'worksheet_info': summary_dicts
            })

            processed_doc = ProcessedDocument(
                document_id=document_id,
                filename=Path(file_path).name,
                file_path=file_path,
                document_type=DocumentType.EXCEL,
                content=full_content,
                metadata=metadata,
                images=collected_images,
                tables=collected_tables,
                processing_status=ProcessingStatus.COMPLETED
            )
            logger.info(f"Successfully processed Excel: {len(workbook.sheetnames)} worksheets, "
                        f"{len(collected_images)} images, {len(collected_tables)} tables")
            return processed_doc
        finally:
            # Runs on success and on failure once the workbook was opened
            workbook.close()
    except Exception as e:
        logger.error(f"Failed to process Excel {file_path}: {e}")
        # Report the failure as a document rather than raising
        failed_id = self._generate_document_id(file_path)
        return ProcessedDocument(
            document_id=failed_id,
            filename=Path(file_path).name,
            file_path=file_path,
            document_type=DocumentType.EXCEL,
            content="",
            metadata={},
            processing_status=ProcessingStatus.FAILED,
            error_message=str(e)
        )
def _extract_metadata(self, workbook: openpyxl.Workbook, file_path: str) -> Dict[str, Any]:
    """
    Collect workbook properties plus basic workbook and file information.

    Only properties with a truthy value are copied. If anything fails, the
    values gathered so far are kept and the error text is stored under
    'metadata_extraction_error'.

    Args:
        workbook: Open workbook object
        file_path: Path to the Excel file on disk
    Returns:
        Dictionary of metadata values
    """
    collected: Dict[str, Any] = {}
    try:
        core = workbook.properties

        # Properties copied verbatim under the same key
        for attr in ('title', 'creator', 'description', 'subject', 'keywords', 'category'):
            value = getattr(core, attr)
            if value:
                collected[attr] = value

        # Timestamps are stored as ISO-8601 strings
        for attr in ('created', 'modified'):
            stamp = getattr(core, attr)
            if stamp:
                collected[attr] = stamp.isoformat()

        last_editor = core.lastModifiedBy
        if last_editor:
            collected['last_modified_by'] = last_editor

        # Workbook info
        collected['worksheet_count'] = len(workbook.sheetnames)
        active_sheet = workbook.active
        collected['active_sheet'] = active_sheet.title if active_sheet else None

        # File info
        source = Path(file_path)
        collected['file_size'] = source.stat().st_size
        collected['file_extension'] = source.suffix
    except Exception as e:
        logger.warning(f"Failed to extract Excel metadata: {e}")
        collected['metadata_extraction_error'] = str(e)
    return collected
def _extract_worksheet_content(self, worksheet: Worksheet, sheet_name: str) -> str:
    """
    Render a worksheet as text: one line per non-empty row, cells joined by tabs.

    String cells are stripped, other values go through str(), and empty cells
    become empty strings so column positions are preserved. Rows without any
    visible content are skipped, which also makes a completely empty sheet
    produce "". A sheet holding exactly one row of data is extracted like any
    other; it is not treated as empty merely because max_row equals 1.

    Args:
        worksheet: Worksheet object to read
        sheet_name: Name of the worksheet (used in the warning message only)
    Returns:
        The sheet text, or "" when the sheet has no content or reading fails
    """
    try:
        if not worksheet.max_row:
            return ""

        content_lines = []
        for row in worksheet.iter_rows(min_row=1, max_row=worksheet.max_row,
                                       min_col=1, max_col=worksheet.max_column,
                                       values_only=True):
            row_values = []
            for cell_value in row:
                if cell_value is None:
                    row_values.append("")
                elif isinstance(cell_value, str):
                    row_values.append(cell_value.strip())
                else:
                    # Numbers, booleans, dates, ... all use their str() form
                    row_values.append(str(cell_value))

            # Keep the row only if at least one cell has visible text
            if any(text.strip() for text in row_values):
                content_lines.append("\t".join(row_values))

        return "\n".join(content_lines)
    except Exception as e:
        logger.warning(f"Failed to extract content from worksheet {sheet_name}: {e}")
        return ""
def _extract_worksheet_images(self, worksheet: Worksheet, sheet_name: str, document_id: str) -> List[ExtractedImage]:
    """
    Extract the images attached to a worksheet.

    Images are read from the worksheet's private ``_images`` list. An image
    that cannot be read is logged and skipped; it never aborts the others.
    Anchor positions are reported as cell references (e.g. 'B3') in
    ``metadata['anchor_info']`` under 'from_cell' and, when present, 'to_cell'.

    Args:
        worksheet: Worksheet object to read
        sheet_name: Name of the worksheet (used in ids, filenames, metadata)
        document_id: Id of the owning document (prefix of each image id)
    Returns:
        List of ExtractedImage objects (possibly empty)
    """

    def marker_to_cell(marker) -> Optional[str]:
        # Anchor markers store zero-based column/row indices, so both are
        # shifted by one before being rendered as an A1-style reference.
        try:
            column_letter = openpyxl.utils.get_column_letter(marker.col + 1)
            return f"{column_letter}{marker.row + 1}"
        except (AttributeError, TypeError, ValueError):
            # A malformed anchor must not cost us the image itself
            return None

    images = []
    try:
        sheet_images = getattr(worksheet, '_images', None) or []
        for img_index, img in enumerate(sheet_images):
            try:
                image_data = self._extract_image_data(img)
                if not image_data:
                    continue

                image_id = f"{document_id}_{sheet_name}_img{img_index}"
                filename = f"{sheet_name}_image{img_index}.{image_data['format'].lower()}"

                # Image position, when the anchor exposes it
                anchor_info = {}
                anchor = getattr(img, 'anchor', None)
                if isinstance(anchor, str):
                    # Anchor given directly as a cell reference
                    anchor_info['from_cell'] = anchor
                elif anchor is not None:
                    for key, attr in (('from_cell', '_from'), ('to_cell', 'to')):
                        marker = getattr(anchor, attr, None)
                        cell_ref = marker_to_cell(marker) if marker is not None else None
                        if cell_ref:
                            anchor_info[key] = cell_ref

                extracted_image = ExtractedImage(
                    image_id=image_id,
                    filename=filename,
                    content=image_data['content'],
                    format=image_data['format'],
                    width=image_data.get('width'),
                    height=image_data.get('height'),
                    extraction_method="openpyxl",
                    metadata={
                        'worksheet_name': sheet_name,
                        'image_index': img_index,
                        'size_bytes': len(image_data['content']),
                        'anchor_info': anchor_info
                    }
                )
                images.append(extracted_image)
            except Exception as e:
                logger.warning(f"Failed to extract image {img_index} from worksheet {sheet_name}: {e}")
                continue
    except Exception as e:
        logger.warning(f"Failed to extract images from worksheet {sheet_name}: {e}")
    return images
def _extract_image_data(self, img: OpenpyxlImage) -> Optional[Dict[str, Any]]:
    """
    Extract raw bytes, format and dimensions from an openpyxl Image object.

    The bytes are taken from the image's ``_data()`` reader when available.
    ``img.ref`` is only a fallback, because it holds the image *source*
    (bytes, a readable stream or a file path) rather than necessarily the
    bytes themselves; each of those source kinds is converted to bytes here.

    Args:
        img: openpyxl Image object
    Returns:
        Dictionary with 'content' (bytes), 'format', 'width' and 'height'
        (dimensions are None when they cannot be determined), or None if no
        image bytes could be obtained
    """
    try:
        image_data = None

        # Preferred source: the image's own byte reader
        read_embedded = getattr(img, '_data', None)
        if callable(read_embedded):
            try:
                image_data = read_embedded()
            except Exception as e:
                logger.debug(f"Image _data() failed, falling back to ref: {e}")

        # Fallback: interpret the reference according to its type
        if not image_data:
            ref = getattr(img, 'ref', None)
            if isinstance(ref, (bytes, bytearray)):
                image_data = bytes(ref)
            elif hasattr(ref, 'read'):
                if hasattr(ref, 'seek'):
                    ref.seek(0)
                image_data = ref.read()
            elif isinstance(ref, (str, os.PathLike)):
                image_data = Path(ref).read_bytes()

        if not image_data or not isinstance(image_data, (bytes, bytearray)):
            logger.warning("No image data found in image object")
            return None
        image_data = bytes(image_data)

        # Format declared by the image object; PNG when nothing is declared
        image_format = "PNG"
        declared_format = getattr(img, 'format', None)
        if declared_format:
            image_format = str(declared_format).upper()

        # Dimensions (and the actual encoding of the bytes) via PIL
        width, height = None, None
        try:
            with io.BytesIO(image_data) as img_buffer:
                pil_img = PILImage.open(img_buffer)
                width, height = pil_img.size
                # The reader may hand back re-encoded bytes, so trust the
                # format detected from the bytes over the declared one.
                if pil_img.format:
                    image_format = pil_img.format.upper()
        except Exception as e:
            logger.debug(f"Could not determine image dimensions: {e}")

        return {
            'content': image_data,
            'format': image_format,
            'width': width,
            'height': height
        }
    except Exception as e:
        logger.warning(f"Failed to extract image data: {e}")
        return None
def _extract_worksheet_tables(self, worksheet: Worksheet, sheet_name: str) -> List[ExtractedTable]:
    """
    Extract tables from an Excel worksheet.

    Defined (named) tables are extracted first. If none could be extracted,
    tables are detected from the cell data instead.

    Args:
        worksheet: openpyxl Worksheet object
        sheet_name: Name of the worksheet
    Returns:
        List of ExtractedTable objects
    """
    tables = []
    try:
        defined_tables = getattr(worksheet, 'tables', None)
        if defined_tables:
            # Iterate values(): openpyxl's table collection overrides items()
            # to yield (name, range-string) pairs, not table objects.
            for table in list(defined_tables.values()):
                table_name = getattr(table, 'name', None) or getattr(table, 'displayName', '')
                try:
                    extracted_table = self._extract_defined_table(
                        table, sheet_name, len(tables), worksheet=worksheet
                    )
                    if extracted_table:
                        tables.append(extracted_table)
                except Exception as e:
                    logger.warning(f"Failed to extract defined table {table_name}: {e}")

        # If no defined tables found, try to detect tables from data
        if not tables and self.extract_tables:
            tables.extend(self._detect_data_tables(worksheet, sheet_name))
    except Exception as e:
        logger.warning(f"Failed to extract tables from worksheet {sheet_name}: {e}")
    return tables

def _extract_defined_table(self, table, sheet_name: str, table_index: int,
                           worksheet: Optional[Worksheet] = None) -> Optional[ExtractedTable]:
    """
    Extract a defined Excel table.

    Args:
        table: Excel table object exposing ``ref`` (e.g. "A1:C10")
        sheet_name: Name of the worksheet
        table_index: Index of the table
        worksheet: Worksheet that owns the table. When omitted, the method
            falls back to ``table.parent`` if the table object provides it.
    Returns:
        ExtractedTable object, or None if the table is empty, its worksheet
        cannot be determined, or extraction fails
    """
    try:
        table_range = table.ref

        sheet = worksheet if worksheet is not None else getattr(table, 'parent', None)
        if sheet is None:
            logger.warning("Failed to extract defined table: worksheet not available")
            return None

        # Read every cell of the range as text; empty cells become ""
        table_data = [
            ["" if cell.value is None else str(cell.value) for cell in row]
            for row in sheet[table_range]
        ]
        if not table_data:
            return None

        # First row is treated as the header row
        headers = table_data[0]
        rows = table_data[1:]

        table_id = f"{sheet_name}_table{table_index}"
        return ExtractedTable(
            table_id=table_id,
            headers=headers,
            rows=rows,
            worksheet_name=sheet_name,
            cell_range=table_range,
            extraction_confidence=0.9,  # High confidence for defined tables
            metadata={
                'extraction_method': 'defined_table',
                'table_index': table_index,
                'table_name': getattr(table, 'name', '')
            }
        )
    except Exception as e:
        logger.warning(f"Failed to extract defined table: {e}")
        return None
def _detect_data_tables(self, worksheet: Worksheet, sheet_name: str) -> List[ExtractedTable]:
    """
    Detect tables from worksheet data patterns.

    Each sufficiently large block of data rows becomes one table whose first
    row is used as the header. The reported cell range uses the block's real
    first and last worksheet rows; columns always span from column 1 to the
    width of the header row.

    Args:
        worksheet: openpyxl Worksheet object
        sheet_name: Name of the worksheet
    Returns:
        List of detected ExtractedTable objects
    """
    tables = []
    try:
        if not worksheet.max_row or worksheet.max_row < self.min_table_rows:
            return tables

        spans = self._find_data_block_spans(worksheet)
        for block_index, (first_row, last_row, data_block) in enumerate(spans):
            if len(data_block) < self.min_table_rows or len(data_block[0]) < self.min_table_cols:
                continue

            headers = data_block[0]
            rows = data_block[1:]

            # Rows come from the tracked positions, not from an assumed row 1
            cell_range = CellRange(first_row, 1, last_row, len(headers)).to_excel_range()
            table_id = f"{sheet_name}_detected_table{block_index}"
            tables.append(ExtractedTable(
                table_id=table_id,
                headers=headers,
                rows=rows,
                worksheet_name=sheet_name,
                cell_range=cell_range,
                extraction_confidence=0.7,  # Lower confidence for detected tables
                metadata={
                    'extraction_method': 'data_pattern_detection',
                    'table_index': block_index
                }
            ))
    except Exception as e:
        logger.warning(f"Failed to detect data tables: {e}")
    return tables

def _find_data_blocks(self, worksheet: Worksheet) -> List[List[List[str]]]:
    """
    Find blocks of data rows in the worksheet.

    Args:
        worksheet: openpyxl Worksheet object
    Returns:
        List of data blocks, where each block is a list of rows and each row
        is a list of stripped cell strings
    """
    return [rows for _first, _last, rows in self._find_data_block_spans(worksheet)]

def _find_data_block_spans(self, worksheet: Worksheet) -> List[Tuple[int, int, List[List[str]]]]:
    """
    Find blocks of data rows together with their worksheet row positions.

    A block is closed once ``self.max_empty_rows`` consecutive empty rows
    follow it. Shorter runs of empty rows do not close the block and are not
    stored in it, so a block may hold fewer rows than its row span covers.
    Blocks with fewer than ``self.min_table_rows`` rows are discarded.

    Args:
        worksheet: openpyxl Worksheet object
    Returns:
        List of (first_row, last_row, rows) tuples with 1-based row numbers;
        on error, the blocks completed so far
    """
    spans = []
    try:
        current_rows = []
        first_row = last_row = 0
        empty_row_count = 0

        numbered_rows = enumerate(
            worksheet.iter_rows(min_row=1, max_row=worksheet.max_row,
                                min_col=1, max_col=worksheet.max_column,
                                values_only=True),
            start=1,
        )
        for row_number, row in numbered_rows:
            row_values = ["" if value is None else str(value).strip() for value in row]

            if any(row_values):
                if not current_rows:
                    first_row = row_number
                last_row = row_number
                empty_row_count = 0
                current_rows.append(row_values)
                continue

            # Empty row
            empty_row_count += 1
            if empty_row_count >= self.max_empty_rows and current_rows:
                if len(current_rows) >= self.min_table_rows:
                    spans.append((first_row, last_row, current_rows))
                current_rows = []
                empty_row_count = 0

        # Add final block if it exists
        if current_rows and len(current_rows) >= self.min_table_rows:
            spans.append((first_row, last_row, current_rows))
    except Exception as e:
        logger.warning(f"Failed to find data blocks: {e}")
    return spans
def _count_non_empty_cells(self, worksheet: Worksheet) -> int:
    """
    Count the cells whose value is not None and not blank after stripping.

    If iteration fails part-way, the failure is logged and the count reached
    so far is returned.

    Args:
        worksheet: openpyxl Worksheet object
    Returns:
        Number of non-empty cells
    """
    total = 0
    try:
        for values in worksheet.iter_rows(values_only=True):
            for value in values:
                if value is None:
                    continue
                if not str(value).strip():
                    continue
                total += 1
    except Exception as e:
        logger.warning(f"Failed to count non-empty cells: {e}")
    return total
# Make the factory aware of this processor for Excel documents
DocumentProcessorFactory.register_processor(DocumentType.EXCEL, ExcelProcessor)

if __name__ == "__main__":
    logger.info("Start excel processing...")