"""
Comprehensive Data Worker - Collect ALL Data from ALL Sources

Uses all resources from crypto_resources_unified_2025-11-11.json

This worker ensures ZERO data sources are left unused:
- 23 Market Data APIs
- 15 News APIs
- 12 Sentiment APIs
- 13 On-chain Analytics APIs
- 9 Whale Tracking APIs
- 18 Block Explorers
- 1 Community Sentiment API
- 24 RPC Nodes
- 7 HuggingFace Resources
- 13 Free HTTP Endpoints

ALL data is uploaded to HuggingFace Datasets
"""

import asyncio
import time
import os
from datetime import datetime
from typing import List, Dict, Any

import httpx

from database.cache_queries import get_cache_queries
from database.db_manager import db_manager
from utils.logger import setup_logger
from unified_resource_loader import get_loader

logger = setup_logger("comprehensive_worker")

resource_loader = get_loader()
cache = get_cache_queries(db_manager)
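
# NOTE: a reference sketch of the resource record this module assumes the
# unified loader returns, inferred from attribute access in the fetchers
# below. The authoritative definition lives in unified_resource_loader and
# may differ:
#
#     @dataclass
#     class Resource:
#         id: str                    # registry id, matched with `in` checks below
#         name: str                  # human-readable name used in log lines
#         base_url: str              # API root URL, without a trailing slash
#         auth_type: str             # "none" | "apiKeyHeader" | "apiKeyQuery"
#         api_key: Optional[str]     # credential, when one is configured
#         endpoints: Optional[dict]  # per-endpoint paths (checked by the whale fetcher)
#         chain: Optional[str]       # chain name, present on block explorers only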

HF_UPLOAD_ENABLED = bool(os.getenv("HF_TOKEN") or os.getenv("HF_API_TOKEN"))
if HF_UPLOAD_ENABLED:
    try:
        from hf_dataset_uploader import get_dataset_uploader
        hf_uploader = get_dataset_uploader()
        logger.info("✅ HuggingFace Dataset upload ENABLED for comprehensive worker")
    except Exception as e:
        logger.warning(f"HuggingFace Dataset upload disabled: {e}")
        HF_UPLOAD_ENABLED = False
        hf_uploader = None
else:
    logger.info("ℹ️ HuggingFace Dataset upload DISABLED (no HF_TOKEN)")
    hf_uploader = None
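
# NOTE: a minimal sketch of the uploader interface this worker assumes,
# inferred from the call sites in the save_and_upload_* helpers below. The
# real object comes from hf_dataset_uploader.get_dataset_uploader() and its
# actual signatures may differ:
#
#     class DatasetUploader(Protocol):
#         async def upload_news_data(self, rows: List[Dict[str, Any]], append: bool) -> bool: ...
#         async def upload_sentiment_data(self, rows: List[Dict[str, Any]], append: bool) -> bool: ...
#         async def upload_onchain_data(self, rows: List[Dict[str, Any]], append: bool) -> bool: ...
#         async def upload_whale_data(self, rows: List[Dict[str, Any]], append: bool) -> bool: ...
#         async def upload_explorer_data(self, rows: List[Dict[str, Any]], append: bool) -> bool: ...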


async def fetch_news_from_cryptopanic() -> List[Dict[str, Any]]:
    """Fetch news from CryptoPanic (FREE, no API key)"""
    try:
        url = "https://cryptopanic.com/api/v1/posts/"
        params = {"auth_token": "free", "public": "true", "kind": "news", "filter": "rising"}

        async with httpx.AsyncClient(timeout=10.0) as client:
            response = await client.get(url, params=params)
            response.raise_for_status()
            data = response.json()

        news_items = []
        for post in data.get("results", [])[:15]:
            news_items.append({
                "title": post.get("title", ""),
                # CryptoPanic posts carry no separate description; reuse the title
                "description": post.get("title", ""),
                "url": post.get("url", ""),
                "published_at": post.get("created_at", ""),
                "source": "CryptoPanic",
                "source_id": "cryptopanic",
                "category": "news",
                "fetched_at": datetime.utcnow().isoformat() + "Z"
            })

        logger.info(f"✅ CryptoPanic: {len(news_items)} articles")
        return news_items
    except Exception as e:
        logger.debug(f"CryptoPanic error: {e}")
        return []


async def fetch_news_from_coinstats() -> List[Dict[str, Any]]:
    """Fetch news from CoinStats (FREE, no API key)"""
    try:
        url = "https://api.coin-stats.com/v2/news"
        params = {"limit": 20}

        async with httpx.AsyncClient(timeout=10.0) as client:
            response = await client.get(url, params=params)
            response.raise_for_status()
            data = response.json()

        news_items = []
        for article in data.get("news", [])[:15]:
            news_items.append({
                "title": article.get("title", ""),
                "description": article.get("description", ""),
                "url": article.get("link", ""),
                "published_at": article.get("published", ""),
                "source": "CoinStats",
                "source_id": "coinstats",
                "category": "news",
                "fetched_at": datetime.utcnow().isoformat() + "Z"
            })

        logger.info(f"✅ CoinStats: {len(news_items)} articles")
        return news_items
    except Exception as e:
        logger.debug(f"CoinStats error: {e}")
        return []


async def fetch_news_data() -> List[Dict[str, Any]]:
    """
    Fetch news from multiple free sources

    Sources:
    - CryptoPanic (FREE, no API key)
    - CoinStats (FREE, no API key)
    - Other news sources from registry
    """
    news_data = []

    # Dedicated fetchers for the keyless sources
    try:
        cryptopanic_news = await fetch_news_from_cryptopanic()
        news_data.extend(cryptopanic_news)
    except Exception as e:
        logger.debug(f"Error fetching CryptoPanic: {e}")

    try:
        coinstats_news = await fetch_news_from_coinstats()
        news_data.extend(coinstats_news)
    except Exception as e:
        logger.debug(f"Error fetching CoinStats: {e}")

    # Remaining news sources from the unified registry
    news_resources = resource_loader.get_resources_by_category("news")
    logger.info(f"📰 Fetching news from {len(news_resources)} additional sources...")

    for resource in news_resources:
        try:
            # Skip sources that require an API key we don't have
            if resource.auth_type != "none" and not resource.api_key:
                logger.debug(f"Skipping {resource.name} (no API key)")
                continue

            url = resource.base_url
            headers = {}
            params = {}

            # Attach credentials according to the declared auth type
            if resource.auth_type == "apiKeyHeader" and resource.api_key:
                headers["Authorization"] = f"Bearer {resource.api_key}"
            elif resource.auth_type == "apiKeyQuery" and resource.api_key:
                params["apiKey"] = resource.api_key

            # Per-source endpoint and query tweaks
            if "newsapi" in resource.id:
                # Reject placeholder or incompatible keys (e.g. "pub_"-prefixed ones)
                if not resource.api_key or resource.api_key.startswith("pub_"):
                    logger.debug(f"Skipping {resource.name} (invalid API key)")
                    continue
                url = f"{resource.base_url}/everything"
                params.update({
                    "q": "cryptocurrency OR bitcoin OR ethereum",
                    "language": "en",
                    "sortBy": "publishedAt",
                    "pageSize": 20
                })
            elif "cryptopanic" in resource.id:
                # Already handled by the dedicated fetcher above
                continue
            elif "cryptocontrol" in resource.id:
                url = f"{resource.base_url}/news"

            logger.debug(f"Fetching from {resource.name}...")
            async with httpx.AsyncClient(timeout=10.0, follow_redirects=True) as client:
                response = await client.get(url, headers=headers, params=params)
                response.raise_for_status()

                # Some sources return HTML error pages with a 200; accept JSON only
                content_type = response.headers.get("content-type", "")
                if "application/json" not in content_type and "text/json" not in content_type:
                    logger.debug(f"Non-JSON response from {resource.name}: {content_type}")
                    continue

                data = response.json()

            # Locate the article list inside the source-specific payload
            articles = []
            if "newsapi" in resource.id:
                articles = data.get("articles", [])
            elif "cryptopanic" in resource.id:
                articles = data.get("results", [])
            else:
                articles = data if isinstance(data, list) else data.get("news", [])

            # Normalize each article to the common schema
            for article in articles[:10]:
                try:
                    normalized = {
                        "title": article.get("title", article.get("name", "")),
                        "description": article.get("description", article.get("summary", "")),
                        "url": article.get("url", article.get("link", "")),
                        "published_at": article.get("publishedAt", article.get("published_at", article.get("created_at", ""))),
                        "source": resource.name,
                        "source_id": resource.id,
                        "category": "news",
                        "fetched_at": datetime.utcnow().isoformat() + "Z"
                    }
                    news_data.append(normalized)
                except Exception as e:
                    logger.debug(f"Error parsing article: {e}")
                    continue

            logger.info(f"✅ {resource.name}: {len(articles[:10])} articles")

        except httpx.HTTPError as e:
            logger.debug(f"HTTP error from {resource.name}: {e}")
        except Exception as e:
            logger.debug(f"Error fetching from {resource.name}: {e}")

    logger.info(f"📰 Total news articles collected: {len(news_data)}")
    return news_data


async def fetch_fear_greed_index() -> List[Dict[str, Any]]:
    """Fetch Fear & Greed Index from Alternative.me (FREE, no API key)"""
    try:
        url = "https://api.alternative.me/fng/"
        params = {"limit": "1"}

        async with httpx.AsyncClient(timeout=10.0) as client:
            response = await client.get(url, params=params)
            response.raise_for_status()
            data = response.json()

        fng_list = data.get("data", [])
        if isinstance(fng_list, list) and len(fng_list) > 0:
            fng_data = fng_list[0]
            sentiment = {
                "metric": "fear_greed_index",
                "value": float(fng_data.get("value", 0)),
                "classification": fng_data.get("value_classification", ""),
                "source": "Alternative.me",
                "source_id": "alternative-me-fng",
                # interpret the epoch as UTC so the trailing "Z" is accurate
                "timestamp": datetime.utcfromtimestamp(int(fng_data.get("timestamp", time.time()))).isoformat() + "Z",
                "fetched_at": datetime.utcnow().isoformat() + "Z"
            }
            logger.info(f"✅ Fear & Greed Index: {fng_data.get('value')} ({fng_data.get('value_classification')})")
            return [sentiment]
    except Exception as e:
        logger.debug(f"Fear & Greed Index error: {e}")

    return []
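
# NOTE: the parser above assumes the Alternative.me /fng/ payload shape,
# shown here as a reference sketch (numeric values arrive as strings):
#
#     {
#         "data": [
#             {"value": "54", "value_classification": "Neutral",
#              "timestamp": "1699920000"}
#         ]
#     }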


async def fetch_sentiment_data() -> List[Dict[str, Any]]:
    """
    Fetch sentiment data from multiple sources

    Sources:
    - Alternative.me Fear & Greed Index (FREE, no API key)
    - LunarCrush (requires API key)
    - Santiment (requires API key)
    - And other sentiment sources from registry
    """
    sentiment_data = []

    # Dedicated Fear & Greed fetcher first
    try:
        fng_data = await fetch_fear_greed_index()
        sentiment_data.extend(fng_data)
    except Exception as e:
        logger.debug(f"Error fetching Fear & Greed Index: {e}")

    sentiment_resources = resource_loader.get_resources_by_category("sentiment")
    logger.info(f"📊 Fetching sentiment from {len(sentiment_resources)} additional sources...")

    for resource in sentiment_resources:
        try:
            # Skip sources that require an API key we don't have
            if resource.auth_type != "none" and not resource.api_key:
                logger.debug(f"Skipping {resource.name} (no API key)")
                continue

            url = resource.base_url
            headers = {}
            params = {}

            # Attach credentials according to the declared auth type
            if resource.auth_type == "apiKeyHeader" and resource.api_key:
                headers["Authorization"] = f"Bearer {resource.api_key}"
            elif resource.auth_type == "apiKeyQuery" and resource.api_key:
                params["api_key"] = resource.api_key

            # Per-source endpoint and query tweaks
            if "alternative.me" in resource.id or "alternative-me" in resource.id:
                # Already handled by the dedicated fetcher above
                continue
            elif "lunarcrush" in resource.id:
                url = f"{resource.base_url}/assets"
                params.update({"symbol": "BTC,ETH,BNB", "data_points": 1})

            logger.debug(f"Fetching from {resource.name}...")
            async with httpx.AsyncClient(timeout=10.0, follow_redirects=True) as client:
                response = await client.get(url, headers=headers, params=params)
                response.raise_for_status()

                # Some sources return HTML error pages; accept JSON only
                content_type = response.headers.get("content-type", "")
                if "application/json" not in content_type and "text/json" not in content_type:
                    logger.debug(f"Non-JSON response from {resource.name}: {content_type}")
                    continue

                data = response.json()

            # Source-specific parsing
            if "alternative.me" in resource.id or "alternative-me" in resource.id:
                # NOTE: unreachable while the `continue` above skips alternative.me;
                # kept for parity with the registry ids
                fng_list = data.get("data", [])
                if isinstance(fng_list, list) and len(fng_list) > 0:
                    fng_data = fng_list[0]
                    sentiment_data.append({
                        "metric": "fear_greed_index",
                        "value": float(fng_data.get("value", 0)),
                        "classification": fng_data.get("value_classification", ""),
                        "source": resource.name,
                        "source_id": resource.id,
                        # interpret the epoch as UTC so the trailing "Z" is accurate
                        "timestamp": datetime.utcfromtimestamp(int(fng_data.get("timestamp", time.time()))).isoformat() + "Z",
                        "fetched_at": datetime.utcnow().isoformat() + "Z"
                    })
                    logger.info(f"✅ {resource.name}: FNG = {fng_data.get('value')} ({fng_data.get('value_classification')})")

            elif "lunarcrush" in resource.id:
                assets = data.get("data", [])
                for asset in assets:
                    sentiment_data.append({
                        "symbol": asset.get("symbol", ""),
                        "metric": "galaxy_score",
                        "value": float(asset.get("galaxy_score", 0)),
                        "alt_rank": asset.get("alt_rank"),
                        "social_volume": asset.get("social_volume"),
                        "source": resource.name,
                        "source_id": resource.id,
                        "timestamp": datetime.utcnow().isoformat() + "Z",
                        "fetched_at": datetime.utcnow().isoformat() + "Z"
                    })
                logger.info(f"✅ {resource.name}: {len(assets)} assets")

        except httpx.HTTPError as e:
            logger.debug(f"HTTP error from {resource.name}: {e}")
        except Exception as e:
            logger.debug(f"Error fetching from {resource.name}: {e}")

    logger.info(f"📊 Total sentiment data collected: {len(sentiment_data)}")
    return sentiment_data


async def fetch_onchain_data() -> List[Dict[str, Any]]:
    """
    Fetch on-chain analytics from ALL on-chain APIs

    Sources:
    - Glassnode
    - IntoTheBlock
    - CryptoQuant
    - And all other on-chain sources (13 total)
    """
    onchain_data = []
    onchain_resources = resource_loader.get_resources_by_category("onchain_analytics")

    logger.info(f"⛓️ Fetching on-chain data from {len(onchain_resources)} sources...")

    for resource in onchain_resources:
        try:
            # Skip sources that require an API key we don't have
            if resource.auth_type != "none" and not resource.api_key:
                logger.debug(f"Skipping {resource.name} (no API key)")
                continue

            url = resource.base_url
            headers = {}
            params = {}

            if resource.auth_type == "apiKeyQuery" and resource.api_key:
                params["api_key"] = resource.api_key
            elif resource.auth_type == "apiKeyHeader" and resource.api_key:
                headers["Authorization"] = f"Bearer {resource.api_key}"

            logger.debug(f"Attempting {resource.name}...")
            async with httpx.AsyncClient(timeout=10.0) as client:
                response = await client.get(url, headers=headers, params=params)
                response.raise_for_status()
                data = response.json()

            # Store the raw payload; downstream consumers handle per-source schemas
            onchain_data.append({
                "source": resource.name,
                "source_id": resource.id,
                "data": data,
                "fetched_at": datetime.utcnow().isoformat() + "Z"
            })
            logger.info(f"✅ {resource.name}: Data received")

        except httpx.HTTPError as e:
            logger.debug(f"HTTP error from {resource.name}: {e}")
        except Exception as e:
            logger.debug(f"Error from {resource.name}: {e}")

    logger.info(f"⛓️ Total on-chain data points: {len(onchain_data)}")
    return onchain_data


async def fetch_whale_data() -> List[Dict[str, Any]]:
    """
    Fetch whale transactions from ALL whale tracking APIs

    Sources:
    - Whale Alert
    - Whale Watcher
    - And all other whale tracking sources (9 total)
    """
    whale_data = []
    whale_resources = resource_loader.get_resources_by_category("whale_tracking")

    logger.info(f"🐋 Fetching whale data from {len(whale_resources)} sources...")

    for resource in whale_resources:
        try:
            if resource.auth_type != "none" and not resource.api_key:
                logger.debug(f"Skipping {resource.name} (no API key)")
                continue

            url = resource.base_url
            headers = {}
            params = {}

            if resource.auth_type == "apiKeyQuery" and resource.api_key:
                params["api_key"] = resource.api_key
            elif resource.auth_type == "apiKeyHeader" and resource.api_key:
                headers["X-API-Key"] = resource.api_key

            # Whale Alert exposes a dedicated transactions endpoint
            if "whale-alert" in resource.id and resource.endpoints:
                url = f"{resource.base_url}/transactions"
                params["min_value"] = 500000

            logger.debug(f"Fetching from {resource.name}...")
            async with httpx.AsyncClient(timeout=10.0) as client:
                response = await client.get(url, headers=headers, params=params)
                response.raise_for_status()
                data = response.json()

            transactions = data.get("transactions", []) if isinstance(data, dict) else data

            for tx in transactions[:20]:
                whale_data.append({
                    "source": resource.name,
                    "source_id": resource.id,
                    "transaction": tx,
                    "fetched_at": datetime.utcnow().isoformat() + "Z"
                })

            logger.info(f"✅ {resource.name}: {len(transactions[:20])} transactions")

        except httpx.HTTPError as e:
            logger.debug(f"HTTP error from {resource.name}: {e}")
        except Exception as e:
            logger.debug(f"Error from {resource.name}: {e}")

    logger.info(f"🐋 Total whale transactions: {len(whale_data)}")
    return whale_data
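
# NOTE: the whale parser above assumes a Whale Alert-style payload; a sketch
# of the expected shape (fields abridged):
#
#     {
#         "result": "success",
#         "transactions": [
#             {"blockchain": "bitcoin", "symbol": "btc", "amount_usd": 1200000.0,
#              "from": {...}, "to": {...}, "timestamp": 1699920000}
#         ]
#     }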


async def fetch_block_explorer_data() -> List[Dict[str, Any]]:
    """
    Fetch blockchain data from ALL block explorers

    Sources:
    - Etherscan
    - BscScan
    - Polygonscan
    - And all other block explorers (18 total)
    """
    explorer_data = []
    explorer_resources = resource_loader.get_resources_by_category("block_explorers")

    logger.info(f"🔍 Fetching from {len(explorer_resources)} block explorers...")

    for resource in explorer_resources:
        try:
            if resource.auth_type != "none" and not resource.api_key:
                logger.debug(f"Skipping {resource.name} (no API key)")
                continue

            # Etherscan-compatible explorers share the module/action query API
            url = f"{resource.base_url}/api"
            params = {
                "module": "stats",
                "action": "ethprice",
            }
            if resource.api_key:
                params["apikey"] = resource.api_key

            logger.debug(f"Fetching from {resource.name}...")
            async with httpx.AsyncClient(timeout=10.0) as client:
                response = await client.get(url, params=params)
                response.raise_for_status()
                data = response.json()

            if data.get("status") == "1":
                result = data.get("result", {})
                explorer_data.append({
                    "chain": getattr(resource, "chain", "unknown"),
                    "source": resource.name,
                    "source_id": resource.id,
                    "price_usd": result.get("ethusd"),
                    "price_btc": result.get("ethbtc"),
                    "fetched_at": datetime.utcnow().isoformat() + "Z"
                })
                logger.info(f"✅ {resource.name}: Price data received")

        except httpx.HTTPError as e:
            logger.debug(f"HTTP error from {resource.name}: {e}")
        except Exception as e:
            logger.debug(f"Error from {resource.name}: {e}")

    logger.info(f"🔍 Total block explorer data: {len(explorer_data)}")
    return explorer_data
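
# NOTE: the status check above expects an Etherscan-style success response,
# sketched here for reference (numeric values arrive as strings):
#
#     {
#         "status": "1",
#         "message": "OK",
#         "result": {"ethbtc": "0.052", "ethusd": "3500.25"}
#     }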


async def save_and_upload_news(news_data: List[Dict[str, Any]]) -> bool:
    """Save news data and upload to HuggingFace"""
    if not news_data:
        return False

    logger.info(f"💾 Saving {len(news_data)} news articles...")

    # NOTE: local persistence is assumed to happen elsewhere; these helpers
    # only perform the HuggingFace upload when it is enabled.
    if HF_UPLOAD_ENABLED and hf_uploader:
        try:
            logger.info(f"📤 Uploading {len(news_data)} news articles to HuggingFace...")
            success = await hf_uploader.upload_news_data(news_data, append=True)

            if success:
                logger.info("✅ Successfully uploaded news to HuggingFace")
                return True
            else:
                logger.warning("⚠️ Failed to upload news to HuggingFace")
                return False

        except Exception as e:
            logger.error(f"Error uploading news to HuggingFace: {e}")
            return False

    return True


async def save_and_upload_sentiment(sentiment_data: List[Dict[str, Any]]) -> bool:
    """Save sentiment data and upload to HuggingFace"""
    if not sentiment_data:
        return False

    logger.info(f"💾 Saving {len(sentiment_data)} sentiment records...")

    if HF_UPLOAD_ENABLED and hf_uploader:
        try:
            logger.info(f"📤 Uploading {len(sentiment_data)} sentiment records to HuggingFace...")
            success = await hf_uploader.upload_sentiment_data(sentiment_data, append=True)

            if success:
                logger.info("✅ Successfully uploaded sentiment to HuggingFace")
                return True
            else:
                logger.warning("⚠️ Failed to upload sentiment to HuggingFace")
                return False

        except Exception as e:
            logger.error(f"Error uploading sentiment: {e}")
            return False

    return True


async def save_and_upload_onchain(onchain_data: List[Dict[str, Any]]) -> bool:
    """Save on-chain data and upload to HuggingFace"""
    if not onchain_data:
        return False

    logger.info(f"💾 Saving {len(onchain_data)} on-chain records...")

    if HF_UPLOAD_ENABLED and hf_uploader:
        try:
            logger.info(f"📤 Uploading {len(onchain_data)} on-chain records to HuggingFace...")
            success = await hf_uploader.upload_onchain_data(onchain_data, append=True)

            if success:
                logger.info("✅ Successfully uploaded on-chain data to HuggingFace")
                return True
            else:
                logger.warning("⚠️ Failed to upload on-chain data to HuggingFace")
                return False

        except Exception as e:
            logger.error(f"Error uploading on-chain data: {e}")
            return False

    return True


async def save_and_upload_whale(whale_data: List[Dict[str, Any]]) -> bool:
    """Save whale data and upload to HuggingFace"""
    if not whale_data:
        return False

    logger.info(f"💾 Saving {len(whale_data)} whale records...")

    if HF_UPLOAD_ENABLED and hf_uploader:
        try:
            logger.info(f"📤 Uploading {len(whale_data)} whale records to HuggingFace...")
            success = await hf_uploader.upload_whale_data(whale_data, append=True)

            if success:
                logger.info("✅ Successfully uploaded whale data to HuggingFace")
                return True
            else:
                logger.warning("⚠️ Failed to upload whale data to HuggingFace")
                return False

        except Exception as e:
            logger.error(f"Error uploading whale data: {e}")
            return False

    return True


async def save_and_upload_explorer(explorer_data: List[Dict[str, Any]]) -> bool:
    """Save explorer data and upload to HuggingFace"""
    if not explorer_data:
        return False

    logger.info(f"💾 Saving {len(explorer_data)} explorer records...")

    if HF_UPLOAD_ENABLED and hf_uploader:
        try:
            logger.info(f"📤 Uploading {len(explorer_data)} explorer records to HuggingFace...")
            success = await hf_uploader.upload_explorer_data(explorer_data, append=True)

            if success:
                logger.info("✅ Successfully uploaded explorer data to HuggingFace")
                return True
            else:
                logger.warning("⚠️ Failed to upload explorer data to HuggingFace")
                return False

        except Exception as e:
            logger.error(f"Error uploading explorer data: {e}")
            return False

    return True


async def comprehensive_worker_loop():
    """
    Main worker loop - Fetch ALL data from ALL sources

    Runs every 5 minutes to avoid rate limits
    """
    logger.info("🚀 Starting comprehensive data worker")
    logger.info(f"📊 Resource statistics: {resource_loader.get_stats()}")

    iteration = 0

    while True:
        try:
            iteration += 1
            start_time = time.time()

            logger.info(f"\n{'='*80}")
            logger.info(f"[Iteration {iteration}] Starting comprehensive data collection")
            logger.info(f"{'='*80}")

            # Fetch all categories concurrently; keep exceptions in the results
            results = await asyncio.gather(
                fetch_news_data(),
                fetch_sentiment_data(),
                fetch_onchain_data(),
                fetch_whale_data(),
                fetch_block_explorer_data(),
                return_exceptions=True
            )

            news_data, sentiment_data, onchain_data, whale_data, explorer_data = results

            # Upload each category, substituting [] where a fetch failed
            await asyncio.gather(
                save_and_upload_news(news_data if not isinstance(news_data, Exception) else []),
                save_and_upload_sentiment(sentiment_data if not isinstance(sentiment_data, Exception) else []),
                save_and_upload_onchain(onchain_data if not isinstance(onchain_data, Exception) else []),
                save_and_upload_whale(whale_data if not isinstance(whale_data, Exception) else []),
                save_and_upload_explorer(explorer_data if not isinstance(explorer_data, Exception) else []),
                return_exceptions=True
            )

            elapsed = time.time() - start_time
            total_records = sum([
                len(news_data) if not isinstance(news_data, Exception) else 0,
                len(sentiment_data) if not isinstance(sentiment_data, Exception) else 0,
                len(onchain_data) if not isinstance(onchain_data, Exception) else 0,
                len(whale_data) if not isinstance(whale_data, Exception) else 0,
                len(explorer_data) if not isinstance(explorer_data, Exception) else 0,
            ])

            logger.info(f"\n{'='*80}")
            logger.info(f"[Iteration {iteration}] Completed in {elapsed:.2f}s")
            logger.info(f"Total records collected: {total_records}")
            logger.info(f"{'='*80}\n")

            # Sleep 5 minutes between iterations to respect rate limits
            await asyncio.sleep(300)

        except Exception as e:
            logger.error(f"[Iteration {iteration}] Worker error: {e}", exc_info=True)
            await asyncio.sleep(300)
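
# NOTE: with return_exceptions=True, asyncio.gather resolves to a list in
# task order where a failed coroutine contributes its exception object
# instead of raising - hence the isinstance(..., Exception) guards above.
# A minimal illustration:
#
#     async def boom():
#         raise ValueError("x")
#
#     results = await asyncio.gather(boom(), asyncio.sleep(0), return_exceptions=True)
#     # results == [ValueError("x"), None]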


async def start_comprehensive_worker():
    """Start comprehensive data worker"""
    try:
        logger.info("Initializing comprehensive data worker...")

        # Schedule the loop as a background task and keep a module-level
        # reference so the task is not garbage-collected
        global _worker_task
        logger.info("Scheduling comprehensive worker loop...")
        _worker_task = asyncio.create_task(comprehensive_worker_loop())
        logger.info("Comprehensive data worker started successfully")

    except Exception as e:
        logger.error(f"Failed to start comprehensive worker: {e}", exc_info=True)
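
# Example usage (a minimal sketch; in a real deployment the host application
# is expected to call start_comprehensive_worker() from its startup hook and
# keep its own event loop alive):
#
#     async def main():
#         await start_comprehensive_worker()
#         await asyncio.Event().wait()  # block forever; the worker runs in background
#
#     asyncio.run(main())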


if __name__ == "__main__":
    async def test():
        """Test the worker"""
        logger.info("Testing comprehensive data worker...")

        news = await fetch_news_data()
        logger.info(f"\n✅ News: {len(news)} articles")

        sentiment = await fetch_sentiment_data()
        logger.info(f"✅ Sentiment: {len(sentiment)} records")

        onchain = await fetch_onchain_data()
        logger.info(f"✅ On-chain: {len(onchain)} records")

        whale = await fetch_whale_data()
        logger.info(f"✅ Whale: {len(whale)} transactions")

        explorer = await fetch_block_explorer_data()
        logger.info(f"✅ Explorer: {len(explorer)} records")

    asyncio.run(test())