"""
Baseline: IndependentDDPG
=========================

Purpose (ablation):
- This baseline removes the Centralized Training Decentralized Execution
  (CTDE) component.
- It is used to demonstrate the necessity of joint critics that observe other
  agents' actions for stable training in MARL.

Difference from Co-MADDPG:
1. Critic type: IndependentCritics are used, which only take the local
   observation and local action (obs_i, act_i) as input.
2. Update order: simultaneous independent updates for both agents.

Contribution:
- Contributes to ablation studies showing how centralized critics mitigate
  non-stationarity issues.
"""

import os

import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F

from agents.actor import Actor
from agents.replay_buffer import ReplayBuffer
from agents.noise import OUNoise


class IndependentCritic(nn.Module):
    """Critic that scores only a single agent's local (observation, action).

    Non-centralized ablation of a joint (MADDPG-style) critic: it never sees
    the other agent's action, so training is exposed to non-stationarity.
    """

    def __init__(self, obs_dim, act_dim, hidden_sizes=(512, 512, 256)):
        """
        Args:
            obs_dim: Size of the local observation vector.
            act_dim: Size of the local action vector.
            hidden_sizes: Exactly three hidden-layer widths.
                (Tuple default — a mutable list default would be shared
                across instances.)

        Raises:
            ValueError: If ``hidden_sizes`` does not contain exactly 3 widths.
        """
        super().__init__()
        # Explicit ValueError instead of assert: asserts are stripped under -O.
        if len(hidden_sizes) != 3:
            raise ValueError("hidden_sizes must contain exactly 3 widths")
        self.net = nn.Sequential(
            nn.Linear(obs_dim + act_dim, hidden_sizes[0]),
            nn.ReLU(),
            nn.Linear(hidden_sizes[0], hidden_sizes[1]),
            nn.ReLU(),
            nn.Linear(hidden_sizes[1], hidden_sizes[2]),
            nn.ReLU(),
            nn.Linear(hidden_sizes[2], 1),
        )

    def forward(self, obs, act):
        # Q(obs_i, act_i): concatenate local observation and local action.
        x = torch.cat([obs, act], dim=1)
        return self.net(x)


class IndependentDDPG:
    """Independent DDPG baseline (no CTDE).

    Each agent (S and B) trains its own DDPG with a purely local critic;
    neither critic observes the other agent's action, and both agents are
    updated independently and simultaneously.
    """

    def __init__(self, config):
        """Build actors, critics, target networks, optimizers, buffer, noise.

        Args:
            config: Nested dict with 'training', 'env', 'network' and 'reward'
                sections (see the exact keys read below).
        """
        self.config = config
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

        # Core hyperparameters.
        self.gamma = config['training']['gamma']
        self.tau = config['training']['tau']
        self.batch_size = config['training']['batch_size']

        # Per-agent dimensions: one observation entry per subcarrier plus 4
        # extra features; 3-dimensional continuous action.
        self.obs_dim = config['env']['num_subcarriers'] + 4
        self.act_dim = 3

        # Hidden layer configurations.
        hidden_a = config['network']['actor_hidden']
        hidden_c = config['network']['critic_hidden']

        # Agent S: local actor and independent critic, each with a target copy
        # initialized to the online weights.
        self.actor_s = Actor(self.obs_dim, self.act_dim, hidden_a).to(self.device)
        self.actor_s_target = Actor(self.obs_dim, self.act_dim, hidden_a).to(self.device)
        self.actor_s_target.load_state_dict(self.actor_s.state_dict())
        self.critic_s = IndependentCritic(self.obs_dim, self.act_dim, hidden_c).to(self.device)
        self.critic_s_target = IndependentCritic(self.obs_dim, self.act_dim, hidden_c).to(self.device)
        self.critic_s_target.load_state_dict(self.critic_s.state_dict())

        # Agent B: same layout as agent S.
        self.actor_b = Actor(self.obs_dim, self.act_dim, hidden_a).to(self.device)
        self.actor_b_target = Actor(self.obs_dim, self.act_dim, hidden_a).to(self.device)
        self.actor_b_target.load_state_dict(self.actor_b.state_dict())
        self.critic_b = IndependentCritic(self.obs_dim, self.act_dim, hidden_c).to(self.device)
        self.critic_b_target = IndependentCritic(self.obs_dim, self.act_dim, hidden_c).to(self.device)
        self.critic_b_target.load_state_dict(self.critic_b.state_dict())

        # Optimizers (separate actor/critic learning rates).
        self.actor_s_optimizer = torch.optim.Adam(
            self.actor_s.parameters(), lr=config['training']['actor_lr'])
        self.actor_b_optimizer = torch.optim.Adam(
            self.actor_b.parameters(), lr=config['training']['actor_lr'])
        self.critic_s_optimizer = torch.optim.Adam(
            self.critic_s.parameters(), lr=config['training']['critic_lr'])
        self.critic_b_optimizer = torch.optim.Adam(
            self.critic_b.parameters(), lr=config['training']['critic_lr'])

        # Shared replay buffer holding joint transitions for both agents.
        self.replay_buffer = ReplayBuffer(config['training']['buffer_capacity'])

        # Ornstein-Uhlenbeck exploration noise, one process per agent.
        self.noise_s = OUNoise(self.act_dim,
                               theta=config['training']['ou_theta'],
                               sigma_init=config['training']['ou_sigma_init'],
                               sigma_min=config['training']['ou_sigma_min'])
        self.noise_b = OUNoise(self.act_dim,
                               theta=config['training']['ou_theta'],
                               sigma_init=config['training']['ou_sigma_init'],
                               sigma_min=config['training']['ou_sigma_min'])

    def select_action(self, obs_s, obs_b, explore=True):
        """Select actions for both agents.

        Args:
            obs_s: Local observation for agent S (array-like, obs_dim).
            obs_b: Local observation for agent B (array-like, obs_dim).
            explore: When True, add OU noise to the deterministic policy
                output before clipping.

        Returns:
            Tuple ``(act_s, act_b)`` of numpy arrays clipped to [0, 1].
        """
        # eval() disables dropout/batch-norm training behavior during action
        # selection; restored to train() right after.
        self.actor_s.eval()
        self.actor_b.eval()
        with torch.no_grad():
            obs_s_t = torch.FloatTensor(obs_s).unsqueeze(0).to(self.device)
            obs_b_t = torch.FloatTensor(obs_b).unsqueeze(0).to(self.device)
            act_s = self.actor_s(obs_s_t).cpu().numpy()[0]
            act_b = self.actor_b(obs_b_t).cpu().numpy()[0]
        self.actor_s.train()
        self.actor_b.train()

        if explore:
            # Apply OU noise, then clip back into the valid action box.
            act_s = np.clip(act_s + self.noise_s.sample(), 0.0, 1.0)
            act_b = np.clip(act_b + self.noise_b.sample(), 0.0, 1.0)
        else:
            act_s = np.clip(act_s, 0.0, 1.0)
            act_b = np.clip(act_b, 0.0, 1.0)
        return act_s, act_b

    def compute_rewards(self, qoe_s, qoe_b, qoe_sys):
        """Compute purely competitive rewards (cooperation weight λ = 0).

        Formula: ``r_i = comp_self * qoe_i + comp_sys * qoe_sys`` — each
        agent optimizes only its own QoE plus a shared system-level term;
        there is no cross-agent QoE sharing in this baseline.

        Returns:
            Tuple ``(r_s, r_b, lam)`` where ``lam`` is always 0.0 here,
            kept for interface parity with the cooperative variant.
        """
        lam = 0.0
        w_self = self.config['reward']['comp_self']
        w_sys = self.config['reward']['comp_sys']
        r_s = w_self * qoe_s + w_sys * qoe_sys
        r_b = w_self * qoe_b + w_sys * qoe_sys
        return r_s, r_b, lam

    def update(self):
        """Update each agent independently and simultaneously.

        Returns:
            Dict of scalar losses, or None if the buffer does not yet hold
            a full batch.
        """
        if len(self.replay_buffer) < self.batch_size:
            return None

        # Sample a joint batch shared by both agents.
        obs_s, obs_b, act_s, act_b, rew_s, rew_b, next_obs_s, next_obs_b, dones = \
            self.replay_buffer.sample(self.batch_size)

        # Convert to tensors; rewards/dones get a trailing unit dim to match
        # the critics' (batch, 1) output.
        obs_s = torch.FloatTensor(obs_s).to(self.device)
        obs_b = torch.FloatTensor(obs_b).to(self.device)
        act_s = torch.FloatTensor(act_s).to(self.device)
        act_b = torch.FloatTensor(act_b).to(self.device)
        rew_s = torch.FloatTensor(rew_s).unsqueeze(1).to(self.device)
        rew_b = torch.FloatTensor(rew_b).unsqueeze(1).to(self.device)
        next_obs_s = torch.FloatTensor(next_obs_s).to(self.device)
        next_obs_b = torch.FloatTensor(next_obs_b).to(self.device)
        dones = torch.FloatTensor(dones).unsqueeze(1).to(self.device)

        # --- Update agent S (independent) ---
        with torch.no_grad():
            # Critic target uses only the local next observation/action —
            # this is the deliberate ablation of the centralized critic.
            next_act_s = self.actor_s_target(next_obs_s)
            target_q_s = rew_s + self.gamma * (1 - dones) * \
                self.critic_s_target(next_obs_s, next_act_s)
        current_q_s = self.critic_s(obs_s, act_s)
        critic_loss_s = F.mse_loss(current_q_s, target_q_s)
        self.critic_s_optimizer.zero_grad()
        critic_loss_s.backward()
        self.critic_s_optimizer.step()

        # Deterministic policy gradient: maximize local Q of the actor output.
        new_act_s = self.actor_s(obs_s)
        actor_loss_s = -self.critic_s(obs_s, new_act_s).mean()
        self.actor_s_optimizer.zero_grad()
        actor_loss_s.backward()
        self.actor_s_optimizer.step()

        # --- Update agent B (independent) ---
        with torch.no_grad():
            next_act_b = self.actor_b_target(next_obs_b)
            target_q_b = rew_b + self.gamma * (1 - dones) * \
                self.critic_b_target(next_obs_b, next_act_b)
        current_q_b = self.critic_b(obs_b, act_b)
        critic_loss_b = F.mse_loss(current_q_b, target_q_b)
        self.critic_b_optimizer.zero_grad()
        critic_loss_b.backward()
        self.critic_b_optimizer.step()

        new_act_b = self.actor_b(obs_b)
        actor_loss_b = -self.critic_b(obs_b, new_act_b).mean()
        self.actor_b_optimizer.zero_grad()
        actor_loss_b.backward()
        self.actor_b_optimizer.step()

        # Polyak (soft) update of all four target networks:
        # target <- tau * online + (1 - tau) * target
        for target, source in [
            (self.critic_s_target, self.critic_s),
            (self.critic_b_target, self.critic_b),
            (self.actor_s_target, self.actor_s),
            (self.actor_b_target, self.actor_b),
        ]:
            for tp, sp in zip(target.parameters(), source.parameters()):
                tp.data.copy_(self.tau * sp.data + (1.0 - self.tau) * tp.data)

        return {
            'actor_loss_s': actor_loss_s.item(),
            'actor_loss_b': actor_loss_b.item(),
            'critic_loss_s': critic_loss_s.item(),
            'critic_loss_b': critic_loss_b.item(),
        }

    def save(self, path):
        """Save online actor/critic weights for both agents under ``path``."""
        os.makedirs(path, exist_ok=True)
        torch.save(self.actor_s.state_dict(), os.path.join(path, "actor_s.pth"))
        torch.save(self.actor_b.state_dict(), os.path.join(path, "actor_b.pth"))
        torch.save(self.critic_s.state_dict(), os.path.join(path, "critic_s.pth"))
        torch.save(self.critic_b.state_dict(), os.path.join(path, "critic_b.pth"))

    def load(self, path):
        """Load online weights from ``path`` and sync target networks."""
        self.actor_s.load_state_dict(
            torch.load(os.path.join(path, "actor_s.pth"), map_location=self.device))
        self.actor_b.load_state_dict(
            torch.load(os.path.join(path, "actor_b.pth"), map_location=self.device))
        self.critic_s.load_state_dict(
            torch.load(os.path.join(path, "critic_s.pth"), map_location=self.device))
        self.critic_b.load_state_dict(
            torch.load(os.path.join(path, "critic_b.pth"), map_location=self.device))
        # Targets mirror the freshly loaded online networks.
        self.actor_s_target.load_state_dict(self.actor_s.state_dict())
        self.actor_b_target.load_state_dict(self.actor_b.state_dict())
        self.critic_s_target.load_state_dict(self.critic_s.state_dict())
        self.critic_b_target.load_state_dict(self.critic_b.state_dict())