Reinforcement Learning Primer 1 (DDQN)

Reinforcement Learning Project 1: CartPole-v1 (DDQN)

Environment

CartPole is a classic control environment originally provided by OpenAI Gym and now maintained under Gymnasium.

Documentation: https://gymnasium.farama.org/environments/classic_control/cart_pole/

Observation space (state S)

The state consists of $4$ values:

  • Cart Position
  • Cart Velocity
  • Pole Angle
  • Pole Angular Velocity

Action space (action A)

  • 0: push the cart to the left
  • 1: push the cart to the right

Reward

A reward of $+1$ is given for every step the pole stays upright. In CartPole-v1 an episode is truncated after $500$ steps, so the maximum return is $500$; the reward threshold at which the task is officially considered solved is $475$.
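
To see this concretely, here is a minimal sketch of a single episode under a random policy (no learning involved), just to show how the per-step reward accumulates:

import gymnasium as gym

env = gym.make("CartPole-v1")
state, _ = env.reset(seed=0)
total_reward, done = 0.0, False
while not done:
    action = env.action_space.sample()              # random action: 0 (left) or 1 (right)
    state, reward, terminated, truncated, _ = env.step(action)
    total_reward += reward                          # +1 for every step survived
    done = terminated or truncated                  # pole fell, cart out of bounds, or 500-step cap
print(total_reward)                                 # a random policy usually scores well below 100
env.close()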

Setting up the environment

Install the package

pip install gymnasium
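
The code in this post also relies on PyTorch, NumPy, Matplotlib, and tqdm, so install those as well if needed; rendering with render_mode="human" may additionally require Gymnasium's classic-control extra, which pulls in pygame:

pip install torch numpy matplotlib tqdm "gymnasium[classic-control]"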

Import

import gymnasium as gym
env = gym.make("CartPole-v1", render_mode="human")
# Get the state and action dimensions
state_dim = env.observation_space.shape[0] if len(env.observation_space.shape) == 1 else env.observation_space.n
action_dim = env.action_space.n

The Q-network

Definition

Here the $Q$ network is simply a neural network that stands in for the $Q$ function: given a state $s$, it predicts $Q(s, a)$ for every action $a$.

Dual-network structure

To keep the $Q$ values stable, two neural networks are normally used:

  • $Q$ network: estimates the $Q$ values of the current policy
  • Target network: provides stable target $Q$ values

In short, the $Q$ network produces the predicted values while the target network's output is treated as the regression target; only the $Q$ network is updated at each training step, and after every fixed number of updates the $Q$ network's parameters are copied into the target network.
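
A minimal, self-contained sketch of this hard-update pattern (the nn.Linear stand-ins and the SYNC_EVERY interval are illustrative only; in this post the role of both networks is played by the Qnet defined below):

import torch.nn as nn

q_net = nn.Linear(4, 2)                           # online network: trained every step
target_net = nn.Linear(4, 2)                      # target network: updated only by copying
target_net.load_state_dict(q_net.state_dict())    # start from identical parameters

SYNC_EVERY = 50                                   # illustrative sync interval
for step in range(1, 201):
    # ... one gradient update on q_net would happen here ...
    if step % SYNC_EVERY == 0:
        # hard update: copy the online parameters into the target network
        target_net.load_state_dict(q_net.state_dict())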

Implementation

The network uses two hidden layers, both of width $128$, with ReLU activations.

class Qnet(nn.Module):
    def __init__(self, hidden_dim=128):
        super(Qnet, self).__init__()
        # Two hidden layers of width hidden_dim with ReLU; one output per action
        self.net = nn.Sequential(
            nn.Linear(state_dim, hidden_dim), nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim), nn.ReLU(),
            nn.Linear(hidden_dim, action_dim)
        )

    def forward(self, x):
        return self.net(x)
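
As a quick sanity check, the network above maps a batch of states to one $Q$ value per action. This sketch reuses state_dim and action_dim from the import section ($4$ and $2$ for CartPole-v1) and the torch import from the full program:

# Shape check for the Qnet defined above
net = Qnet()
dummy_states = torch.randn(32, state_dim)    # a fake batch of 32 states
q_values = net(dummy_states)
print(q_values.shape)                        # torch.Size([32, 2]): one Q value per action
print(q_values.argmax(dim=1)[:5])            # greedy action for the first 5 states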

Experience replay buffer

Definition

A data structure for storing and reusing past interaction data.

It stores the experience tuples produced by the agent's interaction with the environment (typically of the form $(s, a, r, s^{\prime}, done)$) and later draws random mini-batches from them to update the policy or value function.

Implementation

The replay buffer implements three core operations, plus a `__len__` helper used by the sampler and the training loop:

  • Initialization: create a deque with a fixed maximum capacity
  • Add: append an experience tuple to the deque, evicting the oldest entry once capacity is exceeded
  • Sample: randomly draw $batch\ size$ tuples, convert them to tensors, and return them

class ReplayBuffer(object):
    def __init__(self, max_size=50000):
        self.max_size = max_size
        self.buffer = deque(maxlen=max_size)

    def add(self, state, action, reward, next_state, done):
        # deque(maxlen=...) already evicts the oldest entry automatically;
        # this check just makes the eviction explicit.
        if self.__len__() >= self.max_size:
            self.buffer.popleft()
        self.buffer.append((state, action, reward, next_state, done))

    def sample(self, batch_size, device='cpu'):
        indices = np.random.choice(len(self.buffer), batch_size, replace=True)
        batch = [self.buffer[i] for i in indices]
        states, actions, rewards, next_states, dones = zip(*batch)
        return (torch.FloatTensor(np.array(states)).to(device),
                torch.LongTensor(actions).to(device),
                torch.FloatTensor(rewards).to(device),
                torch.FloatTensor(np.array(next_states)).to(device),
                torch.FloatTensor(dones).to(device))

    def __len__(self):
        return len(self.buffer)
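
A small usage sketch of the buffer above (the transitions are fabricated just to show the resulting tensor shapes):

buffer = ReplayBuffer(max_size=1000)
for _ in range(10):
    s = np.random.randn(4).astype(np.float32)        # fake CartPole state
    s_next = np.random.randn(4).astype(np.float32)
    buffer.add(s, np.random.randint(2), 1.0, s_next, False)

states, actions, rewards, next_states, dones = buffer.sample(batch_size=8)
print(len(buffer), states.shape, actions.shape)      # 10 torch.Size([8, 4]) torch.Size([8])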

The DQN algorithm

Definition

The core of the DQN algorithm is to replace the $Q$ function with a neural network that predicts $Q(s,a)$.

Initialization

At construction time all required hyperparameters are set: the two networks are created, and the optimizer, discount factor, exploration rate, and so on are configured.

class DQN():
    def __init__(self, lr=3e-4, gamma=0.98, epsilon=0.1, batch_size=128, update_epochs=50):
        self.q_net = Qnet()
        self.target_q_net = Qnet()
        self.target_q_net.load_state_dict(self.q_net.state_dict())
        self.optimizer = torch.optim.Adam(self.q_net.parameters(), lr)
        self.gamma = gamma                   # discount factor
        self.epsilon = epsilon               # exploration rate for the ε-greedy policy
        self.batch_size = batch_size
        self.update_epochs = update_epochs   # how often to sync the target network
        self.loss = nn.MSELoss()
        self.memory = ReplayBuffer()
        self.learnstep = 0

Action selection

Actions are chosen with an $\epsilon$-greedy policy; $\epsilon$ is decayed dynamically during training.

def choose_action(self, state):
    state = torch.from_numpy(state).float()
    state = state.unsqueeze(0)
    if np.random.random() > self.epsilon:
        # Exploit: pick the action with the highest predicted Q value
        action_values = self.q_net(state)
        action = torch.argmax(action_values).item()
    else:
        # Explore: pick a random action
        action = np.random.randint(0, action_dim)
    return action
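
For reference, with the schedule used in the training loop below ($\epsilon$ starting at $1$, multiplied by $0.995$ after each episode, floored at $0.05$), $\epsilon$ falls to about $0.1$ after roughly $460$ episodes. A quick way to inspect the schedule:

eps, eps_end, decay = 1.0, 0.05, 0.995    # values taken from the training loop below
schedule = []
for episode in range(1000):
    schedule.append(eps)
    eps = max(eps_end, eps * decay)
print(schedule[0], round(schedule[459], 3), schedule[999])   # 1.0 0.1 0.05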

Storing transitions

The agent's interactions with the environment are stored for later training.

def store_transition(self, state, action, reward, next_state, done):
    self.memory.add(state, action, reward, next_state, done)

Training

Once more than $batch\ size$ transitions have been collected, training can begin: a random batch of $batch\ size$ transitions is drawn from the replay buffer, and the $Q$ network produces the predicted $Q(s_{t},a_{t})$ values.

The target network then evaluates the next states, and the target $Q$ value is computed as $r_{t} + \gamma \max_{a^{\prime}} Q_{\text{target}}(s_{t+1}, a^{\prime})$, with the bootstrap term zeroed out when the episode has ended.

With the current and target $Q$ values in hand, the loss is computed and the $Q$ network is updated; the target network is synchronized periodically.
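
Since the title mentions DDQN, note the distinction: the learn() method below uses the standard DQN target, where the target network both selects and evaluates the next action. A Double DQN (DDQN) target instead lets the online network select the action and the target network evaluate it, which reduces overestimation. A self-contained sketch of the two target computations on illustrative tensors (the variable names mirror those in learn()):

import torch

rewards = torch.ones(8)               # stand-in for a sampled batch of 8 transitions
dones = torch.zeros(8)
gamma = 0.98
next_q_online = torch.randn(8, 2)     # would be q_net(next_states) in learn()
next_q_target = torch.randn(8, 2)     # would be target_q_net(next_states) in learn()

# Standard DQN target (as in learn() below): target net selects and evaluates
dqn_target = rewards + gamma * next_q_target.max(1)[0] * (1 - dones)

# Double DQN target: online net selects the action, target net evaluates it
next_actions = next_q_online.argmax(dim=1, keepdim=True)
ddqn_target = rewards + gamma * next_q_target.gather(1, next_actions).squeeze(1) * (1 - dones)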

def learn(self):
    if len(self.memory) < self.batch_size:
        return
    # Compute Q(s, a) for the sampled batch
    states, actions, rewards, next_states, dones = self.memory.sample(self.batch_size)
    q_values = self.q_net(states)
    next_q_values = self.target_q_net(next_states)
    q_sa = q_values.gather(1, actions.unsqueeze(1)).squeeze(1)
    target = rewards + self.gamma * next_q_values.max(1)[0].detach() * (1 - dones)
    # Compute the loss and backpropagate
    loss = self.loss(q_sa, target)
    self.optimizer.zero_grad()
    loss.backward()
    self.optimizer.step()
    # Periodically sync the target network
    self.learnstep += 1
    if self.learnstep % self.update_epochs == 0:
        self.target_q_net.load_state_dict(self.q_net.state_dict())

Environment interaction & model training

Once the hyperparameters are set, initialize the environment, collect experience, and train the model.

from tqdm import tqdm
episodes = 1000
epsilon_decay = 0.995
epsilon_start = 1
epsilon_end = 0.05
scores = []
model = DQN()
model.epsilon = epsilon_start   # start fully exploratory, then decay per episode
pbar = tqdm(range(episodes), desc="Training")
for episode in pbar:
    state, _ = env.reset()
    score = 0
    done = False
    while not done:
        action = model.choose_action(state)
        next_state, reward, done, truncated, _ = env.step(action)
        done = done or truncated
        model.store_transition(state, action, reward, next_state, done)
        state = next_state
        model.learn()
        score += reward
        env.render()
    scores.append(score)
    model.epsilon = max(epsilon_end, model.epsilon * epsilon_decay)
    pbar.set_postfix(ep=episode, score=score, avg100=np.mean(scores[-100:]), ε=model.epsilon)
torch.save(model.q_net.state_dict(), "../model/cartpole.pt")
print(scores)
plt.plot(scores)
plt.show()
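
After training, the saved weights can be loaded for a purely greedy evaluation run. A sketch, assuming the checkpoint path used above and the Qnet definition from earlier:

# Greedy evaluation of the trained network (no exploration, no learning)
eval_env = gym.make("CartPole-v1", render_mode="human")
policy = Qnet()
policy.load_state_dict(torch.load("../model/cartpole.pt"))
policy.eval()

state, _ = eval_env.reset()
total, done = 0.0, False
while not done:
    with torch.no_grad():
        action = policy(torch.from_numpy(state).float().unsqueeze(0)).argmax().item()
    state, reward, terminated, truncated, _ = eval_env.step(action)
    total += reward
    done = terminated or truncated
print("evaluation return:", total)
eval_env.close()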

Full program

import gymnasium as gym, torch, torch.nn as nn, numpy as np, random, matplotlib.pyplot as plt
from collections import deque

env = gym.make("CartPole-v1")
# env = gym.make("CartPole-v1", render_mode="human")
state_dim = env.observation_space.shape[0] if len(env.observation_space.shape) == 1 else env.observation_space.n
action_dim = env.action_space.n
# print(state_dim, action_dim)
device = torch.device("mps" if torch.backends.mps.is_available() else "cpu")

class Qnet(nn.Module):
    def __init__(self, hidden_dim=128):
        super(Qnet, self).__init__()
        self.net = nn.Sequential(
            nn.Linear(state_dim, hidden_dim), nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim), nn.ReLU(),
            nn.Linear(hidden_dim, action_dim)
        )

    def forward(self, x):
        return self.net(x)

class ReplayBuffer(object):
    def __init__(self, max_size=50000):
        self.max_size = max_size
        self.buffer = deque(maxlen=max_size)

    def add(self, state, action, reward, next_state, done):
        if self.__len__() >= self.max_size:
            self.buffer.popleft()
        self.buffer.append((state, action, reward, next_state, done))

    def sample(self, batch_size, device='cpu'):
        indices = np.random.choice(len(self.buffer), batch_size, replace=True)
        batch = [self.buffer[i] for i in indices]
        states, actions, rewards, next_states, dones = zip(*batch)
        return (torch.FloatTensor(np.array(states)).to(device),
                torch.LongTensor(actions).to(device),
                torch.FloatTensor(rewards).to(device),
                torch.FloatTensor(np.array(next_states)).to(device),
                torch.FloatTensor(dones).to(device))

    def __len__(self):
        return len(self.buffer)

class DQN():
    def __init__(self, lr=3e-4, gamma=0.98, epsilon=0.1, batch_size=128, update_epochs=50):
        self.q_net = Qnet()
        self.target_q_net = Qnet()
        self.target_q_net.load_state_dict(self.q_net.state_dict())
        self.optimizer = torch.optim.Adam(self.q_net.parameters(), lr)
        self.gamma = gamma
        self.epsilon = epsilon
        self.batch_size = batch_size
        self.update_epochs = update_epochs
        self.loss = nn.MSELoss()
        self.memory = ReplayBuffer()
        self.learnstep = 0

    def choose_action(self, state):
        state = torch.from_numpy(state).float()
        state = state.unsqueeze(0)
        if np.random.random() > self.epsilon:
            action_values = self.q_net(state)
            action = torch.argmax(action_values).item()
        else:
            action = np.random.randint(0, action_dim)
        return action

    def store_transition(self, state, action, reward, next_state, done):
        self.memory.add(state, action, reward, next_state, done)

    def learn(self):
        if len(self.memory) < self.batch_size:
            return
        # Compute Q(s, a) for the sampled batch
        states, actions, rewards, next_states, dones = self.memory.sample(self.batch_size)
        q_values = self.q_net(states)
        next_q_values = self.target_q_net(next_states)
        q_sa = q_values.gather(1, actions.unsqueeze(1)).squeeze(1)
        target = rewards + self.gamma * next_q_values.max(1)[0].detach() * (1 - dones)
        # Compute the loss and backpropagate
        loss = self.loss(q_sa, target)
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()
        # Periodically sync the target network
        self.learnstep += 1
        if self.learnstep % self.update_epochs == 0:
            self.target_q_net.load_state_dict(self.q_net.state_dict())

from tqdm import tqdm
episodes = 1000
epsilon_decay = 0.995
epsilon_start = 1
epsilon_end = 0.05
scores = []
model = DQN()
model.epsilon = epsilon_start   # start fully exploratory, then decay per episode
pbar = tqdm(range(episodes), desc="Training")
for episode in pbar:
    state, _ = env.reset()
    score = 0
    done = False
    while not done:
        action = model.choose_action(state)  # ε-greedy action selection
        next_state, reward, done, truncated, _ = env.step(action)
        done = done or truncated
        model.store_transition(state, action, reward, next_state, done)
        state = next_state
        model.learn()
        score += reward
        env.render()                         # only has an effect with render_mode="human"
    scores.append(score)
    model.epsilon = max(epsilon_end, model.epsilon * epsilon_decay)
    pbar.set_postfix(ep=episode, score=score, avg100=np.mean(scores[-100:]), ε=model.epsilon)
torch.save(model.q_net.state_dict(), "../model/cartpole.pt")
print(scores)
plt.plot(scores)
plt.show()
