418 lines
14 KiB
Python
418 lines
14 KiB
Python
"""
|
||
MND-IA Main Orchestrator
|
||
=========================
|
||
主调度程序 - 协调所有 Agent Skills 完成每日投资决策流程
|
||
|
||
执行流程:
|
||
1. Ingest (数据摄取): Analyst 读取新闻, Quant 读取行情
|
||
2. Cognition (认知更新): Strategist 更新 WorldBook
|
||
3. Decision (决策生成): PM 计算 Trust Index 并生成订单
|
||
4. Execution (执行输出): 输出交易指令
|
||
5. Audit (复盘审计): Auditor 评估昨日预测准确性
|
||
"""
|
||
|
||
import sys
|
||
from pathlib import Path
|
||
from datetime import datetime
|
||
import json
|
||
import argparse
|
||
|
||
# 导入核心模块
|
||
from core.world_book import WorldBook
|
||
|
||
# 导入 Agent Skills
|
||
from skills.analyst import NewsAnalyzer
|
||
from skills.quant import QuantAnalyzer, get_all_etf_codes_from_asset_map
|
||
from skills.strategist import MacroStrategist
|
||
from skills.pm import PortfolioManager, save_orders_to_file
|
||
from skills.auditor import PerformanceAuditor
|
||
from skills.crawler import XinwenLianboCrawler
|
||
|
||
|
||
class MNDAgent:
|
||
"""MND-IA 主智能体"""
|
||
|
||
def __init__(
|
||
self,
|
||
data_dir: str = "data",
|
||
total_capital: float = 1000000.0,
|
||
llm_client=None,
|
||
use_llm: bool = True
|
||
):
|
||
"""
|
||
Args:
|
||
data_dir: 数据目录
|
||
total_capital: 总资金(元)
|
||
llm_client: LLM 客户端(可选,用于 Analyst 增强)
|
||
use_llm: 是否使用 LLM 进行智能分析(默认 True)
|
||
"""
|
||
self.data_dir = Path(data_dir)
|
||
self.data_dir.mkdir(exist_ok=True)
|
||
self.use_llm = use_llm
|
||
|
||
# 初始化核心
|
||
print("=" * 60)
|
||
print("🚀 MND-IA 系统启动")
|
||
print(f" 模式: {'LLM 智能分析' if use_llm else '规则引擎(降级模式)'}")
|
||
print("=" * 60)
|
||
|
||
self.world_book = WorldBook(data_dir=data_dir)
|
||
|
||
# 初始化 Agent Skills(传递 use_llm 参数)
|
||
self.analyst = NewsAnalyzer(llm_client=llm_client, use_llm=use_llm)
|
||
self.quant = QuantAnalyzer()
|
||
self.strategist = MacroStrategist(self.world_book, use_llm=use_llm)
|
||
self.pm = PortfolioManager(self.world_book, total_capital=total_capital, use_llm=use_llm)
|
||
self.auditor = PerformanceAuditor(data_dir=data_dir)
|
||
self.crawler = XinwenLianboCrawler()
|
||
|
||
print("✅ 所有 Agent Skills 初始化完成\n")
|
||
|
||
def daily_batch_workflow(
|
||
self,
|
||
news_list: list = None,
|
||
etf_codes: list = None,
|
||
use_crawler: bool = True
|
||
) -> dict:
|
||
"""
|
||
每日批处理流程(08:30 AM 执行)
|
||
|
||
Args:
|
||
news_list: 新闻列表(如为 None 则使用爬虫或测试数据)
|
||
etf_codes: ETF 代码列表(如为 None 则使用全部)
|
||
use_crawler: 是否使用爬虫抓取新闻联播(默认 True)
|
||
|
||
Returns:
|
||
执行结果汇总
|
||
"""
|
||
print(f"📅 开始执行每日批处理流程")
|
||
print(f"⏰ 执行时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
|
||
print("=" * 60)
|
||
|
||
# ========== Phase 1: 数据摄取 (Ingest) ==========
|
||
print("\n【Phase 1: 数据摄取】")
|
||
print("-" * 60)
|
||
|
||
# 1.1 获取新闻数据
|
||
if news_list is None:
|
||
if use_crawler:
|
||
print("🕷️ 使用爬虫抓取新闻联播...")
|
||
news_list = self.crawler.crawl_sync()
|
||
|
||
if not news_list:
|
||
print("⚠️ 爬虫未获取到数据,使用测试数据")
|
||
news_list = self._get_test_news()
|
||
else:
|
||
print("⚠️ 未提供新闻数据,使用测试数据")
|
||
news_list = self._get_test_news()
|
||
|
||
print(f"📰 新闻分析: {len(news_list)} 条")
|
||
news_analysis = self.analyst.batch_analyze(news_list)
|
||
narrative_json = self.analyst.generate_narrative_json(news_analysis)
|
||
|
||
# 1.2 获取行情数据
|
||
if etf_codes is None:
|
||
etf_codes = get_all_etf_codes_from_asset_map()
|
||
print(f"📊 行情获取: {len(etf_codes)} 个 ETF")
|
||
|
||
market_data = self.quant.batch_analyze(etf_codes)
|
||
market_report = self.quant.generate_market_report(market_data)
|
||
|
||
# ========== Phase 2: 认知更新 (Cognition) ==========
|
||
print("\n【Phase 2: 认知更新】")
|
||
print("-" * 60)
|
||
|
||
# 2.1 处理叙事 JSON
|
||
self.strategist.process_narrative_json(narrative_json)
|
||
|
||
# 2.2 检测宏观周期变化
|
||
cycle_change = self.strategist.detect_macro_cycle_change(
|
||
narrative_json,
|
||
market_report
|
||
)
|
||
|
||
if cycle_change:
|
||
self.strategist.apply_macro_cycle_change(cycle_change)
|
||
|
||
# 2.3 每日维护
|
||
maintenance_report = self.strategist.daily_maintenance()
|
||
|
||
# 2.4 生成 WorldBook 快照
|
||
wb_snapshot = self.strategist.generate_world_book_snapshot()
|
||
|
||
# ========== Phase 3: 决策生成 (Decision) ==========
|
||
print("\n【Phase 3: 决策生成】")
|
||
print("-" * 60)
|
||
|
||
# 3.1 计算 Trust Index
|
||
trust_results = self.pm.batch_calculate_trust_index(market_data)
|
||
|
||
# 3.2 生成交易订单
|
||
raw_orders = self.pm.generate_trade_orders(trust_results, market_data)
|
||
|
||
# 3.3 风控检查
|
||
final_orders = self.pm.apply_risk_control(raw_orders)
|
||
|
||
# 3.4 生成投资组合报告
|
||
portfolio_report = self.pm.generate_portfolio_report(
|
||
trust_results,
|
||
final_orders
|
||
)
|
||
|
||
# ========== Phase 4: 执行输出 (Execution) ==========
|
||
print("\n【Phase 4: 执行输出】")
|
||
print("-" * 60)
|
||
|
||
if final_orders:
|
||
# 保存订单到文件
|
||
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
|
||
save_orders_to_file(final_orders, f"orders_{timestamp}.json")
|
||
|
||
print(f"📋 生成 {len(final_orders)} 条交易指令:")
|
||
for order in final_orders[:5]: # 显示前5条
|
||
print(f" {order['action'].upper()} {order['code']}: "
|
||
f"{order['shares']}股 @ ¥{order['price']}")
|
||
else:
|
||
print("⚠️ 无符合条件的交易机会")
|
||
|
||
# 保存 WorldBook
|
||
self.world_book.save()
|
||
|
||
# ========== Phase 5: 结果汇总 ==========
|
||
execution_result = {
|
||
"timestamp": datetime.now().isoformat(),
|
||
"workflow": "daily_batch",
|
||
"phases": {
|
||
"ingest": {
|
||
"news_count": len(news_list),
|
||
"etf_count": len(etf_codes),
|
||
"narratives_identified": len(narrative_json['narratives'])
|
||
},
|
||
"cognition": {
|
||
"active_narratives": len(self.world_book.active_narratives),
|
||
"macro_cycle": self.world_book.macro_cycle.to_dict(),
|
||
"cycle_changed": cycle_change is not None
|
||
},
|
||
"decision": {
|
||
"trust_results": len(trust_results),
|
||
"orders_generated": len(final_orders),
|
||
"capital_utilization": portfolio_report['risk_control']['capital_utilization']
|
||
}
|
||
},
|
||
"top_opportunities": portfolio_report['top_opportunities'][:3],
|
||
"orders": final_orders
|
||
}
|
||
|
||
# 保存执行记录
|
||
self._save_execution_log(execution_result)
|
||
|
||
print("\n" + "=" * 60)
|
||
print("✅ 每日批处理流程完成")
|
||
print("=" * 60)
|
||
|
||
return execution_result
|
||
|
||
def post_market_audit(
|
||
self,
|
||
yesterday_predictions: list = None
|
||
) -> dict:
|
||
"""
|
||
盘后复盘流程(16:00 PM 执行)
|
||
|
||
Args:
|
||
yesterday_predictions: 昨日的预测记录
|
||
|
||
Returns:
|
||
审计报告
|
||
"""
|
||
print("\n📊 开始盘后复盘审计")
|
||
print("=" * 60)
|
||
|
||
if yesterday_predictions is None:
|
||
print("⚠️ 无昨日预测数据,跳过审计")
|
||
return {"status": "skipped", "reason": "no_predictions"}
|
||
|
||
# 获取实际表现数据(这里需要从行情数据计算)
|
||
actual_performances = self._fetch_actual_performances(yesterday_predictions)
|
||
|
||
# 执行审计
|
||
audit_summary = self.auditor.batch_audit(
|
||
yesterday_predictions,
|
||
actual_performances
|
||
)
|
||
|
||
# 生成报告
|
||
report = self.auditor.generate_correction_report(audit_summary)
|
||
|
||
# 保存报告
|
||
report_path = self.data_dir / f"audit_report_{datetime.now().strftime('%Y%m%d')}.md"
|
||
with open(report_path, 'w', encoding='utf-8') as f:
|
||
f.write(report)
|
||
|
||
print(f"\n📄 审计报告已保存: {report_path}")
|
||
print(f"准确率: {audit_summary['performance']['accuracy']:.1f}%")
|
||
|
||
return audit_summary
|
||
|
||
def intraday_sentinel(
|
||
self,
|
||
breaking_news: dict
|
||
) -> dict:
|
||
"""
|
||
盘中哨兵流程(09:30 - 15:00)
|
||
|
||
当检测到 Level A 重大新闻时触发
|
||
|
||
Args:
|
||
breaking_news: 快讯内容
|
||
|
||
Returns:
|
||
应急响应结果
|
||
"""
|
||
print("\n🚨 盘中哨兵触发")
|
||
print("=" * 60)
|
||
|
||
# 分析快讯
|
||
analysis = self.analyst.analyze_news(
|
||
breaking_news.get('text', ''),
|
||
breaking_news.get('title', '')
|
||
)
|
||
|
||
# 如果不是 Level A,忽略
|
||
if analysis['level'] != 'A':
|
||
print("⚠️ 非 A 级新闻,不触发应急响应")
|
||
return {"triggered": False, "reason": "not_level_a"}
|
||
|
||
print(f"⚠️ 检测到 Level A 重大新闻: {analysis['title']}")
|
||
print(f" 涉及板块: {', '.join(analysis['sectors'])}")
|
||
|
||
# 立即更新 WorldBook
|
||
narrative_json = self.analyst.generate_narrative_json([analysis])
|
||
self.strategist.process_narrative_json(narrative_json)
|
||
|
||
# 检查宏观周期变化
|
||
cycle_change = self.strategist.detect_macro_cycle_change(narrative_json)
|
||
if cycle_change:
|
||
self.strategist.apply_macro_cycle_change(cycle_change)
|
||
|
||
# 重新评估持仓(如果有)
|
||
# 这里简化处理,实际应该重新计算 Trust Index
|
||
|
||
self.world_book.save()
|
||
|
||
return {
|
||
"triggered": True,
|
||
"news_level": "A",
|
||
"sectors_affected": analysis['sectors'],
|
||
"macro_cycle_changed": cycle_change is not None,
|
||
"timestamp": datetime.now().isoformat()
|
||
}
|
||
|
||
def _get_test_news(self) -> list:
|
||
"""获取测试新闻数据"""
|
||
return [
|
||
{
|
||
"title": "工信部发布低空经济指导意见 2027年规模达1万亿",
|
||
"text": "工业和信息化部发布《低空经济高质量发展指导意见》,推动 eVTOL 产业化。"
|
||
},
|
||
{
|
||
"title": "AI算力需求持续旺盛 多家公司业绩预增50%",
|
||
"text": "智算中心建设加速,算力租赁业务快速增长。"
|
||
}
|
||
]
|
||
|
||
def _fetch_actual_performances(
|
||
self,
|
||
predictions: list
|
||
) -> dict:
|
||
"""获取实际表现数据(简化版)"""
|
||
# 实际应该从 Quant 模块获取真实数据
|
||
# 这里返回模拟数据
|
||
performances = {}
|
||
for pred in predictions:
|
||
code = pred['code']
|
||
performances[code] = {
|
||
"code": code,
|
||
"price_change": 0, # 需要实际计算
|
||
"volume_change": 1.0,
|
||
"date": datetime.now().strftime("%Y-%m-%d")
|
||
}
|
||
return performances
|
||
|
||
def _save_execution_log(self, result: dict) -> None:
|
||
"""保存执行日志"""
|
||
log_path = self.data_dir / "execution_logs.json"
|
||
|
||
logs = []
|
||
if log_path.exists():
|
||
with open(log_path, 'r', encoding='utf-8') as f:
|
||
logs = json.load(f)
|
||
|
||
logs.append(result)
|
||
|
||
# 只保留最近30天
|
||
logs = logs[-30:]
|
||
|
||
with open(log_path, 'w', encoding='utf-8') as f:
|
||
json.dump(logs, f, ensure_ascii=False, indent=2)
|
||
|
||
|
||
def main():
|
||
"""命令行入口"""
|
||
parser = argparse.ArgumentParser(description="MND-IA 投资智能体")
|
||
parser.add_argument(
|
||
'--mode',
|
||
choices=['daily', 'audit', 'test'],
|
||
default='test',
|
||
help='运行模式: daily(每日批处理), audit(盘后审计), test(测试)'
|
||
)
|
||
parser.add_argument(
|
||
'--capital',
|
||
type=float,
|
||
default=1000000.0,
|
||
help='总资金(元)'
|
||
)
|
||
parser.add_argument(
|
||
'--no-crawler',
|
||
action='store_true',
|
||
help='不使用爬虫,使用测试数据'
|
||
)
|
||
parser.add_argument(
|
||
'--no-llm',
|
||
action='store_true',
|
||
help='不使用 LLM,使用规则引擎(降级模式)'
|
||
)
|
||
|
||
args = parser.parse_args()
|
||
|
||
# 创建 Agent 实例
|
||
agent = MNDAgent(
|
||
total_capital=args.capital,
|
||
use_llm=not args.no_llm
|
||
)
|
||
|
||
if args.mode == 'daily':
|
||
# 每日批处理
|
||
result = agent.daily_batch_workflow(use_crawler=not args.no_crawler)
|
||
print(f"\n执行结果:")
|
||
print(json.dumps(result, ensure_ascii=False, indent=2))
|
||
|
||
elif args.mode == 'audit':
|
||
# 盘后审计(需要提供昨日数据)
|
||
print("盘后审计模式需要提供昨日预测数据")
|
||
|
||
else:
|
||
# 测试模式
|
||
print("\n🧪 测试模式:执行完整流程演示\n")
|
||
result = agent.daily_batch_workflow(use_crawler=not args.no_crawler)
|
||
|
||
print("\n" + "=" * 60)
|
||
print("📈 Top 投资机会:")
|
||
print("=" * 60)
|
||
for opp in result['top_opportunities']:
|
||
print(f" {opp['code']}: Trust Index {opp['trust_index']:.1f} - {opp['verdict']}")
|
||
|
||
|
||
if __name__ == "__main__":
|
||
main()
|