diff --git "a/app.py" "b/app.py"
--- "a/app.py"
+++ "b/app.py"
@@ -1,2437 +1,1495 @@
#!/usr/bin/env python3
"""
-Crypto API Monitor ULTIMATE - Real API Integration
-Complete professional monitoring system with 100+ real free crypto APIs
+Crypto Data Aggregator - Complete Gradio Dashboard
+6-tab comprehensive interface for cryptocurrency data analysis
"""
-from fastapi import FastAPI, WebSocket, WebSocketDisconnect, HTTPException, Request
-from fastapi.responses import HTMLResponse, FileResponse, Response
-from fastapi.staticfiles import StaticFiles
-from fastapi.middleware.cors import CORSMiddleware
-from pydantic import BaseModel
-from typing import List, Dict, Optional, Literal
-import asyncio
-import aiohttp
-import random
+import gradio as gr
+import pandas as pd
+import plotly.graph_objects as go
+from plotly.subplots import make_subplots
+from datetime import datetime, timedelta
import json
+import threading
+import time
import logging
-from datetime import datetime, timedelta
-import uvicorn
-from collections import defaultdict
-import os
-from urllib.parse import urljoin, unquote
-from pathlib import Path
-from threading import Lock
-
-from database import Database
-from config import config as global_config
-from starlette.middleware.trustedhost import TrustedHostMiddleware
-from backend.feature_flags import feature_flags, is_feature_enabled
-
-class SentimentRequest(BaseModel):
- texts: List[str]
-
-class PoolCreate(BaseModel):
- name: str
- category: str
- rotation_strategy: str = "round_robin"
- description: Optional[str] = None
-
-class PoolMemberAdd(BaseModel):
- provider_id: str
- priority: int = 1
- weight: int = 1
-
-class ProviderCreateRequest(BaseModel):
- name: str
- category: str
- endpoint_url: str
- requires_key: bool = False
- api_key: Optional[str] = None
- rate_limit: Optional[str] = None
- timeout_ms: int = 10000
- health_check_endpoint: Optional[str] = None
- notes: Optional[str] = None
-
-
-class HFRegistryItemCreate(BaseModel):
- id: str
- kind: Literal["model", "dataset"]
- description: Optional[str] = None
- downloads: Optional[int] = None
- likes: Optional[int] = None
-
-class FeatureFlagUpdate(BaseModel):
- flag_name: str
- value: bool
-
-class FeatureFlagsUpdate(BaseModel):
- flags: Dict[str, bool]
-
-logger = logging.getLogger("crypto_monitor")
-
-
-app = FastAPI(title="Crypto Monitor Ultimate", version="3.0.0")
-
-
-def _split_env_list(value: Optional[str]) -> List[str]:
- if not value:
- return []
- return [item.strip() for item in value.split(",") if item.strip()]
-
-
-allowed_origins_env = os.getenv("ALLOWED_ORIGINS", "")
-allowed_origin_regex_env = os.getenv("ALLOWED_ORIGIN_REGEX")
-allowed_origins = _split_env_list(allowed_origins_env)
-
-cors_kwargs = {
- "allow_methods": ["*"],
- "allow_headers": ["*"],
- "allow_credentials": True,
-}
-
-if allowed_origin_regex_env:
- cors_kwargs["allow_origin_regex"] = allowed_origin_regex_env
-elif not allowed_origins or "*" in allowed_origins:
- cors_kwargs["allow_origin_regex"] = ".*"
-else:
- cors_kwargs["allow_origins"] = allowed_origins
-
-app.add_middleware(CORSMiddleware, **cors_kwargs)
-
-trusted_hosts = _split_env_list(os.getenv("TRUSTED_HOSTS"))
-if not trusted_hosts:
- trusted_hosts = ["*"]
-app.add_middleware(TrustedHostMiddleware, allowed_hosts=trusted_hosts)
-
-
-CUSTOM_REGISTRY_PATH = Path("data/custom_registry.json")
-_registry_lock = Lock()
-_custom_registry: Dict[str, List[Dict]] = {
- "providers": [],
- "hf_models": [],
- "hf_datasets": []
-}
-
-
-def _load_custom_registry() -> Dict[str, List[Dict]]:
- if not CUSTOM_REGISTRY_PATH.exists():
- return {
- "providers": [],
- "hf_models": [],
- "hf_datasets": []
- }
- try:
- with CUSTOM_REGISTRY_PATH.open("r", encoding="utf-8") as f:
- data = json.load(f)
- return {
- "providers": data.get("providers", []),
- "hf_models": data.get("hf_models", []),
- "hf_datasets": data.get("hf_datasets", []),
- }
- except Exception:
- return {
- "providers": [],
- "hf_models": [],
- "hf_datasets": []
- }
+from typing import List, Dict, Optional, Tuple, Any
+import traceback
+# Import local modules
+import config
+import database
+import collectors
+import ai_models
+import utils
-def _save_custom_registry() -> None:
- CUSTOM_REGISTRY_PATH.parent.mkdir(parents=True, exist_ok=True)
- with CUSTOM_REGISTRY_PATH.open("w", encoding="utf-8") as f:
- json.dump(_custom_registry, f, ensure_ascii=False, indent=2)
+# Setup logging
+logger = utils.setup_logging()
+# Initialize database
+db = database.get_database()
-def _refresh_custom_registry() -> None:
- global _custom_registry
- with _registry_lock:
- _custom_registry = _load_custom_registry()
+# Global state for background collection
+_collection_started = False
+_collection_lock = threading.Lock()
+# ==================== TAB 1: LIVE DASHBOARD ====================
-_refresh_custom_registry()
+def get_live_dashboard(search_filter: str = "") -> pd.DataFrame:
+ """
+ Get live dashboard data with top 100 cryptocurrencies
-# WebSocket Manager
-class ConnectionManager:
- def __init__(self):
- self.active_connections: List[WebSocket] = []
+ Args:
+ search_filter: Search/filter text for cryptocurrencies
- async def connect(self, websocket: WebSocket):
- await websocket.accept()
- self.active_connections.append(websocket)
+ Returns:
+ DataFrame with formatted cryptocurrency data
+ """
+ try:
+ logger.info("Fetching live dashboard data...")
+
+ # Get latest prices from database
+ prices = db.get_latest_prices(100)
+
+ if not prices:
+ logger.warning("No price data available")
+ return pd.DataFrame({
+ "Rank": [],
+ "Name": [],
+ "Symbol": [],
+ "Price (USD)": [],
+ "24h Change (%)": [],
+ "Volume": [],
+ "Market Cap": []
+ })
- def disconnect(self, websocket: WebSocket):
- self.active_connections.remove(websocket)
+ # Convert to DataFrame
+ df_data = []
+ for price in prices:
+ # Apply search filter if provided
+ if search_filter:
+ search_lower = search_filter.lower()
+ name_lower = (price.get('name') or '').lower()
+ symbol_lower = (price.get('symbol') or '').lower()
+
+ if search_lower not in name_lower and search_lower not in symbol_lower:
+ continue
+
+ df_data.append({
+ "Rank": price.get('rank', 999),
+ "Name": price.get('name', 'Unknown'),
+ "Symbol": price.get('symbol', 'N/A').upper(),
+ "Price (USD)": f"${price.get('price_usd', 0):,.2f}" if price.get('price_usd') else "N/A",
+ "24h Change (%)": f"{price.get('percent_change_24h', 0):+.2f}%" if price.get('percent_change_24h') is not None else "N/A",
+ "Volume": utils.format_number(price.get('volume_24h', 0)),
+ "Market Cap": utils.format_number(price.get('market_cap', 0))
+ })
- async def broadcast(self, message: dict):
- for connection in self.active_connections:
- try:
- await connection.send_json(message)
- except:
- pass
-
-manager = ConnectionManager()
-
-db = Database("data/crypto_monitor.db")
-
-# API Provider Configuration - Real Free APIs
-API_PROVIDERS = {
- "market_data": [
- {
- "name": "CoinGecko",
- "base_url": "https://api.coingecko.com/api/v3",
- "endpoints": {
- "coins_markets": "/coins/markets",
- "simple_price": "/simple/price",
- "global": "/global",
- "trending": "/search/trending"
- },
- "auth": None,
- "rate_limit": "50/min",
- "status": "active"
- },
- {
- "name": "CoinCap",
- "base_url": "https://api.coincap.io/v2",
- "endpoints": {
- "assets": "/assets",
- "rates": "/rates"
- },
- "auth": None,
- "rate_limit": "200/min",
- "status": "active"
- },
- {
- "name": "CoinStats",
- "base_url": "https://api.coinstats.app",
- "endpoints": {
- "coins": "/public/v1/coins",
- "charts": "/public/v1/charts"
- },
- "auth": None,
- "rate_limit": "unlimited",
- "status": "active"
- },
- {
- "name": "Cryptorank",
- "base_url": "https://api.cryptorank.io/v1",
- "endpoints": {
- "currencies": "/currencies"
- },
- "auth": None,
- "rate_limit": "100/min",
- "status": "active"
- }
- ],
- "exchanges": [
- {
- "name": "Binance",
- "base_url": "https://api.binance.com/api/v3",
- "endpoints": {
- "ticker": "/ticker/24hr",
- "price": "/ticker/price"
- },
- "auth": None,
- "rate_limit": "1200/min",
- "status": "active"
- },
- {
- "name": "Coinbase",
- "base_url": "https://api.coinbase.com/v2",
- "endpoints": {
- "prices": "/prices",
- "exchange_rates": "/exchange-rates"
- },
- "auth": None,
- "rate_limit": "10000/hour",
- "status": "active"
- },
- {
- "name": "Kraken",
- "base_url": "https://api.kraken.com/0/public",
- "endpoints": {
- "ticker": "/Ticker",
- "trades": "/Trades"
- },
- "auth": None,
- "rate_limit": "1/sec",
- "status": "active"
- }
- ],
- "news": [
- {
- "name": "CoinStats News",
- "base_url": "https://api.coinstats.app",
- "endpoints": {
- "feed": "/public/v1/news"
- },
- "auth": None,
- "rate_limit": "unlimited",
- "status": "active"
- },
- {
- "name": "CoinDesk RSS",
- "base_url": "https://www.coindesk.com",
- "endpoints": {
- "rss": "/arc/outboundfeeds/rss/?outputType=xml"
- },
- "auth": None,
- "rate_limit": "unlimited",
- "status": "active"
- },
- {
- "name": "Cointelegraph RSS",
- "base_url": "https://cointelegraph.com",
- "endpoints": {
- "rss": "/rss"
- },
- "auth": None,
- "rate_limit": "unlimited",
- "status": "active"
- }
- ],
- "sentiment": [
- {
- "name": "Alternative.me Fear & Greed",
- "base_url": "https://api.alternative.me",
- "endpoints": {
- "fng": "/fng/?limit=1&format=json"
- },
- "auth": None,
- "rate_limit": "unlimited",
- "status": "active"
- }
- ],
- "defi": [
- {
- "name": "DeFi Llama",
- "base_url": "https://api.llama.fi",
- "endpoints": {
- "protocols": "/protocols",
- "tvl": "/tvl"
- },
- "auth": None,
- "rate_limit": "unlimited",
- "status": "active"
- },
- {
- "name": "1inch",
- "base_url": "https://api.1inch.io/v5.0/1",
- "endpoints": {
- "quote": "/quote"
- },
- "auth": None,
- "rate_limit": "unlimited",
- "status": "active"
- }
- ],
- "blockchain": [
- {
- "name": "Blockscout Ethereum",
- "base_url": "https://eth.blockscout.com/api",
- "endpoints": {
- "balance": "/v2/addresses"
- },
- "auth": None,
- "rate_limit": "unlimited",
- "status": "active"
- },
- {
- "name": "Ethplorer",
- "base_url": "https://api.ethplorer.io",
- "endpoints": {
- "address": "/getAddressInfo"
- },
- "auth": {"type": "query", "key": "freekey"},
- "rate_limit": "limited",
- "status": "active"
- }
- ]
-}
-
-# Fallback data used when upstream APIs یا پایگاه داده در دسترس نیستند
-DEFI_FALLBACK = [
- {
- "name": "Sample Protocol",
- "tvl": 0.0,
- "change_24h": 0.0,
- "chain": "N/A",
- }
-]
-
-# Health check configuration
-HEALTH_TESTS = {
- "CoinGecko": {"path": "/ping"},
- "CoinCap": {"path": "/assets/bitcoin", "params": {"limit": 1}},
- "CoinStats": {"path": "/public/v1/coins", "params": {"skip": 0, "limit": 1}},
- "CoinStats News": {"path": "/public/v1/news", "params": {"skip": 0, "limit": 1}},
- "Cryptorank": {"path": "/currencies"},
- "Binance": {"path": "/ping"},
- "Coinbase": {"path": "/exchange-rates"},
- "Kraken": {"path": "/SystemStatus"},
- "Alternative.me Fear & Greed": {"path": "/fng/", "params": {"limit": 1, "format": "json"}},
- "DeFi Llama": {"path": "/protocols"},
- "1inch": {"path": "/tokens"},
- "Blockscout Ethereum": {"path": "/v2/stats"},
- "Ethplorer": {"path": "/getTop", "params": {"apikey": "freekey"}},
- "CoinDesk RSS": {"path": "/arc/outboundfeeds/rss/?outputType=xml"},
- "Cointelegraph RSS": {"path": "/rss"}
-}
-
-KEY_HEADER_MAP = {
- "CoinMarketCap": ("X-CMC_PRO_API_KEY", "plain"),
- "CryptoCompare": ("Authorization", "apikey")
-}
-
-KEY_QUERY_MAP = {
- "Etherscan": "apikey",
- "BscScan": "apikey",
- "TronScan": "apikey"
-}
-
-HEALTH_CACHE_TTL = 120 # seconds
-provider_health_cache: Dict[str, Dict] = {}
-
-
-def provider_slug(name: str) -> str:
- return name.lower().replace(" ", "_")
-
-
-def _get_custom_providers() -> List[Dict]:
- with _registry_lock:
- return [dict(provider) for provider in _custom_registry.get("providers", [])]
-
-
-def _add_custom_provider(payload: Dict) -> Dict:
- slug = provider_slug(payload["name"])
- with _registry_lock:
- existing = _custom_registry.setdefault("providers", [])
- if any(provider_slug(item.get("name", "")) == slug for item in existing):
- raise ValueError("Provider already exists")
- existing.append(payload)
- _save_custom_registry()
- return payload
-
-
-def _remove_custom_provider(slug: str) -> bool:
- removed = False
- with _registry_lock:
- providers = _custom_registry.setdefault("providers", [])
- new_list = []
- for item in providers:
- if provider_slug(item.get("name", "")) == slug:
- removed = True
- continue
- new_list.append(item)
- if removed:
- _custom_registry["providers"] = new_list
- _save_custom_registry()
- return removed
-
-
-def _get_custom_hf(kind: Literal["models", "datasets"]) -> List[Dict]:
- key = "hf_models" if kind == "models" else "hf_datasets"
- with _registry_lock:
- return [dict(item) for item in _custom_registry.get(key, [])]
-
-
-def _add_custom_hf_item(kind: Literal["models", "datasets"], payload: Dict) -> Dict:
- key = "hf_models" if kind == "models" else "hf_datasets"
- identifier = payload.get("id") or payload.get("name")
- if not identifier:
- raise ValueError("id is required")
- with _registry_lock:
- collection = _custom_registry.setdefault(key, [])
- if any((item.get("id") or item.get("name")) == identifier for item in collection):
- raise ValueError("Item already exists")
- collection.append(payload)
- _save_custom_registry()
- return payload
-
-
-def _remove_custom_hf_item(kind: Literal["models", "datasets"], identifier: str) -> bool:
- key = "hf_models" if kind == "models" else "hf_datasets"
- removed = False
- with _registry_lock:
- collection = _custom_registry.setdefault(key, [])
- filtered = []
- for item in collection:
- if (item.get("id") or item.get("name")) == identifier:
- removed = True
- continue
- filtered.append(item)
- if removed:
- _custom_registry[key] = filtered
- _save_custom_registry()
- return removed
-
-
-def assemble_providers() -> List[Dict]:
- providers: List[Dict] = []
- seen = set()
-
- for category, provider_list in API_PROVIDERS.items():
- for provider in provider_list:
- entry = {
- "name": provider["name"],
- "category": category,
- "base_url": provider["base_url"],
- "endpoints": provider.get("endpoints", {}),
- "health_endpoint": provider.get("health_endpoint"),
- "requires_key": False,
- "api_key": None,
- "timeout_ms": 10000
- }
+ df = pd.DataFrame(df_data)
+
+ if df.empty:
+ logger.warning("No data matches filter criteria")
+ return pd.DataFrame({
+ "Rank": [],
+ "Name": [],
+ "Symbol": [],
+ "Price (USD)": [],
+ "24h Change (%)": [],
+ "Volume": [],
+ "Market Cap": []
+ })
- cfg = global_config.get_provider(provider["name"])
- if cfg:
- entry["health_endpoint"] = cfg.health_check_endpoint
- entry["requires_key"] = cfg.requires_key
- entry["api_key"] = cfg.api_key
- entry["timeout_ms"] = cfg.timeout_ms
-
- providers.append(entry)
- seen.add(provider_slug(provider["name"]))
-
- for cfg in global_config.get_all_providers():
- slug = provider_slug(cfg.name)
- if slug in seen:
- continue
-
- providers.append({
- "name": cfg.name,
- "category": cfg.category,
- "base_url": cfg.endpoint_url,
- "endpoints": {},
- "health_endpoint": cfg.health_check_endpoint,
- "requires_key": cfg.requires_key,
- "api_key": cfg.api_key,
- "timeout_ms": cfg.timeout_ms
- })
+ # Sort by rank
+ df = df.sort_values('Rank')
+
+ logger.info(f"Dashboard loaded with {len(df)} cryptocurrencies")
+ return df
- for custom in _get_custom_providers():
- slug = provider_slug(custom.get("name", ""))
- if not slug or slug in seen:
- continue
- providers.append({
- "name": custom.get("name"),
- "category": custom.get("category", "custom"),
- "base_url": custom.get("base_url") or custom.get("endpoint_url"),
- "endpoints": custom.get("endpoints", {}),
- "health_endpoint": custom.get("health_endpoint") or custom.get("base_url"),
- "requires_key": custom.get("requires_key", False),
- "api_key": custom.get("api_key"),
- "timeout_ms": custom.get("timeout_ms", 10000),
- "rate_limit": custom.get("rate_limit"),
- "notes": custom.get("notes"),
+ except Exception as e:
+ logger.error(f"Error in get_live_dashboard: {e}\n{traceback.format_exc()}")
+ return pd.DataFrame({
+ "Error": [f"Failed to load dashboard: {str(e)}"]
})
- seen.add(slug)
-
- return providers
-
-# Cache for API responses
-cache = {
- "market_data": {"data": None, "timestamp": None, "ttl": 60},
- "news": {"data": None, "timestamp": None, "ttl": 300},
- "sentiment": {"data": None, "timestamp": None, "ttl": 3600},
- "defi": {"data": None, "timestamp": None, "ttl": 300}
-}
-
-# Smart Proxy Mode - Cache which providers need proxy
-provider_proxy_cache: Dict[str, Dict] = {}
-
-# CORS proxy list (from config)
-CORS_PROXIES = [
- 'https://api.allorigins.win/get?url=',
- 'https://proxy.cors.sh/',
- 'https://corsproxy.io/?',
-]
-
-def should_use_proxy(provider_name: str) -> bool:
- """Check if a provider should use proxy based on past failures"""
- if not is_feature_enabled("enableProxyAutoMode"):
- return False
-
- cached = provider_proxy_cache.get(provider_name)
- if not cached:
- return False
-
- # Check if cache is still valid (5 minutes)
- if (datetime.now() - cached.get("timestamp", datetime.now())).total_seconds() > 300:
- # Cache expired, remove it
- provider_proxy_cache.pop(provider_name, None)
- return False
-
- return cached.get("use_proxy", False)
-
-def mark_provider_needs_proxy(provider_name: str):
- """Mark a provider as needing proxy"""
- provider_proxy_cache[provider_name] = {
- "use_proxy": True,
- "timestamp": datetime.now(),
- "reason": "Network error or CORS issue"
- }
- logger.info(f"Provider '{provider_name}' marked for proxy routing")
-def mark_provider_direct_ok(provider_name: str):
- """Mark a provider as working with direct connection"""
- if provider_name in provider_proxy_cache:
- provider_proxy_cache.pop(provider_name)
- logger.info(f"Provider '{provider_name}' restored to direct routing")
-async def fetch_with_proxy(session, url: str, proxy_url: str = None):
- """Fetch data through a CORS proxy"""
- if not proxy_url:
- proxy_url = CORS_PROXIES[0] # Default to first proxy
+def refresh_price_data() -> Tuple[pd.DataFrame, str]:
+ """
+ Manually trigger price data collection and refresh dashboard
+ Returns:
+ Tuple of (DataFrame, status_message)
+ """
try:
- proxied_url = f"{proxy_url}{url}"
- async with session.get(proxied_url, timeout=aiohttp.ClientTimeout(total=15)) as response:
- if response.status == 200:
- data = await response.json()
- # Some proxies wrap the response
- if isinstance(data, dict) and "contents" in data:
- return json.loads(data["contents"])
- return data
- return None
+ logger.info("Manual refresh triggered...")
+
+ # Collect fresh price data
+ success, count = collectors.collect_price_data()
+
+ if success:
+ message = f"✅ Successfully refreshed! Collected {count} price records."
+ else:
+ message = f"⚠️ Refresh completed with warnings. Collected {count} records."
+
+ # Return updated dashboard
+ df = get_live_dashboard()
+
+ return df, message
+
except Exception as e:
- logger.debug(f"Proxy fetch failed for {url}: {e}")
- return None
+ logger.error(f"Error in refresh_price_data: {e}")
+ return get_live_dashboard(), f"❌ Refresh failed: {str(e)}"
+
+
+# ==================== TAB 2: HISTORICAL CHARTS ====================
+
+def get_available_symbols() -> List[str]:
+ """Get list of available cryptocurrency symbols from database"""
+ try:
+ prices = db.get_latest_prices(100)
+ symbols = sorted(list(set([
+ f"{p.get('name', 'Unknown')} ({p.get('symbol', 'N/A').upper()})"
+ for p in prices if p.get('symbol')
+ ])))
+
+ if not symbols:
+ return ["BTC", "ETH", "BNB"]
+
+ return symbols
+
+ except Exception as e:
+ logger.error(f"Error getting symbols: {e}")
+ return ["BTC", "ETH", "BNB"]
-async def smart_fetch(session, url: str, provider_name: str = None, retries=3):
- """
- Smart fetch with automatic proxy fallback
- Flow:
- 1. If provider is marked for proxy -> use proxy directly
- 2. Otherwise, try direct connection
- 3. On failure (timeout, CORS, 403, connection error) -> fallback to proxy
- 4. Cache the proxy decision for the provider
+def generate_chart(symbol_display: str, timeframe: str) -> go.Figure:
"""
- # Check if we should go through proxy directly
- if provider_name and should_use_proxy(provider_name):
- logger.debug(f"Using proxy for {provider_name} (cached decision)")
- return await fetch_with_proxy(session, url)
+ Generate interactive plotly chart with price history and technical indicators
- # Try direct connection first
- for attempt in range(retries):
- try:
- async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as response:
- if response.status == 200:
- # Success! Mark provider as working directly
- if provider_name:
- mark_provider_direct_ok(provider_name)
- return await response.json()
- elif response.status == 429: # Rate limit
- await asyncio.sleep(2 ** attempt)
- elif response.status in [403, 451]: # Forbidden or CORS
- # Try proxy fallback
- if provider_name:
- mark_provider_needs_proxy(provider_name)
- logger.info(f"HTTP {response.status} on {url}, trying proxy...")
- return await fetch_with_proxy(session, url)
- else:
- return None
- except asyncio.TimeoutError:
- # Timeout - try proxy on last attempt
- if attempt == retries - 1 and provider_name:
- mark_provider_needs_proxy(provider_name)
- logger.info(f"Timeout on {url}, trying proxy...")
- return await fetch_with_proxy(session, url)
- await asyncio.sleep(1)
- except aiohttp.ClientError as e:
- # Network error (connection refused, CORS, etc) - try proxy
- if "CORS" in str(e) or "Connection" in str(e) or "SSL" in str(e):
- if provider_name:
- mark_provider_needs_proxy(provider_name)
- logger.info(f"Network error on {url} ({e}), trying proxy...")
- return await fetch_with_proxy(session, url)
- if attempt == retries - 1:
- logger.debug(f"Error fetching {url}: {e}")
- return None
- await asyncio.sleep(1)
- except Exception as e:
- if attempt == retries - 1:
- logger.debug(f"Error fetching {url}: {e}")
- return None
- await asyncio.sleep(1)
-
- return None
-
-# Keep old function for backward compatibility
-async def fetch_with_retry(session, url, retries=3):
- """Fetch data with retry mechanism (uses smart_fetch internally)"""
- return await smart_fetch(session, url, retries=retries)
-
-def is_cache_valid(cache_entry):
- """Check if cache is still valid"""
- if cache_entry["data"] is None or cache_entry["timestamp"] is None:
- return False
- elapsed = (datetime.now() - cache_entry["timestamp"]).total_seconds()
- return elapsed < cache_entry["ttl"]
-
-async def get_market_data():
- """Fetch real market data from multiple sources"""
- if is_cache_valid(cache["market_data"]):
- return cache["market_data"]["data"]
-
- async with aiohttp.ClientSession() as session:
- # Try CoinGecko first
- url = "https://api.coingecko.com/api/v3/coins/markets?vs_currency=usd&order=market_cap_desc&per_page=50&page=1"
- data = await fetch_with_retry(session, url)
-
- if data:
- formatted_data = []
- for coin in data[:20]:
- formatted_data.append({
- "symbol": coin.get("symbol", "").upper(),
- "name": coin.get("name", ""),
- "price": coin.get("current_price", 0),
- "change_24h": coin.get("price_change_percentage_24h", 0),
- "market_cap": coin.get("market_cap", 0),
- "volume_24h": coin.get("total_volume", 0),
- "rank": coin.get("market_cap_rank", 0),
- "image": coin.get("image", "")
- })
-
- cache["market_data"]["data"] = formatted_data
- cache["market_data"]["timestamp"] = datetime.now()
- return formatted_data
-
- # Fallback to CoinCap
- url = "https://api.coincap.io/v2/assets?limit=20"
- data = await fetch_with_retry(session, url)
-
- if data and "data" in data:
- formatted_data = []
- for coin in data["data"]:
- formatted_data.append({
- "symbol": coin.get("symbol", "").upper(),
- "name": coin.get("name", ""),
- "price": float(coin.get("priceUsd", 0)),
- "change_24h": float(coin.get("changePercent24Hr", 0)),
- "market_cap": float(coin.get("marketCapUsd", 0)),
- "volume_24h": float(coin.get("volumeUsd24Hr", 0)),
- "rank": int(coin.get("rank", 0)),
- "image": ""
- })
-
- cache["market_data"]["data"] = formatted_data
- cache["market_data"]["timestamp"] = datetime.now()
- return formatted_data
-
- return []
-
-async def get_global_stats():
- """Fetch global crypto market statistics"""
- async with aiohttp.ClientSession() as session:
- # CoinGecko global data
- url = "https://api.coingecko.com/api/v3/global"
- data = await fetch_with_retry(session, url)
-
- if data and "data" in data:
- global_data = data["data"]
- return {
- "total_market_cap": global_data.get("total_market_cap", {}).get("usd", 0),
- "total_volume": global_data.get("total_volume", {}).get("usd", 0),
- "btc_dominance": global_data.get("market_cap_percentage", {}).get("btc", 0),
- "eth_dominance": global_data.get("market_cap_percentage", {}).get("eth", 0),
- "active_cryptocurrencies": global_data.get("active_cryptocurrencies", 0),
- "markets": global_data.get("markets", 0)
- }
-
- return {
- "total_market_cap": 0,
- "total_volume": 0,
- "btc_dominance": 0,
- "eth_dominance": 0,
- "active_cryptocurrencies": 0,
- "markets": 0
- }
+ Args:
+ symbol_display: Display name like "Bitcoin (BTC)"
+ timeframe: Time period (1d, 7d, 30d, 90d, 1y, All)
-async def get_trending():
- """Fetch trending coins"""
- async with aiohttp.ClientSession() as session:
- url = "https://api.coingecko.com/api/v3/search/trending"
- data = await fetch_with_retry(session, url)
-
- if data and "coins" in data:
- return [
- {
- "name": coin["item"].get("name", ""),
- "symbol": coin["item"].get("symbol", "").upper(),
- "rank": coin["item"].get("market_cap_rank", 0),
- "thumb": coin["item"].get("thumb", "")
- }
- for coin in data["coins"][:7]
- ]
-
- return []
-
-async def get_sentiment():
- """Fetch Fear & Greed Index"""
- if is_cache_valid(cache["sentiment"]):
- return cache["sentiment"]["data"]
-
- async with aiohttp.ClientSession() as session:
- url = "https://api.alternative.me/fng/?limit=1&format=json"
- data = await fetch_with_retry(session, url)
-
- if data and "data" in data and len(data["data"]) > 0:
- fng_data = data["data"][0]
- result = {
- "value": int(fng_data.get("value", 50)),
- "classification": fng_data.get("value_classification", "Neutral"),
- "timestamp": fng_data.get("timestamp", "")
- }
- cache["sentiment"]["data"] = result
- cache["sentiment"]["timestamp"] = datetime.now()
- return result
-
- return {"value": 50, "classification": "Neutral", "timestamp": ""}
-
-async def get_defi_tvl():
- """Fetch DeFi Total Value Locked"""
- if is_cache_valid(cache["defi"]):
- return cache["defi"]["data"]
-
- async with aiohttp.ClientSession() as session:
- url = "https://api.llama.fi/protocols"
- data = await fetch_with_retry(session, url)
-
- if data and isinstance(data, list):
- top_protocols = sorted(data, key=lambda x: x.get("tvl", 0), reverse=True)[:10]
- result = [
- {
- "name": p.get("name", ""),
- "tvl": p.get("tvl", 0),
- "change_24h": p.get("change_1d", 0),
- "chain": p.get("chain", "")
- }
- for p in top_protocols
- ]
- cache["defi"]["data"] = result
- cache["defi"]["timestamp"] = datetime.now()
- return result
-
- return []
-
-async def fetch_provider_health(session: aiohttp.ClientSession, provider: Dict, force_refresh: bool = False) -> Dict:
- """Fetch real health information for a provider"""
- name = provider["name"]
- cached = provider_health_cache.get(name)
- if cached and not force_refresh:
- age = (datetime.now() - cached["timestamp"]).total_seconds()
- if age < HEALTH_CACHE_TTL:
- return cached["data"]
-
- health_config = HEALTH_TESTS.get(name, {})
- health_endpoint = provider.get("health_endpoint") or health_config.get("path")
- if not health_endpoint:
- endpoints = provider.get("endpoints", {})
- health_endpoint = next(iter(endpoints.values()), "/")
-
- params = dict(health_config.get("params", {}))
- headers = {
- "User-Agent": "CryptoMonitor/1.0 (+https://github.com/nimazasinich/crypto-dt-source)"
- }
+ Returns:
+ Plotly figure with price chart, volume, MA, and RSI
+ """
+ try:
+ logger.info(f"Generating chart for {symbol_display} - {timeframe}")
- requires_key = provider.get("requires_key", False)
- api_key = provider.get("api_key")
- cfg = global_config.get_provider(name)
- if cfg:
- requires_key = cfg.requires_key
- if not api_key:
- api_key = cfg.api_key
-
- if health_endpoint.startswith("http"):
- url = health_endpoint
- else:
- url = urljoin(provider["base_url"].rstrip("/") + "/", health_endpoint.lstrip("/"))
-
- if requires_key:
- if not api_key:
- result = {
- "name": name,
- "category": provider["category"],
- "base_url": provider["base_url"],
- "status": "degraded",
- "uptime": db.get_uptime_percentage(name),
- "response_time_ms": None,
- "rate_limit": "",
- "endpoints": len(provider.get("endpoints", {})),
- "last_fetch": datetime.now().isoformat(),
- "last_check": datetime.now().isoformat(),
- "message": "API key not configured"
- }
- provider_health_cache[name] = {"timestamp": datetime.now(), "data": result}
- db.log_provider_status(name, provider["category"], "degraded", endpoint_tested=url, error_message="missing_api_key")
- return result
-
- header_mapping = KEY_HEADER_MAP.get(name)
- if header_mapping:
- header_name, mode = header_mapping
- if mode == "plain":
- headers[header_name] = api_key
- elif mode == "apikey":
- headers[header_name] = f"Apikey {api_key}"
+ # Extract symbol from display name
+ if '(' in symbol_display and ')' in symbol_display:
+ symbol = symbol_display.split('(')[1].split(')')[0].strip().upper()
else:
- query_key = KEY_QUERY_MAP.get(name)
- if query_key:
- params[query_key] = api_key
+ symbol = symbol_display.strip().upper()
+
+ # Determine hours to look back
+ timeframe_hours = {
+ "1d": 24,
+ "7d": 24 * 7,
+ "30d": 24 * 30,
+ "90d": 24 * 90,
+ "1y": 24 * 365,
+ "All": 24 * 365 * 10 # 10 years
+ }
+ hours = timeframe_hours.get(timeframe, 168)
+
+ # Get price history
+ history = db.get_price_history(symbol, hours)
+
+ if not history:
+ # Try to find by name instead
+ prices = db.get_latest_prices(100)
+ matching = [p for p in prices if symbol.lower() in (p.get('name') or '').lower()]
+
+ if matching:
+ symbol = matching[0].get('symbol', symbol)
+ history = db.get_price_history(symbol, hours)
+
+ if not history or len(history) < 2:
+ # Create empty chart with message
+ fig = go.Figure()
+ fig.add_annotation(
+ text=f"No historical data available for {symbol}
Try refreshing or selecting a different cryptocurrency",
+ xref="paper", yref="paper",
+ x=0.5, y=0.5, showarrow=False,
+ font=dict(size=16)
+ )
+ fig.update_layout(
+ title=f"{symbol} - No Data Available",
+ height=600
+ )
+ return fig
+
+ # Extract data
+ timestamps = [datetime.fromisoformat(h['timestamp'].replace('Z', '+00:00')) if isinstance(h['timestamp'], str) else datetime.now() for h in history]
+ prices_data = [h.get('price_usd', 0) for h in history]
+ volumes = [h.get('volume_24h', 0) for h in history]
+
+ # Calculate technical indicators
+ ma7_values = []
+ ma30_values = []
+ rsi_values = []
+
+ for i in range(len(prices_data)):
+ # MA7
+ if i >= 6:
+ ma7 = utils.calculate_moving_average(prices_data[:i+1], 7)
+ ma7_values.append(ma7)
else:
- headers["Authorization"] = f"Bearer {api_key}"
+ ma7_values.append(None)
- timeout_total = max(provider.get("timeout_ms", 10000) / 1000, 5)
- timeout = aiohttp.ClientTimeout(total=timeout_total)
- loop = asyncio.get_running_loop()
- start_time = loop.time()
-
- status = "offline"
- status_code = None
- error_message = None
- response_time_ms = None
+ # MA30
+ if i >= 29:
+ ma30 = utils.calculate_moving_average(prices_data[:i+1], 30)
+ ma30_values.append(ma30)
+ else:
+ ma30_values.append(None)
- try:
- async with session.get(url, params=params, headers=headers, timeout=timeout) as response:
- status_code = response.status
- response_time_ms = round((loop.time() - start_time) * 1000, 2)
-
- if status_code < 400:
- status = "online"
- elif status_code < 500:
- status = "degraded"
+ # RSI
+ if i >= 14:
+ rsi = utils.calculate_rsi(prices_data[:i+1], 14)
+ rsi_values.append(rsi)
else:
- status = "offline"
+ rsi_values.append(None)
+
+ # Create subplots: Price + Volume + RSI
+ fig = make_subplots(
+ rows=3, cols=1,
+ shared_xaxes=True,
+ vertical_spacing=0.05,
+ row_heights=[0.5, 0.25, 0.25],
+ subplot_titles=(f'{symbol} Price Chart', 'Volume', 'RSI (14)')
+ )
- if status != "online":
- try:
- error_message = await response.text()
- except Exception:
- error_message = f"HTTP {status_code}"
- except Exception as exc:
- status = "offline"
- error_message = str(exc)
-
- db.log_provider_status(
- name,
- provider["category"],
- status,
- response_time=response_time_ms,
- status_code=status_code,
- endpoint_tested=url,
- error_message=error_message[:500] if error_message else None
- )
-
- uptime = db.get_uptime_percentage(name)
- avg_response = db.get_avg_response_time(name)
-
- result = {
- "name": name,
- "category": provider["category"],
- "base_url": provider["base_url"],
- "status": status,
- "uptime": uptime,
- "response_time_ms": response_time_ms,
- "avg_response_time_ms": avg_response,
- "rate_limit": provider.get("rate_limit", ""),
- "endpoints": len(provider.get("endpoints", {})),
- "last_fetch": datetime.now().isoformat(),
- "last_check": datetime.now().isoformat(),
- "status_code": status_code,
- "message": error_message[:200] if error_message else None
- }
+ # Price line
+ fig.add_trace(
+ go.Scatter(
+ x=timestamps,
+ y=prices_data,
+ name='Price',
+ line=dict(color='#2962FF', width=2),
+ hovertemplate='Price: $%{y:,.2f}
Date: %{x}
{str(e)}",
+ xref="paper", yref="paper",
+ x=0.5, y=0.5, showarrow=False,
+ font=dict(size=14, color="red")
+ )
+ fig.update_layout(title="Chart Error", height=600)
+ return fig
-@app.get("/api/defi")
-async def defi():
- """Get DeFi protocols and TVL"""
- try:
- data = await get_defi_tvl()
- except Exception as exc: # pragma: no cover - defensive
- logger.warning("defi endpoint fallback due to error: %s", exc)
- data = []
-
- if not data:
- data = DEFI_FALLBACK
-
- total_tvl = sum(p.get("tvl", 0) for p in data)
- return {
- "protocols": data,
- "total_tvl": total_tvl,
- "timestamp": datetime.now().isoformat(),
- "source": "DeFi Llama (fallback)" if data == DEFI_FALLBACK else "DeFi Llama"
- }
-@app.get("/api/providers")
-async def providers():
- """Get all API providers status"""
- data = await get_provider_stats()
- return data
-
-
-@app.get("/api/providers/custom")
-async def providers_custom():
- """Return custom providers registered through the UI."""
- return _get_custom_providers()
-
-
-@app.post("/api/providers", status_code=201)
-async def create_provider(request: ProviderCreateRequest):
- """Create a custom provider entry."""
- name = request.name.strip()
- if not name:
- raise HTTPException(status_code=400, detail="name is required")
- category = request.category.strip() or "custom"
- endpoint_url = request.endpoint_url.strip()
- if not endpoint_url:
- raise HTTPException(status_code=400, detail="endpoint_url is required")
-
- payload = {
- "name": name,
- "category": category,
- "base_url": endpoint_url,
- "endpoint_url": endpoint_url,
- "health_endpoint": request.health_check_endpoint.strip() if request.health_check_endpoint else endpoint_url,
- "requires_key": request.requires_key,
- "api_key": request.api_key.strip() if request.api_key else None,
- "timeout_ms": request.timeout_ms,
- "rate_limit": request.rate_limit.strip() if request.rate_limit else None,
- "notes": request.notes.strip() if request.notes else None,
- "created_at": datetime.utcnow().isoformat(),
- }
- try:
- created = _add_custom_provider(payload)
- except ValueError as exc:
- raise HTTPException(status_code=400, detail=str(exc))
-
- return {"message": "Provider registered", "provider": created}
-
-
-@app.delete("/api/providers/{slug}", status_code=204)
-async def delete_provider(slug: str):
- """Delete a custom provider by slug."""
- if not _remove_custom_provider(slug):
- raise HTTPException(status_code=404, detail="Provider not found")
- return Response(status_code=204)
-
-@app.get("/api/status")
-async def status():
- """Get system status for dashboard"""
- providers = await get_provider_stats()
- online = len([p for p in providers if p.get("status") == "online"])
- offline = len([p for p in providers if p.get("status") == "offline"])
- degraded = len([p for p in providers if p.get("status") == "degraded"])
- avg_response = 0.0
- if providers:
- response_values = [
- p.get("avg_response_time_ms") or p.get("response_time_ms") or 0
- for p in providers
- ]
- avg_response = sum(response_values) / len(response_values)
-
- return {
- "total_providers": len(providers),
- "online": online,
- "offline": offline,
- "degraded": degraded,
- "avg_response_time_ms": round(avg_response, 1),
- "system_health": "healthy" if not providers or online >= len(providers) * 0.8 else "degraded",
- "timestamp": datetime.now().isoformat()
- }
+# ==================== TAB 3: NEWS & SENTIMENT ====================
+def get_news_feed(sentiment_filter: str = "All", coin_filter: str = "All") -> str:
+ """
+ Get news feed with sentiment analysis as HTML cards
-@app.get("/status", include_in_schema=False)
-async def status_legacy():
- return await status()
-
-
-@app.get("/info", include_in_schema=False)
-async def info_legacy():
- return await api_info()
-
-
-@app.get("/system/info", include_in_schema=False)
-async def system_info():
- return await api_info()
-
-@app.get("/api/stats")
-async def stats():
- """Get comprehensive statistics"""
- market = await get_market_data()
- global_stats = await get_global_stats()
- providers = await get_provider_stats()
- sentiment_data = await get_sentiment()
-
- return {
- "market": {
- "total_market_cap": global_stats["total_market_cap"],
- "total_volume": global_stats["total_volume"],
- "btc_dominance": global_stats["btc_dominance"],
- "active_cryptos": global_stats["active_cryptocurrencies"],
- "top_crypto_count": len(market)
- },
- "sentiment": {
- "fear_greed_value": sentiment_data["value"],
- "classification": sentiment_data["classification"]
- },
- "providers": {
- "total": len(providers),
- "operational": len([p for p in providers if p["status"] == "online"]),
- "degraded": len([p for p in providers if p["status"] == "degraded"]),
- "avg_uptime": round(sum(p.get("uptime", 0) for p in providers) / len(providers), 2) if providers else 0,
- "avg_response_time": round(
- sum((p.get("avg_response_time_ms") or p.get("response_time_ms") or 0) for p in providers) / len(providers),
- 1
- ) if providers else 0
- },
- "timestamp": datetime.now().isoformat()
- }
+ Args:
+ sentiment_filter: Filter by sentiment (All, Positive, Neutral, Negative)
+ coin_filter: Filter by coin (All, BTC, ETH, etc.)
-# HuggingFace endpoints (mock for now)
-@app.get("/api/hf/health")
-async def hf_health():
- return {
- "status": "healthy",
- "model_loaded": True,
- "timestamp": datetime.now().isoformat()
- }
+ Returns:
+ HTML string with news cards
+ """
+ try:
+ logger.info(f"Fetching news feed: sentiment={sentiment_filter}, coin={coin_filter}")
+
+ # Map sentiment filter
+ sentiment_map = {
+ "All": None,
+ "Positive": "positive",
+ "Neutral": "neutral",
+ "Negative": "negative",
+ "Very Positive": "very_positive",
+ "Very Negative": "very_negative"
+ }
-@app.post("/api/hf/run-sentiment")
-async def hf_run_sentiment(request: SentimentRequest):
- """Run sentiment analysis on crypto text"""
- texts = request.texts
-
- # Mock sentiment analysis
- # In production, this would call HuggingFace API
- results = []
- total_vote = 0
-
- for text in texts:
- # Simple mock sentiment
- text_lower = text.lower()
- positive_words = ["bullish", "strong", "breakout", "pump", "moon", "buy", "up"]
- negative_words = ["bearish", "weak", "crash", "dump", "sell", "down", "drop"]
-
- positive_score = sum(1 for word in positive_words if word in text_lower)
- negative_score = sum(1 for word in negative_words if word in text_lower)
-
- sentiment_score = (positive_score - negative_score) / max(len(text.split()), 1)
- total_vote += sentiment_score
-
- results.append({
- "text": text,
- "sentiment": "positive" if sentiment_score > 0 else "negative" if sentiment_score < 0 else "neutral",
- "score": round(sentiment_score, 3)
- })
-
- avg_vote = total_vote / len(texts) if texts else 0
-
- return {
- "vote": round(avg_vote, 3),
- "results": results,
- "timestamp": datetime.now().isoformat()
- }
+ sentiment_db = sentiment_map.get(sentiment_filter)
-@app.websocket("/ws")
-async def websocket_root(websocket: WebSocket):
- """WebSocket endpoint for compatibility with websocket-client.js"""
- await websocket_endpoint(websocket)
+ # Get news from database
+ if coin_filter != "All":
+ news_list = db.get_news_by_coin(coin_filter, limit=50)
+ else:
+ news_list = db.get_latest_news(limit=50, sentiment=sentiment_db)
+
+ if not news_list:
+ return """
+
Try adjusting your filters or refresh the data
+{str(e)}
+Not enough historical data available for {symbol} to perform analysis.
+Please try a different cryptocurrency or wait for more data to be collected.
+{prediction}
+Latest analysis generated on {datetime.now().strftime('%B %d, %Y at %H:%M:%S')}
+Data Points Analyzed: {len(price_history)}
+Time Range: {len(price_history)} hours of historical data
+Failed to generate analysis: {str(e)}
+Please try again or select a different cryptocurrency.
+Error: {str(e)}
" -@app.delete("/api/pools/{pool_id}/members/{provider_id}") -async def remove_pool_member(pool_id: int, provider_id: str): - """Remove a member from a pool""" - pool = db.get_pool(pool_id) - if not pool: - raise HTTPException(status_code=404, detail="Pool not found") - db.remove_pool_member(pool_id, provider_id) - await broadcast_pool_update("member_removed", pool_id, {"provider_id": provider_id}) +def get_error_log_html() -> str: + """Get last 10 errors from log file as HTML""" + try: + if not config.LOG_FILE.exists(): + return "No error log file found
" - providers = await get_provider_stats() - provider_map = {provider_slug(p["name"]): p for p in providers} - pool_record = db.get_pool(pool_id) - payload = build_pool_payload(pool_record, provider_map) + # Read last 100 lines of log file + with open(config.LOG_FILE, 'r') as f: + lines = f.readlines() - return { - "message": "Member removed successfully", - "pool": payload - } + # Get lines with ERROR or WARNING + error_lines = [line for line in lines[-100:] if 'ERROR' in line or 'WARNING' in line] + if not error_lines: + return "✅ No recent errors or warnings
" -@app.post("/api/pools/{pool_id}/rotate") -async def rotate_pool(pool_id: int, request: Optional[Dict] = None): - """Rotate pool to next provider""" - pool = db.get_pool(pool_id) - if not pool: - raise HTTPException(status_code=404, detail="Pool not found") - - if not pool["members"]: - raise HTTPException(status_code=400, detail="Pool has no members") - - providers = await get_provider_stats(force_refresh=True) - provider_map = {provider_slug(p["name"]): p for p in providers} - - members_with_status = [] - for member in pool["members"]: - status_info = provider_map.get(member["provider_id"]) - if status_info: - members_with_status.append((member, status_info)) - - online_members = [m for m in members_with_status if m[1]["status"] == "online"] - degraded_members = [m for m in members_with_status if m[1]["status"] == "degraded"] - - candidates = online_members or degraded_members - if not candidates: - raise HTTPException(status_code=400, detail="No healthy providers available for rotation") - - strategy = pool.get("rotation_strategy", "round_robin") - - if strategy == "priority": - candidates.sort(key=lambda x: (x[0].get("priority", 1), x[0].get("weight", 1)), reverse=True) - selected_member, status_info = candidates[0] - elif strategy == "weighted": - weights = [max(1, c[0].get("weight", 1)) for c in candidates] - total_weight = sum(weights) - roll = random.uniform(0, total_weight) - cumulative = 0 - selected_member = candidates[0][0] - status_info = candidates[0][1] - for (candidate, status), weight in zip(candidates, weights): - cumulative += weight - if roll <= cumulative: - selected_member, status_info = candidate, status - break - elif strategy == "least_used": - candidates.sort(key=lambda x: x[0].get("use_count", 0)) - selected_member, status_info = candidates[0] - else: # round_robin or default - candidates.sort(key=lambda x: x[0].get("use_count", 0)) - selected_member, status_info = candidates[0] - - db.increment_member_use(pool_id, selected_member["provider_id"]) - db.update_member_stats( - pool_id, - selected_member["provider_id"], - success_rate=status_info.get("uptime", selected_member.get("success_rate")), - rate_limit_usage=status_info.get("rate_limit", {}).get("usage", 0) if isinstance(status_info.get("rate_limit"), dict) else None, - rate_limit_limit=status_info.get("rate_limit", {}).get("limit", 0) if isinstance(status_info.get("rate_limit"), dict) else None, - rate_limit_percentage=status_info.get("rate_limit", {}).get("percentage", 0) if isinstance(status_info.get("rate_limit"), dict) else None, - ) - db.log_pool_rotation( - pool_id, - selected_member["provider_id"], - selected_member["provider_name"], - request.get("reason", "manual") if request else "manual" - ) - - pool_record = db.get_pool(pool_id) - payload = build_pool_payload(pool_record, provider_map) - - await broadcast_pool_update("rotated", pool_id, { - "provider_id": selected_member["provider_id"], - "provider_name": selected_member["provider_name"] - }) - - return { - "message": "Pool rotated successfully", - "provider_name": selected_member["provider_name"], - "provider_id": selected_member["provider_id"], - "total_rotations": pool_record.get("rotation_count", 0), - "pool": payload - } + # Take last 10 + error_lines = error_lines[-10:] + html = "Error reading log: {str(e)}
" + + +def manual_data_collection() -> Tuple[pd.DataFrame, str, str]: """ - Check health status of a specific provider - Returns: { status: 'online'|'offline', response_time: number, error?: string } + Manually trigger data collection for all sources + + Returns: + Tuple of (status DataFrame, status HTML, message) """ try: - # Load provider config - config_path = Path(__file__).parent / "providers_config_ultimate.json" - with open(config_path, 'r', encoding='utf-8') as f: - config = json.load(f) + logger.info("Manual data collection triggered...") + + message = "🔄 Collecting data from all sources...\n\n" + + # Collect price data + try: + success, count = collectors.collect_price_data() + if success: + message += f"✅ Prices: {count} records collected\n" + else: + message += f"⚠️ Prices: Collection had issues\n" + except Exception as e: + message += f"❌ Prices: {str(e)}\n" - provider = config.get('providers', {}).get(provider_id) - if not provider: - raise HTTPException(status_code=404, detail=f"Provider '{provider_id}' not found") + # Collect news data + try: + count = collectors.collect_news_data() + message += f"✅ News: {count} articles collected\n" + except Exception as e: + message += f"❌ News: {str(e)}\n" - # Try to ping the provider's base URL - base_url = provider.get('base_url') - if not base_url: - return {"status": "unknown", "error": "No base URL configured"} + # Collect sentiment data + try: + sentiment = collectors.collect_sentiment_data() + if sentiment: + message += f"✅ Sentiment: {sentiment.get('classification', 'N/A')}\n" + else: + message += "⚠️ Sentiment: No data collected\n" + except Exception as e: + message += f"❌ Sentiment: {str(e)}\n" - import time - start_time = time.time() + message += "\n✅ Data collection complete!" - async with aiohttp.ClientSession() as session: - try: - async with session.get(base_url, timeout=aiohttp.ClientTimeout(total=5.0)) as response: - response_time = (time.time() - start_time) * 1000 # Convert to milliseconds - status = "online" if response.status in [200, 201, 204, 301, 302, 404] else "offline" - return { - "status": status, - "response_time": round(response_time, 2), - "http_status": response.status - } - except asyncio.TimeoutError: - return {"status": "offline", "error": "Timeout after 5s"} - except Exception as e: - return {"status": "offline", "error": str(e)} + # Get updated status + df, html = get_data_sources_status() + return df, html, message + + except Exception as e: + logger.error(f"Error in manual data collection: {e}") + df, html = get_data_sources_status() + return df, html, f"❌ Collection failed: {str(e)}" + + +# ==================== GRADIO INTERFACE ==================== + +def create_gradio_interface(): + """Create the complete Gradio interface with all 6 tabs""" + + # Custom CSS for better styling + custom_css = """ + .gradio-container { + max-width: 1400px !important; + } + .tab-nav button { + font-size: 16px !important; + font-weight: 600 !important; + } + """ + + with gr.Blocks( + title="Crypto Data Aggregator - Complete Dashboard", + theme=gr.themes.Soft(), + css=custom_css + ) as interface: + + # Header + gr.Markdown(""" + # 🚀 Crypto Data Aggregator - Complete Dashboard + + **Comprehensive cryptocurrency analytics platform** with real-time data, AI-powered insights, and advanced technical analysis. + + **Key Features:** + - 📊 Live price tracking for top 100 cryptocurrencies + - 📈 Historical charts with technical indicators (MA, RSI) + - 📰 News aggregation with sentiment analysis + - 🤖 AI-powered market trend predictions + - 🗄️ Powerful database explorer with export functionality + - 🔍 Real-time data source monitoring + """) + + with gr.Tabs(): + + # ==================== TAB 1: LIVE DASHBOARD ==================== + with gr.Tab("📊 Live Dashboard"): + gr.Markdown("### Real-time cryptocurrency prices and market data") + + with gr.Row(): + search_box = gr.Textbox( + label="Search/Filter", + placeholder="Enter coin name or symbol (e.g., Bitcoin, BTC)...", + scale=3 + ) + refresh_btn = gr.Button("🔄 Refresh Data", variant="primary", scale=1) + + dashboard_table = gr.Dataframe( + label="Top 100 Cryptocurrencies", + interactive=False, + wrap=True, + height=600 + ) + + refresh_status = gr.Textbox(label="Status", interactive=False) + + # Auto-refresh timer + timer = gr.Timer(value=config.AUTO_REFRESH_INTERVAL) + + # Load initial data + interface.load( + fn=get_live_dashboard, + outputs=dashboard_table + ) + + # Search/filter functionality + search_box.change( + fn=get_live_dashboard, + inputs=search_box, + outputs=dashboard_table + ) + + # Refresh button + refresh_btn.click( + fn=refresh_price_data, + outputs=[dashboard_table, refresh_status] + ) + + # Auto-refresh + timer.tick( + fn=get_live_dashboard, + outputs=dashboard_table + ) + + # ==================== TAB 2: HISTORICAL CHARTS ==================== + with gr.Tab("📈 Historical Charts"): + gr.Markdown("### Interactive price charts with technical analysis") + + with gr.Row(): + symbol_dropdown = gr.Dropdown( + label="Select Cryptocurrency", + choices=get_available_symbols(), + value=get_available_symbols()[0] if get_available_symbols() else "BTC", + scale=2 + ) + + timeframe_buttons = gr.Radio( + label="Timeframe", + choices=["1d", "7d", "30d", "90d", "1y", "All"], + value="7d", + scale=2 + ) + + chart_plot = gr.Plot(label="Price Chart with Indicators") + + with gr.Row(): + generate_chart_btn = gr.Button("📊 Generate Chart", variant="primary") + export_chart_btn = gr.Button("💾 Export Chart (PNG)") + + # Generate chart + generate_chart_btn.click( + fn=generate_chart, + inputs=[symbol_dropdown, timeframe_buttons], + outputs=chart_plot + ) + + # Also update on dropdown/timeframe change + symbol_dropdown.change( + fn=generate_chart, + inputs=[symbol_dropdown, timeframe_buttons], + outputs=chart_plot + ) + + timeframe_buttons.change( + fn=generate_chart, + inputs=[symbol_dropdown, timeframe_buttons], + outputs=chart_plot + ) + + # Load initial chart + interface.load( + fn=generate_chart, + inputs=[symbol_dropdown, timeframe_buttons], + outputs=chart_plot + ) + + # ==================== TAB 3: NEWS & SENTIMENT ==================== + with gr.Tab("📰 News & Sentiment"): + gr.Markdown("### Latest cryptocurrency news with AI sentiment analysis") + + with gr.Row(): + sentiment_filter = gr.Dropdown( + label="Filter by Sentiment", + choices=["All", "Positive", "Neutral", "Negative", "Very Positive", "Very Negative"], + value="All", + scale=1 + ) + + coin_filter = gr.Dropdown( + label="Filter by Coin", + choices=["All", "BTC", "ETH", "BNB", "XRP", "ADA", "SOL", "DOT", "DOGE"], + value="All", + scale=1 + ) + + news_refresh_btn = gr.Button("🔄 Refresh News", variant="primary", scale=1) + + news_html = gr.HTML(label="News Feed") + + # Load initial news + interface.load( + fn=get_news_feed, + inputs=[sentiment_filter, coin_filter], + outputs=news_html + ) + + # Update on filter change + sentiment_filter.change( + fn=get_news_feed, + inputs=[sentiment_filter, coin_filter], + outputs=news_html + ) + + coin_filter.change( + fn=get_news_feed, + inputs=[sentiment_filter, coin_filter], + outputs=news_html + ) + + # Refresh button + news_refresh_btn.click( + fn=get_news_feed, + inputs=[sentiment_filter, coin_filter], + outputs=news_html + ) + + # ==================== TAB 4: AI ANALYSIS ==================== + with gr.Tab("🤖 AI Analysis"): + gr.Markdown("### AI-powered market trend analysis and predictions") + + with gr.Row(): + analysis_symbol = gr.Dropdown( + label="Select Cryptocurrency for Analysis", + choices=get_available_symbols(), + value=get_available_symbols()[0] if get_available_symbols() else "BTC", + scale=3 + ) + + analyze_btn = gr.Button("🔮 Generate Analysis", variant="primary", scale=1) + + analysis_html = gr.HTML(label="AI Analysis Results") + + # Generate analysis + analyze_btn.click( + fn=generate_ai_analysis, + inputs=analysis_symbol, + outputs=analysis_html + ) + + # ==================== TAB 5: DATABASE EXPLORER ==================== + with gr.Tab("🗄️ Database Explorer"): + gr.Markdown("### Query and explore the cryptocurrency database") + + query_type = gr.Dropdown( + label="Select Query", + choices=[ + "Top 10 gainers in last 24h", + "All news with positive sentiment", + "Price history for BTC", + "Database statistics", + "Latest 100 prices", + "Recent news (50)", + "All market analyses", + "Custom Query" + ], + value="Database statistics" + ) + + custom_query_box = gr.Textbox( + label="Custom SQL Query (SELECT only)", + placeholder="SELECT * FROM prices WHERE symbol = 'BTC' LIMIT 10", + lines=3, + visible=False + ) + + with gr.Row(): + execute_btn = gr.Button("▶️ Execute Query", variant="primary") + export_btn = gr.Button("💾 Export to CSV") + + query_results = gr.Dataframe(label="Query Results", interactive=False, wrap=True) + query_status = gr.Textbox(label="Status", interactive=False) + export_status = gr.Textbox(label="Export Status", interactive=False) + + # Show/hide custom query box + def toggle_custom_query(query_type): + return gr.update(visible=(query_type == "Custom Query")) + + query_type.change( + fn=toggle_custom_query, + inputs=query_type, + outputs=custom_query_box + ) + + # Execute query + execute_btn.click( + fn=execute_database_query, + inputs=[query_type, custom_query_box], + outputs=[query_results, query_status] + ) + + # Export results + export_btn.click( + fn=export_query_results, + inputs=query_results, + outputs=[gr.Textbox(visible=False), export_status] + ) + + # Load initial query + interface.load( + fn=execute_database_query, + inputs=[query_type, custom_query_box], + outputs=[query_results, query_status] + ) + + # ==================== TAB 6: DATA SOURCES STATUS ==================== + with gr.Tab("🔍 Data Sources Status"): + gr.Markdown("### Monitor the health of all data sources") + + with gr.Row(): + status_refresh_btn = gr.Button("🔄 Refresh Status", variant="primary") + collect_btn = gr.Button("📥 Run Manual Collection", variant="secondary") + + status_table = gr.Dataframe(label="Data Sources Status", interactive=False) + error_log_html = gr.HTML(label="Error Log") + collection_status = gr.Textbox(label="Collection Status", lines=8, interactive=False) + + # Load initial status + interface.load( + fn=get_data_sources_status, + outputs=[status_table, error_log_html] + ) + + # Refresh status + status_refresh_btn.click( + fn=get_data_sources_status, + outputs=[status_table, error_log_html] + ) + + # Manual collection + collect_btn.click( + fn=manual_data_collection, + outputs=[status_table, error_log_html, collection_status] + ) + + # Footer + gr.Markdown(""" + --- + **Crypto Data Aggregator** | Powered by CoinGecko, CoinCap, Binance APIs | AI Models by HuggingFace + """) + + return interface + + +# ==================== MAIN ENTRY POINT ==================== + +def main(): + """Main function to initialize and launch the Gradio app""" + + logger.info("=" * 60) + logger.info("Starting Crypto Data Aggregator Dashboard") + logger.info("=" * 60) + + # Initialize database + logger.info("Initializing database...") + db = database.get_database() + logger.info("Database initialized successfully") + + # Start background data collection + global _collection_started + with _collection_lock: + if not _collection_started: + logger.info("Starting background data collection...") + collectors.schedule_data_collection() + _collection_started = True + logger.info("Background collection started") + + # Create Gradio interface + logger.info("Creating Gradio interface...") + interface = create_gradio_interface() + + # Launch Gradio + logger.info("Launching Gradio dashboard...") + logger.info(f"Server: {config.GRADIO_SERVER_NAME}:{config.GRADIO_SERVER_PORT}") + logger.info(f"Share: {config.GRADIO_SHARE}") + + try: + interface.launch( + share=config.GRADIO_SHARE, + server_name=config.GRADIO_SERVER_NAME, + server_port=config.GRADIO_SERVER_PORT, + show_error=True, + quiet=False + ) + except KeyboardInterrupt: + logger.info("\nShutting down...") + collectors.stop_scheduled_collection() + logger.info("Shutdown complete") except Exception as e: - raise HTTPException(status_code=500, detail=str(e)) + logger.error(f"Error launching Gradio: {e}\n{traceback.format_exc()}") + raise + if __name__ == "__main__": - print("🚀 Crypto Monitor ULTIMATE") - print("📊 Real APIs: CoinGecko, CoinCap, Binance, DeFi Llama, Fear & Greed") - print("🌐 http://localhost:8000/dashboard") - print("📡 API Docs: http://localhost:8000/docs") - uvicorn.run(app, host="0.0.0.0", port=8000) - -# === Compatibility routes without /api prefix for frontend fallbacks === - -@app.get("/providers") -async def providers_root(): - """Compatibility: mirror /api/providers at /providers""" - return await providers() - -@app.get("/providers/health") -async def providers_health_root(): - """Compatibility: health-style endpoint for providers""" - data = await get_provider_stats(force_refresh=True) - return data - -@app.get("/categories") -async def categories_root(): - """Compatibility: mirror /api/categories at /categories""" - return await api_categories() - -@app.get("/rate-limits") -async def rate_limits_root(): - """Compatibility: mirror /api/rate-limits at /rate-limits""" - return await api_rate_limits() - -@app.get("/logs") -async def logs_root(type: str = "all"): - """Compatibility: mirror /api/logs at /logs""" - return await api_logs(type=type) - -@app.get("/alerts") -async def alerts_root(): - """Compatibility: mirror /api/alerts at /alerts""" - return await api_alerts() - -@app.get("/hf/health") -async def hf_health_root(): - """Compatibility: mirror /api/hf/health at /hf/health""" - return await hf_health() - -@app.get("/hf/registry") -async def hf_registry_root(type: str = "models"): - """Compatibility: mirror /api/hf/registry at /hf/registry""" - return await hf_registry(type=type) - -@app.get("/hf/search") -async def hf_search_root(q: str = "", kind: str = "models"): - """Compatibility: mirror /api/hf/search at /hf/search""" - return await hf_search(q=q, kind=kind) - -@app.post("/hf/run-sentiment") -async def hf_run_sentiment_root(request: SentimentRequest): - """Compatibility: mirror /api/hf/run-sentiment at /hf/run-sentiment""" - return await hf_run_sentiment(request) + main()