File size: 5,399 Bytes
9fa24be
 
 
 
 
 
 
 
 
 
 
 
 
 
fd52006
 
 
 
9fa24be
 
fd52006
 
 
 
 
 
 
9fa24be
 
 
 
fd52006
9fa24be
 
 
 
 
fd52006
9fa24be
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
d07685c
9fa24be
 
4939391
9fa24be
 
4939391
9fa24be
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
"""
Utility functions for Token Journey Visualizer
Extracts and analyzes token representations through LLM layers
"""

import torch
from transformers import AutoTokenizer, AutoModelForCausalLM
import numpy as np
from typing import Dict, List, Tuple

# Al inicio del archivo, después de los imports
_model_cache = {}

def get_model(model_name: str):
    """
    Load model only once and cache it.
    Uses float16 on GPU, float32 on CPU.
    """
    if model_name not in _model_cache:
        print(f"Loading model: {model_name}")
        
        # Detect if GPU is available
        device = "cuda" if torch.cuda.is_available() else "cpu"
        dtype = torch.float16 if device == "cuda" else torch.float32
        
        print(f"Using device: {device}, dtype: {dtype}")
        
        tokenizer = AutoTokenizer.from_pretrained(model_name)
        model = AutoModelForCausalLM.from_pretrained(
            model_name,
            output_hidden_states=True,
            torch_dtype=dtype,  # ← float32 en CPU, float16 en GPU
            device_map="auto",
            low_cpu_mem_usage=True
        )
        model.eval()
        _model_cache[model_name] = (tokenizer, model)
        print(f"Model loaded and cached")
    
    return _model_cache[model_name]


def extract_single_transformation(
    text: str,
    token_index: int,
    component: str,
    layer: int = 0,
    model_name: str = "TinyLlama/TinyLlama-1.1B-Chat-v1.0"
) -> Dict:
    """
    Extract a single transformation: input vector → weight matrix → output vector.
    
    Args:
        text: Input text
        token_index: Which token to track
        component: Which transformation ("q_proj", "k_proj", "v_proj", "o_proj", etc.)
        layer: Which layer (0-21 for TinyLlama)
        model_name: Model identifier
    
    Returns:
        dict: {
            'input_vector': np.array (2048,),
            'weight_matrix': np.array (2048, 2048),
            'output_vector': np.array (2048,),
            'token_text': str,
            'component_name': str
        }
    """
    
    # Load model and tokenizer
    tokenizer, model = get_model(model_name)
    model.eval()
    
    # Tokenize
    tokens = tokenizer(text, return_tensors="pt")
    token_ids = tokens.input_ids[0]
    
    if token_index >= len(token_ids):
        raise ValueError(f"token_index {token_index} out of range")
    
    token_text = tokenizer.decode([token_ids[token_index]])
    
    # Forward pass
    with torch.no_grad():
        outputs = model(**tokens)
    
    # Get the input to the selected layer (normalized embeddings or previous layer output)
    hidden_states = outputs.hidden_states
    
    # CASE: Q Projection in a specific layer
    if component == "q_proj":
        # Input: after input_layernorm
        input_hidden = hidden_states[layer + 1][0, token_index]  # +1 because hidden_states[0] is embeddings
        
        # Get normalized input (what actually goes into Q projection)
        layer_module = model.model.layers[layer]
        normalized_input = layer_module.input_layernorm(hidden_states[layer + 1][0:1, token_index:token_index+1, :])
        input_vector = normalized_input[0, 0].detach().cpu().float().numpy()
        
        # Weight matrix
        weight_matrix = layer_module.self_attn.q_proj.weight.detach().cpu().numpy()
        
        # Output vector
        output_vector = layer_module.self_attn.q_proj(normalized_input)[0, 0].detach().cpu().numpy()
        
        component_name = f"Layer {layer} - Q Projection"
    
    # TODO: Add more components (k_proj, v_proj, o_proj, mlp, etc.)
    else:
        raise NotImplementedError(f"Component '{component}' not implemented yet")
    
    return {
        'input_vector': input_vector,
        'weight_matrix': weight_matrix,
        'output_vector': output_vector,
        'token_text': token_text,
        'component_name': component_name
    }

def get_token_choices(text: str, model_name: str) -> Tuple[List[str], List[int]]:
    """
    Tokenize text and return choices for dropdown.
    
    Args:
        text (str): Input text
        model_name (str): HuggingFace model identifier
    
    Returns:
        Tuple[List[str], List[int]]: 
            - List of formatted token choices for UI
            - List of corresponding token indices
    """
    tokenizer, model = get_model(model_name)
    tokens = tokenizer(text, return_tensors="pt")
    token_ids = tokens.input_ids[0]
    
    choices = []
    indices = []
    
    for idx, tid in enumerate(token_ids):
        token_text = tokenizer.decode([tid])
        choices.append(f"{idx}: `{token_text}`")
        indices.append(idx)
    
    return choices, indices

# Test function
if __name__ == "__main__":
    result = extract_single_transformation(
        text="The cat sat on the mat",
        token_index=1,
        component="q_proj",
        layer=0
    )
    
    print(f"Token: {result['token_text']}")
    print(f"Component: {result['component_name']}")
    print(f"Input shape: {result['input_vector'].shape}")
    print(f"Weight shape: {result['weight_matrix'].shape}")
    print(f"Output shape: {result['output_vector'].shape}")
    
    # Verify matrix multiplication
    manual_output = result['input_vector'] @ result['weight_matrix'].T
    print(f"\nVerification (should be close to 0): {np.linalg.norm(manual_output - result['output_vector']):.6f}")