| |
| import nest_asyncio |
| nest_asyncio.apply() |
|
|
| import gradio as gr |
| from typing import List, Optional, Tuple, Dict, Any |
| import uuid |
| import os |
| import sys |
| import traceback |
| import warnings |
| import logging |
|
|
| |
| warnings.filterwarnings("ignore", message=".*Invalid file descriptor.*") |
| logging.getLogger("asyncio").setLevel(logging.CRITICAL) |
|
|
| from agent import run_agent |
|
|
|
|
|
|
| |
| |
| |
|
|
| |
# Remote sample images that load_example_images() hands to the gallery so a
# user can try the app without uploading anything.
EXAMPLE_IMAGE_URLS: List[str] = [
    "https://64.media.tumblr.com/456e9e6d8f42e677581f7d7994554600/03546756eb18cebb-2e/s400x600/7cd50d0a76327cf08cc75d17e540a11212b56a3b.jpg",
    "https://64.media.tumblr.com/97e808fda7863d31729da77de9f03285/03546756eb18cebb-2b/s400x600/7fc1a84a8d3f5922ca1f24fd6cc453d45ba88f7f.jpg",
    "https://64.media.tumblr.com/380d1592fa32f1e2290531879cfdd329/03546756eb18cebb-61/s400x600/e9d78c4467fa3a8dc6223667e236b922bb943775.jpg",
]
|
|
| |
| |
| |
|
|
def get_session_id():
    """Return a freshly generated random UUID (version 4) as a string.

    The value is used as the per-user session / agent thread identifier.
    """
    session_uuid = uuid.uuid4()
    return str(session_uuid)
|
|
def format_books_html(books: List[dict]) -> str:
    """Render the final book recommendations as an HTML fragment.

    The fragment is a heading followed by a three-column CSS grid with one
    card per book (cover image, numbered title, author, description).

    Args:
        books: Book records. The keys read are ``title``, ``author``,
            ``description`` and ``cover_url``; every one is optional.
            ``None`` or an empty list yields the heading with an empty grid.

    Returns:
        The HTML markup as a single string.

    All book fields are HTML-escaped before interpolation because they come
    from external data and end up inside markup and a single-quoted
    attribute; descriptions therefore render as plain text.
    """
    # Local import: the module does not import ``html`` at file level.
    from html import escape

    def _field(book: dict, key: str, fallback: str = "") -> str:
        """Return the escaped text of ``book[key]``, or ``fallback`` if missing/empty."""
        return escape(str(book.get(key) or fallback), quote=True)

    parts = [
        "<div style='width: 100%;'>",
        "<h2 style='color: #667eea; margin-bottom: 20px;'>๐ Your Personalized Recommendations</h2>",
        "<div style='display: grid; grid-template-columns: repeat(3, 1fr); gap: 20px; align-items: start;'>",
    ]

    for position, book in enumerate(books or [], 1):
        title = _field(book, "title", "Untitled")
        author = _field(book, "author", "Unknown author")
        description = _field(book, "description")
        cover_url = _field(book, "cover_url")
        # The onerror handler hides the <img> when the cover cannot be loaded.
        parts.append(f"""
        <div style='padding: 16px; background: white;
                    border-radius: 12px; border-left: 4px solid #667eea; box-shadow: 0 2px 8px rgba(0,0,0,0.08);'>
            <div style='display: flex; gap: 16px; align-items: flex-start;'>
                <img src='{cover_url}'
                     style='width: 120px; min-width: 120px; height: 180px; object-fit: cover; border-radius: 8px; box-shadow: 0 4px 12px rgba(0,0,0,0.15);'
                     onerror='this.style.display="none"' />
                <div style='flex: 1; min-width: 0;'>
                    <h3 style='margin: 0 0 8px 0; color: #667eea; font-size: 1.1em; line-height: 1.3;'>{position}. {title}</h3>
                    <p style='margin: 0; color: #666; font-style: italic; font-size: 0.9em;'>by {author}</p>
                </div>
            </div>
            <p style='margin: 16px 0 0 0; color: #555; line-height: 1.6; font-size: 0.9em;'>{description}</p>
        </div>
        """)

    parts.append("</div></div>")
    return "".join(parts)
|
|
def load_example_images() -> List[str]:
    """Return the example image URLs for the gallery.

    A new list is returned on every call so that whatever receives it can
    modify it without altering the module-level ``EXAMPLE_IMAGE_URLS``.
    """
    return list(EXAMPLE_IMAGE_URLS)
|
|
| |
|
|
| |
| |
| |
|
|
def _extract_image_path(img: Any) -> Optional[str]:
    """Return the file path or URL carried by one gallery item, or None if there is none."""
    if hasattr(img, 'name'):
        # File-like object: its ``name`` attribute is used as the path.
        return img.name or None
    if isinstance(img, dict):
        return img.get('path') or None
    if isinstance(img, str):
        if img.startswith('http') or os.path.isfile(img):
            return img
        return None
    if isinstance(img, (tuple, list)):
        # Pair-style item: only the first element (the image) is relevant.
        if not img:
            return None
        first = img[0]
        if isinstance(first, str):
            # Strings inside a pair are accepted as-is, without a file check.
            return first or None
        return _extract_image_path(first)
    return None


def process_upload(images: List, session_id: str, progress=gr.Progress()):
    """Handle the uploaded images and start the agent workflow.

    This is a generator; each yielded 6-tuple is
    ``(chat_history, reasoning, recommendations_html, soundtrack,
    start_button_update, status_text)``.

    Args:
        images: Gallery items. Supported shapes are objects with a ``name``
            attribute, dicts with a ``path`` key, URL or existing-file
            strings, and tuples/lists whose first element is one of those.
        session_id: Passed to ``run_agent`` as ``thread_id``.
        progress: Gradio progress tracker; not used directly in the body.

    Yields:
        A validation message when no usable image is found; otherwise an
        "analyzing" update (start button hidden) followed by either the
        agent's messages and reasoning or an error update (start button
        shown again).
    """
    if not images:
        yield [], "Please upload images.", "", None, gr.update(visible=True), ""
        return

    image_paths = [path for path in map(_extract_image_path, images) if path]

    if not image_paths:
        yield [], "Error processing images.", "", None, gr.update(visible=True), ""
        return

    try:
        # First update: hide the start button while the agent is running.
        yield [], "", "", None, gr.update(visible=False), "๐จ Analyzing your vibe images..."

        result = run_agent(images=image_paths, thread_id=session_id)

        if result is None:
            # Same handling as generate_bot_response: report it explicitly
            # instead of failing on the subscript below.
            yield [], "Agent returned None", "", None, gr.update(visible=True), "โ Agent error"
            return

        chat_history = result["messages"]
        reasoning = "\n".join(result.get("reasoning", []))

        yield chat_history, reasoning, "", None, gr.update(visible=False), "โจ Vibe analysis complete!"
    except Exception as e:
        # UI boundary: show the failure (with traceback) in the reasoning
        # pane and re-enable the start button so the user can retry.
        yield [], f"Error: {e}\n{traceback.format_exc()}", "", None, gr.update(visible=True), "โ Error occurred"
|
|
def add_user_message(user_message: str, history: List[Dict[str, str]]):
    """Step 1 of a chat turn: append the user's text to the chat history.

    Args:
        user_message: Raw text from the input box. ``None``, empty or
            whitespace-only input is ignored.
        history: Existing messages as ``{"role": ..., "content": ...}``
            dicts; ``None`` is treated as an empty history.

    Returns:
        ``(history, "")``. ``history`` is a new list with the user message
        appended (the text is stored exactly as typed), or the unchanged
        input when the message was blank. The empty string clears the
        input box.
    """
    if not user_message or not user_message.strip():
        return history, ""

    new_message = {"role": "user", "content": user_message}
    # Build a new list so the caller's history object is never mutated.
    return list(history or []) + [new_message], ""
|
|
def generate_bot_response(history: List[Dict[str, str]], session_id: str):
    """Step 2 of a chat turn: send the latest user message to the agent.

    This is a generator; each yielded 5-tuple is
    ``(chat_history, reasoning, recommendations_html, soundtrack, status)``.

    Args:
        history: Chat messages as ``{"role": ..., "content": ...}`` dicts.
            The last entry must have role ``"user"``; its content may be a
            string or a list of parts.
        session_id: Passed to ``run_agent`` as ``thread_id``.

    Yields:
        * If there is no pending user message: one update that leaves every
          output except the chat untouched.
        * Otherwise a "processing" update, followed by the agent's result
          or, on failure, the history plus an assistant error message.
    """
    print(f"[DEBUG] generate_bot_response called with session_id={session_id}")
    print(f"[DEBUG] history has {len(history) if history else 0} messages")

    if not history or history[-1]["role"] != "user":
        print("[DEBUG] No user message found in history")
        # This path is reached when a blank message is submitted. Yield
        # no-op updates so results already on screen (reasoning, books,
        # soundtrack, status) are not overwritten with blanks.
        yield history, gr.update(), gr.update(), gr.update(), gr.update()
        return

    user_content = history[-1]["content"]

    if isinstance(user_content, list):
        # List-style content: use each dict's "text" (falling back to the
        # dict's string form) and keep plain-string parts as they are.
        text_parts = []
        for item in user_content:
            if isinstance(item, dict):
                text_parts.append(item.get("text", str(item)))
            elif isinstance(item, str):
                text_parts.append(item)
        user_message = " ".join(text_parts)
    else:
        user_message = str(user_content)
    print(f"[DEBUG] Resuming agent with user_message: {user_message[:50]}...")

    try:
        yield history, "", "", None, "๐ Processing your response..."

        result = run_agent(images=[], user_message=user_message, thread_id=session_id)

        print(f"[DEBUG] run_agent returned: {type(result)}")
        if result:
            print(f"[DEBUG] result keys: {result.keys() if isinstance(result, dict) else 'N/A'}")

        if result is None:
            print("[DEBUG] result is None - agent may not have resumed properly")
            # New list instead of history.append(): do not mutate the
            # caller's object.
            failed_history = history + [
                {"role": "assistant", "content": "Error: Agent did not return a response."}
            ]
            yield failed_history, "Agent returned None", "", None, "โ Agent error"
            return

        updated_history = result["messages"]
        reasoning = "\n".join(result.get("reasoning", []))

        print(f"[DEBUG] updated_history has {len(updated_history)} messages")

        final_books = result.get("final_books")
        books_html = format_books_html(final_books) if final_books else ""

        soundtrack = result.get("soundtrack_url") or None

        # NOTE(review): the leading glyphs in the status strings of this
        # function look like mis-decoded emoji — confirm against the file's
        # original UTF-8 text and restore them.
        if final_books:
            status = "✅ Recommendations ready!"
        elif result.get("retrieved_books"):
            status = "๐ Books retrieved, refining..."
        else:
            status = "๐ญ Awaiting your input..."

        yield updated_history, reasoning, books_html, soundtrack, status

    except Exception as e:
        # UI boundary: report the failure in the chat and put the traceback
        # in the reasoning pane rather than letting the event crash.
        error_msg = f"Agent Error: {str(e)}"
        print(f"[DEBUG] Exception in generate_bot_response: {e}")
        traceback.print_exc()

        failed_history = history + [{"role": "assistant", "content": error_msg}]
        yield failed_history, f"Error trace: {traceback.format_exc()}", "", None, "โ Error occurred"
|
|
def reset_app():
    """Reset the UI to its initial state under a brand-new session id.

    Returns a 9-tuple, one value per output of the reset button:
    session id, chat history, reasoning text, recommendations HTML,
    soundtrack, message box text, gallery value, start-button update and
    status text.
    """
    fresh_session_id = get_session_id()
    show_start_button = gr.update(visible=True)
    return (
        fresh_session_id,
        [],
        "",
        "",
        None,
        "",
        None,
        show_start_button,
        "Ready to analyze your vibe!",
    )
|
|
| |
| |
| |
|
|
# Build the Gradio interface: layout first, then the event wiring.
with gr.Blocks() as demo:
    # Pass the function itself, not its result: a callable initial value is
    # evaluated on every page load, so each visitor gets their own session
    # id. Calling it here would bake one shared id into the app at build time.
    session_id = gr.State(get_session_id)

    # NOTE(review): several labels below start with glyphs that look like
    # mis-decoded emoji — confirm against the file's original UTF-8 text.
    gr.Markdown("# ๐ The Vibe Reader", elem_id='main-title')
    gr.Markdown("""
    **How it works:**
    - ๐จ **Vision AI** extracts mood, themes, and aesthetic keywords from your images
    - ๐ **Semantic search** queries a vector DB of 50k+ book recs from r/BooksThatFeelLikeThis
    - ๐ฌ **Conversational refinement** asks targeted questions to narrow down preferences
    - ๐ **Google Books MCP** enriches results with covers, descriptions, and metadata
    - ๐ต **ElevenLabs AI** generates a custom soundtrack that matches your reading vibe
    """, elem_id='subtitle')

    with gr.Row():
        # Left column: image upload and controls.
        with gr.Column(scale=1):
            gr.Markdown("### 1. Upload Your Vibe")
            image_input = gr.Gallery(label="Visual Inspiration", columns=3, height="300px")
            load_examples_btn = gr.Button("๐ท Load Example Images (Credits: @thegorgonist)", variant="secondary", size="md")
            start_btn = gr.Button("๐ฎ Analyze Vibe", variant="primary", size="lg")
            status_display = gr.Textbox(label="Status", value="Ready to analyze your vibe!", interactive=False, elem_id="status-display")
            reset_btn = gr.Button("๐ Start Over", variant="secondary")

        # Right column: conversation with the agent.
        with gr.Column(scale=1):
            gr.Markdown("### 2. Refine & Discover")

            chatbot = gr.Chatbot(height=500, label="Agent Conversation")

            with gr.Row():
                msg_input = gr.Textbox(
                    show_label=False,
                    placeholder="Type your response here...",
                    scale=4,
                    container=False
                )
                submit_btn = gr.Button("Send", variant="primary", scale=1)

    # Results section.
    # NOTE(review): placed below the two-column row (full width) — the
    # original nesting is not recoverable from this view; confirm.
    recommendations_output = gr.HTML(label="Recommendations")
    soundtrack_player = gr.Audio(label="Vibe Soundtrack", type="filepath", interactive=False)

    with gr.Accordion("๐ Internal Reasoning", open=True):
        reasoning_display = gr.Textbox(label="Agent Thoughts", lines=10, interactive=False)

    # ---- Event wiring ----

    # Fill the gallery with the example images.
    load_examples_btn.click(
        fn=load_example_images,
        inputs=[],
        outputs=[image_input]
    )

    # Start the agent workflow on the current gallery content.
    start_btn.click(
        fn=process_upload,
        inputs=[image_input, session_id],
        outputs=[chatbot, reasoning_display, recommendations_output, soundtrack_player, start_btn, status_display]
    )

    # Chat turn, step 1: add the user message to the chat and clear the box
    # (unqueued); step 2: call the agent.
    user_event = msg_input.submit(
        fn=add_user_message,
        inputs=[msg_input, chatbot],
        outputs=[chatbot, msg_input],
        queue=False
    )

    user_event.then(
        fn=generate_bot_response,
        inputs=[chatbot, session_id],
        outputs=[chatbot, reasoning_display, recommendations_output, soundtrack_player, status_display]
    )

    # The Send button runs the same two-step chain as pressing Enter.
    submit_btn.click(
        fn=add_user_message,
        inputs=[msg_input, chatbot],
        outputs=[chatbot, msg_input],
        queue=False
    ).then(
        fn=generate_bot_response,
        inputs=[chatbot, session_id],
        outputs=[chatbot, reasoning_display, recommendations_output, soundtrack_player, status_display]
    )

    # Start over: new session id and every output back to its initial value.
    reset_btn.click(
        fn=reset_app,
        inputs=[],
        outputs=[session_id, chatbot, reasoning_display, recommendations_output, soundtrack_player, msg_input, image_input, start_btn, status_display]
    )
|
|
if __name__ == "__main__":
    # Script entry point: enable the event queue, then launch the app with
    # the Monochrome theme, the custom stylesheet and SSR mode turned off.
    queued_demo = demo.queue()
    queued_demo.launch(
        theme=gr.themes.Monochrome(),
        css_paths='assets/custom.css',
        ssr_mode=False,
    )