""" A script which will deal with ingestion of new documents into the vector database.
- Currently has file ingestion which supports txt, pdf, and md files.
- Plan to add more file types in the future.
- Plan to add web based ingestion in the future.
- Now supports multimodal ingestion with image embeddings and OCR text extraction from images
"""

from typing import List, Optional
from pathlib import Path
import uuid

from llm_system.utils.loader import load_file
from llm_system.utils.splitter import split_text
from llm_system.core.llm import get_embeddings
from llm_system.core import colpali_embeddings
from llm_system.core import colpali_storage

# For type hinting
from llm_system.core.qdrant_database import VectorDB
from langchain_core.embeddings import Embeddings
from langchain_core.documents import Document
from PIL import Image

# Try to import pytesseract for OCR
try:
    import pytesseract
    PYTESSERACT_AVAILABLE = True
except ImportError:
    PYTESSERACT_AVAILABLE = False

from logger import get_logger
log = get_logger(name="core_ingestion")


def _extract_text_from_image(image_path: str) -> str:
    """Extract text from an image using OCR (Tesseract).
    
    Args:
        image_path: Path to the image file
        
    Returns:
        Extracted text from the image, or empty string if OCR fails or unavailable
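
    Example (illustrative; assumes Tesseract and pytesseract are installed;
    the path is hypothetical)::

        text = _extract_text_from_image("/tmp/page_1.png")
        if text:
            print(text[:200])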
    """
    if not PYTESSERACT_AVAILABLE:
        return ""
    
    try:
        image = Image.open(image_path)
        # Extract text using Tesseract OCR
        text = pytesseract.image_to_string(image)
        if text.strip():
            log.info(f"βœ… Extracted {len(text)} characters of OCR text from {Path(image_path).name}")
            return text.strip()
        else:
            log.debug(f"No text found in image via OCR: {image_path}")
            return ""
    except Exception as e:
        log.warning(f"⚠️ Failed to extract OCR text from {image_path}: {e}")
        return ""


def _create_image_documents(documents: List[Document], user_id: str) -> List[Document]:
    """Create Document objects for extracted images to be embedded.
    
    Extracts image references from document metadata and creates separate
    documents for each image that can be embedded with Jina multimodal embeddings.
    Automatically extracts text from images using OCR to enhance searchability.
    Only creates documents for unique images (avoiding duplicates when the same
    image is attached to multiple text chunks).
    
    Args:
        documents: List of documents that may contain extracted images in metadata
        user_id: User ID for tracking
        
    Returns:
        List of Document objects for images (empty if no images found)
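
    Each entry in ``doc.metadata['images']`` is assumed (inferred from the keys
    read in the loop below; the shape is not enforced by this module) to look
    roughly like::

        {
            'image_id': '...',
            'image_path': '/path/to/image.png',
            'page_number': 1,
            'position': ...,
            'metadata': {'source_pdf': '...', 'extractor': '...'},
        }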
    """
    image_documents = []
    seen_image_ids = set()  # Track already-created images to avoid duplicates
    
    # Collect all unique images from all documents
    all_images = []
    for doc in documents:
        if 'images' in doc.metadata and doc.metadata['images']:
            all_images.extend(doc.metadata['images'])
    
    # Create documents for unique images only
    for img in all_images:
        image_id = img.get('image_id', 'unknown')
        
        # Skip if we've already created a document for this image
        if image_id in seen_image_ids:
            continue
        seen_image_ids.add(image_id)
        
        # Extract text from image using OCR
        image_path = img.get('image_path')
        ocr_text = ""
        if image_path:
            ocr_text = _extract_text_from_image(str(image_path))
            
            # Save OCR text to file alongside image (for auditing/reference)
            if ocr_text:
                try:
                    ocr_file_path = Path(str(image_path)).with_suffix('.txt')
                    ocr_file_path.write_text(ocr_text, encoding='utf-8')
                    log.debug(f"Saved OCR text to: {ocr_file_path}")
                except Exception as e:
                    log.warning(f"Could not save OCR text to file: {e}")
        
        # Create page_content with OCR text if available, otherwise minimal content
        if ocr_text:
            page_content = f"[IMAGE] {image_id}\n\nOCR Text:\n{ocr_text}"
        else:
            page_content = f"[IMAGE] {image_id}\n\nImage extracted from document. Use visual understanding to analyze."
        
        # Try to generate ColPali embeddings for visual understanding
        colpali_embedding = None
        colpali_embedding_summary = None
        if colpali_embeddings.is_colpali_available() and image_path:
            colpali_embedding = colpali_embeddings.embed_image(str(image_path))
            # Store a summary (num_patches) for Qdrant persistence
            if colpali_embedding:
                colpali_embedding_summary = colpali_embedding.get('num_patches', 0)
                # Persist full embedding to disk for later retrieval
                colpali_storage.save_colpali_embedding(image_id, user_id, colpali_embedding)
        
        # Create a document for this image
        image_doc = Document(
            page_content=page_content,
            metadata={
                'user_id': user_id,
                'type': 'image',
                'image_id': image_id,
                'image_path': str(image_path) if image_path else None,  # Ensure string for serialization
                'page_number': img.get('page_number'),
                'position': img.get('position'),
                'source_pdf': img.get('metadata', {}).get('source_pdf'),
                'extractor': img.get('metadata', {}).get('extractor'),
                'has_ocr_text': bool(ocr_text),
                'colpali_available': colpali_embedding_summary is not None,
                'colpali_patches': colpali_embedding_summary or 0,  # Number of patch embeddings
            }
        )
        image_documents.append(image_doc)
    
    if image_documents:
        ocr_count = sum(1 for doc in image_documents if doc.metadata.get('has_ocr_text'))
        colpali_count = sum(1 for doc in image_documents if doc.metadata.get('colpali_available'))
        log.info(f"βœ… Created {len(image_documents)} unique image documents ({ocr_count} with OCR, {colpali_count} with ColPali embeddings)")
    
    return image_documents


def ingest_file(user_id: str, file_path: str, vectorstore: VectorDB,
                embeddings: Optional[Embeddings] = None) -> tuple[bool, List[str], str]:
    """Ingest a file into the vector database with multimodal support.
    
    Ingests both text chunks and extracted images, using Jina embeddings
    for multimodal support when available.
    
    Returns the IDs of the vector embeddings stored in the database.

    Args:
        user_id: User identifier for document tracking
        file_path (str): The absolute path to the file to be ingested.
        vectorstore (VectorDB): The vector database instance.
        embeddings (Embeddings): Optional embeddings model. If None, uses get_embeddings() factory.

    Returns:
        tuple[bool, List[str], str]: A tuple containing:
            - bool: True if ingestion was successful, False otherwise.
            - List[str]: List of document IDs that were ingested.
            - str: Message indicating the result of the ingestion.
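
    Example (a minimal sketch mirroring the __main__ block below; the model
    name and file path are illustrative)::

        db = VectorDB(embed_model="mxbai-embed-large:latest", persist_path=None)
        ok, doc_ids, msg = ingest_file("user_1", "/path/to/file.pdf", db)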
    """

    log.info(f"πŸ”„ [INGESTION START] user_id={user_id}, file_path={file_path}")
    
    # Use provided embeddings or get from factory (supports Jina multimodal).
    # Note: the resolved model is not used directly below; add_documents embeds
    # with the model the vectorstore itself was configured with.
    if embeddings is None:
        log.info("πŸ“Š Using embeddings from factory (will use Jina if configured)")
        embeddings = get_embeddings()

    # Load the file and get its content as Document objects:
    log.info(f"πŸ“‚ [STEP 1] Loading file from: {file_path}")
    status, documents, message = load_file(user_id, file_path)
    log.info(f"πŸ“‚ [STEP 1 RESULT] status={status}, docs_count={len(documents) if documents else 0}, message={message}")

    if not status:
        log.error(f"❌ [STEP 1 FAILED] load_file returned False: {message}")
        return False, [], message

    # Split the documents into smaller chunks:
    log.info(f"βœ‚οΈ [STEP 2] Splitting {len(documents)} documents into chunks")
    status, split_docs, message = split_text(documents)
    log.info(f"βœ‚οΈ [STEP 2 RESULT] status={status}, chunks_count={len(split_docs) if split_docs else 0}, message={message}")
    
    if status and not split_docs:
        log.warning(f"⚠️ [STEP 2 WARNING] No content found in the file: {file_path}")
        return True, [], f"No content found in the file: {file_path}"

    if not status:
        log.error(f"❌ [STEP 2 FAILED] split_text returned False: {message}")
        return False, [], message

    # Create image documents for multimodal embedding
    log.info(f"πŸ–ΌοΈ  [STEP 2.5] Extracting image documents for multimodal embedding")
    image_docs = _create_image_documents(documents, user_id)
    
    # Combine text chunks and image documents
    all_docs_to_embed = split_docs + image_docs
    total_docs_count = len(all_docs_to_embed)
    
    log.info(f"πŸ“¦ [STEP 2.5 RESULT] text_chunks={len(split_docs)}, image_docs={len(image_docs)}, total={total_docs_count}")

    # Add the documents to the vector database:
    log.info(f"πŸ’Ύ [STEP 3] Adding {total_docs_count} documents (text + images) to vector database")
    try:
        doc_ids = vectorstore.db.add_documents(all_docs_to_embed)
        log.info(f"πŸ’Ύ [STEP 3 RESULT] doc_ids={len(doc_ids) if doc_ids else 0}")
        
        log.info(f"πŸ’Ώ [STEP 4] Saving vector database to disk")
        if vectorstore.save_db_to_disk():
            log.info(f"βœ… [INGESTION SUCCESS] Ingested {total_docs_count} items ({len(split_docs)} text chunks + {len(image_docs)} images) from {file_path}")
            return True, doc_ids, f"Ingested {total_docs_count} items successfully ({len(split_docs)} text chunks + {len(image_docs)} images)."
        else:
            log.error("❌ [STEP 4 FAILED] Failed to save the vector database to disk after ingestion.")
            return False, [], "Failed to save the vector database to disk after ingestion."
            
    except Exception as e:
        log.error(f"❌ [STEP 3 EXCEPTION] Failed to add documents to vector database: {e}")
        import traceback
        log.error(f"Traceback: {traceback.format_exc()}")
        return False, [], f"Failed to ingest documents: {e}"


if __name__ == "__main__":
    from dotenv import load_dotenv
    from langchain.callbacks.tracers.langchain import wait_for_all_tracers
    load_dotenv()

    # Example usage
    user = "test_user"
    # example_file_path = "../../../GenAI/Data/attention_is_all_you_need_1706.03762v7.pdf"
    example_file_path="/Users/neetikasaxena/Documents/sanchit/sample_code/chat-with-your-data/test_data/resume_sanchit_imo_health.pdf"
    # example_file_path = "../../../GenAI/Data/speech.md"

    vector_db = VectorDB(embed_model="mxbai-embed-large:latest", persist_path=None)

    # Ingest with multimodal support (embeddings auto-selected from factory)
    status, doc_ids, message = ingest_file(user, example_file_path, vector_db)
    if status:
        print(doc_ids)
    else:
        print(f"Error: {message}")

    # Retrieve the documents to verify ingestion:
    relevant_docs = vector_db.retriever.invoke(
        input="what contribution did Sanchit make for the retail pharmacy client?",
        filter={"user_id": user},
        k=2,
        search_type="similarity"
    )
    
    print("\n\n.............................\n\n")
    # Print the retrieved documents
    print(f"Query 1")
    for i, doc in enumerate(relevant_docs):
        print(f"\nChunk {i+1}:")
        print(doc.page_content)
        print("\n\n.............................\n\n")
        print(f"Metadata: {doc.metadata}")

    print("\n\n.............................\n\n")

    # Sanity check: filtering on a user_id with no ingested documents should
    # return nothing, verifying per-user isolation in the vector store.
    relevant_docs = vector_db.retriever.invoke(
        input="what was the impact for the tech client?",
        filter={"user_id": "random"},
        k=2,
        search_type="similarity"
    )
    print(f"Isolation check returned {len(relevant_docs)} docs (expected 0)")

    print("\n\n.............................\n\n")


    # Retrieve the documents to verify ingestion:
    relevant_docs = vector_db.retriever.invoke(
        input="what did Sanchit do during his employment with Samsung?",
        filter={"user_id": user},
        k=2,
        search_type="similarity"
    )

    # Print the retrieved documents
    print(f"Query 2")
    for i, doc in enumerate(relevant_docs):
        print(f"\nChunk {i+1}:")
        print(doc.page_content)
        print("\n\n.............................\n\n")
        print(f"Metadata: {doc.metadata}")

    wait_for_all_tracers()