MND-IA/skills/strategist.py
2025-12-31 19:58:09 +08:00

743 lines
27 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Skill C: Strategist (宏观策略师)
=================================
职能:维护世界书、判断周期、更新叙事权重
输入: Narrative_JSON + 历史 WorldBook
输出:更新后的 World_Book
设计原则:
- 使用 LLM 进行宏观周期判断和叙事生命周期分析
- 结合规则引擎提供可解释的决策逻辑
- 支持 LLM 失败时的优雅降级
"""
import sys
from pathlib import Path
# 添加项目根目录到路径
sys.path.insert(0, str(Path(__file__).parent.parent))
from core.world_book import WorldBook, Narrative, create_narrative_id
from core.config import config, llm_call
from typing import Dict, List, Optional
from datetime import datetime
import json
class MacroStrategist:
    """
    Macro strategist - intelligent maintenance of the World Book.

    Core responsibilities:
    1. Receive the Analyst's news-analysis output (Narrative_JSON).
    2. Judge macro-cycle and narrative-lifecycle changes, using the LLM when
       enabled and a keyword rule engine as the fallback.
    3. Create new narrative objects or boost existing ones.
    4. Apply daily time decay.
    5. Produce strategy recommendations for the PM.
    """
    # LLM system prompt
    MACRO_ANALYSIS_PROMPT = """你是一位资深的 A 股宏观策略分析师,擅长根据政策新闻判断市场周期变化。
你的核心任务是:
1. 分析当前新闻叙事对宏观环境的影响
2. 判断市场所处的周期阶段
3. 识别关键的宏观驱动因子
4. 给出策略配置建议
宏观周期状态:
- upward上行政策利好、流动性充裕、风险偏好上升
- downward下行政策收紧、流动性紧张、风险偏好下降
- neutral中性政策观望、流动性平稳
流动性状态:
- loose宽松降准降息、央行投放
- tight紧张加息、回笼资金
- neutral中性
政策风向:
- stimulus刺激财政扩张、产业扶持
- regulation监管整顿规范、去杠杆
- wait_and_see观望
请严格按照 JSON 格式返回分析结果。"""

    # Daily decay factors for newly created narratives. Level A (top-tier
    # policy) news is meant to fade more slowly, so it gets the LARGER
    # factor. This assumes Narrative.decay() scales current_weight by
    # decay_factor (a larger factor means slower decay) -- TODO confirm in
    # core.world_book.
    LEVEL_A_DECAY_FACTOR = 0.95
    DEFAULT_DECAY_FACTOR = 0.93

    # Macro factors the LLM may return; anything else is discarded.
    # The tuple order is the order shown to the LLM in the prompt.
    AVAILABLE_FACTORS = (
        "interest_rate_down", "liquidity_easing", "cpi_rebound",
        "risk_on", "risk_off", "policy_tech_self_reliance",
        "policy_digital_economy", "policy_new_infra", "policy_capital_market",
        "policy_soe_reform", "policy_low_altitude", "policy_food_security",
        "policy_public_health", "policy_platform_economy", "consumption_stimulus",
        "govt_spending", "geopolitics_tension", "currency_rmb_depreciation",
        "export_growth", "fed_rate_cut", "dollar_index_down",
        "inflation_expectations", "oil_price_up", "market_volume_spike",
        "tech_cycle_up", "demographic_trend", "foreign_inflow",
    )
    _AVAILABLE_FACTOR_SET = frozenset(AVAILABLE_FACTORS)

    # Keyword table used by the rule-engine fallback for factor inference.
    # No keyword contains a space, which _infer_factors_with_rules relies on.
    FACTOR_KEYWORD_MAP = {
        "interest_rate_down": ["降息", "降准", "MLF下调", "LPR下调", "利率下行"],
        "liquidity_easing": ["流动性", "宽松", "货币政策", "注入流动性"],
        "cpi_rebound": ["CPI", "通胀", "物价上涨", "消费复苏"],
        "risk_on": ["牛市", "突破", "成交量放大", "风险偏好"],
        "risk_off": ["暴跌", "恐慌", "避险", "黑天鹅"],
        "policy_tech_self_reliance": ["自主可控", "国产替代", "卡脖子", "芯片", "半导体"],
        "policy_digital_economy": ["数字经济", "数据要素", "信创", "AI", "算力"],
        "policy_new_infra": ["新基建", "5G", "6G", "东数西算", "算力底座"],
        "policy_capital_market": ["资本市场", "印花税", "T+0", "金融强国"],
        "policy_soe_reform": ["国企改革", "中特估", "红利"],
        "policy_low_altitude": ["低空经济", "无人机", "eVTOL"],
        "policy_food_security": ["粮食安全", "一号文件", "种业"],
        "policy_public_health": ["医疗", "医保", "创新药", "老龄化"],
        "consumption_stimulus": ["消费券", "汽车下乡", "家电补贴", "内需"],
        "govt_spending": ["财政支出", "专项债", "万亿", "基建投资"],
        "geopolitics_tension": ["地缘", "制裁", "冲突", "台海", "中美"],
        "export_growth": ["出口", "外贸", "订单", "海外需求"],
        "market_volume_spike": ["成交量", "放量", "天量"],
    }

    def __init__(self, world_book: "WorldBook", use_llm: bool = True):
        """
        Args:
            world_book: WorldBook instance to maintain.
            use_llm: Whether to use the LLM for analysis (default True).
                When False, only the rule engine is used.
        """
        self.wb = world_book
        self.macro_matrix = self._load_macro_matrix()
        self.use_llm = use_llm
        if self.use_llm:
            print("[Strategist] ✅ LLM 模式启用 - 将使用大模型进行智能策略分析")
        else:
            print("[Strategist] ⚠️ 降级模式 - 将使用规则引擎")

    def _load_macro_matrix(self) -> Dict:
        """
        Load the macro logic matrix ``core/macro_matrix.json``.

        The file is looked up first relative to the project root, then
        relative to the current working directory (the original lookup).
        The project root is the parent of this file's directory, the same
        root this module adds to sys.path. This lets the strategist work no
        matter where the process is launched from.

        Returns:
            The parsed matrix. Returns {} when the file is missing,
            unreadable, not valid JSON, or not a JSON object.
        """
        candidates = (
            Path(__file__).resolve().parent.parent / "core" / "macro_matrix.json",
            Path("core") / "macro_matrix.json",
        )
        matrix_path = next((p for p in candidates if p.exists()), None)
        if matrix_path is None:
            print("[Strategist] 警告: 未找到 macro_matrix.json")
            return {}
        try:
            with open(matrix_path, 'r', encoding='utf-8') as f:
                data = json.load(f)
        except (OSError, json.JSONDecodeError) as e:
            # Degrade gracefully (empty matrix) instead of crashing __init__.
            print(f"[Strategist] 警告: macro_matrix.json 读取失败: {e}")
            return {}
        if not isinstance(data, dict):
            print("[Strategist] 警告: macro_matrix.json 格式错误(应为 JSON 对象)")
            return {}
        return data

    def process_narrative_json(
        self,
        narrative_json: Dict
    ) -> None:
        """
        Process the Narrative_JSON produced by the Analyst.

        Steps:
        1. Iterate over all narratives. Entries without a topic are skipped
           with a warning.
        2. New topic: create a narrative.
        3. Existing topic: boost it.
        4. Finally, infer and store the macro factor vector.

        Args:
            narrative_json: Output of Analyst.generate_narrative_json().
        """
        narratives = narrative_json.get('narratives', [])
        print(f"[Strategist] 处理 {len(narratives)} 个叙事...")
        for narrative_data in narratives:
            topic = narrative_data.get('topic')
            if not topic:
                print("[Strategist] 跳过缺少 topic 的叙事")
                continue
            existing_narrative = self.wb.get_narrative_by_topic(topic)
            if existing_narrative:
                self._boost_narrative(existing_narrative, narrative_data)
            else:
                self._create_narrative(narrative_data)
        # Derive the macro factor vector from the same batch of narratives.
        self.infer_macro_factor_vector(narrative_json)
        print(f"[Strategist] 叙事处理完成,当前活跃叙事数: {len(self.wb.active_narratives)}")

    def _create_narrative(self, narrative_data: Dict) -> None:
        """
        Create a new Narrative from one Analyst narrative entry and add it to
        the World Book.

        Required keys: topic, related_etfs, lifecycle_stage, max_score,
        avg_score. The optional key level_a_count defaults to 0.
        """
        topic = narrative_data['topic']
        narrative_id = create_narrative_id(topic)
        # Level A news decays more slowly -> larger decay factor (see the
        # class constants for the assumption about Narrative.decay()).
        has_level_a = narrative_data.get('level_a_count', 0) > 0
        decay_factor = (
            self.LEVEL_A_DECAY_FACTOR if has_level_a else self.DEFAULT_DECAY_FACTOR
        )
        narrative = Narrative(
            id=narrative_id,
            topic=topic,
            related_etfs=narrative_data['related_etfs'],
            lifecycle_stage=narrative_data['lifecycle_stage'],
            base_score=narrative_data['max_score'],
            decay_factor=decay_factor,
            current_weight=narrative_data['avg_score']
        )
        self.wb.add_narrative(narrative)
        print(f"[Strategist] 创建新叙事: {topic} (评分: {narrative.current_weight})")

    def _boost_narrative(
        self,
        existing_narrative: "Narrative",
        narrative_data: Dict
    ) -> None:
        """
        Boost an existing narrative with the new batch's average score.

        If the batch contains Level A news, a narrative in the
        "fermentation" stage is promoted to "realization".
        """
        topic = narrative_data['topic']
        new_score = narrative_data['avg_score']
        existing_narrative.boost(new_score)
        if narrative_data.get('level_a_count', 0) > 0:
            if existing_narrative.lifecycle_stage == "fermentation":
                existing_narrative.update_stage("realization")
        print(f"[Strategist] 强化叙事: {topic} → {existing_narrative.current_weight:.2f}")

    def daily_maintenance(self) -> Dict:
        """
        Daily maintenance task.

        Steps:
        1. Apply time decay to all narratives.
        2. Check every narrative for a lifecycle downgrade.
        3. Remove narratives whose weight is below 10.0.

        Returns:
            Report dict with timestamp, total_narratives, removed_narratives
            and the top 5 narratives (topic/weight).
        """
        print("[Strategist] 执行每日维护...")
        self.wb.decay_all_narratives()
        for narrative in self.wb.active_narratives.values():
            self._check_stage_downgrade(narrative)
        removed = self.wb.remove_weak_narratives(threshold=10.0)
        report = {
            "timestamp": datetime.now().isoformat(),
            "total_narratives": len(self.wb.active_narratives),
            "removed_narratives": len(removed),
            "top_narratives": [
                {"topic": n.topic, "weight": n.current_weight}
                for n in self.wb.get_top_narratives(5)
            ]
        }
        print(f"[Strategist] 维护完成,移除 {len(removed)} 个衰退叙事")
        return report

    def _check_stage_downgrade(self, narrative: "Narrative") -> None:
        """
        Downgrade a narrative's lifecycle stage based on its current weight.

        - weight < 30 and not already "decay": move it to "decay".
        - otherwise, weight < 60 in "realization": move it back to
          "fermentation".
        """
        current_stage = narrative.lifecycle_stage
        weight = narrative.current_weight
        if weight < 30 and current_stage != "decay":
            narrative.update_stage("decay")
            print(f"[Strategist] 叙事进入衰退期: {narrative.topic}")
        elif weight < 60 and current_stage == "realization":
            narrative.update_stage("fermentation")

    def detect_macro_cycle_change(
        self,
        narrative_json: Dict,
        market_data: Optional[Dict] = None
    ) -> Optional[Dict]:
        """
        Detect a macro-cycle change (core method).

        Detection runs only when the batch contains at least one Level A news
        item. It prefers the LLM and falls back to the rule engine when the
        LLM is disabled or yields no result.

        Args:
            narrative_json: Analyst output.
            market_data: Quant output (optional; only forwarded to the LLM path).

        Returns:
            A cycle-change suggestion dict, or None if no change is detected.
        """
        level_a_count = sum(
            n.get('level_a_count', 0)
            for n in narrative_json.get('narratives', [])
        )
        if level_a_count == 0:
            print("[Strategist] 无 Level A 新闻,跳过宏观周期检测")
            return None
        print(f"[Strategist] 检测到 {level_a_count} 条 Level A 新闻,分析宏观周期...")
        if self.use_llm:
            result = self._detect_cycle_with_llm(narrative_json, market_data)
            if result:
                return result
            print("[Strategist] ⚠️ LLM 分析失败,降级到规则引擎")
        return self._detect_cycle_with_rules(narrative_json)

    @staticmethod
    def _parse_llm_json(llm_output: str) -> object:
        """
        Parse an LLM reply as JSON.

        Tolerates a surrounding ``` / ```json code fence. If the text still
        is not valid JSON, the span from the first "{" to the last "}" is
        tried, which handles replies that wrap the object in prose.

        Raises:
            json.JSONDecodeError: if no parseable JSON can be extracted.
        """
        import re
        text = llm_output.strip()
        if text.startswith("```"):
            text = re.sub(r'^```(?:json)?\s*', '', text)
            text = re.sub(r'\s*```$', '', text)
        try:
            return json.loads(text)
        except json.JSONDecodeError:
            start, end = text.find("{"), text.rfind("}")
            if start == -1 or end <= start:
                raise
            return json.loads(text[start:end + 1])

    @staticmethod
    def _to_bool(value: object) -> bool:
        """
        Interpret an LLM-supplied boolean flag.

        Strings are matched against "true"/"yes"/"1" (case-insensitive), so
        the string "false" does not count as True. Other values use bool().
        """
        if isinstance(value, str):
            return value.strip().lower() in ("true", "yes", "1")
        return bool(value)

    def _detect_cycle_with_llm(
        self,
        narrative_json: Dict,
        market_data: Optional[Dict]
    ) -> Optional[Dict]:
        """
        Analyze a macro-cycle change with the LLM.

        Returns:
            A validated change dict (status/liquidity/policy_wind/reason/
            key_factors/analysis_method="llm"). Returns None when the LLM
            reports no change, returns nothing, or returns unparseable output.
            Out-of-vocabulary status values fall back to their neutral
            defaults.
        """
        narratives_summary = []
        for n in narrative_json.get('narratives', []):
            topic_name = n.get('topic_name', n.get('topic', ''))
            narratives_summary.append({
                "topic": topic_name,
                "score": n.get('max_score', 0),
                "level_a": n.get('level_a_count', 0) > 0,
                "sentiment": n.get('overall_sentiment', 'neutral'),
                "signals": n.get('key_signals', [])[:2]
            })
        current_macro = self.wb.macro_cycle.to_dict()
        prompt = f"""请分析以下市场叙事信息,判断宏观周期是否发生变化:
【当前宏观状态】
- 市场周期: {current_macro['status']}
- 流动性: {current_macro['liquidity']}
- 政策风向: {current_macro['policy_wind']}
【最新叙事信息】
{json.dumps(narratives_summary, ensure_ascii=False, indent=2)}
请分析后以 JSON 格式返回:
{{
"cycle_changed": true/false,
"new_status": "upward/downward/neutral",
"new_liquidity": "loose/tight/neutral",
"new_policy_wind": "stimulus/regulation/wait_and_see",
"reason": "变化原因50字内",
"confidence": 0-100,
"key_factors": ["驱动因子1", "驱动因子2"]
}}
如果宏观环境未发生明显变化cycle_changed 设为 false。"""
        try:
            llm_output = llm_call(
                messages=[
                    {"role": "system", "content": self.MACRO_ANALYSIS_PROMPT},
                    {"role": "user", "content": prompt}
                ],
                temperature=0.3,
                max_tokens=500
            )
            if not llm_output:
                return None
            result = self._parse_llm_json(llm_output)
            if not isinstance(result, dict):
                print("[Strategist] LLM 返回格式解析失败: 非 JSON 对象")
                return None
            # Use _to_bool so that "cycle_changed": "false" is not treated as a change.
            if not self._to_bool(result.get("cycle_changed", False)):
                print(f"[Strategist] 🤖 LLM 判断:宏观周期无明显变化")
                return None
            valid_status = ["upward", "downward", "neutral"]
            valid_liquidity = ["loose", "tight", "neutral"]
            valid_policy = ["stimulus", "regulation", "wait_and_see"]
            new_status = result.get("new_status", "neutral")
            new_liquidity = result.get("new_liquidity", "neutral")
            new_policy = result.get("new_policy_wind", "wait_and_see")
            if new_status not in valid_status:
                new_status = "neutral"
            if new_liquidity not in valid_liquidity:
                new_liquidity = "neutral"
            if new_policy not in valid_policy:
                new_policy = "wait_and_see"
            print(f"[Strategist] 🤖 LLM 判断:{result.get('reason', '')}")
            print(f"[Strategist] 🤖 置信度: {result.get('confidence', 0)}%")
            return {
                "status": new_status,
                "liquidity": new_liquidity,
                "policy_wind": new_policy,
                "reason": result.get("reason", "LLM 分析结果"),
                "key_factors": result.get("key_factors", []),
                "analysis_method": "llm"
            }
        except json.JSONDecodeError as e:
            print(f"[Strategist] LLM 返回格式解析失败: {e}")
            return None
        except Exception as e:
            print(f"[Strategist] LLM 分析异常: {e}")
            return None

    def _detect_cycle_with_rules(self, narrative_json: Dict) -> Optional[Dict]:
        """
        Rule-engine fallback for macro-cycle detection.

        Rules are checked in priority order: monetary easing, then regulatory
        tightening, then fiscal stimulus. Each rule is a keyword scan over the
        topics and key_signals; the first rule that fires wins.
        """
        narratives_text = " ".join([
            n.get('topic', '') + " " + " ".join(n.get('key_signals', []))
            for n in narrative_json.get('narratives', [])
        ])
        # Monetary easing signals
        if any(kw in narratives_text for kw in ["降准", "降息", "宽松", "流动性"]):
            return {
                "status": "upward",
                "liquidity": "loose",
                "policy_wind": "stimulus",
                "reason": "货币政策宽松,市场流动性改善",
                "key_factors": ["降准/降息"],
                "analysis_method": "rules"
            }
        # Regulatory tightening signals
        if any(kw in narratives_text for kw in ["监管", "整顿", "规范", "收紧"]):
            return {
                "status": "downward",
                "liquidity": "tight",
                "policy_wind": "regulation",
                "reason": "监管政策收紧,风险偏好下降",
                "key_factors": ["监管收紧"],
                "analysis_method": "rules"
            }
        # Fiscal stimulus signals
        if any(kw in narratives_text for kw in ["刺激", "万亿", "基建", "财政"]):
            return {
                "status": "upward",
                "liquidity": "loose",
                "policy_wind": "stimulus",
                "reason": "财政政策积极,经济预期改善",
                "key_factors": ["财政刺激"],
                "analysis_method": "rules"
            }
        return None

    def apply_macro_cycle_change(self, cycle_change: Dict) -> None:
        """Write a cycle-change suggestion into the World Book's macro cycle."""
        self.wb.update_macro_cycle(
            status=cycle_change.get('status'),
            liquidity=cycle_change.get('liquidity'),
            policy_wind=cycle_change.get('policy_wind')
        )

    def infer_macro_factor_vector(self, narrative_json: Dict) -> Dict[str, float]:
        """
        Infer the macro factor vector from the news narratives (core method).

        Prefers the LLM and falls back to the rule engine when the LLM is
        disabled or yields no factors. The result maps news content into the
        macro-factor space for the PM's dot-product scoring. It is also
        stored via WorldBook.update_macro_factor_vector.

        Args:
            narrative_json: Narrative JSON produced by the Analyst.

        Returns:
            Macro factor vector, e.g.
            {"interest_rate_down": 1.0, "policy_digital_economy": 0.8}
        """
        print("[Strategist] 推导宏观因子向量...")
        if self.use_llm:
            result = self._infer_factors_with_llm(narrative_json)
            if result:
                self.wb.update_macro_factor_vector(result)
                return result
            print("[Strategist] ⚠️ LLM 推导失败,降级到规则引擎")
        result = self._infer_factors_with_rules(narrative_json)
        self.wb.update_macro_factor_vector(result)
        return result

    @classmethod
    def _sanitize_factors(cls, factors: object) -> Dict[str, float]:
        """
        Validate an LLM-supplied {factor_name: strength} mapping.

        Keeps only names in AVAILABLE_FACTORS whose strength converts to a
        finite float, clamps each strength to [0.0, 1.0], and rounds it to
        2 decimals. Invalid entries are skipped one by one rather than
        discarding the whole answer. A non-dict input yields {}.
        """
        import math
        if not isinstance(factors, dict):
            return {}
        valid_factors: Dict[str, float] = {}
        for factor_name, strength in factors.items():
            if factor_name not in cls._AVAILABLE_FACTOR_SET:
                continue
            try:
                value = float(strength)
            except (TypeError, ValueError):
                continue
            # NaN would otherwise survive the clamp as 1.0.
            if not math.isfinite(value):
                continue
            valid_factors[factor_name] = round(max(0.0, min(1.0, value)), 2)
        return valid_factors

    def _infer_factors_with_llm(self, narrative_json: Dict) -> Optional[Dict[str, float]]:
        """
        Infer the macro factor vector with the LLM.

        Returns:
            The validated factor dict (possibly empty). Returns None when the
            LLM returns nothing or unparseable output.
        """
        narratives_summary = [
            {
                "topic": n.get('topic_name', n.get('topic', '')),
                "score": n.get('max_score', 0),
                "level_a": n.get('level_a_count', 0) > 0,
                "sentiment": n.get('overall_sentiment', 'neutral'),
                "key_signals": n.get('key_signals', [])
            }
            for n in narrative_json.get('narratives', [])
        ]
        available_factors = self.AVAILABLE_FACTORS
        prompt = f"""请根据以下市场叙事信息,推导当前活跃的宏观因子及其强度。
【叙事信息】
{json.dumps(narratives_summary, ensure_ascii=False, indent=2)}
【可用宏观因子】
{', '.join(available_factors)}
请分析新闻内容判断哪些宏观因子被触发并给出强度评分0.0-1.0)。
强度参考:
- 1.0: 政策明确、信号强烈(如央行降准公告)
- 0.7-0.9: 政策导向明确(如部委发文)
- 0.4-0.6: 信号中等(如行业动态)
- 0.1-0.3: 信号微弱(如市场传闻)
以 JSON 格式返回(仅包含被触发的因子):
{{
"factors": {{
"factor_name": 0.8,
"factor_name2": 0.5
}},
"reasoning": "简要说明推导逻辑50字内"
}}"""
        try:
            llm_output = llm_call(
                messages=[
                    {"role": "system", "content": "你是一位宏观经济分析师,擅长从新闻中提取宏观经济信号。"},
                    {"role": "user", "content": prompt}
                ],
                temperature=0.3,
                max_tokens=600
            )
            if not llm_output:
                return None
            result = self._parse_llm_json(llm_output)
            if not isinstance(result, dict):
                print("[Strategist] LLM 返回格式解析失败: 非 JSON 对象")
                return None
            valid_factors = self._sanitize_factors(result.get("factors", {}))
            reasoning = result.get("reasoning", "")
            if valid_factors:
                print(f"[Strategist] 🤖 LLM 推导出 {len(valid_factors)} 个宏观因子")
                print(f"[Strategist] 🤖 推导逻辑: {reasoning}")
                for factor, strength in sorted(valid_factors.items(), key=lambda x: -x[1])[:5]:
                    print(f" 📊 {factor}: {strength}")
            return valid_factors
        except json.JSONDecodeError as e:
            print(f"[Strategist] LLM 返回格式解析失败: {e}")
            return None
        except Exception as e:
            print(f"[Strategist] LLM 推导异常: {e}")
            return None

    def _infer_factors_with_rules(self, narrative_json: Dict) -> Dict[str, float]:
        """
        Rule-engine fallback for macro factor inference.

        Scans topics and key_signals for FACTOR_KEYWORD_MAP keywords. A
        factor's strength grows by 0.1 per matched keyword. The starting
        point and cap are set by the whole batch:
        - any Level A news: base 0.8, cap 1.0
        - otherwise any Level B news: base 0.5, cap 0.9
        - otherwise: base 0.3, cap 0.6
        """
        text_parts: List[str] = []
        level_a_count = 0
        level_b_count = 0
        for narrative in narrative_json.get('narratives', []):
            text_parts.append(str(narrative.get('topic') or ''))
            text_parts.extend(narrative.get('key_signals', []))
            level_a_count += narrative.get('level_a_count', 0)
            level_b_count += narrative.get('level_b_count', 0)
        # No keyword contains a space, so joining fragments with " " cannot
        # create a spurious match across fragment boundaries.
        all_text = " ".join(text_parts)
        factor_vector: Dict[str, float] = {}
        for factor_name, keywords in self.FACTOR_KEYWORD_MAP.items():
            match_count = sum(1 for kw in keywords if kw in all_text)
            if match_count == 0:
                continue
            if level_a_count > 0:
                strength = min(1.0, 0.8 + match_count * 0.1)
            elif level_b_count > 0:
                strength = min(0.9, 0.5 + match_count * 0.1)
            else:
                strength = min(0.6, 0.3 + match_count * 0.1)
            factor_vector[factor_name] = round(strength, 2)
        print(f"[Strategist] 📏 规则引擎推导出 {len(factor_vector)} 个宏观因子")
        return factor_vector

    def calculate_macro_impact_on_etf(
        self,
        etf_code: str,
        macro_factor: str
    ) -> float:
        """
        Look up the impact of a macro factor on a specific ETF.

        The impact comes from macro_matrix["macro_factors"][factor]["impact"],
        keyed by the first ETF category in macro_matrix["etf_mapping"] whose
        code list contains etf_code.

        Args:
            etf_code: ETF code.
            macro_factor: Macro factor name (e.g. "rate_cut").

        Returns:
            Impact value as a float (documented range -10 to +10). Returns
            0.0 when the factor, category or ETF is unknown.
        """
        factor_data = self.macro_matrix.get("macro_factors", {}).get(macro_factor, {})
        impact_map = factor_data.get("impact", {})
        etf_mapping = self.macro_matrix.get("etf_mapping", {})
        for etf_type, codes in etf_mapping.items():
            if etf_code in codes:
                return float(impact_map.get(etf_type, 0))
        return 0.0

    def generate_world_book_snapshot(self) -> Dict:
        """
        Build a World Book snapshot for the PM.

        The WorldBook export is extended with a "strategist_view" containing
        the top-3 narrative topics, the macro recommendation and the risk level.
        """
        snapshot = self.wb.export_snapshot()
        snapshot["strategist_view"] = {
            "strong_narratives": [
                n.topic for n in self.wb.get_top_narratives(3)
            ],
            "macro_recommendation": self._get_macro_recommendation(),
            "risk_level": self._assess_risk_level()
        }
        return snapshot

    def _get_macro_recommendation(self) -> str:
        """
        Map the macro cycle to an allocation style.

        Returns:
            "aggressive" when the cycle is upward with loose liquidity,
            "defensive" when it is downward or policy is regulation,
            otherwise "balanced".
        """
        cycle = self.wb.macro_cycle
        if cycle.status == "upward" and cycle.liquidity == "loose":
            return "aggressive"
        elif cycle.status == "downward" or cycle.policy_wind == "regulation":
            return "defensive"
        else:
            return "balanced"

    def _assess_risk_level(self) -> str:
        """
        Assess the market risk level from the active-narrative count and the
        macro cycle.

        Returns:
            "high" if the cycle is downward or fewer than 3 narratives are
            active; "low" if it is upward with at least 5; otherwise "medium".
        """
        narrative_count = len(self.wb.active_narratives)
        cycle = self.wb.macro_cycle
        if cycle.status == "downward" or narrative_count < 3:
            return "high"
        elif cycle.status == "upward" and narrative_count >= 5:
            return "low"
        else:
            return "medium"
# ==================== 测试代码 ====================
if __name__ == "__main__":
    # Smoke-test driver for the strategist: runs the full
    # process -> maintain -> detect -> snapshot -> save pipeline on mock data.
    banner = "=" * 50
    print(banner)
    print("Skill C: Strategist 宏观策略师测试")
    print(banner)

    # WorldBook backed by the ./data directory, plus the strategist under test.
    world_book = WorldBook(data_dir="data")
    macro_strategist = MacroStrategist(world_book)

    # Simulated Analyst output: two fermentation-stage narratives, no Level A news.
    sample_payload = {
        "timestamp": datetime.now().isoformat(),
        "total_news": 3,
        "narratives": [
            dict(
                topic="低空经济",
                news_count=2,
                avg_score=82.0,
                max_score=90.0,
                level_a_count=0,
                related_etfs=["512980", "159969"],
                lifecycle_stage="fermentation",
            ),
            dict(
                topic="AI算力",
                news_count=1,
                avg_score=75.0,
                max_score=75.0,
                level_a_count=0,
                related_etfs=["515980", "159813"],
                lifecycle_stage="fermentation",
            ),
        ],
    }

    print("\n1. 处理叙事 JSON:")
    macro_strategist.process_narrative_json(sample_payload)

    print("\n2. 当前 World Book 状态:")
    print(f" 活跃叙事数: {len(world_book.active_narratives)}")
    print(f" 宏观周期: {world_book.macro_cycle.to_dict()}")

    print("\n3. 执行每日维护:")
    daily_report = macro_strategist.daily_maintenance()
    print(f" 维护报告: {json.dumps(daily_report, ensure_ascii=False, indent=2)}")

    # A single Level A reserve-requirement-cut item, to trigger cycle detection.
    print("\n4. 检测宏观周期变化:")
    policy_payload = {
        "narratives": [
            dict(topic="央行降准", level_a_count=1, avg_score=95.0),
        ]
    }
    proposed_change = macro_strategist.detect_macro_cycle_change(policy_payload)
    if proposed_change:
        print(f" 检测到周期变化: {proposed_change['reason']}")
        macro_strategist.apply_macro_cycle_change(proposed_change)

    print("\n5. 生成 World Book 快照:")
    strategist_view = macro_strategist.generate_world_book_snapshot()["strategist_view"]
    print(f" 策略建议: {strategist_view['macro_recommendation']}")
    print(f" 风险等级: {strategist_view['risk_level']}")

    # Persist the World Book state produced by this run.
    world_book.save()
    print("\n✅ Strategist 模块测试完成")