""" MND-IA Main Orchestrator ========================= 主调度程序 - 协调所有 Agent Skills 完成每日投资决策流程 执行流程: 1. Ingest (数据摄取): Analyst 读取新闻, Quant 读取行情 2. Cognition (认知更新): Strategist 更新 WorldBook 3. Decision (决策生成): PM 计算 Trust Index 并生成订单 4. Execution (执行输出): 输出交易指令 5. Audit (复盘审计): Auditor 评估昨日预测准确性 """ import sys from pathlib import Path from datetime import datetime import json import argparse # 导入核心模块 from core.world_book import WorldBook # 导入 Agent Skills from skills.analyst import NewsAnalyzer from skills.quant import QuantAnalyzer, get_all_etf_codes_from_asset_map from skills.strategist import MacroStrategist from skills.pm import PortfolioManager, save_orders_to_file from skills.auditor import PerformanceAuditor from skills.crawler import XinwenLianboCrawler class MNDAgent: """MND-IA 主智能体""" def __init__( self, data_dir: str = "data", total_capital: float = 1000000.0, llm_client=None, use_llm: bool = True ): """ Args: data_dir: 数据目录 total_capital: 总资金(元) llm_client: LLM 客户端(可选,用于 Analyst 增强) use_llm: 是否使用 LLM 进行智能分析(默认 True) """ self.data_dir = Path(data_dir) self.data_dir.mkdir(exist_ok=True) self.use_llm = use_llm # 初始化核心 print("=" * 60) print("🚀 MND-IA 系统启动") print(f" 模式: {'LLM 智能分析' if use_llm else '规则引擎(降级模式)'}") print("=" * 60) self.world_book = WorldBook(data_dir=data_dir) # 初始化 Agent Skills(传递 use_llm 参数) self.analyst = NewsAnalyzer(llm_client=llm_client, use_llm=use_llm) self.quant = QuantAnalyzer() self.strategist = MacroStrategist(self.world_book, use_llm=use_llm) self.pm = PortfolioManager(self.world_book, total_capital=total_capital, use_llm=use_llm) self.auditor = PerformanceAuditor(data_dir=data_dir) self.crawler = XinwenLianboCrawler() print("✅ 所有 Agent Skills 初始化完成\n") def daily_batch_workflow( self, news_list: list = None, etf_codes: list = None, use_crawler: bool = True ) -> dict: """ 每日批处理流程(08:30 AM 执行) Args: news_list: 新闻列表(如为 None 则使用爬虫或测试数据) etf_codes: ETF 代码列表(如为 None 则使用全部) use_crawler: 是否使用爬虫抓取新闻联播(默认 True) Returns: 执行结果汇总 """ print(f"📅 开始执行每日批处理流程") print(f"⏰ 执行时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") print("=" * 60) # ========== Phase 1: 数据摄取 (Ingest) ========== print("\n【Phase 1: 数据摄取】") print("-" * 60) # 1.1 获取新闻数据 if news_list is None: if use_crawler: print("🕷️ 使用爬虫抓取新闻联播...") news_list = self.crawler.crawl_sync() if not news_list: print("⚠️ 爬虫未获取到数据,使用测试数据") news_list = self._get_test_news() else: print("⚠️ 未提供新闻数据,使用测试数据") news_list = self._get_test_news() print(f"📰 新闻分析: {len(news_list)} 条") news_analysis = self.analyst.batch_analyze(news_list) narrative_json = self.analyst.generate_narrative_json(news_analysis) # 1.2 获取行情数据 if etf_codes is None: etf_codes = get_all_etf_codes_from_asset_map() print(f"📊 行情获取: {len(etf_codes)} 个 ETF") market_data = self.quant.batch_analyze(etf_codes) market_report = self.quant.generate_market_report(market_data) # ========== Phase 2: 认知更新 (Cognition) ========== print("\n【Phase 2: 认知更新】") print("-" * 60) # 2.1 处理叙事 JSON self.strategist.process_narrative_json(narrative_json) # 2.2 检测宏观周期变化 cycle_change = self.strategist.detect_macro_cycle_change( narrative_json, market_report ) if cycle_change: self.strategist.apply_macro_cycle_change(cycle_change) # 2.3 每日维护 maintenance_report = self.strategist.daily_maintenance() # 2.4 生成 WorldBook 快照 wb_snapshot = self.strategist.generate_world_book_snapshot() # ========== Phase 3: 决策生成 (Decision) ========== print("\n【Phase 3: 决策生成】") print("-" * 60) # 3.1 计算 Trust Index trust_results = self.pm.batch_calculate_trust_index(market_data) # 3.2 生成交易订单 raw_orders = self.pm.generate_trade_orders(trust_results, market_data) # 3.3 风控检查 final_orders = self.pm.apply_risk_control(raw_orders) # 3.4 生成投资组合报告 portfolio_report = self.pm.generate_portfolio_report( trust_results, final_orders ) # ========== Phase 4: 执行输出 (Execution) ========== print("\n【Phase 4: 执行输出】") print("-" * 60) if final_orders: # 保存订单到文件 timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") save_orders_to_file(final_orders, f"orders_{timestamp}.json") print(f"📋 生成 {len(final_orders)} 条交易指令:") for order in final_orders[:5]: # 显示前5条 print(f" {order['action'].upper()} {order['code']}: " f"{order['shares']}股 @ ¥{order['price']}") else: print("⚠️ 无符合条件的交易机会") # 保存 WorldBook self.world_book.save() # ========== Phase 5: 结果汇总 ========== execution_result = { "timestamp": datetime.now().isoformat(), "workflow": "daily_batch", "phases": { "ingest": { "news_count": len(news_list), "etf_count": len(etf_codes), "narratives_identified": len(narrative_json['narratives']) }, "cognition": { "active_narratives": len(self.world_book.active_narratives), "macro_cycle": self.world_book.macro_cycle.to_dict(), "cycle_changed": cycle_change is not None }, "decision": { "trust_results": len(trust_results), "orders_generated": len(final_orders), "capital_utilization": portfolio_report['risk_control']['capital_utilization'] } }, "top_opportunities": portfolio_report['top_opportunities'][:3], "orders": final_orders } # 保存执行记录 self._save_execution_log(execution_result) print("\n" + "=" * 60) print("✅ 每日批处理流程完成") print("=" * 60) return execution_result def post_market_audit( self, yesterday_predictions: list = None ) -> dict: """ 盘后复盘流程(16:00 PM 执行) Args: yesterday_predictions: 昨日的预测记录 Returns: 审计报告 """ print("\n📊 开始盘后复盘审计") print("=" * 60) if yesterday_predictions is None: print("⚠️ 无昨日预测数据,跳过审计") return {"status": "skipped", "reason": "no_predictions"} # 获取实际表现数据(这里需要从行情数据计算) actual_performances = self._fetch_actual_performances(yesterday_predictions) # 执行审计 audit_summary = self.auditor.batch_audit( yesterday_predictions, actual_performances ) # 生成报告 report = self.auditor.generate_correction_report(audit_summary) # 保存报告 report_path = self.data_dir / f"audit_report_{datetime.now().strftime('%Y%m%d')}.md" with open(report_path, 'w', encoding='utf-8') as f: f.write(report) print(f"\n📄 审计报告已保存: {report_path}") print(f"准确率: {audit_summary['performance']['accuracy']:.1f}%") return audit_summary def intraday_sentinel( self, breaking_news: dict ) -> dict: """ 盘中哨兵流程(09:30 - 15:00) 当检测到 Level A 重大新闻时触发 Args: breaking_news: 快讯内容 Returns: 应急响应结果 """ print("\n🚨 盘中哨兵触发") print("=" * 60) # 分析快讯 analysis = self.analyst.analyze_news( breaking_news.get('text', ''), breaking_news.get('title', '') ) # 如果不是 Level A,忽略 if analysis['level'] != 'A': print("⚠️ 非 A 级新闻,不触发应急响应") return {"triggered": False, "reason": "not_level_a"} print(f"⚠️ 检测到 Level A 重大新闻: {analysis['title']}") print(f" 涉及板块: {', '.join(analysis['sectors'])}") # 立即更新 WorldBook narrative_json = self.analyst.generate_narrative_json([analysis]) self.strategist.process_narrative_json(narrative_json) # 检查宏观周期变化 cycle_change = self.strategist.detect_macro_cycle_change(narrative_json) if cycle_change: self.strategist.apply_macro_cycle_change(cycle_change) # 重新评估持仓(如果有) # 这里简化处理,实际应该重新计算 Trust Index self.world_book.save() return { "triggered": True, "news_level": "A", "sectors_affected": analysis['sectors'], "macro_cycle_changed": cycle_change is not None, "timestamp": datetime.now().isoformat() } def _get_test_news(self) -> list: """获取测试新闻数据""" return [ { "title": "工信部发布低空经济指导意见 2027年规模达1万亿", "text": "工业和信息化部发布《低空经济高质量发展指导意见》,推动 eVTOL 产业化。" }, { "title": "AI算力需求持续旺盛 多家公司业绩预增50%", "text": "智算中心建设加速,算力租赁业务快速增长。" } ] def _fetch_actual_performances( self, predictions: list ) -> dict: """获取实际表现数据(简化版)""" # 实际应该从 Quant 模块获取真实数据 # 这里返回模拟数据 performances = {} for pred in predictions: code = pred['code'] performances[code] = { "code": code, "price_change": 0, # 需要实际计算 "volume_change": 1.0, "date": datetime.now().strftime("%Y-%m-%d") } return performances def _save_execution_log(self, result: dict) -> None: """保存执行日志""" log_path = self.data_dir / "execution_logs.json" logs = [] if log_path.exists(): with open(log_path, 'r', encoding='utf-8') as f: logs = json.load(f) logs.append(result) # 只保留最近30天 logs = logs[-30:] with open(log_path, 'w', encoding='utf-8') as f: json.dump(logs, f, ensure_ascii=False, indent=2) def main(): """命令行入口""" parser = argparse.ArgumentParser(description="MND-IA 投资智能体") parser.add_argument( '--mode', choices=['daily', 'audit', 'test'], default='test', help='运行模式: daily(每日批处理), audit(盘后审计), test(测试)' ) parser.add_argument( '--capital', type=float, default=1000000.0, help='总资金(元)' ) parser.add_argument( '--no-crawler', action='store_true', help='不使用爬虫,使用测试数据' ) parser.add_argument( '--no-llm', action='store_true', help='不使用 LLM,使用规则引擎(降级模式)' ) args = parser.parse_args() # 创建 Agent 实例 agent = MNDAgent( total_capital=args.capital, use_llm=not args.no_llm ) if args.mode == 'daily': # 每日批处理 result = agent.daily_batch_workflow(use_crawler=not args.no_crawler) print(f"\n执行结果:") print(json.dumps(result, ensure_ascii=False, indent=2)) elif args.mode == 'audit': # 盘后审计(需要提供昨日数据) print("盘后审计模式需要提供昨日预测数据") else: # 测试模式 print("\n🧪 测试模式:执行完整流程演示\n") result = agent.daily_batch_workflow(use_crawler=not args.no_crawler) print("\n" + "=" * 60) print("📈 Top 投资机会:") print("=" * 60) for opp in result['top_opportunities']: print(f" {opp['code']}: Trust Index {opp['trust_index']:.1f} - {opp['verdict']}") if __name__ == "__main__": main()