401 lines
14 KiB
Python
401 lines
14 KiB
Python
"""
|
||
MND-IA Core: World Book (世界书)
|
||
====================================
|
||
这是系统的数据中心和逻辑基石,存储宏观状态、活跃叙事对象和历史经验。
|
||
所有 Agent 必须通过此模块访问和更新系统状态。
|
||
"""
|
||
|
||
import json
|
||
import os
|
||
from datetime import datetime, timedelta
|
||
from typing import Dict, List, Optional, Any
|
||
from pathlib import Path
|
||
from copy import deepcopy
|
||
|
||
|
||
class Narrative:
    """A single narrative object tracked by the World Book.

    A narrative couples a topic with the ETF codes it relates to, a lifecycle
    stage, and a weight that is decayed over time and boosted by new scores.
    """

    # Lifecycle stages accepted by update_stage(); anything else is ignored.
    VALID_STAGES = ("incubation", "fermentation", "realization", "decay")

    def __init__(
        self,
        id: str,
        topic: str,
        related_etfs: List[str],
        lifecycle_stage: str = "incubation",
        base_score: float = 50.0,
        decay_factor: float = 0.95,
        current_weight: Optional[float] = None,
        last_updated: Optional[str] = None,
    ):
        """Create a narrative.

        Args:
            id: Unique identifier of the narrative.
            topic: Human-readable topic name.
            related_etfs: ETF codes associated with the topic.
            lifecycle_stage: One of ``VALID_STAGES``.
            base_score: Initial score of the narrative.
            decay_factor: Multiplier applied to the weight by ``decay()``.
            current_weight: Current weight; defaults to ``base_score``.
            last_updated: ``YYYY-MM-DD`` date of the last change; defaults to
                today. Accepting it here lets ``from_dict`` restore the
                persisted value instead of resetting it on every load.
        """
        self.id = id
        self.topic = topic
        self.related_etfs = related_etfs
        self.lifecycle_stage = lifecycle_stage
        self.base_score = base_score
        self.decay_factor = decay_factor
        self.current_weight = current_weight if current_weight is not None else base_score
        self.last_updated = last_updated if last_updated is not None else self._today()

    @staticmethod
    def _today() -> str:
        """Return today's local date formatted as ``YYYY-MM-DD``."""
        return datetime.now().strftime("%Y-%m-%d")

    def decay(self) -> None:
        """Apply one step of time decay to the current weight."""
        self.current_weight *= self.decay_factor
        self.last_updated = self._today()

    def boost(self, new_score: float) -> None:
        """Reinforce the weight with a new score.

        Uses a weighted average: the new score counts for 40%, the existing
        weight for 60%.
        """
        self.current_weight = self.current_weight * 0.6 + new_score * 0.4
        self.last_updated = self._today()

    def update_stage(self, new_stage: str) -> None:
        """Set the lifecycle stage; unknown stage names are silently ignored."""
        if new_stage in self.VALID_STAGES:
            self.lifecycle_stage = new_stage
            self.last_updated = self._today()

    def to_dict(self) -> Dict[str, Any]:
        """Return a JSON-serializable dict; the weight is rounded to 2 decimals."""
        return {
            "id": self.id,
            "topic": self.topic,
            # Copy so callers cannot mutate this narrative through the dict.
            "related_etfs": list(self.related_etfs),
            "lifecycle_stage": self.lifecycle_stage,
            "base_score": self.base_score,
            "decay_factor": self.decay_factor,
            "current_weight": round(self.current_weight, 2),
            "last_updated": self.last_updated,
        }

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'Narrative':
        """Build a narrative from a dict as produced by ``to_dict``.

        ``id``, ``topic`` and ``related_etfs`` are required (``KeyError`` if
        missing); every other field falls back to the constructor default.
        """
        return cls(
            id=data["id"],
            topic=data["topic"],
            related_etfs=data["related_etfs"],
            lifecycle_stage=data.get("lifecycle_stage", "incubation"),
            base_score=data.get("base_score", 50.0),
            decay_factor=data.get("decay_factor", 0.95),
            current_weight=data.get("current_weight"),
            last_updated=data.get("last_updated"),
        )
|
||
|
||
|
||
class MacroCycle:
    """Macro cycle state: overall trend, liquidity and policy direction."""

    def __init__(
        self,
        status: str = "neutral",            # upward | downward | neutral
        liquidity: str = "neutral",         # loose | neutral | tight
        policy_wind: str = "wait_and_see",  # stimulus | regulation | wait_and_see
    ):
        """Store the three state fields as given; values are not validated."""
        self.status = status
        self.liquidity = liquidity
        self.policy_wind = policy_wind

    def to_dict(self) -> Dict[str, str]:
        """Return the state as a plain, JSON-serializable dict."""
        return {
            "status": self.status,
            "liquidity": self.liquidity,
            "policy_wind": self.policy_wind,
        }

    @classmethod
    def from_dict(cls, data: Optional[Dict[str, str]]) -> 'MacroCycle':
        """Build a state from a dict; missing keys fall back to the defaults.

        ``None`` is treated like an empty dict, so a persisted
        ``"macro_cycle": null`` yields the default state instead of raising.
        """
        fields = data or {}
        return cls(
            status=fields.get("status", "neutral"),
            liquidity=fields.get("liquidity", "neutral"),
            policy_wind=fields.get("policy_wind", "wait_and_see"),
        )

    def __repr__(self) -> str:
        return (
            f"MacroCycle(status={self.status!r}, "
            f"liquidity={self.liquidity!r}, "
            f"policy_wind={self.policy_wind!r})"
        )
|
||
|
||
|
||
class WorldBook:
|
||
"""
|
||
MCP World Book - 系统核心数据中心
|
||
======================================
|
||
动态更新的状态管理器,存储:
|
||
1. 宏观周期状态
|
||
2. 活跃叙事对象列表
|
||
3. 宏观因子影响矩阵快照
|
||
"""
|
||
|
||
def __init__(self, data_dir: str = "data"):
|
||
self.data_dir = Path(data_dir)
|
||
self.data_dir.mkdir(exist_ok=True)
|
||
|
||
self.world_book_path = self.data_dir / "world_book.json"
|
||
|
||
# 初始化数据结构
|
||
self.timestamp = datetime.now().isoformat()
|
||
self.macro_cycle = MacroCycle()
|
||
self.active_narratives: Dict[str, Narrative] = {}
|
||
self.static_matrix_snapshot: Dict[str, Dict[str, float]] = {}
|
||
|
||
# 新增: 宏观因子向量 (用于向量点积计算)
|
||
self.macro_factor_vector: Dict[str, float] = {}
|
||
|
||
# 尝试加载已有数据
|
||
self.load()
|
||
|
||
def load(self) -> bool:
|
||
"""从磁盘加载 World Book"""
|
||
if not self.world_book_path.exists():
|
||
print(f"[WorldBook] 未找到已有数据,初始化新的 World Book")
|
||
self._load_static_matrix()
|
||
return False
|
||
|
||
try:
|
||
with open(self.world_book_path, 'r', encoding='utf-8') as f:
|
||
data = json.load(f)
|
||
|
||
self.timestamp = data.get("timestamp", datetime.now().isoformat())
|
||
self.macro_cycle = MacroCycle.from_dict(data.get("macro_cycle", {}))
|
||
|
||
# 加载叙事对象
|
||
self.active_narratives = {}
|
||
for narrative_data in data.get("active_narratives", []):
|
||
narrative = Narrative.from_dict(narrative_data)
|
||
self.active_narratives[narrative.id] = narrative
|
||
|
||
# 加载矩阵快照
|
||
self.static_matrix_snapshot = data.get("static_matrix_snapshot", {})
|
||
|
||
# 加载宏观因子向量
|
||
self.macro_factor_vector = data.get("macro_factor_vector", {})
|
||
|
||
print(f"[WorldBook] 成功加载数据,包含 {len(self.active_narratives)} 个活跃叙事")
|
||
return True
|
||
|
||
except Exception as e:
|
||
print(f"[WorldBook] 加载数据失败: {e}")
|
||
self._load_static_matrix()
|
||
return False
|
||
|
||
def save(self) -> bool:
|
||
"""保存 World Book 到磁盘"""
|
||
try:
|
||
self.timestamp = datetime.now().isoformat()
|
||
|
||
data = {
|
||
"timestamp": self.timestamp,
|
||
"macro_cycle": self.macro_cycle.to_dict(),
|
||
"active_narratives": [
|
||
narrative.to_dict()
|
||
for narrative in self.active_narratives.values()
|
||
],
|
||
"static_matrix_snapshot": self.static_matrix_snapshot,
|
||
"macro_factor_vector": self.macro_factor_vector
|
||
}
|
||
|
||
with open(self.world_book_path, 'w', encoding='utf-8') as f:
|
||
json.dump(data, f, ensure_ascii=False, indent=2)
|
||
|
||
print(f"[WorldBook] 成功保存数据到 {self.world_book_path}")
|
||
return True
|
||
|
||
except Exception as e:
|
||
print(f"[WorldBook] 保存数据失败: {e}")
|
||
return False
|
||
|
||
def _load_static_matrix(self) -> None:
|
||
"""加载静态宏观矩阵"""
|
||
matrix_path = Path("core") / "macro_matrix.json"
|
||
if matrix_path.exists():
|
||
try:
|
||
with open(matrix_path, 'r', encoding='utf-8') as f:
|
||
self.static_matrix_snapshot = json.load(f)
|
||
print(f"[WorldBook] 加载宏观矩阵成功")
|
||
except Exception as e:
|
||
print(f"[WorldBook] 加载宏观矩阵失败: {e}")
|
||
self.static_matrix_snapshot = {}
|
||
|
||
def add_narrative(self, narrative: Narrative) -> None:
|
||
"""添加新叙事"""
|
||
self.active_narratives[narrative.id] = narrative
|
||
print(f"[WorldBook] 添加新叙事: {narrative.topic} (ID: {narrative.id})")
|
||
|
||
def update_narrative(self, narrative_id: str, **kwargs) -> bool:
|
||
"""更新叙事属性"""
|
||
if narrative_id not in self.active_narratives:
|
||
print(f"[WorldBook] 叙事不存在: {narrative_id}")
|
||
return False
|
||
|
||
narrative = self.active_narratives[narrative_id]
|
||
|
||
if 'new_score' in kwargs:
|
||
narrative.boost(kwargs['new_score'])
|
||
if 'stage' in kwargs:
|
||
narrative.update_stage(kwargs['stage'])
|
||
|
||
return True
|
||
|
||
def decay_all_narratives(self) -> None:
|
||
"""对所有叙事应用时间衰减"""
|
||
for narrative in self.active_narratives.values():
|
||
narrative.decay()
|
||
print(f"[WorldBook] 对 {len(self.active_narratives)} 个叙事应用了时间衰减")
|
||
|
||
def remove_weak_narratives(self, threshold: float = 10.0) -> List[str]:
|
||
"""移除权重过低的叙事"""
|
||
to_remove = [
|
||
nid for nid, narrative in self.active_narratives.items()
|
||
if narrative.current_weight < threshold
|
||
]
|
||
|
||
for nid in to_remove:
|
||
topic = self.active_narratives[nid].topic
|
||
del self.active_narratives[nid]
|
||
print(f"[WorldBook] 移除衰退叙事: {topic} (ID: {nid})")
|
||
|
||
return to_remove
|
||
|
||
def get_narrative_by_topic(self, topic: str) -> Optional[Narrative]:
|
||
"""根据主题获取叙事"""
|
||
for narrative in self.active_narratives.values():
|
||
if narrative.topic == topic:
|
||
return narrative
|
||
return None
|
||
|
||
def get_narratives_by_etf(self, etf_code: str) -> List[Narrative]:
|
||
"""获取与特定 ETF 相关的所有叙事"""
|
||
return [
|
||
narrative for narrative in self.active_narratives.values()
|
||
if etf_code in narrative.related_etfs
|
||
]
|
||
|
||
def update_macro_cycle(self, **kwargs) -> None:
|
||
"""更新宏观周期状态"""
|
||
if 'status' in kwargs:
|
||
self.macro_cycle.status = kwargs['status']
|
||
if 'liquidity' in kwargs:
|
||
self.macro_cycle.liquidity = kwargs['liquidity']
|
||
if 'policy_wind' in kwargs:
|
||
self.macro_cycle.policy_wind = kwargs['policy_wind']
|
||
|
||
print(f"[WorldBook] 宏观周期更新: {self.macro_cycle.to_dict()}")
|
||
|
||
def update_macro_factor_vector(self, factor_updates: Dict[str, float]) -> None:
|
||
"""
|
||
更新宏观因子向量
|
||
|
||
Args:
|
||
factor_updates: 宏观因子及其强度,例如:
|
||
{
|
||
"interest_rate_down": 1.0, # 强烈降息预期
|
||
"geopolitics_tension": 0.5, # 中等地缘紧张
|
||
"policy_digital_economy": 1.0 # 数字经济政策头条
|
||
}
|
||
"""
|
||
self.macro_factor_vector.update(factor_updates)
|
||
print(f"[WorldBook] 宏观因子向量更新: {self.macro_factor_vector}")
|
||
|
||
def get_macro_factor_value(self, factor_name: str) -> float:
|
||
"""获取特定宏观因子的当前值"""
|
||
return self.macro_factor_vector.get(factor_name, 0.0)
|
||
|
||
def clear_macro_factor_vector(self) -> None:
|
||
"""清空宏观因子向量(通常在每日开盘前重置)"""
|
||
self.macro_factor_vector = {}
|
||
print("[WorldBook] 宏观因子向量已清空")
|
||
|
||
def get_top_narratives(self, top_n: int = 5) -> List[Narrative]:
|
||
"""获取权重最高的 N 个叙事"""
|
||
sorted_narratives = sorted(
|
||
self.active_narratives.values(),
|
||
key=lambda n: n.current_weight,
|
||
reverse=True
|
||
)
|
||
return sorted_narratives[:top_n]
|
||
|
||
def export_snapshot(self) -> Dict[str, Any]:
|
||
"""导出完整快照(供其他 Agent 读取)"""
|
||
return {
|
||
"timestamp": self.timestamp,
|
||
"macro_cycle": self.macro_cycle.to_dict(),
|
||
"macro_factor_vector": self.macro_factor_vector,
|
||
"active_narratives": [n.to_dict() for n in self.active_narratives.values()],
|
||
"static_matrix_snapshot": self.static_matrix_snapshot,
|
||
"summary": {
|
||
"total_narratives": len(self.active_narratives),
|
||
"top_3_topics": [n.topic for n in self.get_top_narratives(3)]
|
||
}
|
||
}
|
||
|
||
def __repr__(self) -> str:
|
||
return (
|
||
f"<WorldBook: {len(self.active_narratives)} narratives, "
|
||
f"macro={self.macro_cycle.status}, "
|
||
f"updated={self.timestamp}>"
|
||
)
|
||
|
||
|
||
# ==================== 工具函数 ====================
|
||
|
||
def create_narrative_id(topic: str, date: Optional[str] = None) -> str:
    """Build a narrative id of the form ``narrative_<topic-slug>_<date>``.

    Args:
        topic: Topic name; separator characters in it are replaced by "_".
        date: Date component of the id. Defaults to the current local
            year and month formatted as ``YYYYMM``.

    Returns:
        The generated id string.
    """
    if date is None:
        date = datetime.now().strftime("%Y%m")

    # Separators mapped to "_": space, ASCII comma, fullwidth comma (U+FF0C)
    # and ideographic comma (U+3001). Escapes are used so the non-ASCII
    # characters survive editors that normalize punctuation.
    separators = " ,\uff0c\u3001"
    topic_slug = topic.translate(str.maketrans(separators, "_" * len(separators)))
    return f"narrative_{topic_slug}_{date}"
|
||
|
||
|
||
def _run_self_test() -> None:
    """Smoke-test the module: build a World Book, save it, and print its state.

    Note that this works on the real ``data`` directory and therefore
    writes ``data/world_book.json``.
    """
    banner = "=" * 50
    print(banner)
    print("World Book 核心模块测试")
    print(banner)

    # Create the World Book instance.
    wb = WorldBook(data_dir="data")

    # Demo narratives: (topic, related ETFs, lifecycle stage, base score, decay factor).
    demo_specs = [
        ("低空经济", ["512980", "159969"], "fermentation", 85.0, 0.95),
        ("AI算力", ["515980", "159813"], "realization", 92.0, 0.93),
    ]
    demo_narratives = [
        Narrative(
            id=create_narrative_id(topic),
            topic=topic,
            related_etfs=etfs,
            lifecycle_stage=stage,
            base_score=score,
            decay_factor=factor,
        )
        for topic, etfs, stage, score, factor in demo_specs
    ]
    for demo in demo_narratives:
        wb.add_narrative(demo)

    # Update the macro cycle.
    wb.update_macro_cycle(status="upward", liquidity="loose", policy_wind="stimulus")

    # Persist to disk.
    wb.save()

    # Export and show a snapshot.
    snapshot = wb.export_snapshot()
    print("\n当前状态快照:")
    print(json.dumps(snapshot, ensure_ascii=False, indent=2))

    # Exercise the decay step.
    print("\n应用时间衰减...")
    wb.decay_all_narratives()

    # Show the highest-weighted narratives.
    print("\nTop 2 叙事:")
    for entry in wb.get_top_narratives(2):
        print(f" - {entry.topic}: {entry.current_weight:.2f}")

    print("\n✅ World Book 核心模块测试完成")


if __name__ == "__main__":
    _run_self_test()
|