|
|
""" |
|
|
Provider Fallback Manager |
|
|
Manages fallback to external providers when HF cannot provide data |
|
|
Uses /mnt/data/api-config-complete.txt as authoritative source |
|
|
""" |
|
|
|
|
|
import json |
|
|
import os |
|
|
import asyncio |
|
|
import hashlib |
|
|
from pathlib import Path |
|
|
from typing import Dict, List, Any, Optional, Tuple |
|
|
from datetime import datetime, timezone |
|
|
import aiohttp |
|
|
from dataclasses import dataclass |
|
|
from enum import Enum |
|
|
|
|
|
from ..enhanced_logger import logger |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Local provider configuration file; it is consulted before the remote URL.
FALLBACK_CONFIG_PATH = "/mnt/data/api-config-complete.txt"

# Optional remote copy of the configuration, only used when the local file
# yields nothing. Resolves to None when the environment variable is unset.
FALLBACK_CONFIG_URL = os.environ.get("FALLBACK_CONFIG_URL")

# When True, the HF source is attempted before any external provider.
HF_PRIORITY = True

# NOTE(review): not referenced anywhere in the visible part of this module.
MAX_RETRIES = 3

# Total per-request timeout (seconds) handed to the aiohttp session.
TIMEOUT_SECONDS = 10

# Consecutive failures after which a provider's circuit breaker opens.
CIRCUIT_BREAKER_THRESHOLD = 5

# How long (seconds) an opened circuit stays open: five minutes.
CIRCUIT_BREAKER_TIMEOUT = 5 * 60
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class ProviderStatus(Enum):
    """Health states a provider can be in.

    ``AVAILABLE`` and ``DEGRADED`` providers are still eligible for requests;
    the other two states are not.
    """

    # Healthy: no recent failures recorded.
    AVAILABLE = "available"
    # A couple of consecutive failures, but still being tried.
    DEGRADED = "degraded"
    # Not eligible for requests.
    # NOTE(review): never assigned in the visible code.
    UNAVAILABLE = "unavailable"
    # Too many consecutive failures; skipped until the breaker times out.
    CIRCUIT_OPEN = "circuit_open"
|
|
|
|
|
@dataclass
class Provider:
    """Configuration and runtime health state for one external provider.

    The first six fields describe how to reach the provider; the remaining
    ones are mutable bookkeeping used by the circuit breaker.
    """

    name: str
    base_url: str
    api_key: Optional[str] = None
    # Lower values are tried first (the manager sorts ascending).
    priority: int = 100
    # Maps a logical endpoint key to this provider's concrete path.
    endpoints: Optional[Dict[str, str]] = None
    rate_limit: Optional[int] = None
    status: ProviderStatus = ProviderStatus.AVAILABLE
    # Consecutive failures since the last success.
    failures: int = 0
    last_success: Optional[datetime] = None
    last_failure: Optional[datetime] = None
    # Timezone-aware UTC instant at which an open circuit may close again.
    circuit_open_until: Optional[datetime] = None

    def is_available(self) -> bool:
        """Return True when the provider may be used for a request.

        Side effect: if the circuit is open and its deadline has passed, the
        provider is switched back to ``AVAILABLE`` and its failure counter
        and deadline are cleared.
        """
        if self.status == ProviderStatus.CIRCUIT_OPEN:
            deadline = self.circuit_open_until
            if deadline and datetime.now(timezone.utc) > deadline:
                # Breaker timeout elapsed: close the circuit and start fresh.
                self.status = ProviderStatus.AVAILABLE
                self.failures = 0
                self.circuit_open_until = None
                return True
            return False
        return self.status in (ProviderStatus.AVAILABLE, ProviderStatus.DEGRADED)

    def record_success(self):
        """Record a successful request and mark the provider healthy."""
        self.failures = 0
        self.last_success = datetime.now(timezone.utc)
        self.status = ProviderStatus.AVAILABLE

    def record_failure(self):
        """Record a failed request and update the circuit-breaker state.

        Two or more consecutive failures mark the provider ``DEGRADED``;
        reaching ``CIRCUIT_BREAKER_THRESHOLD`` opens the circuit for
        ``CIRCUIT_BREAKER_TIMEOUT`` seconds.
        """
        now = datetime.now(timezone.utc)
        self.failures += 1
        self.last_failure = now

        if self.failures >= CIRCUIT_BREAKER_THRESHOLD:
            self.status = ProviderStatus.CIRCUIT_OPEN
            # Keep the deadline as an aware datetime: is_available() compares
            # it against datetime.now(timezone.utc), and comparing a datetime
            # with a bare float timestamp raises TypeError.
            self.circuit_open_until = datetime.fromtimestamp(
                now.timestamp() + CIRCUIT_BREAKER_TIMEOUT, tz=timezone.utc
            )
            logger.warning(f"Circuit breaker opened for {self.name} until {self.circuit_open_until}")
        elif self.failures >= 2:
            self.status = ProviderStatus.DEGRADED
|
|
|
|
|
@dataclass
class FallbackResult:
    """Outcome of one fetch that may have fallen back across sources."""

    # Payload returned by the winning source, or None when everything failed.
    data: Optional[Any]
    # Identifier of the source that produced ``data`` ("none" on failure).
    source: str
    # Every source that was tried, in the order it was tried.
    attempted: List[str]
    # True when some source returned usable data.
    success: bool
    # Human-readable failure reason; stays None on success.
    error: Optional[str] = None
    # Elapsed time of the whole fetch in milliseconds.
    latency_ms: Optional[int] = None
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class ProviderFallbackManager:
    """Manages fallback to external providers with a circuit breaker pattern.

    Providers are loaded once at construction time (local file, then optional
    URL, then built-in defaults) and are tried in ascending priority order
    whenever the HF source yields no data.
    """

    def __init__(self):
        """Create the manager and load the provider list."""
        self.providers: List[Provider] = []
        self.hf_handler = None
        # Created lazily by _get_session() so construction needs no event loop.
        self._session: Optional[aiohttp.ClientSession] = None
        self._load_providers()

    def _load_providers(self):
        """Load provider configurations from file or URL.

        Falls back to the built-in defaults when neither source produces at
        least one provider. Malformed entries are skipped rather than
        aborting the whole load.
        """
        config_data = self._read_local_config()
        if not config_data and FALLBACK_CONFIG_URL:
            config_data = self._fetch_remote_config()

        raw_providers = config_data.get('providers') if isinstance(config_data, dict) else None
        if isinstance(raw_providers, (list, tuple)):
            for idx, provider_config in enumerate(raw_providers):
                if not isinstance(provider_config, dict):
                    logger.warning(f"Skipping malformed provider entry at index {idx}")
                    continue
                name = str(provider_config.get('name') or f'provider_{idx}')
                self.providers.append(
                    Provider(
                        name=name,
                        base_url=provider_config.get('base_url', ''),
                        # An explicit key wins; otherwise look for <NAME>_API_KEY.
                        api_key=provider_config.get('api_key') or os.getenv(f"{name.upper()}_API_KEY"),
                        priority=provider_config.get('priority', 100),
                        endpoints=provider_config.get('endpoints', {}),
                        rate_limit=provider_config.get('rate_limit'),
                    )
                )
            # Lower priority value means the provider is tried earlier.
            self.providers.sort(key=lambda p: p.priority)

        if not self.providers:
            self._add_default_providers()

    def _read_local_config(self) -> Optional[Dict]:
        """Return the parsed local config file, or None if absent or unreadable."""
        config_path = Path(FALLBACK_CONFIG_PATH)
        if not config_path.exists():
            return None
        try:
            content = config_path.read_text(encoding="utf-8")
            if content.strip().startswith('{'):
                config = json.loads(content)
            else:
                # Anything that does not look like JSON is treated as CSV-ish text.
                config = self._parse_text_config(content)
            logger.info(f"Loaded {len(config.get('providers', []))} providers from local file")
            return config
        except Exception as e:
            # Best effort: a broken config file must not prevent start-up.
            logger.error(f"Failed to load local config: {e}")
            return None

    def _fetch_remote_config(self) -> Optional[Dict]:
        """Return the config downloaded from FALLBACK_CONFIG_URL, or None on failure."""
        try:
            # Imported lazily so the module still works without `requests`.
            import requests

            response = requests.get(FALLBACK_CONFIG_URL, timeout=5)
            if response.status_code != 200:
                logger.warning(f"Config URL returned HTTP {response.status_code}")
                return None
            config = response.json()
            logger.info(f"Loaded {len(config.get('providers', []))} providers from URL")
            return config
        except Exception as e:
            # Best effort: an unreachable URL must not prevent start-up.
            logger.error(f"Failed to load config from URL: {e}")
            return None

    def _parse_text_config(self, content: str) -> Dict:
        """Parse text format config into JSON structure.

        Each non-empty, non-comment line has the form
        ``name,base_url[,api_key[,priority]]``. Lines with fewer than two
        fields are ignored; an unparsable priority falls back to 100.

        Args:
            content: Raw text of the configuration file.

        Returns:
            ``{'providers': [...]}`` with one dict per accepted line.
        """
        providers = []
        for raw_line in content.splitlines():
            line = raw_line.strip()
            # Strip first so indented comment lines are skipped as well.
            if not line or line.startswith('#'):
                continue
            parts = [part.strip() for part in line.split(',')]
            if len(parts) < 2:
                continue

            priority = 100
            if len(parts) > 3:
                try:
                    priority = int(parts[3])
                except ValueError:
                    # One bad number should not discard the whole file.
                    logger.warning(f"Invalid priority {parts[3]!r} for provider {parts[0]}; using 100")

            providers.append({
                'name': parts[0],
                'base_url': parts[1],
                'api_key': (parts[2] or None) if len(parts) > 2 else None,
                'priority': priority,
            })
        return {'providers': providers}

    def _add_default_providers(self):
        """Add default fallback providers."""
        defaults = [
            Provider(
                name="coingecko",
                base_url="https://api.coingecko.com/api/v3",
                priority=10,
                endpoints={
                    "rate": "/simple/price",
                    "market": "/coins/markets",
                    "history": "/coins/{id}/market_chart",
                },
            ),
            Provider(
                name="binance",
                base_url="https://api.binance.com/api/v3",
                priority=20,
                endpoints={
                    "rate": "/ticker/price",
                    "history": "/klines",
                    "depth": "/depth",
                },
            ),
            Provider(
                name="coinmarketcap",
                base_url="https://pro-api.coinmarketcap.com/v1",
                api_key=os.getenv("CMC_API_KEY"),
                priority=30,
                endpoints={
                    "rate": "/cryptocurrency/quotes/latest",
                    "market": "/cryptocurrency/listings/latest",
                },
            ),
        ]
        self.providers.extend(defaults)
        logger.info(f"Added {len(defaults)} default providers")

    async def _get_session(self) -> aiohttp.ClientSession:
        """Get or create the shared aiohttp session.

        A session that has been closed is replaced rather than reused.
        """
        if self._session is None or self._session.closed:
            self._session = aiohttp.ClientSession(
                timeout=aiohttp.ClientTimeout(total=TIMEOUT_SECONDS)
            )
        return self._session

    async def close(self):
        """Close the aiohttp session, if one was created."""
        if self._session is not None:
            if not self._session.closed:
                await self._session.close()
            self._session = None

    async def _call_hf(self, endpoint: str, params: Dict = None) -> Tuple[Optional[Any], Optional[str]]:
        """Try to get data from HF first.

        Returns:
            ``(data, error)``. ``(None, None)`` when HF priority is disabled;
            otherwise currently always ``(None, "HF integration pending")``
            because the HF integration is still a placeholder.
        """
        if not HF_PRIORITY:
            return None, None
        # Placeholder: no HF handler is wired up yet, so report "no data" and
        # let the caller continue with the external providers.
        return None, "HF integration pending"

    async def _call_provider(
        self,
        provider: Provider,
        endpoint: str,
        params: Dict = None,
        method: str = "GET"
    ) -> Tuple[Optional[Any], Optional[str]]:
        """Call a specific provider.

        Args:
            provider: Provider to query; its health counters are updated.
            endpoint: Path appended to ``provider.base_url``.
            params: Query parameters (GET) or JSON body (POST). Never mutated.
            method: ``"GET"`` or ``"POST"`` (case-insensitive).

        Returns:
            ``(data, None)`` on HTTP 200, otherwise ``(None, error_message)``.
        """
        if not provider.is_available():
            return None, f"Provider {provider.name} unavailable (circuit open)"

        http_method = method.upper() if isinstance(method, str) else method
        if http_method not in ("GET", "POST"):
            # A caller mistake, not a provider fault: do not count it as a failure.
            return None, f"Unsupported HTTP method: {method}"

        # Work on a private copy so provider-specific auth parameters never
        # leak into the caller's dict (and from there to other providers).
        request_params = dict(params) if params is not None else None

        headers = {}
        if provider.api_key:
            provider_key = provider.name.lower()
            if "coinmarketcap" in provider_key:
                headers["X-CMC_PRO_API_KEY"] = provider.api_key
            elif "alphavantage" in provider_key:
                # Alpha Vantage expects the key as a request parameter.
                request_params = {**(request_params or {}), "apikey": provider.api_key}
            else:
                headers["Authorization"] = f"Bearer {provider.api_key}"

        url = f"{provider.base_url}{endpoint}"
        # GET sends the parameters in the query string, POST as a JSON body.
        payload = {"params": request_params} if http_method == "GET" else {"json": request_params}

        try:
            session = await self._get_session()
            start_time = datetime.now(timezone.utc)

            async with session.request(http_method, url, headers=headers, **payload) as response:
                latency_ms = int((datetime.now(timezone.utc) - start_time).total_seconds() * 1000)

                if response.status != 200:
                    provider.record_failure()
                    return None, f"HTTP {response.status}"

                data = await response.json()
                provider.record_success()
                logger.debug(f"Provider {provider.name} succeeded in {latency_ms}ms")
                return data, None

        except asyncio.TimeoutError:
            provider.record_failure()
            return None, "Timeout"

        except Exception as e:
            provider.record_failure()
            logger.error(f"Provider {provider.name} error: {e}")
            return None, str(e)

    @staticmethod
    def _resolve_endpoint(provider: Provider, endpoint: str) -> str:
        """Map a logical endpoint onto the provider's own path, if one matches.

        The first key of ``provider.endpoints`` that occurs as a substring of
        ``endpoint`` wins; without a match the endpoint is used unchanged.
        """
        if provider.endpoints:
            for key, value in provider.endpoints.items():
                if key in endpoint:
                    return value
        return endpoint

    async def fetch_with_fallback(
        self,
        endpoint: str,
        params: Dict = None,
        method: str = "GET",
        transform_func: callable = None
    ) -> FallbackResult:
        """
        Fetch data with HF-first then fallback strategy

        Args:
            endpoint: API endpoint path
            params: Query parameters
            method: HTTP method
            transform_func: Function to transform provider response to standard
                format; called as ``transform_func(data, provider_name)``. If
                it raises, that provider is skipped.

        Returns:
            FallbackResult with data, source, and metadata
        """
        attempted = []
        start_time = datetime.now(timezone.utc)

        def elapsed_ms() -> int:
            return int((datetime.now(timezone.utc) - start_time).total_seconds() * 1000)

        if HF_PRIORITY:
            attempted.append("hf")
            hf_data, _hf_error = await self._call_hf(endpoint, params)
            if hf_data:
                return FallbackResult(
                    data=hf_data,
                    source="hf",
                    attempted=attempted,
                    success=True,
                    latency_ms=elapsed_ms(),
                )

        for provider in self.providers:
            if not provider.is_available():
                logger.debug(f"Skipping unavailable provider {provider.name}")
                continue

            attempted.append(provider.base_url)
            provider_endpoint = self._resolve_endpoint(provider, endpoint)
            data, _error = await self._call_provider(provider, provider_endpoint, params, method)

            # An empty payload counts as "no data": move on to the next provider.
            if not data:
                continue

            if transform_func:
                try:
                    data = transform_func(data, provider.name)
                except Exception as e:
                    logger.error(f"Transform failed for {provider.name}: {e}")
                    continue

            return FallbackResult(
                data=data,
                source=provider.base_url,
                attempted=attempted,
                success=True,
                latency_ms=elapsed_ms(),
            )

        return FallbackResult(
            data=None,
            source="none",
            attempted=attempted,
            success=False,
            error="All providers failed",
            latency_ms=elapsed_ms(),
        )

    def get_provider_status(self) -> Dict[str, Any]:
        """Get current status of all providers.

        Returns:
            A dict with a UTC ``timestamp``, one entry per provider under
            ``providers``, and the ``available_providers`` /
            ``total_providers`` / ``hf_priority`` summary fields. All
            datetimes are rendered as ISO-8601 strings.
        """
        entries = []
        available_count = 0

        for provider in self.providers:
            # Evaluate availability first: is_available() may close an expired
            # circuit, and the reported status should reflect that.
            available = provider.is_available()
            if available:
                available_count += 1

            open_until = provider.circuit_open_until
            entries.append({
                "name": provider.name,
                "base_url": provider.base_url,
                "priority": provider.priority,
                "status": provider.status.value,
                "failures": provider.failures,
                "is_available": available,
                "last_success": provider.last_success.isoformat() if provider.last_success else None,
                "last_failure": provider.last_failure.isoformat() if provider.last_failure else None,
                # Serialize datetimes like the other timestamps; pass any
                # other representation through unchanged.
                "circuit_open_until": (
                    open_until.isoformat() if isinstance(open_until, datetime) else (open_until or None)
                ),
            })

        return {
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "providers": entries,
            "available_providers": available_count,
            "total_providers": len(self.providers),
            "hf_priority": HF_PRIORITY,
        }

    @staticmethod
    def _reset_state(provider: Provider) -> None:
        """Put one provider back into a healthy, closed-circuit state."""
        provider.status = ProviderStatus.AVAILABLE
        provider.failures = 0
        provider.circuit_open_until = None

    def reset_provider(self, provider_name: str) -> bool:
        """Reset a specific provider's circuit breaker.

        Returns:
            True if a provider with that exact name was found and reset.
        """
        for provider in self.providers:
            if provider.name == provider_name:
                self._reset_state(provider)
                logger.info(f"Reset provider {provider_name}")
                return True
        return False

    def reset_all_providers(self):
        """Reset all providers' circuit breakers."""
        for provider in self.providers:
            self._reset_state(provider)
        logger.info("Reset all providers")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def transform_coingecko_rate(data: Dict, provider: str) -> Dict:
    """Convert a CoinGecko price response into the standard rate format.

    Only the first coin/currency pair found in the response is used. Input
    that is empty, not a dict, or contains no prices is returned unchanged.

    Args:
        data: Response shaped like ``{coin: {currency: price}}``.
        provider: Name of the provider that produced ``data`` (unused).

    Returns:
        ``{"pair": "COIN/CURRENCY", "price": ..., "ts": <UTC ISO time>}``
        or ``data`` itself when nothing could be extracted.
    """
    if not data or not isinstance(data, dict):
        return data

    quotes = (
        (coin, currency, price)
        for coin, prices in data.items()
        for currency, price in prices.items()
    )
    first_quote = next(quotes, None)
    if first_quote is None:
        return data

    coin, currency, price = first_quote
    return {
        "pair": f"{coin.upper()}/{currency.upper()}",
        "price": price,
        "ts": datetime.now(timezone.utc).isoformat(),
    }
|
|
|
|
|
# Quote assets recognised at the end of a Binance symbol, in match order.
# The 4-letter stablecoins come first so symbols that were split correctly by
# the previous fixed 4-character rule keep producing the same pair.
_BINANCE_QUOTE_ASSETS = (
    "USDT", "BUSD", "USDC", "TUSD",
    "FDUSD",
    "BTC", "ETH", "BNB", "EUR", "GBP", "TRY", "BRL", "DAI", "USD",
)


def transform_binance_rate(data: Dict, provider: str) -> Dict:
    """Transform a Binance ticker response to the standard rate format.

    Binance symbols are base and quote asset concatenated without separator
    (e.g. ``ETHBTC``). The quote asset is found by matching known suffixes;
    if none matches, the last four characters are taken as the quote.

    Args:
        data: Response shaped like ``{"symbol": "BTCUSDT", "price": "1.0"}``.
        provider: Name of the provider that produced ``data`` (unused).

    Returns:
        ``{"pair": "BASE/QUOTE", "price": float, "ts": <UTC ISO time>}``, or
        ``data`` unchanged when it is not a dict containing ``"symbol"``.

    Raises:
        KeyError: If ``data`` has a symbol but no ``"price"``.
        ValueError: If the price cannot be converted to ``float``.
    """
    if not isinstance(data, dict) or "symbol" not in data:
        return data

    symbol = data["symbol"]
    quote = next(
        (
            candidate
            for candidate in _BINANCE_QUOTE_ASSETS
            # Require a non-empty base so a bare "USDT" is not split into "/USDT".
            if len(symbol) > len(candidate) and symbol.endswith(candidate)
        ),
        symbol[-4:],
    )
    base = symbol[: len(symbol) - len(quote)]

    return {
        "pair": f"{base}/{quote}",
        "price": float(data["price"]),
        "ts": datetime.now(timezone.utc).isoformat(),
    }
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Shared module-level manager instance. Constructing it loads the provider
# configuration, so this runs as a side effect of importing the module.
fallback_manager = ProviderFallbackManager()

# Public API of this module.
__all__ = [
    "ProviderFallbackManager",
    "FallbackResult",
    "Provider",
    "ProviderStatus",
    "fallback_manager",
    "transform_coingecko_rate",
    "transform_binance_rate",
]