""" A script which will deal with ingestion of new documents into the vector database.
- Currently has file ingestion which supports txt, pdf, and md files.
- Plan to add more file types in the future.
- Plan to add web based ingestion in the future.
- Now supports multimodal ingestion with image embeddings and OCR text extraction from images
"""
from typing import List, Optional
from pathlib import Path
import uuid
from llm_system.utils.loader import load_file
from llm_system.utils.splitter import split_text
from llm_system.core.llm import get_embeddings
from llm_system.core import colpali_embeddings
from llm_system.core import colpali_storage
# For type hinting
from llm_system.core.qdrant_database import VectorDB
from langchain_core.embeddings import Embeddings
from langchain_core.documents import Document
from PIL import Image
# Try to import pytesseract for OCR (the system Tesseract binary must also be installed)
try:
    import pytesseract
    PYTESSERACT_AVAILABLE = True
except ImportError:
    PYTESSERACT_AVAILABLE = False

from logger import get_logger

log = get_logger(name="core_ingestion")

def _extract_text_from_image(image_path: str) -> str:
    """Extract text from an image using OCR (Tesseract).

    Args:
        image_path: Path to the image file

    Returns:
        Extracted text from the image, or empty string if OCR fails or is unavailable
    """
    if not PYTESSERACT_AVAILABLE:
        return ""
    try:
        image = Image.open(image_path)
        # Extract text using Tesseract OCR
        text = pytesseract.image_to_string(image)
        if text.strip():
            log.info(f"Extracted {len(text)} characters of OCR text from {Path(image_path).name}")
            return text.strip()
        else:
            log.debug(f"No text found in image via OCR: {image_path}")
            return ""
    except Exception as e:
        log.warning(f"Failed to extract OCR text from {image_path}: {e}")
        return ""

def _create_image_documents(documents: List[Document], user_id: str) -> List[Document]:
    """Create Document objects for extracted images to be embedded.

    Extracts image references from document metadata and creates separate
    documents for each image that can be embedded with Jina multimodal embeddings.
    Automatically extracts text from images using OCR to enhance searchability.
    Only creates documents for unique images (avoids duplicates when the same
    image is attached to multiple text chunks).

    Args:
        documents: List of documents that may contain extracted images in metadata
        user_id: User ID for tracking

    Returns:
        List of Document objects for images (empty if no images found)
    """
    image_documents = []
    seen_image_ids = set()  # Track already-created images to avoid duplicates

    # Collect all images from all documents
    all_images = []
    for doc in documents:
        if 'images' in doc.metadata and doc.metadata['images']:
            all_images.extend(doc.metadata['images'])

    # Create documents for unique images only
    for img in all_images:
        image_id = img.get('image_id', 'unknown')
        # Skip if we've already created a document for this image
        if image_id in seen_image_ids:
            continue
        seen_image_ids.add(image_id)

        # Extract text from image using OCR
        image_path = img.get('image_path')
        ocr_text = ""
        if image_path:
            ocr_text = _extract_text_from_image(str(image_path))

        # Save OCR text to a file alongside the image (for auditing/reference);
        # Path is already imported at module level
        if ocr_text:
            try:
                ocr_file_path = Path(str(image_path)).with_suffix('.txt')
                ocr_file_path.write_text(ocr_text, encoding='utf-8')
                log.debug(f"Saved OCR text to: {ocr_file_path}")
            except Exception as e:
                log.warning(f"Could not save OCR text to file: {e}")

        # Create page_content with OCR text if available, otherwise minimal content
        if ocr_text:
            page_content = f"[IMAGE] {image_id}\n\nOCR Text:\n{ocr_text}"
        else:
            page_content = f"[IMAGE] {image_id}\n\nImage extracted from document. Use visual understanding to analyze."
        # Try to generate ColPali embeddings for visual understanding
        colpali_embedding = None
        colpali_embedding_summary = None
        if colpali_embeddings.is_colpali_available() and image_path:
            colpali_embedding = colpali_embeddings.embed_image(str(image_path))
            # Store a summary (num_patches) for Qdrant persistence
            if colpali_embedding:
                colpali_embedding_summary = colpali_embedding.get('num_patches', 0)
                # Persist the full embedding to disk for later retrieval
                colpali_storage.save_colpali_embedding(image_id, user_id, colpali_embedding)

        # Create a document for this image
        image_doc = Document(
            page_content=page_content,
            metadata={
                'user_id': user_id,
                'type': 'image',
                'image_id': image_id,
                'image_path': str(image_path) if image_path else None,  # Ensure string for serialization
                'page_number': img.get('page_number'),
                'position': img.get('position'),
                'source_pdf': img.get('metadata', {}).get('source_pdf'),
                'extractor': img.get('metadata', {}).get('extractor'),
                'has_ocr_text': bool(ocr_text),
                'colpali_available': colpali_embedding_summary is not None,
                'colpali_patches': colpali_embedding_summary or 0,  # Number of patch embeddings
            }
        )
        image_documents.append(image_doc)
    if image_documents:
        ocr_count = sum(1 for doc in image_documents if doc.metadata.get('has_ocr_text'))
        colpali_count = sum(1 for doc in image_documents if doc.metadata.get('colpali_available'))
        log.info(f"Created {len(image_documents)} unique image documents ({ocr_count} with OCR, {colpali_count} with ColPali embeddings)")
    return image_documents
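
# Shape of the image records this function consumes, inferred from the keys
# read above (the concrete values are illustrative only; the loader is
# responsible for attaching these records to doc.metadata['images']):
#   {
#       'image_id': 'img_0001',
#       'image_path': 'extracted_images/img_0001.png',
#       'page_number': 3,
#       'position': 0,
#       'metadata': {'source_pdf': 'report.pdf', 'extractor': 'pymupdf'},
#   }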

def ingest_file(user_id: str, file_path: str, vectorstore: VectorDB,
                embeddings: Optional[Embeddings] = None) -> tuple[bool, List[str], str]:
    """Ingest a file into the vector database with multimodal support.

    Ingests both text chunks and extracted images, using Jina embeddings
    if available for multimodal support.
    Returns the ids of the vector embeddings stored in the database.

    Args:
        user_id: User identifier for document tracking
        file_path (str): The absolute path to the file to be ingested.
        vectorstore (VectorDB): The vector database instance.
        embeddings (Embeddings): Optional embeddings model. If None, uses the get_embeddings() factory.

    Returns:
        tuple[bool, List[str], str]: A tuple containing:
            - bool: True if ingestion was successful, False otherwise.
            - List[str]: List of document IDs that were ingested.
            - str: Message indicating the result of the ingestion.
    """
log.info(f"π [INGESTION START] user_id={user_id}, file_path={file_path}")
# Use provided embeddings or get from factory (supports Jina multimodal)
if embeddings is None:
log.info("π Using embeddings from factory (will use Jina if configured)")
embeddings = get_embeddings()
# Load the file and get its content as Document objects:
log.info(f"π [STEP 1] Loading file from: {file_path}")
status, documents, message = load_file(user_id, file_path)
log.info(f"π [STEP 1 RESULT] status={status}, docs_count={len(documents) if documents else 0}, message={message}")
if not status:
log.error(f"β [STEP 1 FAILED] load_file returned False: {message}")
return False, [], message
    # Split the documents into smaller chunks:
    log.info(f"[STEP 2] Splitting {len(documents)} documents into chunks")
    status, split_docs, message = split_text(documents)
    log.info(f"[STEP 2 RESULT] status={status}, chunks_count={len(split_docs) if split_docs else 0}, message={message}")
    if status and not split_docs:
        log.warning(f"[STEP 2 WARNING] No content found in the file: {file_path}")
        return True, [], f"No content found in the file: {file_path}"
    if not status:
        log.error(f"[STEP 2 FAILED] split_text returned False: {message}")
        return False, [], message

    # Create image documents for multimodal embedding
    log.info("[STEP 2.5] Extracting image documents for multimodal embedding")
    image_docs = _create_image_documents(documents, user_id)

    # Combine text chunks and image documents
    all_docs_to_embed = split_docs + image_docs
    total_docs_count = len(all_docs_to_embed)
    log.info(f"[STEP 2.5 RESULT] text_chunks={len(split_docs)}, image_docs={len(image_docs)}, total={total_docs_count}")
    # Add the documents to the vector database:
    log.info(f"[STEP 3] Adding {total_docs_count} documents (text + images) to vector database")
    try:
        doc_ids = vectorstore.db.add_documents(all_docs_to_embed)
        log.info(f"[STEP 3 RESULT] doc_ids={len(doc_ids) if doc_ids else 0}")

        log.info("[STEP 4] Saving vector database to disk")
        if vectorstore.save_db_to_disk():
            log.info(f"[INGESTION SUCCESS] Ingested {total_docs_count} items ({len(split_docs)} text chunks + {len(image_docs)} images) from {file_path}")
            return True, doc_ids, f"Ingested {total_docs_count} items successfully ({len(split_docs)} text chunks + {len(image_docs)} images)."
        else:
            log.error("[STEP 4 FAILED] Failed to save the vector database to disk after ingestion.")
            return False, [], "Failed to save the vector database to disk after ingestion."
    except Exception as e:
        log.error(f"[STEP 3 EXCEPTION] Failed to add documents to vector database: {e}")
        import traceback
        log.error(f"Traceback: {traceback.format_exc()}")
        return False, [], f"Failed to ingest documents: {e}"
if __name__ == "__main__":
from dotenv import load_dotenv
from langchain.callbacks.tracers.langchain import wait_for_all_tracers
load_dotenv()
# Example usage
user = "test_user"
# example_file_path = "../../../GenAI/Data/attention_is_all_you_need_1706.03762v7.pdf"
example_file_path="/Users/neetikasaxena/Documents/sanchit/sample_code/chat-with-your-data/test_data/resume_sanchit_imo_health.pdf"
# example_file_path = "../../../GenAI/Data/speech.md"
vector_db = VectorDB(embed_model="mxbai-embed-large:latest", persist_path=None)
# Ingest with multimodal support (embeddings auto-selected from factory)
status, doc_ids, message = ingest_file(user, example_file_path, vector_db)
if status:
print(doc_ids)
else:
print(f"Error: {message}")
    # Retrieve the documents to verify ingestion:
    relevant_docs = vector_db.retriever.invoke(
        input="what contribution did Sanchit make for the retail pharmacy client?",
        filter={"user_id": user},
        k=2,
        search_type="similarity"
    )
    print("\n\n.............................\n\n")

    # Print the retrieved documents
    print("Query 1")
    for i, doc in enumerate(relevant_docs):
        print(f"\nChunk {i+1}:")
        print(doc.page_content)
        print("\n\n.............................\n\n")
        print(f"Metadata: {doc.metadata}")
    print("\n\n.............................\n\n")
    # Query with a non-matching user_id to exercise the per-user filter
    # (the result is intentionally left unused and should be empty):
    relevant_docs = vector_db.retriever.invoke(
        input="what was the impact for the tech client?",
        filter={"user_id": "random"},
        k=2,
        search_type="similarity"
    )
    print("\n\n.............................\n\n")
    # Retrieve the documents to verify ingestion:
    relevant_docs = vector_db.retriever.invoke(
        input="what did Sanchit do during his employment with Samsung?",
        filter={"user_id": user},
        k=2,
        search_type="similarity"
    )

    # Print the retrieved documents
    print("Query 2")
    for i, doc in enumerate(relevant_docs):
        print(f"\nChunk {i+1}:")
        print(doc.page_content)
        print("\n\n.............................\n\n")
        print(f"Metadata: {doc.metadata}")

    wait_for_all_tracers()