import random
from collections import deque

import gymnasium as gym
import matplotlib.pyplot as plt
import numpy as np
import torch
import torch.nn as nn
from tqdm import tqdm
env = gym.make("LunarLander-v3")
# LunarLander-v3 has a Box(8,) observation space and a Discrete(4) action space.
state_dim = env.observation_space.shape[0] if len(env.observation_space.shape) == 1 else env.observation_space.n
action_dim = env.action_space.n
device = 'cpu'
class VAnet(nn.Module):
    """Dueling network: a shared trunk feeding separate state-value and advantage heads."""

    def __init__(self, hidden_dim=128):
        super().__init__()
        self.shared = nn.Sequential(
            nn.Linear(state_dim, hidden_dim), nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim), nn.ReLU(),
        )
        self.value = nn.Sequential(
            nn.Linear(hidden_dim, hidden_dim), nn.ReLU(),
            nn.Linear(hidden_dim, 1),
        )
        self.advantage = nn.Sequential(
            nn.Linear(hidden_dim, hidden_dim), nn.ReLU(),
            nn.Linear(hidden_dim, action_dim),
        )

    def forward(self, x):
        shared = self.shared(x)
        value = self.value(shared)          # V(s), shape (batch, 1)
        advantage = self.advantage(shared)  # A(s, a), shape (batch, action_dim)
        # Q(s, a) = V(s) + A(s, a) - mean_a A(s, a); subtract the per-state mean
        # (dim=1), not the global mean, so batch elements do not interact.
        return value + advantage - advantage.mean(dim=1, keepdim=True)
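

# Quick shape sanity check (an illustrative addition, not part of the original script):
# the dueling head should yield one Q-value per action for each state in a batch.
assert VAnet()(torch.randn(4, state_dim)).shape == (4, action_dim)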
class ReplayBuffer:
    """Fixed-size FIFO buffer of (state, action, reward, next_state, done) transitions."""

    def __init__(self, max_size=50000):
        self.max_size = max_size
        # deque(maxlen=...) already evicts the oldest entry, so no manual popleft is needed.
        self.buffer = deque(maxlen=max_size)

    def add(self, state, action, reward, next_state, done):
        self.buffer.append((state, action, reward, next_state, done))

    def sample(self, batch_size):
        indices = np.random.choice(len(self.buffer), batch_size, replace=True)
        states, actions, rewards, next_states, dones = zip(*(self.buffer[i] for i in indices))
        return (torch.FloatTensor(np.array(states)).to(device),
                torch.LongTensor(actions).to(device),
                torch.FloatTensor(rewards).to(device),
                torch.FloatTensor(np.array(next_states)).to(device),
                torch.FloatTensor(dones).to(device))

    def __len__(self):
        return len(self.buffer)
class DQN:
    """Dueling DQN agent: epsilon-greedy behaviour policy, soft (Polyak) target updates."""

    def __init__(self, lr=3e-4, gamma=0.98, epsilon=0.1, batch_size=128,
                 update_epochs=4, tau=0.001):
        self.q_net = VAnet().to(device)
        self.target_q_net = VAnet().to(device)
        self.target_q_net.load_state_dict(self.q_net.state_dict())
        self.optimizer = torch.optim.Adam(self.q_net.parameters(), lr)
        self.gamma = gamma
        self.epsilon = epsilon
        self.batch_size = batch_size
        self.update_epochs = update_epochs
        self.tau = tau  # soft-update coefficient for the target network
        self.loss = nn.MSELoss()
        self.memory = ReplayBuffer()
        self.learnstep = 0

    def choose_action(self, state):
        """Epsilon-greedy action selection."""
        if np.random.random() > self.epsilon:
            state = torch.FloatTensor(state).unsqueeze(0).to(device)
            with torch.no_grad():
                action = torch.argmax(self.q_net(state)).item()
        else:
            action = np.random.randint(0, action_dim)
        return action

    def store_transition(self, state, action, reward, next_state, done):
        self.memory.add(state, action, reward, next_state, done)

    def learn(self):
        self.learnstep += 1
        if len(self.memory) < self.batch_size or self.learnstep % self.update_epochs != 0:
            return
        states, actions, rewards, next_states, dones = self.memory.sample(self.batch_size)
        q_sa = self.q_net(states).gather(1, actions.unsqueeze(1)).squeeze(1)
        # TD target: r + gamma * max_a' Q_target(s', a'), cut off at terminal transitions.
        with torch.no_grad():
            next_q = self.target_q_net(next_states).max(1)[0]
        target = rewards + self.gamma * next_q * (1 - dones)
        loss = self.loss(q_sa, target)
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()
        # Soft update: target <- tau * online + (1 - tau) * target.
        for target_param, param in zip(self.target_q_net.parameters(), self.q_net.parameters()):
            target_param.data.copy_(self.tau * param.data + (1.0 - self.tau) * target_param.data)
episodes = 1000
tau = 0.001
epsilon_start = 1.0
epsilon_end = 0.05
epsilon_decay = 0.99

scores = []
model = DQN(tau=tau)
model.epsilon = epsilon_start

pbar = tqdm(range(episodes), desc="Training")
for episode in pbar:
    state, _ = env.reset()
    score = 0
    done = False
    while not done:
        action = model.choose_action(state)
        next_state, reward, terminated, truncated, _ = env.step(action)
        done = terminated or truncated
        model.store_transition(state, action, reward, next_state, done)
        model.learn()
        state = next_state
        score += reward
        # env.render() is a no-op here because the env was created without render_mode="human".
    scores.append(score)
    # Decay exploration exponentially, but never below epsilon_end.
    model.epsilon = max(epsilon_end, epsilon_decay * model.epsilon)
    pbar.set_postfix(ep=episode, score=score, avg100=np.mean(scores[-100:]), ε=model.epsilon)

print(f"Average score over the last 100 episodes: {np.mean(scores[-100:]):.2f}")
torch.save(model.q_net.state_dict(), "../../model/LunarLander-DuelingDQN.pt")
plt.plot(scores)
plt.show()
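
# Optional evaluation sketch (an assumption, not part of the original script): reload the
# checkpoint saved above and run a few greedy episodes with on-screen rendering.
eval_env = gym.make("LunarLander-v3", render_mode="human")
eval_net = VAnet().to(device)
eval_net.load_state_dict(torch.load("../../model/LunarLander-DuelingDQN.pt", map_location=device))
eval_net.eval()
for _ in range(5):
    state, _ = eval_env.reset()
    done = False
    while not done:
        with torch.no_grad():
            action = eval_net(torch.FloatTensor(state).unsqueeze(0).to(device)).argmax().item()
        state, reward, terminated, truncated, _ = eval_env.step(action)
        done = terminated or truncated
eval_env.close()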