Introduction to Reinforcement Learning 8 (MADDPG)

Reinforcement Learning Project 8: SimpleTag-v3 (MADDPG)

Environment

This project uses the Simple Tag environment from the MPE (Multi-Agent Particle Environments) suite provided by PettingZoo. It is a classic predator-prey game.

Official documentation: https://pettingzoo.farama.org/environments/mpe/simple_tag/

Scenario

  • Adversaries (predators): multiple predators (3 in this code). Their goal is to collide with (capture) the prey.
  • Good agent (prey): a single prey (1 in this code). Its goal is to evade the predators.
  • Obstacles: the environment contains obstacles that block line of sight and movement.

Action Space (Continuous)

This is a continuous-action environment. Each agent's action is a vector that controls its acceleration/velocity in the 2D plane.

  • continuous_actions=True: actions take continuous values within a bounded range, which suits DDPG/MADDPG-style algorithms.

Observation Space

Each agent only receives a local (partial) observation (see the inspection sketch after this list), including:

  • its own physical state (velocity, position);
  • the positions of the landmarks relative to itself;
  • the positions of the other agents relative to itself (when within sensing range).
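
For reference, here is a minimal sketch for inspecting these spaces, assuming PettingZoo's default simple_tag_v3 configuration (3 adversaries, 1 good agent, 2 obstacles); the exact shapes depend on the library version and configuration, so it is safer to read them from the environment as done here:

```python
from pettingzoo.mpe import simple_tag_v3

# Print each agent's observation and action space of the default Simple Tag setup.
env = simple_tag_v3.parallel_env(continuous_actions=True)
env.reset(seed=0)
for agent in env.agents:
    print(agent,
          "obs:", env.observation_space(agent).shape,
          "act:", env.action_space(agent).shape,
          "act range:", env.action_space(agent).low[0], "to", env.action_space(agent).high[0])
```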

Reward Function

  • Predators: receive a positive reward for colliding with the prey, plus a small shaping reward for getting closer to it.
  • Prey: receives a negative reward when hit, and is also penalized for leaving the boundary.

The MADDPG Algorithm

MADDPG (Multi-Agent Deep Deterministic Policy Gradient) extends DDPG to the multi-agent setting. It follows the Centralized Training, Decentralized Execution (CTDE) framework.

Core Idea

In a multi-agent environment, if every agent simply runs its own DDPG, the environment appears non-stationary from each agent's point of view, because the other agents' policies keep changing during training.

MADDPG addresses this as follows:

  1. Actor (policy network): receives only the agent's own local observation $o_i$ and outputs an action $a_i$. This allows decentralized execution at test time.
  2. Critic (value network): during training, the critic may receive global information, i.e. all agents' observations $x = (o_1, \dots, o_N)$ and all agents' actions $a = (a_1, \dots, a_N)$ (a small shape sketch follows this list).
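
To make the CTDE split concrete, here is a small shape sketch. The dimensions obs_dims = [16, 16, 16, 14] and act_dim = 5 are what the default Simple Tag configuration typically reports; treat them as assumptions for illustration and read them from the environment in real code:

```python
import torch

# Hypothetical per-agent dimensions for illustration only.
obs_dims = [16, 16, 16, 14]   # 3 adversaries + 1 good agent
act_dim = 5                   # continuous MPE action vector
batch = 32

obs_n = [torch.randn(batch, d) for d in obs_dims]
act_n = [torch.rand(batch, act_dim) for _ in obs_dims]

# Actor i sees only its own observation o_i ...
actor_input_0 = obs_n[0]                          # shape: (32, 16)
# ... while critic i sees every observation and every action.
critic_input = torch.cat(obs_n + act_n, dim=1)    # shape: (32, 16*3 + 14 + 5*4) = (32, 82)
print(actor_input_0.shape, critic_input.shape)
```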

Loss Functions

1. Critic Update

Each agent $i$ has its own critic, which estimates the centralized action-value $Q_i^\mu(x, a_1, \dots, a_N)$.
Its loss minimizes the TD error over a minibatch of size $S$:

$$L(\theta_i) = \frac{1}{S} \sum_j \left( y^j - Q_i^\mu(x^j, a_1^j, \dots, a_N^j) \right)^2$$

with the target value $y^j$:
$$y^j = r_i^j + \gamma \, Q_i^{\mu'}(x'^j, a_1'^j, \dots, a_N'^j)\big|_{a_k' = \mu_k'(o_k')}$$

2. Actor Update

The actor's objective is to maximize the critic's score. When computing the gradient, the other agents' actions are held fixed, and we differentiate only with respect to agent $i$'s parameters:

$$\nabla_{\theta_i} J \approx \frac{1}{S} \sum_j \nabla_{\theta_i} \mu_i(o_i^j) \, \nabla_{a_i} Q_i^\mu(x^j, a_1^j, \dots, a_i, \dots, a_N^j)\big|_{a_i=\mu_i(o_i^j)}$$
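
Putting the two updates together, here is a minimal sketch of one gradient computation for agent $i$. It assumes an agent object exposing actor/critic/target_critic modules like the ones defined later in this post; maddpg_losses is a hypothetical helper, and the actual train() method below inlines the same computation:

```python
import torch
import torch.nn.functional as F

def maddpg_losses(agent, i, obs_i, x, acts, r_i, done_i, x_next, next_acts, gamma=0.99):
    """Sketch of one update for agent i; all tensors are already batched.

    obs_i     : agent i's own observations, shape (B, obs_dim_i)
    x, x_next : concatenation of all agents' (next) observations, shape (B, sum(obs_dims))
    acts      : list of per-agent action tensors from the batch, each (B, act_dim)
    next_acts : list of actions produced by all *target* actors at x_next
    """
    # Critic target: y = r_i + gamma * (1 - done) * Q'_i(x', a'_1, ..., a'_N)
    with torch.no_grad():
        y = r_i + gamma * (1 - done_i) * agent.target_critic(x_next, torch.cat(next_acts, dim=1))
    q = agent.critic(x, torch.cat(acts, dim=1))
    critic_loss = F.mse_loss(q, y)

    # Actor loss: substitute agent i's current policy output, hold the others fixed
    # (detached), and maximize the centralized critic's score.
    acts_actor = [a.detach() for a in acts]
    acts_actor[i] = agent.actor(obs_i)
    actor_loss = -agent.critic(x, torch.cat(acts_actor, dim=1)).mean()
    return critic_loss, actor_loss
```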

Algorithm Outline

  1. Collect experience $(x, a, r, x')$ into a replay buffer, where $x$ contains all agents' observations and $a$ all agents' actions.
  2. Sample a minibatch from the buffer.
  3. Compute the target Q value with the target networks.
  4. Update each critic by minimizing the gap between the predicted Q and the target Q.
  5. Update each actor using the critic's gradient with respect to that agent's action.
  6. Soft-update the target network parameters (the update rule is given below).
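
The soft update in step 6 is the standard Polyak averaging used by DDPG, applied to both the actor and the critic parameters (with $\tau = 0.001$ in the code below):

$$\theta' \leftarrow \tau\,\theta + (1 - \tau)\,\theta'$$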

Implementation

Model Definition (MADDPG.py)

This file contains the Actor and Critic network definitions, the logic of a single MADDPGAgent, and the MADDPG coordinator class that manages all agents.

```python
import random
from collections import deque

import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch import optim
from Utils.Noise import GaussianNoise


class Actor(nn.Module):
    """Decentralized policy: maps a single agent's local observation to its action."""
    def __init__(self, state_dim, action_dim, max_action, hidden_dim=256):
        super(Actor, self).__init__()
        self.net = nn.Sequential(
            nn.Linear(state_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, action_dim),
            nn.Sigmoid(),  # Simple Tag's continuous actions live in [0, max_action]
        )
        self.max_action = max_action

    def forward(self, state):
        return self.net(state) * self.max_action


class Critic(nn.Module):
    """Centralized critic: scores the concatenation of all agents' observations and actions."""
    def __init__(self, critic_input_dim, hidden_dim=256):
        super(Critic, self).__init__()
        self.net = nn.Sequential(
            nn.Linear(critic_input_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, 1),
        )

    def forward(self, obs, action):
        return self.net(torch.cat((obs, action), dim=1))


class MADDPGAgent():
    def __init__(self, obs_dim, action_dim, critic_input_dim, max_action,
                 device='cuda' if torch.cuda.is_available() else 'mps' if torch.backends.mps.is_available() else 'cpu',
                 actor_lr=3e-4, critic_lr=1e-4, hidden_dim=256, tau=0.001):
        self.obs_dim = obs_dim
        self.action_dim = action_dim
        self.critic_input_dim = critic_input_dim
        self.hidden_dim = hidden_dim
        self.max_action = max_action
        self.device = device
        self.tau = tau
        self.actor = Actor(self.obs_dim, self.action_dim, self.max_action, self.hidden_dim).to(device)
        self.critic = Critic(self.critic_input_dim, self.hidden_dim).to(device)

        self.actor_optimizer = optim.Adam(self.actor.parameters(), lr=actor_lr)
        self.critic_optimizer = optim.Adam(self.critic.parameters(), lr=critic_lr)

        # Target networks start as exact copies of the online networks.
        self.target_actor = Actor(self.obs_dim, self.action_dim, self.max_action, self.hidden_dim).to(device)
        self.target_actor.load_state_dict(self.actor.state_dict())
        self.target_critic = Critic(self.critic_input_dim, self.hidden_dim).to(device)
        self.target_critic.load_state_dict(self.critic.state_dict())

    def act(self, obs, noise_std=0.0):
        # Deterministic policy output plus Gaussian exploration noise, clipped to the action range.
        obs_tensor = torch.tensor(obs, dtype=torch.float32).unsqueeze(0).to(self.device)
        with torch.no_grad():
            action = self.actor(obs_tensor).cpu().numpy()[0]

        noise = GaussianNoise(action_dim=self.action_dim, sigma=noise_std)
        action = action + noise.sample()

        return np.clip(action, 0, self.max_action).astype(np.float32)

    def update_target(self):
        # Polyak (soft) update of the target networks.
        for param, target_param in zip(self.actor.parameters(), self.target_actor.parameters()):
            target_param.data.copy_(self.tau * param.data + (1 - self.tau) * target_param.data)

        for param, target_param in zip(self.critic.parameters(), self.target_critic.parameters()):
            target_param.data.copy_(self.tau * param.data + (1 - self.tau) * target_param.data)


class ReplayBuffer():
    """Shared buffer: each entry stores the per-agent lists for one environment step."""
    def __init__(self, max_size=10000):
        self.memory = deque(maxlen=max_size)

    def store(self, state, action, reward, next_state, done):
        self.memory.append((state, action, reward, next_state, done))

    def sample(self, batch_size=512):
        if len(self.memory) < batch_size:
            batch = random.sample(self.memory, len(self.memory))
        else:
            batch = random.sample(self.memory, batch_size)

        obs_n, act_n, rew_n, next_obs_n, done_n = zip(*batch)

        def transpose_stack(x):
            # From "batch of per-agent lists" to "per-agent list of batched arrays".
            return [np.stack(agent_data) for agent_data in zip(*x)]

        return (
            transpose_stack(obs_n),
            transpose_stack(act_n),
            transpose_stack(rew_n),
            transpose_stack(next_obs_n),
            transpose_stack(done_n)
        )

    def __len__(self):
        return len(self.memory)


class MADDPG():
    def __init__(self, env, n_agent, device='cuda' if torch.cuda.is_available() else 'cpu',
                 actor_lr=3e-4, critic_lr=1e-4, hidden_dim=256, batch_size=512,
                 gamma=0.99, tau=0.001, replay_buffer_size=50000):
        self.agents = {}
        env.reset()  # make sure env.agents is populated before reading it
        self.agent_names = env.agents
        self.n_agent = n_agent
        self.device = device
        self.actor_lr = actor_lr
        self.critic_lr = critic_lr
        self.hidden_dim = hidden_dim
        self.batch_size = batch_size
        self.gamma = gamma
        self.tau = tau
        self.replay_buffer = ReplayBuffer(max_size=replay_buffer_size)

        # The centralized critic sees every agent's observation and action.
        global_critic_dim = 0
        for agent in self.agent_names:
            obs_dim = env.observation_space(agent).shape[0]
            act_dim = env.action_space(agent).shape[0]
            global_critic_dim += obs_dim + act_dim

        for agent_id in self.agent_names:
            obs_dim = env.observation_space(agent_id).shape[0]
            act_dim = env.action_space(agent_id).shape[0]
            max_action = env.action_space(agent_id).high[0]

            self.agents[agent_id] = MADDPGAgent(
                obs_dim, act_dim, global_critic_dim, max_action,
                device=self.device, actor_lr=self.actor_lr, critic_lr=self.critic_lr,
                hidden_dim=self.hidden_dim, tau=self.tau)

    def store_transition(self, obs, action, reward, next_obs, done):
        obs_n, act_n, reward_n, next_obs_n, done_n = [], [], [], [], []
        for agent_id in self.agent_names:
            obs_n.append(obs[agent_id])
            act_n.append(action[agent_id])
            reward_n.append(reward[agent_id])
            next_obs_n.append(next_obs[agent_id])
            done_n.append(done[agent_id])
        self.replay_buffer.store(obs_n, act_n, reward_n, next_obs_n, done_n)

    def step(self, obs, noise_std=0.1):
        actions = {}
        for agent_id in self.agent_names:
            actions[agent_id] = self.agents[agent_id].act(obs[agent_id], noise_std)
        return actions

    def train(self):
        if len(self.replay_buffer) < self.batch_size:
            return
        obs_n, act_n, reward_n, next_obs_n, done_n = self.replay_buffer.sample(batch_size=self.batch_size)
        obs_n = [torch.tensor(o, dtype=torch.float32).to(self.device) for o in obs_n]
        act_n = [torch.tensor(a, dtype=torch.float32).to(self.device) for a in act_n]
        reward_n = [torch.tensor(r, dtype=torch.float32).unsqueeze(1).to(self.device) for r in reward_n]
        next_obs_n = [torch.tensor(no, dtype=torch.float32).to(self.device) for no in next_obs_n]
        done_n = [torch.tensor(d, dtype=torch.float32).unsqueeze(1).to(self.device) for d in done_n]

        # Next actions come from every agent's *target* actor: a'_k = mu'_k(o'_k).
        next_act_n = []
        with torch.no_grad():
            for i, agent_name in enumerate(self.agent_names):
                agent = self.agents[agent_name]
                target_act = agent.target_actor(next_obs_n[i])
                next_act_n.append(target_act)
        target_critic_obs = torch.cat(next_obs_n, dim=1)
        target_critic_act = torch.cat(next_act_n, dim=1)

        current_critic_obs = torch.cat(obs_n, dim=1)
        current_critic_act = torch.cat(act_n, dim=1)

        for i, agent_name in enumerate(self.agent_names):
            agent = self.agents[agent_name]

            # Critic loss: TD error against the target y = r + gamma * (1 - done) * Q'.
            with torch.no_grad():
                target_q_next = agent.target_critic(target_critic_obs, target_critic_act)
                target_q = reward_n[i] + (1 - done_n[i]) * self.gamma * target_q_next

            current_q = agent.critic(current_critic_obs, current_critic_act)
            critic_loss = F.mse_loss(current_q, target_q)

            # Actor loss: replace only agent i's action with its current policy output
            # and maximize the centralized critic's score.
            curr_pol_out = agent.actor(obs_n[i])
            actor_input_act_n = [a.detach() for a in act_n]
            actor_input_act_n[i] = curr_pol_out
            critic_input_act_update = torch.cat(actor_input_act_n, dim=1)

            actor_loss = -agent.critic(current_critic_obs, critic_input_act_update).mean()

            agent.actor_optimizer.zero_grad()
            actor_loss.backward()
            agent.actor_optimizer.step()

            agent.critic_optimizer.zero_grad()
            critic_loss.backward()
            agent.critic_optimizer.step()

            agent.update_target()
```

Training Loop (simple_tag-MADDPG.py)

A warmup phase fills the replay buffer with random actions before learning starts, and a noise-decay schedule balances exploration and exploitation (a quick check of this schedule follows).
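
To get a feel for the schedule (ignoring the warmup episodes): starting from noise_std = 0.5 and decaying by a factor of 0.9995 once per episode, the noise after 3000 episodes is roughly 0.5 × 0.9995³⁰⁰⁰ ≈ 0.11, so the floor min_noise = 0.05 is never reached within this run; that would take roughly 4600 decays. A tiny standalone check:

```python
# Standalone check of the exploration-noise schedule used in the script below.
noise_std, noise_decay, min_noise = 0.5, 0.9995, 0.05
for _ in range(3000):
    noise_std = max(min_noise, noise_std * noise_decay)
print(f"noise after 3000 decays: {noise_std:.3f}")  # ~0.112, still above the 0.05 floor
```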

```python
from pettingzoo.mpe import simple_tag_v3
from tqdm import tqdm
import numpy as np

from ALG.MA.MADDPG import MADDPG
from Utils.Smooth import Smooth

env = simple_tag_v3.parallel_env(render_mode=None, continuous_actions=True)
env.reset()
agents = env.possible_agents
n_agents = len(agents)
max_action = float(env.action_space(agents[0]).high[0])
model = MADDPG(env, n_agents)

adv_scores = []
adv_size = 3     # number of adversaries (predators)
good_scores = []
good_size = 1    # number of good agents (prey)
collisions = []
episodes = 3000
step = 0
warmup_steps = 5012    # random actions only, to pre-fill the replay buffer
noise_std = 0.5        # initial exploration noise
noise_decay = 0.9995   # per-episode decay, applied after warmup
min_noise = 0.05
update_interval = 4
pbar = tqdm(range(episodes), desc="Training")
for episode in pbar:
    if step > warmup_steps:
        noise_std = max(min_noise, noise_std * noise_decay)
    obs, _ = env.reset()
    adv_score = 0
    good_score = 0
    collision = 0
    while True:
        step += 1
        if step <= warmup_steps:
            # Warmup: uniformly random actions to diversify the buffer.
            action = {agent: env.action_space(agent).sample() for agent in agents}
        else:
            action = model.step(obs, noise_std)
        next_obs, rewards, terminations, truncations, _ = env.step(action)
        done = {agent: terminations[agent] or truncations[agent] for agent in agents}
        done_flag = any(done.values())
        for agent_id, reward in rewards.items():
            if 'adversary' in agent_id:
                adv_score += reward
                if reward > 1:  # a large positive reward indicates a collision with the prey
                    collision += 1
            else:
                good_score += reward
        model.store_transition(obs, action, rewards, next_obs, done)
        obs = next_obs
        # if step > warmup_steps and step % update_interval == 0: model.train()
        if step > warmup_steps:
            model.train()
        if done_flag:
            break
    adv_score = adv_score / adv_size    # average score per adversary
    good_score = good_score / good_size
    adv_scores.append(adv_score)
    good_scores.append(good_score)
    collisions.append(collision)
    pbar.set_postfix(ep=episode, adv=f"{adv_score:.2f}", avg_adv=f"{np.mean(adv_scores[-100:]):.2f}",
                     good=f"{good_score:.2f}", avg_good=f"{np.mean(good_scores[-100:]):.2f}")

    # if np.mean(collisions[-1000:]) >= 3 and np.mean(collisions[-1000:]) <= 4:
    #     model.save("../../model/MADDPG-simple-tag-v3.pth")

data = {
    "Adversary Score": adv_scores,
    "Good Agent Score": good_scores,
    "Collisions": collisions
}
smooth = Smooth(data, weight=0.9)
smooth.show(title="Predator-Prey Training Results", ylabel="Reward / Count")
smooth.show(title="Detailed Training Metrics", subplot=True)
```

Training Results

Training ran for 3000 episodes. The plots below show a clear adversarial learning process:

  1. **Adversary Score (top)**: the predators' score trends clearly upward, indicating that they gradually learn to cooperate to corner and hit the prey.
  2. **Good Agent Score (middle)**: the prey's score stays negative (it keeps getting hit or pushed out of bounds) but fluctuates over the course of the game, showing that it keeps adjusting its policy to survive.
  3. **Collisions (bottom)**: the number of collisions rises markedly, with the average climbing to roughly 3-4 per episode, directly confirming that the predators' strategy is effective.

Testing for 1000 episodes with the best-performing parameters saved during training (a minimal evaluation sketch follows), the average number of collisions per episode stays around 3.
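
As a rough reference for the 1000-episode test mentioned above, here is a minimal evaluation sketch that reuses the env, model, and np objects from the script above. It assumes the trained weights are still in memory (the save/load helpers hinted at in the commented-out code are not shown here); actions are taken greedily with noise_std=0.0 and collisions are counted the same way as during training:

```python
# Hypothetical evaluation loop: greedy actions, no learning, no exploration noise.
test_episodes = 1000
test_collisions = []
for _ in range(test_episodes):
    obs, _ = env.reset()
    collision = 0
    while True:
        action = model.step(obs, noise_std=0.0)   # deterministic policy output
        obs, rewards, terminations, truncations, _ = env.step(action)
        collision += sum(1 for a, r in rewards.items() if 'adversary' in a and r > 1)
        if any(terminations.values()) or any(truncations.values()):
            break
    test_collisions.append(collision)
print("mean collisions per episode:", np.mean(test_collisions))
```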

