""" 无线资源分配环境 / Main Gym-like environment for wireless resource allocation. 该模块实现了一个用于语义和传统用户共存系统的无线资源分配环境。 它通过 Gym 风格的 reset/step 接口,处理子载波分配、功率控制和压缩率优化。 This module implements a wireless resource allocation environment for systems with coexisting semantic and traditional users. It handles subcarrier allocation, power control, and compression ratio optimization via a Gym-like reset/step interface. 作者/Author: Sisyphus-Junior 日期/Date: 2026-02-28 论文引用/Paper Reference: Co-MADDPG based Resource Allocation for Semantic Communication 依赖/Dependencies: numpy, envs.channel_model, envs.semantic_module """ import numpy as np from envs.channel_model import ChannelModel from envs.semantic_module import SemanticModule class WirelessEnv: """ 语义与传统通信共存环境。 Wireless environment with semantic and traditional communication. 管理信道状态、执行动作并计算系统范围内的 QoE。 Manages channel states, executes actions, and computes system-wide QoE. Parameters ---------- config : dict 包含 'env' 和 'training' 部分的配置字典。 Configuration dictionary containing 'env' and 'training' sections. """ def __init__(self, config): # 提取环境和训练配置 / Extract environment and training configs env_config = config['env'] train_config = config['training'] # 核心系统参数 / Core system parameters self.N = env_config['num_subcarriers'] # 子载波数量 N / Number of subcarriers self.K_s = env_config['num_semantic_users'] # 语义用户数 / Number of semantic users self.K_b = env_config['num_traditional_users'] # 传统用户数 / Number of traditional users self.K = self.K_s + self.K_b # 总用户数 / Total number of users # 物理层参数 / Physical layer parameters self.P_max = env_config['max_power'] # 最大总发射功率 / Maximum total transmit power self.R_req = env_config['min_rate_req'] # 传统用户最小速率需求 / Min rate requirement for traditional users self.delta_f = env_config['subcarrier_spacing'] # 子载波间隔 / Subcarrier spacing self.rho_min = env_config['rho_min'] # 最小压缩率 / Minimum compression ratio self.rho_max = env_config['rho_max'] # 最大压缩率 / Maximum compression ratio self.w1 = env_config['w1'] # 语义 QoE 权重 1 / Semantic QoE weight 1 self.w2 = env_config['w2'] # 语义 QoE 权重 2 / Semantic QoE weight 2 # 距离限制 / Distance limits self.min_d = env_config.get('min_distance', 50.0) self.max_d = env_config.get('max_distance', 500.0) # 训练步数控制 / Training step control self.max_steps = train_config['max_steps'] self.step_count = 0 # 初始化模型 / Initialize models self.channel_model = ChannelModel(config) self.semantic_module = SemanticModule(config) # 初始状态变量 / Initial state variables self.distances = np.zeros(self.K) # 用户距离 / User distances self.channel_gains = np.zeros((self.K, self.N), dtype=complex) # 复信道增益 / Complex channel gains self.content_sensitivity = 0.5 # 内容敏感度 / Content sensitivity self.business_priority = 0.5 # 业务优先级 / Business priority self.load_s = 0.5 # 语义流量负载 / Semantic traffic load self.load_b = 0.5 # 传统流量负载 / Traditional traffic load self.alloc_s = 0.0 # 语义子载波分配比例 / Semantic subcarrier allocation fraction self.alloc_b = 0.0 # 传统子载波分配比例 / Traditional subcarrier allocation fraction self.qoe_avg_s = 0.0 # 语义平均 QoE / Rolling average semantic QoE self.qoe_avg_b = 0.0 # 传统平均 QoE / Rolling average traditional QoE @property def obs_dim(self): """观察维度: 子载波 (N) + 4 个额外特征。 / Observation dimension: Subcarriers (N) + 4 extra features.""" return self.N + 4 @property def act_dim(self): """动作维度: 子载波比例, 功率比例, [语义: 压缩率]。 / Action dimension: Subcarrier fraction, Power fraction, [Semantic: Compression ratio].""" return 3 def reset(self): """ 重置环境状态。 Reset environment state. Returns ------- tuple (语义智能体观察, 传统智能体观察)。 (semantic_observation, traditional_observation). """ # 在 [min_distance, max_distance] 内随机分配用户距离 / Random user distances in [min_distance, max_distance] self.distances = np.random.uniform(self.min_d, self.max_d, size=self.K) # 生成信道 (形状: K x N 复数) / Generate channel (shape: K x N complex) - Eq.(6) self.channel_gains = self.channel_model.generate_channel(self.distances, self.N) self.step_count = 0 # 随机设置观察参数 / Random params for observation self.content_sensitivity = np.random.uniform(0.3, 0.8) self.business_priority = np.random.uniform(0.3, 0.8) self.load_s = np.random.uniform(0.2, 0.8) self.load_b = np.random.uniform(0.2, 0.8) # 重置分配比例和移动平均值 / Reset allocations and moving averages self.alloc_s = 0.0 self.alloc_b = 0.0 self.qoe_avg_s = 0.0 self.qoe_avg_b = 0.0 # 获取初始观察 / Get initial observations obs_s = self._get_observation('semantic') obs_b = self._get_observation('traditional') return obs_s, obs_b def _get_observation(self, agent_type): """ 构造智能体的观察向量。 Construct observation vector for agents. Parameters ---------- agent_type : str 'semantic' 或 'traditional'。 'semantic' or 'traditional'. Returns ------- np.ndarray 归一化后的观察向量。 Normalized observation vector. """ if agent_type == 'semantic': # 语义用户索引范围 / Semantic user indices range user_indices = range(self.K_b, self.K) if len(user_indices) > 0: # 计算平均信道增益平方 (功率) / Mean channel power channel_power = np.mean(np.abs(self.channel_gains[user_indices])**2, axis=0) else: channel_power = np.zeros(self.N) # 归一化信道功率 / Normalize channel power channel_norm = channel_power / (np.max(channel_power) + 1e-10) # 拼接额外特征 / Concatenate extra features obs = np.concatenate([channel_norm, [self.qoe_avg_s, self.content_sensitivity, self.alloc_s, self.load_s]]) else: # 传统 / traditional # 传统用户索引范围 / Traditional user indices range user_indices = range(0, self.K_b) if len(user_indices) > 0: # 计算平均信道功率 / Mean channel power channel_power = np.mean(np.abs(self.channel_gains[user_indices])**2, axis=0) else: channel_power = np.zeros(self.N) # 归一化信道功率 / Normalize channel power channel_norm = channel_power / (np.max(channel_power) + 1e-10) # 拼接额外特征 / Concatenate extra features obs = np.concatenate([channel_norm, [self.qoe_avg_b, self.business_priority, self.alloc_b, self.load_b]]) # 返回 32位浮点型观察 / Return float32 observation return obs.astype(np.float32) def step(self, action_s, action_b): """ 执行一个时间步。 Execute a single environment step. Parameters ---------- action_s : np.ndarray 语义智能体动作 [子载波比例, 功率比例, 压缩率]。 Semantic agent action [sub_fraction, power_fraction, compression_ratio]. action_b : np.ndarray 传统智能体动作 [子载波比例, 功率比例, 冗余参数]。 Traditional agent action [sub_fraction, power_fraction, redundant_param]. Returns ------- tuple (obs_s, obs_b, reward_s, reward_b, done, info). """ self.step_count += 1 # 1. 解码动作 / Decode actions # 计算子载波分配数量 / Compute number of subcarriers n_sub_s = max(1, int(round(action_s[0] * self.N))) n_sub_b = max(1, int(round(action_b[0] * self.N))) # 限制总子载波数量 / Clip total subcarriers if n_sub_s + n_sub_b > self.N: total = n_sub_s + n_sub_b n_sub_s = int(round(n_sub_s * self.N / total)) n_sub_b = self.N - n_sub_s # 计算功率分配 / Compute power allocation p_s = action_s[1] * self.P_max p_b = action_b[1] * self.P_max # 限制总功率 / Limit total power if p_s + p_b > self.P_max: total_p = p_s + p_b p_s = p_s * self.P_max / total_p p_b = p_b * self.P_max / total_p # 解码语义压缩率 / Decode semantic compression ratio rho = action_s[2] * (self.rho_max - self.rho_min) + self.rho_min # 2. 分配子载波 (基于信道质量的贪婪算法) / Allocate subcarriers (greedy by channel quality) # 计算两组用户的平均信道质量 / Mean channel quality for both groups sem_channel = np.mean(np.abs(self.channel_gains[self.K_b:])**2, axis=0) if self.K_s > 0 else np.zeros(self.N) trad_channel = np.mean(np.abs(self.channel_gains[:self.K_b])**2, axis=0) if self.K_b > 0 else np.zeros(self.N) # 语义用户优先挑选最好的子载波 / Semantic users pick best subcarriers first all_subs = np.arange(self.N) sem_sorted = np.argsort(-sem_channel) sem_subs = sem_sorted[:n_sub_s] # 剩余子载波给传统用户 / Remaining subcarriers for traditional users remaining = np.setdiff1d(all_subs, sem_subs) if len(remaining) >= n_sub_b: trad_quality = trad_channel[remaining] best_idx = np.argsort(-trad_quality)[:n_sub_b] trad_subs = remaining[best_idx] else: trad_subs = remaining n_sub_b = len(trad_subs) # 3. 功率分配 (组内均分) / Power allocation (equal within group) noise_power = self.channel_model.noise_power # 分配矩阵和功率矩阵 / Allocation and power matrices alloc_matrix = np.zeros((self.K, self.N)) power_matrix = np.zeros((self.K, self.N)) # 在 K_s 个用户中循环分配语义子载波 / Distribute semantic subcarriers among K_s users round-robin for i, k in enumerate(range(self.K_b, self.K)): user_subs = sem_subs[i::max(1, self.K_s)] if len(user_subs) > 0: alloc_matrix[k, user_subs] = 1 power_matrix[k, user_subs] = p_s / max(n_sub_s, 1) # 在 K_b 个用户中循环分配传统子载波 / Distribute traditional subcarriers among K_b users round-robin for i, k in enumerate(range(0, self.K_b)): user_subs = trad_subs[i::max(1, self.K_b)] if len(user_subs) > 0: alloc_matrix[k, user_subs] = 1 power_matrix[k, user_subs] = p_b / max(n_sub_b, 1) # 4. 计算 SNR / Compute SNR - Eq.(8) snr_matrix = self.channel_model.compute_snr(self.channel_gains, power_matrix, noise_power) # 5. 计算每个用户的 QoE / Compute QoE for each user qoe_list = [] rates = [] ssim_values = [] # 传统用户 QoE 计算 / Traditional users QoE computation - Eq.(QoE_b) for k in range(self.K_b): user_subs = np.where(alloc_matrix[k] > 0)[0] if len(user_subs) == 0: rate_k = 0.0 else: # R_k = Σ α * Δf * log2(1 + γ) / R_k = Σ α * Δf * log2(1 + γ) rate_k = np.sum(self.delta_f * np.log2(1 + snr_matrix[k, user_subs])) rates.append(rate_k) # QoE_b = min(R_k / R_req, 1) / QoE_b = min(R_k / R_req, 1) qoe_k = min(rate_k / self.R_req, 1.0) qoe_list.append(qoe_k) # 语义用户 QoE 计算 / Semantic users QoE computation - Eq.(QoE_s) for k in range(self.K_b, self.K): user_subs = np.where(alloc_matrix[k] > 0)[0] if len(user_subs) == 0: ssim_k = 0.0 else: avg_snr = np.mean(snr_matrix[k, user_subs]) # 计算语义相似度 / Compute SSim - Eq. (SSim) ssim_k = self.semantic_module.compute_ssim(avg_snr, rho) ssim_values.append(float(ssim_k)) # 计算语义 QoE / Compute semantic QoE qoe_k = self.semantic_module.compute_semantic_qoe(ssim_k, rho, self.w1, self.w2, self.rho_max) qoe_list.append(qoe_k) # 6. 系统平均 QoE / System QoE qoe_sys = np.mean(qoe_list) if len(qoe_list) > 0 else 0.0 qoe_s = np.mean(qoe_list[self.K_b:]) if self.K_s > 0 else 0.0 qoe_b = np.mean(qoe_list[:self.K_b]) if self.K_b > 0 else 0.0 # 更新滚动平均值 / Update rolling averages alpha_smooth = 0.1 self.qoe_avg_s = alpha_smooth * qoe_s + (1 - alpha_smooth) * self.qoe_avg_s self.qoe_avg_b = alpha_smooth * qoe_b + (1 - alpha_smooth) * self.qoe_avg_b # 记录当前分配比例 / Record current allocation ratios self.alloc_s = n_sub_s / self.N self.alloc_b = n_sub_b / self.N # 7. 为下一步生成新信道 (块衰落) / Regenerate channel for next step (block fading) self.channel_gains = self.channel_model.generate_channel(self.distances, self.N) # 8. 构造输出数据 / Build output obs_s = self._get_observation('semantic') obs_b = self._get_observation('traditional') done = (self.step_count >= self.max_steps) # 计算速率满足度 / Compute rate satisfaction for traditional users if len(rates) > 0: rate_satisfaction = float(np.mean([1.0 if r >= self.R_req else 0.0 for r in rates])) else: rate_satisfaction = 1.0 # 构造信息字典 / Construct info dictionary info = { 'qoe_semantic': qoe_s, 'qoe_traditional': qoe_b, 'qoe_sys': qoe_sys, 'qoe_list': qoe_list, 'rates': rates, 'ssim_values': ssim_values, 'rate_satisfaction': rate_satisfaction, 'rho': rho, 'n_sub_s': n_sub_s, 'n_sub_b': n_sub_b, } # 返回结果 (奖励值设为各自的平均 QoE) / Return results (rewards set to respective mean QoEs) return obs_s, obs_b, qoe_s, qoe_b, done, info