""" 分析报告生成节点 在补货建议工作流的最后一个节点执行,生成结构化分析报告。 包含四大板块的统计计算函数:库存概览、销量分析、库存健康度、补货建议。 """ import logging from decimal import Decimal, ROUND_HALF_UP logger = logging.getLogger(__name__) def _to_decimal(value) -> Decimal: """安全转换为 Decimal""" if value is None: return Decimal("0") return Decimal(str(value)) def calculate_inventory_overview(part_ratios: list[dict]) -> dict: """ 计算库存总体概览统计数据 有效库存 = in_stock_unlocked_cnt + on_the_way_cnt + has_plan_cnt 资金占用 = in_stock_unlocked_cnt + on_the_way_cnt(仅计算实际占用资金的库存) Args: part_ratios: PartRatio 字典列表 Returns: 库存概览统计字典 """ total_in_stock_unlocked_cnt = Decimal("0") total_in_stock_unlocked_amount = Decimal("0") total_on_the_way_cnt = Decimal("0") total_on_the_way_amount = Decimal("0") total_has_plan_cnt = Decimal("0") total_has_plan_amount = Decimal("0") total_avg_sales_cnt = Decimal("0") # 资金占用合计 = (在库未锁 + 在途) * 成本价 total_capital_occupation = Decimal("0") for p in part_ratios: cost_price = _to_decimal(p.get("cost_price", 0)) in_stock = _to_decimal(p.get("in_stock_unlocked_cnt", 0)) on_way = _to_decimal(p.get("on_the_way_cnt", 0)) has_plan = _to_decimal(p.get("has_plan_cnt", 0)) total_in_stock_unlocked_cnt += in_stock total_in_stock_unlocked_amount += in_stock * cost_price total_on_the_way_cnt += on_way total_on_the_way_amount += on_way * cost_price total_has_plan_cnt += has_plan total_has_plan_amount += has_plan * cost_price # 资金占用 = 在库未锁 + 在途 total_capital_occupation += (in_stock + on_way) * cost_price # 月均销量 out_stock = _to_decimal(p.get("out_stock_cnt", 0)) locked = _to_decimal(p.get("storage_locked_cnt", 0)) ongoing = _to_decimal(p.get("out_stock_ongoing_cnt", 0)) buy = _to_decimal(p.get("buy_cnt", 0)) avg_sales = (out_stock + locked + ongoing + buy) / Decimal("3") total_avg_sales_cnt += avg_sales total_valid_storage_cnt = ( total_in_stock_unlocked_cnt + total_on_the_way_cnt + total_has_plan_cnt ) total_valid_storage_amount = ( total_in_stock_unlocked_amount + total_on_the_way_amount + total_has_plan_amount ) # 库销比:月均销量为零时标记为特殊值 if total_avg_sales_cnt > 0: overall_ratio = total_valid_storage_cnt / total_avg_sales_cnt else: overall_ratio = Decimal("999") return { "total_valid_storage_cnt": total_valid_storage_cnt, "total_valid_storage_amount": total_valid_storage_amount, "total_capital_occupation": total_capital_occupation, "total_in_stock_unlocked_cnt": total_in_stock_unlocked_cnt, "total_in_stock_unlocked_amount": total_in_stock_unlocked_amount, "total_on_the_way_cnt": total_on_the_way_cnt, "total_on_the_way_amount": total_on_the_way_amount, "total_has_plan_cnt": total_has_plan_cnt, "total_has_plan_amount": total_has_plan_amount, "total_avg_sales_cnt": total_avg_sales_cnt, "overall_ratio": overall_ratio, "part_count": len(part_ratios), } def calculate_sales_analysis(part_ratios: list[dict]) -> dict: """ 计算销量分析统计数据 月均销量 = (out_stock_cnt + storage_locked_cnt + out_stock_ongoing_cnt + buy_cnt) / 3 Args: part_ratios: PartRatio 字典列表 Returns: 销量分析统计字典 """ total_out_stock_cnt = Decimal("0") total_storage_locked_cnt = Decimal("0") total_out_stock_ongoing_cnt = Decimal("0") total_buy_cnt = Decimal("0") total_avg_sales_amount = Decimal("0") has_sales_part_count = 0 no_sales_part_count = 0 for p in part_ratios: cost_price = _to_decimal(p.get("cost_price", 0)) out_stock = _to_decimal(p.get("out_stock_cnt", 0)) locked = _to_decimal(p.get("storage_locked_cnt", 0)) ongoing = _to_decimal(p.get("out_stock_ongoing_cnt", 0)) buy = _to_decimal(p.get("buy_cnt", 0)) total_out_stock_cnt += out_stock total_storage_locked_cnt += locked total_out_stock_ongoing_cnt += ongoing total_buy_cnt += buy avg_sales = (out_stock + locked + ongoing + buy) / Decimal("3") total_avg_sales_amount += avg_sales * cost_price if avg_sales > 0: has_sales_part_count += 1 else: no_sales_part_count += 1 total_avg_sales_cnt = ( total_out_stock_cnt + total_storage_locked_cnt + total_out_stock_ongoing_cnt + total_buy_cnt ) / Decimal("3") return { "total_avg_sales_cnt": total_avg_sales_cnt, "total_avg_sales_amount": total_avg_sales_amount, "total_out_stock_cnt": total_out_stock_cnt, "total_storage_locked_cnt": total_storage_locked_cnt, "total_out_stock_ongoing_cnt": total_out_stock_ongoing_cnt, "total_buy_cnt": total_buy_cnt, "has_sales_part_count": has_sales_part_count, "no_sales_part_count": no_sales_part_count, } def _classify_part(p: dict) -> str: """ 将配件分类为缺货/呆滞/低频/正常 分类规则(按优先级顺序判断): - 缺货件: 有效库存 = 0 且 月均销量 >= 1 - 呆滞件: 有效库存 > 0 且 90天出库数 = 0 - 低频件: 月均销量 < 1 或 出库次数 < 3 或 出库间隔 >= 30天 - 正常件: 不属于以上三类 """ in_stock = _to_decimal(p.get("in_stock_unlocked_cnt", 0)) on_way = _to_decimal(p.get("on_the_way_cnt", 0)) has_plan = _to_decimal(p.get("has_plan_cnt", 0)) valid_storage = in_stock + on_way + has_plan out_stock = _to_decimal(p.get("out_stock_cnt", 0)) locked = _to_decimal(p.get("storage_locked_cnt", 0)) ongoing = _to_decimal(p.get("out_stock_ongoing_cnt", 0)) buy = _to_decimal(p.get("buy_cnt", 0)) avg_sales = (out_stock + locked + ongoing + buy) / Decimal("3") out_times = int(p.get("out_times", 0) or 0) out_duration = int(p.get("out_duration", 0) or 0) # 缺货件 if valid_storage == 0 and avg_sales >= 1: return "shortage" # 呆滞件 if valid_storage > 0 and out_stock == 0: return "stagnant" # 低频件 if avg_sales < 1 or out_times < 3 or out_duration >= 30: return "low_freq" return "normal" def calculate_inventory_health(part_ratios: list[dict]) -> dict: """ 计算库存构成健康度统计数据 将每个配件归类为缺货件/呆滞件/低频件/正常件,统计各类型数量/金额/百分比, 并生成 chart_data 供前端图表使用。 Args: part_ratios: PartRatio 字典列表 Returns: 健康度统计字典(含 chart_data) """ categories = { "shortage": {"count": 0, "amount": Decimal("0")}, "stagnant": {"count": 0, "amount": Decimal("0")}, "low_freq": {"count": 0, "amount": Decimal("0")}, "normal": {"count": 0, "amount": Decimal("0")}, } for p in part_ratios: cat = _classify_part(p) cost_price = _to_decimal(p.get("cost_price", 0)) # 有效库存金额 in_stock = _to_decimal(p.get("in_stock_unlocked_cnt", 0)) on_way = _to_decimal(p.get("on_the_way_cnt", 0)) has_plan = _to_decimal(p.get("has_plan_cnt", 0)) valid_storage = in_stock + on_way + has_plan amount = valid_storage * cost_price categories[cat]["count"] += 1 categories[cat]["amount"] += amount total_count = len(part_ratios) total_amount = sum(c["amount"] for c in categories.values()) # 计算百分比 result = {} for cat_name, data in categories.items(): count_pct = (data["count"] / total_count * 100) if total_count > 0 else 0.0 amount_pct = (float(data["amount"]) / float(total_amount) * 100) if total_amount > 0 else 0.0 result[cat_name] = { "count": data["count"], "amount": data["amount"], "count_pct": round(count_pct, 2), "amount_pct": round(amount_pct, 2), } result["total_count"] = total_count result["total_amount"] = total_amount # chart_data 供前端 Chart.js 使用 labels = ["缺货件", "呆滞件", "低频件", "正常件"] cat_keys = ["shortage", "stagnant", "low_freq", "normal"] result["chart_data"] = { "labels": labels, "count_values": [categories[k]["count"] for k in cat_keys], "amount_values": [float(categories[k]["amount"]) for k in cat_keys], } return result def calculate_replenishment_summary(part_results: list) -> dict: """ 计算补货建议生成情况统计数据 按优先级分类统计: - priority=1: 急需补货 - priority=2: 建议补货 - priority=3: 可选补货 Args: part_results: 配件汇总结果列表(字典或 ReplenishmentPartSummary 对象) Returns: 补货建议统计字典 """ urgent = {"count": 0, "amount": Decimal("0")} suggested = {"count": 0, "amount": Decimal("0")} optional = {"count": 0, "amount": Decimal("0")} for item in part_results: # 兼容字典和对象两种形式 if isinstance(item, dict): priority = int(item.get("priority", 0)) amount = _to_decimal(item.get("total_suggest_amount", 0)) else: priority = getattr(item, "priority", 0) amount = _to_decimal(getattr(item, "total_suggest_amount", 0)) if priority == 1: urgent["count"] += 1 urgent["amount"] += amount elif priority == 2: suggested["count"] += 1 suggested["amount"] += amount elif priority == 3: optional["count"] += 1 optional["amount"] += amount total_count = urgent["count"] + suggested["count"] + optional["count"] total_amount = urgent["amount"] + suggested["amount"] + optional["amount"] return { "urgent": urgent, "suggested": suggested, "optional": optional, "total_count": total_count, "total_amount": total_amount, } # ============================================================ # LLM 分析函数 # ============================================================ import os import json import time from langchain_core.messages import SystemMessage, HumanMessage def _load_prompt(filename: str) -> str: """从 prompts 目录加载提示词文件""" prompt_path = os.path.join( os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(__file__)))), "prompts", filename, ) with open(prompt_path, "r", encoding="utf-8") as f: return f.read() def _format_decimal(value) -> str: """将 Decimal 格式化为字符串,用于填充提示词""" if value is None: return "0" return str(round(float(value), 2)) def _get_season_from_date(date_str: str) -> str: """ 根据日期字符串获取季节 Args: date_str: 日期字符串,格式如 "2024-01-15" 或 "20240115" Returns: 季节名称:春季/夏季/秋季/冬季 """ from datetime import datetime try: # 尝试解析不同格式的日期 if "-" in date_str: dt = datetime.strptime(date_str[:10], "%Y-%m-%d") else: dt = datetime.strptime(date_str[:8], "%Y%m%d") month = dt.month except (ValueError, TypeError): # 解析失败时使用当前月份 month = datetime.now().month if month in (3, 4, 5): return "春季(3-5月)" elif month in (6, 7, 8): return "夏季(6-8月)" elif month in (9, 10, 11): return "秋季(9-11月)" else: return "冬季(12-2月)" def _parse_llm_json(content: str) -> dict: """ 解析 LLM 返回的 JSON 内容 尝试直接解析,如果失败则尝试提取 ```json 代码块中的内容。 """ text = content.strip() # 尝试直接解析 try: return json.loads(text) except json.JSONDecodeError: pass # 尝试提取 ```json ... ``` 代码块 import re match = re.search(r"```json\s*(.*?)\s*```", text, re.DOTALL) if match: try: return json.loads(match.group(1)) except json.JSONDecodeError: pass # 尝试提取 { ... } 块 start = text.find("{") end = text.rfind("}") if start != -1 and end != -1 and end > start: try: return json.loads(text[start : end + 1]) except json.JSONDecodeError: pass # 解析失败 raise json.JSONDecodeError("无法从 LLM 响应中解析 JSON", text, 0) def llm_analyze_inventory_overview(stats: dict, statistics_date: str = "", llm_client=None) -> tuple[dict, dict]: """ LLM 分析库存概览 Args: stats: calculate_inventory_overview 的输出 statistics_date: 统计日期 llm_client: LLM 客户端实例,为 None 时自动获取 Returns: (llm_analysis_dict, usage_dict) """ from ..llm import get_llm_client if llm_client is None: llm_client = get_llm_client() current_season = _get_season_from_date(statistics_date) prompt_template = _load_prompt("report_inventory_overview.md") prompt = prompt_template.format( part_count=stats.get("part_count", 0), total_valid_storage_cnt=_format_decimal(stats.get("total_valid_storage_cnt")), total_valid_storage_amount=_format_decimal(stats.get("total_valid_storage_amount")), total_avg_sales_cnt=_format_decimal(stats.get("total_avg_sales_cnt")), overall_ratio=_format_decimal(stats.get("overall_ratio")), total_in_stock_unlocked_cnt=_format_decimal(stats.get("total_in_stock_unlocked_cnt")), total_in_stock_unlocked_amount=_format_decimal(stats.get("total_in_stock_unlocked_amount")), total_on_the_way_cnt=_format_decimal(stats.get("total_on_the_way_cnt")), total_on_the_way_amount=_format_decimal(stats.get("total_on_the_way_amount")), total_has_plan_cnt=_format_decimal(stats.get("total_has_plan_cnt")), total_has_plan_amount=_format_decimal(stats.get("total_has_plan_amount")), current_season=current_season, statistics_date=statistics_date or "未知", ) messages = [HumanMessage(content=prompt)] response = llm_client.invoke(messages) try: analysis = _parse_llm_json(response.content) except json.JSONDecodeError: logger.warning(f"库存概览 LLM JSON 解析失败,原始响应: {response.content[:200]}") analysis = {"error": "JSON解析失败", "raw": response.content[:200]} usage = { "provider": response.usage.provider, "model": response.usage.model, "prompt_tokens": response.usage.prompt_tokens, "completion_tokens": response.usage.completion_tokens, } return analysis, usage def llm_analyze_sales(stats: dict, statistics_date: str = "", llm_client=None) -> tuple[dict, dict]: """ LLM 分析销量 Args: stats: calculate_sales_analysis 的输出 statistics_date: 统计日期 llm_client: LLM 客户端实例 Returns: (llm_analysis_dict, usage_dict) """ from ..llm import get_llm_client if llm_client is None: llm_client = get_llm_client() current_season = _get_season_from_date(statistics_date) prompt_template = _load_prompt("report_sales_analysis.md") prompt = prompt_template.format( total_avg_sales_cnt=_format_decimal(stats.get("total_avg_sales_cnt")), total_avg_sales_amount=_format_decimal(stats.get("total_avg_sales_amount")), has_sales_part_count=stats.get("has_sales_part_count", 0), no_sales_part_count=stats.get("no_sales_part_count", 0), total_out_stock_cnt=_format_decimal(stats.get("total_out_stock_cnt")), total_storage_locked_cnt=_format_decimal(stats.get("total_storage_locked_cnt")), total_out_stock_ongoing_cnt=_format_decimal(stats.get("total_out_stock_ongoing_cnt")), total_buy_cnt=_format_decimal(stats.get("total_buy_cnt")), current_season=current_season, statistics_date=statistics_date or "未知", ) messages = [HumanMessage(content=prompt)] response = llm_client.invoke(messages) try: analysis = _parse_llm_json(response.content) except json.JSONDecodeError: logger.warning(f"销量分析 LLM JSON 解析失败,原始响应: {response.content[:200]}") analysis = {"error": "JSON解析失败", "raw": response.content[:200]} usage = { "provider": response.usage.provider, "model": response.usage.model, "prompt_tokens": response.usage.prompt_tokens, "completion_tokens": response.usage.completion_tokens, } return analysis, usage def llm_analyze_inventory_health(stats: dict, statistics_date: str = "", llm_client=None) -> tuple[dict, dict]: """ LLM 分析库存健康度 Args: stats: calculate_inventory_health 的输出 statistics_date: 统计日期 llm_client: LLM 客户端实例 Returns: (llm_analysis_dict, usage_dict) """ from ..llm import get_llm_client if llm_client is None: llm_client = get_llm_client() current_season = _get_season_from_date(statistics_date) prompt_template = _load_prompt("report_inventory_health.md") prompt = prompt_template.format( total_count=stats.get("total_count", 0), total_amount=_format_decimal(stats.get("total_amount")), shortage_count=stats.get("shortage", {}).get("count", 0), shortage_count_pct=stats.get("shortage", {}).get("count_pct", 0), shortage_amount=_format_decimal(stats.get("shortage", {}).get("amount")), shortage_amount_pct=stats.get("shortage", {}).get("amount_pct", 0), stagnant_count=stats.get("stagnant", {}).get("count", 0), stagnant_count_pct=stats.get("stagnant", {}).get("count_pct", 0), stagnant_amount=_format_decimal(stats.get("stagnant", {}).get("amount")), stagnant_amount_pct=stats.get("stagnant", {}).get("amount_pct", 0), low_freq_count=stats.get("low_freq", {}).get("count", 0), low_freq_count_pct=stats.get("low_freq", {}).get("count_pct", 0), low_freq_amount=_format_decimal(stats.get("low_freq", {}).get("amount")), low_freq_amount_pct=stats.get("low_freq", {}).get("amount_pct", 0), normal_count=stats.get("normal", {}).get("count", 0), normal_count_pct=stats.get("normal", {}).get("count_pct", 0), normal_amount=_format_decimal(stats.get("normal", {}).get("amount")), normal_amount_pct=stats.get("normal", {}).get("amount_pct", 0), current_season=current_season, statistics_date=statistics_date or "未知", ) messages = [HumanMessage(content=prompt)] response = llm_client.invoke(messages) try: analysis = _parse_llm_json(response.content) except json.JSONDecodeError: logger.warning(f"健康度 LLM JSON 解析失败,原始响应: {response.content[:200]}") analysis = {"error": "JSON解析失败", "raw": response.content[:200]} usage = { "provider": response.usage.provider, "model": response.usage.model, "prompt_tokens": response.usage.prompt_tokens, "completion_tokens": response.usage.completion_tokens, } return analysis, usage def llm_analyze_replenishment_summary(stats: dict, statistics_date: str = "", llm_client=None) -> tuple[dict, dict]: """ LLM 分析补货建议 Args: stats: calculate_replenishment_summary 的输出 statistics_date: 统计日期 llm_client: LLM 客户端实例 Returns: (llm_analysis_dict, usage_dict) """ from ..llm import get_llm_client if llm_client is None: llm_client = get_llm_client() current_season = _get_season_from_date(statistics_date) prompt_template = _load_prompt("report_replenishment_summary.md") prompt = prompt_template.format( total_count=stats.get("total_count", 0), total_amount=_format_decimal(stats.get("total_amount")), urgent_count=stats.get("urgent", {}).get("count", 0), urgent_amount=_format_decimal(stats.get("urgent", {}).get("amount")), suggested_count=stats.get("suggested", {}).get("count", 0), suggested_amount=_format_decimal(stats.get("suggested", {}).get("amount")), optional_count=stats.get("optional", {}).get("count", 0), optional_amount=_format_decimal(stats.get("optional", {}).get("amount")), current_season=current_season, statistics_date=statistics_date or "未知", ) messages = [HumanMessage(content=prompt)] response = llm_client.invoke(messages) try: analysis = _parse_llm_json(response.content) except json.JSONDecodeError: logger.warning(f"补货建议 LLM JSON 解析失败,原始响应: {response.content[:200]}") analysis = {"error": "JSON解析失败", "raw": response.content[:200]} usage = { "provider": response.usage.provider, "model": response.usage.model, "prompt_tokens": response.usage.prompt_tokens, "completion_tokens": response.usage.completion_tokens, } return analysis, usage # ============================================================ # LangGraph 并发子图 # ============================================================ from typing import TypedDict, Optional, Any, Annotated, Dict from langgraph.graph import StateGraph, START, END def _merge_dict(left: Optional[dict], right: Optional[dict]) -> Optional[dict]: """合并字典,保留非 None 的值""" if right is not None: return right return left def _sum_int(left: int, right: int) -> int: """累加整数""" return (left or 0) + (right or 0) def _merge_str(left: Optional[str], right: Optional[str]) -> Optional[str]: """合并字符串,保留非 None 的值""" if right is not None: return right return left class ReportLLMState(TypedDict, total=False): """并发 LLM 分析子图的状态""" # 输入:四大板块的统计数据(只读,由主函数写入) inventory_overview_stats: Annotated[Optional[dict], _merge_dict] sales_analysis_stats: Annotated[Optional[dict], _merge_dict] inventory_health_stats: Annotated[Optional[dict], _merge_dict] replenishment_summary_stats: Annotated[Optional[dict], _merge_dict] # 输入:统计日期(用于季节判断) statistics_date: Annotated[Optional[str], _merge_str] # 输出:四大板块的 LLM 分析结果(各节点独立写入) inventory_overview_analysis: Annotated[Optional[dict], _merge_dict] sales_analysis_analysis: Annotated[Optional[dict], _merge_dict] inventory_health_analysis: Annotated[Optional[dict], _merge_dict] replenishment_summary_analysis: Annotated[Optional[dict], _merge_dict] # LLM 使用量(累加) total_prompt_tokens: Annotated[int, _sum_int] total_completion_tokens: Annotated[int, _sum_int] llm_provider: Annotated[Optional[str], _merge_dict] llm_model: Annotated[Optional[str], _merge_dict] def _node_inventory_overview(state: ReportLLMState) -> ReportLLMState: """并发节点:库存概览 LLM 分析""" stats = state.get("inventory_overview_stats") statistics_date = state.get("statistics_date", "") if not stats: return {"inventory_overview_analysis": {"error": "无统计数据"}} try: analysis, usage = llm_analyze_inventory_overview(stats, statistics_date) return { "inventory_overview_analysis": analysis, "total_prompt_tokens": usage.get("prompt_tokens", 0), "total_completion_tokens": usage.get("completion_tokens", 0), "llm_provider": usage.get("provider", ""), "llm_model": usage.get("model", ""), } except Exception as e: logger.error(f"库存概览 LLM 分析失败: {e}") return {"inventory_overview_analysis": {"error": str(e)}} def _node_sales_analysis(state: ReportLLMState) -> ReportLLMState: """并发节点:销量分析 LLM 分析""" stats = state.get("sales_analysis_stats") statistics_date = state.get("statistics_date", "") if not stats: return {"sales_analysis_analysis": {"error": "无统计数据"}} try: analysis, usage = llm_analyze_sales(stats, statistics_date) return { "sales_analysis_analysis": analysis, "total_prompt_tokens": usage.get("prompt_tokens", 0), "total_completion_tokens": usage.get("completion_tokens", 0), "llm_provider": usage.get("provider", ""), "llm_model": usage.get("model", ""), } except Exception as e: logger.error(f"销量分析 LLM 分析失败: {e}") return {"sales_analysis_analysis": {"error": str(e)}} def _node_inventory_health(state: ReportLLMState) -> ReportLLMState: """并发节点:健康度 LLM 分析""" stats = state.get("inventory_health_stats") statistics_date = state.get("statistics_date", "") if not stats: return {"inventory_health_analysis": {"error": "无统计数据"}} try: analysis, usage = llm_analyze_inventory_health(stats, statistics_date) return { "inventory_health_analysis": analysis, "total_prompt_tokens": usage.get("prompt_tokens", 0), "total_completion_tokens": usage.get("completion_tokens", 0), "llm_provider": usage.get("provider", ""), "llm_model": usage.get("model", ""), } except Exception as e: logger.error(f"健康度 LLM 分析失败: {e}") return {"inventory_health_analysis": {"error": str(e)}} def _node_replenishment_summary(state: ReportLLMState) -> ReportLLMState: """并发节点:补货建议 LLM 分析""" stats = state.get("replenishment_summary_stats") statistics_date = state.get("statistics_date", "") if not stats: return {"replenishment_summary_analysis": {"error": "无统计数据"}} try: analysis, usage = llm_analyze_replenishment_summary(stats, statistics_date) return { "replenishment_summary_analysis": analysis, "total_prompt_tokens": usage.get("prompt_tokens", 0), "total_completion_tokens": usage.get("completion_tokens", 0), "llm_provider": usage.get("provider", ""), "llm_model": usage.get("model", ""), } except Exception as e: logger.error(f"补货建议 LLM 分析失败: {e}") return {"replenishment_summary_analysis": {"error": str(e)}} def build_report_llm_subgraph() -> StateGraph: """ 构建并发 LLM 分析子图 四个 LLM 节点从 START fan-out 并发执行,结果 fan-in 汇总到 END。 """ graph = StateGraph(ReportLLMState) # 添加四个并发节点 graph.add_node("inventory_overview_llm", _node_inventory_overview) graph.add_node("sales_analysis_llm", _node_sales_analysis) graph.add_node("inventory_health_llm", _node_inventory_health) graph.add_node("replenishment_summary_llm", _node_replenishment_summary) # fan-out: START → 四个节点 graph.add_edge(START, "inventory_overview_llm") graph.add_edge(START, "sales_analysis_llm") graph.add_edge(START, "inventory_health_llm") graph.add_edge(START, "replenishment_summary_llm") # fan-in: 四个节点 → END graph.add_edge("inventory_overview_llm", END) graph.add_edge("sales_analysis_llm", END) graph.add_edge("inventory_health_llm", END) graph.add_edge("replenishment_summary_llm", END) return graph.compile() # ============================================================ # 主节点函数 # ============================================================ def _serialize_stats(stats: dict) -> dict: """将统计数据中的 Decimal 转换为 float,以便 JSON 序列化""" result = {} for k, v in stats.items(): if isinstance(v, Decimal): result[k] = float(v) elif isinstance(v, dict): result[k] = _serialize_stats(v) elif isinstance(v, list): result[k] = [ _serialize_stats(item) if isinstance(item, dict) else (float(item) if isinstance(item, Decimal) else item) for item in v ] else: result[k] = v return result def generate_analysis_report_node(state: dict) -> dict: """ 分析报告生成主节点 串联流程: 1. 统计计算(四大板块) 2. 并发 LLM 分析(LangGraph 子图) 3. 汇总报告 4. 写入数据库 单板块 LLM 失败不影响其他板块。 Args: state: AgentState 字典 Returns: 更新后的 state 字典 """ from .state import AgentState from ..models import AnalysisReport from ..services.result_writer import ResultWriter logger.info("[AnalysisReport] ========== 开始生成分析报告 ==========") start_time = time.time() part_ratios = state.get("part_ratios", []) part_results = state.get("part_results", []) # ---- 1. 统计计算 ---- logger.info(f"[AnalysisReport] 统计计算: part_ratios={len(part_ratios)}, part_results={len(part_results)}") inventory_overview_stats = calculate_inventory_overview(part_ratios) sales_analysis_stats = calculate_sales_analysis(part_ratios) inventory_health_stats = calculate_inventory_health(part_ratios) replenishment_summary_stats = calculate_replenishment_summary(part_results) # 序列化统计数据(Decimal → float) io_stats_serialized = _serialize_stats(inventory_overview_stats) sa_stats_serialized = _serialize_stats(sales_analysis_stats) ih_stats_serialized = _serialize_stats(inventory_health_stats) rs_stats_serialized = _serialize_stats(replenishment_summary_stats) # ---- 2. 并发 LLM 分析 ---- logger.info("[AnalysisReport] 启动并发 LLM 分析子图") statistics_date = state.get("statistics_date", "") subgraph = build_report_llm_subgraph() llm_state: ReportLLMState = { "inventory_overview_stats": io_stats_serialized, "sales_analysis_stats": sa_stats_serialized, "inventory_health_stats": ih_stats_serialized, "replenishment_summary_stats": rs_stats_serialized, "statistics_date": statistics_date, "inventory_overview_analysis": None, "sales_analysis_analysis": None, "inventory_health_analysis": None, "replenishment_summary_analysis": None, "total_prompt_tokens": 0, "total_completion_tokens": 0, "llm_provider": None, "llm_model": None, } try: llm_result = subgraph.invoke(llm_state) except Exception as e: logger.error(f"[AnalysisReport] LLM 子图执行异常: {e}") llm_result = llm_state # 使用初始状态(所有分析为 None) # ---- 3. 汇总报告 ---- inventory_overview_data = { "stats": io_stats_serialized, "llm_analysis": llm_result.get("inventory_overview_analysis") or {"error": "未生成"}, } sales_analysis_data = { "stats": sa_stats_serialized, "llm_analysis": llm_result.get("sales_analysis_analysis") or {"error": "未生成"}, } inventory_health_data = { "stats": ih_stats_serialized, "chart_data": ih_stats_serialized.get("chart_data"), "llm_analysis": llm_result.get("inventory_health_analysis") or {"error": "未生成"}, } replenishment_summary_data = { "stats": rs_stats_serialized, "llm_analysis": llm_result.get("replenishment_summary_analysis") or {"error": "未生成"}, } total_tokens = ( (llm_result.get("total_prompt_tokens") or 0) + (llm_result.get("total_completion_tokens") or 0) ) execution_time_ms = int((time.time() - start_time) * 1000) # ---- 4. 写入数据库 ---- report = AnalysisReport( task_no=state.get("task_no", ""), group_id=state.get("group_id", 0), dealer_grouping_id=state.get("dealer_grouping_id", 0), dealer_grouping_name=state.get("dealer_grouping_name"), brand_grouping_id=state.get("brand_grouping_id"), inventory_overview=inventory_overview_data, sales_analysis=sales_analysis_data, inventory_health=inventory_health_data, replenishment_summary=replenishment_summary_data, llm_provider=llm_result.get("llm_provider") or "", llm_model=llm_result.get("llm_model") or "", llm_tokens=total_tokens, execution_time_ms=execution_time_ms, statistics_date=state.get("statistics_date", ""), ) try: writer = ResultWriter() report_id = writer.save_analysis_report(report) writer.close() logger.info(f"[AnalysisReport] 报告已保存: id={report_id}, tokens={total_tokens}, 耗时={execution_time_ms}ms") except Exception as e: logger.error(f"[AnalysisReport] 报告写入数据库失败: {e}") # 返回更新后的状态 return { "analysis_report": report.to_dict(), "llm_provider": llm_result.get("llm_provider") or state.get("llm_provider", ""), "llm_model": llm_result.get("llm_model") or state.get("llm_model", ""), "llm_prompt_tokens": llm_result.get("total_prompt_tokens") or 0, "llm_completion_tokens": llm_result.get("total_completion_tokens") or 0, "current_node": "generate_analysis_report", "next_node": "end", }