"""
Comprehensive Data Worker - Collect ALL Data from ALL Sources

Uses all resources from crypto_resources_unified_2025-11-11.json

This worker ensures ZERO data sources are left unused:
- 23 Market Data APIs
- 15 News APIs
- 12 Sentiment APIs
- 13 On-chain Analytics APIs
- 9 Whale Tracking APIs
- 18 Block Explorers
- 1 Community Sentiment API
- 24 RPC Nodes
- 7 HuggingFace Resources
- 13 Free HTTP Endpoints

ALL data is uploaded to HuggingFace Datasets.
"""

import asyncio
import time
import logging
import os
from datetime import datetime, timedelta
from typing import List, Dict, Any, Optional

import httpx

from database.cache_queries import get_cache_queries
from database.db_manager import db_manager
from utils.logger import setup_logger
from unified_resource_loader import get_loader

logger = setup_logger("comprehensive_worker")

resource_loader = get_loader()
cache = get_cache_queries(db_manager)

# HuggingFace upload is optional: enabled only when a token is present.
HF_UPLOAD_ENABLED = bool(os.getenv("HF_TOKEN") or os.getenv("HF_API_TOKEN"))
if HF_UPLOAD_ENABLED:
    try:
        from hf_dataset_uploader import get_dataset_uploader
        hf_uploader = get_dataset_uploader()
        logger.info("✅ HuggingFace Dataset upload ENABLED for comprehensive worker")
    except Exception as e:
        logger.warning(f"HuggingFace Dataset upload disabled: {e}")
        HF_UPLOAD_ENABLED = False
        hf_uploader = None
else:
    logger.info("ℹ️ HuggingFace Dataset upload DISABLED (no HF_TOKEN)")
    hf_uploader = None
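
# NOTE: the save_and_upload_* functions below assume hf_uploader exposes async
# upload_news_data / upload_sentiment_data / upload_onchain_data /
# upload_whale_data / upload_explorer_data methods, each taking a list of
# record dicts plus an append flag and returning a success bool.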


async def fetch_news_from_cryptopanic() -> List[Dict[str, Any]]:
    """Fetch news from CryptoPanic (FREE, no API key)."""
    try:
        url = "https://cryptopanic.com/api/v1/posts/"
        params = {"auth_token": "free", "public": "true", "kind": "news", "filter": "rising"}

        async with httpx.AsyncClient(timeout=10.0) as client:
            response = await client.get(url, params=params)
            response.raise_for_status()
            data = response.json()

        news_items = []
        for post in data.get("results", [])[:15]:
            news_items.append({
                "title": post.get("title", ""),
                "description": post.get("title", ""),  # CryptoPanic posts carry no description; reuse the title
                "url": post.get("url", ""),
                "published_at": post.get("created_at", ""),
                "source": "CryptoPanic",
                "source_id": "cryptopanic",
                "category": "news",
                "fetched_at": datetime.utcnow().isoformat() + "Z"
            })

        logger.info(f"✅ CryptoPanic: {len(news_items)} articles")
        return news_items
    except Exception as e:
        logger.debug(f"CryptoPanic error: {e}")
        return []
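
# Expected CryptoPanic /posts/ payload, inferred from the fields read above
# (values illustrative, not captured output):
# {"results": [{"title": "...", "url": "https://...", "created_at": "2025-11-11T00:00:00Z"}]}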


async def fetch_news_from_coinstats() -> List[Dict[str, Any]]:
    """Fetch news from CoinStats (FREE, no API key)."""
    try:
        url = "https://api.coin-stats.com/v2/news"
        params = {"limit": 20}

        async with httpx.AsyncClient(timeout=10.0) as client:
            response = await client.get(url, params=params)
            response.raise_for_status()
            data = response.json()

        news_items = []
        for article in data.get("news", [])[:15]:
            news_items.append({
                "title": article.get("title", ""),
                "description": article.get("description", ""),
                "url": article.get("link", ""),
                "published_at": article.get("published", ""),
                "source": "CoinStats",
                "source_id": "coinstats",
                "category": "news",
                "fetched_at": datetime.utcnow().isoformat() + "Z"
            })

        logger.info(f"✅ CoinStats: {len(news_items)} articles")
        return news_items
    except Exception as e:
        logger.debug(f"CoinStats error: {e}")
        return []


async def fetch_news_data() -> List[Dict[str, Any]]:
    """
    Fetch news from multiple free sources.

    Sources:
    - CryptoPanic (FREE, no API key)
    - CoinStats (FREE, no API key)
    - Other news sources from the registry
    """
    news_data = []

    try:
        cryptopanic_news = await fetch_news_from_cryptopanic()
        news_data.extend(cryptopanic_news)
    except Exception as e:
        logger.debug(f"Error fetching CryptoPanic: {e}")

    try:
        coinstats_news = await fetch_news_from_coinstats()
        news_data.extend(coinstats_news)
    except Exception as e:
        logger.debug(f"Error fetching CoinStats: {e}")

    news_resources = resource_loader.get_resources_by_category("news")
    logger.info(f"📰 Fetching news from {len(news_resources)} additional sources...")

    for resource in news_resources:
        try:
            # Skip resources that require an API key we don't have
            if resource.auth_type != "none" and not resource.api_key:
                logger.debug(f"Skipping {resource.name} (no API key)")
                continue

            url = resource.base_url
            headers = {}
            params = {}

            if resource.auth_type == "apiKeyHeader" and resource.api_key:
                headers["Authorization"] = f"Bearer {resource.api_key}"
            elif resource.auth_type == "apiKeyQuery" and resource.api_key:
                params["apiKey"] = resource.api_key

            # Per-provider URL and query adjustments
            if "newsapi" in resource.id:
                if not resource.api_key or resource.api_key.startswith("pub_"):
                    logger.debug(f"Skipping {resource.name} (invalid API key)")
                    continue
                url = f"{resource.base_url}/everything"
                params.update({
                    "q": "cryptocurrency OR bitcoin OR ethereum",
                    "language": "en",
                    "sortBy": "publishedAt",
                    "pageSize": 20
                })
            elif "cryptopanic" in resource.id:
                # Already fetched above via fetch_news_from_cryptopanic()
                continue
            elif "cryptocontrol" in resource.id:
                url = f"{resource.base_url}/news"

            logger.debug(f"Fetching from {resource.name}...")
            async with httpx.AsyncClient(timeout=10.0, follow_redirects=True) as client:
                response = await client.get(url, headers=headers, params=params)
                response.raise_for_status()

                content_type = response.headers.get("content-type", "")
                if "application/json" not in content_type and "text/json" not in content_type:
                    logger.debug(f"Non-JSON response from {resource.name}: {content_type}")
                    continue

                data = response.json()

            # Normalize articles across the different response schemas
            articles = []
            if "newsapi" in resource.id:
                articles = data.get("articles", [])
            elif "cryptopanic" in resource.id:
                articles = data.get("results", [])
            else:
                articles = data if isinstance(data, list) else data.get("news", [])

            for article in articles[:10]:
                try:
                    normalized = {
                        "title": article.get("title", article.get("name", "")),
                        "description": article.get("description", article.get("summary", "")),
                        "url": article.get("url", article.get("link", "")),
                        "published_at": article.get("publishedAt", article.get("published_at", article.get("created_at", ""))),
                        "source": resource.name,
                        "source_id": resource.id,
                        "category": "news",
                        "fetched_at": datetime.utcnow().isoformat() + "Z"
                    }
                    news_data.append(normalized)
                except Exception as e:
                    logger.debug(f"Error parsing article: {e}")
                    continue

            logger.info(f"✅ {resource.name}: {len(articles[:10])} articles")

        except httpx.HTTPError as e:
            logger.debug(f"HTTP error from {resource.name}: {e}")
        except Exception as e:
            logger.debug(f"Error fetching from {resource.name}: {e}")

    logger.info(f"📰 Total news articles collected: {len(news_data)}")
    return news_data
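

# A minimal sketch (assumption: not wired into the fetchers above) of how the
# request pattern repeated throughout this module -- open client, GET,
# raise_for_status, content-type check, parse JSON -- could be factored into
# a single helper.
async def _get_json_once(
    url: str,
    headers: Optional[Dict[str, str]] = None,
    params: Optional[Dict[str, Any]] = None,
    timeout: float = 10.0,
) -> Optional[Any]:
    """Fetch a URL and return parsed JSON, or None on any failure."""
    try:
        async with httpx.AsyncClient(timeout=timeout, follow_redirects=True) as client:
            response = await client.get(url, headers=headers, params=params)
            response.raise_for_status()
            if "json" not in response.headers.get("content-type", ""):
                logger.debug(f"Non-JSON response from {url}")
                return None
            return response.json()
    except Exception as e:
        logger.debug(f"Request to {url} failed: {e}")
        return None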


async def fetch_fear_greed_index() -> List[Dict[str, Any]]:
    """Fetch the Fear & Greed Index from Alternative.me (FREE, no API key)."""
    try:
        url = "https://api.alternative.me/fng/"
        params = {"limit": "1"}

        async with httpx.AsyncClient(timeout=10.0) as client:
            response = await client.get(url, params=params)
            response.raise_for_status()
            data = response.json()

        fng_list = data.get("data", [])
        if isinstance(fng_list, list) and len(fng_list) > 0:
            fng_data = fng_list[0]
            sentiment = {
                "metric": "fear_greed_index",
                "value": float(fng_data.get("value", 0)),
                "classification": fng_data.get("value_classification", ""),
                "source": "Alternative.me",
                "source_id": "alternative-me-fng",
                "timestamp": datetime.fromtimestamp(int(fng_data.get("timestamp", time.time()))).isoformat() + "Z",
                "fetched_at": datetime.utcnow().isoformat() + "Z"
            }
            logger.info(f"✅ Fear & Greed Index: {fng_data.get('value')} ({fng_data.get('value_classification')})")
            return [sentiment]
    except Exception as e:
        logger.debug(f"Fear & Greed Index error: {e}")

    return []
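
# Expected Alternative.me /fng/ payload, inferred from the fields read above
# (values illustrative, not captured output):
# {"data": [{"value": "39", "value_classification": "Fear", "timestamp": "1700000000"}]}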


async def fetch_sentiment_data() -> List[Dict[str, Any]]:
    """
    Fetch sentiment data from multiple sources.

    Sources:
    - Alternative.me Fear & Greed Index (FREE, no API key)
    - LunarCrush (requires API key)
    - Santiment (requires API key)
    - And other sentiment sources from the registry
    """
    sentiment_data = []

    try:
        fng_data = await fetch_fear_greed_index()
        sentiment_data.extend(fng_data)
    except Exception as e:
        logger.debug(f"Error fetching Fear & Greed Index: {e}")

    sentiment_resources = resource_loader.get_resources_by_category("sentiment")
    logger.info(f"📊 Fetching sentiment from {len(sentiment_resources)} additional sources...")

    for resource in sentiment_resources:
        try:
            # Skip resources that require an API key we don't have
            if resource.auth_type != "none" and not resource.api_key:
                logger.debug(f"Skipping {resource.name} (no API key)")
                continue

            url = resource.base_url
            headers = {}
            params = {}

            if resource.auth_type == "apiKeyHeader" and resource.api_key:
                headers["Authorization"] = f"Bearer {resource.api_key}"
            elif resource.auth_type == "apiKeyQuery" and resource.api_key:
                params["api_key"] = resource.api_key

            if "alternative.me" in resource.id or "alternative-me" in resource.id:
                # Already fetched above via fetch_fear_greed_index()
                continue
            elif "lunarcrush" in resource.id:
                url = f"{resource.base_url}/assets"
                params.update({"symbol": "BTC,ETH,BNB", "data_points": 1})

            logger.debug(f"Fetching from {resource.name}...")
            async with httpx.AsyncClient(timeout=10.0, follow_redirects=True) as client:
                response = await client.get(url, headers=headers, params=params)
                response.raise_for_status()

                content_type = response.headers.get("content-type", "")
                if "application/json" not in content_type and "text/json" not in content_type:
                    logger.debug(f"Non-JSON response from {resource.name}: {content_type}")
                    continue

                data = response.json()

            if "alternative.me" in resource.id or "alternative-me" in resource.id:
                fng_list = data.get("data", [])
                if isinstance(fng_list, list) and len(fng_list) > 0:
                    fng_data = fng_list[0]
                    sentiment_data.append({
                        "metric": "fear_greed_index",
                        "value": float(fng_data.get("value", 0)),
                        "classification": fng_data.get("value_classification", ""),
                        "source": resource.name,
                        "source_id": resource.id,
                        "timestamp": datetime.fromtimestamp(int(fng_data.get("timestamp", time.time()))).isoformat() + "Z",
                        "fetched_at": datetime.utcnow().isoformat() + "Z"
                    })
                    logger.info(f"✅ {resource.name}: FNG = {fng_data.get('value')} ({fng_data.get('value_classification')})")

            elif "lunarcrush" in resource.id:
                assets = data.get("data", [])
                for asset in assets:
                    sentiment_data.append({
                        "symbol": asset.get("symbol", ""),
                        "metric": "galaxy_score",
                        "value": float(asset.get("galaxy_score", 0)),
                        "alt_rank": asset.get("alt_rank"),
                        "social_volume": asset.get("social_volume"),
                        "source": resource.name,
                        "source_id": resource.id,
                        "timestamp": datetime.utcnow().isoformat() + "Z",
                        "fetched_at": datetime.utcnow().isoformat() + "Z"
                    })
                logger.info(f"✅ {resource.name}: {len(assets)} assets")

        except httpx.HTTPError as e:
            logger.debug(f"HTTP error from {resource.name}: {e}")
        except Exception as e:
            logger.debug(f"Error fetching from {resource.name}: {e}")

    logger.info(f"📊 Total sentiment data collected: {len(sentiment_data)}")
    return sentiment_data


async def fetch_onchain_data() -> List[Dict[str, Any]]:
    """
    Fetch on-chain analytics from ALL on-chain APIs.

    Sources:
    - Glassnode
    - IntoTheBlock
    - CryptoQuant
    - And all other on-chain sources (13 total)
    """
    onchain_data = []
    onchain_resources = resource_loader.get_resources_by_category("onchain_analytics")

    logger.info(f"⛓️ Fetching on-chain data from {len(onchain_resources)} sources...")

    for resource in onchain_resources:
        try:
            if resource.auth_type != "none" and not resource.api_key:
                logger.debug(f"Skipping {resource.name} (no API key)")
                continue

            url = resource.base_url
            headers = {}
            params = {}

            if resource.auth_type == "apiKeyQuery" and resource.api_key:
                params["api_key"] = resource.api_key
            elif resource.auth_type == "apiKeyHeader" and resource.api_key:
                headers["Authorization"] = f"Bearer {resource.api_key}"

            logger.debug(f"Attempting {resource.name}...")
            async with httpx.AsyncClient(timeout=10.0) as client:
                response = await client.get(url, headers=headers, params=params)
                response.raise_for_status()
                data = response.json()

            # The base URL is probed as-is; whatever JSON the root endpoint
            # returns is stored verbatim under "data".
            onchain_data.append({
                "source": resource.name,
                "source_id": resource.id,
                "data": data,
                "fetched_at": datetime.utcnow().isoformat() + "Z"
            })
            logger.info(f"✅ {resource.name}: Data received")

        except httpx.HTTPError as e:
            logger.debug(f"HTTP error from {resource.name}: {e}")
        except Exception as e:
            logger.debug(f"Error from {resource.name}: {e}")

    logger.info(f"⛓️ Total on-chain data points: {len(onchain_data)}")
    return onchain_data


async def fetch_whale_data() -> List[Dict[str, Any]]:
    """
    Fetch whale transactions from ALL whale tracking APIs.

    Sources:
    - Whale Alert
    - Whale Watcher
    - And all other whale tracking sources (9 total)
    """
    whale_data = []
    whale_resources = resource_loader.get_resources_by_category("whale_tracking")

    logger.info(f"🐋 Fetching whale data from {len(whale_resources)} sources...")

    for resource in whale_resources:
        try:
            if resource.auth_type != "none" and not resource.api_key:
                logger.debug(f"Skipping {resource.name} (no API key)")
                continue

            url = resource.base_url
            headers = {}
            params = {}

            if resource.auth_type == "apiKeyQuery" and resource.api_key:
                params["api_key"] = resource.api_key
            elif resource.auth_type == "apiKeyHeader" and resource.api_key:
                headers["X-API-Key"] = resource.api_key

            if "whale-alert" in resource.id and resource.endpoints:
                url = f"{resource.base_url}/transactions"
                params["min_value"] = 500000  # only transactions worth >= $500k

            logger.debug(f"Fetching from {resource.name}...")
            async with httpx.AsyncClient(timeout=10.0) as client:
                response = await client.get(url, headers=headers, params=params)
                response.raise_for_status()
                data = response.json()

            transactions = data.get("transactions", []) if isinstance(data, dict) else data

            for tx in transactions[:20]:
                whale_data.append({
                    "source": resource.name,
                    "source_id": resource.id,
                    "transaction": tx,
                    "fetched_at": datetime.utcnow().isoformat() + "Z"
                })

            logger.info(f"✅ {resource.name}: {len(transactions[:20])} transactions")

        except httpx.HTTPError as e:
            logger.debug(f"HTTP error from {resource.name}: {e}")
        except Exception as e:
            logger.debug(f"Error from {resource.name}: {e}")

    logger.info(f"🐋 Total whale transactions: {len(whale_data)}")
    return whale_data
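
# Expected whale-tracking payload, inferred from the parsing above: either a
# bare list of transactions or {"transactions": [...]}; each transaction dict
# is stored verbatim, so its fields may vary by provider.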


async def fetch_block_explorer_data() -> List[Dict[str, Any]]:
    """
    Fetch blockchain data from ALL block explorers.

    Sources:
    - Etherscan
    - BscScan
    - Polygonscan
    - And all other block explorers (18 total)
    """
    explorer_data = []
    explorer_resources = resource_loader.get_resources_by_category("block_explorers")

    logger.info(f"🔍 Fetching from {len(explorer_resources)} block explorers...")

    for resource in explorer_resources:
        try:
            if resource.auth_type != "none" and not resource.api_key:
                logger.debug(f"Skipping {resource.name} (no API key)")
                continue

            # Etherscan-compatible explorers share the module/action query API
            url = f"{resource.base_url}/api"
            params = {
                "module": "stats",
                "action": "ethprice",
            }

            if resource.api_key:
                params["apikey"] = resource.api_key

            logger.debug(f"Fetching from {resource.name}...")
            async with httpx.AsyncClient(timeout=10.0) as client:
                response = await client.get(url, params=params)
                response.raise_for_status()
                data = response.json()

            if data.get("status") == "1":
                result = data.get("result", {})
                explorer_data.append({
                    "chain": resource.chain if hasattr(resource, 'chain') else "unknown",
                    "source": resource.name,
                    "source_id": resource.id,
                    "price_usd": result.get("ethusd"),
                    "price_btc": result.get("ethbtc"),
                    "fetched_at": datetime.utcnow().isoformat() + "Z"
                })
                logger.info(f"✅ {resource.name}: Price data received")

        except httpx.HTTPError as e:
            logger.debug(f"HTTP error from {resource.name}: {e}")
        except Exception as e:
            logger.debug(f"Error from {resource.name}: {e}")

    logger.info(f"🔍 Total block explorer data: {len(explorer_data)}")
    return explorer_data
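
# Expected Etherscan-style stats/ethprice payload, inferred from the fields
# read above (values illustrative, not captured output):
# {"status": "1", "result": {"ethusd": "3100.25", "ethbtc": "0.052"}}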


async def save_and_upload_news(news_data: List[Dict[str, Any]]) -> bool:
    """Save news data and upload to HuggingFace."""
    if not news_data:
        return False

    logger.info(f"💾 Saving {len(news_data)} news articles...")

    if HF_UPLOAD_ENABLED and hf_uploader:
        try:
            logger.info(f"📤 Uploading {len(news_data)} news articles to HuggingFace...")
            success = await hf_uploader.upload_news_data(news_data, append=True)

            if success:
                logger.info("✅ Successfully uploaded news to HuggingFace")
                return True
            else:
                logger.warning("⚠️ Failed to upload news to HuggingFace")
                return False

        except Exception as e:
            logger.error(f"Error uploading news to HuggingFace: {e}")
            return False

    return True


async def save_and_upload_sentiment(sentiment_data: List[Dict[str, Any]]) -> bool:
    """Save sentiment data and upload to HuggingFace."""
    if not sentiment_data:
        return False

    logger.info(f"💾 Saving {len(sentiment_data)} sentiment records...")

    if HF_UPLOAD_ENABLED and hf_uploader:
        try:
            logger.info(f"📤 Uploading {len(sentiment_data)} sentiment records to HuggingFace...")
            success = await hf_uploader.upload_sentiment_data(sentiment_data, append=True)

            if success:
                logger.info("✅ Successfully uploaded sentiment to HuggingFace")
                return True
            else:
                logger.warning("⚠️ Failed to upload sentiment to HuggingFace")
                return False

        except Exception as e:
            logger.error(f"Error uploading sentiment: {e}")
            return False

    return True


async def save_and_upload_onchain(onchain_data: List[Dict[str, Any]]) -> bool:
    """Save on-chain data and upload to HuggingFace."""
    if not onchain_data:
        return False

    logger.info(f"💾 Saving {len(onchain_data)} on-chain records...")

    if HF_UPLOAD_ENABLED and hf_uploader:
        try:
            logger.info(f"📤 Uploading {len(onchain_data)} on-chain records to HuggingFace...")
            success = await hf_uploader.upload_onchain_data(onchain_data, append=True)

            if success:
                logger.info("✅ Successfully uploaded on-chain data to HuggingFace")
                return True
            else:
                logger.warning("⚠️ Failed to upload on-chain data to HuggingFace")
                return False

        except Exception as e:
            logger.error(f"Error uploading on-chain data: {e}")
            return False

    return True


async def save_and_upload_whale(whale_data: List[Dict[str, Any]]) -> bool:
    """Save whale data and upload to HuggingFace."""
    if not whale_data:
        return False

    logger.info(f"💾 Saving {len(whale_data)} whale records...")

    if HF_UPLOAD_ENABLED and hf_uploader:
        try:
            logger.info(f"📤 Uploading {len(whale_data)} whale records to HuggingFace...")
            success = await hf_uploader.upload_whale_data(whale_data, append=True)

            if success:
                logger.info("✅ Successfully uploaded whale data to HuggingFace")
                return True
            else:
                logger.warning("⚠️ Failed to upload whale data to HuggingFace")
                return False

        except Exception as e:
            logger.error(f"Error uploading whale data: {e}")
            return False

    return True


async def save_and_upload_explorer(explorer_data: List[Dict[str, Any]]) -> bool:
    """Save explorer data and upload to HuggingFace."""
    if not explorer_data:
        return False

    logger.info(f"💾 Saving {len(explorer_data)} explorer records...")

    if HF_UPLOAD_ENABLED and hf_uploader:
        try:
            logger.info(f"📤 Uploading {len(explorer_data)} explorer records to HuggingFace...")
            success = await hf_uploader.upload_explorer_data(explorer_data, append=True)

            if success:
                logger.info("✅ Successfully uploaded explorer data to HuggingFace")
                return True
            else:
                logger.warning("⚠️ Failed to upload explorer data to HuggingFace")
                return False

        except Exception as e:
            logger.error(f"Error uploading explorer data: {e}")
            return False

    return True


async def comprehensive_worker_loop():
    """
    Main worker loop - fetch ALL data from ALL sources.

    Runs every 5 minutes to avoid rate limits.
    """
    logger.info("🚀 Starting comprehensive data worker")
    logger.info(f"📊 Resource statistics: {resource_loader.get_stats()}")

    iteration = 0

    while True:
        try:
            iteration += 1
            start_time = time.time()

            logger.info(f"\n{'='*80}")
            logger.info(f"[Iteration {iteration}] Starting comprehensive data collection")
            logger.info(f"{'='*80}")

            # Fetch all categories concurrently; failures come back as
            # exception objects instead of raising.
            results = await asyncio.gather(
                fetch_news_data(),
                fetch_sentiment_data(),
                fetch_onchain_data(),
                fetch_whale_data(),
                fetch_block_explorer_data(),
                return_exceptions=True
            )

            news_data, sentiment_data, onchain_data, whale_data, explorer_data = results

            # Upload each category, substituting [] for any fetcher that failed
            await asyncio.gather(
                save_and_upload_news(news_data if not isinstance(news_data, Exception) else []),
                save_and_upload_sentiment(sentiment_data if not isinstance(sentiment_data, Exception) else []),
                save_and_upload_onchain(onchain_data if not isinstance(onchain_data, Exception) else []),
                save_and_upload_whale(whale_data if not isinstance(whale_data, Exception) else []),
                save_and_upload_explorer(explorer_data if not isinstance(explorer_data, Exception) else []),
                return_exceptions=True
            )

            elapsed = time.time() - start_time
            total_records = sum([
                len(news_data) if not isinstance(news_data, Exception) else 0,
                len(sentiment_data) if not isinstance(sentiment_data, Exception) else 0,
                len(onchain_data) if not isinstance(onchain_data, Exception) else 0,
                len(whale_data) if not isinstance(whale_data, Exception) else 0,
                len(explorer_data) if not isinstance(explorer_data, Exception) else 0,
            ])

            logger.info(f"\n{'='*80}")
            logger.info(f"[Iteration {iteration}] Completed in {elapsed:.2f}s")
            logger.info(f"Total records collected: {total_records}")
            logger.info(f"{'='*80}\n")

            # Wait 5 minutes between iterations
            await asyncio.sleep(300)

        except Exception as e:
            logger.error(f"[Iteration {iteration}] Worker error: {e}", exc_info=True)
            await asyncio.sleep(300)


async def start_comprehensive_worker():
    """Start the comprehensive data worker."""
    try:
        logger.info("Initializing comprehensive data worker...")

        logger.info("Scheduling comprehensive worker loop...")
        asyncio.create_task(comprehensive_worker_loop())
        logger.info("Comprehensive data worker started successfully")

    except Exception as e:
        logger.error(f"Failed to start comprehensive worker: {e}", exc_info=True)


if __name__ == "__main__":
    async def test():
        """Exercise each fetcher once and report record counts."""
        logger.info("Testing comprehensive data worker...")

        news = await fetch_news_data()
        logger.info(f"\n✅ News: {len(news)} articles")

        sentiment = await fetch_sentiment_data()
        logger.info(f"✅ Sentiment: {len(sentiment)} records")

        onchain = await fetch_onchain_data()
        logger.info(f"✅ On-chain: {len(onchain)} records")

        whale = await fetch_whale_data()
        logger.info(f"✅ Whale: {len(whale)} transactions")

        explorer = await fetch_block_explorer_data()
        logger.info(f"✅ Explorer: {len(explorer)} records")

    asyncio.run(test())