Intro to Reinforcement Learning - 7 (DDPG)

Reinforcement Learning Project 7: LunarLanderContinuous-v3 (DDPG)

Environment

This project uses the continuous-action version of the classic LunarLander environment from Gymnasium's Box2D suite. Unlike the discrete version covered in the PPO notes, the action space here consists of continuous values.

Official documentation: https://gymnasium.farama.org/environments/box2d/lunar_lander/

Action Space (Continuous)

The action is a 2-dimensional vector $a \in [-1, 1]^2$ (interpreted programmatically in the sketch after this list):

  1. Main engine:
    • $-1 \sim 0$: engine off
    • $0 \sim +1$: engine on; larger values mean more thrust (throttle scales from 50% to 100% power)
  2. Side engines (lateral boosters):
    • $-1 \sim -0.5$: left booster fires (per the Gymnasium docs; throttle again scales from 50% to 100%)
    • $-0.5 \sim 0.5$: both boosters off
    • $0.5 \sim 1$: right booster fires
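As a quick illustration, the sketch below maps a raw action vector to the engine behaviour described above, following the thresholds in the Gymnasium documentation. The helper name describe_action is hypothetical, not part of Gym.

import numpy as np

def describe_action(a):
    """Interpret a continuous LunarLander action [main, lateral] (hypothetical helper)."""
    main, lateral = float(a[0]), float(a[1])
    if main <= 0.0:
        main_desc = "main engine off"
    else:
        # throttle scales affinely from 50% to 100% as main goes from 0 to 1
        main_desc = f"main engine at {50 + 50 * main:.0f}% power"
    if lateral < -0.5:
        side_desc = "left booster firing"
    elif lateral > 0.5:
        side_desc = "right booster firing"
    else:
        side_desc = "side boosters off"
    return f"{main_desc}, {side_desc}"

print(describe_action(np.array([0.6, -0.8])))  # -> "main engine at 80% power, left booster firing"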

State Vector

Same as in the discrete version, the state has dimension 8:
$$
s = [x, y, \dot{x}, \dot{y}, \theta, \dot{\theta}, l, r]^T
$$
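The components can be unpacked as follows; a minimal sketch assuming the standard Gymnasium observation layout (positions, velocities, angle, angular velocity, and the two leg-contact flags $l$ and $r$):

import gymnasium as gym

env = gym.make("LunarLander-v3", continuous=True)
state, _ = env.reset(seed=0)

# Unpack the 8-dimensional observation in documented order
x, y, x_dot, y_dot, theta, theta_dot, left_contact, right_contact = state
print(f"pos=({x:.2f}, {y:.2f}) vel=({x_dot:.2f}, {y_dot:.2f}) "
      f"angle={theta:.2f} ang_vel={theta_dot:.2f} legs=({left_contact}, {right_contact})")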

Reward Function

  • The logic is essentially the same as in the discrete version (points for approaching the pad, penalties for crashing, and so on).
  • Difference: in the continuous version the fuel penalty is computed from the continuous action values, which rewards precise throttle control rather than rapid on/off switching.

Creating the Environment

Note that continuous=True must be specified:

import gymnasium as gym

# continuous=True must be specified
env = gym.make("LunarLander-v3", continuous=True, render_mode="human")

state_dim = env.observation_space.shape[0]
action_dim = env.action_space.shape[0]
max_action = float(env.action_space.high[0])  # typically 1.0
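As a quick sanity check (a hedged sketch, not part of the original project), you can continue from the env created above, roll out a few random actions, and watch the per-step reward, which also makes the continuous fuel penalty visible:

state, _ = env.reset(seed=0)
for t in range(5):
    action = env.action_space.sample()  # random continuous action in [-1, 1]^2
    next_state, reward, terminated, truncated, _ = env.step(action)
    print(f"step {t}: action={action}, reward={reward:.2f}")
    state = next_state
    if terminated or truncated:
        state, _ = env.reset()
env.close()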

The DDPG Algorithm

DDPG (Deep Deterministic Policy Gradient) is an Actor-Critic algorithm designed specifically for continuous action spaces. It combines ideas from DQN (experience replay, target networks) with the deterministic policy gradient.

Core Components

  1. Actor network ($\mu$): takes the state $s$ and directly outputs a deterministic action $a$.
  2. Critic network ($Q$): takes the state $s$ and action $a$ and outputs the action value $Q(s, a)$.
  3. Target networks: $\mu'$ and $Q'$, used to compute the TD target and keep training stable.
  4. Replay buffer: stores $(s, a, r, s', done)$ tuples to break correlations in the data.

Loss Functions

1. Critic Loss (Value Loss)

The Critic minimizes the mean squared error between its predicted Q value and the TD target:

$$ L = \frac{1}{N} \sum (y_i - Q(s_i, a_i|\theta^Q))^2 $$

where the target value $y_i$ is computed with the target networks:
$$ y_i = r_i + \gamma Q'(s_{i+1}, \mu'(s_{i+1}|\theta^{\mu'})|\theta^{Q'}) \cdot (1 - d_i) $$
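A minimal, self-contained sketch of this computation using tiny stand-in networks (plain linear layers with assumed dimensions state_dim=8, action_dim=2), not the project's actual Actor/Critic classes, which appear later:

import torch
import torch.nn as nn
import torch.nn.functional as F

# Tiny stand-in networks just for illustration (the real Actor/Critic are defined below)
state_dim, action_dim, gamma = 8, 2, 0.99
target_actor  = nn.Sequential(nn.Linear(state_dim, action_dim), nn.Tanh())
critic        = nn.Linear(state_dim + action_dim, 1)
target_critic = nn.Linear(state_dim + action_dim, 1)

batch = 4
states      = torch.randn(batch, state_dim)
actions     = torch.rand(batch, action_dim) * 2 - 1
rewards     = torch.randn(batch, 1)
next_states = torch.randn(batch, state_dim)
dones       = torch.zeros(batch, 1)

# The TD target is computed entirely with the target networks and detached from the graph
with torch.no_grad():
    next_actions = target_actor(next_states)
    q_next = target_critic(torch.cat([next_states, next_actions], dim=1))
    td_targets = rewards + gamma * q_next * (1 - dones)

current_q = critic(torch.cat([states, actions], dim=1))
critic_loss = F.mse_loss(current_q, td_targets)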

2. Actor Loss (Policy Loss)

The Actor tries to maximize the Critic's score for the actions it outputs. Since we train with gradient descent, this is implemented by minimizing the negative Q value:

$$ J(\theta^\mu) = - \frac{1}{N} \sum Q(s_i, \mu(s_i|\theta^\mu)|\theta^Q) $$
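Correspondingly, a sketch of the actor update, continuing the dummy setup above. Note that only the actor's parameters go into its optimizer, so the critic acts purely as a differentiable scorer here:

import torch.nn as nn
import torch.optim as optim

# Continuing the dummy setup above; actor is another tiny stand-in network
actor = nn.Sequential(nn.Linear(state_dim, action_dim), nn.Tanh())
actor_optimizer = optim.Adam(actor.parameters(), lr=3e-4)

# Maximize Q(s, mu(s)) by minimizing its negative mean
actor_loss = -critic(torch.cat([states, actor(states)], dim=1)).mean()

actor_optimizer.zero_grad()
actor_loss.backward()   # gradients flow through the critic into the actor
actor_optimizer.step()  # the critic's weights are untouched: they are not in this optimizer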

Exploration

Because DDPG uses a deterministic policy, we add noise to the action during training so the agent keeps exploring:
$$ a_{exec} = \text{clip}(\mu(s) + \mathcal{N}, -a_{max}, a_{max}) $$
This project uses Gaussian noise, with the noise scale decayed over the course of training.

Gaussian noise code (generated by gemini3):

import numpy as np

class GaussianNoise:
    def __init__(self, action_dim, sigma=0.1):
        self.action_dim = action_dim
        self.sigma = sigma  # standard deviation, controls the noise scale

    def sample(self):
        # standard normal noise scaled by sigma
        return np.random.normal(0, self.sigma, size=self.action_dim)

Soft Update

Unlike the hard updates used in DQN, DDPG updates the target network parameters slowly via a soft update:
$$ \theta' \leftarrow \tau \theta + (1 - \tau) \theta' $$
where $\tau$ is a very small value (e.g. 0.005).
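Written as a small helper function (a sketch; the DDPG class below performs the same update inline in its train() method):

def soft_update(net, target_net, tau=0.005):
    """Move the target network's weights a small step towards the online network."""
    for param, target_param in zip(net.parameters(), target_net.parameters()):
        target_param.data.copy_(tau * param.data + (1 - tau) * target_param.data)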

Code Implementation

Model Definitions (Actor & Critic)

Note that the Critic network outputs $Q(s, a)$, so its input layer has state_dim + action_dim units (the sum of the state and action dimensions).

from torch import nn
import torch
import torch.nn.functional as F

class Actor(nn.Module):
    def __init__(self, state_dim, action_dim, max_action, hidden_dim=256):
        super(Actor, self).__init__()
        self.net = nn.Sequential(
            nn.Linear(state_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, action_dim),
            nn.Tanh(),
        )
        self.max_action = max_action

    def forward(self, state):
        # Tanh bounds the output to [-1, 1]; scale to the environment's action range
        return self.net(state) * self.max_action

    def act(self, state):
        return self.forward(state)

class Critic(nn.Module):
    def __init__(self, state_dim, action_dim, hidden_dim=256):
        super(Critic, self).__init__()
        self.net = nn.Sequential(
            nn.Linear(state_dim + action_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, 1),
        )

    def forward(self, state, action):
        # Concatenate state and action along the feature dimension
        return self.net(torch.cat([state, action], dim=1))
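A quick shape check (a sketch using the LunarLander dimensions assumed above):

actor = Actor(state_dim=8, action_dim=2, max_action=1.0)
critic = Critic(state_dim=8, action_dim=2)

dummy_state = torch.randn(4, 8)                # batch of 4 states
dummy_action = actor(dummy_state)              # -> shape (4, 2), values in [-1, 1]
q_values = critic(dummy_state, dummy_action)   # -> shape (4, 1)
print(dummy_action.shape, q_values.shape)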

Full DDPG Class

import torch
import torch.nn.functional as F
import torch.optim as optim

# ReplayBuffer comes from the project's utility module (not shown in this post); it must
# provide .add(...), .sample(batch_size, device=...) and a .size attribute.

class DDPG:
    def __init__(self, state_dim, action_dim, max_action,
                 device='cuda' if torch.cuda.is_available() else 'cpu',
                 hidden_dim=256, batch_size=256, gamma=0.99, tau=0.001,
                 replay_buffer_size=5000, actor_lr=3e-4, critic_lr=3e-4):
        self.device = device
        self.batch_size = batch_size
        self.gamma = gamma
        self.tau = tau
        self.max_action = max_action

        self.replay_buffer = ReplayBuffer(state_dim, action_dim, replay_buffer_size)

        self.actor = Actor(state_dim, action_dim, max_action, hidden_dim).to(self.device)
        self.target_actor = Actor(state_dim, action_dim, max_action, hidden_dim).to(self.device)
        self.target_actor.load_state_dict(self.actor.state_dict())
        self.critic = Critic(state_dim, action_dim, hidden_dim).to(self.device)
        self.target_critic = Critic(state_dim, action_dim, hidden_dim).to(self.device)
        self.target_critic.load_state_dict(self.critic.state_dict())
        self.actor_optimizer = optim.Adam(self.actor.parameters(), lr=actor_lr)
        self.critic_optimizer = optim.Adam(self.critic.parameters(), lr=critic_lr)

    def act(self, state):
        state = torch.FloatTensor(state).unsqueeze(0).to(self.device)
        with torch.no_grad():
            action = self.actor(state)
        return action.cpu().numpy().flatten()

    def store_transition(self, state, action, reward, next_state, done):
        self.replay_buffer.add(state, action, reward, next_state, done)

    def sample(self):
        return self.replay_buffer.sample(self.batch_size)

    def train(self):
        if self.replay_buffer.size < self.batch_size:
            return
        states, actions, rewards, next_states, dones = self.replay_buffer.sample(self.batch_size, device=self.device)
        with torch.no_grad():
            next_actions = self.target_actor(next_states)
            td_targets = rewards + self.gamma * self.target_critic(next_states, next_actions) * (1 - dones)
        current_q = self.critic(states, actions)
        critic_loss = F.mse_loss(current_q, td_targets)
        actor_loss = -self.critic(states, self.actor(states)).mean()

        self.actor_optimizer.zero_grad()
        actor_loss.backward()
        self.actor_optimizer.step()

        self.critic_optimizer.zero_grad()
        critic_loss.backward()
        self.critic_optimizer.step()

        # Soft-update both target networks
        for param, target_param in zip(self.actor.parameters(), self.target_actor.parameters()):
            target_param.data.copy_(param.data * self.tau + target_param.data * (1 - self.tau))
        for param, target_param in zip(self.critic.parameters(), self.target_critic.parameters()):
            target_param.data.copy_(param.data * self.tau + target_param.data * (1 - self.tau))

    def save(self, filename):
        """
        Save all network parameters and optimizer states to a single file.
        """
        torch.save({
            'actor': self.actor.state_dict(),
            'critic': self.critic.state_dict(),
            'target_actor': self.target_actor.state_dict(),
            'target_critic': self.target_critic.state_dict(),
            'actor_optimizer': self.actor_optimizer.state_dict(),
            'critic_optimizer': self.critic_optimizer.state_dict(),
        }, filename)

    def load(self, filename):
        """
        Load model parameters.
        """
        # map_location lets a GPU-trained checkpoint load on a CPU machine, and vice versa
        checkpoint = torch.load(filename, map_location=self.device)

        self.actor.load_state_dict(checkpoint['actor'])
        self.critic.load_state_dict(checkpoint['critic'])
        self.target_actor.load_state_dict(checkpoint['target_actor'])
        self.target_critic.load_state_dict(checkpoint['target_critic'])
        self.actor_optimizer.load_state_dict(checkpoint['actor_optimizer'])
        self.critic_optimizer.load_state_dict(checkpoint['critic_optimizer'])
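The class relies on a ReplayBuffer from the project's Utils module, which isn't shown in the post. A minimal sketch that matches the interface used above (add, sample(batch_size, device=...), and a size attribute) could look like this; the repository's actual implementation may differ:

import numpy as np
import torch

class ReplayBuffer:
    def __init__(self, state_dim, action_dim, max_size):
        self.max_size = max_size
        self.ptr = 0
        self.size = 0
        self.states      = np.zeros((max_size, state_dim), dtype=np.float32)
        self.actions     = np.zeros((max_size, action_dim), dtype=np.float32)
        self.rewards     = np.zeros((max_size, 1), dtype=np.float32)
        self.next_states = np.zeros((max_size, state_dim), dtype=np.float32)
        self.dones       = np.zeros((max_size, 1), dtype=np.float32)

    def add(self, state, action, reward, next_state, done):
        self.states[self.ptr] = state
        self.actions[self.ptr] = action
        self.rewards[self.ptr] = reward
        self.next_states[self.ptr] = next_state
        self.dones[self.ptr] = float(done)
        self.ptr = (self.ptr + 1) % self.max_size   # overwrite the oldest entries
        self.size = min(self.size + 1, self.max_size)

    def sample(self, batch_size, device='cpu'):
        idx = np.random.randint(0, self.size, size=batch_size)
        to_tensor = lambda x: torch.as_tensor(x[idx], device=device)
        return (to_tensor(self.states), to_tensor(self.actions), to_tensor(self.rewards),
                to_tensor(self.next_states), to_tensor(self.dones))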

Training Loop

The training loop adds Gaussian noise decay and a warm-up phase to balance exploration and exploitation.

Warm-up: for the first 5000 steps, actions are sampled randomly from the action space and the model is not trained.

import gymnasium as gym, torch, numpy as np, matplotlib.pyplot as plt
from ALG.DRL.DDPG import DDPG

from tqdm import tqdm

from Utils.Noise import GaussianNoise
from Utils.Smooth import Smooth

env = gym.make('LunarLander-v3', continuous=True, render_mode=None)
state_dim = env.observation_space.shape[0] if len(env.observation_space.shape) == 1 else env.observation_space.n
action_dim = env.action_space.shape[0]
max_action = float(env.action_space.high[0])
model = DDPG(state_dim, action_dim, max_action, replay_buffer_size=100000, tau=0.005, actor_lr=1e-4, batch_size=512)

noise = GaussianNoise(action_dim)
scores = []
episodes = 3000
step = 0
warmup_steps = 5000
noise_decay = 0.999
min_noise = 0.01
max_value = 200
update_interval = 4
pbar = tqdm(range(episodes), desc="Training")
for episode in pbar:
    # Decay the exploration noise once per episode after the warm-up phase
    if step > warmup_steps:
        noise.sigma = max(min_noise, noise.sigma * noise_decay)
    done = False
    state, _ = env.reset()
    score = 0
    while not done:
        step += 1
        if step <= warmup_steps:
            # Warm-up: fill the replay buffer with random actions, no training yet
            action = env.action_space.sample()
        else:
            action = model.act(state)
            action = (action + noise.sample()).clip(-max_action, max_action)
        next_state, reward, termination, truncated, _ = env.step(action)
        done = termination or truncated
        score += reward
        model.store_transition(state, action, reward, next_state, done)
        state = next_state
        if step > warmup_steps and step % update_interval == 0:
            model.train()
    scores.append(score)
    pbar.set_postfix(ep=episode, score=f"{score:.2f}", avg100=f"{np.mean(scores[-100:]):.2f}")

    if np.mean(scores[-100:]) > max_value:
        model.save("../../model/lunarLanderContinuous-DDPG.pth")

smooth = Smooth(scores)
smooth.show(title="DDPG in LunarLander-v3-continuous")

Training Results

After some hyperparameter tuning (batch size increased to 512, replay buffer increased to 100000), the model converged successfully.

As the learning curve shows, the model scores poorly during the early exploration phase (roughly the first 800 episodes); once the warm-up ends and the buffer has filled up, scores rise quickly and eventually stabilize above 200, corresponding to a smooth landing.
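To watch the trained agent, here is a hedged sketch that loads the checkpoint saved by the training script and runs the deterministic policy (no exploration noise) with rendering enabled; the checkpoint path matches the one used in the training loop above:

import gymnasium as gym
from ALG.DRL.DDPG import DDPG

env = gym.make('LunarLander-v3', continuous=True, render_mode='human')
state_dim = env.observation_space.shape[0]
action_dim = env.action_space.shape[0]
max_action = float(env.action_space.high[0])

model = DDPG(state_dim, action_dim, max_action)
model.load("../../model/lunarLanderContinuous-DDPG.pth")

for episode in range(5):
    state, _ = env.reset()
    done, score = False, 0
    while not done:
        action = model.act(state)  # deterministic action, no noise added at evaluation time
        state, reward, terminated, truncated, _ = env.step(action)
        score += reward
        done = terminated or truncated
    print(f"episode {episode}: score = {score:.2f}")
env.close()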

