# SemanticCommunication/code/baselines/single_dqn.py
import os
import random
from collections import deque
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
"""
Baseline: SingleAgentDQN (单智能体 DQN 基线)
=====================================
Purpose (non-MARL baseline):
- This baseline represents a traditional single-agent approach to the resource allocation problem.
- It uses a centralized DQN that controls both groups by discretizing the continuous action space.
- 目的(非多智能体基线):该基线代表了解决资源分配问题的传统单智能体方法。它使用中心化 DQN,通过对连续动作空间进行离散化,同时控制两个用户组。
Difference from Co-MADDPG:
1. Algorithm Class: Non-MARL (DQN) vs MARL (Co-MADDPG).
2. Action Space: Discrete (48 actions) vs Continuous.
3. Architecture: Centralized control vs Decentralized execution with CTDE.
4. Exploration: Epsilon-greedy vs OU Noise.
5. 与 Co-MADDPG 的区别:
- 算法类别:非多智能体 (DQN) vs 多智能体 (Co-MADDPG)。
- 动作空间:离散(48 种动作组合) vs 连续。
- 架构:中心化控制 vs CTDE 架构下的分布式执行。
- 探索机制:ε-greedy vs OU 噪声。
Contribution:
- Contributes to performance tables showing the limitations of discretization and centralized control in complex multi-user scenarios.
- 贡献:用于性能表,展示在复杂多用户场景下,动作离散化和中心化控制的局限性。
"""
# ---- Discrete action mapping (离散动作映射) ----
# 4 levels for subcarrier fraction, 4 for power fraction, 3 for m_param
# 子载波比例 4 级,功率比例 4 级m 参数 3 级
N_SUB_LEVELS = [0.25, 0.5, 0.75, 1.0]
P_FRAC_LEVELS = [0.25, 0.5, 0.75, 1.0]
M_PARAM_LEVELS = [0.33, 0.66, 1.0]
NUM_ACTIONS = len(N_SUB_LEVELS) * len(P_FRAC_LEVELS) * len(M_PARAM_LEVELS) # 48 combinations
# Build lookup table: index -> (n_sub_frac, p_frac, m_param)
# 构建查找表:索引 -> (子载波比例, 功率比例, m 参数)
_ACTION_TABLE = []
for n in N_SUB_LEVELS:
for p in P_FRAC_LEVELS:
for m in M_PARAM_LEVELS:
_ACTION_TABLE.append(np.array([n, p, m], dtype=np.float32))
class DQNNet(nn.Module):
    """Fully connected Q-network mapping a state vector to per-action Q-values."""

    def __init__(self, state_dim, num_actions):
        super().__init__()
        # Two 256-unit hidden layers with ReLU activations, linear output head.
        layers = [
            nn.Linear(state_dim, 256),
            nn.ReLU(),
            nn.Linear(256, 256),
            nn.ReLU(),
            nn.Linear(256, num_actions),
        ]
        self.net = nn.Sequential(*layers)

    def forward(self, x):
        """Return one Q-value per discrete action for each input state."""
        return self.net(x)
class DQNReplayBuffer:
    """
    Replay buffer adapter for SingleAgentDQN.

    Exposes the multi-agent 9-argument ``push`` signature used by the
    training loop, but internally stores centralized single-agent
    transitions: (state, action_idx_s, action_idx_b, reward, next_state, done).
    """

    def __init__(self, capacity):
        # Bounded FIFO storage; oldest transitions are evicted first.
        self.buffer = deque(maxlen=capacity)
        # Discrete action indices recorded by the agent before each push.
        self._last_action_s_idx = 0
        self._last_action_b_idx = 0

    def set_last_actions(self, idx_s, idx_b):
        """Record the discrete action indices chosen for the upcoming push."""
        self._last_action_s_idx = idx_s
        self._last_action_b_idx = idx_b

    def push(self, obs_s, obs_b, act_s, act_b, rew_s, rew_b,
             next_obs_s, next_obs_b, done=False):
        """
        Store one multi-agent environment step as a single-agent transition.

        The continuous actions ``act_s``/``act_b`` are ignored here; the
        discrete indices previously passed to ``set_last_actions`` are
        stored instead.
        """
        # Centralized state = concatenation of both agents' observations.
        state = np.concatenate((np.asarray(obs_s, dtype=np.float32),
                                np.asarray(obs_b, dtype=np.float32)))
        next_state = np.concatenate((np.asarray(next_obs_s, dtype=np.float32),
                                     np.asarray(next_obs_b, dtype=np.float32)))
        # Scalar reward = mean of the two per-group rewards.
        reward = (float(rew_s) + float(rew_b)) / 2.0
        self.buffer.append((state, self._last_action_s_idx,
                            self._last_action_b_idx, reward,
                            next_state, float(done)))

    def sample(self, batch_size):
        """Uniformly sample a batch and return it as stacked numpy arrays."""
        columns = zip(*random.sample(self.buffer, batch_size))
        states, a_s, a_b, rewards, next_states, dones = columns
        return (np.array(states), np.array(a_s), np.array(a_b),
                np.array(rewards, dtype=np.float32),
                np.array(next_states), np.array(dones, dtype=np.float32))

    def __len__(self):
        return len(self.buffer)
class SingleAgentDQN:
    """
    Centralized single-agent DQN baseline.

    A single controller observes the concatenated observations of both user
    groups and picks one discrete action per group via two Q-heads that share
    the same centralized state. Discrete indices are mapped back to
    continuous resource-allocation vectors through ``_ACTION_TABLE``.
    """

    def __init__(self, config):
        """
        Args:
            config: nested dict with a 'training' section (gamma, batch_size,
                tau, buffer_capacity, optional actor_lr) and an 'env'
                section (num_subcarriers).
        """
        self.config = config
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

        # Core hyperparameters.
        self.gamma = config['training']['gamma']
        self.batch_size = config['training']['batch_size']
        self.tau = config['training']['tau']

        # Per-agent observation dim; centralized state concatenates both.
        # NOTE(review): the "+ 4" presumably covers extra scalar features
        # appended by the environment -- confirm against the env code.
        self.obs_dim = config['env']['num_subcarriers'] + 4
        self.state_dim = self.obs_dim * 2
        self.num_actions = NUM_ACTIONS

        # Two Q-heads: one for the semantic group (s), one for the
        # traditional group (b); each has its own target network.
        self.q_net_s = DQNNet(self.state_dim, self.num_actions).to(self.device)
        self.q_net_b = DQNNet(self.state_dim, self.num_actions).to(self.device)
        self.q_target_s = DQNNet(self.state_dim, self.num_actions).to(self.device)
        self.q_target_b = DQNNet(self.state_dim, self.num_actions).to(self.device)
        self.q_target_s.load_state_dict(self.q_net_s.state_dict())
        self.q_target_b.load_state_dict(self.q_net_b.state_dict())

        # Optimizers (reuse the actor learning rate from config).
        lr = config['training'].get('actor_lr', 1e-4)
        self.optimizer_s = torch.optim.Adam(self.q_net_s.parameters(), lr=lr)
        self.optimizer_b = torch.optim.Adam(self.q_net_b.parameters(), lr=lr)

        # Epsilon-greedy exploration schedule (linear anneal, see
        # _decay_epsilon).
        self.epsilon = 1.0
        self.epsilon_initial = 1.0
        self.epsilon_min = 0.01
        self.epsilon_decay_episodes = 3000

        # Replay buffer exposing the multi-agent push signature.
        self.replay_buffer = DQNReplayBuffer(config['training']['buffer_capacity'])

        # Last discrete action indices (mirrored into the buffer).
        self._last_action_s_idx = 0
        self._last_action_b_idx = 0

        # Adapter object so train.py's existing `noise_s.decay_sigma(ep)`
        # call drives epsilon decay instead of OU-noise decay.
        self.noise_s = type('EpsilonAdapter', (), {
            'decay_sigma': lambda _, ep: self._decay_epsilon(ep)
        })()

    def select_action(self, obs_s, obs_b, explore=True):
        """
        Select one discrete action per group with an epsilon-greedy policy.

        Args:
            obs_s, obs_b: per-group observation vectors.
            explore: when True, take a uniform random action with
                probability ``self.epsilon``.

        Returns:
            Tuple of two float32 arrays (n_sub_frac, p_frac, m_param),
            one per group, taken from the discrete action lookup table.
        """
        if explore and random.random() < self.epsilon:
            # Uniform random exploration over the discrete action set.
            idx_s = random.randrange(self.num_actions)
            idx_b = random.randrange(self.num_actions)
        else:
            # Greedy exploitation of the learned Q-values. The state tensor
            # is only built here (not needed on the random branch).
            state = np.concatenate([obs_s, obs_b]).astype(np.float32)
            state_t = torch.FloatTensor(state).unsqueeze(0).to(self.device)
            self.q_net_s.eval()
            self.q_net_b.eval()
            with torch.no_grad():
                q_s = self.q_net_s(state_t)
                q_b = self.q_net_b(state_t)
            self.q_net_s.train()
            self.q_net_b.train()
            idx_s = q_s.argmax(dim=1).item()
            idx_b = q_b.argmax(dim=1).item()
        # Remember the indices so the next replay-buffer push stores them.
        self._last_action_s_idx = idx_s
        self._last_action_b_idx = idx_b
        self.replay_buffer.set_last_actions(idx_s, idx_b)
        # Map discrete indices back to continuous action vectors.
        return _ACTION_TABLE[idx_s].copy(), _ACTION_TABLE[idx_b].copy()

    def compute_rewards(self, qoe_s, qoe_b, qoe_sys):
        """
        Compute the scalar reward r = 0.5 * (qoe_s + qoe_b).

        Both returned rewards are identical because one agent controls the
        whole system; ``qoe_sys`` is unused but kept for interface
        compatibility with the MARL baselines.

        Returns:
            (reward_s, reward_b, lam) where reward_s == reward_b.
        """
        lam = 0.5
        r = 0.5 * (qoe_s + qoe_b)
        return r, r, lam

    def update(self):
        """
        Perform one gradient step on both Q-heads from a sampled batch,
        then soft-update both target networks.

        Returns:
            Dict with 'loss_s' and 'loss_b', or None if the buffer does
            not yet hold a full batch.
        """
        if len(self.replay_buffer) < self.batch_size:
            return None
        states, a_s, a_b, rewards, next_states, dones = \
            self.replay_buffer.sample(self.batch_size)

        # Convert the numpy batch to device tensors.
        states_t = torch.FloatTensor(states).to(self.device)
        next_states_t = torch.FloatTensor(next_states).to(self.device)
        rewards_t = torch.FloatTensor(rewards).unsqueeze(1).to(self.device)
        dones_t = torch.FloatTensor(dones).unsqueeze(1).to(self.device)
        a_s_t = torch.LongTensor(a_s).unsqueeze(1).to(self.device)
        a_b_t = torch.LongTensor(a_b).unsqueeze(1).to(self.device)

        # 1. Semantic head: standard DQN target r + gamma * max_a' Q_target.
        q_values_s = self.q_net_s(states_t).gather(1, a_s_t)
        with torch.no_grad():
            next_q_s = self.q_target_s(next_states_t).max(1, keepdim=True)[0]
            target_s = rewards_t + self.gamma * (1 - dones_t) * next_q_s
        loss_s = F.mse_loss(q_values_s, target_s)
        self.optimizer_s.zero_grad()
        loss_s.backward()
        self.optimizer_s.step()

        # 2. Traditional head: same update with its own target network.
        q_values_b = self.q_net_b(states_t).gather(1, a_b_t)
        with torch.no_grad():
            next_q_b = self.q_target_b(next_states_t).max(1, keepdim=True)[0]
            target_b = rewards_t + self.gamma * (1 - dones_t) * next_q_b
        loss_b = F.mse_loss(q_values_b, target_b)
        self.optimizer_b.zero_grad()
        loss_b.backward()
        self.optimizer_b.step()

        # 3. Polyak soft update of both target networks.
        for target, source in [
            (self.q_target_s, self.q_net_s),
            (self.q_target_b, self.q_net_b),
        ]:
            for tp, sp in zip(target.parameters(), source.parameters()):
                tp.data.copy_(self.tau * sp.data + (1.0 - self.tau) * tp.data)
        return {'loss_s': loss_s.item(), 'loss_b': loss_b.item()}

    def _decay_epsilon(self, episode):
        """
        Linearly anneal epsilon from ``epsilon_initial`` to ``epsilon_min``
        over ``epsilon_decay_episodes`` episodes.

        Fix: the previous implementation interpolated from the *current*
        epsilon on every call, which compounds across episodes and decays
        much faster than the intended linear schedule.
        """
        frac = min(1.0, episode / max(1, self.epsilon_decay_episodes))
        self.epsilon = self.epsilon_initial + frac * (self.epsilon_min - self.epsilon_initial)

    def save(self, path):
        """Save both online Q-networks under ``path``."""
        os.makedirs(path, exist_ok=True)
        torch.save(self.q_net_s.state_dict(), os.path.join(path, "q_net_s.pth"))
        torch.save(self.q_net_b.state_dict(), os.path.join(path, "q_net_b.pth"))

    def load(self, path):
        """Load both Q-networks and synchronize the target networks."""
        self.q_net_s.load_state_dict(torch.load(os.path.join(path, "q_net_s.pth"), map_location=self.device))
        self.q_net_b.load_state_dict(torch.load(os.path.join(path, "q_net_b.pth"), map_location=self.device))
        self.q_target_s.load_state_dict(self.q_net_s.state_dict())
        self.q_target_b.load_state_dict(self.q_net_b.state_dict())