# Sample CSV values:
# apr,adjusted_apr,timestamp,portfolio_snapshot,calculation_metrics,roi,agent_id,is_dummy,address,agent_name,metric_type,first_investment_timestamp,agent_hash,volume,trading_type,selected_protocols
# -0.03,1.75,2025-05-15 21:37:27.000000,"{'portfolio': {'portfolio_value': 29.34506065817397, 'allocations': [{'chain': 'optimism', 'type': 'velodrome', 'id': '0xaF03f51DE7a0E62BF061F6Fc3931cF79166B0a29', 'assets': ['FRAX', 'alUSD'], 'apr': 11.9, 'details': 'Velodrome Pool', 'ratio': 100.0, 'address': '0xAD588C11Ea73123fDe199C5C4F7F75C6e495C758'}], 'portfolio_breakdown': [{'asset': 'FRAX', 'address': '0x2E3D870790dC77A83DD1d18184Acc7439A53f475', 'balance': 12.498312351563191, 'price': 0.999924, 'value_usd': 12.497362479824472, 'ratio': 0.425876}, {'asset': 'alUSD', 'address': '0xCB8FA9a76b8e203D8C3797bF438d8FB81Ea3326A', 'balance': 17.023792285753334, 'price': 0.989656, 'value_usd': 16.8476981783495, 'ratio': 0.574124}], 'address': '0xAD588C11Ea73123fDe199C5C4F7F75C6e495C758'}, 'positons': [{'chain': 'optimism', 'pool_address': '0xaF03f51DE7a0E62BF061F6Fc3931cF79166B0a29', 'dex_type': 'velodrome', 'token0': '0x2E3D870790dC77A83DD1d18184Acc7439A53f475', 'token1': '0xCB8FA9a76b8e203D8C3797bF438d8FB81Ea3326A', 'token0_symbol': 'FRAX', 'token1_symbol': 'alUSD', 'apr': 11.901789131732096, 'pool_id': '0xaF03f51DE7a0E62BF061F6Fc3931cF79166B0a29', 'is_stable': True, 'is_cl_pool': False, 'amount0': 12549523370531409633, 'amount1': 16972223462662011900, 'timestamp': 1747319387, 'status': 'open', 'tx_hash': '0xb487bb4a45bcd7bb3b9e9e3fabe76bf6594828091598ffab69704754b4c8bea8'}]}","{'initial_value': 29.353178464538146, 'final_value': 29.34506065817397, 'f_i_ratio': -0.0002765562977782299, 'last_investment_timestamp': 1747319387, 'time_ratio': 5380.753851502806}",-0.0002765562977782299,86,False,0xAD588C11Ea73123fDe199C5C4F7F75C6e495C758,nusus-tayar25,APR,,,,,
#
# Parse the optimus_apr_values.csv file.
# Iterate over the rows; for each row:
#   - Parse address and final_value.
#   - Compute initial_value from the parsed address, similar to an Optimus function.
#   - Compute the APR and ROI, similar to an Optimus function.
#   - Write the row with initial_value, APR, and ROI to a new CSV file.
# (A sketch of this pipeline is wired into the __main__ block at the bottom of this file.)
from datetime import datetime
from decimal import Decimal
import json
import logging
import os
import time
from typing import Dict, List, Optional, Tuple

from pandas import DataFrame
import requests
from web3 import Web3

ETHERSCAN_API_KEY = ""

EXCLUDED_ADDRESSES = {  # Testnet agents of Gaurav, Divya, and Priyanshu
    "0x6aA9b180c1a4Ef43Ea540Da905f23BAfAEEB7DC8",  # agent_id 84
    "0x6aA9b180c1a4Ef43Ea540Da905f23BAfAEEB7CB5",  # agent_id 86
    "0x3B3AbC1604fAd139F841Da5c3Cad73a72621fee4",  # agent_id 102
}

COINGECKO_PRICE_API_URL = "https://api.coingecko.com/api/v3/coins/{coin_id}/history?date={date}"

WHITELISTED_TOKENS = {
    # Optimism tokens - stablecoins
    "0x0b2c639c533813f4aa9d7837caf62653d097ff85": ("USDC", 6),
    "0x01bff41798a0bcf287b996046ca68b395dbc1071": ("USDT0", 6),
    "0x94b008aa00579c1307b0ef2c499ad98a8ce58e58": ("USDT", 6),
    "0x7f5c764cbc14f9669b88837ca1490cca17c31607": ("USDC.e", 6),
    "0x8ae125e8653821e851f12a49f7765db9a9ce7384": ("DOLA", 18),
    "0xc40f949f8a4e094d1b49a23ea9241d289b7b2819": ("LUSD", 18),
    "0xda10009cbd5d07dd0cecc66161fc93d7c9000da1": ("DAI", 18),
    "0x087c440f251ff6cfe62b86dde1be558b95b4bb9b": ("BOLD", 18),
    "0x2e3d870790dc77a83dd1d18184acc7439a53f475": ("FRAX", 18),
    "0x2218a117083f5b482b0bb821d27056ba9c04b1d3": ("sDAI", 18),
    "0x1217bfe6c773eec6cc4a38b5dc45b92292b6e189": ("oUSDT", 6),
    "0x4f604735c1cf31399c6e711d5962b2b3e0225ad3": ("USDGLO", 18),
    "0xFC2E6e6BCbd49ccf3A5f029c79984372DcBFE527": ("OLAS", 18),
}

COIN_ID_MAPPING = {
    "usdc": "usd-coin",
    "alusd": "alchemix-usd",
    "usdt0": "usdt0",
    "usdt": "bridged-usdt",
    "usdc.e": "bridged-usd-coin-optimism",
    "usx": "token-dforce-usd",
    "dola": "dola-usd",
    "lusd": "liquity-usd",
    "dai": "makerdao-optimism-bridged-dai-optimism",
    "bold": "liquity-bold",
    "frax": "frax",
    "sdai": "savings-dai",
    "usd+": "overnight-fi-usd-optimism",
    "ousdt": "openusdt",
    "usdglo": "glo-dollar",
    "olas": "autonolas",
}

ZERO_ADDRESS = "0x0000000000000000000000000000000000000000"

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

w3 = Web3(Web3.HTTPProvider("https://rpc-gate.autonolas.tech/optimism-rpc/"))

def get_coin_id_from_symbol(symbol: str, chain: str) -> Optional[str]:
    """Map token symbol to CoinGecko ID."""
    if chain == "optimism":
        coin_id_map = {
            "USDC": "usd-coin",
            "ALUSD": "alchemix-usd",
            "USDT0": "usdt0",
            "USDT": "bridged-usdt",
            "MSUSD": None,
            "USDC.E": "bridged-usd-coin-optimism",
            "USX": "token-dforce-usd",
            "DOLA": "dola-usd",
            "LUSD": "liquity-usd",
            "DAI": "makerdao-optimism-bridged-dai-optimism",
            "BOLD": "liquity-bold",
            "FRAX": "frax",
            "SDAI": "savings-dai",
            "USD+": "overnight-fi-usd-optimism",
            "OUSDT": "openusdt",
            "USDGLO": "glo-dollar",
            "ETH": "ethereum",
            "WETH": "ethereum",
            "WBTC": "wrapped-bitcoin",
        }
        return coin_id_map.get(symbol.upper())
    return None

def load_cache(name: str) -> Dict:
    """Load price cache from JSON file."""
    cache_file = f"{name}_cache.json"
    if os.path.exists(cache_file):
        try:
            with open(cache_file, 'r') as f:
                return json.load(f)
        except json.JSONDecodeError:
            logger.warning("Cache file corrupted, creating new cache")
            return {}
    return {}


def save_cache(name: str, cache: Dict):
    """Save price cache to JSON file."""
    cache_file = f"{name}_cache.json"
    with open(cache_file, 'w') as f:
        json.dump(cache, f, indent=2)

def get_cached_price(date_str: str, token_symbol: str) -> Optional[float]:
    """Get price from cache if available."""
    cache = load_cache(name="price")
    return cache.get(date_str, {}).get(token_symbol)


def update_price_cache(date_str: str, token_symbol: str, price: float):
    """Update price cache with new value."""
    cache = load_cache(name="price")
    if date_str not in cache:
        cache[date_str] = {}
    cache[date_str][token_symbol] = price
    save_cache(name="price", cache=cache)


def get_cached_request(cache_key: str) -> Optional[Dict]:
    """Get cached request response if available."""
    cache = load_cache(name="request")
    return cache.get(cache_key)


def update_request_cache(cache_key: str, response: Dict):
    """Update request cache with new response."""
    cache = load_cache(name="request")
    cache[cache_key] = response
    save_cache(name="request", cache=cache)

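# For reference, the JSON cache files written by the helpers above look like
# the following (illustrative values only). price_cache.json is keyed by a
# "%d-%m-%Y" date string, then by token symbol:
#
#   {"15-05-2025": {"ETH": 2541.33, "FRAX": 0.999924}}
#
# request_cache.json maps "<endpoint>_<params>" keys to raw JSON responses.
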
def fetch_historical_eth_price(date_str: str) -> float:
    """Fetch historical ETH price from CoinGecko with caching."""
    # Check cache first
    cached_price = get_cached_price(date_str, "ETH")
    if cached_price is not None:
        return cached_price
    try:
        url = "https://api.coingecko.com/api/v3/coins/ethereum/history"
        params = {"date": date_str, "localization": "false"}
        # Add delay to respect rate limits
        time.sleep(1.2)
        response = requests.get(url, params=params)
        response.raise_for_status()
        data = response.json()
        if "market_data" in data and "current_price" in data["market_data"]:
            price = data["market_data"]["current_price"]["usd"]
            # Update cache
            update_price_cache(date_str, "ETH", price)
            return price
        return 0.0
    except Exception as e:
        print(f"Error fetching ETH price for {date_str}: {str(e)}")
        return 0.0

def fetch_historical_token_price(coin_id: str, date_str: str, token_symbol: str) -> float:
    """Fetch historical token price from CoinGecko with caching."""
    # Check cache first
    cached_price = get_cached_price(date_str, token_symbol)
    if cached_price is not None:
        return cached_price
    try:
        success, data = request_with_retries(
            endpoint=f"https://api.coingecko.com/api/v3/coins/{coin_id}/history",
            params={"date": date_str, "localization": "false"},
        )
        if not success:
            logger.error(f"Failed to fetch historical price for {coin_id} on {date_str}")
            return 0.0
        # Add delay to respect rate limits
        time.sleep(1.2)
        if "market_data" in data and "current_price" in data["market_data"]:
            price = data["market_data"]["current_price"]["usd"]
            # Update cache
            update_price_cache(date_str, token_symbol, price)
            return price
        return 0.0
    except Exception as e:
        print(f"Error fetching price for {coin_id} on {date_str}: {str(e)}")
        return 0.0

def get_block_at_timestamp(
    timestamp: int,
    chain: str = "optimism"
) -> Optional[int]:
    success, res = request_with_retries(
        endpoint=f"https://api-optimistic.etherscan.io/api?module=block&action=getblocknobytime&timestamp={timestamp}&closest=before&apikey={ETHERSCAN_API_KEY}",
    )
    if success and res.get("status") == "1" and "result" in res:
        return int(res.get("result"))
    else:
        logger.error(f"Failed to fetch block at timestamp {timestamp} for {chain}: {res.get('message', 'Unknown error')}")
        return None

def fetch_eth_balance(address: str, timestamp: float) -> float:
    key = "eth_balance"
    cache = load_cache(name=key)
    if f"{address}_{timestamp}" in cache:
        return cache[f"{address}_{timestamp}"] / (10 ** 18)
    balance = w3.eth.get_balance(
        account=Web3.to_checksum_address(address),
        block_identifier=get_block_at_timestamp(int(timestamp))
    )
    cache[f"{address}_{timestamp}"] = balance
    save_cache(name=key, cache=cache)
    return balance / (10 ** 18)

def fetch_token_balance(
    address: str,
    token_address: str,
    timestamp: int,
    decimals: int = 18
) -> Optional[float]:
    contract = w3.eth.contract(
        address=Web3.to_checksum_address(token_address),
        abi=[
            {
                "constant": True,
                "inputs": [{"name": "_owner", "type": "address"}],
                "name": "balanceOf",
                "outputs": [{"name": "", "type": "uint256"}],
                "payable": False,
                "stateMutability": "view",
                "type": "function",
            }
        ]
    )
    try:
        cache_key = f"token_balance_{address}_{token_address}_{timestamp}"
        cache = load_cache(name="token_balance")
        if cache_key in cache:
            return cache[cache_key] / (10 ** decimals)
        # Checksum the holder address too: web3 rejects non-checksummed address arguments
        balance = contract.functions.balanceOf(
            Web3.to_checksum_address(address)
        ).call(block_identifier=get_block_at_timestamp(int(timestamp)))
        cache[cache_key] = balance
        save_cache(name="token_balance", cache=cache)
        return balance / (10 ** decimals) if balance else 0.0
    except Exception as e:
        logger.error(f"Error fetching token balance for {address} at {timestamp}: {e}")
        return None

def get_datetime_from_timestamp(timestamp: str) -> Optional[datetime]:
    """Convert timestamp string to datetime object."""
    try:
        return datetime.fromisoformat(timestamp.replace("Z", "+00:00"))
    except (ValueError, TypeError):
        logger.warning(f"Invalid timestamp format: {timestamp}")
        return None

def request_with_retries(
    endpoint: str,
    params: Dict = None,
    headers: Dict = None,
    method: str = "GET",
    body: Dict = None,
    rate_limited_code: int = 429,
    retry_wait: int = 5,
    max_retries: int = 3
) -> Tuple[bool, Dict]:
    for attempt in range(max_retries):
        try:
            if method.upper() == "POST":
                cache_key = f"POST_{endpoint}_{str(body or {})}"
                cached_response = get_cached_request(cache_key)
                if cached_response is not None:
                    return len(cached_response) > 0, cached_response
                response = requests.post(endpoint, headers=headers, json=body)
                if response.ok:
                    update_request_cache(cache_key, response.json())
            else:
                # Check cache first for GET requests
                cache_key = f"{endpoint}_{str(params or {})}"
                cached_response = get_cached_request(cache_key)
                if cached_response is not None:
                    return len(cached_response) > 0, cached_response
                response = requests.get(endpoint, headers=headers, params=params or {})
                # Cache successful responses
                if response.status_code == 200:
                    update_request_cache(cache_key, response.json())
                elif response.status_code == 404:
                    update_request_cache(cache_key, {})
            if response.status_code == rate_limited_code:
                logger.warning(f"Rate limited. Waiting {retry_wait} seconds...")
                time.sleep(retry_wait)
                continue
            if response.status_code != 200:
                logger.error(f"Request failed with status {response.status_code}")
                return False, {}
            return True, response.json()
        except Exception as e:
            logger.error(f"Request failed: {str(e)}")
            if attempt < max_retries - 1:
                time.sleep(retry_wait)
                continue
            return False, {}
    return False, {}

def should_include_transfer_optimism(
    from_address: str
) -> bool:
    """Determine if an Optimism transfer should be included based on from address type."""
    if not from_address:
        return False
    # Exclude zero address
    if from_address.lower() in [
        "0x0000000000000000000000000000000000000000",
        "0x0",
        "",
    ]:
        return False
    try:
        # Use Optimism RPC to check if address is a contract
        payload = {
            "jsonrpc": "2.0",
            "method": "eth_getCode",
            "params": [from_address, "latest"],
            "id": 1,
        }
        success, result = request_with_retries(
            endpoint="https://mainnet.optimism.io",
            method="POST",
            headers={"Content-Type": "application/json"},
            body=payload,
            rate_limited_code=429,
            retry_wait=5,
        )
        if not success:
            logger.error("Failed to check contract code")
            return False
        code = result.get("result", "0x")
        # If code is '0x', it's an EOA
        if code == "0x":
            return True
        # If it has code, check if it's a GnosisSafe
        safe_check_url = f"https://safe-transaction-optimism.safe.global/api/v1/safes/{from_address}/"
        success, _ = request_with_retries(
            endpoint=safe_check_url,
            headers={"Accept": "application/json"},
            rate_limited_code=429,
            retry_wait=5,
        )
        if success:
            return True
        logger.info(f"Excluding transfer from contract: {from_address}")
        return False
    except Exception as e:
        logger.error(f"Error checking address {from_address}: {e}")
        return False

def fetch_optimism_incoming_transfers(
    address: str,
    last_timestamp: int
) -> Dict:
    """
    Fetch Optimism transfers for a given address with improved error handling and rate limiting.
    """
    base_url = "https://safe-transaction-optimism.safe.global/api/v1"
    all_transfers_by_date = {}
    try:
        logger.info(f"Fetching Optimism transfers for address {address}...")
        # Fetch incoming transfers
        transfers_url = f"{base_url}/safes/{address}/incoming-transfers/"
        processed_count = 0
        page_count = 0
        max_pages = 10  # Limit to prevent infinite loops
        while page_count < max_pages:
            page_count += 1
            logger.info(f"Fetching page {page_count} for address {address}")
            success, response_json = request_with_retries(
                endpoint=transfers_url,
                headers={"Accept": "application/json"},
                rate_limited_code=429,
                retry_wait=5
            )
            if not success:
                logger.error(f"Failed to fetch Optimism transfers for address {address} on page {page_count}")
                break
            transfers = response_json.get("results", [])
            if not transfers:
                logger.info(f"No more transfers found for address {address} on page {page_count}")
                break
            print("incoming transfers", response_json)
            for transfer in transfers:
                # Parse timestamp
                timestamp = transfer.get("executionDate")
                if not timestamp:
                    continue
                tx_datetime = get_datetime_from_timestamp(timestamp)
                tx_date = tx_datetime.strftime("%Y-%m-%d") if tx_datetime else None
                if not tx_date:
                    continue
                if tx_datetime.timestamp() > last_timestamp:
                    continue
                # Process the transfer
                from_address = transfer.get("from", address)
                transfer_type = transfer.get("type", "")
                if from_address.lower() == address.lower():
                    continue
                # Initialize date in transfers dict if not exists
                if tx_date not in all_transfers_by_date:
                    all_transfers_by_date[tx_date] = []
                should_include = should_include_transfer_optimism(from_address)
                if not should_include:
                    continue
                # Process different transfer types
                if transfer_type == "ERC20_TRANSFER":
                    # Token transfer
                    token_info = transfer.get("tokenInfo", {})
                    token_address = transfer.get("tokenAddress", "")
                    if not token_info:
                        if not token_address:
                            continue
                        # You might want to add token decimal and symbol fetching here
                        symbol = "Unknown"
                        decimals = 18
                    else:
                        symbol = token_info.get("symbol", "Unknown")
                        decimals = int(token_info.get("decimals", 18) or 18)
                    if symbol.lower() not in ["usdc", "eth"]:
                        continue
                    value_raw = int(transfer.get("value", "0") or "0")
                    amount = value_raw / (10**decimals)
                    transfer_data = {
                        "from_address": from_address,
                        "amount": amount,
                        "token_address": token_address,
                        "symbol": symbol,
                        "timestamp": timestamp,
                        "tx_hash": transfer.get("transactionHash", ""),
                        "type": "token"
                    }
                elif transfer_type == "ETHER_TRANSFER":
                    # ETH transfer
                    try:
                        value_wei = int(transfer.get("value", "0") or "0")
                        amount_eth = value_wei / 10**18
                        if amount_eth <= 0:
                            continue
                    except (ValueError, TypeError):
                        logger.warning(f"Skipping transfer with invalid value: {transfer.get('value')}")
                        continue
                    transfer_data = {
                        "from_address": from_address,
                        "amount": amount_eth,
                        "token_address": "",
                        "symbol": "ETH",
                        "timestamp": timestamp,
                        "tx_hash": transfer.get("transactionHash", ""),
                        "type": "eth"
                    }
                else:
                    # Skip other transfer types
                    continue
                all_transfers_by_date[tx_date].append(transfer_data)
                processed_count += 1
                # Show progress
                if processed_count % 50 == 0:
                    logger.info(f"Processed {processed_count} Optimism transfers for address {address}...")
            # Check for next page
            cursor = response_json.get("next")
            if not cursor:
                logger.info(f"No more pages for address {address}")
                break
            else:
                transfers_url = cursor  # Update URL for next page
        logger.info(f"Completed Optimism transfers for address {address}: {processed_count} found")
        return all_transfers_by_date
    except Exception as e:
        logger.error(f"Error fetching Optimism transfers for address {address}: {e}")
        return {}

def fetch_optimism_outgoing_transfers(
    address: str,
    final_timestamp: int,
    from_address: str,
) -> Dict:
    """Fetch all outgoing transfers from the safe address on Optimism until a specific date.

    Args:
        address: The safe address to fetch transfers for
        final_timestamp: The timestamp until which to fetch transfers
        from_address: The master address to check for reversions

    Returns:
        Dict: Dictionary of transfers organized by date
    """
    all_transfers = {}
    if not address:
        logger.warning("No address provided for fetching Optimism outgoing transfers")
        return all_transfers
    try:
        # Use SafeGlobal API for Optimism transfers
        base_url = "https://safe-transaction-optimism.safe.global/api/v1"
        transfers_url = f"{base_url}/safes/{address}/transfers/"
        processed_count = 0
        page_count = 0
        max_pages = 50  # Increased limit to handle more transactions
        while page_count < max_pages:
            page_count += 1
            logger.info(f"Fetching outgoing transfers page {page_count} for address {address}")
            success, response_json = request_with_retries(
                endpoint=transfers_url,
                headers={"Accept": "application/json"},
                rate_limited_code=429,
                retry_wait=5,
            )
            if not success:
                logger.error(f"Failed to fetch Optimism transfers for address {address} on page {page_count}")
                break
            transfers = response_json.get("results", [])
            if not transfers:
                logger.info(f"No more transfers found for address {address} on page {page_count}")
                break
            print("outgoing_transfers", response_json)
            for transfer in transfers:
                # Parse timestamp
                timestamp = transfer.get("executionDate")
                if not timestamp:
                    continue
                # Handle ISO format timestamp
                try:
                    tx_datetime = datetime.fromisoformat(timestamp.replace("Z", "+00:00"))
                    tx_date = tx_datetime.strftime("%Y-%m-%d")
                except (ValueError, TypeError):
                    logger.warning(f"Invalid timestamp format: {timestamp}")
                    continue
                if tx_datetime.timestamp() > final_timestamp:
                    continue
                # Only process outgoing transfers (where from address is equal to our safe address)
                # OR transfers going back to the master address (reversions)
                if transfer.get("from").lower() == address.lower():
                    transfer_type = transfer.get("type", "")
                    if transfer_type == "ETHER_TRANSFER":
                        try:
                            value_wei = int(transfer.get("value", "0") or "0")
                            amount_eth = value_wei / 10**18
                            if amount_eth <= 0:
                                continue
                        except (ValueError, TypeError):
                            continue
                        transfer_data = {
                            "from_address": address,
                            "to_address": transfer.get("to"),
                            "amount": amount_eth,
                            "token_address": ZERO_ADDRESS,
                            "symbol": "ETH",
                            "timestamp": timestamp,
                            "tx_hash": transfer.get("transactionHash", ""),
                            "type": "eth",
                        }
                        if tx_date not in all_transfers:
                            all_transfers[tx_date] = []
                        all_transfers[tx_date].append(transfer_data)
                        processed_count += 1
                    elif transfer_type == "ERC20_TRANSFER":
                        # Also process ERC20 transfers for completeness. This branch is
                        # scoped to ERC20_TRANSFER so ETH transfers are not double-counted
                        # as "Unknown" tokens.
                        token_info = transfer.get("tokenInfo", {})
                        token_address = transfer.get("tokenAddress", "")
                        if token_info:
                            symbol = token_info.get("symbol", "Unknown")
                            decimals = int(token_info.get("decimals", 18) or 18)
                        else:
                            symbol = "Unknown"
                            decimals = 18
                        try:
                            value_raw = int(transfer.get("value", "0") or "0")
                            amount = value_raw / (10**decimals)
                            if amount <= 0:
                                continue
                        except (ValueError, TypeError):
                            continue
                        transfer_data = {
                            "from_address": transfer.get("from"),
                            "to_address": transfer.get("to"),
                            "amount": amount,
                            "token_address": token_address,
                            "symbol": symbol,
                            "timestamp": timestamp,
                            "tx_hash": transfer.get("transactionHash", ""),
                            "type": "token",
                        }
                        if tx_date not in all_transfers:
                            all_transfers[tx_date] = []
                        all_transfers[tx_date].append(transfer_data)
                        processed_count += 1
                # Show progress
                if processed_count % 50 == 0:
                    logger.info(f"Processed {processed_count} outgoing transfers for address {address}...")
            # Check for next page
            cursor = response_json.get("next")
            if not cursor:
                logger.info(f"No more pages for address {address}")
                break
            else:
                transfers_url = cursor  # Update URL for next page
        logger.info(f"Completed Optimism outgoing transfers: {processed_count} found")
        return all_transfers
    except Exception as e:
        logger.error(f"Error fetching Optimism outgoing transfers: {e}")
        return {}

def track_and_calculate_reversion_value(
    safe_address: str,
    chain: str,
    incoming_transfers: Dict,
    outgoing_transfers: Dict,
) -> float:
    """Track ETH transfers to safe address and handle reversion logic."""
    try:
        if not incoming_transfers:
            logger.warning(f"No transfers found for {chain} chain")
            return 0.0
        # Track ETH transfers
        eth_transfers = []
        initial_funding = None
        master_safe_address = None
        reversion_transfers = []
        reversion_value = 0.0
        # Sort transfers by timestamp
        sorted_incoming_transfers = []
        for _, transfers in incoming_transfers.items():
            for transfer in transfers:
                if isinstance(transfer, dict) and "timestamp" in transfer:
                    sorted_incoming_transfers.append(transfer)
        sorted_incoming_transfers.sort(key=lambda x: x["timestamp"])
        sorted_outgoing_transfers = []
        for _, transfers in outgoing_transfers.items():
            for transfer in transfers:
                if isinstance(transfer, dict) and "timestamp" in transfer:
                    sorted_outgoing_transfers.append(transfer)
        sorted_outgoing_transfers.sort(key=lambda x: x["timestamp"])
        # Process transfers
        for transfer in sorted_incoming_transfers:
            # Check if it's an ETH transfer
            if transfer.get("symbol") == "ETH":
                # If this is the first transfer, store it as initial funding
                if not initial_funding:
                    initial_funding = {
                        "amount": transfer.get("amount", 0),
                        "from_address": transfer.get("from_address"),
                        "timestamp": transfer.get("timestamp"),
                    }
                    if transfer.get("from_address"):
                        master_safe_address = transfer.get("from_address").lower()
                    eth_transfers.append(transfer)
                # If it's from the same address as initial funding
                elif transfer.get("from_address", "").lower() == master_safe_address:
                    eth_transfers.append(transfer)
        for transfer in sorted_outgoing_transfers:
            if transfer.get("symbol") == "ETH":
                if (
                    transfer.get("to_address", "").lower() == master_safe_address
                    and transfer.get("from_address", "").lower() == safe_address.lower()
                ):
                    reversion_transfers.append(transfer)
        reversion_value = calculate_total_reversion_value(eth_transfers, reversion_transfers)
        return reversion_value
    except Exception as e:
        logger.error(f"Error tracking ETH transfers: {str(e)}")
        return 0.0

def calculate_total_reversion_value(
    eth_transfers: List[Dict], reversion_transfers: List[Dict]
) -> float:
    """Calculate the total reversion value from the reversion transfers."""
    reversion_amount = 0.0
    reversion_date = None
    reversion_value = 0.0
    # Nothing funded or nothing reverted means there is nothing to price
    if not eth_transfers or not reversion_transfers:
        return 0.0
    last_transfer = eth_transfers[-1]
    try:
        # Handle ISO format timestamp
        timestamp = last_transfer.get("timestamp", "")
        if timestamp.endswith("Z"):
            # Convert ISO format to datetime
            tx_datetime = datetime.fromisoformat(timestamp.replace("Z", "+00:00"))
            reversion_date = tx_datetime.strftime("%d-%m-%Y")
        else:
            # Try parsing as Unix timestamp
            reversion_date = datetime.fromtimestamp(int(timestamp)).strftime("%d-%m-%Y")
    except (ValueError, TypeError) as e:
        logger.warning(f"Error parsing timestamp: {e}")
        # Fall back to the date of the first reversion transfer
        transfer = reversion_transfers[0]
        reversion_date = datetime.fromisoformat(transfer.get("timestamp", "").replace('Z', '+00:00')).strftime("%d-%m-%Y")
    for index, transfer in enumerate(reversion_transfers):
        transfer_date = datetime.fromisoformat(transfer.get("timestamp", "").replace('Z', '+00:00')).strftime("%d-%m-%Y")
        if index == 0:
            eth_price = fetch_historical_eth_price(reversion_date)
        else:
            eth_price = fetch_historical_eth_price(transfer_date)
        if eth_price:
            reversion_amount = transfer.get("amount", 0)
            reversion_value += reversion_amount * eth_price
    return reversion_value

def calculate_initial_investment_value_from_funding_events(
    chain: str,
    address: str,
    incoming_transfers: Dict,
    outgoing_transfers: Dict,
) -> float:
    total_investment = 0.0
    if not incoming_transfers:
        print(f"No transfers found for {chain} chain")
        return 0.0
    if chain == "optimism":
        print("Using Optimism-specific transfer processing")
        for date, date_transfers in incoming_transfers.items():
            for transfer in date_transfers:
                try:
                    amount = transfer.get("amount", 0)
                    token_symbol = transfer.get("symbol", "").upper()
                    if amount <= 0:
                        continue
                    # Get historical price for the transfer date
                    date_str = datetime.strptime(date, "%Y-%m-%d").strftime("%d-%m-%Y")
                    if token_symbol == "ETH":  # nosec B105
                        price = fetch_historical_eth_price(date_str)
                    else:
                        coingecko_id = get_coin_id_from_symbol(token_symbol, chain)
                        if coingecko_id:
                            price = fetch_historical_token_price(
                                coingecko_id, date_str, token_symbol
                            )
                        else:
                            price = None
                    # Skip transfers whose price could not be resolved instead of
                    # multiplying by None below
                    if not price:
                        print(f"No price found for {token_symbol} on {date}, skipping transfer")
                        continue
                    transfer_value = amount * price
                    total_investment += transfer_value
                    print(f"Processed transfer on {date}: {amount} {token_symbol} @ ${price} = ${transfer_value}")
                except Exception as e:
                    print(f"Error processing transfer: {str(e)}")
                    continue
    else:
        print(f"Unsupported chain: {chain}, skipping")
        return 0.0
    reversion_value = track_and_calculate_reversion_value(
        safe_address=address,
        chain=chain,
        incoming_transfers=incoming_transfers,
        outgoing_transfers=outgoing_transfers,
    )
    logger.info(f"Total investment: {total_investment}")
    logger.info(f"Reversion value: {reversion_value}")
    total_investment = total_investment - reversion_value
    logger.info(f"Total investment after reversion: {total_investment}")
    print(f"Total initial investment from {chain} chain: ${total_investment}")
    return total_investment if total_investment > 0 else 0.0

def calculate_initial_value_from_address_and_timestamp(
    address: str,
    final_timestamp: int,
) -> Tuple[float, int]:
    # First fetch the transfers
    incoming_transfers = fetch_optimism_incoming_transfers(address, final_timestamp)
    logger.info("Fetched incoming transfers")
    # Find the first transfer to get the from_address
    from_address = None
    for date_transfers in incoming_transfers.values():
        if date_transfers:  # Check if the list is not empty
            from_address = date_transfers[0].get('from_address')
            break
    if from_address is None:
        logger.warning("No from_address found in incoming transfers")
        from_address = ""
    outgoing_transfers = fetch_optimism_outgoing_transfers(address, final_timestamp, from_address)
    logger.info(f"Fetched outgoing transfers {outgoing_transfers}")
    initial_timestamp = final_timestamp
    for _transfers in incoming_transfers.values():
        for _transfer in _transfers:
            if "timestamp" not in _transfer:
                continue
            transfer_timestamp = datetime.fromisoformat(_transfer["timestamp"].replace('Z', '+00:00')).timestamp()
            if transfer_timestamp < initial_timestamp:
                initial_timestamp = int(transfer_timestamp)
    # Then calculate initial investment
    initial_investment = calculate_initial_investment_value_from_funding_events(
        chain="optimism",
        address=address,
        incoming_transfers=incoming_transfers,
        outgoing_transfers=outgoing_transfers,
    )
    return initial_investment, int(initial_timestamp)

def calculate_final_value_from_address_and_timestamp(
    address: str,
    timestamp: int,
) -> float:
    """
    Calculate the final portfolio value at a specific timestamp by fetching
    ETH and token balances and multiplying by historical prices.
    """
    final_value = 0.0
    try:
        # Get ETH balance and price
        eth_balance = fetch_eth_balance(address, timestamp)
        if eth_balance > 0:
            eth_price = fetch_historical_eth_price(
                datetime.utcfromtimestamp(timestamp).strftime("%d-%m-%Y")
            )
            if eth_price and eth_price > 0:
                eth_value = eth_balance * eth_price
                final_value += eth_value
                logger.info(f"ETH value: {eth_balance:.6f} ETH @ ${eth_price:.2f} = ${eth_value:.2f}")
            else:
                logger.warning(f"Could not fetch ETH price for timestamp {timestamp}")
        # Get token balances and prices
        for token_address, (symbol, decimals) in WHITELISTED_TOKENS.items():
            try:
                token_balance = fetch_token_balance(
                    address=address,
                    token_address=token_address,
                    decimals=decimals,
                    timestamp=timestamp,
                )
                if token_balance is not None and token_balance > 0:
                    token_price = fetch_historical_token_price(
                        coin_id=COIN_ID_MAPPING.get(symbol.lower(), symbol.lower()),
                        date_str=datetime.utcfromtimestamp(timestamp).strftime("%d-%m-%Y"),
                        token_symbol=symbol
                    )
                    if token_price is not None and token_price > 0:
                        token_value = token_balance * token_price
                        final_value += token_value
                        logger.info(f"{symbol} value: {token_balance:.6f} @ ${token_price:.6f} = ${token_value:.2f}")
                    else:
                        logger.warning(f"Could not fetch price for {symbol} at timestamp {timestamp}")
            except Exception as e:
                logger.error(f"Error processing token {symbol} ({token_address}): {e}")
                continue
    except Exception as e:
        logger.error(f"Error calculating final value for address {address}: {e}")
        return 0.0
    logger.info(f"Total final value for {address}: ${final_value:.2f}")
    return final_value

def _calculate_adjusted_apr(
    apr: float,
    initial_timestamp: int,
    final_timestamp: int
) -> float:
    if apr is None or apr == 0:
        return 0.0
    initial_eth_price = fetch_historical_eth_price(datetime.utcfromtimestamp(initial_timestamp).strftime("%d-%m-%Y"))
    final_eth_price = fetch_historical_eth_price(datetime.utcfromtimestamp(final_timestamp).strftime("%d-%m-%Y"))
    # Both prices must be present and non-zero: the fetchers return 0.0 on
    # failure, and a zero initial price would divide by zero below
    if final_eth_price and initial_eth_price:
        adjustment_factor = Decimal("1") - (
            Decimal(str(final_eth_price)) / Decimal(str(initial_eth_price))
        )
        adjusted_apr = round(
            float(apr) + float(adjustment_factor * Decimal("100")),
            2,
        )
        return adjusted_apr
    logger.warning(
        f"Could not fetch ETH prices for timestamps {initial_timestamp} and {final_timestamp}. Returning original APR: {apr}"
    )
    return apr

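# Worked example for the ETH-price adjustment above (illustrative prices
# only): if ETH was $2000 on the initial date and $2200 on the final date,
#   adjustment_factor = 1 - (2200 / 2000) = -0.10
# so 10 percentage points are subtracted from the APR, discounting the part
# of the return that merely tracked the rise in the ETH price.
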
def calculate_apr_and_roi(
    initial_value: float,
    final_value: float,
    initial_timestamp: int,
    final_timestamp: int
) -> Tuple[float, float, float]:
    if final_value <= 0:
        logger.warning("Final value is non-positive, returning 0.0 for APR and ROI.")
        return 0.0, 0.0, 0.0
    # Calculate ROI (Return on Investment)
    roi = ((final_value / initial_value) - 1) * 100
    # Calculate hours since investment
    hours = max(1, (final_timestamp - int(initial_timestamp)) / 3600)
    # Calculate time ratio (hours in a year / hours since investment)
    hours_in_year = 8760
    time_ratio = hours_in_year / hours
    # Calculate APR (annualized ROI)
    apr = float(roi * time_ratio)
    if apr < 0:
        apr = roi
    adjusted_apr = _calculate_adjusted_apr(
        apr=apr,
        initial_timestamp=initial_timestamp,
        final_timestamp=final_timestamp
    )
    return float(round(apr, 2)), float(round(adjusted_apr, 2)), float(round(roi, 2))

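# A quick worked example of the annualization above (illustrative numbers
# only): initial_value = 100.0, final_value = 105.0, held for 30 days:
#   roi        = ((105.0 / 100.0) - 1) * 100 = 5.0
#   hours      = 30 * 24 = 720
#   time_ratio = 8760 / 720 ≈ 12.17
#   apr        = 5.0 * 12.17 ≈ 60.83
# i.e. a 5% return over 30 days annualizes to roughly a 60.83% APR.
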
def fix_apr_and_roi(df: DataFrame) -> DataFrame:
    """
    Fix APR and ROI values by recalculating them based on actual blockchain data.
    This function processes each row only once and includes proper error handling.
    """
    if df.empty:
        logger.info("Empty DataFrame provided to fix_apr_and_roi, returning as-is")
        return df
    logger.info(f"Starting fix_apr_and_roi with {len(df)} rows")
    # Remove rows with excluded addresses
    original_count = len(df)
    df = df[~df['address'].isin(EXCLUDED_ADDRESSES)]
    excluded_count = original_count - len(df)
    if excluded_count > 0:
        logger.info(f"Excluded {excluded_count} rows with excluded addresses")
    # Remove rows with timestamps before 2025-06-06
    original_count = len(df)
    df = df[df['timestamp'] >= '2025-06-06 00:00:00.000000']
    old_data_count = original_count - len(df)
    if old_data_count > 0:
        logger.info(f"Excluded {old_data_count} rows with timestamps before 2025-06-06")
    # Check for future timestamps and filter them out
    current_time = datetime.now().timestamp()
    future_rows = df[df['timestamp'].apply(lambda x: x.timestamp()) > current_time]
    if not future_rows.empty:
        logger.warning(f"Found {len(future_rows)} rows with future timestamps, excluding them")
        for idx, row in future_rows.iterrows():
            logger.warning(f"Future timestamp found: {row['timestamp']} (timestamp: {row['timestamp'].timestamp()})")
        df = df[df['timestamp'].apply(lambda x: x.timestamp()) <= current_time]
    if df.empty:
        logger.warning("No valid rows remaining after filtering, returning empty DataFrame")
        return df
    logger.info(f"Processing {len(df)} valid rows")
    # Create a copy to avoid modifying the original DataFrame during iteration
    df_copy = df.copy()
    rows_to_drop = []
    processed_count = 0
    for idx, row in df_copy.iterrows():
        try:
            if row['is_dummy']:
                logger.debug(f"Skipping dummy row {idx}")
                continue
            processed_count += 1
            logger.info(f"Processing row {processed_count}/{len(df_copy)} - Address: {row['address']}")
            final_timestamp = int(row['timestamp'].timestamp())
            # Validate timestamp is not in the future
            if final_timestamp > current_time:
                logger.warning(f"Skipping row {idx} with future timestamp: {final_timestamp}")
                rows_to_drop.append(idx)
                continue
            calculation_metrics = row['calculation_metrics']
            # Calculate initial value and timestamp with error handling
            try:
                initial_value, initial_timestamp = calculate_initial_value_from_address_and_timestamp(
                    row['address'], final_timestamp
                )
            except Exception as e:
                logger.error(f"Error calculating initial value for address {row['address']}: {e}")
                rows_to_drop.append(idx)
                continue
            # Calculate final value with error handling
            try:
                final_value = calculate_final_value_from_address_and_timestamp(
                    row['address'], final_timestamp
                )
                # Add volume if it exists and is positive
                volume = row.get("volume", 0)
                if volume and volume > 0:
                    final_value += volume
                    logger.info(f"Added volume ${volume:.2f} to final value for address {row['address']}")
            except Exception as e:
                logger.error(f"Error calculating final value for address {row['address']}: {e}")
                rows_to_drop.append(idx)
                continue
            if initial_value <= 0:
                logger.warning(f"Initial value for address {row['address']} is non-positive ({initial_value}), skipping row.")
                rows_to_drop.append(idx)
                continue
            # Update calculation metrics
            calculation_metrics['initial_value'] = initial_value
            calculation_metrics['final_value'] = final_value
            df.at[idx, 'calculation_metrics'] = calculation_metrics
            # Calculate APR and ROI with error handling
            try:
                apr, adjusted_apr, roi = calculate_apr_and_roi(
                    initial_value=initial_value,
                    final_value=final_value,
                    initial_timestamp=initial_timestamp,
                    final_timestamp=final_timestamp
                )
                df.at[idx, 'apr'] = apr
                df.at[idx, 'adjusted_apr'] = adjusted_apr
                df.at[idx, 'roi'] = roi
                logger.info(f"Successfully processed address {row['address']}: APR={apr:.2f}, ROI={roi:.2f}")
            except Exception as e:
                logger.error(f"Error calculating APR/ROI for address {row['address']}: {e}")
                rows_to_drop.append(idx)
                continue
        except Exception as e:
            logger.error(f"Unexpected error processing row {idx}: {e}")
            rows_to_drop.append(idx)
            continue
    # Drop rows that had errors
    if rows_to_drop:
        logger.info(f"Dropping {len(rows_to_drop)} rows due to errors")
        df = df.drop(rows_to_drop)
    logger.info(f"Completed fix_apr_and_roi: {len(df)} rows remaining")
    return df

if __name__ == "__main__":
    test_address = "0xc8E264f402Ae94f69bDEf8B1f035F7200cD2B0c7"
    test_final_timestamp = 1750711233
    v = calculate_initial_value_from_address_and_timestamp(
        test_address,
        test_final_timestamp
    )
    print(v)
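
    # Sketch of the full pipeline described in the comments at the top of this
    # file: parse optimus_apr_values.csv, recompute initial_value / APR / ROI
    # per row, and write the result to a new CSV. The output filename and the
    # use of ast.literal_eval (the sample row stores calculation_metrics as a
    # Python-literal dict, not JSON) are assumptions, not part of the original
    # script.
    import ast

    import pandas as pd

    df = pd.read_csv(
        "optimus_apr_values.csv",
        parse_dates=["timestamp"],  # fix_apr_and_roi calls .timestamp() on this column
        converters={"calculation_metrics": lambda s: ast.literal_eval(s) if s else {}},
    )
    fixed_df = fix_apr_and_roi(df)
    fixed_df.to_csv("optimus_apr_values_fixed.csv", index=False)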