# Source metadata (extraction residue): 267 lines, 12 KiB, Python.
import os
|
||
import numpy as np
|
||
import torch
|
||
import torch.nn as nn
|
||
import torch.nn.functional as F
|
||
|
||
from agents.actor import Actor
|
||
from agents.replay_buffer import ReplayBuffer
|
||
from agents.noise import OUNoise
|
||
|
||
"""
|
||
Baseline: IndependentDDPG (独立 DDPG 基线)
|
||
=====================================
|
||
Purpose (ablation):
|
||
- This baseline removes the Centralized Training Decentralized Execution (CTDE) component.
|
||
- It is used to demonstrate the necessity of joint critics that observe other agents' actions for stable training in MARL.
|
||
- 目的(消融实验):该基线移除了中心化训练分布式执行(CTDE)组件。用于证明在多智能体强化学习中,引入能观察其他智能体动作的联合 Critic 对维持训练稳定性的必要性。
|
||
|
||
Difference from Co-MADDPG:
|
||
1. Critic Type: IndependentCritics are used, which only take the local observation and local action (obs_i, act_i) as input.
|
||
2. Update Order: Simultaneous independent updates for both agents.
|
||
3. 与 Co-MADDPG 的区别:
|
||
- Critic 类型:使用独立 Critic,其输入仅包含局部观察与局部动作 (obs_i, act_i)。
|
||
- 更新顺序:两个智能体同时进行独立的更新。
|
||
|
||
Contribution:
|
||
- Contributes to ablation studies showing how centralized critics mitigate non-stationarity issues.
|
||
- 贡献:用于消融实验,展示中心化 Critic 如何缓解非平稳性(Non-stationarity)问题。
|
||
"""
class IndependentCritic(nn.Module):
|
||
"""
|
||
IndependentCritic that takes only a single agent's observation and action.
|
||
独立 Critic,仅接收单个智能体的观察与动作。
|
||
"""
|
||
def __init__(self, obs_dim, act_dim, hidden_sizes=[512, 512, 256]):
|
||
super().__init__()
|
||
assert len(hidden_sizes) == 3
|
||
self.net = nn.Sequential(
|
||
nn.Linear(obs_dim + act_dim, hidden_sizes[0]),
|
||
nn.ReLU(),
|
||
nn.Linear(hidden_sizes[0], hidden_sizes[1]),
|
||
nn.ReLU(),
|
||
nn.Linear(hidden_sizes[1], hidden_sizes[2]),
|
||
nn.ReLU(),
|
||
nn.Linear(hidden_sizes[2], 1),
|
||
)
|
||
|
||
def forward(self, obs, act):
|
||
# Concatenate local observation and local action
|
||
# 拼接局部观察与局部动作
|
||
x = torch.cat([obs, act], dim=1)
|
||
return self.net(x)
|
||
|
||
|
||
class IndependentDDPG:
|
||
"""
|
||
IndependentDDPG algorithm implementation.
|
||
独立 DDPG 算法实现。
|
||
"""
|
||
def __init__(self, config):
|
||
# Initialize configuration and device
|
||
# 初始化配置和设备
|
||
self.config = config
|
||
self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
|
||
|
||
# Hyperparameters
|
||
# 超参数
|
||
self.gamma = config['training']['gamma']
|
||
self.tau = config['training']['tau']
|
||
self.batch_size = config['training']['batch_size']
|
||
|
||
# Dimensions
|
||
# 维度
|
||
self.obs_dim = config['env']['num_subcarriers'] + 4
|
||
self.act_dim = 3
|
||
|
||
# Hidden layer configurations
|
||
# 隐藏层配置
|
||
hidden_a = config['network']['actor_hidden']
|
||
hidden_c = config['network']['critic_hidden']
|
||
|
||
# Agent S: Local Actor and Independent Critic
|
||
# 智能体 S:局部 Actor 与独立 Critic
|
||
self.actor_s = Actor(self.obs_dim, self.act_dim, hidden_a).to(self.device)
|
||
self.actor_s_target = Actor(self.obs_dim, self.act_dim, hidden_a).to(self.device)
|
||
self.actor_s_target.load_state_dict(self.actor_s.state_dict())
|
||
self.critic_s = IndependentCritic(self.obs_dim, self.act_dim, hidden_c).to(self.device)
|
||
self.critic_s_target = IndependentCritic(self.obs_dim, self.act_dim, hidden_c).to(self.device)
|
||
self.critic_s_target.load_state_dict(self.critic_s.state_dict())
|
||
|
||
# Agent B: Local Actor and Independent Critic
|
||
# 智能体 B:局部 Actor 与独立 Critic
|
||
self.actor_b = Actor(self.obs_dim, self.act_dim, hidden_a).to(self.device)
|
||
self.actor_b_target = Actor(self.obs_dim, self.act_dim, hidden_a).to(self.device)
|
||
self.actor_b_target.load_state_dict(self.actor_b.state_dict())
|
||
self.critic_b = IndependentCritic(self.obs_dim, self.act_dim, hidden_c).to(self.device)
|
||
self.critic_b_target = IndependentCritic(self.obs_dim, self.act_dim, hidden_c).to(self.device)
|
||
self.critic_b_target.load_state_dict(self.critic_b.state_dict())
|
||
|
||
# Optimizers
|
||
# 优化器
|
||
self.actor_s_optimizer = torch.optim.Adam(self.actor_s.parameters(), lr=config['training']['actor_lr'])
|
||
self.actor_b_optimizer = torch.optim.Adam(self.actor_b.parameters(), lr=config['training']['actor_lr'])
|
||
self.critic_s_optimizer = torch.optim.Adam(self.critic_s.parameters(), lr=config['training']['critic_lr'])
|
||
self.critic_b_optimizer = torch.optim.Adam(self.critic_b.parameters(), lr=config['training']['critic_lr'])
|
||
|
||
# Shared replay buffer
|
||
# 共享重放池
|
||
self.replay_buffer = ReplayBuffer(config['training']['buffer_capacity'])
|
||
|
||
# Noise for exploration
|
||
# 探索噪声
|
||
self.noise_s = OUNoise(self.act_dim, theta=config['training']['ou_theta'],
|
||
sigma_init=config['training']['ou_sigma_init'],
|
||
sigma_min=config['training']['ou_sigma_min'])
|
||
self.noise_b = OUNoise(self.act_dim, theta=config['training']['ou_theta'],
|
||
sigma_init=config['training']['ou_sigma_init'],
|
||
sigma_min=config['training']['ou_sigma_min'])
|
||
|
||
def select_action(self, obs_s, obs_b, explore=True):
|
||
"""
|
||
Select actions for both agents.
|
||
为两个智能体选择动作。
|
||
"""
|
||
self.actor_s.eval()
|
||
self.actor_b.eval()
|
||
with torch.no_grad():
|
||
obs_s_t = torch.FloatTensor(obs_s).unsqueeze(0).to(self.device)
|
||
obs_b_t = torch.FloatTensor(obs_b).unsqueeze(0).to(self.device)
|
||
act_s = self.actor_s(obs_s_t).cpu().numpy()[0]
|
||
act_b = self.actor_b(obs_b_t).cpu().numpy()[0]
|
||
self.actor_s.train()
|
||
self.actor_b.train()
|
||
|
||
if explore:
|
||
# Apply OU noise
|
||
# 应用 OU 噪声
|
||
act_s = np.clip(act_s + self.noise_s.sample(), 0.0, 1.0)
|
||
act_b = np.clip(act_b + self.noise_b.sample(), 0.0, 1.0)
|
||
else:
|
||
act_s = np.clip(act_s, 0.0, 1.0)
|
||
act_b = np.clip(act_b, 0.0, 1.0)
|
||
|
||
return act_s, act_b
|
||
|
||
def compute_rewards(self, qoe_s, qoe_b, qoe_sys):
|
||
"""
|
||
Compute rewards based on independent competitive behavior (λ=0).
|
||
基于独立的竞争行为计算奖励 (λ=0)。
|
||
|
||
Formula: r_i = comp_self * qoe_i + comp_sys * qoe_sys
|
||
公式说明:独立模式下默认为纯竞争,每个智能体仅优化自身效用及系统整体惩罚。
|
||
"""
|
||
lam = 0.0
|
||
r_s = self.config['reward']['comp_self'] * qoe_s + self.config['reward']['comp_sys'] * qoe_sys
|
||
r_b = self.config['reward']['comp_self'] * qoe_b + self.config['reward']['comp_sys'] * qoe_sys
|
||
return r_s, r_b, lam
|
||
|
||
def update(self):
|
||
"""
|
||
Update each agent independently and simultaneously.
|
||
独立且同步地更新每个智能体。
|
||
"""
|
||
if len(self.replay_buffer) < self.batch_size:
|
||
return None
|
||
|
||
# Sample batch
|
||
# 采样批量数据
|
||
obs_s, obs_b, act_s, act_b, rew_s, rew_b, next_obs_s, next_obs_b, dones = \
|
||
self.replay_buffer.sample(self.batch_size)
|
||
|
||
# To tensors
|
||
# 转换为张量
|
||
obs_s = torch.FloatTensor(obs_s).to(self.device)
|
||
obs_b = torch.FloatTensor(obs_b).to(self.device)
|
||
act_s = torch.FloatTensor(act_s).to(self.device)
|
||
act_b = torch.FloatTensor(act_b).to(self.device)
|
||
rew_s = torch.FloatTensor(rew_s).unsqueeze(1).to(self.device)
|
||
rew_b = torch.FloatTensor(rew_b).unsqueeze(1).to(self.device)
|
||
next_obs_s = torch.FloatTensor(next_obs_s).to(self.device)
|
||
next_obs_b = torch.FloatTensor(next_obs_b).to(self.device)
|
||
dones = torch.FloatTensor(dones).unsqueeze(1).to(self.device)
|
||
|
||
# --- Update Agent S (independent) ---
|
||
# --- 独立更新智能体 S ---
|
||
with torch.no_grad():
|
||
# Critic target only uses local next observation and action
|
||
# Critic 目标仅使用局部下一状态观察与动作
|
||
next_act_s = self.actor_s_target(next_obs_s)
|
||
target_q_s = rew_s + self.gamma * (1 - dones) * self.critic_s_target(next_obs_s, next_act_s)
|
||
|
||
current_q_s = self.critic_s(obs_s, act_s)
|
||
critic_loss_s = F.mse_loss(current_q_s, target_q_s)
|
||
self.critic_s_optimizer.zero_grad()
|
||
critic_loss_s.backward()
|
||
self.critic_s_optimizer.step()
|
||
|
||
new_act_s = self.actor_s(obs_s)
|
||
actor_loss_s = -self.critic_s(obs_s, new_act_s).mean()
|
||
self.actor_s_optimizer.zero_grad()
|
||
actor_loss_s.backward()
|
||
self.actor_s_optimizer.step()
|
||
|
||
# --- Update Agent B (independent) ---
|
||
# --- 独立更新智能体 B ---
|
||
with torch.no_grad():
|
||
# Critic target only uses local next observation and action
|
||
# Critic 目标仅使用局部下一状态观察与动作
|
||
next_act_b = self.actor_b_target(next_obs_b)
|
||
target_q_b = rew_b + self.gamma * (1 - dones) * self.critic_b_target(next_obs_b, next_act_b)
|
||
|
||
current_q_b = self.critic_b(obs_b, act_b)
|
||
critic_loss_b = F.mse_loss(current_q_b, target_q_b)
|
||
self.critic_b_optimizer.zero_grad()
|
||
critic_loss_b.backward()
|
||
self.critic_b_optimizer.step()
|
||
|
||
new_act_b = self.actor_b(obs_b)
|
||
actor_loss_b = -self.critic_b(obs_b, new_act_b).mean()
|
||
self.actor_b_optimizer.zero_grad()
|
||
actor_loss_b.backward()
|
||
self.actor_b_optimizer.step()
|
||
|
||
# Soft update targets for both agents
|
||
# 软更新两个智能体的目标网络
|
||
for target, source in [
|
||
(self.critic_s_target, self.critic_s),
|
||
(self.critic_b_target, self.critic_b),
|
||
(self.actor_s_target, self.actor_s),
|
||
(self.actor_b_target, self.actor_b),
|
||
]:
|
||
for tp, sp in zip(target.parameters(), source.parameters()):
|
||
tp.data.copy_(self.tau * sp.data + (1.0 - self.tau) * tp.data)
|
||
|
||
return {
|
||
'actor_loss_s': actor_loss_s.item(),
|
||
'actor_loss_b': actor_loss_b.item(),
|
||
'critic_loss_s': critic_loss_s.item(),
|
||
'critic_loss_b': critic_loss_b.item(),
|
||
}
|
||
|
||
def save(self, path):
|
||
"""
|
||
Save models.
|
||
保存模型。
|
||
"""
|
||
os.makedirs(path, exist_ok=True)
|
||
torch.save(self.actor_s.state_dict(), os.path.join(path, "actor_s.pth"))
|
||
torch.save(self.actor_b.state_dict(), os.path.join(path, "actor_b.pth"))
|
||
torch.save(self.critic_s.state_dict(), os.path.join(path, "critic_s.pth"))
|
||
torch.save(self.critic_b.state_dict(), os.path.join(path, "critic_b.pth"))
|
||
|
||
def load(self, path):
|
||
"""
|
||
Load models.
|
||
加载模型。
|
||
"""
|
||
self.actor_s.load_state_dict(torch.load(os.path.join(path, "actor_s.pth"), map_location=self.device))
|
||
self.actor_b.load_state_dict(torch.load(os.path.join(path, "actor_b.pth"), map_location=self.device))
|
||
self.critic_s.load_state_dict(torch.load(os.path.join(path, "critic_s.pth"), map_location=self.device))
|
||
self.critic_b.load_state_dict(torch.load(os.path.join(path, "critic_b.pth"), map_location=self.device))
|
||
self.actor_s_target.load_state_dict(self.actor_s.state_dict())
|
||
self.actor_b_target.load_state_dict(self.actor_b.state_dict())
|
||
self.critic_s_target.load_state_dict(self.critic_s.state_dict())
|
||
self.critic_b_target.load_state_dict(self.critic_b.state_dict())
|