| """ | |
| Utility functions for Token Journey Visualizer | |
| Extracts and analyzes token representations through LLM layers | |
| """ | |
| import torch | |
| from transformers import AutoTokenizer, AutoModelForCausalLM | |
| import numpy as np | |
| from typing import Dict, List, Tuple | |
| # Al inicio del archivo, después de los imports | |
| _model_cache = {} | |
def get_model(model_name: str):
    """
    Load the model and tokenizer only once and cache them.
    Uses float16 on GPU, float32 on CPU.
    """
    if model_name not in _model_cache:
        print(f"Loading model: {model_name}")

        # Detect whether a GPU is available
        device = "cuda" if torch.cuda.is_available() else "cpu"
        dtype = torch.float16 if device == "cuda" else torch.float32
        print(f"Using device: {device}, dtype: {dtype}")

        tokenizer = AutoTokenizer.from_pretrained(model_name)
        model = AutoModelForCausalLM.from_pretrained(
            model_name,
            output_hidden_states=True,
            torch_dtype=dtype,  # float32 on CPU, float16 on GPU
            device_map="auto",
            low_cpu_mem_usage=True,
        )
        model.eval()

        _model_cache[model_name] = (tokenizer, model)
        print("Model loaded and cached")

    return _model_cache[model_name]
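
# Usage sketch (illustrative, not part of the original file): repeated calls with the same
# model name return the cached (tokenizer, model) pair without reloading weights.
#   tokenizer, model = get_model("TinyLlama/TinyLlama-1.1B-Chat-v1.0")
#   tokenizer, model = get_model("TinyLlama/TinyLlama-1.1B-Chat-v1.0")  # cache hit, no reload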
def extract_single_transformation(
    text: str,
    token_index: int,
    component: str,
    layer: int = 0,
    model_name: str = "TinyLlama/TinyLlama-1.1B-Chat-v1.0",
) -> Dict:
    """
    Extract a single transformation: input vector → weight matrix → output vector.

    Args:
        text: Input text
        token_index: Which token to track
        component: Which transformation ("q_proj", "k_proj", "v_proj", "o_proj", etc.)
        layer: Which layer (0-21 for TinyLlama)
        model_name: Model identifier

    Returns:
        dict: {
            'input_vector': np.array (2048,),
            'weight_matrix': np.array (2048, 2048),
            'output_vector': np.array (2048,),
            'token_text': str,
            'component_name': str
        }
    """
    # Load model and tokenizer (cached)
    tokenizer, model = get_model(model_name)
    model.eval()

    # Tokenize
    tokens = tokenizer(text, return_tensors="pt")
    token_ids = tokens.input_ids[0]

    if token_index >= len(token_ids):
        raise ValueError(f"token_index {token_index} out of range")

    token_text = tokenizer.decode([token_ids[token_index]])

    # Forward pass
    with torch.no_grad():
        outputs = model(**tokens)

    # hidden_states[0] is the embedding output and hidden_states[i] is the output of layer i-1,
    # so hidden_states[layer] is the residual stream that feeds the selected layer.
    hidden_states = outputs.hidden_states

    # CASE: Q projection in a specific layer
    if component == "q_proj":
        layer_module = model.model.layers[layer]

        # Normalized input (what actually goes into the Q projection):
        # apply this layer's input_layernorm to the residual stream entering the layer
        normalized_input = layer_module.input_layernorm(
            hidden_states[layer][0:1, token_index:token_index + 1, :]
        )
        input_vector = normalized_input[0, 0].detach().cpu().float().numpy()

        # Weight matrix
        weight_matrix = layer_module.self_attn.q_proj.weight.detach().cpu().float().numpy()

        # Output vector
        output_vector = layer_module.self_attn.q_proj(normalized_input)[0, 0].detach().cpu().float().numpy()

        component_name = f"Layer {layer} - Q Projection"

    # TODO: Add more components (k_proj, v_proj, o_proj, mlp, etc.)
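    # Hedged sketch toward the TODO above (added, not original code): k_proj and v_proj follow
    # the same pattern as q_proj. Note that this TinyLlama checkpoint uses grouped-query
    # attention, so these projections map 2048 -> 256 (4 KV heads × head_dim 64) and the
    # returned weight_matrix is (256, 2048), not the (2048, 2048) documented for q_proj.
    elif component in ("k_proj", "v_proj"):
        layer_module = model.model.layers[layer]

        normalized_input = layer_module.input_layernorm(
            hidden_states[layer][0:1, token_index:token_index + 1, :]
        )
        proj = getattr(layer_module.self_attn, component)

        input_vector = normalized_input[0, 0].detach().cpu().float().numpy()
        weight_matrix = proj.weight.detach().cpu().float().numpy()
        output_vector = proj(normalized_input)[0, 0].detach().cpu().float().numpy()

        component_name = f"Layer {layer} - {component[0].upper()} Projection"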
    else:
        raise NotImplementedError(f"Component '{component}' not implemented yet")

    return {
        'input_vector': input_vector,
        'weight_matrix': weight_matrix,
        'output_vector': output_vector,
        'token_text': token_text,
        'component_name': component_name,
    }
def get_token_choices(text: str, model_name: str) -> Tuple[List[str], List[int]]:
    """
    Tokenize text and return choices for a dropdown.

    Args:
        text (str): Input text
        model_name (str): HuggingFace model identifier

    Returns:
        Tuple[List[str], List[int]]:
            - List of formatted token choices for the UI
            - List of corresponding token indices
    """
    tokenizer, _ = get_model(model_name)
    tokens = tokenizer(text, return_tensors="pt")
    token_ids = tokens.input_ids[0]

    choices = []
    indices = []
    for idx, tid in enumerate(token_ids):
        token_text = tokenizer.decode([tid])
        choices.append(f"{idx}: `{token_text}`")
        indices.append(idx)

    return choices, indices
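
# Hedged sketch (not part of the original file): get_token_choices formats entries for a UI
# dropdown. Since this runs in a Hugging Face Space, a Gradio dropdown is one plausible
# consumer; `build_token_dropdown` is a hypothetical helper showing the wiring.
def build_token_dropdown(text: str, model_name: str = "TinyLlama/TinyLlama-1.1B-Chat-v1.0"):
    import gradio as gr  # assumption: the Space UI is built with Gradio

    choices, _ = get_token_choices(text, model_name)
    # The "idx: `token`" format lets a callback recover the index with
    # int(selected.split(":")[0]).
    return gr.Dropdown(
        choices=choices,
        value=choices[0] if choices else None,
        label="Token to track",
    )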
# Quick manual test
if __name__ == "__main__":
    result = extract_single_transformation(
        text="The cat sat on the mat",
        token_index=1,
        component="q_proj",
        layer=0,
    )

    print(f"Token: {result['token_text']}")
    print(f"Component: {result['component_name']}")
    print(f"Input shape: {result['input_vector'].shape}")
    print(f"Weight shape: {result['weight_matrix'].shape}")
    print(f"Output shape: {result['output_vector'].shape}")

    # Verify the matrix multiplication: for nn.Linear, output = input @ W.T
    manual_output = result['input_vector'] @ result['weight_matrix'].T
    print(f"\nVerification (should be close to 0): "
          f"{np.linalg.norm(manual_output - result['output_vector']):.6f}")