"""
Part analyzer module.

Responsible for grouping part records, invoking the LLM and parsing its
responses into replenishment suggestions.
"""

import logging
import time
import json
import concurrent.futures
from typing import Any, Dict, List, Optional, Tuple
from decimal import Decimal, InvalidOperation

from langchain_core.messages import SystemMessage, HumanMessage

from .prompts import (
    load_prompt,
    SUGGESTION_PROMPT,
    SUGGESTION_SYSTEM_PROMPT,
    PART_SHOP_ANALYSIS_PROMPT,
    PART_SHOP_ANALYSIS_SYSTEM_PROMPT,
)
from ...llm import get_llm_client
from ...models import ReplenishmentSuggestion, PartAnalysisResult

logger = logging.getLogger(__name__)


def _or_default(value: Any, default: Any) -> Any:
    """Return *default* when *value* is None, otherwise *value* unchanged."""
    return default if value is None else value


def _to_decimal(value: Any) -> Decimal:
    """Convert *value* to Decimal via its string form; None becomes 0.

    A key that is present with a None value (e.g. a JSON ``null``) would
    otherwise yield ``Decimal("None")`` and raise ``InvalidOperation``.
    """
    return Decimal(str(_or_default(value, 0)))


def _extract_json_payload(content: str) -> str:
    """Strip a surrounding Markdown code fence from an LLM reply, if any.

    A ```json fence is preferred over a bare ``` fence; text without a
    fence is returned unchanged.
    """
    if "```json" in content:
        return content.split("```json")[1].split("```")[0].strip()
    if "```" in content:
        return content.split("```")[1].split("```")[0].strip()
    return content


class PartAnalyzer:
    """Part analyzer: runs the LLM analysis and parses its results."""

    # Tunables; subclasses or instances may override them.
    MAX_WORKERS = 150             # upper bound on concurrent LLM requests
    CALLBACK_BATCH_SIZE = 10      # suggestions buffered before callback fires
    REQUEST_DELAY_SECONDS = 0.5   # pause after a successful request
    FAILURE_BACKOFF_SECONDS = 2.0  # longer pause after a failed request

    def __init__(self):
        self._llm = get_llm_client()

    def group_parts_by_code(self, part_ratios: List[Dict]) -> Dict[str, List[Dict]]:
        """
        Group records by part code.

        Records whose ``part_code`` is missing or empty are skipped.

        Args:
            part_ratios: list of per-shop stock/sales-ratio records

        Returns:
            {part_code: [records for that part, in input order]}
        """
        grouped: Dict[str, List[Dict]] = {}
        for pr in part_ratios:
            part_code = pr.get("part_code", "")
            if not part_code:
                continue
            grouped.setdefault(part_code, []).append(pr)

        logger.info(f"配件分组完成: 总配件数={len(grouped)}, 总记录数={len(part_ratios)}")
        return grouped

    def generate_suggestions(
        self,
        part_data: List[Dict],
        dealer_grouping_id: int,
        dealer_grouping_name: str,
        statistics_date: str,
    ) -> Tuple[List[ReplenishmentSuggestion], Dict]:
        """
        Generate replenishment suggestions with a single LLM call.

        The reply is expected to be a JSON array of suggestion objects. An
        unparsable reply, a non-array reply, or a malformed item is logged
        and skipped rather than raised, so the token statistics of the call
        are always returned.

        Args:
            part_data: part records, serialized to JSON for the prompt
            dealer_grouping_id: dealer grouping ID
            dealer_grouping_name: dealer grouping name
            statistics_date: statistics date

        Returns:
            (list of suggestions, LLM token statistics)
        """
        if not part_data:
            return [], {"prompt_tokens": 0, "completion_tokens": 0}

        # Pass the full data set to the LLM.
        part_data_str = json.dumps(part_data, ensure_ascii=False, indent=2, default=str)

        prompt = SUGGESTION_PROMPT.format(
            dealer_grouping_id=dealer_grouping_id,
            dealer_grouping_name=dealer_grouping_name,
            statistics_date=statistics_date,
            part_data=part_data_str,
        )

        messages = [
            SystemMessage(content=SUGGESTION_SYSTEM_PROMPT),
            HumanMessage(content=prompt),
        ]

        response = self._llm.invoke(messages)
        content = response.content.strip()

        suggestions: List[ReplenishmentSuggestion] = []
        try:
            raw_suggestions = json.loads(_extract_json_payload(content))
        except json.JSONDecodeError as e:
            logger.error(f"解析LLM建议失败: {e}")
            raw_suggestions = []

        if not isinstance(raw_suggestions, list):
            # Iterating a dict/str would yield keys/characters, not items.
            logger.error(
                f"解析LLM建议失败: 期望JSON数组, 实际为{type(raw_suggestions).__name__}"
            )
            raw_suggestions = []

        for item in raw_suggestions:
            # One malformed item must not discard the remaining ones.
            try:
                suggestions.append(ReplenishmentSuggestion(
                    shop_id=item.get("shop_id", 0),
                    shop_name=item.get("shop_name", ""),
                    part_code=item.get("part_code", ""),
                    part_name=item.get("part_name", ""),
                    unit=item.get("unit", ""),
                    cost_price=_to_decimal(item.get("cost_price")),
                    current_storage_cnt=_to_decimal(item.get("current_storage_cnt")),
                    avg_sales_cnt=_to_decimal(item.get("avg_sales_cnt")),
                    current_ratio=_to_decimal(item.get("current_ratio")),
                    suggest_cnt=int(_or_default(item.get("suggest_cnt"), 0)),
                    suggest_amount=_to_decimal(item.get("suggest_amount")),
                    suggestion_reason=item.get("suggestion_reason", ""),
                    priority=int(_or_default(item.get("priority"), 2)),
                    confidence=float(_or_default(item.get("confidence"), 0.8)),
                ))
            except (AttributeError, TypeError, ValueError, InvalidOperation) as e:
                logger.error(f"解析LLM建议失败: {e}")

        llm_stats = {
            "prompt_tokens": response.usage.prompt_tokens if response.usage else 0,
            "completion_tokens": response.usage.completion_tokens if response.usage else 0,
        }

        logger.info(f"生成补货建议: {len(suggestions)}条")
        return suggestions, llm_stats

    def analyze_parts_by_group(
        self,
        part_ratios: List[Dict],
        dealer_grouping_id: int,
        dealer_grouping_name: str,
        statistics_date: str,
        target_ratio: Decimal = Decimal("1.3"),
        limit: Optional[int] = None,
        callback: Optional[Any] = None,
    ) -> Tuple[List[ReplenishmentSuggestion], List[PartAnalysisResult], Dict]:
        """
        Analyze replenishment needs part by part, one LLM call per part.

        Parts are processed concurrently in a thread pool. Suggestions are
        buffered and handed to ``callback`` in batches as results arrive;
        a failing callback is logged and the batch is kept for the next try.

        Args:
            part_ratios: list of per-shop stock/sales-ratio records
            dealer_grouping_id: dealer grouping ID
            dealer_grouping_name: dealer grouping name
            statistics_date: statistics date
            target_ratio: target (baseline) stock/sales ratio
            limit: process only the first ``limit`` parts (for testing)
            callback: batch callback, called as ``callback(suggestions)``

        Returns:
            (list of suggestions, list of per-part results, LLM token statistics)
        """
        if not part_ratios:
            return [], [], {"prompt_tokens": 0, "completion_tokens": 0}

        grouped_parts = self.group_parts_by_code(part_ratios)

        all_part_codes = list(grouped_parts)
        if limit and limit > 0:
            logger.warning(f"启用测试限制: 仅处理前 {limit} 个配件 (总数: {len(all_part_codes)})")
            all_part_codes = all_part_codes[:limit]

        all_suggestions: List[ReplenishmentSuggestion] = []
        all_part_results: List[PartAnalysisResult] = []
        total_prompt_tokens = 0
        total_completion_tokens = 0

        user_prompt_template = PART_SHOP_ANALYSIS_PROMPT

        # Substitute the target ratio into the system prompt. str.replace is
        # used (not str.format) so only this one placeholder is touched.
        target_ratio_str = f"{float(target_ratio):.2f}"
        system_prompt = PART_SHOP_ANALYSIS_SYSTEM_PROMPT.replace(
            "{target_ratio}", target_ratio_str
        )

        def process_single_part(
            part_code: str,
        ) -> Tuple[Optional[PartAnalysisResult], List[ReplenishmentSuggestion], int, int]:
            """Analyze one part; returns (result or None, suggestions, prompt tokens, completion tokens)."""
            shop_data_list = grouped_parts[part_code]
            if not shop_data_list:
                return None, [], 0, 0

            # Part-level attributes are taken from the first shop record.
            first_item = shop_data_list[0]
            part_name = first_item.get("part_name", "")
            cost_price = first_item.get("cost_price", 0)
            unit = first_item.get("unit", "")

            shop_data_str = json.dumps(shop_data_list, ensure_ascii=False, indent=2, default=str)

            prompt = user_prompt_template.format(
                part_code=part_code,
                part_name=part_name,
                cost_price=cost_price,
                unit=unit,
                dealer_grouping_name=dealer_grouping_name,
                statistics_date=statistics_date,
                shop_data=shop_data_str,
                target_ratio=target_ratio_str,
            )

            messages = [
                SystemMessage(content=system_prompt),
                HumanMessage(content=prompt),
            ]

            p_tokens = 0
            c_tokens = 0

            try:
                response = self._llm.invoke(messages)
                content = response.content.strip()

                if response.usage:
                    p_tokens = response.usage.prompt_tokens
                    c_tokens = response.usage.completion_tokens

                part_result, suggestions = self._parse_part_analysis_response(
                    content, part_code, part_name, unit, cost_price,
                    shop_data_list, target_ratio
                )

                # Delay between requests to stay under the rate limit.
                time.sleep(self.REQUEST_DELAY_SECONDS)

                return part_result, suggestions, p_tokens, c_tokens

            except Exception as e:
                logger.error(f"分析配件 {part_code} 失败: {e}")
                # Back off longer after a failure before continuing.
                time.sleep(self.FAILURE_BACKOFF_SECONDS)
                # Tokens already consumed by a successful LLM call still count.
                return None, [], p_tokens, c_tokens

        batch_size = self.CALLBACK_BATCH_SIZE
        current_batch: List[ReplenishmentSuggestion] = []
        finished_count = 0
        total_count = len(all_part_codes)

        # Never start more workers than there are parts; at least one worker
        # is required by ThreadPoolExecutor.
        max_workers = min(self.MAX_WORKERS, total_count) if total_count > 0 else 1

        logger.info(f"开始并行分析: workers={max_workers}, parts={total_count}")

        with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
            future_to_part = {
                executor.submit(process_single_part, code): code
                for code in all_part_codes
            }

            for future in concurrent.futures.as_completed(future_to_part):
                part_code = future_to_part[future]
                finished_count += 1

                try:
                    part_result, suggestions, p_t, c_t = future.result()

                    if part_result:
                        all_part_results.append(part_result)

                    if suggestions:
                        all_suggestions.extend(suggestions)
                        current_batch.extend(suggestions)

                    total_prompt_tokens += p_t
                    total_completion_tokens += c_t

                    # Flush a full batch; on failure keep it for the next attempt.
                    if callback and len(current_batch) >= batch_size:
                        try:
                            callback(current_batch)
                            logger.info(f"批次落库: {len(current_batch)} 条")
                            current_batch = []
                        except Exception as e:
                            logger.error(f"回调执行失败: {e}")

                except Exception as e:
                    logger.error(f"任务执行异常 {part_code}: {e}")

                if finished_count % 10 == 0:
                    logger.info(f"进度: {finished_count}/{total_count} ({(finished_count/total_count*100):.1f}%)")

        # Flush whatever is left after all tasks completed.
        if callback and current_batch:
            try:
                callback(current_batch)
                logger.info(f"最后批次落库: {len(current_batch)} 条")
            except Exception as e:
                logger.error(f"最后回调执行失败: {e}")

        llm_stats = {
            "prompt_tokens": total_prompt_tokens,
            "completion_tokens": total_completion_tokens,
        }

        logger.info(
            f"分组分析完成: 配件数={len(grouped_parts)}, "
            f"配件汇总数={len(all_part_results)}, "
            f"建议数={len(all_suggestions)}, tokens={total_prompt_tokens + total_completion_tokens}"
        )

        return all_suggestions, all_part_results, llm_stats

    def _calculate_priority_by_ratio(
        self,
        current_ratio: Decimal,
        avg_sales: Decimal,
        target_ratio: Decimal,
    ) -> int:
        """
        Derive a priority from the stock/sales ratio.

        Rules (all require avg_sales >= 1, otherwise 0):
        - ratio < 0.5: high priority (1)
        - 0.5 <= ratio < 1.0: medium priority (2)
        - 1.0 <= ratio < target_ratio: low priority (3)
        - anything else: no replenishment needed (0)

        Args:
            current_ratio: current stock/sales ratio
            avg_sales: average sales quantity
            target_ratio: target stock/sales ratio

        Returns:
            priority (0 = none, 1 = high, 2 = medium, 3 = low)
        """
        if avg_sales < 1:
            return 0
        if current_ratio < Decimal("0.5"):
            return 1
        if current_ratio < Decimal("1.0"):
            return 2
        if current_ratio < target_ratio:
            return 3
        return 0

    def _build_default_reason(
        self,
        shop_data: Dict,
        storage: Decimal,
        avg_sales: Decimal,
        ratio: Decimal,
        target_ratio: Decimal,
    ) -> Tuple[str, int]:
        """Build the (reason, priority) pair for a shop the LLM did not mention.

        The checks run in a fixed order; the first one that matches wins.
        """
        out_times = shop_data.get("out_times", 0) or 0
        out_duration = shop_data.get("out_duration", 0) or 0
        priority = self._calculate_priority_by_ratio(ratio, avg_sales, target_ratio)

        if storage > 0 and avg_sales <= 0:
            return f"「呆滞件」当前库存{storage}件,但90天内无销售记录,库存滞销风险高,暂不补货。", 0
        if storage <= 0 and avg_sales < 1:
            return f"「低频件-需求不足」当前库存{storage}件,月均销量{avg_sales:.2f}件,需求过低,暂不纳入补货计划。", 0
        if out_times < 3:
            return f"「低频件-出库次数不足」90天内仅出库{out_times}次(阈值≥3次),周转频率过低,暂不纳入补货计划。", 0
        if out_duration >= 30:
            return f"「低频件-出库间隔过长」平均出库间隔{out_duration}天(阈值<30天),周转周期过长,暂不纳入补货计划。", 0

        # Days of sales the current stock can cover (30-day month).
        days = int(storage / avg_sales * 30) if avg_sales > 0 else 0

        if avg_sales > 0 and ratio >= target_ratio:
            return f"「库存充足」当前库存{storage}件,月均销量{avg_sales:.2f}件,库销比{ratio:.2f},可支撑约{days}天销售,无需补货。", 0
        if priority == 1:
            return f"「急需补货」当前库存{storage}件,月均销量{avg_sales:.2f}件,库销比{ratio:.2f},仅可支撑约{days}天销售,存在缺货风险。", priority
        if priority == 2:
            return f"「建议补货」当前库存{storage}件,月均销量{avg_sales:.2f}件,库销比{ratio:.2f},可支撑约{days}天销售,库存偏低建议补货。", priority
        if priority == 3:
            return f"「可选补货」当前库存{storage}件,月均销量{avg_sales:.2f}件,库销比{ratio:.2f},可支撑约{days}天销售,可根据资金情况酌情补货。", priority
        return f"「无需补货」当前库存{storage}件,月均销量{avg_sales:.2f}件,AI分析判定暂不补货。", priority

    def _parse_part_analysis_response(
        self,
        content: str,
        part_code: str,
        part_name: str,
        unit: str,
        cost_price: float,
        shop_data_list: List[Dict],
        target_ratio: Decimal = Decimal("1.3"),
    ) -> Tuple[PartAnalysisResult, List[ReplenishmentSuggestion]]:
        """
        Parse the LLM reply for a single part.

        Produces one suggestion per input shop: the LLM's own suggestion
        when it mentions the shop, otherwise a zero-quantity record with a
        rule-based reason. If the reply is not a JSON object, every shop
        gets a zero-quantity "analysis failed" record instead. Numeric
        fields that are None are treated as 0 (or the field's default).

        Args:
            content: LLM reply text (optionally wrapped in a code fence)
            part_code: part code
            part_name: part name
            unit: unit of measure
            cost_price: cost price
            shop_data_list: per-shop records for this part
            target_ratio: target stock/sales ratio

        Returns:
            (part analysis result, list of suggestions)
        """
        suggestions: List[ReplenishmentSuggestion] = []
        cost_price_dec = _to_decimal(cost_price)

        # Part-level totals computed from input; the LLM may override them.
        total_storage = sum(
            (_to_decimal(s.get("valid_storage_cnt")) for s in shop_data_list),
            Decimal("0"),
        )
        total_avg_sales = sum(
            (_to_decimal(s.get("avg_sales_cnt")) for s in shop_data_list),
            Decimal("0"),
        )
        group_ratio = total_storage / total_avg_sales if total_avg_sales > 0 else Decimal("0")

        part_result = PartAnalysisResult(
            part_code=part_code,
            part_name=part_name,
            unit=unit,
            cost_price=cost_price_dec,
            total_storage_cnt=total_storage,
            total_avg_sales_cnt=total_avg_sales,
            group_current_ratio=group_ratio,
            need_replenishment=False,
            total_suggest_cnt=0,
            total_suggest_amount=Decimal("0"),
            shop_count=len(shop_data_list),
            need_replenishment_shop_count=0,
            part_decision_reason="",
            priority=2,
            confidence=0.8,
            suggestions=[],
        )

        try:
            result = json.loads(_extract_json_payload(content))
            if not isinstance(result, dict):
                # Treat a non-object payload like any other parse failure so
                # every shop still receives a record.
                raise json.JSONDecodeError("expected a JSON object", content, 0)

            # Part-level summary returned by the LLM.
            confidence = float(_or_default(result.get("confidence"), 0.8))
            part_decision_reason = result.get("part_decision_reason", "")
            need_replenishment = result.get("need_replenishment", False)
            priority = int(_or_default(result.get("priority"), 2))

            part_result.need_replenishment = need_replenishment
            part_result.total_suggest_cnt = int(_or_default(result.get("total_suggest_cnt"), 0))
            part_result.total_suggest_amount = _to_decimal(result.get("total_suggest_amount"))
            part_result.shop_count = int(_or_default(result.get("shop_count"), len(shop_data_list)))
            part_result.part_decision_reason = part_decision_reason
            part_result.priority = priority
            part_result.confidence = confidence

            # Prefer group-level figures from the LLM when it supplied them.
            if result.get("total_storage_cnt") is not None:
                part_result.total_storage_cnt = Decimal(str(result["total_storage_cnt"]))
            if result.get("total_avg_sales_cnt") is not None:
                part_result.total_avg_sales_cnt = Decimal(str(result["total_avg_sales_cnt"]))
            if result.get("group_current_ratio") is not None:
                part_result.group_current_ratio = Decimal(str(result["group_current_ratio"]))

            # Index the LLM's per-shop suggestions by shop ID.
            shop_entries = result.get("shop_suggestions") or []
            shop_suggestion_map = {
                int(_or_default(entry.get("shop_id"), 0)): entry
                for entry in shop_entries
            }
            part_result.need_replenishment_shop_count = sum(
                1 for entry in shop_entries
                if int(_or_default(entry.get("suggest_cnt"), 0)) > 0
            )

            # Walk every input shop so each one gets a record.
            for shop_data in shop_data_list:
                shop_id = int(_or_default(shop_data.get("shop_id"), 0))
                shop_name = shop_data.get("shop_name", "")
                storage = _to_decimal(shop_data.get("valid_storage_cnt"))
                avg_sales = _to_decimal(shop_data.get("avg_sales_cnt"))
                current_ratio = storage / avg_sales if avg_sales > 0 else Decimal("0")

                llm_item = shop_suggestion_map.get(shop_id)
                if llm_item is not None:
                    suggest_cnt = int(_or_default(llm_item.get("suggest_cnt"), 0))
                    suggest_amount = _to_decimal(llm_item.get("suggest_amount"))
                    reason = llm_item.get("reason", part_decision_reason)
                    shop_priority = int(_or_default(llm_item.get("priority"), priority))
                else:
                    # Shop not mentioned by the LLM: zero quantity with a
                    # reason derived from the shop's own figures.
                    suggest_cnt = 0
                    suggest_amount = Decimal("0")
                    reason, shop_priority = self._build_default_reason(
                        shop_data, storage, avg_sales, current_ratio, target_ratio
                    )

                suggestions.append(ReplenishmentSuggestion(
                    shop_id=shop_id,
                    shop_name=shop_name,
                    part_code=part_code,
                    part_name=part_name,
                    unit=unit,
                    cost_price=cost_price_dec,
                    current_storage_cnt=storage,
                    avg_sales_cnt=avg_sales,
                    current_ratio=current_ratio,
                    suggest_cnt=suggest_cnt,
                    suggest_amount=suggest_amount,
                    suggestion_reason=reason,
                    priority=shop_priority,
                    confidence=confidence,
                ))

        except json.JSONDecodeError as e:
            logger.error(f"解析配件 {part_code} 分析结果失败: {e}")
            part_result.part_decision_reason = f"分析失败: {str(e)}"
            # Emit a zero-quantity failure record for every shop.
            for shop_data in shop_data_list:
                suggestions.append(ReplenishmentSuggestion(
                    shop_id=int(_or_default(shop_data.get("shop_id"), 0)),
                    shop_name=shop_data.get("shop_name", ""),
                    part_code=part_code,
                    part_name=part_name,
                    unit=unit,
                    cost_price=cost_price_dec,
                    current_storage_cnt=_to_decimal(shop_data.get("valid_storage_cnt")),
                    avg_sales_cnt=_to_decimal(shop_data.get("avg_sales_cnt")),
                    current_ratio=Decimal("0"),
                    suggest_cnt=0,
                    suggest_amount=Decimal("0"),
                    suggestion_reason=f"分析失败: {str(e)}",
                    priority=3,
                    confidence=0.0,
                ))
        except Exception as e:
            logger.error(f"处理配件 {part_code} 分析结果异常: {e}")

        part_result.suggestions = suggestions
        return part_result, suggestions