Reinforcement Learning Primer 9 (SAC)

Reinforcement Learning Project 9: HalfCheetah-v5 (SAC)

Environment

This project uses the HalfCheetah-v5 environment provided by Gymnasium (MuJoCo). It is a classic robot-control task: make a two-legged ("half cheetah") robot run forward as fast as possible.

Official documentation: https://gymnasium.farama.org/environments/mujoco/half_cheetah/

Action Space (Continuous)

The action is a 6-dimensional continuous vector $a \in [-1, 1]^6$, where each component sets the torque applied to one of the cheetah's 6 actuated joints (thighs, shins, and feet).

State Vector

The observation space has 17 dimensions, including:

  • positions of the body parts
  • joint angles
  • velocities of the body parts
  • joint angular velocities

Reward Function

The reward consists mainly of the following parts (a rough sketch of this decomposition follows the list):

  1. Forward reward: the farther the cheetah travels forward, the larger the reward.
  2. Control cost: overly large actions are penalized to discourage violent jittering.
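
As a rough illustration only: the per-step reward can be approximated as a forward-velocity term minus a control cost. The default weights (forward_reward_weight=1.0, ctrl_cost_weight=0.1) are taken from the HalfCheetah-v5 documentation and should be treated as assumptions here, not as the environment's source code.

import numpy as np

def approx_reward(x_velocity, action,
                  forward_reward_weight=1.0, ctrl_cost_weight=0.1):
    """Sketch of the HalfCheetah-v5 per-step reward decomposition (not the env source)."""
    forward_reward = forward_reward_weight * x_velocity              # reward for moving forward
    ctrl_cost = ctrl_cost_weight * float(np.sum(np.square(action)))  # penalty for large torques
    return forward_reward - ctrl_cost

# Example: running at 3 m/s with moderate torques keeps most of the forward term.
print(approx_reward(3.0, np.array([0.5] * 6)))   # 3.0 - 0.1 * 1.5 = 2.85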

Setting Up the Environment

import gymnasium as gym, torch, numpy as np

env = gym.make('HalfCheetah-v5', render_mode=None)
eval_env = gym.make('HalfCheetah-v5', render_mode=None)

state_dim = env.observation_space.shape[0]
action_dim = env.action_space.shape[0]
max_action = float(env.action_space.high[0])
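
A quick, purely illustrative check of what the setup above yields (the expected values follow from the environment description):

print(state_dim, action_dim, max_action)   # expected: 17 6 1.0
print(env.action_space)                    # Box(-1.0, 1.0, (6,), float32)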

The SAC Algorithm

SAC (Soft Actor-Critic) is built on maximum entropy reinforcement learning. Unlike DDPG, SAC maximizes the expected cumulative reward while also maximizing the entropy of the policy, which gives the policy better exploration and more robustness.

Core Components

  1. Actor network: outputs the parameters of an action distribution (typically the mean and standard deviation of a Gaussian); the policy is stochastic.
  2. Critic network: uses two Q-networks to reduce value overestimation (clipped double Q-learning).
  3. Temperature coefficient $\alpha$: controls the weight of the entropy regularization term; SAC can tune $\alpha$ automatically to balance exploration and exploitation dynamically.

Loss Functions

1. Critic Loss

The critic minimizes the Bellman error. The target value $y$ includes the entropy term:

$$ y = r + \gamma \left( \min_{j=1,2} Q_{\text{target},j}(s', a') - \alpha \log \pi(a'|s') \right) $$

The loss function is:
$$ L_Q = \frac{1}{2} \sum_{i=1,2} (Q_i(s, a) - y)^2 $$

2. Actor Loss

The actor maximizes the Q-value while also maximizing entropy (i.e., it minimizes $J_{\pi}$):

$$ J_{\pi} = \mathbb{E}_{a \sim \pi} \left[ \alpha \log \pi(a|s) - \min_{j=1,2} Q_{j}(s, a) \right] $$

To let gradients flow through the sampling step, SAC uses the reparameterization trick:
$$ a = \tanh(\mu(s) + \sigma(s) \cdot \epsilon), \quad \epsilon \sim \mathcal{N}(0, 1) $$
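
Because the pre-activation sample $u = \mu(s) + \sigma(s)\,\epsilon$ is squashed by $\tanh$, the log-probability of the final action needs a change-of-variables correction. The numerically stable identity below (a standard result, stated here for reference) is exactly what the correction line in the code later implements:

$$ \log \pi(a|s) = \log \mathcal{N}(u; \mu(s), \sigma(s)) - \sum_{i} \log\bigl(1 - \tanh^2(u_i)\bigr) $$

$$ \log\bigl(1 - \tanh^2(u)\bigr) = 2\bigl(\log 2 - u - \operatorname{softplus}(-2u)\bigr) $$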

3. Alpha Loss (Automatic Entropy Tuning)

The temperature coefficient $\alpha$ is adjusted by gradient descent so that the policy entropy stays near a target entropy $\bar{\mathcal{H}}$ (here set to $-\dim(\mathcal{A}) = -6$): when the policy entropy falls below the target, the gradient pushes $\alpha$ up and strengthens the entropy bonus, and vice versa:
$$ J(\alpha) = \mathbb{E}_{a \sim \pi} [ -\alpha (\log \pi(a|s) + \bar{\mathcal{H}}) ] $$

Code Implementation

Model Definitions (Actor & Critic)

The actor outputs a mean and log standard deviation used to build a Gaussian distribution; the critic outputs Q-values.

from torch import nn, optim
import torch
import torch.nn.functional as F
import numpy as np

from Utils.ReplayBuffer import ReplayBuffer


class Actor(nn.Module):
    def __init__(self, state_dim, action_dim, hidden_dim=256):
        super(Actor, self).__init__()
        # The final layer outputs action_dim * 2 values: the mean and the
        # log standard deviation of the Gaussian policy, concatenated.
        self.net = nn.Sequential(
            nn.Linear(state_dim, hidden_dim), nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim), nn.ReLU(),
            nn.Linear(hidden_dim, action_dim * 2),
        )

    def forward(self, x):
        return self.net(x)


class Critic(nn.Module):
    def __init__(self, state_dim, action_dim, hidden_dim=256):
        super(Critic, self).__init__()
        # Two independent Q-networks for clipped double Q-learning.
        self.Q1 = nn.Sequential(
            nn.Linear(state_dim + action_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, 1),
        )
        self.Q2 = nn.Sequential(
            nn.Linear(state_dim + action_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, 1),
        )

    def forward(self, state, action):
        sa = torch.cat([state, action], dim=1)
        return self.Q1(sa), self.Q2(sa)
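
The Utils.ReplayBuffer imported above is not shown in this post. Below is a minimal compatible sketch (an assumption, not the author's implementation), matching only the interface the SAC class relies on: a constructor taking (state_dim, action_dim, capacity), an add(state, action, reward, next_state, done) method, a size attribute, and sample(batch_size) returning tensors on the training device.

import numpy as np
import torch


class ReplayBuffer:
    """Minimal ring-buffer sketch compatible with the SAC class below."""

    def __init__(self, state_dim, action_dim, capacity,
                 device='cuda' if torch.cuda.is_available() else 'mps' if torch.backends.mps.is_available() else 'cpu'):
        self.capacity = capacity
        self.device = device
        self.ptr = 0    # next write position
        self.size = 0   # number of stored transitions
        self.states = np.zeros((capacity, state_dim), dtype=np.float32)
        self.actions = np.zeros((capacity, action_dim), dtype=np.float32)
        self.rewards = np.zeros((capacity, 1), dtype=np.float32)
        self.next_states = np.zeros((capacity, state_dim), dtype=np.float32)
        self.dones = np.zeros((capacity, 1), dtype=np.float32)

    def add(self, state, action, reward, next_state, done):
        self.states[self.ptr] = state
        self.actions[self.ptr] = action
        self.rewards[self.ptr] = reward
        self.next_states[self.ptr] = next_state
        self.dones[self.ptr] = float(done)
        self.ptr = (self.ptr + 1) % self.capacity          # overwrite oldest entries when full
        self.size = min(self.size + 1, self.capacity)

    def sample(self, batch_size):
        idx = np.random.randint(0, self.size, size=batch_size)
        to_tensor = lambda arr: torch.as_tensor(arr[idx], device=self.device)
        return (to_tensor(self.states), to_tensor(self.actions), to_tensor(self.rewards),
                to_tensor(self.next_states), to_tensor(self.dones))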

Full SAC Class

The class below implements SAC with automatic entropy tuning and reparameterized sampling.

class SAC:
    def __init__(self, state_dim, action_dim, hidden_dim=256, actor_lr=3e-4, critic_lr=3e-4,
                 alpha_lr=3e-4, gamma=0.99, tau=0.005, alpha=0.2,
                 device='cuda' if torch.cuda.is_available() else 'mps' if torch.backends.mps.is_available() else 'cpu',
                 replay_buffer_capacity=10000):
        self.alpha_lr = alpha_lr
        self.gamma = gamma
        self.tau = tau
        self.device = device
        self.target_entropy = -action_dim  # heuristic target entropy: -|A|
        self.replay_buffer = ReplayBuffer(state_dim, action_dim, replay_buffer_capacity)
        self.actor = Actor(state_dim, action_dim, hidden_dim).to(device)
        self.actor_optimizer = torch.optim.Adam(self.actor.parameters(), lr=actor_lr)
        self.critic = Critic(state_dim, action_dim, hidden_dim).to(device)
        self.critic_optimizer = torch.optim.Adam(self.critic.parameters(), lr=critic_lr)
        self.target_critic = Critic(state_dim, action_dim, hidden_dim).to(device)
        self.target_critic.load_state_dict(self.critic.state_dict())
        self.log_std_min = -20
        self.log_std_max = 2
        self.log_alpha = torch.tensor(np.log(alpha), requires_grad=True, device=device)
        self.alpha_optimizer = optim.Adam([self.log_alpha], lr=alpha_lr)

    @property
    def alpha(self):
        return self.log_alpha.exp()

    def store_transition(self, state, action, reward, next_state, done):
        self.replay_buffer.add(state, action, reward, next_state, done)

    def act(self, obs, evaluate=False):
        if isinstance(obs, np.ndarray):
            obs = torch.FloatTensor(obs).to(self.device).unsqueeze(0)
        pred = self.actor(obs)
        action_mean, action_log_std = torch.chunk(pred, 2, dim=-1)
        if evaluate:
            # Deterministic action for evaluation: tanh of the mean, no sampling.
            return torch.tanh(action_mean), None
        log_std = torch.clamp(action_log_std, self.log_std_min, self.log_std_max)
        std = torch.exp(log_std)
        dist = torch.distributions.Normal(action_mean, std)
        normal_sample = dist.rsample()          # reparameterized sample
        action = torch.tanh(normal_sample)      # squash into [-1, 1]
        log_prob = dist.log_prob(normal_sample)
        # Numerically stable tanh correction: log(1 - tanh(u)^2)
        correction = 2. * (np.log(2.) - normal_sample - F.softplus(-2. * normal_sample))
        log_prob -= correction
        log_prob = log_prob.sum(dim=-1, keepdim=True)

        return action, log_prob

    def train(self, batch_size=512):
        if self.replay_buffer.size < batch_size:
            return
        states, actions, rewards, next_states, dones = self.replay_buffer.sample(batch_size)

        # Critic update: soft Bellman target with the entropy term.
        with torch.no_grad():
            next_actions, new_log_prob = self.act(next_states)
            target_Q1, target_Q2 = self.target_critic(next_states, next_actions)
            target_Q = torch.min(target_Q1, target_Q2)
            y = rewards + (1 - dones) * self.gamma * (target_Q - self.alpha.item() * new_log_prob)
        curr_Q1, curr_Q2 = self.critic(states, actions)
        critic_loss = F.mse_loss(curr_Q1, y) + F.mse_loss(curr_Q2, y)

        self.critic_optimizer.zero_grad()
        critic_loss.backward()
        self.critic_optimizer.step()
        self.update_target()

        # Actor update: maximize min-Q plus the entropy bonus.
        new_actions, log_prob = self.act(states)
        q1, q2 = self.critic(states, new_actions)
        q_min = torch.min(q1, q2)
        actor_loss = (self.alpha.item() * log_prob - q_min).mean()

        self.actor_optimizer.zero_grad()
        actor_loss.backward()
        self.actor_optimizer.step()

        # Temperature update: keep policy entropy near the target entropy.
        alpha_loss = -(self.log_alpha * (log_prob + self.target_entropy).detach()).mean()

        self.alpha_optimizer.zero_grad()
        alpha_loss.backward()
        self.alpha_optimizer.step()

    def update_target(self):
        # Polyak (soft) update of the target critic.
        for param, target_param in zip(self.critic.parameters(), self.target_critic.parameters()):
            target_param.data.copy_(self.tau * param.data + (1 - self.tau) * target_param.data)

    def save(self, filename):
        """
        Save everything needed both for testing and for resuming training.
        """
        torch.save({
            # --- Network parameters (required for testing / inference) ---
            'actor': self.actor.state_dict(),
            'critic': self.critic.state_dict(),
            'target_critic': self.target_critic.state_dict(),  # needed to resume training
            'log_alpha': self.log_alpha.detach(),               # needed to resume training

            # --- Optimizer states (required to resume training) ---
            'actor_optimizer': self.actor_optimizer.state_dict(),
            'critic_optimizer': self.critic_optimizer.state_dict(),
            'alpha_optimizer': self.alpha_optimizer.state_dict(),
        }, filename)

    def load(self, filename, evaluate=False):
        """
        Load the model.
        :param filename: path to the checkpoint
        :param evaluate:
            True  -> load only the Actor and Critic (for testing / validation)
            False -> also load the optimizers and remaining parameters (to resume training)
        """
        checkpoint = torch.load(filename, map_location=self.device)

        # 1. Load network parameters (the Actor is needed in both modes).
        self.actor.load_state_dict(checkpoint['actor'])
        self.critic.load_state_dict(checkpoint['critic'])

        # 2. In evaluation mode, loading stops here.
        if evaluate:
            # Switch to eval mode (the SAC actor usually has no Dropout/BatchNorm,
            # but this is good practice).
            self.actor.eval()
            self.critic.eval()
            print(f"Loaded model from {filename} (Evaluation Mode)")
            return

        # 3. To resume training, the target network and optimizers must also be loaded.
        self.target_critic.load_state_dict(checkpoint['target_critic'])

        # Restore log_alpha (important! otherwise alpha would reset).
        # Use .data.copy_ to preserve requires_grad=True.
        self.log_alpha.data.copy_(checkpoint['log_alpha'])

        # Load the optimizers.
        self.actor_optimizer.load_state_dict(checkpoint['actor_optimizer'])
        self.critic_optimizer.load_state_dict(checkpoint['critic_optimizer'])
        self.alpha_optimizer.load_state_dict(checkpoint['alpha_optimizer'])

        # Back to training mode.
        self.actor.train()
        self.critic.train()
        print(f"Loaded model from {filename} (Resume Training Mode)")
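
A quick smoke test (not part of the original post) can confirm the class runs end to end before launching the full training; it assumes the HalfCheetah dimensions (17, 6) and a buffer like the sketch shown earlier.

import numpy as np

agent = SAC(state_dim=17, action_dim=6, hidden_dim=256, replay_buffer_capacity=10000)
for _ in range(600):                                   # fill the buffer with random transitions
    s = np.random.randn(17).astype(np.float32)
    s2 = np.random.randn(17).astype(np.float32)
    a = np.random.uniform(-1, 1, size=6).astype(np.float32)
    agent.store_transition(s, a, 0.0, s2, False)
agent.train(batch_size=256)                            # one gradient step on critic, actor, alpha
action, log_prob = agent.act(np.random.randn(17).astype(np.float32))
print(action.shape, float(agent.alpha))                # torch.Size([1, 6]) and the current temperature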

Training Loop

The training script includes a standalone evaluate_policy function that measures performance under the deterministic policy (an evaluation runs every 20 episodes and averages 10 rollouts) and saves the best model.

Training runs for 4000 episodes in total, the model is updated once every 4 environment steps, and rewards are scaled by $\frac{1}{5}$ before being stored in the replay buffer.

import gymnasium as gym, torch, numpy as np
from ALG.DRL.SAC import SAC
from tqdm import tqdm
from Utils.Smooth import Smooth

torch.set_float32_matmul_precision('high')

env = gym.make('HalfCheetah-v5', render_mode=None)
eval_env = gym.make('HalfCheetah-v5', render_mode=None)
state_dim = env.observation_space.shape[0]
action_dim = env.action_space.shape[0]
max_action = float(env.action_space.high[0])
model = SAC(state_dim, action_dim, hidden_dim=1024, replay_buffer_capacity=1000000)
episodes = 4000
warm_up = 20000


def evaluate_policy(agent, env, eval_episodes=10):
    avg_reward = 0.
    for _ in range(eval_episodes):
        state, _ = env.reset()
        done = False
        while not done:
            # Key point: evaluate=True (deterministic policy, no noise)
            with torch.no_grad():
                action, _ = agent.act(state, evaluate=True)
            action = action.detach().cpu().numpy()[0] * max_action
            state, reward, terminated, truncated, _ = env.step(action)
            avg_reward += reward
            done = terminated or truncated
    return avg_reward / eval_episodes


scores = []
eval_scores = []
train_interval = 4
base_score = 12000
pbar = tqdm(range(episodes), desc="Training")
step = 0
for episode in pbar:
    done = False
    state, _ = env.reset()
    score = 0
    while not done:
        step += 1
        if step <= warm_up:
            action = env.action_space.sample()
        else:
            with torch.no_grad():
                action, _ = model.act(state)
            action = action.detach().cpu().numpy()[0] * max_action
        next_state, reward, termination, truncated, _ = env.step(action)
        done = termination or truncated
        score += reward
        model.store_transition(state, action, reward / 5.0, next_state, termination)
        state = next_state
        if step > warm_up and step % train_interval == 0:
            model.train()

    current_eval_score = 0
    if (episode + 1) % 20 == 0 and step >= warm_up:
        current_eval_score = evaluate_policy(model, eval_env)
        eval_scores.append(current_eval_score)

    # Save the best model (based on the evaluation score, not the training score)
    if current_eval_score > base_score + 50:
        base_score = current_eval_score
        model.save(f"../../model/Half Cheetah-SAC-Best.pth")
        tqdm.write(f"🔥 New Best Eval Score: {base_score:.2f} (Saved)")

    scores.append(score)
    pbar.set_postfix(ep=episode, score=f"{score:.2f}", avg100=f"{np.mean(scores[-100:]):.2f}")
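
After training, the saved checkpoint can be replayed with on-screen rendering. The sketch below assumes the same SAC class and the checkpoint path used above; everything else is illustrative.

import gymnasium as gym, torch
from ALG.DRL.SAC import SAC

render_env = gym.make('HalfCheetah-v5', render_mode='human')
state_dim = render_env.observation_space.shape[0]
action_dim = render_env.action_space.shape[0]
max_action = float(render_env.action_space.high[0])

agent = SAC(state_dim, action_dim, hidden_dim=1024)                  # must match the trained sizes
agent.load("../../model/Half Cheetah-SAC-Best.pth", evaluate=True)   # actor/critic only

state, _ = render_env.reset()
done, total = False, 0.0
while not done:
    with torch.no_grad():
        action, _ = agent.act(state, evaluate=True)                  # deterministic policy
    action = action.cpu().numpy()[0] * max_action
    state, reward, terminated, truncated, _ = render_env.step(action)
    total += reward
    done = terminated or truncated
print(f"Episode return: {total:.2f}")
render_env.close()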

Training Results

Training used a larger network (hidden_dim=1024) and a replay buffer with a capacity of 1,000,000 transitions. Rewards were scaled (divided by 5.0) during training to stabilize learning.

In the figure, the red curve (eval_scores) is the evaluation score of the deterministic policy, while the blue curve is the exploration score collected during training. The model reaches high performance after roughly 2,000 episodes and eventually settles around 16,000-17,000 points.
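
The original figure is not reproduced here; the sketch below shows how such a plot could be generated with matplotlib from the scores and eval_scores lists collected in the training loop (evaluation points fall every 20 episodes, as in the loop above).

import matplotlib.pyplot as plt
import numpy as np

plt.plot(np.arange(1, len(scores) + 1), scores,
         color='blue', alpha=0.6, label='train score (stochastic policy)')
plt.plot(np.arange(1, len(eval_scores) + 1) * 20, eval_scores,
         color='red', linewidth=2, label='eval score (deterministic policy)')
plt.xlabel('Episode')
plt.ylabel('Return')
plt.title('HalfCheetah-v5 SAC')
plt.legend()
plt.show()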

