|
|
""" |
|
|
Market Data Background Worker - REAL DATA FROM FREE APIs ONLY |
|
|
|
|
|
CRITICAL RULES: |
|
|
- MUST fetch REAL data from CoinGecko API (FREE tier) |
|
|
- MUST store actual prices, not fake data |
|
|
- MUST use actual timestamps from API responses |
|
|
- NEVER generate or fake any data |
|
|
- If API fails, log error and retry (don't fake it) |
|
|
""" |
|
|
|
|
|
import asyncio |
|
|
import time |
|
|
import logging |
|
|
import os |
|
|
from datetime import datetime |
|
|
from typing import List, Dict, Any |
|
|
import httpx |
|
|
|
|
|
from database.cache_queries import get_cache_queries |
|
|
from database.db_manager import db_manager |
|
|
from utils.logger import setup_logger |
|
|
|
|
|
# Module-level logger used by every function in this worker.
logger = setup_logger("market_worker")

# Cache-query helper bound to the shared database manager; its
# save_market_data() method persists each fetched market record.
cache = get_cache_queries(db_manager)
|
|
|
|
|
|
|
|
# HuggingFace Dataset uploads are opt-in: they are attempted only when an HF
# token (HF_TOKEN or HF_API_TOKEN) is set, and are switched off again if the
# uploader module cannot be imported or the uploader cannot be constructed.
hf_uploader = None
HF_UPLOAD_ENABLED = any(os.getenv(var) for var in ("HF_TOKEN", "HF_API_TOKEN"))

if not HF_UPLOAD_ENABLED:
    logger.info("ℹ️ HuggingFace Dataset upload DISABLED (no HF_TOKEN)")
else:
    try:
        from hf_dataset_uploader import get_dataset_uploader

        hf_uploader = get_dataset_uploader()
        logger.info("✅ HuggingFace Dataset upload ENABLED")
    except Exception as e:
        # Best effort: a broken/missing uploader must not stop the worker.
        logger.warning(f"HuggingFace Dataset upload disabled: {e}")
        HF_UPLOAD_ENABLED = False
        hf_uploader = None
|
|
|
|
|
|
|
|
COINGECKO_BASE_URL = "https://api.coingecko.com/api/v3"

# CoinGecko coin id -> ticker symbol stored in the cache.
# Keys must be CoinGecko *API ids* (the ``id`` field from /coins/list), which
# do not always match the coin's display name: Avalanche is "avalanche-2" and
# Polygon (MATIC) is "matic-network". An unrecognised id is not reported as an
# error by /coins/markets; the coin is simply absent from the response
# (NOTE(review): observed behaviour - confirm against the CoinGecko docs).
SYMBOL_MAP = {
    "bitcoin": "BTC",
    "ethereum": "ETH",
    "binancecoin": "BNB",
    "ripple": "XRP",
    "cardano": "ADA",
    "solana": "SOL",
    "polkadot": "DOT",
    "dogecoin": "DOGE",
    "matic-network": "MATIC",
    "avalanche-2": "AVAX",
    "chainlink": "LINK",
    "litecoin": "LTC",
    "uniswap": "UNI",
    "algorand": "ALGO",
    "stellar": "XLM",
    "cosmos": "ATOM",
    "tron": "TRX",
    "monero": "XMR",
    "ethereum-classic": "ETC",
    "tezos": "XTZ",
}

# CoinGecko ids requested on every fetch. Derived from SYMBOL_MAP (insertion
# order preserved) so the request list and the symbol mapping cannot drift.
TOP_SYMBOLS = list(SYMBOL_MAP)
|
|
|
|
|
|
|
|
def _optional_float(value):
    """Return ``float(value)``, or None when the API reported no value.

    Uses an explicit ``is None`` test so legitimate zero values (for example a
    0.0% 24h change) are kept instead of being discarded as falsy.
    """
    return None if value is None else float(value)


def _parse_coin(coin: Dict[str, Any]) -> Dict[str, Any]:
    """Convert one CoinGecko /coins/markets entry into a market-data record.

    Raises:
        ValueError: if the entry has no ``current_price`` or no usable symbol;
            the caller skips such entries rather than storing invented values.
    """
    price = coin.get("current_price")
    if price is None:
        # Never substitute a default price - that would be fake data.
        raise ValueError("response has no current_price")

    coin_id = coin.get("id", "")
    symbol = SYMBOL_MAP.get(coin_id) or str(coin.get("symbol") or "").upper()
    if not symbol:
        raise ValueError("response has no symbol")

    return {
        "symbol": symbol,
        "price": float(price),
        "market_cap": _optional_float(coin.get("market_cap")),
        "volume_24h": _optional_float(coin.get("total_volume")),
        "change_24h": _optional_float(coin.get("price_change_percentage_24h")),
        "high_24h": _optional_float(coin.get("high_24h")),
        "low_24h": _optional_float(coin.get("low_24h")),
        "provider": "coingecko",
    }


async def fetch_coingecko_prices() -> List[Dict[str, Any]]:
    """
    Fetch REAL market prices from CoinGecko API (FREE tier).

    Requests ``{COINGECKO_BASE_URL}/coins/markets`` in USD for every id in
    TOP_SYMBOLS and converts each returned entry with ``_parse_coin``.

    CRITICAL RULES:
    1. MUST call actual CoinGecko API
    2. MUST return actual data from API response
    3. NEVER generate fake prices - entries without a price are skipped
    4. If API fails, return empty list (not fake data)

    Returns:
        List of dicts with keys ``symbol``, ``price``, ``market_cap``,
        ``volume_24h``, ``change_24h``, ``high_24h``, ``low_24h`` and
        ``provider``. Optional numeric fields are None only when the API
        omitted them. Network, HTTP and decoding errors are logged and
        produce an empty list.
    """
    url = f"{COINGECKO_BASE_URL}/coins/markets"
    params = {
        "vs_currency": "usd",
        "ids": ",".join(TOP_SYMBOLS),
        "order": "market_cap_desc",
        "per_page": 100,
        "page": 1,
        "sparkline": False,
        "price_change_percentage": "24h",
    }

    try:
        logger.info(f"Fetching REAL data from CoinGecko API: {url}")
        async with httpx.AsyncClient(timeout=15.0) as client:
            response = await client.get(url, params=params)
            response.raise_for_status()
        coins = response.json()
    except httpx.HTTPError as e:
        logger.error(f"HTTP error fetching from CoinGecko: {e}")
        return []
    except Exception as e:
        logger.error(f"Error fetching from CoinGecko: {e}", exc_info=True)
        return []

    if not coins or not isinstance(coins, list):
        logger.error(f"Invalid response from CoinGecko: {coins}")
        return []

    logger.info(f"Successfully fetched {len(coins)} coins from CoinGecko")

    market_data = []
    for coin in coins:
        try:
            market_data.append(_parse_coin(coin))
        except Exception as e:
            # Guard the lookup itself: a non-dict entry has no .get().
            coin_id = coin.get("id") if isinstance(coin, dict) else coin
            logger.error(f"Error parsing coin data for {coin_id}: {e}")

    return market_data
|
|
|
|
|
|
|
|
async def save_market_data_to_cache(market_data: List[Dict[str, Any]]) -> int:
    """
    Save REAL market data to database cache AND upload to HuggingFace Datasets.

    Data Flow:
    1. Save each record through ``cache.save_market_data`` (local persistence)
    2. If HF uploads are enabled, append the whole batch to HuggingFace Datasets
    3. Clients can fetch from HuggingFace Datasets

    Failures are logged and never raised: a record that raises or that the
    cache reports as not saved is skipped, and an upload failure does not
    affect the returned count.

    Args:
        market_data: List of REAL market data dictionaries (as produced by
            fetch_coingecko_prices); ``symbol``, ``price`` and ``provider``
            are required keys.

    Returns:
        int: Number of records the cache reported as saved.
    """
    saved_count = 0

    for data in market_data:
        try:
            success = cache.save_market_data(
                symbol=data["symbol"],
                price=data["price"],
                market_cap=data.get("market_cap"),
                volume_24h=data.get("volume_24h"),
                change_24h=data.get("change_24h"),
                high_24h=data.get("high_24h"),
                low_24h=data.get("low_24h"),
                provider=data["provider"],
            )
            if success:
                saved_count += 1
                logger.debug(f"Saved market data for {data['symbol']}: ${data['price']:.2f}")
            else:
                # Previously ignored: surface records the cache declined.
                logger.warning(f"Cache did not save market data for {data.get('symbol')}")
        except Exception as e:
            logger.error(f"Error saving market data for {data.get('symbol')}: {e}")

    if HF_UPLOAD_ENABLED and hf_uploader and market_data:
        try:
            logger.info(f"📤 Uploading {len(market_data)} market records to HuggingFace Datasets...")
            upload_success = await hf_uploader.upload_market_data(
                market_data,
                append=True,
            )
            if upload_success:
                logger.info("✅ Successfully uploaded market data to HuggingFace Datasets")
            else:
                logger.warning("⚠️ Failed to upload market data to HuggingFace Datasets")
        except Exception as e:
            logger.error(f"Error uploading to HuggingFace Datasets: {e}")

    return saved_count
|
|
|
|
|
|
|
|
async def market_data_worker_loop(interval_seconds: float = 60.0):
    """
    Background worker loop - Fetch REAL market data periodically.

    CRITICAL RULES:
    1. Run continuously in background (until the task is cancelled)
    2. Fetch REAL data from CoinGecko once every ``interval_seconds``
    3. Store REAL data in database
    4. NEVER generate fake data as fallback
    5. If API fails, log error and retry on next iteration

    Args:
        interval_seconds: Target time between the starts of consecutive
            iterations (default 60s). Each iteration sleeps only for the
            remainder of the interval so the cadence does not drift by the
            time spent fetching and saving.
    """
    logger.info("Starting market data background worker")
    iteration = 0

    while True:
        iteration += 1
        # monotonic() is immune to wall-clock adjustments, unlike time.time().
        start_time = time.monotonic()

        try:
            logger.info(f"[Iteration {iteration}] Fetching REAL market data from CoinGecko...")
            market_data = await fetch_coingecko_prices()

            if not market_data:
                logger.warning(f"[Iteration {iteration}] No data received from CoinGecko API")
            else:
                saved_count = await save_market_data_to_cache(market_data)
                elapsed = time.monotonic() - start_time
                logger.info(
                    f"[Iteration {iteration}] Successfully saved {saved_count}/{len(market_data)} "
                    f"REAL market records from CoinGecko in {elapsed:.2f}s"
                )
        except Exception as e:
            # Exception (not BaseException): task cancellation still stops the loop.
            logger.error(f"[Iteration {iteration}] Worker error: {e}", exc_info=True)

        remaining = interval_seconds - (time.monotonic() - start_time)
        await asyncio.sleep(max(0.0, remaining))
|
|
|
|
|
|
|
|
# Strong references to running worker tasks. The event loop keeps only weak
# references to tasks, so a task created with asyncio.create_task() and not
# stored anywhere can be garbage-collected before it finishes.
_worker_tasks = set()


async def start_market_data_worker():
    """
    Start market data background worker.

    This should be called during application startup. It performs one
    immediate fetch-and-save so the cache is warm, then schedules
    ``market_data_worker_loop`` as a background task. A failing warm-up fetch
    is logged but does not prevent the loop (which retries on its own) from
    starting.

    Returns:
        The ``asyncio.Task`` running the worker loop, or None if the task
        could not be created. Existing callers may ignore the return value.
    """
    logger.info("Initializing market data worker...")

    try:
        logger.info("Running initial market data fetch...")
        market_data = await fetch_coingecko_prices()
        if market_data:
            saved_count = await save_market_data_to_cache(market_data)
            logger.info(f"Initial fetch: Saved {saved_count} REAL market records")
        else:
            logger.warning("Initial fetch returned no data")
    except Exception as e:
        logger.error(f"Initial market data fetch failed: {e}", exc_info=True)

    try:
        task = asyncio.create_task(market_data_worker_loop())
    except Exception as e:
        logger.error(f"Failed to start market data worker: {e}", exc_info=True)
        return None

    # Keep the task alive; drop the reference once it completes.
    _worker_tasks.add(task)
    task.add_done_callback(_worker_tasks.discard)

    logger.info("Market data worker started successfully")
    return task
|
|
|
|
|
|
|
|
|
|
|
if __name__ == "__main__":
    import sys

    # NOTE(review): this runs after the module-level imports at the top of the
    # file have already been resolved, so it cannot affect how they load.
    sys.path.append("/workspace")

    async def _smoke_test():
        """Fetch live CoinGecko prices, log a sample of them, then persist them."""
        logger.info("Testing market data worker...")

        coins = await fetch_coingecko_prices()
        logger.info(f"Fetched {len(coins)} coins from CoinGecko")

        if not coins:
            return

        for entry in coins[:5]:
            logger.info(f"  {entry['symbol']}: ${entry['price']:.2f}")

        stored = await save_market_data_to_cache(coins)
        logger.info(f"Saved {stored} records to database")

    asyncio.run(_smoke_test())
|
|
|