Spaces:
Running
Running
import json
from pathlib import Path
from typing import Any, Dict, List, Optional

import yaml
# Ancestor two levels above the directory that contains this module
# (presumably the project root -- confirm against the repository layout).
_FILE_PATH = Path(__file__).parents[2]

# Data directory under that ancestor, and the processed-meetings JSON inside it.
DATA_DIR = _FILE_PATH / "data"
MEETINGS_FILE = DATA_DIR.joinpath("fed_processed_meetings.json")
def load_meetings_data(meetings_file: Optional[Path] = None) -> List[Dict[str, Any]]:
    """Load processed meeting records from a JSON file, highest date first.

    Args:
        meetings_file: JSON file to read. When omitted, ``MEETINGS_FILE`` is
            used, so existing zero-argument callers are unaffected.

    Returns:
        The decoded records sorted by their ``date`` value in descending
        order; records whose ``date`` is missing or null sort last. An empty
        list is returned when the file does not exist, when it does not hold
        a JSON array, or when reading/parsing/sorting fails (the failure is
        printed, not raised).
    """
    path = MEETINGS_FILE if meetings_file is None else Path(meetings_file)
    try:
        if not path.exists():
            return []
        with open(path, 'r', encoding='utf-8') as f:
            data = json.load(f)
        if not isinstance(data, list):
            print(
                "Error loading meetings data: expected a JSON array, "
                f"got {type(data).__name__}"
            )
            return []
        # `or ''` instead of a .get() default: a record with an explicit
        # null date would otherwise yield None as its sort key, and comparing
        # None with str raises TypeError, discarding every record.
        return sorted(data, key=lambda m: m.get('date') or '', reverse=True)
    except Exception as e:
        # Deliberate best-effort boundary: report and fall back to no data.
        print(f"Error loading meetings data: {e}")
        return []
def _preview_text(text: Any, limit: int) -> str:
    """Return *text* cut to *limit* characters; "..." is added only if it was cut."""
    if not text:
        return ""
    if len(text) <= limit:
        return text
    return text[:limit] + "..."


def create_meeting_list(meeting_data: list, *, preview_length: int = 500) -> List[Dict[str, Any]]:
    """Project raw meeting records onto the fixed set of keys built below.

    Args:
        meeting_data: Iterable of mapping-like records (each must support
            ``.get``).
        preview_length: Maximum number of ``full_text`` characters kept in
            the output before an ellipsis is appended.

    Returns:
        One dict per input record. Absent keys default to ``""`` (or ``[]``
        for ``key_economic_factors``). ``rate_decision`` duplicates ``rate``
        and ``summary`` duplicates ``forward_guidance``. ``full_text`` is a
        preview: the text is returned unchanged when it fits within
        ``preview_length`` and is truncated with a trailing ``"..."``
        otherwise.
    """
    return [
        {
            "date": meeting.get("date", ""),
            "title": meeting.get("title", ""),
            "rate": meeting.get("rate", ""),
            # Same source field exposed under a second key.
            "rate_decision": meeting.get("rate", ""),
            # The forward guidance text doubles as the summary.
            "summary": meeting.get("forward_guidance", ""),
            "forward_guidance": meeting.get("forward_guidance", ""),
            "action": meeting.get("action", ""),
            "magnitude": meeting.get("magnitude", ""),
            "key_economic_factors": meeting.get("key_economic_factors", []),
            "economic_outlook": meeting.get("economic_outlook", ""),
            "market_impact": meeting.get("market_impact", ""),
            "full_text": _preview_text(meeting.get("full_text"), preview_length),
            "url": meeting.get("url", ""),
        }
        for meeting in meeting_data
    ]
def _fallback_meetings() -> List[Dict[str, Any]]:
    """Return the single placeholder record used when no processed data file exists."""
    message = "No processed data available. Please run the data pipeline first."
    return [
        {
            "date": "2025-06-18",
            "title": "FOMC Meeting 2025-06-18",
            # `rate` and `forward_guidance` mirror `rate_decision` and
            # `summary` so the placeholder has the same keys as the records
            # built by create_meeting_list.
            "rate": "4.25%-4.50%",
            "rate_decision": "4.25%-4.50%",
            "summary": message,
            "forward_guidance": message,
            "action": "Unknown",
            "magnitude": "Unknown",
            "key_economic_factors": [],
            "economic_outlook": "Data not available",
            "market_impact": "Data not available",
            "full_text": "No data available",
            "url": "",
        }
    ]


def load_processed_meetings():
    """Load processed FOMC meetings from the JSON file in display-ready form.

    Returns:
        The records from ``load_meetings_data()`` passed through
        ``create_meeting_list``. When ``MEETINGS_FILE`` does not exist, a
        one-element placeholder list is returned instead. Any other failure
        is printed and results in an empty list.
    """
    try:
        # load_meetings_data() reports a missing file as an empty list, so
        # the missing-file case has to be detected here for the fallback
        # branch below to be reachable.
        if not MEETINGS_FILE.exists():
            raise FileNotFoundError(MEETINGS_FILE)
        data = load_meetings_data()
        return create_meeting_list(meeting_data=data)
    except FileNotFoundError:
        print("Fed processed meetings file not found. Using fallback data.")
        return _fallback_meetings()
    except Exception as e:
        print(f"Error loading processed meetings: {e}")
        return []
def load_prompt_library():
    """Load prompts from the YAML library at ``configs/prompt_library.yaml``.

    The path is resolved relative to ``_FILE_PATH``.

    Returns:
        The parsed YAML content. An empty dict is returned when the file is
        empty or when reading/parsing fails (the failure is printed, not
        raised).
    """
    try:
        prompt_file = _FILE_PATH / "configs" / "prompt_library.yaml"
        with open(prompt_file, 'r', encoding='utf-8') as f:
            library = yaml.safe_load(f)
    except Exception as e:
        print(f"Error loading prompt library: {e}")
        return {}
    # safe_load yields None for an empty document; normalise it so the
    # "no prompts" result is the same {} as on the error path.
    return {} if library is None else library