MND-IA/skills/auditor.py
2025-12-31 19:58:09 +08:00

476 lines
16 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Skill E: Auditor (复盘审计员)
==============================
职能:绩效归因、偏差修正
输入:历史预测 vs 实际走势
输出Correction_Report修正建议
"""
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent.parent))
from core.config import config, llm_call
from typing import Dict, List, Optional
from datetime import datetime, timedelta
import json
import pandas as pd
class PerformanceAuditor:
"""
复盘审计员 - 负责绩效归因和策略修正
核心职责:
1. 对比预测与实际表现
2. 分析偏差原因
3. 提出修正建议
4. 记录经验教训
"""
def __init__(self, data_dir: str = "data"):
self.data_dir = Path(data_dir)
self.audit_log_path = self.data_dir / "audit_logs.json"
self.correction_history = self._load_correction_history()
def _load_correction_history(self) -> List[Dict]:
"""加载历史修正记录"""
if not self.audit_log_path.exists():
return []
try:
with open(self.audit_log_path, 'r', encoding='utf-8') as f:
return json.load(f)
except Exception as e:
print(f"[Auditor] 加载修正历史失败: {e}")
return []
def _save_correction_history(self) -> None:
"""保存修正记录"""
try:
with open(self.audit_log_path, 'w', encoding='utf-8') as f:
json.dump(self.correction_history, f, ensure_ascii=False, indent=2)
print(f"[Auditor] 修正记录已保存")
except Exception as e:
print(f"[Auditor] 保存失败: {e}")
def audit_prediction(
self,
prediction: Dict,
actual_performance: Dict
) -> Dict:
"""
审计单个预测
Args:
prediction: 预测记录
{
"code": "515980",
"trust_index": 75.0,
"narrative_score": 85.0,
"flow_score": 60.0,
"predicted_action": "buy",
"timestamp": "2025-12-29T08:30:00"
}
actual_performance: 实际表现
{
"code": "515980",
"price_change": -2.5, # 实际涨跌幅 %
"volume_change": 1.3, # 成交量变化倍数
"date": "2025-12-29"
}
Returns:
审计结果
"""
code = prediction['code']
predicted_action = prediction['predicted_action']
trust_index = prediction['trust_index']
price_change = actual_performance['price_change']
# 判断预测是否正确
if predicted_action == "buy":
is_correct = price_change > 0
expected_direction = "上涨"
elif predicted_action == "sell":
is_correct = price_change < 0
expected_direction = "下跌"
else:
is_correct = abs(price_change) < 2
expected_direction = "震荡"
# 计算偏差程度
deviation = abs(price_change) if not is_correct else 0
# 分析原因
reason = self._analyze_deviation_reason(
prediction,
actual_performance,
is_correct
)
# 生成修正建议
correction = self._generate_correction(
prediction,
actual_performance,
is_correct,
reason
)
audit_result = {
"code": code,
"prediction": {
"action": predicted_action,
"trust_index": trust_index,
"expected": expected_direction
},
"actual": {
"price_change": price_change,
"volume_change": actual_performance.get('volume_change', 1.0)
},
"result": {
"is_correct": is_correct,
"deviation": round(deviation, 2),
"reason": reason
},
"correction": correction,
"timestamp": datetime.now().isoformat()
}
return audit_result
def _analyze_deviation_reason(
self,
prediction: Dict,
actual: Dict,
is_correct: bool
) -> str:
"""分析偏差原因"""
if is_correct:
return "预测准确"
narrative_score = prediction.get('narrative_score', 0)
flow_score = prediction.get('flow_score', 0)
price_change = actual['price_change']
volume_change = actual.get('volume_change', 1.0)
# 情况1叙事高但资金未跟进
if narrative_score > 70 and flow_score < 40:
return "叙事未兑现:新闻热度高但资金流入不足,市场观望情绪浓厚"
# 情况2资金放量但方向相反
if volume_change > 1.5 and price_change < -2:
return "放量下跌:资金大量流出,可能遭遇利空或止盈盘涌出"
# 情况3黑天鹅事件
if abs(price_change) > 5:
return "极端波动:可能受突发事件或外部市场剧烈波动影响"
# 情况4叙事过度衰减
if narrative_score < 50 and prediction['predicted_action'] == "buy":
return "叙事衰减过快:市场关注度下降速度超预期"
return "其他因素:需进一步分析宏观环境或板块轮动"
def _generate_correction(
self,
prediction: Dict,
actual: Dict,
is_correct: bool,
reason: str
) -> Dict:
"""生成修正建议"""
if is_correct:
return {
"action": "none",
"message": "预测准确,保持当前策略"
}
narrative_score = prediction.get('narrative_score', 0)
flow_score = prediction.get('flow_score', 0)
corrections = []
# 修正1提高资金流权重
if "资金流入不足" in reason:
corrections.append({
"target": "trust_index_formula",
"suggestion": "提高资金流权重至 0.5,降低叙事权重至 0.5",
"reason": "叙事未能转化为实际资金流动"
})
# 修正2加强止损
if "放量下跌" in reason:
corrections.append({
"target": "risk_control",
"suggestion": "设置 -3% 硬止损线,出现放量下跌立即清仓",
"reason": "未能及时识别资金流出信号"
})
# 修正3降低 Trust Index 阈值
if narrative_score > 60 and flow_score > 50 and not is_correct:
corrections.append({
"target": "min_trust_score",
"suggestion": "提高最低信任指数阈值至 65",
"reason": "当前阈值可能过于宽松,需要更高确定性"
})
# 修正4叙事衰减系数调整
if "衰减过快" in reason:
corrections.append({
"target": "narrative_decay",
"suggestion": "对该板块叙事的衰减系数从 0.95 调整为 0.97",
"reason": "市场对该主题的持续关注度被低估"
})
return {
"action": "adjust" if corrections else "monitor",
"corrections": corrections,
"priority": "high" if len(corrections) >= 2 else "medium"
}
def batch_audit(
self,
predictions: List[Dict],
actual_performances: Dict[str, Dict]
) -> Dict:
"""
批量审计
Args:
predictions: 预测列表
actual_performances: 实际表现字典 {code: performance}
Returns:
汇总审计报告
"""
print(f"[Auditor] 开始审计 {len(predictions)} 条预测...")
audit_results = []
correct_count = 0
for prediction in predictions:
code = prediction['code']
if code not in actual_performances:
print(f"[Auditor] 警告: 缺少 {code} 的实际数据")
continue
result = self.audit_prediction(
prediction,
actual_performances[code]
)
audit_results.append(result)
if result['result']['is_correct']:
correct_count += 1
# 计算准确率
accuracy = correct_count / len(audit_results) * 100 if audit_results else 0
# 统计偏差原因
reason_counts = {}
for result in audit_results:
reason = result['result']['reason']
reason_counts[reason] = reason_counts.get(reason, 0) + 1
# 收集修正建议
all_corrections = []
for result in audit_results:
if result['correction']['action'] != "none":
all_corrections.extend(result['correction'].get('corrections', []))
# 生成汇总报告
summary_report = {
"timestamp": datetime.now().isoformat(),
"audit_period": {
"start": min(r['timestamp'] for r in audit_results) if audit_results else None,
"end": max(r['timestamp'] for r in audit_results) if audit_results else None
},
"performance": {
"total_predictions": len(audit_results),
"correct_predictions": correct_count,
"wrong_predictions": len(audit_results) - correct_count,
"accuracy": round(accuracy, 2)
},
"deviation_analysis": reason_counts,
"corrections_needed": len(all_corrections),
"high_priority_corrections": [
c for c in all_corrections
if any('止损' in c.get('suggestion', '') for c in all_corrections)
],
"detailed_results": audit_results
}
# 保存到历史记录
self.correction_history.append(summary_report)
self._save_correction_history()
print(f"[Auditor] 审计完成,准确率: {accuracy:.1f}%")
return summary_report
def generate_correction_report(
self,
audit_summary: Dict
) -> str:
"""
生成人类可读的修正报告
Args:
audit_summary: batch_audit() 的输出
Returns:
Markdown 格式的报告
"""
accuracy = audit_summary['performance']['accuracy']
total = audit_summary['performance']['total_predictions']
correct = audit_summary['performance']['correct_predictions']
report = f"""
# 投资决策复盘审计报告
**生成时间:** {datetime.now().strftime("%Y-%m-%d %H:%M:%S")}
## 一、整体表现
- **预测数量:** {total}
- **正确预测:** {correct}
- **错误预测:** {total - correct}
- **准确率:** {accuracy:.1f}%
## 二、偏差原因分析
"""
for reason, count in audit_summary['deviation_analysis'].items():
report += f"- **{reason}:** {count}\n"
report += f"\n## 三、修正建议\n\n"
report += f"需要修正的参数: {audit_summary['corrections_needed']}\n\n"
# 列出高优先级修正
high_priority = audit_summary.get('high_priority_corrections', [])
if high_priority:
report += "### 🚨 高优先级修正\n\n"
for i, correction in enumerate(high_priority[:3], 1):
report += f"{i}. **{correction['target']}**\n"
report += f" - 建议: {correction['suggestion']}\n"
report += f" - 原因: {correction['reason']}\n\n"
report += "\n## 四、经验总结\n\n"
if accuracy >= 70:
report += "✅ 当前策略表现良好,建议保持现有参数设置。\n"
elif accuracy >= 50:
report += "⚠️ 策略有效但存在改进空间,建议根据修正建议调整参数。\n"
else:
report += "❌ 策略表现不佳,需要重大调整或暂停交易,重新评估市场环境。\n"
return report
def get_historical_accuracy_trend(self) -> List[Dict]:
"""获取历史准确率趋势"""
trend = []
for record in self.correction_history[-10:]: # 最近10次
trend.append({
"timestamp": record['timestamp'],
"accuracy": record['performance']['accuracy'],
"total_predictions": record['performance']['total_predictions']
})
return trend
# ==================== 测试代码 ====================
if __name__ == "__main__":
print("=" * 50)
print("Skill E: Auditor 复盘审计员测试")
print("=" * 50)
# 创建审计员实例
auditor = PerformanceAuditor(data_dir="data")
# 模拟昨日的预测记录
mock_predictions = [
{
"code": "515980",
"trust_index": 75.0,
"narrative_score": 85.0,
"flow_score": 60.0,
"predicted_action": "buy",
"timestamp": "2025-12-29T08:30:00"
},
{
"code": "512480",
"trust_index": 45.0,
"narrative_score": 55.0,
"flow_score": 30.0,
"predicted_action": "hold",
"timestamp": "2025-12-29T08:30:00"
},
{
"code": "159928",
"trust_index": 35.0,
"narrative_score": 70.0,
"flow_score": 20.0,
"predicted_action": "sell",
"timestamp": "2025-12-29T08:30:00"
}
]
# 模拟今日的实际表现
mock_actual = {
"515980": {
"code": "515980",
"price_change": 3.2, # 上涨3.2%
"volume_change": 1.5,
"date": "2025-12-29"
},
"512480": {
"code": "512480",
"price_change": -1.8, # 下跌1.8%
"volume_change": 0.9,
"date": "2025-12-29"
},
"159928": {
"code": "159928",
"price_change": 0.5, # 微涨(预测失误)
"volume_change": 2.1,
"date": "2025-12-29"
}
}
# 1. 批量审计
print("\n1. 批量审计:")
summary = auditor.batch_audit(mock_predictions, mock_actual)
print(f"\n准确率: {summary['performance']['accuracy']:.1f}%")
print(f"正确: {summary['performance']['correct_predictions']}")
print(f"错误: {summary['performance']['wrong_predictions']}")
# 2. 查看详细结果
print("\n2. 详细审计结果:")
for result in summary['detailed_results']:
print(f"\n {result['code']}:")
print(f" - 预测: {result['prediction']['action']} (Trust={result['prediction']['trust_index']})")
print(f" - 实际: {result['actual']['price_change']:+.1f}%")
print(f" - 结果: {'✅ 正确' if result['result']['is_correct'] else '❌ 错误'}")
print(f" - 原因: {result['result']['reason']}")
# 3. 生成修正报告
print("\n3. 修正报告:")
report = auditor.generate_correction_report(summary)
print(report)
# 4. 历史趋势
print("\n4. 历史准确率趋势:")
trend = auditor.get_historical_accuracy_trend()
for record in trend:
print(f" {record['timestamp'][:10]}: {record['accuracy']:.1f}%")
print("\n✅ Auditor 模块测试完成")