#!/usr/bin/env python3
import json
import time
from pathlib import Path

import gradio as gr
import pandas as pd

# Configuration
DATA_DIR = Path("../data/tiktok_profiles")
CACHE_FILE = Path("../data/tiktok_profiles_combined.parquet")
PROCESSED_FILES_LOG = Path("../data/processed_files.json")

# Expected CSV schema (kept for reference).
COLUMNS = [
    "id",
    "unique_id",
    "follower_count",
    "nickname",
    "video_count",
    "following_count",
    "signature",
    "email",
    "bio_link",
    "updated_at",
    "tt_seller",
    "region",
    "language",
    "url",
]

# Column dtypes shared by both CSV-loading paths below.
CSV_DTYPES = {
    "id": "str",
    "unique_id": "str",
    "follower_count": "Int64",
    "nickname": "str",
    "video_count": "Int64",
    "following_count": "Int64",
    "signature": "str",
    "email": "str",
    "bio_link": "str",
    "updated_at": "str",
    "tt_seller": "str",
    "region": "str",
    "language": "str",
    "url": "str",
}


def get_processed_files():
    """
    Get the set of already processed files from the log.
    Returns a set of filenames that have been processed.
    """
    if PROCESSED_FILES_LOG.exists():
        with open(PROCESSED_FILES_LOG, "r") as f:
            return set(json.load(f))
    return set()


def update_processed_files(processed_files):
    """
    Update the log of processed files.
    """
    PROCESSED_FILES_LOG.parent.mkdir(parents=True, exist_ok=True)
    with open(PROCESSED_FILES_LOG, "w") as f:
        json.dump(sorted(processed_files), f)
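
# The log is a flat JSON array of CSV filenames, e.g.
# ["profiles_0001.csv", "profiles_0002.csv"]. The example names here are
# illustrative; the actual filenames depend on what lands in DATA_DIR.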


def load_data(force_reload=False):
    """
    Load data from either the cache file or from individual CSV files.
    Only processes new files that haven't been processed before.
    Returns a pandas DataFrame with all the data.

    Args:
        force_reload: If True, reprocess all files regardless of whether
            they've been processed before.
    """
    start_time = time.time()

    # Get all available CSV files
    all_csv_files = {file.name: file for file in DATA_DIR.glob("*.csv")}

    # If the cache exists and we're not forcing a reload, load from cache
    if CACHE_FILE.exists() and not force_reload:
        print(f"Loading data from cache file: {CACHE_FILE}")
        df = pd.read_parquet(CACHE_FILE)

        # Check for new files
        processed_files = get_processed_files()
        new_files = [
            all_csv_files[name] for name in all_csv_files if name not in processed_files
        ]
        if not new_files:
            print(
                f"No new files to process. Data loaded in {time.time() - start_time:.2f} seconds"
            )
            return df

        print(f"Found {len(new_files)} new files to process")

        # Process only the new files
        new_dfs = []
        for i, file in enumerate(new_files):
            print(f"Loading new file {i + 1}/{len(new_files)}: {file.name}")
            chunk_df = pd.read_csv(file, dtype=CSV_DTYPES, low_memory=False)
            new_dfs.append(chunk_df)
            processed_files.add(file.name)

        if new_dfs:
            # Combine new data with existing data
            print("Combining new data with existing data...")
            new_data = pd.concat(new_dfs, ignore_index=True)
            df = pd.concat([df, new_data], ignore_index=True)

            # Remove duplicates based on unique_id
            df = df.drop_duplicates(subset=["unique_id"], keep="last")

            # Save updated data to the cache file
            print(f"Saving updated data to {CACHE_FILE}")
            df.to_parquet(CACHE_FILE, index=False)

        # Update the processed files log
        update_processed_files(processed_files)

        print(f"Data loaded and updated in {time.time() - start_time:.2f} seconds")
        return df

    # If there is no cache file, or force_reload is True, process all files
    print(f"Loading data from CSV files in {DATA_DIR}")

    csv_files = list(all_csv_files.values())
    total_files = len(csv_files)
    print(f"Found {total_files} CSV files")

    # Load the data file by file
    dfs = []
    processed_files = set()
    for i, file in enumerate(csv_files):
        if i % 10 == 0:
            print(f"Loading file {i + 1}/{total_files}: {file.name}")
        chunk_df = pd.read_csv(file, dtype=CSV_DTYPES, low_memory=False)
        dfs.append(chunk_df)
        processed_files.add(file.name)

    # Combine all dataframes
    print("Combining all dataframes...")
    df = pd.concat(dfs, ignore_index=True)

    # Remove duplicates based on unique_id
    df = df.drop_duplicates(subset=["unique_id"], keep="last")

    # Save to the cache file
    print(f"Saving combined data to {CACHE_FILE}")
    CACHE_FILE.parent.mkdir(parents=True, exist_ok=True)
    df.to_parquet(CACHE_FILE, index=False)

    # Update the processed files log
    update_processed_files(processed_files)

    print(f"Data loaded and cached in {time.time() - start_time:.2f} seconds")
    return df
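
# Typical usage (illustrative): load_data() serves the parquet cache and only
# re-reads CSVs that are new since the last run, while load_data(force_reload=True)
# rebuilds the cache from every CSV in DATA_DIR.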


def search_by_username(df, username):
    """Search for profiles by username (unique_id)."""
    if not username:
        return pd.DataFrame()
    # Case-insensitive substring match
    results = df[df["unique_id"].str.lower().str.contains(username.lower(), na=False)]
    return results.head(100)  # Limit results to prevent UI overload


def search_by_nickname(df, nickname):
    """Search for profiles by nickname."""
    if not nickname:
        return pd.DataFrame()
    # Case-insensitive substring match
    results = df[df["nickname"].str.lower().str.contains(nickname.lower(), na=False)]
    return results.head(100)  # Limit results to prevent UI overload


def search_by_follower_count(df, min_followers, max_followers):
    """Search for profiles by follower count range."""
    if min_followers is None:
        min_followers = 0
    if max_followers is None:
        max_followers = df["follower_count"].max()
    results = df[
        (df["follower_count"] >= min_followers)
        & (df["follower_count"] <= max_followers)
    ]
    return results.head(100)  # Limit results to prevent UI overload


def format_results(df):
    """Format the results for display."""
    if df.empty:
        # Return an empty DataFrame with the same columns instead of a string
        return pd.DataFrame(columns=df.columns)

    display_df = df.copy()

    # Convert counts to a human-readable format
    def format_number(num):
        if pd.isna(num):
            return "N/A"
        if num >= 1_000_000:
            return f"{num / 1_000_000:.1f}M"
        if num >= 1_000:
            return f"{num / 1_000:.1f}K"
        return str(num)

    display_df["follower_count"] = display_df["follower_count"].apply(format_number)
    display_df["video_count"] = display_df["video_count"].apply(format_number)
    display_df["following_count"] = display_df["following_count"].apply(format_number)
    return display_df
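
# For example: format_number(1_234_567) -> "1.2M", format_number(5_400) -> "5.4K",
# format_number(999) -> "999", and missing counts render as "N/A".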


def combined_search(
    df,
    min_followers,
    max_followers,
    min_videos,
    max_videos,
    signature_query,
    region,
    has_email,
):
    """Combined search applying every provided criterion."""
    results = df.copy()

    # Apply each filter if provided
    if min_followers is not None:
        results = results[results["follower_count"] >= min_followers]
    if max_followers is not None:
        results = results[results["follower_count"] <= max_followers]
    if min_videos is not None:
        results = results[results["video_count"] >= min_videos]
    if max_videos is not None:
        results = results[results["video_count"] <= max_videos]
    if signature_query:
        results = results[
            results["signature"]
            .str.lower()
            .str.contains(signature_query.lower(), na=False)
        ]
    if region:
        results = results[results["region"].str.lower() == region.lower()]
    # Keep only profiles with a non-empty email
    if has_email:
        results = results[results["email"].notna() & (results["email"] != "")]

    return results.head(1000)  # Limit to 1000 results to prevent UI overload
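
# Illustrative call (the filter values here are made up): creators with at
# least 10k followers, any video count, a bio keyword, and a public email:
#   combined_search(df, 10_000, None, None, None, "skincare", "VN", True)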


def create_interface(df):
    """Create the Gradio interface."""
    # Get min and max follower counts for the sliders
    min_followers_global = max(1000, int(df["follower_count"].min()))
    max_followers_global = min(10_000_000, int(df["follower_count"].max()))

    # Get min and max video counts for the sliders
    min_videos_global = max(1, int(df["video_count"].min()))
    max_videos_global = min(10_000, int(df["video_count"].max()))

    # Get unique regions for the dropdown
    regions = sorted(df["region"].dropna().unique().tolist())
    regions = [""] + regions  # Empty option means "any region"

    with gr.Blocks(title="TikTok Creator Analyzer") as interface:
        gr.Markdown("# TikTok Creator Analyzer")
        gr.Markdown(f"Database contains {len(df):,} creator profiles")

        # Show the top 100 profiles by default
        top_profiles = df.sort_values(by="follower_count", ascending=False).head(100)
        default_view = format_results(top_profiles)

        with gr.Tab("Overview"):
            gr.Markdown("## Top 100 Profiles by Follower Count")
            overview_results = gr.Dataframe(value=default_view, label="Top Profiles")
            refresh_btn = gr.Button("Refresh")
            refresh_btn.click(
                fn=lambda: format_results(
                    df.sort_values(by="follower_count", ascending=False).head(100)
                ),
                inputs=[],
                outputs=overview_results,
            )
| with gr.Tab("Advanced Search"): | |
| with gr.Row(): | |
| with gr.Column(scale=1): | |
| gr.Markdown("### Follower Count") | |
| min_followers_slider = gr.Slider( | |
| minimum=min_followers_global, | |
| maximum=max_followers_global, | |
| value=min_followers_global, | |
| step=1000, | |
| label="Minimum Followers", | |
| interactive=True, | |
| ) | |
| max_followers_slider = gr.Slider( | |
| minimum=min_followers_global, | |
| maximum=max_followers_global, | |
| value=max_followers_global, | |
| step=1000, | |
| label="Maximum Followers", | |
| interactive=True, | |
| ) | |
| gr.Markdown("### Video Count") | |
| min_videos_slider = gr.Slider( | |
| minimum=min_videos_global, | |
| maximum=max_videos_global, | |
| value=min_videos_global, | |
| step=10, | |
| label="Minimum Videos", | |
| interactive=True, | |
| ) | |
| max_videos_slider = gr.Slider( | |
| minimum=min_videos_global, | |
| maximum=max_videos_global, | |
| value=max_videos_global, | |
| step=10, | |
| label="Maximum Videos", | |
| interactive=True, | |
| ) | |
| with gr.Column(scale=1): | |
| signature_input = gr.Textbox(label="Keywords in Signature") | |
| region_input = gr.Dropdown(label="Region", choices=regions) | |
| has_email_checkbox = gr.Checkbox(label="Has Email", value=False) | |
| search_btn = gr.Button("Search", variant="primary", size="lg") | |
| results_count = gr.Markdown("### Results: 0 profiles found") | |
| # Create a dataframe with download button | |
| with gr.Row(): | |
| search_results = gr.Dataframe(label="Results") | |
| download_btn = gr.Button("Download Results as CSV") | |

            # Update the results count line
            def update_results_count(results_df):
                count = len(results_df)
                return f"### Results: {count:,} profiles found"

            # Perform the search and update the results table and count
            def perform_search(
                min_followers,
                max_followers,
                min_videos,
                max_videos,
                signature,
                region,
                has_email,
            ):
                results = combined_search(
                    df,
                    min_followers,
                    max_followers,
                    min_videos,
                    max_videos,
                    signature,
                    region,
                    has_email,
                )
                formatted_results = format_results(results)
                count_text = update_results_count(results)
                return formatted_results, count_text

            # Write the results to a CSV for download
            def download_results(results_df):
                if results_df.empty:
                    return None
                # Convert back to the original (unformatted) rows for download
                download_df = df[df["unique_id"].isin(results_df["unique_id"])]
                # Save to a temporary CSV file
                temp_csv = "temp_results.csv"
                download_df.to_csv(temp_csv, index=False)
                return temp_csv

            # Connect the search button
            search_btn.click(
                fn=perform_search,
                inputs=[
                    min_followers_slider,
                    max_followers_slider,
                    min_videos_slider,
                    max_videos_slider,
                    signature_input,
                    region_input,
                    has_email_checkbox,
                ],
                outputs=[search_results, results_count],
            )

            # Connect the download button
            download_file = gr.File(label="Download")
            download_btn.click(
                fn=download_results,
                inputs=[search_results],
                outputs=[download_file],
            )
| with gr.Tab("Statistics"): | |
| gr.Markdown("## Database Statistics") | |
| # Calculate some basic statistics | |
| total_creators = len(df) | |
| total_followers = df["follower_count"].sum() | |
| avg_followers = df["follower_count"].mean() | |
| median_followers = df["follower_count"].median() | |
| max_followers = df["follower_count"].max() | |
| stats_md = f""" | |
| - Total Creators: {total_creators:,} | |
| - Total Followers: {total_followers:,} | |
| - Average Followers: {avg_followers:,.2f} | |
| - Median Followers: {median_followers:,} | |
| - Max Followers: {max_followers:,} | |
| """ | |
| gr.Markdown(stats_md) | |
| with gr.Tab("Maintenance"): | |
| gr.Markdown("## Database Maintenance") | |
| # Get processed files info | |
| processed_files = get_processed_files() | |
| maintenance_md = f""" | |
| - Total processed files: {len(processed_files)} | |
| - Last update: {time.ctime(CACHE_FILE.stat().st_mtime) if CACHE_FILE.exists() else 'Never'} | |
| """ | |
| gr.Markdown(maintenance_md) | |
| with gr.Row(): | |
| force_reload_btn = gr.Button("Force Reload All Files") | |
| reload_status = gr.Markdown("Click to reload all files from scratch") | |
| def reload_all_files(): | |
| return "Reloading all files... This may take a while. Please restart the application." | |
| force_reload_btn.click( | |
| fn=reload_all_files, inputs=[], outputs=reload_status | |
| ) | |
| return interface | |


def main():
    print("Loading TikTok creator data...")
    df = load_data()
    print(f"Loaded {len(df):,} creator profiles")

    # Create and launch the interface
    interface = create_interface(df)
    interface.launch(share=True, server_name="0.0.0.0")


if __name__ == "__main__":
    main()
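
# Assumed dependencies (not pinned in the source): pandas, gradio, and a parquet
# engine such as pyarrow. Note that pandas' read_parquet/to_parquet need pyarrow
# or fastparquet installed even though neither is imported here directly.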