|
|
|
|
|
""" |
|
|
Configuration Manager with Hot Reload |
|
|
====================================== |
|
|
مدیریت فایلهای پیکربندی با قابلیت reload خودکار در صورت تغییر |
|
|
""" |
|
|
|
|
|
import json |
|
|
import logging |
|
|
from pathlib import Path |
|
|
from typing import Dict, Any, Optional, Callable |
|
|
from datetime import datetime |
|
|
from watchdog.observers import Observer |
|
|
from watchdog.events import FileSystemEventHandler, FileModifiedEvent |
|
|
import threading |
|
|
import time |
|
|
|
|
|
logger = logging.getLogger(__name__) |
|
|
|
|
|
|
|
|
class ConfigFileHandler(FileSystemEventHandler): |
|
|
"""Handler for config file changes.""" |
|
|
|
|
|
def __init__(self, config_manager: 'ConfigManager'): |
|
|
""" |
|
|
Initialize config file handler. |
|
|
|
|
|
Args: |
|
|
config_manager: Reference to ConfigManager instance |
|
|
""" |
|
|
self.config_manager = config_manager |
|
|
self.last_modified = {} |
|
|
|
|
|
def on_modified(self, event: FileModifiedEvent): |
|
|
"""Handle file modification event.""" |
|
|
if event.is_directory: |
|
|
return |
|
|
|
|
|
file_path = Path(event.src_path) |
|
|
|
|
|
|
|
|
if file_path in self.config_manager.config_files: |
|
|
|
|
|
current_time = time.time() |
|
|
last_time = self.last_modified.get(file_path, 0) |
|
|
|
|
|
|
|
|
if current_time - last_time < 2.0: |
|
|
return |
|
|
|
|
|
self.last_modified[file_path] = current_time |
|
|
|
|
|
logger.info(f"Config file modified: {file_path}") |
|
|
self.config_manager.reload_config(file_path) |
|
|
|
|
|
|
|
|
class ConfigManager: |
|
|
"""Manager for configuration files with hot reload support.""" |
|
|
|
|
|
def __init__(self, config_dir: str = "config"): |
|
|
""" |
|
|
Initialize configuration manager. |
|
|
|
|
|
Args: |
|
|
config_dir: Directory containing config files |
|
|
""" |
|
|
self.config_dir = Path(config_dir) |
|
|
self.configs: Dict[str, Dict[str, Any]] = {} |
|
|
self.config_files: Dict[Path, str] = {} |
|
|
self.observers: Dict[str, Observer] = {} |
|
|
self.reload_callbacks: Dict[str, list] = {} |
|
|
self.lock = threading.Lock() |
|
|
|
|
|
|
|
|
self._setup_config_files() |
|
|
|
|
|
|
|
|
self.load_all_configs() |
|
|
|
|
|
|
|
|
self.start_watching() |
|
|
|
|
|
def _setup_config_files(self): |
|
|
"""Setup config file paths.""" |
|
|
self.config_files = { |
|
|
self.config_dir / "scoring.config.json": "scoring", |
|
|
self.config_dir / "strategy.config.json": "strategy" |
|
|
} |
|
|
|
|
|
def load_config(self, config_name: str) -> Optional[Dict[str, Any]]: |
|
|
""" |
|
|
Load a configuration file. |
|
|
|
|
|
Args: |
|
|
config_name: Name of the config (e.g., "scoring", "strategy") |
|
|
|
|
|
Returns: |
|
|
Config dictionary or None if not found |
|
|
""" |
|
|
config_path = None |
|
|
for path, name in self.config_files.items(): |
|
|
if name == config_name: |
|
|
config_path = path |
|
|
break |
|
|
|
|
|
if not config_path or not config_path.exists(): |
|
|
logger.warning(f"Config file not found: {config_name}") |
|
|
return None |
|
|
|
|
|
try: |
|
|
with open(config_path, 'r', encoding='utf-8') as f: |
|
|
config = json.load(f) |
|
|
|
|
|
with self.lock: |
|
|
self.configs[config_name] = config |
|
|
|
|
|
logger.info(f"Loaded config: {config_name}") |
|
|
return config |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"Error loading config {config_name}: {e}", exc_info=True) |
|
|
return None |
|
|
|
|
|
def load_all_configs(self): |
|
|
"""Load all configuration files.""" |
|
|
logger.info("Loading all configuration files...") |
|
|
|
|
|
for config_path, config_name in self.config_files.items(): |
|
|
self.load_config(config_name) |
|
|
|
|
|
logger.info(f"Loaded {len(self.configs)} configuration files") |
|
|
|
|
|
def reload_config(self, config_path: Path): |
|
|
""" |
|
|
Reload a specific configuration file. |
|
|
|
|
|
Args: |
|
|
config_path: Path to the config file |
|
|
""" |
|
|
if config_path not in self.config_files: |
|
|
return |
|
|
|
|
|
config_name = self.config_files[config_path] |
|
|
logger.info(f"Reloading config: {config_name}") |
|
|
|
|
|
old_config = self.configs.get(config_name) |
|
|
new_config = self.load_config(config_name) |
|
|
|
|
|
if new_config and new_config != old_config: |
|
|
logger.info(f"Config {config_name} reloaded successfully") |
|
|
|
|
|
|
|
|
if config_name in self.reload_callbacks: |
|
|
for callback in self.reload_callbacks[config_name]: |
|
|
try: |
|
|
callback(new_config, old_config) |
|
|
except Exception as e: |
|
|
logger.error(f"Error in reload callback: {e}", exc_info=True) |
|
|
|
|
|
def get_config(self, config_name: str) -> Optional[Dict[str, Any]]: |
|
|
""" |
|
|
Get a configuration by name. |
|
|
|
|
|
Args: |
|
|
config_name: Name of the config |
|
|
|
|
|
Returns: |
|
|
Config dictionary or None |
|
|
""" |
|
|
with self.lock: |
|
|
return self.configs.get(config_name) |
|
|
|
|
|
def register_reload_callback( |
|
|
self, |
|
|
config_name: str, |
|
|
callback: Callable[[Dict[str, Any], Optional[Dict[str, Any]]], None] |
|
|
): |
|
|
""" |
|
|
Register a callback to be called when config is reloaded. |
|
|
|
|
|
Args: |
|
|
config_name: Name of the config |
|
|
callback: Callback function (new_config, old_config) -> None |
|
|
""" |
|
|
if config_name not in self.reload_callbacks: |
|
|
self.reload_callbacks[config_name] = [] |
|
|
|
|
|
self.reload_callbacks[config_name].append(callback) |
|
|
logger.info(f"Registered reload callback for {config_name}") |
|
|
|
|
|
def start_watching(self): |
|
|
"""Start watching config files for changes.""" |
|
|
if not self.config_dir.exists(): |
|
|
logger.warning(f"Config directory does not exist: {self.config_dir}") |
|
|
return |
|
|
|
|
|
event_handler = ConfigFileHandler(self) |
|
|
|
|
|
|
|
|
watched_dirs = set(path.parent for path in self.config_files.keys()) |
|
|
|
|
|
try: |
|
|
for watch_dir in watched_dirs: |
|
|
observer = Observer() |
|
|
observer.schedule(event_handler, str(watch_dir), recursive=False) |
|
|
observer.start() |
|
|
|
|
|
self.observers[str(watch_dir)] = observer |
|
|
logger.info(f"Started watching directory: {watch_dir}") |
|
|
except Exception as e: |
|
|
logger.warning(f"Failed to start file watcher (hot reload disabled): {e}") |
|
|
|
|
|
|
|
|
def stop_watching(self): |
|
|
"""Stop watching config files.""" |
|
|
for observer in self.observers.values(): |
|
|
observer.stop() |
|
|
observer.join() |
|
|
|
|
|
self.observers.clear() |
|
|
logger.info("Stopped watching config files") |
|
|
|
|
|
def manual_reload(self, config_name: Optional[str] = None) -> Dict[str, Any]: |
|
|
""" |
|
|
Manually reload configuration files. |
|
|
|
|
|
Args: |
|
|
config_name: Optional specific config to reload (reloads all if None) |
|
|
|
|
|
Returns: |
|
|
Dict with reload status |
|
|
""" |
|
|
if config_name: |
|
|
config_path = None |
|
|
for path, name in self.config_files.items(): |
|
|
if name == config_name: |
|
|
config_path = path |
|
|
break |
|
|
|
|
|
if config_path: |
|
|
self.reload_config(config_path) |
|
|
return { |
|
|
"success": True, |
|
|
"message": f"Config {config_name} reloaded", |
|
|
"config": config_name |
|
|
} |
|
|
else: |
|
|
return { |
|
|
"success": False, |
|
|
"message": f"Config {config_name} not found" |
|
|
} |
|
|
else: |
|
|
|
|
|
for config_name in self.config_files.values(): |
|
|
self.load_config(config_name) |
|
|
|
|
|
return { |
|
|
"success": True, |
|
|
"message": "All configs reloaded", |
|
|
"configs": list(self.config_files.values()) |
|
|
} |
|
|
|
|
|
def get_all_configs(self) -> Dict[str, Dict[str, Any]]: |
|
|
"""Get all loaded configurations.""" |
|
|
with self.lock: |
|
|
return self.configs.copy() |
|
|
|
|
|
|
|
|
|
|
|
_config_manager: Optional[ConfigManager] = None |
|
|
|
|
|
|
|
|
def get_config_manager(config_dir: str = "config") -> ConfigManager: |
|
|
""" |
|
|
Get or create global config manager instance. |
|
|
|
|
|
Args: |
|
|
config_dir: Config directory path |
|
|
|
|
|
Returns: |
|
|
ConfigManager instance |
|
|
""" |
|
|
global _config_manager |
|
|
|
|
|
if _config_manager is None: |
|
|
_config_manager = ConfigManager(config_dir) |
|
|
|
|
|
return _config_manager |
|
|
|
|
|
|