from typing import List, Dict, Any
from easydict import EasyDict
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.distributions import Independent, Normal

from ding.utils import REWARD_MODEL_REGISTRY
from ding.utils.data import default_collate
from .base_reward_model import BaseRewardModel

class GuidedCostNN(nn.Module):

    def __init__(
        self,
        input_size,
        hidden_size=128,
        output_size=1,
    ):
        super(GuidedCostNN, self).__init__()
        self.net = nn.Sequential(
            nn.Linear(input_size, hidden_size),
            nn.ReLU(),
            nn.Linear(hidden_size, hidden_size),
            nn.ReLU(),
            nn.Linear(hidden_size, output_size),
        )

    def forward(self, x):
        return self.net(x)

@REWARD_MODEL_REGISTRY.register('guided_cost')
class GuidedCostRewardModel(BaseRewardModel):
    """
    Overview:
        Reward model of the Guided Cost Learning (GCL) algorithm. (https://arxiv.org/pdf/1603.00448.pdf)
    Interface:
        ``estimate``, ``train``, ``collect_data``, ``clear_data``, \
        ``__init__``, ``state_dict``, ``load_state_dict``, ``learn``, \
        ``state_dict_reward_model``, ``load_state_dict_reward_model``
    Config:
        == ==================== ======== ============= ========================================= ================
        ID Symbol               Type     Default Value Description                               Other(Shape)
        == ==================== ======== ============= ========================================= ================
        1  ``type``             str      guided_cost   | Reward model register name, refer       |
                                                       | to registry ``REWARD_MODEL_REGISTRY``   |
        2  | ``continuous``     bool     True          | Whether the action space is continuous  |
        3  | ``learning_rate``  float    0.001         | Learning rate for the optimizer         |
        4  | ``update_per_``    int      100           | Number of updates per collect           |
           | ``collect``        |        |             |                                         |
        5  | ``batch_size``     int      64            | Training batch size                     |
        6  | ``hidden_size``    int      128           | Linear model hidden size                |
        7  | ``action_shape``   int      1             | Action space shape                      |
        8  | ``log_every_n``    int      50            | Add loss to the log every n iterations  |
           | ``_train``         |        |             |                                         |
        9  | ``store_model_``   int      100           | Save the model every n iterations       |
           | ``every_n_train``  |        |             |                                         |
        == ==================== ======== ============= ========================================= ================
    """

    config = dict(
        # (str) Reward model register name, refer to registry ``REWARD_MODEL_REGISTRY``.
        type='guided_cost',
        # (float) The step size of gradient descent.
        learning_rate=1e-3,
        # (int) Action space shape, such as 1.
        action_shape=1,
        # (bool) Whether the action space is continuous.
        continuous=True,
        # (int) How many samples in a training batch.
        batch_size=64,
        # (int) Linear model hidden size.
        hidden_size=128,
        # (int) How many updates (iterations) to train after one collection by the collector.
        # A bigger "update_per_collect" means more off-policy training.
        # collect data -> update policy -> collect data -> ...
        update_per_collect=100,
        # (int) Add loss to the log every n iterations.
        log_every_n_train=50,
        # (int) Save the model every n iterations.
        store_model_every_n_train=100,
    )
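    # NOTE: ``input_size`` (observation dim + action dim) is not part of the defaults above; it must
    # be supplied in the user config, since ``__init__`` builds ``GuidedCostNN(config.input_size, ...)``.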

    def __init__(self, config: EasyDict, device: str, tb_logger: 'SummaryWriter') -> None:  # noqa
        super(GuidedCostRewardModel, self).__init__()
        self.cfg = config
        self.action_shape = self.cfg.action_shape
        assert device == "cpu" or device.startswith("cuda")
        self.device = device
        self.tb_logger = tb_logger
        self.reward_model = GuidedCostNN(config.input_size, config.hidden_size)
        self.reward_model.to(self.device)
        self.opt = optim.Adam(self.reward_model.parameters(), lr=config.learning_rate)

    def train(self, expert_demo: list, samp: list, iter: int, step: int) -> None:
        device_0 = expert_demo[0]['obs'].device
        device_1 = samp[0]['obs'].device
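        # Expert transitions are assigned probability 1; only the policy samples below get their
        # likelihood under the current policy as an importance weight.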
        for i in range(len(expert_demo)):
            expert_demo[i]['prob'] = torch.FloatTensor([1]).to(device_0)
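        # Probability of each sampled action under the current policy; it serves as the
        # importance weight q(x) in the IOC loss computed below.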
        if self.cfg.continuous:
            for i in range(len(samp)):
                (mu, sigma) = samp[i]['logit']
                dist = Independent(Normal(mu, sigma), 1)
                next_action = samp[i]['action']
                log_prob = dist.log_prob(next_action)
                samp[i]['prob'] = torch.exp(log_prob).unsqueeze(0).to(device_1)
        else:
            for i in range(len(samp)):
                probs = F.softmax(samp[i]['logit'], dim=-1)
                prob = probs[samp[i]['action']]
                samp[i]['prob'] = prob.to(device_1)
        # Mix the expert data and the sampled data to train the reward model.
        samp.extend(expert_demo)
        expert_demo = default_collate(expert_demo)
        samp = default_collate(samp)
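        # The cost network scores concatenated (obs, action) pairs; actions are cast to float and
        # reshaped to (batch, action_shape) so discrete and continuous actions take the same path.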
        cost_demo = self.reward_model(
            torch.cat([expert_demo['obs'], expert_demo['action'].float().reshape(-1, self.action_shape)], dim=-1)
        )
        cost_samp = self.reward_model(
            torch.cat([samp['obs'], samp['action'].float().reshape(-1, self.action_shape)], dim=-1)
        )
        prob = samp['prob'].unsqueeze(-1)
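        # Sample-based IOC objective from guided cost learning:
        #     L_IOC = E_demo[c(x)] + log E_samp[exp(-c(x)) / q(x)],
        # where c is the learned cost and q(x) is the probability computed above
        # (a small epsilon keeps the division numerically stable).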
        loss_IOC = torch.mean(cost_demo) + \
            torch.log(torch.mean(torch.exp(-cost_samp) / (prob + 1e-7)))
        # Update the cost function.
        self.opt.zero_grad()
        loss_IOC.backward()
        self.opt.step()
        if iter % self.cfg.log_every_n_train == 0:
            self.tb_logger.add_scalar('reward_model/loss_iter', loss_IOC, iter)
            self.tb_logger.add_scalar('reward_model/loss_step', loss_IOC, step)

    def estimate(self, data: list) -> List[Dict]:
        # NOTE: the estimate method of the GCL algorithm differs slightly from the one in other
        # IRL algorithms, because its deepcopy is performed before the learner's train loop.
        train_data_augmented = data
        for i in range(len(train_data_augmented)):
            with torch.no_grad():
                reward = self.reward_model(
                    torch.cat([train_data_augmented[i]['obs'], train_data_augmented[i]['action'].float()]).unsqueeze(0)
                ).squeeze(0)
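            # The network predicts a cost, so the reward handed back to the RL learner is its negation.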
            train_data_augmented[i]['reward'] = -reward
        return train_data_augmented

    def collect_data(self, data) -> None:
        """
        Overview:
            Collecting training data; not implemented because this reward model (i.e. ``online_net``) is \
            only trained once. If ``online_net`` were trained continuously, this method would need an \
            implementation.
        """
        # If online_net is trained continuously, implement data collection here.
        pass

    def clear_data(self):
        """
        Overview:
            Clearing collected data; not implemented because this reward model (i.e. ``online_net``) is \
            only trained once. If ``online_net`` were trained continuously, this method would need an \
            implementation.
        """
        # If online_net is trained continuously, implement data clearing here.
        pass

    def state_dict_reward_model(self) -> Dict[str, Any]:
        return {
            'model': self.reward_model.state_dict(),
            'optimizer': self.opt.state_dict(),
        }

    def load_state_dict_reward_model(self, state_dict: Dict[str, Any]) -> None:
        self.reward_model.load_state_dict(state_dict['model'])
        self.opt.load_state_dict(state_dict['optimizer'])
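
# A minimal usage sketch (assumptions: ``cfg`` mirrors the defaults in ``GuidedCostRewardModel.config``
# plus an ``input_size`` equal to obs_dim + action_dim; ``tb_logger`` is a TensorBoard ``SummaryWriter``;
# ``expert_batch``, ``policy_batch``, ``train_data``, ``obs_dim``, ``action_dim`` and ``global_step`` are
# illustrative names produced elsewhere in a training pipeline):
#
#   cfg = EasyDict(GuidedCostRewardModel.config)
#   cfg.input_size = obs_dim + action_dim
#   reward_model = GuidedCostRewardModel(cfg, device='cpu', tb_logger=tb_logger)
#   for it in range(cfg.update_per_collect):
#       reward_model.train(expert_batch, policy_batch, iter=it, step=global_step)
#   train_data = reward_model.estimate(train_data)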