Introduction to Reinforcement Learning - 4 (PPO)

Reinforcement Learning Project 4: LunarLander-v3 (PPO)

Environment

LunarLander is a classic Box2D environment provided by Gymnasium (the maintained successor to OpenAI Gym).

Official documentation: https://gymnasium.farama.org/environments/box2d/lunar_lander/

Actions:

  • 0: do nothing
  • 1: fire the left orientation engine (pushes the lander to the right)
  • 2: fire the main engine (pushes the lander upward)
  • 3: fire the right orientation engine (pushes the lander to the left)

The corresponding state vector:

$$
s = \begin{bmatrix}
x & y & \dot{x} & \dot{y} & \theta & \dot{\theta} & l & r
\end{bmatrix}^{T}
$$

  • $x, y$: horizontal and vertical coordinates
  • $\dot{x}, \dot{y}$: velocities along the horizontal and vertical axes
  • $\theta$: angle of the lander body
  • $\dot{\theta}$: rate of change of the body angle (angular velocity)
  • $l, r$: whether the left and right legs are in contact with the ground

Reward function:

  • Moving from the top of the screen to the landing pad and coming to rest: roughly $100 \sim 140$ points in total
  • Moving toward / away from the pad: points are added for getting closer and deducted for moving away
  • Crashing: $-100$
  • Landing safely (coming to rest): $+100$
  • Each leg touching the ground: $+10$
  • Firing the main engine: $-0.3$ per frame
  • Firing a side engine (either orientation engine): $-0.03$ per frame

An episode is considered solved if it scores at least $200$ points.

Setting up the environment

Install the package

pip install "gymnasium[box2d]"

Import

import gymnasium as gym

env = gym.make("LunarLander-v3", render_mode="human")
# Get the state and action dimensions
state_dim = env.observation_space.shape[0] if len(env.observation_space.shape) == 1 else env.observation_space.n
action_dim = env.action_space.n
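
As a quick sanity check (a minimal sketch, not part of the training code), you can step through the environment with random actions and inspect the 8-dimensional observation and the per-step reward:

import gymnasium as gym

env = gym.make("LunarLander-v3")
obs, info = env.reset(seed=0)
total_reward = 0.0
for _ in range(200):
    action = env.action_space.sample()          # random action in {0, 1, 2, 3}
    obs, reward, terminated, truncated, info = env.step(action)
    total_reward += reward
    if terminated or truncated:
        break
print(obs.shape)        # (8,): x, y, x_dot, y_dot, theta, theta_dot, l, r
print(total_reward)     # a random policy usually crashes, so this is typically negative
env.close()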

The PPO Algorithm

Policy Clipping

This is the core idea of PPO: it constrains how far the new policy is allowed to drift from the old one, as captured by the clipped surrogate objective shown below.
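
For reference, the clipped surrogate objective from the PPO paper, which the code below implements (its negative is used as the actor loss):

$$
r_t(\theta) = \frac{\pi_{\theta}(a_t \mid s_t)}{\pi_{\theta_{\text{old}}}(a_t \mid s_t)}, \qquad
L^{\text{CLIP}}(\theta) = \mathbb{E}_t\left[\min\left(r_t(\theta)\hat{A}_t,\ \operatorname{clip}\left(r_t(\theta),\ 1-\epsilon,\ 1+\epsilon\right)\hat{A}_t\right)\right]
$$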

Implementation

# Ratio of new to old action probabilities, computed from log-probabilities
ratio = torch.exp(new_log_probs - mb_old_log_probs)
surr1 = ratio * mb_advantages
clipped_ratio = torch.clamp(ratio, 1 - self.epsilon, 1 + self.epsilon)
surr2 = clipped_ratio * mb_advantages
actor_loss = -torch.min(surr1, surr2).mean()

Loss Function

The overall PPO objective is:

  • $L(\theta, \phi) = \underbrace{L^{\text{CLIP}}(\theta)}_{\text{policy term (Actor loss)}} - c_1 \underbrace{L^{\text{VF}}(\phi)}_{\text{value term (Critic loss)}} + c_2 \underbrace{H(\pi_{\theta})}_{\text{entropy term}}$

Note: with policy gradients we want actions that performed well to become more probable, so this objective is maximized (gradient ascent); in practice the optimizer minimizes its negative, as shown below.
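
Since PyTorch optimizers minimize, the train method later in this post assembles exactly the negative of this objective:

$$
\text{loss} = -L^{\text{CLIP}}(\theta) + c_1 L^{\text{VF}}(\phi) - c_2 H(\pi_{\theta})
$$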

Entropy Loss

  • $H(\pi_{\theta}) = -\sum\limits_{a}\pi_{\theta}(a|s)\log\pi_{\theta}(a|s)$

In PPO we want to maximize $L(\theta, \phi)$, so adding the entropy term rewards high entropy, i.e. it encourages exploration; that is why the entropy enters the objective with a positive sign (and the minimized loss with a negative sign). A small example follows.
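
A minimal sketch of how this entropy is obtained in code; torch.distributions.Categorical computes it directly, and the probabilities below are made-up values for illustration only:

import torch
from torch.distributions import Categorical

probs = torch.tensor([0.7, 0.1, 0.1, 0.1])   # hypothetical action probabilities for one state
dist = Categorical(probs)
# entropy() returns -sum(p * log p); in the loss, .mean() is taken over a batch of states
print(dist.entropy())                          # ~0.94; higher means a more exploratory policy
manual = -(probs * probs.log()).sum()
print(torch.allclose(dist.entropy(), manual))  # True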

Generalized Advantage Estimation (GAE)

To trade off variance against bias, GAE with the combined weighting factor $\gamma\lambda$ is used:

  • $A_{t}^{GAE} = \delta_{t} + (\gamma\lambda)A_{t + 1}^{GAE}$
  • where $\delta_{t} = R_{t} + \gamma V_{t + 1} \cdot (1 - d_{t}) - V_{t}$, and $d_{t}$ is the done flag at step $t$

Each step's advantage depends on the advantage of the next step, so the full trajectory of agent-environment interaction must be collected first, and the advantages are then computed backwards from the last step; unrolling the recursion gives the closed form below.
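
Unrolling the recursion within a single episode (ignoring the done mask) gives the exponentially weighted sum of TD errors from the GAE paper:

$$
A_{t}^{GAE} = \sum_{l=0}^{\infty} (\gamma\lambda)^{l}\,\delta_{t+l}
$$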

Implementation

def GAE(self, states, rewards, next_states, dones):
    advantage = torch.zeros_like(rewards)
    values = self.critic(states).detach()
    next_values = self.critic(next_states).detach()
    last_advantage = 0
    # Walk backwards through the collected steps
    for T in reversed(range(states.shape[0])):
        td_target = rewards[T] + self.gamma * next_values[T] * (1 - dones[T])
        td_delta = td_target - values[T]
        advantage[T] = td_delta + self.lamda * self.gamma * (1 - dones[T]) * last_advantage
        last_advantage = advantage[T]
    returns = advantage + values
    return advantage, returns

Note: the collected batch is not necessarily from a single episode; it may span several episodes, so each term is multiplied by $(1 - dones[T])$ to stop the advantage from bootstrapping across episode boundaries.

Actor-Critic

No changes here; the actor and critic networks keep the same structure as in the earlier post linked below.

AC Algorithm

PPO Training

The $T$ collected steps are shuffled and trained on in mini-batches for $K$ epochs.

def train(self, states, actions, rewards, next_states, dones, old_log_probs):
    states = torch.FloatTensor(np.array(states)).to(self.device)
    actions = torch.LongTensor(np.array(actions)).view(-1, 1).to(self.device)
    rewards = torch.FloatTensor(np.array(rewards)).view(-1, 1).to(self.device)
    next_states = torch.FloatTensor(np.array(next_states)).to(self.device)
    dones = torch.FloatTensor(np.array(dones)).view(-1, 1).to(self.device)
    old_log_probs = torch.FloatTensor(np.array(old_log_probs)).view(-1, 1).to(self.device)

    advantages, returns = self.GAE(states, rewards, next_states, dones)
    advantages = (advantages - advantages.mean()) / (advantages.std() + 1e-5)

    batch_size = 64
    data_length = states.size(0)

    for _ in range(self.K_epochs):
        # Shuffle: generate a random permutation of indices
        indices = torch.randperm(data_length).to(self.device)

        # Mini-batch sampling
        for start_index in range(0, data_length, batch_size):
            sample_indices = indices[start_index: start_index + batch_size]
            mb_states = states[sample_indices]
            mb_actions = actions[sample_indices]
            mb_old_log_probs = old_log_probs[sample_indices]
            mb_advantages = advantages[sample_indices]
            mb_returns = returns[sample_indices]
            probs = self.actor(mb_states)
            dist = Categorical(probs)
            new_log_probs = dist.log_prob(mb_actions.squeeze(-1)).view(-1, 1)

            ratio = torch.exp(new_log_probs - mb_old_log_probs)

            surr1 = ratio * mb_advantages
            clipped_ratio = torch.clamp(ratio, 1 - self.epsilon, 1 + self.epsilon)
            surr2 = clipped_ratio * mb_advantages
            actor_loss = -torch.min(surr1, surr2).mean()

            current_values = self.critic(mb_states)
            critic_loss = self.c1_vf * nn.functional.mse_loss(current_values, mb_returns)

            entropy_loss = -self.c2_entropy * dist.entropy().mean()
            actor_loss += entropy_loss

            self.critic_optimizer.zero_grad()
            critic_loss.backward()
            self.critic_optimizer.step()

            self.actor_optimizer.zero_grad()
            actor_loss.backward()
            self.actor_optimizer.step()

Full PPO Class Code

from torch import nn
import torch
from torch.distributions import Categorical
import numpy as np


class Actor(nn.Module):
    def __init__(self, state_dim, action_dim, hidden_dim):
        super(Actor, self).__init__()
        self.net = nn.Sequential(
            nn.Linear(state_dim, hidden_dim), nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim), nn.ReLU(),
            nn.Linear(hidden_dim, action_dim), nn.Softmax(dim=-1)
        )

    def forward(self, x):
        return self.net(x)


class Critic(nn.Module):
    def __init__(self, state_dim, hidden_dim):
        super(Critic, self).__init__()
        self.net = nn.Sequential(
            nn.Linear(state_dim, hidden_dim), nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim), nn.ReLU(),
            nn.Linear(hidden_dim, 1)
        )

    def forward(self, x):
        return self.net(x)


class PPO():
    def __init__(self, env, hidden_dim, actor_lr, critic_lr, K_epochs):
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.gamma = 0.99
        self.lamda = 0.95
        self.epsilon = 0.2
        self.c1_vf = 0.5
        self.c2_entropy = 0.01
        self.K_epochs = K_epochs
        self.state_dim = env.observation_space.shape[0]
        self.action_dim = env.action_space.n
        self.hidden_dim = hidden_dim
        self.actor = Actor(self.state_dim, self.action_dim, self.hidden_dim).to(self.device)
        self.critic = Critic(self.state_dim, self.hidden_dim).to(self.device)
        self.actor_optimizer = torch.optim.Adam(self.actor.parameters(), lr=actor_lr)
        self.critic_optimizer = torch.optim.Adam(self.critic.parameters(), lr=critic_lr)

    def select_action(self, state):
        state = torch.from_numpy(state).float().to(self.device)
        with torch.no_grad():
            probs = self.actor(state)
            dist = Categorical(probs)
            action = dist.sample()
            log_prob = dist.log_prob(action)
        return action.item(), log_prob.item()

    def GAE(self, states, rewards, next_states, dones):
        advantage = torch.zeros_like(rewards)
        values = self.critic(states).detach()
        next_values = self.critic(next_states).detach()
        last_advantage = 0
        for T in reversed(range(states.shape[0])):
            td_target = rewards[T] + self.gamma * next_values[T] * (1 - dones[T])
            td_delta = td_target - values[T]
            advantage[T] = td_delta + self.lamda * self.gamma * (1 - dones[T]) * last_advantage
            last_advantage = advantage[T]
        returns = advantage + values
        return advantage, returns

    def train(self, states, actions, rewards, next_states, dones, old_log_probs):
        states = torch.FloatTensor(np.array(states)).to(self.device)
        actions = torch.LongTensor(np.array(actions)).view(-1, 1).to(self.device)
        rewards = torch.FloatTensor(np.array(rewards)).view(-1, 1).to(self.device)
        next_states = torch.FloatTensor(np.array(next_states)).to(self.device)
        dones = torch.FloatTensor(np.array(dones)).view(-1, 1).to(self.device)
        old_log_probs = torch.FloatTensor(np.array(old_log_probs)).view(-1, 1).to(self.device)

        advantages, returns = self.GAE(states, rewards, next_states, dones)
        advantages = (advantages - advantages.mean()) / (advantages.std() + 1e-5)

        batch_size = 64
        data_length = states.size(0)

        for _ in range(self.K_epochs):
            indices = torch.randperm(data_length).to(self.device)
            for start_index in range(0, data_length, batch_size):
                sample_indices = indices[start_index: start_index + batch_size]
                mb_states = states[sample_indices]
                mb_actions = actions[sample_indices]
                mb_old_log_probs = old_log_probs[sample_indices]
                mb_advantages = advantages[sample_indices]
                mb_returns = returns[sample_indices]
                probs = self.actor(mb_states)
                dist = Categorical(probs)
                new_log_probs = dist.log_prob(mb_actions.squeeze(-1)).view(-1, 1)
                ratio = torch.exp(new_log_probs - mb_old_log_probs)
                surr1 = ratio * mb_advantages
                clipped_ratio = torch.clamp(ratio, 1 - self.epsilon, 1 + self.epsilon)
                surr2 = clipped_ratio * mb_advantages
                actor_loss = -torch.min(surr1, surr2).mean()

                current_values = self.critic(mb_states)
                critic_loss = self.c1_vf * nn.functional.mse_loss(current_values, mb_returns)

                entropy_loss = -self.c2_entropy * dist.entropy().mean()

                actor_loss += entropy_loss

                self.critic_optimizer.zero_grad()
                critic_loss.backward()
                self.critic_optimizer.step()

                self.actor_optimizer.zero_grad()
                actor_loss.backward()
                self.actor_optimizer.step()

Collecting Data and Training

Each update is performed only after a full $T = 1024$ steps of data have been collected; if the current episode has not finished by then, it simply continues after the update, now driven by the updated model.

Code:

import gymnasium as gym, torch, numpy as np, matplotlib.pyplot as plt
from DRL.PPO import PPO

from tqdm import tqdm

env = gym.make('LunarLander-v3')

scores = []
model = PPO(env, 256, 1e-4, 3e-4, 4)
T_steps = 1024
episodes = 2000
state, _ = env.reset()
pbar = tqdm(range(episodes), desc="Training")
score = 0
for episode in pbar:
    done = False
    states, actions, rewards, dones, next_states, old_log_probs = [], [], [], [], [], []
    for i in range(T_steps):
        action, old_log_prob = model.select_action(state)
        next_state, reward, done, truncated, _ = env.step(action)
        done = done or truncated
        score += reward
        states.append(state)
        actions.append(action)
        rewards.append(reward)
        old_log_probs.append(old_log_prob)
        next_states.append(next_state)
        dones.append(done)
        state = next_state
        if done:
            # Record the finished episode's score so the running average and final plot work
            scores.append(score)
            state, _ = env.reset()
            pbar.set_postfix(ep=episode, score=f"{score:.2f}", avg100=f"{np.mean(scores[-100:]):.2f}")
            score = 0
    model.train(states, actions, rewards, next_states, dones, old_log_probs)

plt.plot(scores)
plt.xlabel("Episode")
plt.ylabel("Score")
plt.show()
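
After training finishes, you can watch the learned policy. This is a minimal evaluation sketch, assuming model is the trained PPO instance from the script above (select_action keeps sampling from the policy; taking an argmax over model.actor(state) would give a greedy variant):

# Hypothetical evaluation loop: render a few episodes with the trained policy
eval_env = gym.make('LunarLander-v3', render_mode="human")
for _ in range(3):
    state, _ = eval_env.reset()
    done, ep_return = False, 0.0
    while not done:
        action, _ = model.select_action(state)
        state, reward, terminated, truncated, _ = eval_env.step(action)
        ep_return += reward
        done = terminated or truncated
    print(f"evaluation return: {ep_return:.2f}")
eval_env.close()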
