| """ A script which will deal with ingestion of new documents into the vector database. | |
| - Currently has file ingestion which supports txt, pdf, and md files. | |
| - Plan to add more file types in the future. | |
| - Plan to add web based ingestion in the future. | |
| - Now supports multimodal ingestion with image embeddings and OCR text extraction from images | |
| """ | |
from typing import List, Optional
from pathlib import Path

from llm_system.utils.loader import load_file
from llm_system.utils.splitter import split_text
from llm_system.core.llm import get_embeddings
from llm_system.core import colpali_embeddings
from llm_system.core import colpali_storage

# For type hinting
from llm_system.core.qdrant_database import VectorDB
from langchain_core.embeddings import Embeddings
from langchain_core.documents import Document

from PIL import Image

# Try to import pytesseract for OCR; it is optional and OCR is skipped if missing.
try:
    import pytesseract
    PYTESSERACT_AVAILABLE = True
except ImportError:
    PYTESSERACT_AVAILABLE = False
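
# Note: pytesseract is only a Python wrapper; the Tesseract binary itself must
# be installed and on PATH for OCR to work. A quick sanity check (optional sketch):
#
#     try:
#         pytesseract.get_tesseract_version()
#     except pytesseract.TesseractNotFoundError:
#         print("Tesseract binary not found; OCR will return empty strings")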

from logger import get_logger

log = get_logger(name="core_ingestion")
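
# Ingestion pipeline implemented in this module:
#   load_file() -> split_text() -> _create_image_documents()
#   -> vectorstore.db.add_documents() -> vectorstore.save_db_to_disk()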


def _extract_text_from_image(image_path: str) -> str:
    """Extract text from an image using OCR (Tesseract).

    Args:
        image_path: Path to the image file.

    Returns:
        Extracted text from the image, or an empty string if OCR fails
        or is unavailable.
    """
    if not PYTESSERACT_AVAILABLE:
        return ""
    try:
        image = Image.open(image_path)
        # Extract text using Tesseract OCR
        text = pytesseract.image_to_string(image)
        if text.strip():
            log.info(f"Extracted {len(text)} characters of OCR text from {Path(image_path).name}")
            return text.strip()
        else:
            log.debug(f"No text found in image via OCR: {image_path}")
            return ""
    except Exception as e:
        log.warning(f"Failed to extract OCR text from {image_path}: {e}")
        return ""


def _create_image_documents(documents: List[Document], user_id: str) -> List[Document]:
    """Create Document objects for extracted images to be embedded.

    Extracts image references from document metadata and creates separate
    documents for each image that can be embedded with Jina multimodal
    embeddings. Automatically extracts text from images using OCR to enhance
    searchability. Only creates documents for unique images (avoids duplicates
    when the same images are attached to multiple text chunks).

    Args:
        documents: List of documents that may contain extracted images in metadata.
        user_id: User ID for tracking.

    Returns:
        List of Document objects for images (empty if no images found).
    """
    image_documents = []
    seen_image_ids = set()  # Track already-created images to avoid duplicates

    # Collect all images from all documents
    all_images = []
    for doc in documents:
        if 'images' in doc.metadata and doc.metadata['images']:
            all_images.extend(doc.metadata['images'])

    # Create documents for unique images only
    for img in all_images:
        image_id = img.get('image_id', 'unknown')

        # Skip if we've already created a document for this image
        if image_id in seen_image_ids:
            continue
        seen_image_ids.add(image_id)

        # Extract text from image using OCR
        image_path = img.get('image_path')
        ocr_text = ""
        if image_path:
            ocr_text = _extract_text_from_image(str(image_path))

        # Save OCR text to a file alongside the image (for auditing/reference)
        if ocr_text:
            try:
                ocr_file_path = Path(str(image_path)).with_suffix('.txt')
                ocr_file_path.write_text(ocr_text, encoding='utf-8')
                log.debug(f"Saved OCR text to: {ocr_file_path}")
            except Exception as e:
                log.warning(f"Could not save OCR text to file: {e}")

        # Create page_content with OCR text if available, otherwise minimal content
        if ocr_text:
            page_content = f"[IMAGE] {image_id}\n\nOCR Text:\n{ocr_text}"
        else:
            page_content = f"[IMAGE] {image_id}\n\nImage extracted from document. Use visual understanding to analyze."

        # Try to generate ColPali embeddings for visual understanding
        colpali_embedding = None
        colpali_embedding_summary = None
        if colpali_embeddings.is_colpali_available() and image_path:
            colpali_embedding = colpali_embeddings.embed_image(str(image_path))
            if colpali_embedding:
                # Store a summary (num_patches) for Qdrant persistence
                colpali_embedding_summary = colpali_embedding.get('num_patches', 0)
                # Persist the full embedding to disk for later retrieval
                colpali_storage.save_colpali_embedding(image_id, user_id, colpali_embedding)

        # Create a document for this image
        image_doc = Document(
            page_content=page_content,
            metadata={
                'user_id': user_id,
                'type': 'image',
                'image_id': image_id,
                'image_path': str(image_path),  # Ensure string for serialization
                'page_number': img.get('page_number'),
                'position': img.get('position'),
                'source_pdf': img.get('metadata', {}).get('source_pdf'),
                'extractor': img.get('metadata', {}).get('extractor'),
                'has_ocr_text': bool(ocr_text),
                'colpali_available': colpali_embedding_summary is not None,
                'colpali_patches': colpali_embedding_summary or 0,  # Number of patch embeddings
            }
        )
        image_documents.append(image_doc)

    if image_documents:
        ocr_count = sum(1 for doc in image_documents if doc.metadata.get('has_ocr_text'))
        colpali_count = sum(1 for doc in image_documents if doc.metadata.get('colpali_available'))
        log.info(f"Created {len(image_documents)} unique image documents ({ocr_count} with OCR, {colpali_count} with ColPali embeddings)")
    return image_documents
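

# Illustrative shape of the `images` metadata consumed above, inferred from the
# keys this function reads (the loader's actual entries may carry more fields,
# and the extractor name shown is hypothetical):
#
#     doc.metadata['images'] = [
#         {
#             'image_id': 'page3-img1',
#             'image_path': '/tmp/extracted/page3-img1.png',
#             'page_number': 3,
#             'position': 1,
#             'metadata': {'source_pdf': 'report.pdf', 'extractor': 'pymupdf'},
#         },
#     ]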


def ingest_file(user_id: str, file_path: str, vectorstore: VectorDB,
                embeddings: Optional[Embeddings] = None) -> tuple[bool, List[str], str]:
    """Ingest a file into the vector database with multimodal support.

    Ingests both text chunks and extracted images, using Jina embeddings
    if available for multimodal support.

    Args:
        user_id: User identifier for document tracking.
        file_path (str): The absolute path to the file to be ingested.
        vectorstore (VectorDB): The vector database instance.
        embeddings (Embeddings): Optional embeddings model. If None, uses the
            get_embeddings() factory.

    Returns:
        tuple[bool, List[str], str]: A tuple containing:
            - bool: True if ingestion was successful, False otherwise.
            - List[str]: IDs of the vector embeddings stored in the database.
            - str: Message indicating the result of the ingestion.
    """
| log.info(f"π [INGESTION START] user_id={user_id}, file_path={file_path}") | |
| # Use provided embeddings or get from factory (supports Jina multimodal) | |
| if embeddings is None: | |
| log.info("π Using embeddings from factory (will use Jina if configured)") | |
| embeddings = get_embeddings() | |
| # Load the file and get its content as Document objects: | |
| log.info(f"π [STEP 1] Loading file from: {file_path}") | |
| status, documents, message = load_file(user_id, file_path) | |
| log.info(f"π [STEP 1 RESULT] status={status}, docs_count={len(documents) if documents else 0}, message={message}") | |
| if not status: | |
| log.error(f"β [STEP 1 FAILED] load_file returned False: {message}") | |
| return False, [], message | |
| # Split the documents into smaller chunks: | |
| log.info(f"βοΈ [STEP 2] Splitting {len(documents)} documents into chunks") | |
| status, split_docs, message = split_text(documents) | |
| log.info(f"βοΈ [STEP 2 RESULT] status={status}, chunks_count={len(split_docs) if split_docs else 0}, message={message}") | |
| if status and not split_docs: | |
| log.warning(f"β οΈ [STEP 2 WARNING] No content found in the file: {file_path}") | |
| return True, [], f"No content found in the file: {file_path}" | |
| if not status: | |
| log.error(f"β [STEP 2 FAILED] split_text returned False: {message}") | |
| return False, [], message | |
| # Create image documents for multimodal embedding | |
| log.info(f"πΌοΈ [STEP 2.5] Extracting image documents for multimodal embedding") | |
| image_docs = _create_image_documents(documents, user_id) | |
| # Combine text chunks and image documents | |
| all_docs_to_embed = split_docs + image_docs | |
| total_docs_count = len(all_docs_to_embed) | |
| log.info(f"π¦ [STEP 2.5 RESULT] text_chunks={len(split_docs)}, image_docs={len(image_docs)}, total={total_docs_count}") | |
| # Add the documents to the vector database: | |
| log.info(f"πΎ [STEP 3] Adding {total_docs_count} documents (text + images) to vector database") | |
| try: | |
| doc_ids = vectorstore.db.add_documents(all_docs_to_embed) | |
| log.info(f"πΎ [STEP 3 RESULT] doc_ids={len(doc_ids) if doc_ids else 0}") | |
| log.info(f"πΏ [STEP 4] Saving vector database to disk") | |
| if vectorstore.save_db_to_disk(): | |
| log.info(f"β [INGESTION SUCCESS] Ingested {total_docs_count} items ({len(split_docs)} text chunks + {len(image_docs)} images) from {file_path}") | |
| return True, doc_ids, f"Ingested {total_docs_count} items successfully ({len(split_docs)} text chunks + {len(image_docs)} images)." | |
| else: | |
| log.error("β [STEP 4 FAILED] Failed to save the vector database to disk after ingestion.") | |
| return False, [], "Failed to save the vector database to disk after ingestion." | |
| except Exception as e: | |
| log.error(f"β [STEP 3 EXCEPTION] Failed to add documents to vector database: {e}") | |
| import traceback | |
| log.error(f"Traceback: {traceback.format_exc()}") | |
| return False, [], f"Failed to ingest documents: {e}" | |


if __name__ == "__main__":
    from dotenv import load_dotenv
    from langchain.callbacks.tracers.langchain import wait_for_all_tracers

    load_dotenv()

    # Example usage
    user = "test_user"
    # example_file_path = "../../../GenAI/Data/attention_is_all_you_need_1706.03762v7.pdf"
    example_file_path = "/Users/neetikasaxena/Documents/sanchit/sample_code/chat-with-your-data/test_data/resume_sanchit_imo_health.pdf"
    # example_file_path = "../../../GenAI/Data/speech.md"

    vector_db = VectorDB(embed_model="mxbai-embed-large:latest", persist_path=None)

    # Ingest with multimodal support (embeddings auto-selected from factory)
    status, doc_ids, message = ingest_file(user, example_file_path, vector_db)
    if status:
        print(doc_ids)
    else:
        print(f"Error: {message}")
    # Retrieve the documents to verify ingestion:
    relevant_docs = vector_db.retriever.invoke(
        input="what contribution did Sanchit make for the retail pharmacy client?",
        filter={"user_id": user},
        k=2,
        search_type="similarity"
    )
    print("\n\n.............................\n\n")

    # Print the retrieved documents
    print("Query 1")
    for i, doc in enumerate(relevant_docs):
        print(f"\nChunk {i+1}:")
        print(doc.page_content)
        print("\n\n.............................\n\n")
        print(f"Metadata: {doc.metadata}")
    print("\n\n.............................\n\n")

    # NOTE: this retrieval filters on a non-matching user_id; its result is never
    # printed and is overwritten by the next query (it should return no user docs).
    relevant_docs = vector_db.retriever.invoke(
        input="what was the impact for the tech client?",
        filter={"user_id": "random"},
        k=2,
        search_type="similarity"
    )
| print("\n\n.............................\n\n") | |
| # Retrieve the documents to verify ingestion: | |
| relevant_docs= vector_db.retriever.invoke( | |
| input="what did Sanchit do during his employment with Samsung ?", | |
| filter={"user_id": user}, | |
| k=2, | |
| search_type="similarity" | |
| ) | |
| # Print the retrieved documents | |
| print(f"Query 2") | |
| for i, doc in enumerate(relevant_docs): | |
| print(f"\nChunk {i+1}:") | |
| print(doc.page_content) | |
| print("\n\n.............................\n\n") | |
| print(f"Metadata: {doc.metadata}") | |
| wait_for_all_tracers() | |