743 lines
27 KiB
Python
743 lines
27 KiB
Python
"""
|
||
Skill C: Strategist (宏观策略师)
|
||
=================================
|
||
职能:维护世界书、判断周期、更新叙事权重
|
||
|
||
输入:Narrative_JSON + 历史 WorldBook
|
||
输出:更新后的 World_Book
|
||
|
||
设计原则:
|
||
- 使用 LLM 进行宏观周期判断和叙事生命周期分析
|
||
- 结合规则引擎提供可解释的决策逻辑
|
||
- 支持 LLM 失败时的优雅降级
|
||
"""
|
||
|
||
import sys
|
||
from pathlib import Path
|
||
|
||
# 添加项目根目录到路径
|
||
sys.path.insert(0, str(Path(__file__).parent.parent))
|
||
|
||
from core.world_book import WorldBook, Narrative, create_narrative_id
|
||
from core.config import config, llm_call
|
||
from typing import Dict, List, Optional
|
||
from datetime import datetime
|
||
import json
|
||
|
||
|
||
class MacroStrategist:
|
||
"""
|
||
宏观策略师 - 负责 World Book 的智能维护
|
||
|
||
核心职责:
|
||
1. 接收 Analyst 的新闻分析结果
|
||
2. 使用 LLM 进行宏观周期和叙事生命周期判断
|
||
3. 智能更新或创建叙事对象
|
||
4. 应用时间衰减
|
||
5. 生成投资策略建议
|
||
"""
|
||
|
||
# LLM 系统提示词
|
||
MACRO_ANALYSIS_PROMPT = """你是一位资深的 A 股宏观策略分析师,擅长根据政策新闻判断市场周期变化。
|
||
|
||
你的核心任务是:
|
||
1. 分析当前新闻叙事对宏观环境的影响
|
||
2. 判断市场所处的周期阶段
|
||
3. 识别关键的宏观驱动因子
|
||
4. 给出策略配置建议
|
||
|
||
宏观周期状态:
|
||
- upward(上行):政策利好、流动性充裕、风险偏好上升
|
||
- downward(下行):政策收紧、流动性紧张、风险偏好下降
|
||
- neutral(中性):政策观望、流动性平稳
|
||
|
||
流动性状态:
|
||
- loose(宽松):降准降息、央行投放
|
||
- tight(紧张):加息、回笼资金
|
||
- neutral(中性)
|
||
|
||
政策风向:
|
||
- stimulus(刺激):财政扩张、产业扶持
|
||
- regulation(监管):整顿规范、去杠杆
|
||
- wait_and_see(观望)
|
||
|
||
请严格按照 JSON 格式返回分析结果。"""
|
||
|
||
def __init__(self, world_book: WorldBook, use_llm: bool = True):
|
||
"""
|
||
Args:
|
||
world_book: WorldBook 实例
|
||
use_llm: 是否使用 LLM 进行分析(默认 True)
|
||
"""
|
||
self.wb = world_book
|
||
self.macro_matrix = self._load_macro_matrix()
|
||
self.use_llm = use_llm
|
||
|
||
if self.use_llm:
|
||
print("[Strategist] ✅ LLM 模式启用 - 将使用大模型进行智能策略分析")
|
||
else:
|
||
print("[Strategist] ⚠️ 降级模式 - 将使用规则引擎")
|
||
|
||
def _load_macro_matrix(self) -> Dict:
|
||
"""加载宏观逻辑矩阵"""
|
||
matrix_path = Path("core") / "macro_matrix.json"
|
||
if not matrix_path.exists():
|
||
print("[Strategist] 警告: 未找到 macro_matrix.json")
|
||
return {}
|
||
|
||
with open(matrix_path, 'r', encoding='utf-8') as f:
|
||
return json.load(f)
|
||
|
||
def process_narrative_json(
|
||
self,
|
||
narrative_json: Dict
|
||
) -> None:
|
||
"""
|
||
处理 Analyst 输出的 Narrative_JSON
|
||
|
||
逻辑:
|
||
1. 遍历所有叙事
|
||
2. 如果是新叙事 → 创建
|
||
3. 如果已存在 → 强化(boost)
|
||
4. 更新生命周期阶段
|
||
|
||
Args:
|
||
narrative_json: Analyst.generate_narrative_json() 的输出
|
||
"""
|
||
narratives = narrative_json.get('narratives', [])
|
||
print(f"[Strategist] 处理 {len(narratives)} 个叙事...")
|
||
|
||
for narrative_data in narratives:
|
||
topic = narrative_data['topic']
|
||
|
||
# 检查是否已存在
|
||
existing_narrative = self.wb.get_narrative_by_topic(topic)
|
||
|
||
if existing_narrative:
|
||
# 已存在 → 强化
|
||
self._boost_narrative(existing_narrative, narrative_data)
|
||
else:
|
||
# 新叙事 → 创建
|
||
self._create_narrative(narrative_data)
|
||
|
||
# 推导宏观因子向量
|
||
self.infer_macro_factor_vector(narrative_json)
|
||
|
||
print(f"[Strategist] 叙事处理完成,当前活跃叙事数: {len(self.wb.active_narratives)}")
|
||
|
||
def _create_narrative(self, narrative_data: Dict) -> None:
|
||
"""创建新叙事对象"""
|
||
topic = narrative_data['topic']
|
||
|
||
# 生成唯一 ID
|
||
narrative_id = create_narrative_id(topic)
|
||
|
||
# 判断衰减系数(Level A 新闻衰减慢)
|
||
decay_factor = 0.93 if narrative_data['level_a_count'] > 0 else 0.95
|
||
|
||
# 创建 Narrative 对象
|
||
narrative = Narrative(
|
||
id=narrative_id,
|
||
topic=topic,
|
||
related_etfs=narrative_data['related_etfs'],
|
||
lifecycle_stage=narrative_data['lifecycle_stage'],
|
||
base_score=narrative_data['max_score'],
|
||
decay_factor=decay_factor,
|
||
current_weight=narrative_data['avg_score']
|
||
)
|
||
|
||
self.wb.add_narrative(narrative)
|
||
print(f"[Strategist] 创建新叙事: {topic} (评分: {narrative.current_weight})")
|
||
|
||
def _boost_narrative(
|
||
self,
|
||
existing_narrative: Narrative,
|
||
narrative_data: Dict
|
||
) -> None:
|
||
"""强化已有叙事"""
|
||
topic = narrative_data['topic']
|
||
new_score = narrative_data['avg_score']
|
||
|
||
# 使用 Narrative 的 boost 方法
|
||
existing_narrative.boost(new_score)
|
||
|
||
# 如果有 Level A 新闻,可能需要升级生命周期
|
||
if narrative_data['level_a_count'] > 0:
|
||
if existing_narrative.lifecycle_stage == "fermentation":
|
||
existing_narrative.update_stage("realization")
|
||
|
||
print(f"[Strategist] 强化叙事: {topic} → {existing_narrative.current_weight:.2f}")
|
||
|
||
def daily_maintenance(self) -> Dict:
|
||
"""
|
||
每日维护任务
|
||
|
||
执行:
|
||
1. 对所有叙事应用时间衰减
|
||
2. 移除衰退的叙事
|
||
3. 检测生命周期阶段变化
|
||
|
||
Returns:
|
||
维护报告
|
||
"""
|
||
print("[Strategist] 执行每日维护...")
|
||
|
||
# 1. 应用衰减
|
||
self.wb.decay_all_narratives()
|
||
|
||
# 2. 检测阶段降级
|
||
for narrative in self.wb.active_narratives.values():
|
||
self._check_stage_downgrade(narrative)
|
||
|
||
# 3. 移除低权重叙事
|
||
removed = self.wb.remove_weak_narratives(threshold=10.0)
|
||
|
||
# 4. 生成报告
|
||
report = {
|
||
"timestamp": datetime.now().isoformat(),
|
||
"total_narratives": len(self.wb.active_narratives),
|
||
"removed_narratives": len(removed),
|
||
"top_narratives": [
|
||
{"topic": n.topic, "weight": n.current_weight}
|
||
for n in self.wb.get_top_narratives(5)
|
||
]
|
||
}
|
||
|
||
print(f"[Strategist] 维护完成,移除 {len(removed)} 个衰退叙事")
|
||
return report
|
||
|
||
def _check_stage_downgrade(self, narrative: Narrative) -> None:
|
||
"""检测叙事是否需要降级生命周期"""
|
||
current_stage = narrative.lifecycle_stage
|
||
weight = narrative.current_weight
|
||
|
||
# 权重低于阈值 → 进入衰退期
|
||
if weight < 30 and current_stage != "decay":
|
||
narrative.update_stage("decay")
|
||
print(f"[Strategist] 叙事进入衰退期: {narrative.topic}")
|
||
|
||
# realization → fermentation
|
||
elif weight < 60 and current_stage == "realization":
|
||
narrative.update_stage("fermentation")
|
||
|
||
def detect_macro_cycle_change(
|
||
self,
|
||
narrative_json: Dict,
|
||
market_data: Optional[Dict] = None
|
||
) -> Optional[Dict]:
|
||
"""
|
||
检测宏观周期变化(核心方法)
|
||
|
||
优先使用 LLM 进行智能分析,失败时降级到规则引擎
|
||
|
||
Args:
|
||
narrative_json: Analyst 输出
|
||
market_data: Quant 输出(可选)
|
||
|
||
Returns:
|
||
宏观周期变化建议(如有)
|
||
"""
|
||
# 检查是否有 Level A 新闻
|
||
level_a_count = sum(
|
||
n.get('level_a_count', 0)
|
||
for n in narrative_json.get('narratives', [])
|
||
)
|
||
|
||
if level_a_count == 0:
|
||
print("[Strategist] 无 Level A 新闻,跳过宏观周期检测")
|
||
return None
|
||
|
||
print(f"[Strategist] 检测到 {level_a_count} 条 Level A 新闻,分析宏观周期...")
|
||
|
||
if self.use_llm:
|
||
result = self._detect_cycle_with_llm(narrative_json, market_data)
|
||
if result:
|
||
return result
|
||
print("[Strategist] ⚠️ LLM 分析失败,降级到规则引擎")
|
||
|
||
# 降级到规则引擎
|
||
return self._detect_cycle_with_rules(narrative_json)
|
||
|
||
def _detect_cycle_with_llm(
|
||
self,
|
||
narrative_json: Dict,
|
||
market_data: Optional[Dict]
|
||
) -> Optional[Dict]:
|
||
"""使用 LLM 分析宏观周期变化"""
|
||
# 构建分析上下文
|
||
narratives_summary = []
|
||
for n in narrative_json.get('narratives', []):
|
||
topic_name = n.get('topic_name', n.get('topic', ''))
|
||
narratives_summary.append({
|
||
"topic": topic_name,
|
||
"score": n.get('max_score', 0),
|
||
"level_a": n.get('level_a_count', 0) > 0,
|
||
"sentiment": n.get('overall_sentiment', 'neutral'),
|
||
"signals": n.get('key_signals', [])[:2]
|
||
})
|
||
|
||
# 当前宏观状态
|
||
current_macro = self.wb.macro_cycle.to_dict()
|
||
|
||
prompt = f"""请分析以下市场叙事信息,判断宏观周期是否发生变化:
|
||
|
||
【当前宏观状态】
|
||
- 市场周期: {current_macro['status']}
|
||
- 流动性: {current_macro['liquidity']}
|
||
- 政策风向: {current_macro['policy_wind']}
|
||
|
||
【最新叙事信息】
|
||
{json.dumps(narratives_summary, ensure_ascii=False, indent=2)}
|
||
|
||
请分析后以 JSON 格式返回:
|
||
{{
|
||
"cycle_changed": true/false,
|
||
"new_status": "upward/downward/neutral",
|
||
"new_liquidity": "loose/tight/neutral",
|
||
"new_policy_wind": "stimulus/regulation/wait_and_see",
|
||
"reason": "变化原因(50字内)",
|
||
"confidence": 0-100,
|
||
"key_factors": ["驱动因子1", "驱动因子2"]
|
||
}}
|
||
|
||
如果宏观环境未发生明显变化,cycle_changed 设为 false。"""
|
||
|
||
try:
|
||
llm_output = llm_call(
|
||
messages=[
|
||
{"role": "system", "content": self.MACRO_ANALYSIS_PROMPT},
|
||
{"role": "user", "content": prompt}
|
||
],
|
||
temperature=0.3,
|
||
max_tokens=500
|
||
)
|
||
|
||
if not llm_output:
|
||
return None
|
||
|
||
# 清理并解析 JSON
|
||
import re
|
||
llm_output = llm_output.strip()
|
||
if llm_output.startswith("```"):
|
||
llm_output = re.sub(r'^```(?:json)?\s*', '', llm_output)
|
||
llm_output = re.sub(r'\s*```$', '', llm_output)
|
||
|
||
result = json.loads(llm_output)
|
||
|
||
if not result.get("cycle_changed", False):
|
||
print(f"[Strategist] 🤖 LLM 判断:宏观周期无明显变化")
|
||
return None
|
||
|
||
# 验证字段
|
||
valid_status = ["upward", "downward", "neutral"]
|
||
valid_liquidity = ["loose", "tight", "neutral"]
|
||
valid_policy = ["stimulus", "regulation", "wait_and_see"]
|
||
|
||
new_status = result.get("new_status", "neutral")
|
||
new_liquidity = result.get("new_liquidity", "neutral")
|
||
new_policy = result.get("new_policy_wind", "wait_and_see")
|
||
|
||
if new_status not in valid_status:
|
||
new_status = "neutral"
|
||
if new_liquidity not in valid_liquidity:
|
||
new_liquidity = "neutral"
|
||
if new_policy not in valid_policy:
|
||
new_policy = "wait_and_see"
|
||
|
||
print(f"[Strategist] 🤖 LLM 判断:{result.get('reason', '')}")
|
||
print(f"[Strategist] 🤖 置信度: {result.get('confidence', 0)}%")
|
||
|
||
return {
|
||
"status": new_status,
|
||
"liquidity": new_liquidity,
|
||
"policy_wind": new_policy,
|
||
"reason": result.get("reason", "LLM 分析结果"),
|
||
"key_factors": result.get("key_factors", []),
|
||
"analysis_method": "llm"
|
||
}
|
||
|
||
except json.JSONDecodeError as e:
|
||
print(f"[Strategist] LLM 返回格式解析失败: {e}")
|
||
return None
|
||
except Exception as e:
|
||
print(f"[Strategist] LLM 分析异常: {e}")
|
||
return None
|
||
|
||
def _detect_cycle_with_rules(self, narrative_json: Dict) -> Optional[Dict]:
|
||
"""规则引擎检测宏观周期变化(降级方案)"""
|
||
# 检测关键词模式
|
||
narratives_text = " ".join([
|
||
n.get('topic', '') + " " + " ".join(n.get('key_signals', []))
|
||
for n in narrative_json.get('narratives', [])
|
||
])
|
||
|
||
# 货币宽松信号
|
||
if any(kw in narratives_text for kw in ["降准", "降息", "宽松", "流动性"]):
|
||
return {
|
||
"status": "upward",
|
||
"liquidity": "loose",
|
||
"policy_wind": "stimulus",
|
||
"reason": "货币政策宽松,市场流动性改善",
|
||
"key_factors": ["降准/降息"],
|
||
"analysis_method": "rules"
|
||
}
|
||
|
||
# 监管收紧信号
|
||
if any(kw in narratives_text for kw in ["监管", "整顿", "规范", "收紧"]):
|
||
return {
|
||
"status": "downward",
|
||
"liquidity": "tight",
|
||
"policy_wind": "regulation",
|
||
"reason": "监管政策收紧,风险偏好下降",
|
||
"key_factors": ["监管收紧"],
|
||
"analysis_method": "rules"
|
||
}
|
||
|
||
# 财政刺激信号
|
||
if any(kw in narratives_text for kw in ["刺激", "万亿", "基建", "财政"]):
|
||
return {
|
||
"status": "upward",
|
||
"liquidity": "loose",
|
||
"policy_wind": "stimulus",
|
||
"reason": "财政政策积极,经济预期改善",
|
||
"key_factors": ["财政刺激"],
|
||
"analysis_method": "rules"
|
||
}
|
||
|
||
return None
|
||
|
||
def apply_macro_cycle_change(self, cycle_change: Dict) -> None:
|
||
"""应用宏观周期变化"""
|
||
self.wb.update_macro_cycle(
|
||
status=cycle_change.get('status'),
|
||
liquidity=cycle_change.get('liquidity'),
|
||
policy_wind=cycle_change.get('policy_wind')
|
||
)
|
||
|
||
def infer_macro_factor_vector(self, narrative_json: Dict) -> Dict[str, float]:
|
||
"""
|
||
根据新闻叙事推导宏观因子向量(核心方法)
|
||
|
||
优先使用 LLM 进行智能推导,失败时降级到规则引擎。
|
||
|
||
这是核心方法,将新闻内容映射到宏观因子空间,供 PM 进行向量点积计算。
|
||
|
||
Args:
|
||
narrative_json: Analyst 输出的叙事 JSON
|
||
|
||
Returns:
|
||
宏观因子向量,例如:
|
||
{
|
||
"interest_rate_down": 1.0,
|
||
"policy_digital_economy": 0.8,
|
||
"geopolitics_tension": 0.5
|
||
}
|
||
"""
|
||
print("[Strategist] 推导宏观因子向量...")
|
||
|
||
if self.use_llm:
|
||
result = self._infer_factors_with_llm(narrative_json)
|
||
if result:
|
||
self.wb.update_macro_factor_vector(result)
|
||
return result
|
||
print("[Strategist] ⚠️ LLM 推导失败,降级到规则引擎")
|
||
|
||
# 降级到规则引擎
|
||
result = self._infer_factors_with_rules(narrative_json)
|
||
self.wb.update_macro_factor_vector(result)
|
||
return result
|
||
|
||
def _infer_factors_with_llm(self, narrative_json: Dict) -> Optional[Dict[str, float]]:
|
||
"""使用 LLM 推导宏观因子向量"""
|
||
# 准备叙事摘要
|
||
narratives_summary = []
|
||
for n in narrative_json.get('narratives', []):
|
||
narratives_summary.append({
|
||
"topic": n.get('topic_name', n.get('topic', '')),
|
||
"score": n.get('max_score', 0),
|
||
"level_a": n.get('level_a_count', 0) > 0,
|
||
"sentiment": n.get('overall_sentiment', 'neutral'),
|
||
"key_signals": n.get('key_signals', [])
|
||
})
|
||
|
||
# 可用因子列表
|
||
available_factors = [
|
||
"interest_rate_down", "liquidity_easing", "cpi_rebound",
|
||
"risk_on", "risk_off", "policy_tech_self_reliance",
|
||
"policy_digital_economy", "policy_new_infra", "policy_capital_market",
|
||
"policy_soe_reform", "policy_low_altitude", "policy_food_security",
|
||
"policy_public_health", "policy_platform_economy", "consumption_stimulus",
|
||
"govt_spending", "geopolitics_tension", "currency_rmb_depreciation",
|
||
"export_growth", "fed_rate_cut", "dollar_index_down",
|
||
"inflation_expectations", "oil_price_up", "market_volume_spike",
|
||
"tech_cycle_up", "demographic_trend", "foreign_inflow"
|
||
]
|
||
|
||
prompt = f"""请根据以下市场叙事信息,推导当前活跃的宏观因子及其强度。
|
||
|
||
【叙事信息】
|
||
{json.dumps(narratives_summary, ensure_ascii=False, indent=2)}
|
||
|
||
【可用宏观因子】
|
||
{', '.join(available_factors)}
|
||
|
||
请分析新闻内容,判断哪些宏观因子被触发,并给出强度评分(0.0-1.0)。
|
||
强度参考:
|
||
- 1.0: 政策明确、信号强烈(如央行降准公告)
|
||
- 0.7-0.9: 政策导向明确(如部委发文)
|
||
- 0.4-0.6: 信号中等(如行业动态)
|
||
- 0.1-0.3: 信号微弱(如市场传闻)
|
||
|
||
以 JSON 格式返回(仅包含被触发的因子):
|
||
{{
|
||
"factors": {{
|
||
"factor_name": 0.8,
|
||
"factor_name2": 0.5
|
||
}},
|
||
"reasoning": "简要说明推导逻辑(50字内)"
|
||
}}"""
|
||
|
||
try:
|
||
llm_output = llm_call(
|
||
messages=[
|
||
{"role": "system", "content": "你是一位宏观经济分析师,擅长从新闻中提取宏观经济信号。"},
|
||
{"role": "user", "content": prompt}
|
||
],
|
||
temperature=0.3,
|
||
max_tokens=600
|
||
)
|
||
|
||
if not llm_output:
|
||
return None
|
||
|
||
# 清理并解析 JSON
|
||
import re
|
||
llm_output = llm_output.strip()
|
||
if llm_output.startswith("```"):
|
||
llm_output = re.sub(r'^```(?:json)?\s*', '', llm_output)
|
||
llm_output = re.sub(r'\s*```$', '', llm_output)
|
||
|
||
result = json.loads(llm_output)
|
||
factors = result.get("factors", {})
|
||
reasoning = result.get("reasoning", "")
|
||
|
||
# 验证因子名称和强度
|
||
valid_factors = {}
|
||
for factor_name, strength in factors.items():
|
||
if factor_name in available_factors:
|
||
strength = float(strength)
|
||
strength = max(0.0, min(1.0, strength))
|
||
valid_factors[factor_name] = round(strength, 2)
|
||
|
||
if valid_factors:
|
||
print(f"[Strategist] 🤖 LLM 推导出 {len(valid_factors)} 个宏观因子")
|
||
print(f"[Strategist] 🤖 推导逻辑: {reasoning}")
|
||
for factor, strength in sorted(valid_factors.items(), key=lambda x: -x[1])[:5]:
|
||
print(f" 📊 {factor}: {strength}")
|
||
|
||
return valid_factors
|
||
|
||
except json.JSONDecodeError as e:
|
||
print(f"[Strategist] LLM 返回格式解析失败: {e}")
|
||
return None
|
||
except Exception as e:
|
||
print(f"[Strategist] LLM 推导异常: {e}")
|
||
return None
|
||
|
||
def _infer_factors_with_rules(self, narrative_json: Dict) -> Dict[str, float]:
|
||
"""规则引擎推导宏观因子向量(降级方案)"""
|
||
factor_vector: Dict[str, float] = {}
|
||
|
||
# 宏观因子关键词映射表
|
||
factor_keyword_map = {
|
||
"interest_rate_down": ["降息", "降准", "MLF下调", "LPR下调", "利率下行"],
|
||
"liquidity_easing": ["流动性", "宽松", "货币政策", "注入流动性"],
|
||
"cpi_rebound": ["CPI", "通胀", "物价上涨", "消费复苏"],
|
||
"risk_on": ["牛市", "突破", "成交量放大", "风险偏好"],
|
||
"risk_off": ["暴跌", "恐慌", "避险", "黑天鹅"],
|
||
"policy_tech_self_reliance": ["自主可控", "国产替代", "卡脖子", "芯片", "半导体"],
|
||
"policy_digital_economy": ["数字经济", "数据要素", "信创", "AI", "算力"],
|
||
"policy_new_infra": ["新基建", "5G", "6G", "东数西算", "算力底座"],
|
||
"policy_capital_market": ["资本市场", "印花税", "T+0", "金融强国"],
|
||
"policy_soe_reform": ["国企改革", "中特估", "红利"],
|
||
"policy_low_altitude": ["低空经济", "无人机", "eVTOL"],
|
||
"policy_food_security": ["粮食安全", "一号文件", "种业"],
|
||
"policy_public_health": ["医疗", "医保", "创新药", "老龄化"],
|
||
"consumption_stimulus": ["消费券", "汽车下乡", "家电补贴", "内需"],
|
||
"govt_spending": ["财政支出", "专项债", "万亿", "基建投资"],
|
||
"geopolitics_tension": ["地缘", "制裁", "冲突", "台海", "中美"],
|
||
"export_growth": ["出口", "外贸", "订单", "海外需求"],
|
||
"market_volume_spike": ["成交量", "放量", "天量"],
|
||
}
|
||
|
||
# 收集所有文本
|
||
all_text = ""
|
||
level_a_count = 0
|
||
level_b_count = 0
|
||
|
||
for narrative in narrative_json.get('narratives', []):
|
||
all_text += f" {narrative.get('topic', '')} "
|
||
all_text += " ".join(narrative.get('key_signals', []))
|
||
level_a_count += narrative.get('level_a_count', 0)
|
||
level_b_count += narrative.get('level_b_count', 0)
|
||
|
||
# 扫描关键词
|
||
for factor_name, keywords in factor_keyword_map.items():
|
||
match_count = sum(1 for kw in keywords if kw in all_text)
|
||
|
||
if match_count > 0:
|
||
if level_a_count > 0:
|
||
strength = min(1.0, 0.8 + match_count * 0.1)
|
||
elif level_b_count > 0:
|
||
strength = min(0.9, 0.5 + match_count * 0.1)
|
||
else:
|
||
strength = min(0.6, 0.3 + match_count * 0.1)
|
||
|
||
factor_vector[factor_name] = round(strength, 2)
|
||
|
||
print(f"[Strategist] 📏 规则引擎推导出 {len(factor_vector)} 个宏观因子")
|
||
return factor_vector
|
||
|
||
def calculate_macro_impact_on_etf(
|
||
self,
|
||
etf_code: str,
|
||
macro_factor: str
|
||
) -> float:
|
||
"""
|
||
计算宏观因子对特定 ETF 的影响
|
||
|
||
Args:
|
||
etf_code: ETF 代码
|
||
macro_factor: 宏观因子名称(如 "rate_cut")
|
||
|
||
Returns:
|
||
影响值 (-10 到 +10)
|
||
"""
|
||
# 从宏观矩阵查询
|
||
factor_data = self.macro_matrix.get("macro_factors", {}).get(macro_factor, {})
|
||
impact_map = factor_data.get("impact", {})
|
||
|
||
# 查找 ETF 对应的类别
|
||
etf_mapping = self.macro_matrix.get("etf_mapping", {})
|
||
|
||
for etf_type, codes in etf_mapping.items():
|
||
if etf_code in codes:
|
||
return impact_map.get(etf_type, 0)
|
||
|
||
return 0
|
||
|
||
def generate_world_book_snapshot(self) -> Dict:
|
||
"""生成 World Book 快照(供 PM 使用)"""
|
||
snapshot = self.wb.export_snapshot()
|
||
|
||
# 添加策略师视角的元数据
|
||
snapshot["strategist_view"] = {
|
||
"strong_narratives": [
|
||
n.topic for n in self.wb.get_top_narratives(3)
|
||
],
|
||
"macro_recommendation": self._get_macro_recommendation(),
|
||
"risk_level": self._assess_risk_level()
|
||
}
|
||
|
||
return snapshot
|
||
|
||
def _get_macro_recommendation(self) -> str:
|
||
"""生成宏观建议"""
|
||
cycle = self.wb.macro_cycle
|
||
|
||
if cycle.status == "upward" and cycle.liquidity == "loose":
|
||
return "aggressive" # 激进配置
|
||
elif cycle.status == "downward" or cycle.policy_wind == "regulation":
|
||
return "defensive" # 防守配置
|
||
else:
|
||
return "balanced" # 平衡配置
|
||
|
||
def _assess_risk_level(self) -> str:
|
||
"""评估市场风险等级"""
|
||
# 基于叙事数量和宏观状态
|
||
narrative_count = len(self.wb.active_narratives)
|
||
cycle = self.wb.macro_cycle
|
||
|
||
if cycle.status == "downward" or narrative_count < 3:
|
||
return "high"
|
||
elif cycle.status == "upward" and narrative_count >= 5:
|
||
return "low"
|
||
else:
|
||
return "medium"
|
||
|
||
|
||
# ==================== 测试代码 ====================
|
||
|
||
if __name__ == "__main__":
|
||
print("=" * 50)
|
||
print("Skill C: Strategist 宏观策略师测试")
|
||
print("=" * 50)
|
||
|
||
# 创建 WorldBook 和 Strategist
|
||
wb = WorldBook(data_dir="data")
|
||
strategist = MacroStrategist(wb)
|
||
|
||
# 模拟 Analyst 输出
|
||
mock_narrative_json = {
|
||
"timestamp": datetime.now().isoformat(),
|
||
"total_news": 3,
|
||
"narratives": [
|
||
{
|
||
"topic": "低空经济",
|
||
"news_count": 2,
|
||
"avg_score": 82.0,
|
||
"max_score": 90.0,
|
||
"level_a_count": 0,
|
||
"related_etfs": ["512980", "159969"],
|
||
"lifecycle_stage": "fermentation"
|
||
},
|
||
{
|
||
"topic": "AI算力",
|
||
"news_count": 1,
|
||
"avg_score": 75.0,
|
||
"max_score": 75.0,
|
||
"level_a_count": 0,
|
||
"related_etfs": ["515980", "159813"],
|
||
"lifecycle_stage": "fermentation"
|
||
}
|
||
]
|
||
}
|
||
|
||
# 1. 处理叙事 JSON
|
||
print("\n1. 处理叙事 JSON:")
|
||
strategist.process_narrative_json(mock_narrative_json)
|
||
|
||
# 2. 显示当前 World Book 状态
|
||
print("\n2. 当前 World Book 状态:")
|
||
print(f" 活跃叙事数: {len(wb.active_narratives)}")
|
||
print(f" 宏观周期: {wb.macro_cycle.to_dict()}")
|
||
|
||
# 3. 执行每日维护
|
||
print("\n3. 执行每日维护:")
|
||
maintenance_report = strategist.daily_maintenance()
|
||
print(f" 维护报告: {json.dumps(maintenance_report, ensure_ascii=False, indent=2)}")
|
||
|
||
# 4. 检测宏观周期变化(模拟降准新闻)
|
||
print("\n4. 检测宏观周期变化:")
|
||
mock_news_with_policy = {
|
||
"narratives": [
|
||
{"topic": "央行降准", "level_a_count": 1, "avg_score": 95.0}
|
||
]
|
||
}
|
||
|
||
cycle_change = strategist.detect_macro_cycle_change(mock_news_with_policy)
|
||
if cycle_change:
|
||
print(f" 检测到周期变化: {cycle_change['reason']}")
|
||
strategist.apply_macro_cycle_change(cycle_change)
|
||
|
||
# 5. 生成快照
|
||
print("\n5. 生成 World Book 快照:")
|
||
snapshot = strategist.generate_world_book_snapshot()
|
||
print(f" 策略建议: {snapshot['strategist_view']['macro_recommendation']}")
|
||
print(f" 风险等级: {snapshot['strategist_view']['risk_level']}")
|
||
|
||
# 6. 保存 World Book
|
||
wb.save()
|
||
|
||
print("\n✅ Strategist 模块测试完成")
|