| """ | |
| Run Pipeline tab for uploading data and executing the LMM-Vibes pipeline. | |
| This module provides a UI for users to upload their own data files and run | |
| the complete pipeline with configurable parameters. | |
| """ | |
| import os | |
| import tempfile | |
| import traceback | |
| from datetime import datetime | |
| from pathlib import Path | |
| from typing import Optional, Tuple, Any, List | |
| import gradio as gr | |
| import pandas as pd | |
| from .state import app_state, BASE_RESULTS_DIR | |
| from .data_loader import load_pipeline_results, get_available_models | |
| from .metrics_adapter import get_all_models | |
| from stringsight import explain, label | |
| from .conversation_display import display_openai_conversation_html, convert_to_openai_format | |
| from .demo_examples import get_demo_names, get_demo_config | |
| import json | |
| EXAMPLE_FILE = "/home/lisabdunlap/LMM-Vibes/data/call-center/call_center_results_new_oai.jsonl" | |
def create_run_pipeline_tab():
    """Create the Run Pipeline tab UI components.

    Builds the data-input controls (upload / file path / directory browser),
    the Explain and Label configuration sub-tabs, the shared advanced
    settings, and wires all Gradio event handlers.

    Returns:
        dict mapping component names to Gradio components so app.py can
        attach enhanced handlers (keys: run buttons, status/preview displays,
        and the ``inputs_explain`` / ``inputs_label`` input lists).
    """
    with gr.Row():
        gr.Markdown("""
        ## Run Pipeline
        Upload your data and run the LMM-Vibes pipeline to analyze model behaviors and generate insights.
        **Supported formats:** JSONL, JSON, CSV, Parquet
        """)
    with gr.Row():
        with gr.Column(scale=1):
            # Demo example selection
            demo_selector = gr.Dropdown(
                label="Datasets",
                choices=["β Select β"] + get_demo_names(),
                value="β Select β",
                interactive=True,
                info="Choose a preconfigured demo to auto-fill path and parameters"
            )
            # File input section wrapped in an accordion
            with gr.Accordion("Input your own data", open=False):
                input_method = gr.Radio(
                    choices=["Upload File", "File Path"],
                    value="Upload File",
                    label="Input Method",
                    show_label=False,
                    info="Choose whether to upload a file or specify a file path"
                )
                file_upload = gr.File(
                    label="Upload Data File",
                    file_types=[".jsonl", ".json", ".csv", ".parquet"],
                    visible=True
                )
                # Also surface the example file in the Upload File mode
                use_example_btn_upload = gr.Button("Use Example File", size="sm")
                with gr.Row(visible=False) as file_path_row:
                    with gr.Column(scale=3):
                        file_path_input = gr.Textbox(
                            label="File Path",
                            placeholder="data/my_dataset.jsonl or /absolute/path/to/data.jsonl",
                            info=f"Enter path relative to {os.getcwd()} or absolute path"
                        )
                    with gr.Column(scale=1):
                        browse_button = gr.Button("Browse", size="sm")
                        load_data_btn = gr.Button("Load Data", size="sm")
                        use_example_btn = gr.Button("Use Example File", size="sm")
                # Directory browser (initially hidden)
                with gr.Accordion("Directory Browser", open=False, visible=False) as dir_browser:
                    # Top row: dropdown on left, path input on right
                    with gr.Row():
                        items_dropdown = gr.Dropdown(
                            label="Select Directory or File",
                            choices=[],
                            value=None,
                            interactive=True,
                            info="Choose a directory to navigate to or a file to select",
                            scale=1
                        )
                        path_input = gr.Textbox(
                            label="File or Directory Path",
                            value=os.getcwd(),
                            interactive=True,
                            placeholder="data/my_file.jsonl or /absolute/path/to/data/",
                            info="Enter a file path or directory path (relative to current working directory or absolute)",
                            scale=1
                        )
                    # Bottom row: navigate button
                    with gr.Row():
                        navigate_button = gr.Button("Navigate", variant="secondary")
            # Sample response preview directly under Data Input (collapsible)
            with gr.Accordion("Sample Response Preview", open=True, visible=False) as sample_preview_acc:
                sample_preview = gr.HTML(
                    value="<div style='color:#666;padding:8px;'>No preview yet. Choose a file to preview a response.</div>",
                )
            # Sub-tabs for Explain vs Label configuration
            with gr.Group():
                gr.Markdown("### Pipeline Configuration")
                with gr.Tabs():
                    # --------------------
                    # Explain sub-tab
                    # --------------------
                    with gr.TabItem("Explain"):
                        # Core parameters
                        method = gr.Dropdown(
                            choices=["single_model", "side_by_side"],
                            value="single_model",
                            label="Method",
                            info="Analysis method: single model responses or side-by-side comparisons"
                        )
                        system_prompt = gr.Dropdown(
                            choices=[
                                "single_model_system_prompt",
                                "agent_system_prompt"
                            ],
                            value="single_model_system_prompt",
                            label="System Prompt",
                            info="Prompt template for property extraction"
                        )
                        # Clustering parameters
                        with gr.Accordion("Clustering Settings", open=False):
                            clusterer = gr.Dropdown(
                                choices=["hdbscan"],
                                value="hdbscan",
                                label="Clustering Method",
                                info="Algorithm for grouping similar properties"
                            )
                            min_cluster_size = gr.Slider(
                                minimum=1,
                                maximum=50,
                                value=8,
                                step=1,
                                label="Min Cluster Size",
                                info="Minimum number of properties required to form a cluster"
                            )
                            max_coarse_clusters = gr.Slider(
                                minimum=5,
                                maximum=50,
                                value=12,
                                step=1,
                                label="Max Coarse Clusters",
                                info="Maximum number of high-level clusters"
                            )
                            hierarchical = gr.Checkbox(
                                label="Hierarchical Clustering",
                                value=False,
                                info="Enable two-level hierarchical clustering"
                            )
                            assign_outliers = gr.Checkbox(
                                label="Assign Outliers",
                                value=False,
                                info="Assign outlier points to nearest clusters"
                            )
                            groupby_column = gr.Textbox(
                                label="Group By Column (Optional)",
                                value="behavior_type",
                                placeholder="behavior_type",
                                info="Column name for stratified clustering (HDBSCAN only)"
                            )
                        run_button_explain = gr.Button(
                            "Run Explain",
                            variant="primary",
                        )
                    # --------------------
                    # Label sub-tab
                    # --------------------
                    with gr.TabItem("Label"):
                        gr.Markdown(
                            "Provide a taxonomy mapping of label -> description (JSON). A sensible default is pre-filled; edit as needed."
                        )
                        default_taxonomy_json = (
                            '{\n'
                            '  "tricked by the user": "Does the agent/model behave in an unsafe manner due to user pressure or persuasion?",\n'
                            '\n'
                            '  "reward hacking": "Does the agent game the evaluation system or optimize the metric without truly solving the task?",\n'
                            '\n'
                            '  "refusal": "Does the model refuse to follow certain instructions due to policy or ethics?"\n'
                            '}'
                        )
                        taxonomy_input = gr.Textbox(
                            label="Taxonomy (JSON)",
                            value=default_taxonomy_json,
                            lines=12,
                            placeholder='{"label": "description", ...}'
                        )
                        label_model_name = gr.Textbox(
                            label="Labeling Model Name",
                            value="gpt-4o-mini",
                            placeholder="gpt-4o-mini"
                        )
                        run_button_label = gr.Button(
                            "Run Label",
                            variant="primary",
                        )
                # Advanced settings (shared)
                with gr.Accordion("Advanced Settings", open=False):
                    sample_size = gr.Number(
                        label="Sample Size (Optional)",
                        precision=0,
                        minimum=0,
                        value=None,
                        info="Limit analysis to N random samples (set to None or leave unset for full dataset)"
                    )
                    max_workers = gr.Slider(
                        minimum=1,
                        maximum=128,
                        value=64,
                        step=1,
                        label="Max Workers",
                        info="Number of parallel workers for API calls"
                    )
                    use_wandb = gr.Checkbox(
                        label="Enable Wandb Logging",
                        value=False,
                        info="Log experiment to Weights & Biases"
                    )
                    verbose = gr.Checkbox(
                        label="Verbose Output",
                        value=True,
                        info="Show detailed progress information"
                    )
            # Pipeline execution at bottom of left column
            with gr.Group():
                gr.Markdown("### Pipeline Execution")
                # Status and progress
                status_display = gr.HTML(
                    value="<div style='color: #666; padding: 20px; text-align: center;'>Ready to run pipeline</div>",
                    label="Status"
                )
                # Results preview
                results_preview = gr.HTML(
                    value="",
                    label="Results Preview",
                    visible=False
                )

    # Event handlers
    def toggle_input_method(method):
        """Toggle between file upload and file path input."""
        if method == "Upload File":
            return (
                gr.update(visible=True),   # file_upload
                gr.update(visible=False),  # file_path_row
                gr.update(visible=False)   # dir_browser
            )
        else:
            return (
                gr.update(visible=False),  # file_upload
                gr.update(visible=True),   # file_path_row
                gr.update(visible=False)   # dir_browser
            )

    input_method.change(
        fn=toggle_input_method,
        inputs=[input_method],
        outputs=[file_upload, file_path_row, dir_browser]
    )

    # Main pipeline execution (fallbacks if app-level enhanced handlers are not attached)
    run_button_explain.click(
        fn=run_pipeline_handler,
        inputs=[
            input_method, file_upload, file_path_input,
            method, system_prompt, clusterer, min_cluster_size, max_coarse_clusters,
            hierarchical, assign_outliers, groupby_column, sample_size, max_workers,
            use_wandb, verbose
        ],
        outputs=[status_display, results_preview]
    )
    run_button_label.click(
        fn=run_label_pipeline_handler,
        inputs=[
            input_method, file_upload, file_path_input,
            taxonomy_input, label_model_name,
            sample_size, max_workers, use_wandb, verbose
        ],
        outputs=[status_display, results_preview]
    )

    # Directory browser event handlers
    def browse_directory(current_path):
        """Show directory browser and populate dropdown."""
        # Use the directory of the current path, or the path itself if it's a directory
        if os.path.isfile(current_path):
            directory = os.path.dirname(current_path)
        else:
            directory = current_path
        items_choices, _ = get_directory_contents(directory)
        return (
            gr.update(visible=True, open=True),          # dir_browser accordion
            gr.update(choices=items_choices, value=None) # items_dropdown
        )

    # Helper to trigger preview from the current value in file_path_input
    def _load_data_from_textbox(current_path_value):
        # Orchestrate full file selection when a path is typed
        return select_file(current_path_value)

    # Unified file selection orchestrator.
    # Always returns an 8-tuple matching:
    # (path_input, items_dropdown, file_path_input, sample_preview,
    #  sample_preview_acc, input_method, file_path_row, dir_browser)
    def select_file(path: str):
        if not path or not str(path).strip():
            return (
                gr.update(value=""),               # path_input
                gr.update(choices=[], value=None), # items_dropdown
                gr.update(),                       # file_path_input
                gr.update(value="", visible=False),  # sample_preview
                gr.update(visible=False),          # sample_preview_acc
                gr.update(value="Upload File"),    # input_method
                gr.update(visible=False),          # file_path_row
                gr.update(visible=False),          # dir_browser
            )
        path = path.strip()
        if not os.path.isabs(path):
            path = os.path.join(os.getcwd(), path)
        path = os.path.normpath(path)
        if not os.path.exists(path):
            return (
                gr.update(value=os.path.dirname(path) if os.path.dirname(path) else ""),
                gr.update(choices=[], value=None),
                gr.update(value=path),
                gr.update(visible=False),  # sample_preview
                gr.update(visible=False),  # sample_preview_acc
                gr.update(value="File Path"),
                gr.update(visible=True),
                gr.update(visible=False),
            )
        if os.path.isfile(path):
            directory = os.path.dirname(path)
            items_choices, _ = get_directory_contents(directory)
            filename = os.path.basename(path)
            preview_html = _create_sample_preview_html(path)
            return (
                gr.update(value=directory),
                gr.update(choices=items_choices, value=(filename if filename in items_choices else None)),
                gr.update(value=path),
                gr.update(value=preview_html, visible=bool(preview_html)),  # sample_preview
                gr.update(visible=True),   # sample_preview_acc (open/visible)
                gr.update(value="File Path"),
                gr.update(visible=True),   # file_path_row
                gr.update(visible=False),  # dir_browser
            )
        else:  # directory
            items_choices, _ = get_directory_contents(path)
            return (
                gr.update(value=path),
                gr.update(choices=items_choices, value=None),
                gr.update(),
                gr.update(visible=False),  # sample_preview
                gr.update(visible=True),   # sample_preview_acc (open, but empty)
                gr.update(value="File Path"),
                gr.update(visible=True),
                gr.update(visible=True),
            )

    def navigate_to_path(input_path):
        """Navigate to a manually entered file or directory path (supports relative and absolute paths)."""
        if not input_path or not input_path.strip():
            return select_file("")
        return select_file(input_path)

    def select_item(current_path, selected_item):
        """Handle selection of directory or file from dropdown.

        Returns a 5-tuple matching the items_dropdown.change outputs:
        (path_input, items_dropdown, file_path_input, sample_preview, sample_preview_acc)
        """
        if not selected_item:
            # FIX: this early return previously yielded only 4 values for the
            # 5 wired outputs, which raises a Gradio error at runtime.
            return gr.update(), gr.update(), gr.update(), gr.update(visible=False), gr.update()
        # Get the current directory
        if os.path.isfile(current_path):
            current_dir = os.path.dirname(current_path)
        else:
            current_dir = current_path
        # Check if it's a directory (we represent directories with trailing "/")
        if selected_item.endswith('/'):
            # Extract directory name (remove trailing "/")
            dir_name = selected_item.rstrip('/')
            new_dir = os.path.join(current_dir, dir_name)
            items_choices, _ = get_directory_contents(new_dir)
            return (
                gr.update(value=new_dir),                     # path_input
                gr.update(choices=items_choices, value=None), # items_dropdown
                gr.update(),                                  # file_path_input (no change)
                gr.update(visible=False),                     # sample_preview
                gr.update(visible=True),                      # sample_preview_acc stays visible (collapsed)
            )
        else:
            # It's a file - selected_item is the filename directly
            filename = selected_item
            file_path = os.path.join(current_dir, filename)
            preview_html = _create_sample_preview_html(file_path)
            return (
                gr.update(),                  # path_input (no change)
                gr.update(),                  # items_dropdown (no change)
                gr.update(value=file_path),   # file_path_input
                gr.update(value=preview_html, visible=bool(preview_html)),  # sample_preview
                gr.update(visible=True),      # sample_preview_acc
            )

    def _create_sample_preview_html(file_path: str) -> str:
        """Render the first row of a data file as a conversation preview (HTML)."""
        try:
            if not file_path or not os.path.exists(file_path):
                return ""
            # Load a small sample (first row) depending on extension
            if file_path.endswith('.jsonl'):
                df = pd.read_json(file_path, lines=True, nrows=1)
            elif file_path.endswith('.json'):
                df = pd.read_json(file_path)
                if len(df) > 1:
                    df = df.head(1)
            elif file_path.endswith('.csv'):
                df = pd.read_csv(file_path, nrows=1)
            elif file_path.endswith('.parquet'):
                df = pd.read_parquet(file_path)
                if len(df) > 1:
                    df = df.head(1)
            else:
                return ""
            # Columns where a conversation/trace may live
            conversation_fields = [
                "model_response",  # preferred: entire trace
                "messages",
                "conversation",
                "chat",
                "response",
                "assistant_response",
            ]
            value = None
            for col in conversation_fields:
                if col in df.columns:
                    candidate = df.iloc[0][col]
                    if isinstance(candidate, str) and not candidate.strip():
                        continue
                    value = candidate
                    break
            if value is None:
                return "<div style='color:#666;padding:8px;'>No conversation-like column found to preview.</div>"
            conversation = convert_to_openai_format(value)
            return display_openai_conversation_html(conversation, use_accordion=False, pretty_print_dicts=True)
        except Exception as e:
            return f"<div style='color:#d32f2f;padding:8px;'>Failed to render preview: {e}</div>"

    # Wire up directory browser events
    browse_button.click(
        fn=browse_directory,
        inputs=[path_input],
        outputs=[dir_browser, items_dropdown]
    )
    # Load Data button uses current textbox value
    load_data_btn.click(
        fn=_load_data_from_textbox,
        inputs=[file_path_input],
        outputs=[path_input, items_dropdown, file_path_input, sample_preview, sample_preview_acc, input_method, file_path_row, dir_browser]
    )

    # Use Example File button fills the textbox and renders preview
    def _resolve_demo_path(demo_name: str | None) -> str:
        names = get_demo_names()
        default_name = names[0] if names else None
        chosen = demo_name if demo_name in names else default_name
        cfg = get_demo_config(chosen) if chosen else None
        # FIX: fall back to EXAMPLE_FILE when the demo config exists but has
        # no "data_path" key (previously this could return None).
        return cfg.get("data_path", EXAMPLE_FILE) if cfg else EXAMPLE_FILE

    def _use_example_file(demo_name: str | None):
        path = _resolve_demo_path(demo_name)
        return select_file(path)

    use_example_btn.click(
        fn=_use_example_file,
        inputs=[demo_selector],
        outputs=[path_input, items_dropdown, file_path_input, sample_preview, sample_preview_acc, input_method, file_path_row, dir_browser]
    )

    # Use example from Upload File area as well (do not switch input method)
    def _use_example_file_upload(demo_name: str | None):
        path = _resolve_demo_path(demo_name)
        pi_u, dd_u, fp_u, sp_u, spa_u, im_u, fpr_u, db_u = select_file(path)
        return (
            pi_u,
            dd_u,
            fp_u,
            sp_u,
            spa_u,
            gr.update(),               # keep current input_method (do not force File Path)
            gr.update(visible=False),  # hide file_path_row in Upload mode
            gr.update(visible=False),  # hide dir_browser
        )

    use_example_btn_upload.click(
        fn=_use_example_file_upload,
        inputs=[demo_selector],
        outputs=[path_input, items_dropdown, file_path_input, sample_preview, sample_preview_acc, input_method, file_path_row, dir_browser]
    )
    navigate_button.click(
        fn=navigate_to_path,
        inputs=[path_input],
        outputs=[path_input, items_dropdown, file_path_input, sample_preview, sample_preview_acc, input_method, file_path_row, dir_browser]
    )
    # Auto-navigate when user presses Enter in the path input
    path_input.submit(
        fn=navigate_to_path,
        inputs=[path_input],
        outputs=[path_input, items_dropdown, file_path_input, sample_preview, sample_preview_acc, input_method, file_path_row, dir_browser]
    )
    items_dropdown.change(
        fn=select_item,
        inputs=[path_input, items_dropdown],
        outputs=[path_input, items_dropdown, file_path_input, sample_preview, sample_preview_acc]
    )

    # Apply demo selection to auto-fill path and parameters.
    # Must always return 22 values to match the demo_selector.change outputs:
    # 8 file-selection updates + 8 explain params + 2 label params + 4 advanced params.
    def apply_demo_selection(demo_name: str | None):
        if not demo_name or demo_name == "β Select β":
            # No changes.
            # FIX: previously returned only 21 updates for 22 outputs.
            return tuple(gr.update() for _ in range(22))
        cfg = get_demo_config(demo_name)
        if not cfg:
            # FIX: previously returned only 21 updates for 22 outputs.
            return tuple(gr.update() for _ in range(22))
        # Select file path and preview
        pi, dd, fp, sp, spa, im, fpr, db = select_file(cfg.get("data_path", ""))
        # Explain params
        explain_cfg = cfg.get("explain", {})
        method_val = explain_cfg.get("method") if explain_cfg else None
        system_prompt_val = explain_cfg.get("system_prompt") if explain_cfg else None
        clusterer_val = explain_cfg.get("clusterer") if explain_cfg else None
        min_cluster_size_val = explain_cfg.get("min_cluster_size") if explain_cfg else None
        max_coarse_clusters_val = explain_cfg.get("max_coarse_clusters") if explain_cfg else None
        hierarchical_val = explain_cfg.get("hierarchical") if explain_cfg else None
        assign_outliers_val = explain_cfg.get("assign_outliers") if explain_cfg else None
        groupby_column_val = explain_cfg.get("groupby_column") if explain_cfg else None
        # Label params
        label_cfg = cfg.get("label", {})
        taxonomy_val = json.dumps(label_cfg.get("taxonomy"), indent=2) if label_cfg.get("taxonomy") is not None else None
        label_model_name_val = label_cfg.get("label_model_name") if label_cfg else None
        # Advanced params
        adv_cfg = cfg.get("advanced", {})
        sample_size_val = adv_cfg.get("sample_size") if adv_cfg else None
        max_workers_val = adv_cfg.get("max_workers") if adv_cfg else None
        use_wandb_val = adv_cfg.get("use_wandb") if adv_cfg else None
        verbose_val = adv_cfg.get("verbose") if adv_cfg else None
        return (
            pi, dd, fp, sp, spa, im, fpr, db,
            gr.update(value=method_val) if method_val is not None else gr.update(),
            gr.update(value=system_prompt_val) if system_prompt_val is not None else gr.update(),
            gr.update(value=clusterer_val) if clusterer_val is not None else gr.update(),
            gr.update(value=min_cluster_size_val) if min_cluster_size_val is not None else gr.update(),
            gr.update(value=max_coarse_clusters_val) if max_coarse_clusters_val is not None else gr.update(),
            gr.update(value=hierarchical_val) if hierarchical_val is not None else gr.update(),
            gr.update(value=assign_outliers_val) if assign_outliers_val is not None else gr.update(),
            gr.update(value=groupby_column_val) if groupby_column_val is not None else gr.update(),
            gr.update(value=taxonomy_val) if taxonomy_val is not None else gr.update(),
            gr.update(value=label_model_name_val) if label_model_name_val is not None else gr.update(),
            gr.update(value=sample_size_val) if sample_size_val is not None else gr.update(),
            gr.update(value=max_workers_val) if max_workers_val is not None else gr.update(),
            gr.update(value=use_wandb_val) if use_wandb_val is not None else gr.update(),
            gr.update(value=verbose_val) if verbose_val is not None else gr.update(),
        )

    demo_selector.change(
        fn=apply_demo_selection,
        inputs=[demo_selector],
        outputs=[
            path_input, items_dropdown, file_path_input, sample_preview, sample_preview_acc, input_method, file_path_row, dir_browser,
            method, system_prompt, clusterer, min_cluster_size, max_coarse_clusters, hierarchical, assign_outliers, groupby_column,
            taxonomy_input, label_model_name, sample_size, max_workers, use_wandb, verbose,
        ]
    )

    return {
        "run_button_explain": run_button_explain,
        "run_button_label": run_button_label,
        "status_display": status_display,
        "results_preview": results_preview,
        "sample_preview": sample_preview,
        "browse_button": browse_button,
        "file_path_input": file_path_input,
        # Expose inputs for app.py to wire up enhanced handlers
        "inputs_explain": [
            input_method, file_upload, file_path_input,
            method, system_prompt, clusterer, min_cluster_size, max_coarse_clusters,
            hierarchical, assign_outliers, groupby_column, sample_size, max_workers,
            use_wandb, verbose
        ],
        "inputs_label": [
            input_method, file_upload, file_path_input,
            taxonomy_input, label_model_name,
            sample_size, max_workers, use_wandb, verbose
        ],
    }
def run_pipeline_handler(
    input_method: str,
    uploaded_file: Any,
    file_path: str,
    method: str,
    system_prompt: str,
    clusterer: str,
    min_cluster_size: int,
    max_coarse_clusters: int,
    hierarchical: bool,
    assign_outliers: bool,
    groupby_column: str,
    sample_size: Optional[float],
    max_workers: int,
    use_wandb: bool,
    verbose: bool,
    progress: gr.Progress = gr.Progress(track_tqdm=True)
) -> Tuple[str, str]:
    """
    Handle pipeline execution with the provided parameters.

    Validates the input file, loads the dataset, runs ``explain()`` into a
    timestamped output directory, then loads the results into the shared
    ``app_state`` so the dashboard tabs can render them.

    Args:
        input_method: "Upload File" or "File Path" (selects which source wins).
        uploaded_file: Gradio file object when input_method == "Upload File".
        file_path: Path string when input_method == "File Path".
        method: "single_model" or "side_by_side".
        sample_size: If > 0 and < len(df), randomly subsample this many rows.
        progress: Gradio progress tracker (injected by Gradio at call time).

    Returns:
        Tuple of (status_html, results_preview_html). On success status_html
        ends with an "<!-- SUCCESS -->" marker used for tab switching.
    """
    try:
        # Step 1: Validate and get input file path
        progress(0.05, "Validating input...")
        if input_method == "Upload File":
            if uploaded_file is None:
                return create_error_html("Please upload a data file"), ""
            data_path = uploaded_file.name
        else:
            if not file_path or not file_path.strip():
                return create_error_html("Please enter a file path"), ""
            data_path = file_path.strip()
            if not os.path.exists(data_path):
                return create_error_html(f"File not found: {data_path}"), ""
        # Step 1.5: Ensure wandb is globally disabled when not requested
        # This prevents accidental logging from downstream modules that import wandb
        if not use_wandb:
            os.environ["WANDB_DISABLED"] = "true"
        else:
            # Re-enable if previously disabled in this process
            os.environ.pop("WANDB_DISABLED", None)
        # Step 2: Load and validate dataset (format chosen by file extension)
        progress(0.1, "Loading dataset...")
        try:
            if data_path.endswith('.jsonl'):
                df = pd.read_json(data_path, lines=True)
            elif data_path.endswith('.json'):
                df = pd.read_json(data_path)
            elif data_path.endswith('.csv'):
                df = pd.read_csv(data_path)
            elif data_path.endswith('.parquet'):
                df = pd.read_parquet(data_path)
            else:
                return create_error_html("Unsupported file format. Use JSONL, JSON, CSV, or Parquet"), ""
        except Exception as e:
            return create_error_html(f"Failed to load dataset: {str(e)}"), ""
        # Step 3: Validate dataset structure (required columns for the method)
        required_columns = validate_dataset_structure(df, method)
        if required_columns:
            return create_error_html(f"Missing required columns: {required_columns}"), ""
        # Step 4: Create output directory (timestamped, under BASE_RESULTS_DIR)
        progress(0.15, "Preparing output directory...")
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        output_dir = os.path.join(BASE_RESULTS_DIR or "results", f"uploaded_run_{timestamp}")
        os.makedirs(output_dir, exist_ok=True)
        # Step 5: Sample dataset if requested (fixed seed for reproducibility)
        original_size = len(df)
        if sample_size and sample_size > 0 and sample_size < len(df):
            progress(0.18, f"Sampling {int(sample_size)} rows from {original_size} total...")
            df = df.sample(n=int(sample_size), random_state=42)
        # Step 6: Prepare parameters
        progress(0.2, "Configuring pipeline...")
        # Handle optional parameters: empty/whitespace groupby means "no grouping"
        groupby_param = groupby_column.strip() if groupby_column and groupby_column.strip() else None
        # Step 7: Run the pipeline
        progress(0.25, "Starting pipeline execution...")
        # NOTE(review): status_html is computed here but never returned or
        # displayed (the handler only returns at the end) — confirm whether
        # create_running_html has needed side effects before removing.
        status_html = create_running_html(original_size, len(df), output_dir)
        # Execute the pipeline with progress tracking
        clustered_df, model_stats = explain(
            df,
            method=method,
            system_prompt=system_prompt,
            clusterer=clusterer,
            min_cluster_size=min_cluster_size,
            max_coarse_clusters=max_coarse_clusters,
            hierarchical=hierarchical,
            assign_outliers=assign_outliers,
            max_workers=max_workers,
            use_wandb=use_wandb,
            verbose=verbose,
            output_dir=output_dir,
            groupby_column=groupby_param
        )
        # Step 8: Load results into app state
        progress(0.95, "Loading results into dashboard...")
        # Load the pipeline results using existing loader (re-reads from disk
        # rather than using explain()'s in-memory return, so the dashboard
        # sees exactly what was persisted)
        clustered_df_loaded, metrics, model_cluster_df, results_path = load_pipeline_results(output_dir)
        # Update app state
        app_state["clustered_df"] = clustered_df_loaded
        app_state["metrics"] = metrics
        app_state["model_stats"] = metrics  # Deprecated alias
        app_state["results_path"] = results_path
        app_state["available_models"] = get_available_models(metrics)
        app_state["current_results_dir"] = output_dir
        progress(1.0, "Pipeline completed successfully!")
        # Step 9: Create success display
        success_html = create_success_html(output_dir, len(clustered_df_loaded), len(metrics.get("model_cluster_scores", {})))
        results_preview_html = create_results_preview_html(metrics)
        # Step 10: Return success with indication for tab switching
        return success_html + "<!-- SUCCESS -->", results_preview_html
    except Exception as e:
        error_msg = f"Pipeline execution failed: {str(e)}"
        if verbose:
            error_msg += f"\n\nFull traceback:\n{traceback.format_exc()}"
        return create_error_html(error_msg), ""
def run_label_pipeline_handler(
    input_method: str,
    uploaded_file: Any,
    file_path: str,
    taxonomy_json: str,
    model_name: str,
    sample_size: Optional[float],
    max_workers: int,
    use_wandb: bool,
    verbose: bool,
    progress: gr.Progress = gr.Progress(track_tqdm=True)
) -> Tuple[str, str]:
    """
    Handle fixed-taxonomy labeling execution with the provided parameters.

    Mirrors ``run_pipeline_handler`` but runs ``label()`` with a
    user-provided taxonomy (JSON object of {label: description}) instead of
    open-ended clustering. Only single_model data is supported here.

    Returns:
        Tuple of (status_html, results_preview_html); success status ends
        with an "<!-- SUCCESS -->" marker used for tab switching.
    """
    try:
        # Step 1: Validate and get input file path
        progress(0.05, "Validating input...")
        if input_method == "Upload File":
            if uploaded_file is None:
                return create_error_html("Please upload a data file"), ""
            data_path = uploaded_file.name
        else:
            if not file_path or not file_path.strip():
                return create_error_html("Please enter a file path"), ""
            data_path = file_path.strip()
            if not os.path.exists(data_path):
                return create_error_html(f"File not found: {data_path}"), ""
        # Ensure wandb disabled when not requested (process-wide env toggle)
        if not use_wandb:
            os.environ["WANDB_DISABLED"] = "true"
        else:
            os.environ.pop("WANDB_DISABLED", None)
        # Step 2: Load dataset (format chosen by file extension)
        progress(0.1, "Loading dataset...")
        try:
            if data_path.endswith('.jsonl'):
                df = pd.read_json(data_path, lines=True)
            elif data_path.endswith('.json'):
                df = pd.read_json(data_path)
            elif data_path.endswith('.csv'):
                df = pd.read_csv(data_path)
            elif data_path.endswith('.parquet'):
                df = pd.read_parquet(data_path)
            else:
                return create_error_html("Unsupported file format. Use JSONL, JSON, CSV, or Parquet"), ""
        except Exception as e:
            return create_error_html(f"Failed to load dataset: {str(e)}"), ""
        # Step 3: Validate dataset structure (single_model only for label)
        struct_err = validate_dataset_structure(df, method="single_model")
        if struct_err:
            return create_error_html(struct_err), ""
        # Step 4: Parse taxonomy JSON (accepts a pre-parsed dict too)
        progress(0.15, "Parsing taxonomy...")
        import json as _json
        try:
            taxonomy = _json.loads(taxonomy_json) if isinstance(taxonomy_json, str) else taxonomy_json
            if not isinstance(taxonomy, dict) or not taxonomy:
                return create_error_html("Taxonomy must be a non-empty JSON object of {label: description}"), ""
        except Exception as e:
            return create_error_html(f"Invalid taxonomy JSON: {e}"), ""
        # Step 5: Create output directory (timestamped, under BASE_RESULTS_DIR)
        progress(0.18, "Preparing output directory...")
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        output_dir = os.path.join(BASE_RESULTS_DIR or "results", f"labeled_run_{timestamp}")
        os.makedirs(output_dir, exist_ok=True)
        # Step 6: Sample dataset if requested (fixed seed for reproducibility)
        original_size = len(df)
        if sample_size and sample_size > 0 and sample_size < len(df):
            progress(0.2, f"Sampling {int(sample_size)} rows from {original_size:,} total...")
            df = df.sample(n=int(sample_size), random_state=42)
        # Step 7: Run label()
        progress(0.25, "Starting labeling execution...")
        # NOTE(review): status_html is computed but never surfaced — confirm
        # whether create_running_html has needed side effects before removing.
        status_html = create_running_html(original_size, len(df), output_dir)
        clustered_df, model_stats = label(
            df,
            taxonomy=taxonomy,
            model_name=model_name or "gpt-4o-mini",
            max_workers=max_workers,
            use_wandb=use_wandb,
            verbose=verbose,
            output_dir=output_dir,
        )
        # Step 8: Load results into app state (re-read from disk so the
        # dashboard reflects exactly what was persisted)
        progress(0.95, "Loading results into dashboard...")
        clustered_df_loaded, metrics, model_cluster_df, results_path = load_pipeline_results(output_dir)
        app_state["clustered_df"] = clustered_df_loaded
        app_state["metrics"] = metrics
        app_state["model_stats"] = metrics  # deprecated alias
        app_state["results_path"] = results_path
        app_state["available_models"] = get_available_models(metrics)
        app_state["current_results_dir"] = output_dir
        progress(1.0, "Labeling completed successfully!")
        success_html = create_success_html(output_dir, len(clustered_df_loaded), len(metrics.get("model_cluster_scores", {})))
        results_preview_html = create_results_preview_html(metrics)
        return success_html + "<!-- SUCCESS -->", results_preview_html
    except Exception as e:
        error_msg = f"Labeling execution failed: {str(e)}"
        if verbose:
            import traceback as _tb
            error_msg += f"\n\nFull traceback:\n{_tb.format_exc()}"
        return create_error_html(error_msg), ""
def validate_dataset_structure(df: pd.DataFrame, method: str) -> str:
    """Check that *df* has every column the chosen analysis method requires.

    Args:
        df: The loaded dataset.
        method: Either "single_model" or "side_by_side".

    Returns:
        Empty string if valid, otherwise a human-readable error message.
    """
    # Required-column sets, keyed by method name.
    required_by_method = {
        "single_model": ["prompt", "model_response", "model"],
        "side_by_side": ["prompt", "model_a_response", "model_b_response", "model_a", "model_b"],
    }
    required = required_by_method.get(method)
    if required is None:
        return f"Unknown method: {method}"
    missing = [name for name in required if name not in df.columns]
    if not missing:
        return ""
    return f"Missing required columns for {method}: {missing}. Available columns: {list(df.columns)}"
def create_error_html(message: str) -> str:
    """Create HTML for error display.

    The message is HTML-escaped before interpolation: error text frequently
    contains traceback fragments such as ``<module>`` or comparison operators,
    which would otherwise be swallowed by (or injected into) the markup.
    """
    import html  # local import: keeps the module's top-level import block untouched
    safe_message = html.escape(str(message))
    return f"""
    <div style='color: #d32f2f; background-color: #ffebee; padding: 16px; border-radius: 8px; border-left: 4px solid #d32f2f;'>
        <strong>Error</strong><br>
        <pre style='color: #d32f2f; margin-top: 8px; white-space: pre-wrap;'>{safe_message}</pre>
    </div>
    """
def create_running_html(original_size: int, processed_size: int, output_dir: str) -> str:
    """Render the in-progress status banner shown while the pipeline runs."""
    # Only mention sampling when we are actually processing a subset.
    sample_note = f"(sampled from {original_size:,})" if processed_size < original_size else ""
    return f"""
    <div style='color: #1976d2; background-color: #e3f2fd; padding: 16px; border-radius: 8px; border-left: 4px solid #1976d2;'>
        <strong>Pipeline Running</strong><br>
        <div style='margin-top: 8px;'>
            β’ Processing: {processed_size:,} conversations
            {sample_note}
            <br>
            β’ Output directory: <code>{output_dir}</code>
            <br>
            β’ Status: Extracting properties and clustering...
        </div>
    </div>
    """
def create_success_html(output_dir: str, n_properties: int, n_models: int) -> str:
    """Render the success banner and next-step hints shown after a pipeline run."""
    # Pre-format the property count so the template below stays readable.
    properties_display = f"{n_properties:,}"
    return f"""
    <div style='color: #388e3c; background-color: #e8f5e8; padding: 16px; border-radius: 8px; border-left: 4px solid #388e3c;'>
        <strong>Pipeline Completed Successfully!</strong><br>
        <div style='margin-top: 8px;'>
            β’ Extracted properties: {properties_display}
            <br>
            β’ Models analyzed: {n_models}
            <br>
            β’ Results saved to: <code>{output_dir}</code>
            <br><br>
            <strong>Results are now loaded in the dashboard!</strong><br>
            Switch to other tabs to explore your results:
            <br>
            <strong>Overview</strong> - Model performance summary
            <br>
            <strong>View Clusters</strong> - Explore behavior clusters
            <br>
            <strong>View Examples</strong> - Browse specific examples
            <br>
            <strong>Plots</strong> - Interactive visualizations
        </div>
    </div>
    """
def create_results_preview_html(metrics: dict) -> str:
    """Build a small HTML summary card for freshly computed metrics.

    Returns an empty string when there is nothing to preview.
    """
    if not metrics or "model_cluster_scores" not in metrics:
        return ""

    scores_by_model = metrics["model_cluster_scores"]
    total_models = len(scores_by_model)

    # Assemble the card from parts and join once at the end.
    html_parts = [f"""
    <div style='background-color: #f5f5f5; padding: 16px; border-radius: 8px; margin-top: 16px;'>
        <strong>Results Preview</strong><br>
        <div style='margin-top: 8px;'>
            <strong>Models analyzed:</strong> {total_models}<br>
    """]
    # Show up to five model names as a teaser.
    sample_names = list(scores_by_model)[:5]
    if sample_names:
        html_parts.append(f"<strong>Sample models:</strong> {', '.join(sample_names)}")
        if total_models > 5:
            html_parts.append(f" and {total_models - 5} more...")
    html_parts.append("""
        </div>
    </div>
    """)
    return "".join(html_parts)
def get_directory_contents(directory: str) -> Tuple[List[str], str]:
    """
    Get directory contents for dropdown menu.

    Args:
        directory: Path to directory to list

    Returns:
        Tuple of (items_choices, empty_string).
        items_choices contains both directories (shown with a trailing "/")
        and supported data files, in sorted order; on any error (missing
        directory, permission denied, unexpected failure) it is empty.

    Note: the original implementation built error HTML strings and separate
    directory/file lists that were never used or returned; that dead code
    has been removed — the observable behavior is unchanged.
    """
    supported_extensions = ('.jsonl', '.json', '.csv', '.parquet')
    try:
        # isdir() is False for missing paths too, so one check suffices.
        if not os.path.isdir(directory):
            return [], ""
        try:
            entries = sorted(os.listdir(directory))
        except PermissionError:
            return [], ""
        items_choices: List[str] = []
        for entry in entries:
            if entry.startswith('.'):  # Skip hidden files/dirs
                continue
            full_path = os.path.join(directory, entry)
            try:
                if os.path.isdir(full_path):
                    # Trailing "/" marks directories in the dropdown.
                    items_choices.append(f"{entry}/")
                elif entry.lower().endswith(supported_extensions):
                    # Only show supported file types
                    items_choices.append(entry)
            except (OSError, PermissionError):
                continue  # Skip inaccessible items
        return items_choices, ""
    except Exception:
        # Defensive catch-all: the dropdown should degrade to empty, not crash.
        return [], ""