|
|
|
|
|
""" |
|
|
Data Sources Database Model |
|
|
مدل دیتابیس برای مدیریت منابع داده |
|
|
|
|
|
این مدل برای ذخیره و مدیریت منابع داده استفاده میشود. |
|
|
شامل اطلاعات منبع، وضعیت فعال/غیرفعال، و آمار استفاده. |
|
|
""" |
|
|
|
|
|
from sqlalchemy import Column, Integer, String, Float, Boolean, DateTime, Text, Enum, Index |
|
|
from sqlalchemy.orm import relationship |
|
|
from datetime import datetime |
|
|
import enum |
|
|
from typing import Dict, Any, List, Optional |
|
|
import json |
|
|
|
|
|
|
|
|
# Prefer the project's shared declarative Base so these models register on
# the same metadata as the rest of the application; fall back to a standalone
# Base when this module is imported in isolation (tests, tooling).
# NOTE(review): `sqlalchemy.ext.declarative.declarative_base` is the legacy
# location (moved to `sqlalchemy.orm` in SQLAlchemy 1.4) — still works, but
# consider updating if the project is on 1.4+.
try:
    from database.models import Base
except ImportError:
    from sqlalchemy.ext.declarative import declarative_base
    Base = declarative_base()
|
|
|
|
|
|
|
|
class DataSourceType(enum.Enum):
    """Kind of data a source provides (نوع منبع داده).

    Stored in the database as the plain string value (the `source_type`
    columns below are String, not Enum columns).
    """
    MARKET = "market"
    NEWS = "news"
    SENTIMENT = "sentiment"
    SOCIAL = "social"
    ONCHAIN = "onchain"
    DEFI = "defi"
    HISTORICAL = "historical"
    TECHNICAL = "technical"
    AGGREGATED = "aggregated"
|
|
|
|
|
|
|
|
class DataSourceStatus(enum.Enum):
    """Operational status of a data source (وضعیت منبع داده).

    Stored as the plain string value in `DataSource.status`.
    """
    ACTIVE = "active"
    INACTIVE = "inactive"
    RATE_LIMITED = "rate_limited"
    ERROR = "error"
    MAINTENANCE = "maintenance"
|
|
|
|
|
|
|
|
class CollectionInterval(enum.Enum):
    """How often data is collected from a source (بازه جمع‌آوری داده).

    Stored as the plain string value (e.g. "30m") in
    `DataSource.collection_interval` and `CollectionSchedule.collection_interval`.
    """
    REALTIME = "realtime"
    MINUTES_1 = "1m"
    MINUTES_5 = "5m"
    MINUTES_15 = "15m"
    MINUTES_30 = "30m"
    HOURLY = "1h"
    HOURS_4 = "4h"
    DAILY = "1d"
|
|
|
|
|
|
|
|
class DataSource(Base):
    """
    Data Source Model (منبع داده)

    One row per external data source. Stores identity and connection
    metadata, API-key requirements, provider rate limits, collection
    scheduling hints, status flags, and rolling usage statistics.
    """
    __tablename__ = 'data_sources'

    id = Column(Integer, primary_key=True, autoincrement=True)

    # --- Identity ---
    # source_id is the stable external identifier used by the manager and logs.
    source_id = Column(String(100), nullable=False, unique=True, index=True)
    name = Column(String(255), nullable=False)
    # One of DataSourceType's values, stored as a plain string.
    source_type = Column(String(50), nullable=False, index=True)
    description = Column(Text, nullable=True)

    # --- Connection ---
    base_url = Column(String(500), nullable=False)
    api_version = Column(String(20), nullable=True)

    # --- API key configuration ---
    requires_api_key = Column(Boolean, default=False)
    # Name of the environment variable holding the key — never the key itself.
    api_key_env_var = Column(String(100), nullable=True)
    has_api_key_configured = Column(Boolean, default=False)

    # --- Provider rate limits ---
    rate_limit_description = Column(String(100), nullable=True)
    rate_limit_per_minute = Column(Integer, nullable=True)
    rate_limit_per_hour = Column(Integer, nullable=True)
    rate_limit_per_day = Column(Integer, nullable=True)

    # --- Collection scheduling ---
    # A CollectionInterval value, stored as a plain string.
    collection_interval = Column(String(20), default="30m")
    supports_realtime = Column(Boolean, default=False)

    # --- Capabilities (JSON-encoded lists stored as Text; see to_dict /
    #     DataSourceManager.create_source for the encode/decode pair) ---
    supported_timeframes = Column(Text, nullable=True)
    categories = Column(Text, nullable=True)
    features = Column(Text, nullable=True)

    # --- Status ---
    is_active = Column(Boolean, default=True, index=True)
    # A DataSourceStatus value, stored as a plain string.
    status = Column(String(50), default="active", index=True)
    status_message = Column(Text, nullable=True)

    # --- Selection hints (queries sort ascending by priority) ---
    priority = Column(Integer, default=5)
    weight = Column(Integer, default=1)

    # --- Classification flags ---
    is_verified = Column(Boolean, default=False)
    is_free_tier = Column(Boolean, default=True)

    # --- Rolling usage statistics (maintained by DataSourceManager.record_request) ---
    total_requests = Column(Integer, default=0)
    successful_requests = Column(Integer, default=0)
    failed_requests = Column(Integer, default=0)
    avg_response_time_ms = Column(Float, default=0.0)
    last_success_at = Column(DateTime, nullable=True)
    last_failure_at = Column(DateTime, nullable=True)
    last_error_message = Column(Text, nullable=True)

    # --- Timestamps (naive UTC via datetime.utcnow, consistent file-wide) ---
    created_at = Column(DateTime, default=datetime.utcnow)
    updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
    last_checked_at = Column(DateTime, nullable=True)

    __table_args__ = (
        Index('idx_source_type_active', 'source_type', 'is_active'),
        Index('idx_status_priority', 'status', 'priority'),
    )

    def __repr__(self):
        return f"<DataSource(id={self.source_id}, name={self.name}, active={self.is_active})>"

    def to_dict(self) -> Dict[str, Any]:
        """Serialize the row to a JSON-friendly dict (تبدیل به دیکشنری).

        JSON-encoded Text columns are decoded back to lists, datetimes are
        ISO-formatted, and success_rate (percent) is computed on the fly.
        NOTE(review): rate_limit_per_minute/hour/day are not included here —
        confirm whether that omission is intentional.
        """
        return {
            "id": self.id,
            "source_id": self.source_id,
            "name": self.name,
            "source_type": self.source_type,
            "description": self.description,
            "base_url": self.base_url,
            "api_version": self.api_version,
            "requires_api_key": self.requires_api_key,
            "api_key_env_var": self.api_key_env_var,
            "has_api_key_configured": self.has_api_key_configured,
            "rate_limit_description": self.rate_limit_description,
            "collection_interval": self.collection_interval,
            "supports_realtime": self.supports_realtime,
            "supported_timeframes": json.loads(self.supported_timeframes) if self.supported_timeframes else [],
            "categories": json.loads(self.categories) if self.categories else [],
            "features": json.loads(self.features) if self.features else [],
            "is_active": self.is_active,
            "status": self.status,
            "status_message": self.status_message,
            "priority": self.priority,
            "weight": self.weight,
            "is_verified": self.is_verified,
            "is_free_tier": self.is_free_tier,
            "total_requests": self.total_requests,
            "successful_requests": self.successful_requests,
            "failed_requests": self.failed_requests,
            "success_rate": (self.successful_requests / self.total_requests * 100) if self.total_requests > 0 else 0,
            "avg_response_time_ms": self.avg_response_time_ms,
            "last_success_at": self.last_success_at.isoformat() if self.last_success_at else None,
            "last_failure_at": self.last_failure_at.isoformat() if self.last_failure_at else None,
            "created_at": self.created_at.isoformat() if self.created_at else None,
            "updated_at": self.updated_at.isoformat() if self.updated_at else None,
            "last_checked_at": self.last_checked_at.isoformat() if self.last_checked_at else None
        }
|
|
|
|
|
|
|
|
class DataCollectionLog(Base):
    """
    Data Collection Log (لاگ جمع‌آوری داده)

    One row per collection attempt against a source: timing, outcome,
    record counts, and error/HTTP details.
    """
    __tablename__ = 'data_collection_logs'

    id = Column(Integer, primary_key=True, autoincrement=True)
    # Matches DataSource.source_id; no foreign-key constraint is declared.
    source_id = Column(String(100), nullable=False, index=True)

    # --- What was collected ---
    collection_type = Column(String(50), nullable=False)
    interval_used = Column(String(20), nullable=True)

    # --- Timing ---
    started_at = Column(DateTime, nullable=False, default=datetime.utcnow)
    completed_at = Column(DateTime, nullable=True)
    duration_ms = Column(Integer, nullable=True)

    # --- Outcome ---
    success = Column(Boolean, default=False)
    records_fetched = Column(Integer, default=0)
    records_stored = Column(Integer, default=0)

    # --- Error details ---
    error_type = Column(String(100), nullable=True)
    error_message = Column(Text, nullable=True)

    # --- HTTP response metadata ---
    http_status_code = Column(Integer, nullable=True)
    response_size_bytes = Column(Integer, nullable=True)

    __table_args__ = (
        Index('idx_collection_source_time', 'source_id', 'started_at'),
        Index('idx_collection_success', 'success', 'started_at'),
    )

    def to_dict(self) -> Dict[str, Any]:
        """Serialize to a JSON-friendly dict (تبدیل به دیکشنری);
        datetimes are ISO-formatted."""
        return {
            "id": self.id,
            "source_id": self.source_id,
            "collection_type": self.collection_type,
            "interval_used": self.interval_used,
            "started_at": self.started_at.isoformat() if self.started_at else None,
            "completed_at": self.completed_at.isoformat() if self.completed_at else None,
            "duration_ms": self.duration_ms,
            "success": self.success,
            "records_fetched": self.records_fetched,
            "records_stored": self.records_stored,
            "error_type": self.error_type,
            "error_message": self.error_message,
            "http_status_code": self.http_status_code,
            "response_size_bytes": self.response_size_bytes
        }
|
|
|
|
|
|
|
|
class CollectionSchedule(Base):
    """
    Collection Schedule (زمان‌بندی جمع‌آوری)

    Per-source scheduling state: the configured interval, enable flag,
    last/next run times, run counters, and failure-backoff state.
    """
    __tablename__ = 'collection_schedules'

    id = Column(Integer, primary_key=True, autoincrement=True)
    # One schedule row per source (unique); matches DataSource.source_id.
    source_id = Column(String(100), nullable=False, unique=True, index=True)

    # --- Configuration ---
    # A CollectionInterval value, stored as a plain string.
    collection_interval = Column(String(20), nullable=False, default="30m")
    is_enabled = Column(Boolean, default=True)

    # --- Run tracking ---
    last_run_at = Column(DateTime, nullable=True)
    next_run_at = Column(DateTime, nullable=True)

    # --- Counters ---
    consecutive_failures = Column(Integer, default=0)
    total_runs = Column(Integer, default=0)
    successful_runs = Column(Integer, default=0)

    # --- Failure backoff (scheduler semantics defined by the consumer,
    #     which is not visible in this module) ---
    backoff_until = Column(DateTime, nullable=True)
    backoff_multiplier = Column(Float, default=1.0)

    # --- Timestamps (naive UTC, consistent file-wide) ---
    created_at = Column(DateTime, default=datetime.utcnow)
    updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)

    def to_dict(self) -> Dict[str, Any]:
        """Serialize to a JSON-friendly dict (تبدیل به دیکشنری);
        success_rate is a computed percentage."""
        return {
            "id": self.id,
            "source_id": self.source_id,
            "collection_interval": self.collection_interval,
            "is_enabled": self.is_enabled,
            "last_run_at": self.last_run_at.isoformat() if self.last_run_at else None,
            "next_run_at": self.next_run_at.isoformat() if self.next_run_at else None,
            "consecutive_failures": self.consecutive_failures,
            "total_runs": self.total_runs,
            "successful_runs": self.successful_runs,
            "success_rate": (self.successful_runs / self.total_runs * 100) if self.total_runs > 0 else 0,
            "backoff_until": self.backoff_until.isoformat() if self.backoff_until else None,
            "backoff_multiplier": self.backoff_multiplier,
            "created_at": self.created_at.isoformat() if self.created_at else None,
            "updated_at": self.updated_at.isoformat() if self.updated_at else None
        }
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class DataSourceManager:
    """
    Database operations for DataSource rows (مدیریت منابع داده در دیتابیس).

    All write methods commit on success and roll back on any exception,
    returning a failure value (None/False) instead of raising.
    """

    def __init__(self, session):
        # SQLAlchemy Session shared by all operations of this manager.
        self.session = session

    def create_source(self, source_data: Dict[str, Any]) -> Optional[DataSource]:
        """Create and persist a new source (ایجاد منبع جدید).

        Required keys: source_id, name, base_url. All other keys are
        optional and fall back to model defaults. List-valued fields
        (supported_timeframes, categories, features) are JSON-encoded
        for storage. Returns the new row, or None on failure.
        """
        try:
            source = DataSource(
                source_id=source_data["source_id"],
                name=source_data["name"],
                source_type=source_data.get("source_type", "market"),
                description=source_data.get("description"),
                base_url=source_data["base_url"],
                api_version=source_data.get("api_version"),
                requires_api_key=source_data.get("requires_api_key", False),
                api_key_env_var=source_data.get("api_key_env_var"),
                rate_limit_description=source_data.get("rate_limit_description"),
                collection_interval=source_data.get("collection_interval", "30m"),
                supports_realtime=source_data.get("supports_realtime", False),
                supported_timeframes=json.dumps(source_data.get("supported_timeframes", [])),
                categories=json.dumps(source_data.get("categories", [])),
                features=json.dumps(source_data.get("features", [])),
                is_active=source_data.get("is_active", True),
                status=source_data.get("status", "active"),
                priority=source_data.get("priority", 5),
                weight=source_data.get("weight", 1),
                is_verified=source_data.get("is_verified", False),
                is_free_tier=source_data.get("is_free_tier", True)
            )
            self.session.add(source)
            self.session.commit()
            return source
        except Exception as e:
            self.session.rollback()
            print(f"Error creating source: {e}")
            return None

    def get_source(self, source_id: str) -> Optional[DataSource]:
        """Fetch a single source by its stable source_id, or None."""
        return self.session.query(DataSource).filter_by(source_id=source_id).first()

    def get_all_sources(self) -> List[DataSource]:
        """Return every source, active or not."""
        return self.session.query(DataSource).all()

    def get_active_sources(self) -> List[DataSource]:
        """Return only sources flagged is_active."""
        return self.session.query(DataSource).filter_by(is_active=True).all()

    def get_sources_by_type(self, source_type: str) -> List[DataSource]:
        """Return active sources of the given type (a DataSourceType value)."""
        return self.session.query(DataSource).filter_by(source_type=source_type, is_active=True).all()

    def update_source_status(self, source_id: str, is_active: bool, status: str = None, status_message: str = None) -> bool:
        """Update a source's active flag and, optionally, status fields.

        status/status_message are only written when truthy (passing an
        empty string leaves the stored value unchanged). Returns False
        when the source is missing or the commit fails.
        """
        try:
            source = self.get_source(source_id)
            if not source:
                return False
            source.is_active = is_active
            if status:
                source.status = status
            if status_message:
                source.status_message = status_message
            source.updated_at = datetime.utcnow()
            self.session.commit()
            return True
        except Exception as e:
            self.session.rollback()
            print(f"Error updating source status: {e}")
            return False

    def record_request(self, source_id: str, success: bool, response_time_ms: float, error_message: str = None) -> bool:
        """Record the outcome of one request against a source (ثبت درخواست).

        Updates counters, last success/failure timestamps, the running
        mean response time, and last_checked_at. Returns False when the
        source does not exist or the commit fails.
        """
        try:
            source = self.get_source(source_id)
            if not source:
                return False

            source.total_requests += 1
            if success:
                source.successful_requests += 1
                source.last_success_at = datetime.utcnow()
            else:
                source.failed_requests += 1
                source.last_failure_at = datetime.utcnow()
                if error_message:
                    source.last_error_message = error_message

            # Incremental running mean over all recorded requests.
            # Fix: the previous (old + new) / 2 formula was an exponentially
            # decaying blend that over-weighted the most recent sample, not
            # the average of the full history.
            n = source.total_requests
            if n > 1 and source.avg_response_time_ms > 0:
                source.avg_response_time_ms += (response_time_ms - source.avg_response_time_ms) / n
            else:
                source.avg_response_time_ms = response_time_ms

            source.last_checked_at = datetime.utcnow()
            self.session.commit()
            return True
        except Exception as e:
            self.session.rollback()
            print(f"Error recording request: {e}")
            return False

    def get_sources_for_collection(self, interval: str) -> List[DataSource]:
        """Return active, non-errored sources scheduled at `interval`,
        ordered by ascending priority."""
        return self.session.query(DataSource).filter(
            DataSource.is_active == True,
            DataSource.collection_interval == interval,
            DataSource.status != "error"
        ).order_by(DataSource.priority).all()

    def get_statistics(self) -> Dict[str, Any]:
        """Aggregate statistics across all sources (آمار منابع).

        Fix: "by_type" is now a per-source-type row count; it was
        previously hard-coded to an empty dict.
        """
        all_sources = self.get_all_sources()
        active_sources = [s for s in all_sources if s.is_active]

        by_type: Dict[str, int] = {}
        for s in all_sources:
            by_type[s.source_type] = by_type.get(s.source_type, 0) + 1

        total_requests = sum(s.total_requests for s in all_sources)
        successful_requests = sum(s.successful_requests for s in all_sources)

        return {
            "total_sources": len(all_sources),
            "active_sources": len(active_sources),
            "by_type": by_type,
            "total_requests": total_requests,
            "successful_requests": successful_requests,
            "success_rate": (successful_requests / total_requests * 100) if total_requests > 0 else 0,
            "sources_with_errors": len([s for s in all_sources if s.status == "error"])
        }
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def init_data_sources_from_registry(session, registry):
    """
    Initialize data sources in database from registry
    (پر کردن جدول منابع از رجیستری)

    Walks registry.to_dict() and inserts a DataSource row for each source
    id not already present; rows that already exist are left untouched.
    Returns the DataSourceManager used for the inserts.
    """
    manager = DataSourceManager(session)

    for sid, info in registry.to_dict().items():
        # Skip sources that were seeded on a previous run.
        if manager.get_source(sid):
            print(f"Data source already exists: {sid}")
            continue

        timeframes = info.get("supported_timeframes", [])
        manager.create_source({
            "source_id": sid,
            "name": info["name"],
            "source_type": info["source_type"],
            "description": info.get("description"),
            "base_url": info["url"],
            "requires_api_key": info.get("requires_api_key", False),
            "api_key_env_var": info.get("api_key_env"),
            "rate_limit_description": info.get("rate_limit"),
            "collection_interval": "30m",
            "supports_realtime": "realtime" in timeframes,
            "supported_timeframes": timeframes,
            "categories": info.get("categories", []),
            "features": info.get("features", []),
            "is_active": info.get("is_active", True),
            "priority": info.get("priority", 5),
            "is_verified": info.get("verified", False),
            "is_free_tier": info.get("free_tier", True),
        })
        print(f"Created data source: {sid}")

    return manager
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Default collection interval per source type. Keys match DataSourceType
# values; values match CollectionInterval values. Presumably consumed by the
# scheduler as per-type defaults — the consumer is not visible in this module.
COLLECTION_INTERVALS = {
    # market / chain data
    "market": "15m",
    "historical": "30m",
    "onchain": "30m",
    "defi": "15m",
    # news & social feeds
    "news": "15m",
    "social": "30m",
    # sentiment indicators
    "sentiment": "15m",
    # technical indicators
    "technical": "15m",
    # aggregated feeds
    "aggregated": "15m"
}
|
|
|
|
|
|
|
|
# Source ids treated as realtime-capable (consumer not visible in this module).
# NOTE(review): the "*_historical" ids look inconsistent in a realtime list —
# confirm these ids against the source registry.
REALTIME_SOURCES = [
    "binance_historical",
    "coingecko_historical",
    "coincap_realtime",
    "fear_greed_index"
]
|
|
|