MND-IA/core/world_book.py
2025-12-31 19:58:09 +08:00

401 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
MND-IA Core: World Book (世界书)
====================================
这是系统的数据中心和逻辑基石,存储宏观状态、活跃叙事对象和历史经验。
所有 Agent 必须通过此模块访问和更新系统状态。
"""
import json
import os
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Any
from pathlib import Path
from copy import deepcopy
class Narrative:
    """A single narrative object (Narrative Object).

    Holds a topic, the ETF codes related to it, a lifecycle stage and a
    weight that can be decayed over time or boosted by new scores.
    """

    # Lifecycle stages accepted by update_stage().
    VALID_STAGES = ("incubation", "fermentation", "realization", "decay")

    def __init__(
        self,
        id: str,
        topic: str,
        related_etfs: List[str],
        lifecycle_stage: str = "incubation",
        base_score: float = 50.0,
        decay_factor: float = 0.95,
        current_weight: Optional[float] = None,
    ):
        """Create a narrative.

        Args:
            id: Unique identifier of the narrative.
            topic: Human-readable topic name.
            related_etfs: ETF codes associated with the narrative.
            lifecycle_stage: incubation | fermentation | realization | decay.
            base_score: Initial score of the narrative.
            decay_factor: Multiplier applied to the weight by decay().
            current_weight: Current weight; defaults to ``base_score`` when
                omitted (``None``).
        """
        self.id = id
        self.topic = topic
        self.related_etfs = related_etfs
        self.lifecycle_stage = lifecycle_stage
        self.base_score = base_score
        self.decay_factor = decay_factor
        self.current_weight = current_weight if current_weight is not None else base_score
        self.last_updated = self._today()

    @staticmethod
    def _today() -> str:
        """Return today's local date formatted as YYYY-MM-DD."""
        return datetime.now().strftime("%Y-%m-%d")

    def decay(self) -> None:
        """Apply one step of time decay to the current weight."""
        self.current_weight *= self.decay_factor
        self.last_updated = self._today()

    def boost(self, new_score: float) -> None:
        """Reinforce the narrative weight with a new score.

        Weighted average: the new information counts for 40%, the existing
        weight for 60%.
        """
        self.current_weight = self.current_weight * 0.6 + new_score * 0.4
        self.last_updated = self._today()

    def update_stage(self, new_stage: str) -> None:
        """Set the lifecycle stage; values outside VALID_STAGES are ignored."""
        if new_stage in self.VALID_STAGES:
            self.lifecycle_stage = new_stage
            self.last_updated = self._today()

    def to_dict(self) -> Dict[str, Any]:
        """Serialize to a plain dict (current_weight rounded to 2 decimals)."""
        return {
            "id": self.id,
            "topic": self.topic,
            "related_etfs": self.related_etfs,
            "lifecycle_stage": self.lifecycle_stage,
            "base_score": self.base_score,
            "decay_factor": self.decay_factor,
            "current_weight": round(self.current_weight, 2),
            "last_updated": self.last_updated,
        }

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'Narrative':
        """Build a narrative from a dict produced by to_dict().

        ``id``, ``topic`` and ``related_etfs`` are required (KeyError when
        missing); every other key falls back to the constructor default.
        """
        narrative = cls(
            id=data["id"],
            topic=data["topic"],
            related_etfs=data["related_etfs"],
            lifecycle_stage=data.get("lifecycle_stage", "incubation"),
            base_score=data.get("base_score", 50.0),
            decay_factor=data.get("decay_factor", 0.95),
            current_weight=data.get("current_weight"),
        )
        # to_dict() persists last_updated; restore it so that a save/load
        # round trip does not silently reset the date to today.
        last_updated = data.get("last_updated")
        if last_updated:
            narrative.last_updated = last_updated
        return narrative
class MacroCycle:
    """Macro cycle state: trend status, liquidity and policy wind."""

    def __init__(
        self,
        status: str = "neutral",  # upward | downward | neutral
        liquidity: str = "neutral",  # loose | neutral | tight
        policy_wind: str = "wait_and_see",  # stimulus | regulation | wait_and_see
    ):
        self.status, self.liquidity, self.policy_wind = status, liquidity, policy_wind

    def to_dict(self) -> Dict[str, str]:
        """Return the three state fields as a plain dict."""
        snapshot = {}
        for field_name in ("status", "liquidity", "policy_wind"):
            snapshot[field_name] = getattr(self, field_name)
        return snapshot

    @classmethod
    def from_dict(cls, data: Dict[str, str]) -> 'MacroCycle':
        """Build a MacroCycle from a dict; missing keys use the defaults."""
        status = data.get("status", "neutral")
        liquidity = data.get("liquidity", "neutral")
        policy_wind = data.get("policy_wind", "wait_and_see")
        return cls(status=status, liquidity=liquidity, policy_wind=policy_wind)
class WorldBook:
    """
    MCP World Book - the system's central state store
    ======================================
    A dynamically updated state manager that stores:
    1. the macro cycle state
    2. the active narrative objects
    3. a snapshot of the macro-factor impact matrix
    """

    def __init__(self, data_dir: str = "data"):
        """Create the store and try to load previously saved state.

        Args:
            data_dir: Directory that holds ``world_book.json``; created
                (including missing parents) when it does not exist.
        """
        self.data_dir = Path(data_dir)
        # parents=True so that a nested data_dir such as "var/data" works.
        self.data_dir.mkdir(parents=True, exist_ok=True)
        self.world_book_path = self.data_dir / "world_book.json"
        # Initial (empty) state.
        self.timestamp = datetime.now().isoformat()
        self.macro_cycle = MacroCycle()
        self.active_narratives: Dict[str, "Narrative"] = {}
        self.static_matrix_snapshot: Dict[str, Dict[str, float]] = {}
        # Macro factor vector (used for vector dot-product computation).
        self.macro_factor_vector: Dict[str, float] = {}
        # Try to load existing data.
        self.load()

    def load(self) -> bool:
        """Load the World Book from disk.

        Returns:
            True when the saved file was read and applied; False when the
            file is missing or unreadable (the static matrix is then loaded
            instead and the in-memory state is left untouched).
        """
        if not self.world_book_path.exists():
            print("[WorldBook] 未找到已有数据,初始化新的 World Book")
            self._load_static_matrix()
            return False
        try:
            with open(self.world_book_path, 'r', encoding='utf-8') as f:
                data = json.load(f)
            # Parse everything into locals first so that a failure half-way
            # through cannot leave the instance in a partially loaded state.
            # "or {}" / "or []" also tolerate JSON null for a section.
            timestamp = data.get("timestamp", datetime.now().isoformat())
            macro_cycle = MacroCycle.from_dict(data.get("macro_cycle") or {})
            narratives: Dict[str, "Narrative"] = {}
            for narrative_data in data.get("active_narratives") or []:
                narrative = Narrative.from_dict(narrative_data)
                narratives[narrative.id] = narrative
            static_matrix = data.get("static_matrix_snapshot") or {}
            macro_factor_vector = data.get("macro_factor_vector") or {}
        except Exception as e:
            # Deliberate best-effort boundary: a broken file must not prevent
            # the World Book from starting with fresh state.
            print(f"[WorldBook] 加载数据失败: {e}")
            self._load_static_matrix()
            return False
        self.timestamp = timestamp
        self.macro_cycle = macro_cycle
        self.active_narratives = narratives
        self.static_matrix_snapshot = static_matrix
        self.macro_factor_vector = macro_factor_vector
        print(f"[WorldBook] 成功加载数据,包含 {len(self.active_narratives)} 个活跃叙事")
        return True

    def save(self) -> bool:
        """Save the World Book to disk.

        The JSON is written to a temporary sibling file and then moved over
        the real file with os.replace, so an interrupted write cannot corrupt
        the previously saved state.

        Returns:
            True on success, False when anything went wrong.
        """
        tmp_path = self.world_book_path.with_name(self.world_book_path.name + ".tmp")
        try:
            self.timestamp = datetime.now().isoformat()
            data = {
                "timestamp": self.timestamp,
                "macro_cycle": self.macro_cycle.to_dict(),
                "active_narratives": [
                    narrative.to_dict()
                    for narrative in self.active_narratives.values()
                ],
                "static_matrix_snapshot": self.static_matrix_snapshot,
                "macro_factor_vector": self.macro_factor_vector,
            }
            with open(tmp_path, 'w', encoding='utf-8') as f:
                json.dump(data, f, ensure_ascii=False, indent=2)
            os.replace(tmp_path, self.world_book_path)
        except Exception as e:
            # Deliberate best-effort boundary: callers rely on the bool result.
            print(f"[WorldBook] 保存数据失败: {e}")
            try:
                tmp_path.unlink()
            except OSError:
                # The temp file may never have been created; nothing to clean.
                pass
            return False
        print(f"[WorldBook] 成功保存数据到 {self.world_book_path}")
        return True

    def _load_static_matrix(self) -> None:
        """Load the static macro matrix from ``macro_matrix.json``.

        Looks at ``core/macro_matrix.json`` relative to the working directory
        first and then next to this module, so loading does not depend on the
        process being started from the project root. When no file is found
        the current snapshot is left unchanged; when the file cannot be read
        the snapshot is reset to an empty dict.
        """
        candidates = (
            Path("core") / "macro_matrix.json",
            Path(__file__).resolve().parent / "macro_matrix.json",
        )
        for matrix_path in candidates:
            if not matrix_path.exists():
                continue
            try:
                with open(matrix_path, 'r', encoding='utf-8') as f:
                    self.static_matrix_snapshot = json.load(f)
                print("[WorldBook] 加载宏观矩阵成功")
            except Exception as e:
                print(f"[WorldBook] 加载宏观矩阵失败: {e}")
                self.static_matrix_snapshot = {}
            return

    def add_narrative(self, narrative: "Narrative") -> None:
        """Add a narrative, replacing any existing one with the same id."""
        self.active_narratives[narrative.id] = narrative
        print(f"[WorldBook] 添加新叙事: {narrative.topic} (ID: {narrative.id})")

    def update_narrative(self, narrative_id: str, **kwargs) -> bool:
        """Update a narrative.

        Recognised keyword arguments:
            new_score: passed to the narrative's boost().
            stage: passed to the narrative's update_stage().

        Returns:
            False when ``narrative_id`` is unknown, True otherwise.
        """
        narrative = self.active_narratives.get(narrative_id)
        if narrative is None:
            print(f"[WorldBook] 叙事不存在: {narrative_id}")
            return False
        if 'new_score' in kwargs:
            narrative.boost(kwargs['new_score'])
        if 'stage' in kwargs:
            narrative.update_stage(kwargs['stage'])
        return True

    def decay_all_narratives(self) -> None:
        """Apply time decay to every active narrative."""
        for narrative in self.active_narratives.values():
            narrative.decay()
        print(f"[WorldBook] 对 {len(self.active_narratives)} 个叙事应用了时间衰减")

    def remove_weak_narratives(self, threshold: float = 10.0) -> List[str]:
        """Remove narratives whose weight is below ``threshold``.

        Returns:
            The ids of the removed narratives.
        """
        # Collect ids first: the dict must not change size while iterating.
        to_remove = [
            nid for nid, narrative in self.active_narratives.items()
            if narrative.current_weight < threshold
        ]
        for nid in to_remove:
            topic = self.active_narratives.pop(nid).topic
            print(f"[WorldBook] 移除衰退叙事: {topic} (ID: {nid})")
        return to_remove

    def get_narrative_by_topic(self, topic: str) -> Optional["Narrative"]:
        """Return the first narrative whose topic equals ``topic``, or None."""
        for narrative in self.active_narratives.values():
            if narrative.topic == topic:
                return narrative
        return None

    def get_narratives_by_etf(self, etf_code: str) -> List["Narrative"]:
        """Return every narrative that lists ``etf_code`` in related_etfs."""
        return [
            narrative for narrative in self.active_narratives.values()
            if etf_code in narrative.related_etfs
        ]

    def update_macro_cycle(self, **kwargs) -> None:
        """Update the macro cycle state.

        Recognised keyword arguments: ``status``, ``liquidity`` and
        ``policy_wind``; anything else is ignored.
        """
        for field_name in ("status", "liquidity", "policy_wind"):
            if field_name in kwargs:
                setattr(self.macro_cycle, field_name, kwargs[field_name])
        print(f"[WorldBook] 宏观周期更新: {self.macro_cycle.to_dict()}")

    def update_macro_factor_vector(self, factor_updates: Dict[str, float]) -> None:
        """
        Merge updates into the macro factor vector.

        Args:
            factor_updates: macro factors and their strength, e.g.:
                {
                    "interest_rate_down": 1.0,       # strong rate-cut expectation
                    "geopolitics_tension": 0.5,      # moderate geopolitical tension
                    "policy_digital_economy": 1.0    # digital-economy policy headline
                }
        """
        self.macro_factor_vector.update(factor_updates)
        print(f"[WorldBook] 宏观因子向量更新: {self.macro_factor_vector}")

    def get_macro_factor_value(self, factor_name: str) -> float:
        """Return the current value of a macro factor (0.0 when absent)."""
        return self.macro_factor_vector.get(factor_name, 0.0)

    def clear_macro_factor_vector(self) -> None:
        """Empty the macro factor vector (typically reset before each daily open)."""
        self.macro_factor_vector = {}
        print("[WorldBook] 宏观因子向量已清空")

    def get_top_narratives(self, top_n: int = 5) -> List["Narrative"]:
        """Return up to ``top_n`` narratives with the highest weight.

        A non-positive ``top_n`` yields an empty list.
        """
        sorted_narratives = sorted(
            self.active_narratives.values(),
            key=lambda n: n.current_weight,
            reverse=True,
        )
        # Clamp: a negative slice bound would drop items from the end
        # instead of returning nothing.
        return sorted_narratives[:max(top_n, 0)]

    def export_snapshot(self) -> Dict[str, Any]:
        """Export a complete snapshot (for other agents to read).

        The nested dicts are deep copies, so mutating the snapshot does not
        alter the World Book's own state.
        """
        return {
            "timestamp": self.timestamp,
            "macro_cycle": self.macro_cycle.to_dict(),
            "macro_factor_vector": deepcopy(self.macro_factor_vector),
            "active_narratives": deepcopy(
                [n.to_dict() for n in self.active_narratives.values()]
            ),
            "static_matrix_snapshot": deepcopy(self.static_matrix_snapshot),
            "summary": {
                "total_narratives": len(self.active_narratives),
                "top_3_topics": [n.topic for n in self.get_top_narratives(3)],
            },
        }

    def __repr__(self) -> str:
        return (
            f"<WorldBook: {len(self.active_narratives)} narratives, "
            f"macro={self.macro_cycle.status}, "
            f"updated={self.timestamp}>"
        )
# ==================== 工具函数 ====================
def create_narrative_id(topic: str, date: Optional[str] = None) -> str:
    """Generate a narrative ID of the form ``narrative_<topic-slug>_<date>``.

    Args:
        topic: Topic name; separator characters are replaced with "_".
        date: Date part of the ID. Defaults to the current year and month
            formatted as YYYYMM.

    Returns:
        The generated ID string.
    """
    if date is None:
        date = datetime.now().strftime("%Y%m")
    # Simplify the topic name so it can be used as part of the ID: every
    # separator character is mapped one-for-one to "_".
    # NOTE(review): the previous code called .replace("", "_") twice, which
    # inserts "_" between every character; the intended separator literals
    # appear to have been lost. The set below (space, "/", ideographic comma
    # U+3001, full-width slash U+FF0F) is an assumption - confirm.
    separators = " /\u3001\uff0f"
    topic_slug = topic.translate(str.maketrans(separators, "_" * len(separators)))
    return f"narrative_{topic_slug}_{date}"
if __name__ == "__main__":
    # Manual smoke test for the World Book core module.
    banner = "=" * 50
    print(banner)
    print("World Book 核心模块测试")
    print(banner)

    # Create the World Book instance.
    wb = WorldBook(data_dir="data")

    # Build the demo narratives first, then register them in order.
    demo_specs = [
        # (topic, related ETFs, lifecycle stage, base score, decay factor)
        ("低空经济", ["512980", "159969"], "fermentation", 85.0, 0.95),
        ("AI算力", ["515980", "159813"], "realization", 92.0, 0.93),
    ]
    demo_narratives = [
        Narrative(
            id=create_narrative_id(demo_topic),
            topic=demo_topic,
            related_etfs=demo_etfs,
            lifecycle_stage=demo_stage,
            base_score=demo_score,
            decay_factor=demo_decay,
        )
        for demo_topic, demo_etfs, demo_stage, demo_score, demo_decay in demo_specs
    ]
    for demo_narrative in demo_narratives:
        wb.add_narrative(demo_narrative)

    # Update the macro cycle.
    macro_state = {
        "status": "upward",
        "liquidity": "loose",
        "policy_wind": "stimulus",
    }
    wb.update_macro_cycle(**macro_state)

    # Persist the data.
    wb.save()

    # Export and print a snapshot.
    snapshot = wb.export_snapshot()
    print("\n当前状态快照:")
    snapshot_text = json.dumps(snapshot, ensure_ascii=False, indent=2)
    print(snapshot_text)

    # Apply time decay.
    print("\n应用时间衰减...")
    wb.decay_all_narratives()

    # Show the top narratives.
    top = wb.get_top_narratives(2)
    print("\nTop 2 叙事:")
    for ranked in top:
        print(f" - {ranked.topic}: {ranked.current_weight:.2f}")

    print("\n✅ World Book 核心模块测试完成")