"""
Analysis report generation node.

Runs as the last node of the replenishment-suggestion workflow and produces a
structured analysis report (statistics computed locally + narrative sections
produced by an LLM), which is persisted via ``ResultWriter``.
"""

import logging
import time
import json
import os
from typing import Dict, Any
from decimal import Decimal
from datetime import datetime

from langchain_core.messages import HumanMessage

from ..llm import get_llm_client
from ..models import AnalysisReport
from ..services.result_writer import ResultWriter

logger = logging.getLogger(__name__)


def _load_prompt(filename: str) -> str:
    """Load a prompt template file from the ``prompts`` directory.

    The directory is resolved relative to this module: four ``os.path.dirname``
    hops up from ``__file__``, then ``prompts``.

    Raises:
        FileNotFoundError: if the prompt file does not exist.
    """
    prompts_dir = os.path.join(
        os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(__file__)))),
        "prompts",
    )
    filepath = os.path.join(prompts_dir, filename)
    if not os.path.exists(filepath):
        raise FileNotFoundError(f"Prompt文件未找到: {filepath}")
    with open(filepath, "r", encoding="utf-8") as f:
        return f.read()


def _to_decimal(value: Any) -> Decimal:
    """Convert a numeric-like value to ``Decimal``.

    Falsy values (``None``, ``0``, ``""``) become ``Decimal("0")``. The value is
    routed through ``str`` so floats convert via their shortest repr instead of
    their exact binary expansion, and so ``Decimal`` arithmetic never mixes
    with ``float`` (which would raise ``TypeError``).
    """
    return Decimal(str(value or 0))


def _extract_part_result_fields(pr: Any) -> tuple:
    """Return ``(suggest_cnt, suggest_amount, cost_price, avg_sales, priority)``.

    Accepts either an object exposing ``total_suggest_cnt`` (and the other
    fields) as attributes, or a plain dict with the same keys. Monetary and
    sales fields are normalised to ``Decimal``; missing / ``None`` numeric
    values are treated as 0, and a missing / ``None`` dict priority as 2.
    """
    if hasattr(pr, "total_suggest_cnt"):
        suggest_cnt = pr.total_suggest_cnt or 0
        suggest_amount = _to_decimal(pr.total_suggest_amount)
        cost_price = _to_decimal(pr.cost_price)
        avg_sales = _to_decimal(pr.total_avg_sales_cnt)
        priority = pr.priority
    else:
        suggest_cnt = int(pr.get("total_suggest_cnt") or 0)
        suggest_amount = _to_decimal(pr.get("total_suggest_amount"))
        cost_price = _to_decimal(pr.get("cost_price"))
        avg_sales = _to_decimal(pr.get("total_avg_sales_cnt"))
        raw_priority = pr.get("priority")
        priority = int(raw_priority) if raw_priority is not None else 2
    return suggest_cnt, suggest_amount, cost_price, avg_sales, priority


def _priority_bucket(priority: Any) -> str:
    """Map a priority code to its stats-key prefix: 1=high, 2=medium, else low."""
    if priority == 1:
        return "priority_high"
    if priority == 2:
        return "priority_medium"
    return "priority_low"


def _price_bucket(cost_price: Decimal) -> str:
    """Map a unit cost price to its prefix: <50 low, 50-200 (inclusive) medium, >200 high."""
    if cost_price < 50:
        return "price_low"
    if cost_price <= 200:
        return "price_medium"
    return "price_high"


def _turnover_bucket(avg_sales: Decimal) -> str:
    """Map monthly average sales to its prefix: >=5 high, [1, 5) medium, <1 low."""
    if avg_sales >= 5:
        return "turnover_high"
    if avg_sales >= 1:
        return "turnover_medium"
    return "turnover_low"


def _replenish_bucket(suggest_amount: Decimal) -> str:
    """Map a per-part suggested amount to its prefix: >=5000 large, [1000, 5000) medium, <1000 small."""
    if suggest_amount >= 5000:
        return "replenish_large"
    if suggest_amount >= 1000:
        return "replenish_medium"
    return "replenish_small"


def _calculate_suggestion_stats(part_results: list) -> dict:
    """
    Compute replenishment-suggestion statistics over the full result set.

    Dimensions:
    1. Overall: number of parts, total suggested quantity and amount.
    2. Priority distribution: high / medium / low (count and amount).
    3. Price-range distribution (by cost price): low / medium / high.
    4. Turnover distribution (by monthly average sales): high / medium / low.
    5. Replenishment-size distribution (by per-part amount): large / medium / small.

    Each element of ``part_results`` may be an object with attributes or a dict
    (see ``_extract_part_result_fields``). Every part is counted exactly once
    in each distribution, and the amount recorded is its suggested amount.
    """
    stats = {
        # Overall
        "total_parts_cnt": 0,
        "total_suggest_cnt": 0,
        "total_suggest_amount": Decimal("0"),
        # Priority distribution: 1=high, 2=medium, other=low
        "priority_high_cnt": 0,
        "priority_high_amount": Decimal("0"),
        "priority_medium_cnt": 0,
        "priority_medium_amount": Decimal("0"),
        "priority_low_cnt": 0,
        "priority_low_amount": Decimal("0"),
        # Price-range distribution (cost price)
        "price_low_cnt": 0,
        "price_low_amount": Decimal("0"),
        "price_medium_cnt": 0,
        "price_medium_amount": Decimal("0"),
        "price_high_cnt": 0,
        "price_high_amount": Decimal("0"),
        # Turnover distribution (monthly average sales)
        "turnover_high_cnt": 0,
        "turnover_high_amount": Decimal("0"),
        "turnover_medium_cnt": 0,
        "turnover_medium_amount": Decimal("0"),
        "turnover_low_cnt": 0,
        "turnover_low_amount": Decimal("0"),
        # Replenishment-amount distribution
        "replenish_large_cnt": 0,
        "replenish_large_amount": Decimal("0"),
        "replenish_medium_cnt": 0,
        "replenish_medium_amount": Decimal("0"),
        "replenish_small_cnt": 0,
        "replenish_small_amount": Decimal("0"),
    }

    if not part_results:
        return stats

    for pr in part_results:
        suggest_cnt, suggest_amount, cost_price, avg_sales, priority = (
            _extract_part_result_fields(pr)
        )

        stats["total_parts_cnt"] += 1
        stats["total_suggest_cnt"] += suggest_cnt
        stats["total_suggest_amount"] += suggest_amount

        for prefix in (
            _priority_bucket(priority),
            _price_bucket(cost_price),
            _turnover_bucket(avg_sales),
            _replenish_bucket(suggest_amount),
        ):
            stats[f"{prefix}_cnt"] += 1
            stats[f"{prefix}_amount"] += suggest_amount

    return stats


def _calculate_risk_stats(part_ratios: list) -> dict:
    """Compute inventory-risk statistics from per-part ratio dicts.

    Classification (first match wins):
    - stagnant: stock on hand (> 0) but ``out_stock_cnt == 0``; amount is
      stock x cost price.
    - low-frequency: no stock and monthly average sales < 1; no amount tracked.
    - shortage: no stock and monthly average sales >= 1; estimated loss is
      monthly average sales x cost price.
    Missing / ``None`` numeric fields are treated as 0.
    """
    stats = {
        "shortage_cnt": 0,
        "shortage_amount": Decimal("0"),
        "stagnant_cnt": 0,
        "stagnant_amount": Decimal("0"),
        "low_freq_cnt": 0,
        "low_freq_amount": Decimal("0"),
    }

    for pr in part_ratios:
        valid_storage = _to_decimal(pr.get("valid_storage_cnt"))
        avg_sales = _to_decimal(pr.get("avg_sales_cnt"))
        out_stock = _to_decimal(pr.get("out_stock_cnt"))
        cost_price = _to_decimal(pr.get("cost_price"))

        if valid_storage > 0 and out_stock == 0:
            # Stagnant: in stock but nothing shipped out in the window
            # (the original comment describes it as 90 days).
            stats["stagnant_cnt"] += 1
            stats["stagnant_amount"] += valid_storage * cost_price
        elif valid_storage == 0 and avg_sales < 1:
            stats["low_freq_cnt"] += 1
        elif valid_storage == 0 and avg_sales >= 1:
            stats["shortage_cnt"] += 1
            # Shortage-loss estimate: monthly average sales x cost price.
            stats["shortage_amount"] += avg_sales * cost_price

    return stats


# Distribution tables rendered by _build_suggestion_summary, in output order:
# (section title, first-column header, separator row, ((row label, stats-key prefix), ...)).
_SUMMARY_TABLES = (
    (
        "### 优先级分布",
        "优先级",
        "|--------|--------|----------|------|",
        (
            ("高优先级", "priority_high"),
            ("中优先级", "priority_medium"),
            ("低优先级", "priority_low"),
        ),
    ),
    (
        "### 价格区间分布 (按成本价)",
        "价格区间",
        "|----------|--------|----------|------|",
        (
            ("低价(<50元)", "price_low"),
            ("中价(50-200元)", "price_medium"),
            ("高价(>200元)", "price_high"),
        ),
    ),
    (
        "### 周转频次分布 (按月均销量)",
        "周转频次",
        "|----------|--------|----------|------|",
        (
            ("高频(≥5件/月)", "turnover_high"),
            ("中频(1-5件/月)", "turnover_medium"),
            ("低频(<1件/月)", "turnover_low"),
        ),
    ),
    (
        "### 单配件补货金额分布",
        "补货规模",
        "|----------|--------|----------|------|",
        (
            ("大额(≥5000元)", "replenish_large"),
            ("中额(1000-5000元)", "replenish_medium"),
            ("小额(<1000元)", "replenish_small"),
        ),
    ),
)


def _append_distribution_table(
    lines: list,
    title: str,
    header_label: str,
    separator: str,
    rows: tuple,
    suggestion_stats: dict,
    total_amount: Decimal,
) -> None:
    """Append one markdown distribution table to ``lines``; rows with a zero count are omitted."""
    lines.append(title)
    lines.append(f"| {header_label} | 配件数 | 金额(元) | 占比 |")
    lines.append(separator)
    for label, prefix in rows:
        cnt = suggestion_stats[f"{prefix}_cnt"]
        if cnt > 0:
            amount = suggestion_stats[f"{prefix}_amount"]
            pct = amount / total_amount * 100
            lines.append(f"| {label} | {cnt} | {amount:.2f} | {pct:.1f}% |")


def _build_suggestion_summary(suggestion_stats: dict) -> str:
    """
    Build a structured markdown summary from precomputed suggestion stats.

    Sections: overall scale, priority distribution, price-range distribution,
    turnover distribution and per-part replenishment-amount distribution.
    Returns the placeholder "暂无补货建议" when there are no parts.
    """
    if suggestion_stats["total_parts_cnt"] == 0:
        return "暂无补货建议"

    lines = [
        "### 补货总体规模",
        f"- 涉及配件种类: {suggestion_stats['total_parts_cnt']}种",
        f"- 建议补货总数量: {suggestion_stats['total_suggest_cnt']}件",
        f"- 建议补货总金额: {suggestion_stats['total_suggest_amount']:.2f}元",
    ]

    # Guard the percentage denominator against a zero total amount.
    total_amount = suggestion_stats["total_suggest_amount"] or Decimal("1")

    for title, header_label, separator, rows in _SUMMARY_TABLES:
        lines.append("")  # blank line before each table
        _append_distribution_table(
            lines, title, header_label, separator, rows, suggestion_stats, total_amount
        )

    return "\n".join(lines)


def _strip_code_fence(text: str) -> str:
    """Remove a surrounding markdown code fence (```/```json) from an LLM reply.

    Only applies when the (stripped) text starts with a fence: the opening
    fence line is dropped and everything from the last ``` onward is cut, so a
    closing fence on the same line as the payload, a missing closing fence, or
    trailing prose after the fence are all tolerated.
    """
    text = text.strip()
    if not text.startswith("```"):
        return text
    newline = text.find("\n")
    body = text[newline + 1:] if newline != -1 else ""
    closing = body.rfind("```")
    if closing != -1:
        body = body[:closing]
    return body.strip()


def _extract_total_tokens(response: Any) -> int:
    """Best-effort total token count from an LLM response; 0 when unavailable.

    Reads ``response.usage.total_tokens`` (the shape the original code
    expected) and falls back to a ``usage_metadata`` mapping.
    NOTE(review): the exact response type of ``get_llm_client().invoke`` is not
    visible here — confirm which attribute it populates.
    """
    usage = getattr(response, "usage", None)
    if usage is None:
        usage = getattr(response, "usage_metadata", None)
    if usage is None:
        return 0
    if isinstance(usage, dict):
        total = usage.get("total_tokens")
    else:
        total = getattr(usage, "total_tokens", None)
    return total if total is not None else 0


def generate_analysis_report_node(state: dict) -> dict:
    """
    Workflow node that generates and persists the analysis report.

    Reads from ``state``: task_no, group_id, dealer_grouping_id,
    dealer_grouping_name, brand_grouping_id, statistics_date, part_ratios,
    part_results.

    Returns a dict with ``analysis_report`` (the saved report as a dict) and
    ``end_time``. Any failure is logged and reported as
    ``{"error": ..., "task_no": ...}`` instead of raising, so the rest of the
    workflow is not interrupted.
    """
    start_time = time.time()

    task_no = state.get("task_no", "")
    group_id = state.get("group_id", 0)
    dealer_grouping_id = state.get("dealer_grouping_id", 0)
    dealer_grouping_name = state.get("dealer_grouping_name", "")
    brand_grouping_id = state.get("brand_grouping_id")
    statistics_date = state.get("statistics_date", "")

    part_ratios = state.get("part_ratios", [])
    part_results = state.get("part_results", [])

    logger.info(f"[{task_no}] 开始生成分析报告: dealer={dealer_grouping_name}")

    try:
        risk_stats = _calculate_risk_stats(part_ratios)
        suggestion_stats = _calculate_suggestion_stats(part_results)
        suggestion_summary = _build_suggestion_summary(suggestion_stats)

        prompt_template = _load_prompt("analysis_report.md")
        prompt = prompt_template.format(
            dealer_grouping_id=dealer_grouping_id,
            dealer_grouping_name=dealer_grouping_name,
            statistics_date=statistics_date,
            suggestion_summary=suggestion_summary,
            shortage_cnt=risk_stats["shortage_cnt"],
            shortage_amount=f"{risk_stats['shortage_amount']:.2f}",
            stagnant_cnt=risk_stats["stagnant_cnt"],
            stagnant_amount=f"{risk_stats['stagnant_amount']:.2f}",
            low_freq_cnt=risk_stats["low_freq_cnt"],
            low_freq_amount="0.00",  # low-frequency parts have no stock, so no amount
        )

        llm_client = get_llm_client()
        response = llm_client.invoke(
            messages=[HumanMessage(content=prompt)],
        )

        # The LLM may wrap its JSON answer in a markdown code fence.
        report_data = json.loads(_strip_code_fence(response.content))
        if not isinstance(report_data, dict):
            raise ValueError(
                f"LLM report must be a JSON object, got {type(report_data).__name__}"
            )

        execution_time_ms = int((time.time() - start_time) * 1000)

        # Prompt field names are mapped onto the existing database columns:
        #   overall_assessment   -> replenishment_insights
        #   risk_alerts          -> urgency_assessment
        #   procurement_strategy -> strategy_recommendations
        #   expected_impact      -> expected_outcomes
        #   execution_guide was removed from the prompt and is stored as None
        report = AnalysisReport(
            task_no=task_no,
            group_id=group_id,
            dealer_grouping_id=dealer_grouping_id,
            dealer_grouping_name=dealer_grouping_name,
            brand_grouping_id=brand_grouping_id,
            report_type="replenishment",
            replenishment_insights=report_data.get("overall_assessment"),
            urgency_assessment=report_data.get("risk_alerts"),
            strategy_recommendations=report_data.get("procurement_strategy"),
            execution_guide=None,
            expected_outcomes=report_data.get("expected_impact"),
            total_suggest_cnt=suggestion_stats["total_suggest_cnt"],
            total_suggest_amount=suggestion_stats["total_suggest_amount"],
            shortage_risk_cnt=risk_stats["shortage_cnt"],
            excess_risk_cnt=risk_stats["stagnant_cnt"],
            stagnant_cnt=risk_stats["stagnant_cnt"],
            low_freq_cnt=risk_stats["low_freq_cnt"],
            llm_provider=getattr(llm_client, "provider", ""),
            llm_model=getattr(llm_client, "model", ""),
            llm_tokens=_extract_total_tokens(response),
            execution_time_ms=execution_time_ms,
            statistics_date=statistics_date,
        )

        result_writer = ResultWriter()
        try:
            result_writer.save_analysis_report(report)
        finally:
            result_writer.close()

        logger.info(
            f"[{task_no}] 分析报告生成完成: "
            f"shortage={risk_stats['shortage_cnt']}, "
            f"stagnant={risk_stats['stagnant_cnt']}, "
            f"time={execution_time_ms}ms"
        )

        return {
            "analysis_report": report.to_dict(),
            "end_time": time.time(),
        }

    except Exception as e:
        # Deliberate best-effort boundary: log with traceback and return an
        # error payload instead of aborting the whole workflow.
        logger.error(f"[{task_no}] 分析报告生成失败: {e}", exc_info=True)
        return {
            "analysis_report": {
                "error": str(e),
                "task_no": task_no,
            },
            "end_time": time.time(),
        }