After running this code, training never really works (the reward stays low the whole time), and after a few training episodes it keeps aborting with a NaN error. Can anyone help me find where the code went wrong? A small NaN-check sketch is right below, then the full script, and a sketch of one possible cause comes after the script.
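For reference, here is a minimal sketch of how the first NaN could be localized before digging into the logic. It is not part of the script below; it only uses standard PyTorch calls (torch.autograd.set_detect_anomaly, torch.isfinite), and assert_finite is a hypothetical helper name of mine.

import torch

# Debug-only: make backward() report the op that produced a NaN/Inf (slow).
torch.autograd.set_detect_anomaly(True)

def assert_finite(name, tensor):
    """Raise immediately if a tensor contains NaN or Inf."""
    if not torch.isfinite(tensor).all():
        raise RuntimeError(f'{name} contains NaN/Inf: {tensor}')

# Example usage inside PPO.update (or Actor.forward):
#   mu, std = self.actor(states)
#   assert_finite('mu', mu)
#   assert_finite('std', std)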
import gymnasium as gym
import random
import numpy as np
import torch
from torch import nn, optim
from torch.nn import functional as F
from torch.distributions import Normal
from collections import deque
def orthogonal_init(layer, gain=1.0):
    nn.init.orthogonal_(layer.weight, gain=gain)
    nn.init.constant_(layer.bias, 0)
class Config:
    def __init__(self):
        self.env_name = 'Pendulum-v1'
        self.algo_name = 'PPO'
        self.render_mode = 'rgb_array'
        self.train_eps = 1500
        self.test_eps = 10
        self.max_steps = 100
        self.clip = 0.2
        self.lr_a = 2e-3
        self.lr_c = 1e-2
        self.gamma = 0.98
        self.lamda = 0.95
        self.epochs = 5
        self.update_freq = 100
        self.seed = random.randint(0, 100)
        self.actor_hidden_dim = 256
        self.critic_hidden_dim = 256
        self.n_states = None
        self.n_actions = None
        self.action_bound = None
        self.device = torch.device('cuda') \
            if torch.cuda.is_available() else torch.device('cpu')

    def show(self):
        print('-' * 30 + 'Parameter list' + '-' * 30)
        for k, v in vars(self).items():
            print(k, '=', v)
        print('-' * 60)
class ReplayBuffer:
    def __init__(self, cfg):
        self.buffer = deque()
        self.device = cfg.device

    def push(self, transitions):
        self.buffer.append(transitions)

    def clear(self):
        self.buffer.clear()

    def sample(self):
        return map(lambda x: torch.tensor(np.array(x), dtype=torch.float32,
                                          device=self.device), zip(*self.buffer))

    def size(self):
        return len(self.buffer)
class Actor(nn.Module):
    def __init__(self, cfg):
        super(Actor, self).__init__()
        self.fc1 = nn.Linear(cfg.n_states, cfg.actor_hidden_dim)
        self.fc2 = nn.Linear(cfg.actor_hidden_dim, cfg.actor_hidden_dim)
        self.fc_mu = nn.Linear(cfg.actor_hidden_dim, cfg.n_actions)
        self.fc_std = nn.Linear(cfg.actor_hidden_dim, cfg.n_actions)
        self.action_bound = cfg.action_bound
        orthogonal_init(self.fc1)
        orthogonal_init(self.fc2)
        orthogonal_init(self.fc_mu, gain=0.01)
        orthogonal_init(self.fc_std, gain=0.01)

    def forward(self, x):
        x = F.tanh(self.fc1(x))
        x = F.tanh(self.fc2(x))
        mu = F.tanh(self.fc_mu(x)) * self.action_bound
        std = F.softplus(self.fc_std(x))
        return mu, std
class Critic(nn.Module):
    def __init__(self, cfg):
        super(Critic, self).__init__()
        self.fc1 = nn.Linear(cfg.n_states, cfg.critic_hidden_dim)
        self.fc2 = nn.Linear(cfg.critic_hidden_dim, cfg.critic_hidden_dim)
        self.fc3 = nn.Linear(cfg.critic_hidden_dim, 1)
        orthogonal_init(self.fc1)
        orthogonal_init(self.fc2)
        orthogonal_init(self.fc3)

    def forward(self, x):
        x = F.tanh(self.fc1(x))
        x = F.tanh(self.fc2(x))
        value = self.fc3(x)
        return value
class PPO:
    def __init__(self, cfg):
        self.cfg = cfg
        self.actor = Actor(cfg).to(cfg.device)
        self.critic = Critic(cfg).to(cfg.device)
        self.actor_optimizer = optim.Adam(self.actor.parameters(), lr=cfg.lr_a, eps=1e-5)
        self.critic_optimizer = optim.Adam(self.critic.parameters(), lr=cfg.lr_c, eps=1e-5)
        self.memory = ReplayBuffer(cfg)
        self.sample_count = 0

    @torch.no_grad()
    def choose_action(self, state, predict=False):
        if not predict:
            self.sample_count += 1
        state = torch.tensor(state, device=self.cfg.device, dtype=torch.float).unsqueeze(0)
        mu, std = self.actor(state)
        dist = Normal(mu, std)
        action = dist.sample().clamp(-self.cfg.action_bound, self.cfg.action_bound)
        return action.squeeze(0).cpu().numpy()

    def update(self):
        if self.sample_count % self.cfg.update_freq != 0:
            return
        states, actions, rewards, next_states, dones = self.memory.sample()
        actions, rewards, dones = actions.view(-1, 1), rewards.view(-1, 1), dones.view(-1, 1)
        with torch.no_grad():
            q_values = self.critic(states)
            next_q_values = self.critic(next_states)
            target_q_values = rewards + self.cfg.gamma * next_q_values * (1 - dones)
            td_error = target_q_values - q_values
        td_error = td_error.detach().cpu().numpy()
        # GAE-style advantage: accumulate discounted TD errors backwards over the batch
        advantage_lst = []
        advantage = 0.0
        for delta in td_error[::-1]:
            advantage = self.cfg.gamma * self.cfg.lamda * advantage + delta
            advantage_lst.append(advantage)
        advantage_lst.reverse()
        advantage = torch.tensor(np.array(advantage_lst), device=self.cfg.device, dtype=torch.float32)
        # log-probs of the stored actions under the current policy, frozen as the "old" policy
        mu, std = self.actor(states)
        action_dists = Normal(mu.detach(), std.detach())
        old_probs = action_dists.log_prob(actions)
        for _ in range(self.cfg.epochs):
            mu, std = self.actor(states)
            print(mu, std)  # debug print of the policy output
            action_dists = Normal(mu, std)
            log_probs = action_dists.log_prob(actions)
            ratios = torch.exp(log_probs - old_probs)
            surr1 = ratios * advantage
            surr2 = torch.clamp(ratios, 1 - self.cfg.clip, 1 + self.cfg.clip) * advantage
            actor_loss = torch.mean(-torch.min(surr1, surr2))
            self.actor_optimizer.zero_grad()
            actor_loss.backward()
            nn.utils.clip_grad_norm_(self.actor.parameters(), 0.5)
            self.actor_optimizer.step()
            critic_loss = torch.mean(F.mse_loss(self.critic(states), target_q_values.detach()))
            self.critic_optimizer.zero_grad()
            critic_loss.backward()
            nn.utils.clip_grad_norm_(self.critic.parameters(), 0.5)
            self.critic_optimizer.step()
        self.memory.clear()
def env_agent_config(cfg):
    env = gym.make(cfg.env_name, render_mode=cfg.render_mode).unwrapped
    print(f'Observation space = {env.observation_space}')
    print(f'Action space = {env.action_space}')
    cfg.n_states = env.observation_space.shape[0]
    cfg.n_actions = env.action_space.shape[0]
    cfg.action_bound = env.action_space.high[0]
    agent = PPO(cfg)
    return env, agent
def train(env, agent, cfg):
    print('Start training!')
    cfg.show()
    rewards, steps = [], []
    for i in range(cfg.train_eps):
        ep_reward, ep_step = 0.0, 0
        state, _ = env.reset(seed=cfg.seed)
        for _ in range(cfg.max_steps):
            ep_step += 1
            action = agent.choose_action(state)
            next_state, reward, terminated, truncated, _ = env.step(action)
            done = terminated or truncated
            agent.memory.push((state, action, reward, next_state, done))
            state = next_state
            agent.update()
            ep_reward += reward
            if done:
                break
        rewards.append(ep_reward)
        steps.append(ep_step)
        print(f'Episode: {i + 1}/{cfg.train_eps}  Reward: {ep_reward:.0f}  Steps: {ep_step:.0f}')
    print('Training finished!')
    env.close()
    return rewards, steps
def test(agent, cfg):
    print('Start testing!')
    rewards, steps = [], []
    env = gym.make(cfg.env_name, render_mode='human')
    for i in range(cfg.test_eps):
        ep_reward, ep_step = 0.0, 0
        state, _ = env.reset(seed=cfg.seed)
        for _ in range(cfg.max_steps):
            ep_step += 1
            action = agent.choose_action(state, predict=True)
            next_state, reward, terminated, truncated, _ = env.step(action)
            state = next_state
            ep_reward += reward
            if terminated or truncated:
                break
        steps.append(ep_step)
        rewards.append(ep_reward)
        print(f'Episode: {i + 1}/{cfg.test_eps}  Reward: {ep_reward:.3f}')
    print('Testing finished!')
    env.close()
    return rewards, steps
if __name__ == '__main__':
    cfg = Config()
    env, agent = env_agent_config(cfg)
    train_rewards, train_steps = train(env, agent, cfg)
    test_rewards, test_steps = test(agent, cfg)
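One direction I am considering, sketched below rather than applied to the script: with a continuous Gaussian policy, Normal(mu, std) produces NaN/Inf log-probs once std collapses toward zero or the actor's outputs blow up, and here std comes straight from F.softplus with no lower bound. This is only a sketch under that assumption; bounded_std and the floor/ceiling values 1e-4 and 2.0 are my own hypothetical names and numbers, not part of the original code.

import torch
from torch.nn import functional as F

def bounded_std(fc_std_out, min_std=1e-4, max_std=2.0):
    # softplus keeps std positive; clamp keeps it away from 0 and from blowing up
    return torch.clamp(F.softplus(fc_std_out), min=min_std, max=max_std)

# In Actor.forward this would replace
#   std = F.softplus(self.fc_std(x))
# with
#   std = bounded_std(self.fc_std(x))

Another common variant is a state-independent learnable log_std parameter instead of an fc_std head, but that changes the network structure, so I have left it as a note only.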