File size: 12,150 Bytes
7dfe46c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365

import logging
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional, Any, Union
from enum import Enum
import hashlib
import sys 
import os 



# Append the directory that contains this file (not the process's current
# working directory) to sys.path so the ``logger`` package imported below can
# be resolved from here.
# NOTE(review): presumably ``logger/`` lives next to this module - confirm.
sys.path.append(os.path.dirname(os.path.abspath(__file__)))

# Module-level ``logger`` used by every class in this file. The project's
# custom logger is preferred; any ImportError raised inside this ``try``
# (including one raised while constructing the tracker) triggers the fallback.
try:
    from logger.custom_logger import CustomLoggerTracker
    custom_log = CustomLoggerTracker()
    logger = custom_log.get_logger("documents_processor")

except ImportError:
    # Fall back to a standard-library logger with the same name when the
    # custom logger cannot be imported.
    logger = logging.getLogger("documents_processor")



class DocumentType(Enum):
    """Categories of input document recognised by this module.

    The extension checks elsewhere in this file map a file suffix onto one of
    these members; anything unrecognised becomes ``UNKNOWN``.
    """

    PDF = "pdf"          # chosen for the ".pdf" suffix
    EXCEL = "excel"      # chosen for ".xlsx", ".xls" and ".xlsm"
    IMAGE = "image"      # chosen for ".png", ".jpg", ".jpeg", ".gif", ".bmp", ".tiff"
    UNKNOWN = "unknown"  # any other suffix


class ProcessingStatus(Enum):
    """States a document can be in while it moves through processing.

    ``PENDING`` is the default status of a newly created ``ProcessedDocument``;
    the code that sets the other states is not part of this module.
    """

    PENDING = "pending"
    PROCESSING = "processing"
    COMPLETED = "completed"
    FAILED = "failed"
    PARTIAL = "partial"  # NOTE(review): presumably "some content extracted" - confirm with the producers


@dataclass
class ExtractedImage:
    """An image pulled out of a document, plus optional OCR results.

    Only the first four fields are required; everything else defaults to
    "not known" (``None``, ``"unknown"`` or an empty dict).
    """

    # --- required ---------------------------------------------------------
    image_id: str
    filename: str
    content: bytes  # presumably the encoded image data - confirm with producers
    format: str  # image format label such as "PNG" or "JPEG"

    # --- optional ---------------------------------------------------------
    width: Optional[int] = None  # units not established here (presumably pixels)
    height: Optional[int] = None
    ocr_text: Optional[str] = None
    ocr_confidence: Optional[float] = None  # scale not established in this file
    extraction_method: str = "unknown"
    metadata: Dict[str, Any] = field(default_factory=dict)  # fresh dict per instance


@dataclass
class ExtractedTable:
    """A table pulled out of a document as header and row string lists.

    ``page_number``, ``worksheet_name`` and ``cell_range`` are optional
    location hints; which of them a producer fills in is not determined by
    this module.
    """

    # --- required ---------------------------------------------------------
    table_id: str
    headers: List[str]
    rows: List[List[str]]  # one inner list per table row

    # --- optional ---------------------------------------------------------
    page_number: Optional[int] = None
    worksheet_name: Optional[str] = None
    cell_range: Optional[str] = None
    extraction_confidence: Optional[float] = None  # scale not established in this file
    metadata: Dict[str, Any] = field(default_factory=dict)  # fresh dict per instance


@dataclass
class ChunkMetadata:
    """Bookkeeping attached to a single chunk of document text.

    ``extraction_timestamp`` defaults to ``datetime.now()`` evaluated when the
    instance is created, i.e. a naive local-time value.
    """

    # --- required ---------------------------------------------------------
    chunk_id: str
    document_id: str
    chunk_index: int

    # --- optional location hints ------------------------------------------
    page_number: Optional[int] = None
    worksheet_name: Optional[str] = None
    cell_range: Optional[str] = None
    section_title: Optional[str] = None

    # --- cross references (a new list per instance) -----------------------
    image_references: List[str] = field(default_factory=list)
    table_references: List[str] = field(default_factory=list)

    extraction_timestamp: datetime = field(default_factory=datetime.now)
    confidence_score: Optional[float] = None


@dataclass
class DocumentChunk:
    """A piece of document text with its metadata and an optional embedding.

    Creating a chunk never fails because of its content: blank or very long
    text only produces a warning in the module logger.
    """
    content: str
    metadata: ChunkMetadata
    embedding: Optional[List[float]] = None

    def __post_init__(self):
        """Log a warning when the content is blank or longer than 10000 characters."""
        is_blank = not self.content.strip()
        if is_blank:
            logger.warning(f"Empty content in chunk {self.metadata.chunk_id}")

        # Oversized chunks are still accepted; the warning just makes them visible.
        char_count = len(self.content)
        if char_count > 10000:
            logger.warning(f"Large chunk detected ({char_count} chars): {self.metadata.chunk_id}")


@dataclass
class ProcessedDocument:
    """A processed document together with everything extracted from it.

    When no ``checksum`` is supplied and ``file_path`` points at a regular
    file, ``__post_init__`` fills in ``checksum`` (MD5 hex digest of the file
    bytes) and ``file_size`` (size on disk in bytes). In every other case both
    fields keep the values that were passed in.
    """
    document_id: str
    filename: str
    file_path: str
    document_type: DocumentType
    content: str
    metadata: Dict[str, Any]
    images: List[ExtractedImage] = field(default_factory=list)
    tables: List[ExtractedTable] = field(default_factory=list)
    processing_status: ProcessingStatus = ProcessingStatus.PENDING
    processing_timestamp: datetime = field(default_factory=datetime.now)
    file_size: int = 0
    checksum: str = ""
    error_message: Optional[str] = None

    def __post_init__(self):
        """Populate ``checksum`` and ``file_size`` from the file on disk.

        Nothing happens when a checksum was already provided or when
        ``file_path`` is not a regular file. ``is_file()`` is used rather than
        ``exists()`` because ``exists()`` is also true for directories, and
        for an empty path (which resolves to the current directory); in those
        cases the checksum can never succeed and the directory's stat size
        would be stored as ``file_size``.
        """
        if self.checksum:
            return

        source = Path(self.file_path)
        if not source.is_file():
            return

        self.checksum = self._generate_checksum()
        try:
            self.file_size = source.stat().st_size
        except OSError as e:
            # The file can disappear between the is_file() check and stat();
            # constructing the document should not fail because of that.
            logger.error(f"Failed to read file size for {self.file_path}: {e}")

    def _generate_checksum(self) -> str:
        """Return the MD5 hex digest of the file at ``file_path``.

        The file is read in 64 KiB blocks so large files are never held in
        memory in one piece. MD5 is used only as a content fingerprint here.

        Returns:
            The hex digest, or ``""`` when the file cannot be read or the
            hash cannot be created (the failure is logged, not raised).
        """
        try:
            digest = hashlib.md5()
            with open(self.file_path, "rb") as f:
                for block in iter(lambda: f.read(65536), b""):
                    digest.update(block)
        except (OSError, ValueError) as e:
            # OSError: unreadable or vanished file. ValueError: hashlib refuses
            # MD5 on builds where it is disabled.
            logger.error(f"Failed to generate checksum for {self.file_path}: {e}")
            return ""
        return digest.hexdigest()


class DocumentProcessingError(Exception):
    """Root of the exception hierarchy raised while processing documents.

    The three constructor arguments stay available as attributes so handlers
    can inspect them without parsing the message.
    """

    def __init__(self, file_path: str, error_type: str, details: str):
        """Build the error message and remember its parts.

        Args:
            file_path: Path of the document that failed.
            error_type: Short category label (for example ``"FileNotFound"``).
            details: Human-readable explanation of the failure.
        """
        message = f"Document processing error in {file_path}: {error_type} - {details}"
        super().__init__(message)
        self.file_path = file_path
        self.error_type = error_type
        self.details = details


class UnsupportedDocumentTypeError(DocumentProcessingError):
    """Raised when a file's detected document type has no usable processor."""

    def __init__(self, file_path: str, detected_type: str):
        """Create the error with the fixed ``"UnsupportedDocumentType"`` category.

        Args:
            file_path: Path of the rejected document.
            detected_type: Name of the type that was detected for it.
        """
        reason = f"Document type '{detected_type}' is not supported"
        super().__init__(file_path, "UnsupportedDocumentType", reason)


class DocumentProcessor(ABC):
    """Abstract base class for format-specific document processors.

    Subclasses provide the list of file extensions they handle and the
    ``process_document`` implementation. This base class adds extension
    matching, file validation, document-id generation and character-based
    chunking of the extracted text.
    """

    def __init__(self, config: Dict[str, Any]):
        """Store the configuration and cache the supported extensions.

        Args:
            config: Processor settings. ``validate_file`` reads the optional
                ``max_file_size_mb`` key (default 100).
        """
        self.config = config
        self.supported_extensions = self._get_supported_extensions()
        logger.info(f"Initialized {self.__class__.__name__} with config: {config}")

    @abstractmethod
    def _get_supported_extensions(self) -> List[str]:
        """Return the file extensions this processor accepts.

        ``can_process`` compares the entries against
        ``Path(file_path).suffix.lower()``, so they need to be lower-case and
        include the leading dot (for example ``".pdf"``).
        """
        pass

    @abstractmethod
    def process_document(self, file_path: str) -> ProcessedDocument:
        """Process the file at ``file_path`` and return the extracted document."""
        pass

    def can_process(self, file_path: str) -> bool:
        """Return True when the file's lower-cased suffix is a supported extension."""
        file_extension = Path(file_path).suffix.lower()
        return file_extension in self.supported_extensions

    def extract_chunks(self, document: ProcessedDocument, chunk_size: int = 512,
                      chunk_overlap: int = 50) -> List[DocumentChunk]:
        """Split a document's text into overlapping chunks.

        Each chunk covers at most ``chunk_size`` characters of
        ``document.content``. A chunk that does not reach the end of the text
        is cut back to the last space inside it, when there is one, so words
        are not split. Consecutive chunks share up to ``chunk_overlap``
        characters. Chunk text is stripped, and chunks that are empty after
        stripping are skipped.

        Args:
            document: Document whose ``content`` is chunked.
            chunk_size: Maximum number of characters per chunk; must be > 0.
            chunk_overlap: Characters shared between neighbouring chunks;
                must satisfy ``0 <= chunk_overlap < chunk_size``.

        Returns:
            Chunks in document order with ids ``"<document_id>_chunk_<index>"``;
            an empty list when the content is blank.

        Raises:
            ValueError: If ``chunk_size`` or ``chunk_overlap`` is out of range.
        """
        if chunk_size <= 0:
            raise ValueError(f"chunk_size must be positive, got {chunk_size}")
        if not 0 <= chunk_overlap < chunk_size:
            raise ValueError(
                f"chunk_overlap must be in [0, chunk_size), got {chunk_overlap} "
                f"with chunk_size {chunk_size}"
            )

        if not document.content.strip():
            logger.warning(f"No content to chunk in document {document.document_id}")
            return []

        chunks = []
        content = document.content
        content_length = len(content)
        start = 0
        chunk_index = 0

        while start < content_length:
            end = min(start + chunk_size, content_length)

            # Prefer to cut at the last space inside the window, unless this
            # window already reaches the end of the text.
            if end < content_length:
                last_space = content.rfind(' ', start, end)
                if last_space > start:
                    end = last_space

            chunk_content = content[start:end].strip()

            if chunk_content:  # whitespace-only windows produce no chunk
                metadata = ChunkMetadata(
                    chunk_id=f"{document.document_id}_chunk_{chunk_index}",
                    document_id=document.document_id,
                    chunk_index=chunk_index
                )
                chunks.append(DocumentChunk(content=chunk_content, metadata=metadata))
                chunk_index += 1

            # The window that reaches the end of the text is the last one.
            # Stepping back by the overlap from here would only re-emit
            # ever-shorter suffixes of the final chunk.
            if end >= content_length:
                break

            # Step back by the overlap. If a word-boundary cut made this
            # window no longer than the overlap, continue from its end so the
            # loop always advances.
            next_start = end - chunk_overlap
            start = next_start if next_start > start else end

        logger.info(f"Created {len(chunks)} chunks from document {document.document_id}")
        return chunks

    def _detect_document_type(self, file_path: str) -> DocumentType:
        """Map the file's lower-cased suffix to a ``DocumentType``.

        Returns ``DocumentType.UNKNOWN`` for any suffix that is not listed.
        """
        extension = Path(file_path).suffix.lower()

        if extension == '.pdf':
            return DocumentType.PDF
        elif extension in ['.xlsx', '.xls', '.xlsm']:
            return DocumentType.EXCEL
        elif extension in ['.png', '.jpg', '.jpeg', '.gif', '.bmp', '.tiff']:
            return DocumentType.IMAGE
        else:
            return DocumentType.UNKNOWN

    def _generate_document_id(self, file_path: str) -> str:
        """
        Generate a document ID from the file name and the current time.

        Only the final path component is used, not the full path; it is
        combined with ``datetime.now().isoformat()`` and hashed with MD5, so
        the same file gets a different ID on every call.

        Args:
            file_path: Path to the document file

        Returns:
            32-character hexadecimal ID string
        """
        file_name = Path(file_path).name
        timestamp = datetime.now().isoformat()
        content = f"{file_name}_{timestamp}"
        return hashlib.md5(content.encode()).hexdigest()

    def validate_file(self, file_path: str) -> None:
        """
        Validate that a file exists and can be processed.

        Checks, in order: the path exists, it is a regular file, its size does
        not exceed ``config['max_file_size_mb']`` (default 100), and its
        extension is supported by this processor.

        Args:
            file_path: Path to the file to validate

        Raises:
            DocumentProcessingError: If the file is missing, is not a file,
                or is too large
            UnsupportedDocumentTypeError: If the extension is not supported
        """
        file_path_obj = Path(file_path)

        if not file_path_obj.exists():
            raise DocumentProcessingError(
                file_path,
                "FileNotFound",
                f"File does not exist: {file_path}"
            )

        if not file_path_obj.is_file():
            raise DocumentProcessingError(
                file_path,
                "NotAFile",
                f"Path is not a file: {file_path}"
            )

        # The limit is configured in megabytes; compare in bytes.
        max_size = self.config.get('max_file_size_mb', 100) * 1024 * 1024
        file_size = file_path_obj.stat().st_size

        if file_size > max_size:
            raise DocumentProcessingError(
                file_path,
                "FileTooLarge",
                f"File size ({file_size} bytes) exceeds maximum allowed size ({max_size} bytes)"
            )

        if not self.can_process(file_path):
            detected_type = self._detect_document_type(file_path)
            raise UnsupportedDocumentTypeError(file_path, detected_type.value)

        logger.debug(f"File validation passed for: {file_path}")


class DocumentProcessorFactory:
    """Registry of processor classes keyed by document type.

    Processor classes are registered per ``DocumentType``; ``create_processor``
    then picks the class that matches a file's extension and instantiates it.
    """

    # Shared, class-level registry: DocumentType -> processor class.
    _processors = {}

    @classmethod
    def register_processor(cls, document_type: DocumentType, processor_class):
        """Associate ``processor_class`` with ``document_type``.

        A later registration for the same type replaces the earlier one.
        """
        cls._processors[document_type] = processor_class
        logger.info(f"Registered processor {processor_class.__name__} for type {document_type.value}")

    @classmethod
    def create_processor(cls, file_path: str, config: Dict[str, Any]) -> DocumentProcessor:
        """
        Instantiate the processor registered for the file's document type.

        The type is derived from the lower-cased file suffix only; the file
        itself is not opened.

        Args:
            file_path: Path to the file to process
            config: Configuration dictionary passed to the processor

        Returns:
            DocumentProcessor instance

        Raises:
            UnsupportedDocumentTypeError: If no processor is registered for
                the detected type
        """
        # Suffix -> type lookup table; unknown suffixes fall through to UNKNOWN.
        suffix_to_type = {'.pdf': DocumentType.PDF}
        suffix_to_type.update(
            dict.fromkeys(('.xlsx', '.xls', '.xlsm'), DocumentType.EXCEL)
        )
        suffix_to_type.update(
            dict.fromkeys(('.png', '.jpg', '.jpeg', '.gif', '.bmp', '.tiff'), DocumentType.IMAGE)
        )

        suffix = Path(file_path).suffix.lower()
        document_type = suffix_to_type.get(suffix, DocumentType.UNKNOWN)

        processor_class = cls._processors.get(document_type)
        if processor_class:
            return processor_class(config)

        raise UnsupportedDocumentTypeError(file_path, document_type.value)

    @classmethod
    def get_supported_types(cls) -> List[DocumentType]:
        """Return the document types that currently have a registered processor."""
        return list(cls._processors)




if __name__ == "__main__":
    # Manual smoke test: build a processor for a sample PDF, process it and
    # print every chunk.
    # NOTE(review): nothing in this file registers a processor, so unless one
    # is registered elsewhere before this runs, create_processor raises
    # UnsupportedDocumentTypeError.
    logger.info("Docs processor init ..")
    config = {'max_file_size_mb': 50}
    sample_path = "example.pdf"
    processor = DocumentProcessorFactory.create_processor(sample_path, config)
    processed_doc = processor.process_document(sample_path)
    for chunk in processor.extract_chunks(processed_doc):
        print(chunk)