Advanced_RAG / advanced_rag_app.py
towardsinnovationlab's picture
Upload advanced_rag_app.py
1736af9 verified
# ==============================================================================
# ADVANCED RAG WITH GPT, LANGCHAIN, AND RAGAS EVALUATION
# ==============================================================================
# Enhanced RAG application with quality metrics using RAGAS framework
# Supports multiple PDF documents
# ==============================================================================
from langchain.retrievers import EnsembleRetriever
from langchain_community.retrievers import BM25Retriever
from langchain_community.cross_encoders import HuggingFaceCrossEncoder
from langchain.retrievers.document_compressors import CrossEncoderReranker
from sentence_transformers import CrossEncoder
from langchain.retrievers import ContextualCompressionRetriever
from langchain_community.document_loaders import PyPDFLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_openai import OpenAIEmbeddings, ChatOpenAI
from langchain_community.vectorstores import FAISS
from langchain.schema import Document
from langchain.prompts import PromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough
from datasets import Dataset
from ragas import evaluate
from ragas.metrics import (
faithfulness,
answer_relevancy,
context_precision,
context_recall,
answer_correctness,
answer_similarity
)
import gradio as gr
import os
import pandas as pd
import json
# ==============================================================================
# GLOBAL VARIABLES
# ==============================================================================
rag_chain = None
current_documents = [] # Changed to list for multiple documents
openai_api_key = None
retriever = None
evaluation_data = []
# ==============================================================================
# HELPER FUNCTIONS
# ==============================================================================
def format_docs(docs):
"""Format retrieved documents with source citations"""
out = []
for d in docs:
src = d.metadata.get("source", "unknown")
# Extract just the filename from the full path
src = os.path.basename(src)
page = d.metadata.get("page", d.metadata.get("page_number", "?"))
try:
page_display = int(page) + 1
except (ValueError, TypeError):
page_display = page
out.append(f"[{src}:{page_display}] {d.page_content}")
return "\n\n".join(out)
def validate_api_key(api_key):
"""Validate that API key is provided"""
if not api_key or not api_key.strip():
return False
return True
def process_documents(pdf_files, api_key):
"""Process uploaded PDFs and create RAG chain"""
global rag_chain, current_documents, openai_api_key, retriever, evaluation_data
chatbot_clear = None
evaluation_data = [] # Reset evaluation data
if not validate_api_key(api_key):
return "⚠️ Please provide a valid OpenAI API key.", chatbot_clear, ""
if pdf_files is None or len(pdf_files) == 0:
return "⚠️ Please upload at least one PDF file.", chatbot_clear, ""
try:
openai_api_key = api_key.strip()
os.environ["OPENAI_API_KEY"] = openai_api_key
# Process all uploaded PDFs
all_docs = []
current_documents = []
total_pages = 0
for pdf_file in pdf_files:
loader = PyPDFLoader(pdf_file.name)
docs = loader.load()
all_docs.extend(docs)
current_documents.append(os.path.basename(pdf_file.name))
total_pages += len(docs)
# Split all documents
splitter = RecursiveCharacterTextSplitter(
separators=["\n\n", "\n", ". ", " ", ""],
chunk_size=1000,
chunk_overlap=100
)
chunked_docs = splitter.split_documents(all_docs)
# Create embeddings and vector store
embeddings = OpenAIEmbeddings(
model="text-embedding-3-small",
openai_api_key=openai_api_key
)
db = FAISS.from_documents(chunked_docs, embeddings)
retriever_1 = db.as_retriever(search_type="similarity",search_kwargs={'k': 10})
retriever_2 = BM25Retriever.from_documents(chunked_docs, search_kwargs={"k": 10})
ensemble_retriever = EnsembleRetriever(retrievers=[retriever_1, retriever_2], weights=[0.7, 0.3])
cross_encoder_model = HuggingFaceCrossEncoder(model_name="cross-encoder/ms-marco-MiniLM-L-12-v2")
reranker = CrossEncoderReranker(model=cross_encoder_model,top_n=10)
reranking_retriever = ContextualCompressionRetriever(base_compressor=reranker,base_retriever=ensemble_retriever)
retriever=reranking_retriever
# Create LLM and prompt
llm = ChatOpenAI(
model="gpt-5-mini",
temperature=0.2,
openai_api_key=openai_api_key
)
prompt_template = """You are a professional research scientist involved in document data analysis.
Use the following context to answer the question using information provided by the documents.
Answer using ONLY these passages. Cite sources as [filename:page] after each claim.
Provide an answer in bullet points.
If you can't find it, say you don't know.
Question:
{question}
Passages:
{context}
Answer:"""
prompt = PromptTemplate(
input_variables=["context", "question"],
template=prompt_template,
)
llm_chain = prompt | llm | StrOutputParser()
rag_chain = (
{"context": reranking_retriever | format_docs, "question": RunnablePassthrough()}
| llm_chain
)
# Create status message with document list
doc_list = "\n".join([f" β€’ {doc}" for doc in current_documents])
status_msg = (
f"βœ… Documents processed successfully!\n\n"
f"πŸ“„ **Documents loaded ({len(current_documents)}):**\n{doc_list}\n\n"
f"πŸ“Š Total pages: {total_pages}\n"
f"πŸ“¦ Chunks created: {len(chunked_docs)}\n\n"
f"You can now ask questions and evaluate responses!"
)
return status_msg, chatbot_clear, ""
except Exception as e:
return f"❌ Error processing documents: {str(e)}", chatbot_clear, ""
def chat_with_document(message, history):
"""Handle chat interactions with the documents"""
global rag_chain, current_documents, retriever, evaluation_data
history.append({"role": "user", "content": message})
if rag_chain is None:
history.append({
"role": "assistant",
"content": "⚠️ Please upload and process PDF documents first."
})
return history
if not message.strip():
history.append({
"role": "assistant",
"content": "⚠️ Please enter a question."
})
return history
try:
# Retrieve contexts for RAGAS evaluation
retrieved_docs = retriever.invoke(message)
contexts = [doc.page_content for doc in retrieved_docs]
# Get response from RAG chain
response = rag_chain.invoke(message)
if isinstance(response, dict):
res_text = response.get("answer", response.get("result", str(response)))
else:
res_text = str(response)
# Store data for RAGAS evaluation
evaluation_data.append({
"question": message,
"answer": res_text,
"contexts": contexts
})
history.append({"role": "assistant", "content": res_text})
return history
except Exception as e:
error_msg = f"❌ Error generating response: {str(e)}"
history.append({"role": "assistant", "content": error_msg})
return history
def evaluate_rag_performance():
"""Evaluate RAG performance using RAGAS metrics"""
global evaluation_data, openai_api_key
if not evaluation_data:
return "⚠️ No evaluation data available. Please ask some questions first."
try:
# Prepare dataset for RAGAS
dataset_dict = {
"question": [item["question"] for item in evaluation_data],
"answer": [item["answer"] for item in evaluation_data],
"contexts": [item["contexts"] for item in evaluation_data],
}
dataset = Dataset.from_dict(dataset_dict)
# Run RAGAS evaluation
# Using only metrics that don't require ground truth (reference answers)
result = evaluate(
dataset,
metrics=[
faithfulness,
answer_relevancy,
],
llm=ChatOpenAI(model="gpt-4o-mini", openai_api_key=openai_api_key),
embeddings=OpenAIEmbeddings(openai_api_key=openai_api_key),
)
# Convert to DataFrame for better display
df = result.to_pandas()
# Calculate average scores from the result directly
metrics_summary = "## πŸ“Š RAGAS Evaluation Results\n\n"
metrics_summary += "### Average Scores:\n"
# Get metric scores safely
metric_cols = ['faithfulness', 'answer_relevancy']
metric_scores = {}
for col in metric_cols:
if col in df.columns:
# Convert to numeric, handling any non-numeric values
numeric_values = pd.to_numeric(df[col], errors='coerce')
avg_score = numeric_values.mean()
if not pd.isna(avg_score):
metric_scores[col] = avg_score
metrics_summary += f"- **{col.replace('_', ' ').title()}**: {avg_score:.4f}\n"
metrics_summary += "\n### Metric Explanations:\n"
metrics_summary += "- **Faithfulness** (0-1): Measures if the answer is factually consistent with the retrieved context. Higher scores mean the answer doesn't hallucinate or contradict the source.\n"
metrics_summary += "- **Answer Relevancy** (0-1): Measures how relevant the answer is to the question asked. Higher scores mean better alignment with the user's query.\n"
metrics_summary += "\n### Interpretation Guide:\n"
metrics_summary += "- **0.9 - 1.0**: Excellent performance\n"
metrics_summary += "- **0.7 - 0.9**: Good performance\n"
metrics_summary += "- **0.5 - 0.7**: Moderate performance (needs improvement)\n"
metrics_summary += "- **< 0.5**: Poor performance (requires significant optimization)\n"
metrics_summary += f"\n### Total Questions Evaluated: {len(evaluation_data)}\n"
# Add document info
if current_documents:
metrics_summary += f"\n### Documents in Index: {len(current_documents)}\n"
return metrics_summary
except Exception as e:
return f"❌ Error during evaluation: {str(e)}"
def export_evaluation_data():
"""Export evaluation data as JSON"""
global evaluation_data, current_documents
if not evaluation_data:
return None
try:
# Create a temporary file with metadata
output_data = {
"documents": current_documents,
"evaluation_data": evaluation_data,
"total_questions": len(evaluation_data)
}
output_path = "ragas_evaluation_data.json"
with open(output_path, 'w') as f:
json.dump(output_data, f, indent=2)
return output_path
except Exception as e:
print(f"Error exporting data: {str(e)}")
return None
def clear_chat():
"""Clear the chat history and evaluation data"""
global evaluation_data
evaluation_data = [] # Reset evaluation data when clearing chat
return [], "" # Return empty chatbot and empty eval_summary
# ==============================================================================
# GRADIO INTERFACE
# ==============================================================================
with gr.Blocks(title="RAG with RAGAS Evaluation", theme=gr.themes.Soft()) as demo:
gr.Markdown(
"""
# πŸ“š Multi-Document Q&A Analysis
### Advanced RAG System Powered by OpenAI GPT models, LangChain & RAGAS
Upload multiple PDFs, ask questions across all documents, and evaluate your RAG system's performance with industry-standard metrics.
"""
)
with gr.Row():
with gr.Column(scale=1):
gr.Markdown(
"""
### πŸ“‹ How to Use
1. Enter your OpenAI API key
2. Upload one or more PDF documents
3. Process the documents
4. Ask questions in the chat
5. Click "Evaluate" to see performance metrics
---
πŸ’‘ **RAGAS Metrics**:
- Faithfulness: Factual accuracy
- Answer Relevancy: Question alignment
πŸ“ **Multi-Document Support**:
- Upload multiple PDFs at once
- Search across all documents
- Get citations with document names
"""
)
gr.Markdown("### πŸ”‘ API Configuration")
api_key_input = gr.Textbox(
label="OpenAI API Key",
type="password",
placeholder="sk-...",
info="Required for GPT models and RAGAS evaluation"
)
gr.Markdown("### πŸ“€ Upload Documents")
pdf_input = gr.File(
label="Upload PDF Documents",
file_types=[".pdf"],
type="filepath",
file_count="multiple" # Enable multiple file upload
)
process_btn = gr.Button("πŸ“„ Process Documents", variant="primary", size="lg")
status_output = gr.Textbox(
label="Status",
lines=8, # Increased to show multiple documents
interactive=False,
placeholder="Enter API key, upload PDFs, and click 'Process Documents'..."
)
gr.Markdown("### πŸ“ˆ Evaluation")
evaluate_btn = gr.Button("πŸ” Evaluate RAG Performance", variant="secondary", size="lg")
export_btn = gr.Button("πŸ’Ύ Export Evaluation Data", size="sm")
export_file = gr.File(label="Download Evaluation Data", visible=True)
with gr.Column(scale=2):
gr.Markdown("### πŸ’¬ Chat with Your Documents")
chatbot = gr.Chatbot(
height=400,
placeholder="Upload and process documents to start...",
show_label=False,
type="messages"
)
msg = gr.Textbox(
label="Enter your question",
placeholder="Type your question here (searches across all uploaded documents)...",
lines=2
)
with gr.Row():
submit_btn = gr.Button("πŸ“€ Send", variant="primary", scale=4)
clear_btn = gr.Button("πŸ—‘οΈ Clear Chat", scale=1)
gr.Markdown("### πŸ“Š Evaluation Results")
eval_summary = gr.Markdown(value="")
# Event handlers
process_btn.click(
fn=process_documents, # Changed function name
inputs=[pdf_input, api_key_input],
outputs=[status_output, chatbot, eval_summary]
)
submit_btn.click(
fn=chat_with_document,
inputs=[msg, chatbot],
outputs=[chatbot]
).then(
lambda: "",
outputs=[msg]
)
msg.submit(
fn=chat_with_document,
inputs=[msg, chatbot],
outputs=[chatbot]
).then(
lambda: "",
outputs=[msg]
)
clear_btn.click(
fn=clear_chat,
outputs=[chatbot, eval_summary]
)
evaluate_btn.click(
fn=evaluate_rag_performance,
outputs=[eval_summary]
)
export_btn.click(
fn=export_evaluation_data,
outputs=[export_file]
)
# ==============================================================================
# LAUNCH APPLICATION
# ==============================================================================
if __name__ == "__main__":
demo.launch(share=False, debug=True)