# Tests for the ding.torch_utils data helpers (dtype/tensor/ndarray/list
# conversion, device transfer, log buffer and CUDA prefetching).
import pytest
from collections import namedtuple
import numpy as np
import torch
import torch.nn as nn
from torch.utils.data import DataLoader
import treetensor.torch as ttorch
from ding.torch_utils import CudaFetcher, to_device, to_dtype, to_tensor, to_ndarray, to_list, \
    tensor_to_list, same_shape, build_log_buffer, get_tensor_data, to_item
from ding.utils import EasyTimer
@pytest.fixture
def setup_data_dict():
    """
    Overview:
        Pytest fixture producing a dict that covers every value kind the ding
        transform helpers (``to_tensor``/``to_ndarray``/``to_list``/``to_device``)
        must handle, including ``None`` and a plain string that should pass
        through unchanged.
    Returns:
        - data (:obj:`dict`): fresh sample data for one test invocation.
    """
    # Fix: several tests request ``setup_data_dict`` as a fixture argument,
    # but the function was never registered with ``@pytest.fixture``, which
    # makes pytest fail with "fixture 'setup_data_dict' not found".
    return {
        'tensor': torch.randn(4),
        'list': [True, False, False],
        'tuple': (4, 5, 6),
        'bool': True,
        'int': 10,
        'float': 10.,
        'array': np.random.randn(4),
        'str': "asdf",
        'none': None,
    }
class TestDataFunction:
    """
    Overview:
        Unit tests for the ding data-transform helpers: dtype casting,
        tensor/ndarray/list conversion, shape comparison and detached-tensor
        extraction. Each converter is checked on scalars, containers and an
        unsupported type (``EasyTimer``) that must raise ``TypeError``.
    """

    def test_to_dtype(self):
        # A bare tensor, a list of tensors and a dict of tensors are all recast.
        t = torch.randint(0, 10, (3, 5))
        tfloat = to_dtype(t, torch.float)
        assert tfloat.dtype == torch.float
        tlist = [t]
        tlfloat = to_dtype(tlist, torch.float)
        assert tlfloat[0].dtype == torch.float
        tdict = {'t': t}
        tdictf = to_dtype(tdict, torch.float)
        assert tdictf['t'].dtype == torch.float
        # Unsupported input types must raise.
        with pytest.raises(TypeError):
            to_dtype(EasyTimer(), torch.float)

    def test_to_tensor(self, setup_data_dict):
        i = 10
        t = to_tensor(i)
        assert t.item() == i
        d = {'i': i}
        dt = to_tensor(d, torch.int)
        assert dt['i'].item() == i
        # A set is not a supported container.
        with pytest.raises(TypeError):
            _ = to_tensor({1, 2}, torch.int)
        # namedtuple fields are converted element-wise and the type is kept.
        data_type = namedtuple('data_type', ['x', 'y'])
        inputs = data_type(np.random.random(3), 4)
        outputs = to_tensor(inputs, torch.float32)
        assert type(outputs) == data_type
        assert isinstance(outputs.x, torch.Tensor)
        assert isinstance(outputs.y, torch.Tensor)
        assert outputs.x.dtype == torch.float32
        assert outputs.y.dtype == torch.float32
        # Mixed-type dict from the fixture should convert without error.
        transformed_tensor = to_tensor(setup_data_dict)
        with pytest.raises(TypeError):
            to_tensor(EasyTimer(), torch.float)

    def test_to_ndarray(self, setup_data_dict):
        t = torch.randn(3, 5)
        tarray1 = to_ndarray(t)
        assert tarray1.shape == (3, 5)
        assert isinstance(tarray1, np.ndarray)
        # A list of tensors stays a list, elements become ndarrays.
        t = [torch.randn(5, ) for i in range(3)]
        tarray1 = to_ndarray(t, np.float32)
        assert isinstance(tarray1, list)
        assert tarray1[0].shape == (5, )
        assert isinstance(tarray1[0], np.ndarray)
        transformed_array = to_ndarray(setup_data_dict)
        with pytest.raises(TypeError):
            to_ndarray(EasyTimer(), np.float32)

    def test_to_list(self, setup_data_dict):
        # tensor_to_list
        t = torch.randn(3, 5)
        tlist1 = tensor_to_list(t)
        assert len(tlist1) == 3
        assert len(tlist1[0]) == 5
        t = torch.randn(3, )
        tlist1 = tensor_to_list(t)
        assert len(tlist1) == 3
        t = [torch.randn(5, ) for i in range(3)]
        tlist1 = tensor_to_list(t)
        assert len(tlist1) == 3
        assert len(tlist1[0]) == 5
        td = {'t': t}
        tdlist1 = tensor_to_list(td)
        assert len(tdlist1['t']) == 3
        assert len(tdlist1['t'][0]) == 5
        # Round-trip back to tensors must preserve values.
        tback = to_tensor(tlist1, torch.float)
        for i in range(3):
            assert (tback[i] == t[i]).all()
        with pytest.raises(TypeError):
            tensor_to_list(EasyTimer())
        # to_list
        transformed_list = to_list(setup_data_dict)
        with pytest.raises(TypeError):
            # Fix: this called ``to_ndarray(EasyTimer())``, duplicating the
            # check in ``test_to_ndarray``; the intent here is to exercise
            # ``to_list``'s unsupported-type path.
            to_list(EasyTimer())

    def test_to_item(self):
        data = {
            'tensor': torch.randn(1),
            'list': [True, False, torch.randn(1)],
            'tuple': (4, 5, 6),
            'bool': True,
            'int': 10,
            'float': 10.,
            'array': np.random.randn(1),
            'str': "asdf",
            'none': None,
        }
        # One-element tensors/arrays are not scalars before conversion ...
        assert not np.isscalar(data['tensor'])
        assert not np.isscalar(data['array'])
        assert not np.isscalar(data['list'][-1])
        # ... and become Python scalars afterwards.
        new_data = to_item(data)
        assert np.isscalar(new_data['tensor'])
        assert np.isscalar(new_data['array'])
        assert np.isscalar(new_data['list'][-1])
        # treetensor input is supported as well.
        data = ttorch.randn({'a': 1})
        new_data = to_item(data)
        assert np.isscalar(new_data.a)
        # Multi-element tensors cannot be itemized: error or silent drop,
        # depending on ``ignore_error``.
        with pytest.raises((ValueError, RuntimeError)):
            to_item({'a': torch.randn(4), 'b': torch.rand(1)}, ignore_error=False)
        output = to_item({'a': torch.randn(4), 'b': torch.rand(1)}, ignore_error=True)
        assert 'a' not in output
        assert 'b' in output

    def test_same_shape(self):
        tlist = [torch.randn(3, 5) for i in range(5)]
        assert same_shape(tlist)
        tlist = [torch.randn(3, 5), torch.randn(4, 5)]
        assert not same_shape(tlist)

    def test_get_tensor_data(self):
        a = {
            'tensor': torch.tensor([1, 2, 3.], requires_grad=True),
            'list': [torch.tensor([1, 2, 3.], requires_grad=True) for _ in range(2)],
            'none': None
        }
        # Extracted tensors must be detached from the autograd graph.
        tensor_a = get_tensor_data(a)
        assert not tensor_a['tensor'].requires_grad
        for t in tensor_a['list']:
            assert not t.requires_grad
        with pytest.raises(TypeError):
            get_tensor_data(EasyTimer())
def test_log_dict():
    """
    Overview:
        A tensor assigned into the log buffer is stored as a plain list, and a
        subsequent ``update`` overwrites the entry with the raw value.
    """
    buf = build_log_buffer()
    buf['not_tensor'] = torch.randn(3)
    stored = buf['not_tensor']
    assert isinstance(stored, list)
    assert len(stored) == 3
    buf.update({'not_tensor': 4, 'a': 5})
    assert buf['not_tensor'] == 4
| class TestCudaFetcher: | |
| def get_dataloader(self): | |
| class Dataset(object): | |
| def __init__(self): | |
| self.data = torch.randn(2560, 2560) | |
| def __len__(self): | |
| return 100 | |
| def __getitem__(self, idx): | |
| return self.data | |
| return DataLoader(Dataset(), batch_size=3) | |
| def get_model(self): | |
| class Model(nn.Module): | |
| def __init__(self): | |
| super(Model, self).__init__() | |
| self.main = [nn.Linear(2560, 2560) for _ in range(100)] | |
| self.main = nn.Sequential(*self.main) | |
| def forward(self, x): | |
| x = self.main(x) | |
| return x | |
| return Model() | |
| def test_naive(self): | |
| model = self.get_model() | |
| model.cuda() | |
| timer = EasyTimer() | |
| dataloader = iter(self.get_dataloader()) | |
| dataloader = CudaFetcher(dataloader, device='cuda', sleep=0.1) | |
| dataloader.run() | |
| count = 0 | |
| while True: | |
| with timer: | |
| data = next(dataloader) | |
| model(data) | |
| print('count {}, run_time: {}'.format(count, timer.value)) | |
| count += 1 | |
| if count == 10: | |
| break | |
| dataloader.close() | |
def test_to_device_cuda(setup_data_dict):
    """
    Overview:
        ``to_device`` should move supported values to CUDA while leaving
        entries listed in ``ignore_keys`` untouched, and raise ``TypeError``
        for unsupported input types.
    """
    # Fix: skip instead of crashing on CPU-only machines.
    if not torch.cuda.is_available():
        pytest.skip("CUDA device required for this test")
    setup_data_dict['module'] = nn.Linear(3, 5)
    device = 'cuda'
    cuda_d = to_device(setup_data_dict, device, ignore_keys=['module'])
    # The ignored module must stay on CPU.
    assert cuda_d['module'].weight.device == torch.device('cpu')
    other = EasyTimer()
    with pytest.raises(TypeError):
        # Fix: pass the device argument so the TypeError comes from the
        # unsupported-type path, not from a missing positional argument.
        to_device(other, device)
def test_to_device_cpu(setup_data_dict):
    """
    Overview:
        ``to_device`` with device ``'cpu'`` keeps tensors on CPU and still
        honours ``ignore_keys``; unsupported input types raise ``TypeError``.
    """
    setup_data_dict['module'] = nn.Linear(3, 5)
    device = 'cpu'
    cpu_d = to_device(setup_data_dict, device, ignore_keys=['module'])
    assert cpu_d['module'].weight.device == torch.device('cpu')
    other = EasyTimer()
    with pytest.raises(TypeError):
        # Fix: pass the device argument so the TypeError reflects the
        # unsupported input type, not a missing positional argument.
        to_device(other, device)