|
|
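"""
Smart Fallback Manager for 305+ free crypto data resources.

Tries the resources of a category in health/priority order and falls back to
the next one on failure, so a single broken or blocked source does not fail
the request. Returns None only when every candidate resource is exhausted.
"""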
|
|
|
|
|
import asyncio |
|
|
import aiohttp |
|
|
import random |
|
|
import time |
|
|
from typing import List, Dict, Optional, Any |
|
|
from dataclasses import dataclass, field |
|
|
from enum import Enum |
|
|
import logging |
|
|
from datetime import datetime, timedelta |
|
|
|
|
|
logger = logging.getLogger(__name__) |
|
|
|
|
|
|
|
|
class ResourceStatus(Enum):
    """Lifecycle state of a single data resource, as seen by the health tracker.

    Each member's value is the lowercase string used when statuses are
    reported (e.g. as dictionary keys in a health report).
    """

    # Working normally.
    ACTIVE = "active"
    # Still usable, but has recently failed several times in a row.
    DEGRADED = "degraded"
    # Too many consecutive failures.
    FAILED = "failed"
    # Explicitly excluded from use.
    BLOCKED = "blocked"
    # Direct requests appear to be rejected; a proxy is required.
    PROXY_NEEDED = "proxy_needed"
|
|
|
|
|
|
|
|
@dataclass
class ResourceHealth:
    """Per-resource request statistics and the health status derived from them.

    Counters and timestamps are mutated in place by ``record_success`` and
    ``record_failure``; ``get_priority_score`` folds them into one ranking value.
    """

    resource_id: str
    status: ResourceStatus = ResourceStatus.ACTIVE
    success_count: int = 0
    failure_count: int = 0
    last_success: Optional[datetime] = None  # naive local time from datetime.now()
    last_failure: Optional[datetime] = None  # naive local time from datetime.now()
    # Smoothed response time; 0.0 doubles as "no sample recorded yet".
    # The speed bonus divides it by 5.0, so presumably seconds — confirm with callers.
    avg_response_time: float = 0.0
    consecutive_failures: int = 0  # reset to 0 on every success
    needs_proxy: bool = False  # sticky: set by record_failure, never cleared here

    def record_success(self, response_time: float):
        """Record a successful request that took ``response_time``.

        Resets the consecutive-failure streak and stamps ``last_success``.
        The first sample initialises ``avg_response_time``; later samples are
        blended in as an exponential moving average (weight 0.3 on the new
        sample). A FAILED or DEGRADED resource is restored to ACTIVE;
        PROXY_NEEDED and BLOCKED are left unchanged.
        """
        self.success_count += 1
        self.consecutive_failures = 0
        self.last_success = datetime.now()

        if self.avg_response_time == 0:
            self.avg_response_time = response_time
        else:
            self.avg_response_time = 0.7 * self.avg_response_time + 0.3 * response_time

        if self.status in [ResourceStatus.FAILED, ResourceStatus.DEGRADED]:
            self.status = ResourceStatus.ACTIVE

    def record_failure(self, needs_proxy: bool = False):
        """Record a failed request and update the status.

        Args:
            needs_proxy: When True, the resource is flagged ``needs_proxy`` and
                set to PROXY_NEEDED. This takes precedence over the
                failure-count thresholds. Otherwise the status becomes FAILED
                at 5+ consecutive failures and DEGRADED at 3-4. Below 3 the
                status is left unchanged.
        """
        self.failure_count += 1
        self.consecutive_failures += 1
        self.last_failure = datetime.now()

        if needs_proxy:
            self.needs_proxy = True
            self.status = ResourceStatus.PROXY_NEEDED
        elif self.consecutive_failures >= 5:
            self.status = ResourceStatus.FAILED
        elif self.consecutive_failures >= 3:
            self.status = ResourceStatus.DEGRADED

    def is_available(self) -> bool:
        """Return True only for ACTIVE or DEGRADED resources."""
        return self.status in [ResourceStatus.ACTIVE, ResourceStatus.DEGRADED]

    def get_priority_score(self) -> float:
        """Return a ranking score (higher is better); 0.0 for FAILED resources.

        score = success_rate * recency_bonus * speed_bonus, where
        - success_rate = successes / max(total requests, 1)
          (0.0 for a never-used resource),
        - recency_bonus = 1.0 if the last success was under 300 seconds ago,
          else 0.5,
        - speed_bonus = max(0.5, 1 - avg_response_time / 5).
        """
        if self.status == ResourceStatus.FAILED:
            return 0.0

        success_rate = self.success_count / max(self.success_count + self.failure_count, 1)

        # total_seconds(), not .seconds: timedelta.seconds is only the
        # seconds *component* (0-86399) and ignores whole days, so a success
        # from e.g. 1 day + 10 s ago was previously treated as recent.
        is_recent = (
            self.last_success is not None
            and (datetime.now() - self.last_success).total_seconds() < 300
        )
        recency_bonus = 1.0 if is_recent else 0.5

        speed_bonus = max(0.5, 1.0 - (self.avg_response_time / 5.0))

        return success_rate * recency_bonus * speed_bonus
|
|
|
|
|
|
|
|
class SmartFallbackManager:
    """
    Fallback manager over a catalogue of free data resources.

    Resources are loaded from a JSON file, grouped by category, and each one
    gets a ``ResourceHealth`` tracker. ``fetch_with_fallback`` tries the
    best-scoring usable resources of a category, one after another, until one
    succeeds. It returns None only when no resource is usable, every
    candidate has been tried, or ``max_attempts`` is exhausted.
    """

    # Substrings of base URLs that are treated as requiring a proxy.
    _PROXY_HINT_DOMAINS = ('binance', 'okex', 'huobi')

    def __init__(self, resources_json_path: str = "/workspace/cursor-instructions/consolidated_crypto_resources.json"):
        """
        Load the resource catalogue and initialise health tracking.

        Args:
            resources_json_path: Path to a JSON file with a top-level
                ``resources`` list whose entries carry at least ``id`` and
                ``category`` keys.

        Raises:
            OSError: if the file cannot be opened.
            ValueError / KeyError: if the file is not valid JSON or an entry
                lacks a required key.
        """
        self.resources_json_path = resources_json_path
        # category -> resource dicts in file order
        self.resources: Dict[str, List[Dict]] = {}
        # resource id -> health tracker
        self.health_tracker: Dict[str, ResourceHealth] = {}
        # Optional object providing ``await get_proxy()`` -> proxy URL.
        # When None, every request is made directly.
        self.proxy_manager = None

        self._load_resources()

        logger.info(f"✅ SmartFallbackManager initialized with {self._count_total_resources()} resources")

    def _load_resources(self):
        """Read the JSON catalogue, group resources by category, create health trackers."""
        import json

        # JSON is UTF-8 by spec; don't depend on the platform default encoding.
        with open(self.resources_json_path, 'r', encoding='utf-8') as f:
            data = json.load(f)

        for resource in data['resources']:
            self.resources.setdefault(resource['category'], []).append(resource)

            resource_id = resource['id']
            self.health_tracker[resource_id] = ResourceHealth(resource_id=resource_id)

        logger.info(f"📊 Loaded {len(self.resources)} categories:")
        for category, items in self.resources.items():
            logger.info(f" - {category}: {len(items)} resources")

    def _count_total_resources(self) -> int:
        """Return the number of loaded resources across all categories."""
        return sum(len(items) for items in self.resources.values())

    def get_available_resources(self, category: str, free_only: bool = True) -> List[Dict]:
        """
        Return the usable resources of ``category``, best priority score first.

        A resource is usable when its health tracker reports it available.
        A resource flagged PROXY_NEEDED also counts as usable when a proxy
        manager is configured. Without that, the proxied route used by
        ``fetch_with_fallback`` could never be reached.

        Args:
            category: Category key from the catalogue.
            free_only: When True, keep only entries whose ``is_free`` is truthy.

        Returns:
            A new list. It is empty, with a warning logged, for an unknown category.
        """
        if category not in self.resources:
            logger.warning(f"⚠️ Category '{category}' not found")
            return []

        resources = self.resources[category]
        if free_only:
            resources = [r for r in resources if r.get('is_free', False)]

        available = []
        for resource in resources:
            health = self.health_tracker.get(resource['id'])
            if health is None:
                continue
            if health.is_available() or (
                self.proxy_manager is not None
                and health.status == ResourceStatus.PROXY_NEEDED
            ):
                available.append(resource)

        available.sort(
            key=lambda r: self.health_tracker[r['id']].get_priority_score(),
            reverse=True
        )
        return available

    def get_best_resource(self, category: str, exclude_ids: Optional[List[str]] = None) -> Optional[Dict]:
        """
        Return the top-ranked usable free resource of ``category``.

        Args:
            category: Category key from the catalogue.
            exclude_ids: Resource ids to skip (e.g. ones already tried).

        Returns:
            The resource dict, or None (with a warning logged) if nothing is left.
        """
        excluded = set(exclude_ids or ())
        candidates = [r for r in self.get_available_resources(category) if r['id'] not in excluded]

        if not candidates:
            logger.warning(f"⚠️ No available resources for category '{category}'")
            return None

        best = candidates[0]
        logger.debug(f"✅ Selected resource: {best['name']} (score: {self.health_tracker[best['id']].get_priority_score():.2f})")
        return best

    async def fetch_with_fallback(
        self,
        category: str,
        endpoint_path: str = "",
        params: Optional[Dict[str, Any]] = None,
        max_attempts: int = 10,
        timeout: int = 10
    ) -> Optional[Dict]:
        """
        Fetch JSON from a category, falling back across resources on failure.

        Each attempt picks the best-scoring usable resource not yet tried,
        sends a GET to ``base_url + endpoint_path`` and records the outcome
        in that resource's health tracker.

        Args:
            category: Resource category to draw from.
            endpoint_path: Appended verbatim to the resource's ``base_url``.
            params: Query parameters for the GET request.
            max_attempts: Maximum number of distinct resources to try.
            timeout: Total per-request timeout, in seconds.

        Returns:
            The decoded JSON body of the first successful response. Returns
            None if no usable resource remains or all attempts failed.
        """
        params = params or {}
        attempted_ids: List[str] = []

        for _ in range(max_attempts):
            resource = self.get_best_resource(category, exclude_ids=attempted_ids)
            if not resource:
                if attempted_ids:
                    logger.error(f"❌ All {len(attempted_ids)} resources exhausted for '{category}'")
                return None

            resource_id = resource['id']
            attempted_ids.append(resource_id)

            base_url = resource['base_url']
            url = f"{base_url}{endpoint_path}" if endpoint_path else base_url

            health = self.health_tracker[resource_id]
            # Without a proxy manager, proxy-needing resources are tried directly.
            use_proxy = health.needs_proxy or self._needs_proxy(resource)

            try:
                # Monotonic clock: wall-clock adjustments must not skew timings.
                start_time = time.monotonic()
                if use_proxy and self.proxy_manager:
                    response_data = await self._fetch_with_proxy(url, params, timeout)
                else:
                    response_data = await self._fetch_direct(url, params, timeout)
                response_time = time.monotonic() - start_time
            except aiohttp.ClientError as e:
                error_str = str(e)
                # Heuristic: a 403 or a "blocked" message suggests the resource
                # rejects direct access and should be retried via a proxy.
                needs_proxy = "403" in error_str or "blocked" in error_str.lower()
                health.record_failure(needs_proxy=needs_proxy)
                logger.warning(f"⚠️ Failed: {resource['name']} - {error_str}")
                continue
            except Exception as e:
                # Anything else: timeouts, proxy-manager errors, etc.
                health.record_failure()
                logger.error(f"❌ Error: {resource['name']} - {e}")
                continue

            # Success bookkeeping sits outside the try so that it can never be
            # misreported as a request failure.
            health.record_success(response_time)
            logger.info(f"✅ Success: {resource['name']} ({response_time:.2f}s)")
            return response_data

        logger.error(f"❌ CRITICAL: All {max_attempts} fallback attempts failed for '{category}'")
        return None

    async def _fetch_direct(self, url: str, params: Dict, timeout: int) -> Dict:
        """GET ``url`` without a proxy and return the decoded JSON body.

        Raises aiohttp.ClientResponseError on non-2xx responses. Raises a
        timeout error if the whole request exceeds ``timeout`` seconds.
        """
        client_timeout = aiohttp.ClientTimeout(total=timeout)
        async with aiohttp.ClientSession() as session:
            async with session.get(url, params=params, timeout=client_timeout) as response:
                response.raise_for_status()
                return await response.json()

    async def _fetch_with_proxy(self, url: str, params: Dict, timeout: int) -> Dict:
        """GET ``url`` through a proxy obtained from ``self.proxy_manager``.

        Raises:
            RuntimeError: if no proxy manager is configured.
            aiohttp.ClientResponseError: on non-2xx responses.
        """
        if not self.proxy_manager:
            raise RuntimeError("Proxy manager not configured")

        proxy_url = await self.proxy_manager.get_proxy()

        client_timeout = aiohttp.ClientTimeout(total=timeout)
        async with aiohttp.ClientSession() as session:
            async with session.get(
                url,
                params=params,
                proxy=proxy_url,
                timeout=client_timeout
            ) as response:
                response.raise_for_status()
                return await response.json()

    def _needs_proxy(self, resource: Dict) -> bool:
        """Return True if the resource's ``base_url`` matches a proxy-hint domain (case-insensitive)."""
        base_url = resource['base_url'].lower()
        return any(domain in base_url for domain in self._PROXY_HINT_DOMAINS)

    def get_health_report(self) -> Dict:
        """
        Summarise the health of every tracked resource.

        Returns:
            A dict with:
            - ``total_resources``: catalogue size.
            - ``by_status``: count per status value.
            - ``top_performers``: the 10 highest-scoring trackers.
            - ``failing_resources``: every FAILED or BLOCKED tracker, in score order.
        """
        by_status = {
            'active': 0,
            'degraded': 0,
            'failed': 0,
            'proxy_needed': 0,
            'blocked': 0
        }
        for health in self.health_tracker.values():
            status_key = health.status.value
            if status_key in by_status:
                by_status[status_key] += 1

        # Score each tracker once; the stable sort keeps ties in insertion order.
        scored = [(h.get_priority_score(), h) for h in self.health_tracker.values()]
        scored.sort(key=lambda pair: pair[0], reverse=True)

        top_performers = [
            {
                'resource_id': h.resource_id,
                'score': score,
                'success_rate': h.success_count / max(h.success_count + h.failure_count, 1),
                'avg_response_time': h.avg_response_time
            }
            for score, h in scored[:10]
        ]

        failing_resources = [
            {
                'resource_id': h.resource_id,
                'status': h.status.value,
                'consecutive_failures': h.consecutive_failures,
                'needs_proxy': h.needs_proxy
            }
            for _, h in scored
            if h.status in [ResourceStatus.FAILED, ResourceStatus.BLOCKED]
        ]

        return {
            'total_resources': self._count_total_resources(),
            'by_status': by_status,
            'top_performers': top_performers,
            'failing_resources': failing_resources
        }

    def cleanup_failed_resources(self, max_age_hours: int = 24):
        """
        Mark long-failing resources as BLOCKED.

        A FAILED resource is marked BLOCKED when its last success is more
        than ``max_age_hours`` ago.

        NOTE(review): FAILED resources that never succeeded (no
        ``last_success``) are never marked. Confirm this is intended.

        Returns:
            The list of resource ids newly marked BLOCKED.
        """
        now = datetime.now()
        max_age = timedelta(hours=max_age_hours)
        removed = []

        for resource_id, health in self.health_tracker.items():
            if (
                health.status == ResourceStatus.FAILED
                and health.last_success is not None
                and now - health.last_success > max_age
            ):
                health.status = ResourceStatus.BLOCKED
                removed.append(resource_id)

        if removed:
            logger.info(f"🗑️ Marked {len(removed)} resources as blocked after {max_age_hours}h of failures")

        return removed
|
|
|
|
|
|
|
|
|
|
|
# Lazily created, module-wide manager instance (None until first requested).
_fallback_manager: Optional["SmartFallbackManager"] = None


def get_fallback_manager() -> "SmartFallbackManager":
    """Return the shared SmartFallbackManager, building it on the first call.

    The instance is cached in the module-level ``_fallback_manager`` and the
    same object is returned on every later call. It is constructed with the
    default resources JSON path. No locking is performed around creation.
    """
    global _fallback_manager
    if _fallback_manager is not None:
        return _fallback_manager
    _fallback_manager = SmartFallbackManager()
    return _fallback_manager
|
|
|