diff --git a/README.md b/README.md index bdd9417..87b5d5d 100644 --- a/README.md +++ b/README.md @@ -22,7 +22,8 @@ graph LR B --> C[FetchPartRatio] C --> D[SQLAgent
LLM分析] D --> E[AllocateBudget] - E --> G[SaveResult] + E --> F[AnalysisReport] + F --> G[SaveResult] ``` 详细架构图见 [docs/architecture.md](docs/architecture.md) @@ -95,7 +96,8 @@ fw-pms-ai/ 1. FetchPartRatio - 从 part_ratio 表获取库销比数据 2. SQLAgent - LLM 分析数据,生成补货建议 3. AllocateBudget - 转换建议为补货明细 -4. SaveResult - 写入数据库 +4. AnalysisReport - 生成分析报告(风险评估、行动方案) +5. SaveResult - 写入数据库 ``` ### 业务术语 diff --git a/docs/architecture.md b/docs/architecture.md index 93550ca..9ca54e3 100644 --- a/docs/architecture.md +++ b/docs/architecture.md @@ -27,7 +27,8 @@ flowchart TB C --> D{需要重试?} D -->|是| C D -->|否| E[allocate_budget] - E --> F[END] + E --> E2[generate_analysis_report] + E2 --> F[END] end subgraph Services ["业务服务层"] @@ -62,6 +63,7 @@ flowchart TB | `fetch_part_ratio` | 获取商家组合的配件库销比数据 | dealer_grouping_id | part_ratios[] | | `sql_agent` | LLM 分析配件数据,生成补货建议 | part_ratios[] | llm_suggestions[], part_results[] | | `allocate_budget` | 转换 LLM 建议为补货明细 | llm_suggestions[] | details[] | +| `generate_analysis_report` | 生成分析报告 | part_ratios[], details[] | analysis_report | --- @@ -121,3 +123,4 @@ src/fw_pms_ai/ | `ai_replenishment_detail` | 补货明细 | | `ai_replenishment_part_summary` | 配件级汇总 | | `ai_task_execution_log` | 执行日志 | +| `ai_analysis_report` | 分析报告(JSON结构化) | diff --git a/prompts/analysis_report.md b/prompts/analysis_report.md new file mode 100644 index 0000000..df88696 --- /dev/null +++ b/prompts/analysis_report.md @@ -0,0 +1,174 @@ +# 智能补货建议分析报告 + +你是一位资深汽车配件采购顾问。AI系统已经生成了详细的补货建议明细(包含每个配件的补货数量、理由等),现在需要你站在更宏观的视角,为采购决策者提供**整体性分析**。 + +> **核心定位**: 补货明细已回答了"补什么、补多少"的问题,本报告聚焦于"整体策略、风险预警、资金规划"等**决策层面**的洞察。 + +--- + +## 商家组合信息 + +| 项目 | 数值 | +|------|------| +| 商家组合ID | {dealer_grouping_id} | +| 商家组合名称 | {dealer_grouping_name} | +| 报告生成日期 | {statistics_date} | + +--- + +## 本期补货建议概览 + +{suggestion_summary} + +--- + +## 库存健康度参考 + +| 状态分类 | 配件数量 | 涉及金额 | +|----------|----------|----------| +| 缺货件 | {shortage_cnt} | {shortage_amount} | +| 呆滞件 | {stagnant_cnt} | {stagnant_amount} | +| 低频件 | {low_freq_cnt} | {low_freq_amount} | + +--- + +## 分析任务 + +请严格按以下4个模块输出 **JSON格式** 的整体分析报告。 + +**注意**: 不要重复补货明细中已有的配件级别分析,聚焦于以下**宏观维度**。 + +### 模块1: 整体态势研判 (overall_assessment) + +从全局视角评估本次补货建议: + +1. **补货规模评估**: 本期补货总金额与历史同期相比是偏高、正常还是偏低?可能的原因是什么? +2. **结构特征分析**: 补货建议在品类、价格区间、周转频次上呈现什么分布特征?是否存在明显的集中或失衡? +3. **时机判断**: 当前是否处于补货的有利时机?需要考虑哪些时间因素(如节假日、促销季、供应商备货周期)? + +### 模块2: 风险预警与应对 (risk_alerts) + +识别本次补货可能面临的风险并给出应对建议: + +1. **供应风险**: 是否有配件可能面临缺货、涨价、交期延长等供应端问题? +2. **资金风险**: 本期补货是否会造成资金压力?是否存在呆滞风险较高的配件需要谨慎采购? +3. **市场风险**: 是否有配件需求可能下滑(如车型停产、季节性波动)? +4. **执行风险**: 补货建议中是否有需要人工复核的异常项?(如建议量远超历史、首次采购配件等) + +### 模块3: 采购策略建议 (procurement_strategy) + +提供整体性的采购执行策略: + +1. **优先级排序原则**: 如果预算或精力有限,应按什么顺序安排采购?给出清晰的分级标准 +2. **批量采购机会**: 是否有可以合并下单以降低成本的机会?涉及哪些品类或供应商? +3. **分批采购建议**: 哪些配件可以分批次补货?建议的节奏是什么? +4. **供应商协调要点**: 是否需要提前与供应商确认交期、价格或备货?关键沟通事项有哪些? + +### 模块4: 效果预期与建议 (expected_impact) + +预估按建议执行后的整体效果: + +1. **库存健康度改善**: 补货后整体库存结构预计如何变化?缺货率预计下降多少? +2. **资金效率预估**: 本期补货的预计投入产出如何?资金周转是否会改善? +3. **后续关注点**: 补货完成后需要持续关注哪些指标或配件?下一步建议行动是什么? + +--- + +## 输出格式 + +直接输出JSON对象,**不要**包含 ```json 标记: + +{{ + "overall_assessment": {{ + "scale_evaluation": {{ + "current_vs_historical": "与历史同期对比结论", + "possible_reasons": "规模变化的可能原因" + }}, + "structure_analysis": {{ + "category_distribution": "品类分布特征", + "price_range_distribution": "价格区间分布特征", + "turnover_distribution": "周转频次分布特征", + "imbalance_warning": "是否存在失衡及说明" + }}, + "timing_judgment": {{ + "is_favorable": true或false, + "timing_factors": "需要考虑的时间因素", + "recommendation": "时机相关建议" + }} + }}, + "risk_alerts": {{ + "supply_risks": [ + {{ + "risk_type": "风险类型(缺货/涨价/交期延长等)", + "affected_scope": "影响范围描述", + "likelihood": "可能性评估(高/中/低)", + "mitigation": "应对建议" + }} + ], + "capital_risks": {{ + "cash_flow_pressure": "资金压力评估", + "stagnation_warning": "呆滞风险提示", + "recommendation": "资金风险应对建议" + }}, + "market_risks": [ + {{ + "risk_description": "市场风险描述", + "affected_parts": "影响配件范围", + "recommendation": "应对建议" + }} + ], + "execution_anomalies": [ + {{ + "anomaly_type": "异常类型", + "description": "异常描述", + "review_suggestion": "复核建议" + }} + ] + }}, + "procurement_strategy": {{ + "priority_principle": {{ + "tier1_criteria": "第一优先级标准及说明", + "tier2_criteria": "第二优先级标准及说明", + "tier3_criteria": "可延后采购的标准及说明" + }}, + "batch_opportunities": {{ + "potential_savings": "潜在节省金额或比例", + "applicable_categories": "适用品类或供应商", + "execution_suggestion": "具体操作建议" + }}, + "phased_procurement": {{ + "recommended_parts": "建议分批采购的配件范围", + "suggested_rhythm": "建议的采购节奏" + }}, + "supplier_coordination": {{ + "key_communications": "关键沟通事项", + "timing_suggestions": "沟通时机建议" + }} + }}, + "expected_impact": {{ + "inventory_health": {{ + "structure_improvement": "库存结构改善预期", + "shortage_reduction": "缺货率预计下降幅度" + }}, + "capital_efficiency": {{ + "investment_amount": 本期补货投入金额, + "expected_return": "预期收益描述", + "turnover_improvement": "周转改善预期" + }}, + "follow_up_actions": {{ + "key_metrics_to_watch": "需持续关注的指标", + "next_steps": "下一步建议行动" + }} + }} +}} + +--- + +## 重要约束 + +1. **输出必须是合法的JSON对象** +2. **所有金额单位为元,保留2位小数** +3. **聚焦宏观分析,不要重复明细中已有的配件级别信息** +4. **风险和效果预估尽量量化** +5. **策略建议要具体可执行,避免空泛描述** +6. **分析基于提供的汇总数据,保持客观理性** diff --git a/sql/migrate_analysis_report.sql b/sql/migrate_analysis_report.sql new file mode 100644 index 0000000..0a90acd --- /dev/null +++ b/sql/migrate_analysis_report.sql @@ -0,0 +1,47 @@ +-- ============================================================================ +-- AI 补货建议分析报告表 +-- ============================================================================ +-- 版本: 2.0.0 +-- 更新日期: 2026-02-05 +-- 变更说明: 重构报告模块,聚焦补货决策支持(区别于传统库销分析) +-- ============================================================================ + +DROP TABLE IF EXISTS ai_analysis_report; +CREATE TABLE ai_analysis_report ( + id BIGINT AUTO_INCREMENT PRIMARY KEY COMMENT '主键ID', + task_no VARCHAR(32) NOT NULL COMMENT '任务编号', + group_id BIGINT NOT NULL COMMENT '集团ID', + dealer_grouping_id BIGINT NOT NULL COMMENT '商家组合ID', + dealer_grouping_name VARCHAR(128) COMMENT '商家组合名称', + brand_grouping_id BIGINT COMMENT '品牌组合ID', + report_type VARCHAR(32) DEFAULT 'replenishment' COMMENT '报告类型', + + -- 报告各模块 (JSON 结构化存储) - 宏观决策分析 + -- 注:字段名保持兼容,实际存储内容已更新为新模块 + replenishment_insights JSON COMMENT '整体态势研判(规模评估/结构分析/时机判断) - 原overall_assessment', + urgency_assessment JSON COMMENT '风险预警与应对(供应/资金/市场/执行风险) - 原risk_alerts', + strategy_recommendations JSON COMMENT '采购策略建议(优先级/批量机会/分批/供应商协调) - 原procurement_strategy', + execution_guide JSON COMMENT '已废弃,置为NULL', + expected_outcomes JSON COMMENT '效果预期与建议(库存健康/资金效率/后续行动) - 原expected_impact', + + -- 统计信息 + total_suggest_cnt INT DEFAULT 0 COMMENT '总建议数量', + total_suggest_amount DECIMAL(14,2) DEFAULT 0 COMMENT '总建议金额', + shortage_risk_cnt INT DEFAULT 0 COMMENT '缺货风险配件数', + excess_risk_cnt INT DEFAULT 0 COMMENT '过剩风险配件数', + stagnant_cnt INT DEFAULT 0 COMMENT '呆滞件数量', + low_freq_cnt INT DEFAULT 0 COMMENT '低频件数量', + + -- LLM 元数据 + llm_provider VARCHAR(32) COMMENT 'LLM提供商', + llm_model VARCHAR(64) COMMENT 'LLM模型名称', + llm_tokens INT DEFAULT 0 COMMENT 'LLM Token消耗', + execution_time_ms INT DEFAULT 0 COMMENT '执行耗时(毫秒)', + + statistics_date VARCHAR(16) COMMENT '统计日期', + create_time DATETIME DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', + + INDEX idx_task_no (task_no), + INDEX idx_group_date (group_id, statistics_date), + INDEX idx_dealer_grouping (dealer_grouping_id) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='AI补货建议分析报告表-结构化补货决策支持报告'; diff --git a/src/fw_pms_ai/agent/analysis_report_node.py b/src/fw_pms_ai/agent/analysis_report_node.py new file mode 100644 index 0000000..e95daef --- /dev/null +++ b/src/fw_pms_ai/agent/analysis_report_node.py @@ -0,0 +1,240 @@ +""" +分析报告生成节点 + +在补货建议工作流的最后一个节点执行,生成结构化分析报告 +""" + +import logging +import time +import json +import os +from typing import Dict, Any +from decimal import Decimal +from datetime import datetime + +from langchain_core.messages import HumanMessage + +from ..llm import get_llm_client +from ..models import AnalysisReport +from ..services.result_writer import ResultWriter + +logger = logging.getLogger(__name__) + + +def _load_prompt(filename: str) -> str: + """从prompts目录加载提示词文件""" + prompts_dir = os.path.join( + os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(__file__)))), + "prompts" + ) + filepath = os.path.join(prompts_dir, filename) + + if not os.path.exists(filepath): + raise FileNotFoundError(f"Prompt文件未找到: {filepath}") + + with open(filepath, "r", encoding="utf-8") as f: + return f.read() + + +def _calculate_risk_stats(part_ratios: list) -> dict: + """计算风险统计数据""" + stats = { + "shortage_cnt": 0, + "shortage_amount": Decimal("0"), + "stagnant_cnt": 0, + "stagnant_amount": Decimal("0"), + "low_freq_cnt": 0, + "low_freq_amount": Decimal("0"), + } + + for pr in part_ratios: + valid_storage = Decimal(str(pr.get("valid_storage_cnt", 0) or 0)) + avg_sales = Decimal(str(pr.get("avg_sales_cnt", 0) or 0)) + out_stock = Decimal(str(pr.get("out_stock_cnt", 0) or 0)) + cost_price = Decimal(str(pr.get("cost_price", 0) or 0)) + + # 呆滞件: 有库存但90天无出库 + if valid_storage > 0 and out_stock == 0: + stats["stagnant_cnt"] += 1 + stats["stagnant_amount"] += valid_storage * cost_price + + # 低频件: 无库存且月均销量<1 + elif valid_storage == 0 and avg_sales < 1: + stats["low_freq_cnt"] += 1 + + # 缺货件: 无库存且月均销量>=1 + elif valid_storage == 0 and avg_sales >= 1: + stats["shortage_cnt"] += 1 + # 缺货损失估算:月均销量 * 成本价 + stats["shortage_amount"] += avg_sales * cost_price + + return stats + + +def _build_suggestion_summary(part_results: list, allocated_details: list) -> str: + """构建补货建议汇总文本""" + if not part_results and not allocated_details: + return "暂无补货建议" + + lines = [] + total_cnt = 0 + total_amount = Decimal("0") + + # 优先使用 part_results (配件级汇总) + if part_results: + for pr in part_results[:10]: # 只取前10个 + if hasattr(pr, "part_code"): + lines.append( + f"- {pr.part_code} {pr.part_name}: " + f"建议{pr.total_suggest_cnt}件, " + f"金额{pr.total_suggest_amount:.2f}元, " + f"优先级{pr.priority}" + ) + total_cnt += pr.total_suggest_cnt + total_amount += pr.total_suggest_amount + elif isinstance(pr, dict): + lines.append( + f"- {pr.get('part_code', '')} {pr.get('part_name', '')}: " + f"建议{pr.get('total_suggest_cnt', 0)}件, " + f"金额{pr.get('total_suggest_amount', 0):.2f}元" + ) + + lines.insert(0, f"**总计**: {total_cnt}件配件, 金额{total_amount:.2f}元\n") + return "\n".join(lines) + + +def generate_analysis_report_node(state: dict) -> dict: + """ + 生成分析报告节点 + + 输入: part_ratios, llm_suggestions, allocated_details, part_results + 输出: analysis_report + """ + start_time = time.time() + + task_no = state.get("task_no", "") + group_id = state.get("group_id", 0) + dealer_grouping_id = state.get("dealer_grouping_id", 0) + dealer_grouping_name = state.get("dealer_grouping_name", "") + brand_grouping_id = state.get("brand_grouping_id") + statistics_date = state.get("statistics_date", "") + + part_ratios = state.get("part_ratios", []) + part_results = state.get("part_results", []) + allocated_details = state.get("allocated_details", []) + + logger.info(f"[{task_no}] 开始生成分析报告: dealer={dealer_grouping_name}") + + try: + # 计算风险统计 + risk_stats = _calculate_risk_stats(part_ratios) + + # 构建建议汇总 + suggestion_summary = _build_suggestion_summary(part_results, allocated_details) + + # 加载 Prompt + prompt_template = _load_prompt("analysis_report.md") + + # 填充 Prompt 变量 + prompt = prompt_template.format( + dealer_grouping_id=dealer_grouping_id, + dealer_grouping_name=dealer_grouping_name, + statistics_date=statistics_date, + suggestion_summary=suggestion_summary, + shortage_cnt=risk_stats["shortage_cnt"], + shortage_amount=f"{risk_stats['shortage_amount']:.2f}", + stagnant_cnt=risk_stats["stagnant_cnt"], + stagnant_amount=f"{risk_stats['stagnant_amount']:.2f}", + low_freq_cnt=risk_stats["low_freq_cnt"], + low_freq_amount="0.00", # 低频件无库存 + ) + + # 调用 LLM + llm_client = get_llm_client() + response = llm_client.invoke( + messages=[HumanMessage(content=prompt)], + ) + + # 解析 JSON 响应 + response_text = response.content.strip() + # 移除可能的 markdown 代码块 + if response_text.startswith("```"): + lines = response_text.split("\n") + response_text = "\n".join(lines[1:-1]) + + report_data = json.loads(response_text) + + # 计算统计信息 + total_suggest_cnt = sum( + d.suggest_cnt if hasattr(d, "suggest_cnt") else d.get("suggest_cnt", 0) + for d in allocated_details + ) + total_suggest_amount = sum( + d.suggest_amount if hasattr(d, "suggest_amount") else Decimal(str(d.get("suggest_amount", 0))) + for d in allocated_details + ) + + execution_time_ms = int((time.time() - start_time) * 1000) + + # 创建报告对象 + # 新 prompt 字段名映射到现有数据库字段: + # overall_assessment -> replenishment_insights + # risk_alerts -> urgency_assessment + # procurement_strategy -> strategy_recommendations + # expected_impact -> expected_outcomes + # execution_guide 已移除,置为 None + report = AnalysisReport( + task_no=task_no, + group_id=group_id, + dealer_grouping_id=dealer_grouping_id, + dealer_grouping_name=dealer_grouping_name, + brand_grouping_id=brand_grouping_id, + report_type="replenishment", + replenishment_insights=report_data.get("overall_assessment"), + urgency_assessment=report_data.get("risk_alerts"), + strategy_recommendations=report_data.get("procurement_strategy"), + execution_guide=None, + expected_outcomes=report_data.get("expected_impact"), + total_suggest_cnt=total_suggest_cnt, + total_suggest_amount=total_suggest_amount, + shortage_risk_cnt=risk_stats["shortage_cnt"], + excess_risk_cnt=risk_stats["stagnant_cnt"], + stagnant_cnt=risk_stats["stagnant_cnt"], + low_freq_cnt=risk_stats["low_freq_cnt"], + llm_provider=getattr(llm_client, "provider", ""), + llm_model=getattr(llm_client, "model", ""), + llm_tokens=response.usage.total_tokens, + execution_time_ms=execution_time_ms, + statistics_date=statistics_date, + ) + + # 保存到数据库 + result_writer = ResultWriter() + try: + result_writer.save_analysis_report(report) + finally: + result_writer.close() + + logger.info( + f"[{task_no}] 分析报告生成完成: " + f"shortage={risk_stats['shortage_cnt']}, " + f"stagnant={risk_stats['stagnant_cnt']}, " + f"time={execution_time_ms}ms" + ) + + return { + "analysis_report": report.to_dict(), + "end_time": time.time(), + } + + except Exception as e: + logger.error(f"[{task_no}] 分析报告生成失败: {e}", exc_info=True) + + # 返回空报告,不中断整个流程 + return { + "analysis_report": { + "error": str(e), + "task_no": task_no, + }, + "end_time": time.time(), + } diff --git a/src/fw_pms_ai/agent/replenishment.py b/src/fw_pms_ai/agent/replenishment.py index 4b5bb52..8121bfc 100644 --- a/src/fw_pms_ai/agent/replenishment.py +++ b/src/fw_pms_ai/agent/replenishment.py @@ -20,6 +20,7 @@ from .nodes import ( allocate_budget_node, should_retry_sql, ) +from .analysis_report_node import generate_analysis_report_node from ..models import ReplenishmentTask, TaskStatus, TaskExecutionLog, LogStatus, ReplenishmentPartSummary from ..services import ResultWriter @@ -45,7 +46,7 @@ class ReplenishmentAgent: 构建 LangGraph 工作流 工作流结构: - fetch_part_ratio → sql_agent → allocate_budget → END + fetch_part_ratio → sql_agent → allocate_budget → generate_analysis_report → END """ workflow = StateGraph(AgentState) @@ -53,6 +54,7 @@ class ReplenishmentAgent: workflow.add_node("fetch_part_ratio", fetch_part_ratio_node) workflow.add_node("sql_agent", sql_agent_node) workflow.add_node("allocate_budget", allocate_budget_node) + workflow.add_node("generate_analysis_report", generate_analysis_report_node) # 设置入口 workflow.set_entry_point("fetch_part_ratio") @@ -70,8 +72,9 @@ class ReplenishmentAgent: } ) - # allocate_budget → END - workflow.add_edge("allocate_budget", END) + # allocate_budget → generate_analysis_report → END + workflow.add_edge("allocate_budget", "generate_analysis_report") + workflow.add_edge("generate_analysis_report", END) return workflow.compile() diff --git a/src/fw_pms_ai/agent/sql_agent/analyzer.py b/src/fw_pms_ai/agent/sql_agent/analyzer.py index c50164d..11e569e 100644 --- a/src/fw_pms_ai/agent/sql_agent/analyzer.py +++ b/src/fw_pms_ai/agent/sql_agent/analyzer.py @@ -406,13 +406,13 @@ class PartAnalyzer: confidence = float(result.get("confidence", 0.8)) part_decision_reason = result.get("part_decision_reason", "") need_replenishment = result.get("need_replenishment", False) - priority = int(result.get("priority", 2)) + priority = int(result.get("priority") or 2) # 更新配件结果 part_result.need_replenishment = need_replenishment - part_result.total_suggest_cnt = int(result.get("total_suggest_cnt", 0)) + part_result.total_suggest_cnt = int(result.get("total_suggest_cnt") or 0) part_result.total_suggest_amount = Decimal(str(result.get("total_suggest_amount", 0))) - part_result.shop_count = int(result.get("shop_count", len(shop_data_list))) + part_result.shop_count = int(result.get("shop_count") or len(shop_data_list)) part_result.part_decision_reason = part_decision_reason part_result.priority = priority part_result.confidence = confidence @@ -430,11 +430,11 @@ class PartAnalyzer: shop_suggestions_data = result.get("shop_suggestions", []) if shop_suggestions_data: for shop in shop_suggestions_data: - s_id = int(shop.get("shop_id", 0)) + s_id = int(shop.get("shop_id") or 0) shop_suggestion_map[s_id] = shop # 统计需要补货的门店数 - need_replenishment_shop_count = len([s for s in shop_suggestions_data if int(s.get("suggest_cnt", 0)) > 0]) + need_replenishment_shop_count = len([s for s in shop_suggestions_data if int(s.get("suggest_cnt") or 0) > 0]) part_result.need_replenishment_shop_count = need_replenishment_shop_count # 递归所有输入门店,确保每个门店都有记录 @@ -445,10 +445,10 @@ class PartAnalyzer: # 检查LLM是否有针对该门店的建议 if shop_id in shop_suggestion_map: s_item = shop_suggestion_map[shop_id] - suggest_cnt = int(s_item.get("suggest_cnt", 0)) - suggest_amount = Decimal(str(s_item.get("suggest_amount", 0))) - reason = s_item.get("reason", part_decision_reason) - shop_priority = int(s_item.get("priority", priority)) + suggest_cnt = int(s_item.get("suggest_cnt") or 0) + suggest_amount = Decimal(str(s_item.get("suggest_amount") or 0)) + reason = s_item.get("reason") or part_decision_reason + shop_priority = int(s_item.get("priority") or priority) else: # LLM未提及该门店,根据门店数据生成个性化默认理由 suggest_cnt = 0 diff --git a/src/fw_pms_ai/agent/state.py b/src/fw_pms_ai/agent/state.py index dfdff73..2387a28 100644 --- a/src/fw_pms_ai/agent/state.py +++ b/src/fw_pms_ai/agent/state.py @@ -73,6 +73,9 @@ class AgentState(TypedDict, total=False): # 配件汇总结果 part_results: Annotated[List[Any], merge_lists] + # 分析报告 + analysis_report: Annotated[Optional[dict], keep_last] + # LLM 统计(使用累加,合并多个并行节点的 token 使用量) llm_provider: Annotated[str, keep_last] llm_model: Annotated[str, keep_last] diff --git a/src/fw_pms_ai/models/__init__.py b/src/fw_pms_ai/models/__init__.py index ab795c3..d06a8c2 100644 --- a/src/fw_pms_ai/models/__init__.py +++ b/src/fw_pms_ai/models/__init__.py @@ -6,6 +6,7 @@ from .execution_log import TaskExecutionLog, LogStatus from .part_summary import ReplenishmentPartSummary from .sql_result import SQLExecutionResult from .suggestion import ReplenishmentSuggestion, PartAnalysisResult +from .analysis_report import AnalysisReport __all__ = [ "PartRatio", @@ -18,6 +19,7 @@ __all__ = [ "SQLExecutionResult", "ReplenishmentSuggestion", "PartAnalysisResult", + "AnalysisReport", ] diff --git a/src/fw_pms_ai/models/analysis_report.py b/src/fw_pms_ai/models/analysis_report.py new file mode 100644 index 0000000..ba51bb5 --- /dev/null +++ b/src/fw_pms_ai/models/analysis_report.py @@ -0,0 +1,73 @@ +""" +数据模型 - 分析报告 +""" + +from dataclasses import dataclass, field +from decimal import Decimal +from datetime import datetime +from typing import Optional, Dict, Any + + +@dataclass +class AnalysisReport: + """AI补货建议分析报告""" + + task_no: str + group_id: int + dealer_grouping_id: int + + id: Optional[int] = None + dealer_grouping_name: Optional[str] = None + brand_grouping_id: Optional[int] = None + report_type: str = "replenishment" + + # 报告各模块 (字典结构) + replenishment_insights: Optional[Dict[str, Any]] = None + urgency_assessment: Optional[Dict[str, Any]] = None + strategy_recommendations: Optional[Dict[str, Any]] = None + execution_guide: Optional[Dict[str, Any]] = None + expected_outcomes: Optional[Dict[str, Any]] = None + + # 统计信息 + total_suggest_cnt: int = 0 + total_suggest_amount: Decimal = Decimal("0") + shortage_risk_cnt: int = 0 + excess_risk_cnt: int = 0 + stagnant_cnt: int = 0 + low_freq_cnt: int = 0 + + # LLM 元数据 + llm_provider: str = "" + llm_model: str = "" + llm_tokens: int = 0 + execution_time_ms: int = 0 + + statistics_date: str = "" + create_time: Optional[datetime] = None + + def to_dict(self) -> dict: + """转换为字典""" + return { + "task_no": self.task_no, + "group_id": self.group_id, + "dealer_grouping_id": self.dealer_grouping_id, + "dealer_grouping_name": self.dealer_grouping_name, + "brand_grouping_id": self.brand_grouping_id, + "report_type": self.report_type, + "replenishment_insights": self.replenishment_insights, + "urgency_assessment": self.urgency_assessment, + "strategy_recommendations": self.strategy_recommendations, + "execution_guide": self.execution_guide, + "expected_outcomes": self.expected_outcomes, + "total_suggest_cnt": self.total_suggest_cnt, + "total_suggest_amount": float(self.total_suggest_amount), + "shortage_risk_cnt": self.shortage_risk_cnt, + "excess_risk_cnt": self.excess_risk_cnt, + "stagnant_cnt": self.stagnant_cnt, + "low_freq_cnt": self.low_freq_cnt, + "llm_provider": self.llm_provider, + "llm_model": self.llm_model, + "llm_tokens": self.llm_tokens, + "execution_time_ms": self.execution_time_ms, + "statistics_date": self.statistics_date, + } diff --git a/src/fw_pms_ai/services/result_writer.py b/src/fw_pms_ai/services/result_writer.py index 99b5a39..bba883e 100644 --- a/src/fw_pms_ai/services/result_writer.py +++ b/src/fw_pms_ai/services/result_writer.py @@ -14,6 +14,7 @@ from ..models import ( ReplenishmentDetail, TaskExecutionLog, ReplenishmentPartSummary, + AnalysisReport, ) logger = logging.getLogger(__name__) @@ -322,3 +323,67 @@ class ResultWriter: finally: cursor.close() + def save_analysis_report(self, report: AnalysisReport) -> int: + """ + 保存分析报告 + + Returns: + 插入的报告ID + """ + conn = self._get_connection() + cursor = conn.cursor() + + try: + sql = """ + INSERT INTO ai_analysis_report ( + task_no, group_id, dealer_grouping_id, dealer_grouping_name, + brand_grouping_id, report_type, + replenishment_insights, urgency_assessment, strategy_recommendations, + execution_guide, expected_outcomes, + total_suggest_cnt, total_suggest_amount, shortage_risk_cnt, + excess_risk_cnt, stagnant_cnt, low_freq_cnt, + llm_provider, llm_model, llm_tokens, execution_time_ms, + statistics_date, create_time + ) VALUES ( + %s, %s, %s, %s, %s, %s, + %s, %s, %s, %s, %s, + %s, %s, %s, %s, %s, %s, + %s, %s, %s, %s, %s, NOW() + ) + """ + + values = ( + report.task_no, + report.group_id, + report.dealer_grouping_id, + report.dealer_grouping_name, + report.brand_grouping_id, + report.report_type, + json.dumps(report.replenishment_insights, ensure_ascii=False) if report.replenishment_insights else None, + json.dumps(report.urgency_assessment, ensure_ascii=False) if report.urgency_assessment else None, + json.dumps(report.strategy_recommendations, ensure_ascii=False) if report.strategy_recommendations else None, + json.dumps(report.execution_guide, ensure_ascii=False) if report.execution_guide else None, + json.dumps(report.expected_outcomes, ensure_ascii=False) if report.expected_outcomes else None, + report.total_suggest_cnt, + float(report.total_suggest_amount), + report.shortage_risk_cnt, + report.excess_risk_cnt, + report.stagnant_cnt, + report.low_freq_cnt, + report.llm_provider, + report.llm_model, + report.llm_tokens, + report.execution_time_ms, + report.statistics_date, + ) + + cursor.execute(sql, values) + conn.commit() + + report_id = cursor.lastrowid + logger.info(f"保存分析报告: task_no={report.task_no}, id={report_id}") + return report_id + + finally: + cursor.close() + diff --git a/tests/test_analysis_report.py b/tests/test_analysis_report.py new file mode 100644 index 0000000..2ef222e --- /dev/null +++ b/tests/test_analysis_report.py @@ -0,0 +1,58 @@ +""" +测试分析报告生成功能 +""" +import sys +sys.path.insert(0, "src") + +from fw_pms_ai.agent.analysis_report_node import generate_analysis_report_node + +def test_generate_analysis_report(): + """测试为 AI-FB34CA0EE6C4 生成分析报告""" + + # 模拟数据 + part_ratios = [ + {"part_code": "C211F280503-1800-AA", "part_name": "牌照灯总成", + "valid_storage_cnt": 0, "avg_sales_cnt": 2, "out_stock_cnt": 5, "cost_price": 14}, + {"part_code": "TEST-001", "part_name": "测试配件1", + "valid_storage_cnt": 10, "avg_sales_cnt": 0, "out_stock_cnt": 0, "cost_price": 100}, + {"part_code": "TEST-002", "part_name": "测试配件2", + "valid_storage_cnt": 0, "avg_sales_cnt": 0.5, "out_stock_cnt": 0, "cost_price": 50}, + ] + + part_results = [ + {"part_code": "C211F280503-1800-AA", "part_name": "牌照灯总成", + "total_suggest_cnt": 4, "total_suggest_amount": 56.0, "priority": 1}, + ] + + allocated_details = [ + {"part_code": "C211F280503-1800-AA", "suggest_cnt": 4, "suggest_amount": 56.0}, + ] + + # 构建 state + state = { + "task_no": "AI-FB34CA0EE6C4", + "group_id": 2, + "dealer_grouping_id": 48, + "dealer_grouping_name": "测试分组", + "brand_grouping_id": None, + "statistics_date": "2026-02-05", + "part_ratios": part_ratios, + "part_results": part_results, + "allocated_details": allocated_details, + } + + print("开始生成分析报告...") + result = generate_analysis_report_node(state) + + if "error" in result.get("analysis_report", {}): + print(f"\n❌ 生成失败: {result['analysis_report']['error']}") + return False + else: + print(f"\n✅ 分析报告生成成功!") + report = result.get("analysis_report", {}) + print(f" - replenishment_insights: {str(report.get('replenishment_insights', ''))[:100]}...") + print(f" - urgency_assessment: {str(report.get('urgency_assessment', ''))[:100]}...") + return True + +if __name__ == "__main__": + test_generate_analysis_report()