""" 任务相关 API 路由 """ import logging from typing import Optional, List from datetime import datetime from fastapi import APIRouter, Query, HTTPException from pydantic import BaseModel from ...services.db import get_connection logger = logging.getLogger(__name__) router = APIRouter() class TaskResponse(BaseModel): """任务响应模型""" id: int task_no: str group_id: int dealer_grouping_id: int dealer_grouping_name: Optional[str] = None brand_grouping_id: Optional[int] = None plan_amount: float = 0 actual_amount: float = 0 part_count: int = 0 base_ratio: Optional[float] = None status: int = 0 status_text: str = "" error_message: Optional[str] = None llm_provider: Optional[str] = None llm_model: Optional[str] = None llm_total_tokens: int = 0 statistics_date: Optional[str] = None start_time: Optional[str] = None end_time: Optional[str] = None duration_seconds: Optional[int] = None create_time: Optional[str] = None class TaskListResponse(BaseModel): """任务列表响应""" total: int page: int page_size: int items: List[TaskResponse] class DetailResponse(BaseModel): """配件建议明细响应""" id: int task_no: str shop_id: int shop_name: Optional[str] = None part_code: str part_name: Optional[str] = None unit: Optional[str] = None cost_price: float = 0 current_ratio: Optional[float] = None base_ratio: Optional[float] = None post_plan_ratio: Optional[float] = None valid_storage_cnt: float = 0 avg_sales_cnt: float = 0 suggest_cnt: int = 0 suggest_amount: float = 0 suggestion_reason: Optional[str] = None priority: int = 2 llm_confidence: Optional[float] = None statistics_date: Optional[str] = None class DetailListResponse(BaseModel): """配件建议明细列表响应""" total: int page: int page_size: int items: List[DetailResponse] def format_datetime(dt) -> Optional[str]: """格式化日期时间""" if dt is None: return None if isinstance(dt, datetime): return dt.strftime("%Y-%m-%d %H:%M:%S") return str(dt) def get_status_text(status: int) -> str: """获取状态文本""" status_map = {0: "运行中", 1: "成功", 2: "失败"} return status_map.get(status, "未知") 
def _task_from_row(row) -> TaskResponse:
    """Build a TaskResponse from one ``ai_replenishment_task`` row.

    Args:
        row: Mapping of column name to value, as returned by a dictionary
            cursor.

    Returns:
        The populated response model. ``duration_seconds`` is set only when
        both ``start_time`` and ``end_time`` are present on the row.
    """
    started = row.get("start_time")
    finished = row.get("end_time")
    duration = None
    if started and finished:
        duration = int((finished - started).total_seconds())

    status = row.get("status") or 0
    base_ratio = row.get("base_ratio")

    return TaskResponse(
        id=row["id"],
        task_no=row["task_no"],
        group_id=row["group_id"],
        dealer_grouping_id=row["dealer_grouping_id"],
        dealer_grouping_name=row.get("dealer_grouping_name"),
        brand_grouping_id=row.get("brand_grouping_id"),
        plan_amount=float(row.get("plan_amount") or 0),
        actual_amount=float(row.get("actual_amount") or 0),
        part_count=row.get("part_count") or 0,
        # Compare against None explicitly: a stored ratio of 0 is falsy but
        # is still a real value and must not be reported as missing.
        base_ratio=float(base_ratio) if base_ratio is not None else None,
        status=status,
        status_text=get_status_text(status),
        error_message=row.get("error_message"),
        llm_provider=row.get("llm_provider"),
        llm_model=row.get("llm_model"),
        llm_total_tokens=int(row.get("llm_total_tokens") or 0),
        statistics_date=row.get("statistics_date"),
        start_time=format_datetime(started),
        end_time=format_datetime(finished),
        duration_seconds=duration,
        create_time=format_datetime(row.get("create_time")),
    )


@router.get("/tasks", response_model=TaskListResponse)
async def list_tasks(
    page: int = Query(1, ge=1, description="页码"),
    page_size: int = Query(20, ge=1, le=100, description="每页数量"),
    status: Optional[int] = Query(None, description="状态筛选: 0-运行中 1-成功 2-失败"),
    dealer_grouping_id: Optional[int] = Query(None, description="商家组合ID"),
    statistics_date: Optional[str] = Query(None, description="统计日期"),
):
    """Return a page of tasks, newest first.

    Args:
        page: 1-based page number.
        page_size: Number of rows per page.
        status: Optional filter on the task status code.
        dealer_grouping_id: Optional filter on the dealer grouping id.
        statistics_date: Optional filter on the statistics date.

    Returns:
        TaskListResponse with the total number of matching rows and the
        requested page of items.
    """
    # NOTE(review): the DB calls below look synchronous; inside an
    # ``async def`` handler they would block the event loop - confirm.

    # Build the WHERE clause; values are always bound as parameters.
    where_clauses = []
    params = []
    if status is not None:
        where_clauses.append("status = %s")
        params.append(status)
    if dealer_grouping_id is not None:
        where_clauses.append("dealer_grouping_id = %s")
        params.append(dealer_grouping_id)
    if statistics_date:
        where_clauses.append("statistics_date = %s")
        params.append(statistics_date)
    where_sql = " AND ".join(where_clauses) if where_clauses else "1=1"

    count_sql = f"SELECT COUNT(*) as total FROM ai_replenishment_task WHERE {where_sql}"
    data_sql = f"""
        SELECT * FROM ai_replenishment_task
        WHERE {where_sql}
        ORDER BY create_time DESC
        LIMIT %s OFFSET %s
    """
    offset = (page - 1) * page_size

    conn = get_connection()
    try:
        # The cursor is created inside the try so the connection is still
        # closed if cursor creation itself fails.
        cursor = conn.cursor(dictionary=True)
        try:
            cursor.execute(count_sql, params)
            total = cursor.fetchone()["total"]

            cursor.execute(data_sql, params + [page_size, offset])
            rows = cursor.fetchall()
        finally:
            cursor.close()
    finally:
        conn.close()

    return TaskListResponse(
        total=total,
        page=page,
        page_size=page_size,
        items=[_task_from_row(row) for row in rows],
    )


@router.get("/tasks/{task_no}", response_model=TaskResponse)
async def get_task(task_no: str):
    """Return the task identified by ``task_no``.

    Args:
        task_no: Task number to look up.

    Returns:
        TaskResponse for the matching row.

    Raises:
        HTTPException: 404 when no task with this number exists.
    """
    conn = get_connection()
    try:
        cursor = conn.cursor(dictionary=True)
        try:
            cursor.execute(
                """
                SELECT * FROM ai_replenishment_task
                WHERE task_no = %s
                """,
                (task_no,),
            )
            row = cursor.fetchone()
        finally:
            cursor.close()
    finally:
        conn.close()

    if not row:
        raise HTTPException(status_code=404, detail="任务不存在")

    return _task_from_row(row)

def _float_or_none(value) -> Optional[float]:
    """Convert ``value`` to float, mapping only ``None`` to ``None``.

    A plain truthiness test would also turn a legitimate 0 into ``None``,
    so the check is an explicit comparison against ``None``.
    """
    return None if value is None else float(value)


def _detail_from_row(row) -> DetailResponse:
    """Build a DetailResponse from one ``ai_replenishment_detail`` row."""
    return DetailResponse(
        id=row["id"],
        task_no=row["task_no"],
        shop_id=row["shop_id"],
        shop_name=row.get("shop_name"),
        part_code=row["part_code"],
        part_name=row.get("part_name"),
        unit=row.get("unit"),
        cost_price=float(row.get("cost_price") or 0),
        current_ratio=_float_or_none(row.get("current_ratio")),
        base_ratio=_float_or_none(row.get("base_ratio")),
        post_plan_ratio=_float_or_none(row.get("post_plan_ratio")),
        valid_storage_cnt=float(row.get("valid_storage_cnt") or 0),
        avg_sales_cnt=float(row.get("avg_sales_cnt") or 0),
        suggest_cnt=row.get("suggest_cnt") or 0,
        suggest_amount=float(row.get("suggest_amount") or 0),
        suggestion_reason=row.get("suggestion_reason"),
        priority=row.get("priority") or 2,
        llm_confidence=_float_or_none(row.get("llm_confidence")),
        statistics_date=row.get("statistics_date"),
    )


@router.get("/tasks/{task_no}/details", response_model=DetailListResponse)
async def get_task_details(
    task_no: str,
    page: int = Query(1, ge=1),
    page_size: int = Query(50, ge=1, le=200),
    sort_by: str = Query("suggest_amount", description="排序字段"),
    sort_order: str = Query("desc", description="排序方向: asc/desc"),
    part_code: Optional[str] = Query(None, description="配件编码搜索"),
):
    """Return a page of part-level suggestions for one task.

    Args:
        task_no: Task number whose details are listed.
        page: 1-based page number.
        page_size: Number of rows per page.
        sort_by: Column to sort on; values outside the whitelist fall back
            to ``suggest_amount``.
        sort_order: ``desc`` (case-insensitive) for descending; anything
            else sorts ascending.
        part_code: Optional substring match on the part code.

    Returns:
        DetailListResponse with the total match count and the page of items.
    """
    # ``sort_by`` is interpolated into the SQL text, so it must come from
    # this whitelist; everything else is bound as a parameter.
    allowed_sort_fields = [
        "suggest_amount", "suggest_cnt", "cost_price",
        "avg_sales_cnt", "current_ratio", "part_code",
    ]
    if sort_by not in allowed_sort_fields:
        sort_by = "suggest_amount"
    sort_direction = "DESC" if sort_order.lower() == "desc" else "ASC"

    where_sql = "task_no = %s"
    params = [task_no]
    if part_code:
        where_sql += " AND part_code LIKE %s"
        params.append(f"%{part_code}%")

    offset = (page - 1) * page_size

    conn = get_connection()
    try:
        # The cursor is created inside the try so the connection is still
        # closed if cursor creation itself fails.
        cursor = conn.cursor(dictionary=True)
        try:
            cursor.execute(
                f"SELECT COUNT(*) as total FROM ai_replenishment_detail WHERE {where_sql}",
                params,
            )
            total = cursor.fetchone()["total"]

            cursor.execute(
                f"""
                SELECT * FROM ai_replenishment_detail
                WHERE {where_sql}
                ORDER BY {sort_by} {sort_direction}
                LIMIT %s OFFSET %s
                """,
                params + [page_size, offset],
            )
            rows = cursor.fetchall()
        finally:
            cursor.close()
    finally:
        conn.close()

    return DetailListResponse(
        total=total,
        page=page,
        page_size=page_size,
        items=[_detail_from_row(row) for row in rows],
    )


class PartSummaryResponse(BaseModel):
    """Response model for one part-level summary row."""

    id: int
    task_no: str
    part_code: str
    part_name: Optional[str] = None
    unit: Optional[str] = None
    cost_price: float = 0
    total_storage_cnt: float = 0
    total_avg_sales_cnt: float = 0
    group_current_ratio: Optional[float] = None
    total_suggest_cnt: int = 0
    total_suggest_amount: float = 0
    shop_count: int = 0
    need_replenishment_shop_count: int = 0
    part_decision_reason: Optional[str] = None
    priority: int = 2
    llm_confidence: Optional[float] = None
    statistics_date: Optional[str] = None
    # Computed in SQL by get_task_part_summaries, not read from a column.
    group_post_plan_ratio: Optional[float] = None


class PartSummaryListResponse(BaseModel):
    """Paginated list of part-level summaries."""

    total: int
    page: int
    page_size: int
    items: List[PartSummaryResponse]


def _part_summary_from_row(row) -> PartSummaryResponse:
    """Build a PartSummaryResponse from one part-summary query row."""
    return PartSummaryResponse(
        id=row["id"],
        task_no=row["task_no"],
        part_code=row["part_code"],
        part_name=row.get("part_name"),
        unit=row.get("unit"),
        cost_price=float(row.get("cost_price") or 0),
        total_storage_cnt=float(row.get("total_storage_cnt") or 0),
        total_avg_sales_cnt=float(row.get("total_avg_sales_cnt") or 0),
        group_current_ratio=_float_or_none(row.get("group_current_ratio")),
        group_post_plan_ratio=_float_or_none(row.get("group_post_plan_ratio")),
        total_suggest_cnt=row.get("total_suggest_cnt") or 0,
        total_suggest_amount=float(row.get("total_suggest_amount") or 0),
        shop_count=row.get("shop_count") or 0,
        need_replenishment_shop_count=row.get("need_replenishment_shop_count") or 0,
        part_decision_reason=row.get("part_decision_reason"),
        priority=row.get("priority") or 2,
        llm_confidence=_float_or_none(row.get("llm_confidence")),
        statistics_date=row.get("statistics_date"),
    )


@router.get("/tasks/{task_no}/part-summaries", response_model=PartSummaryListResponse)
async def get_task_part_summaries(
    task_no: str,
    page: int = Query(1, ge=1),
    page_size: int = Query(50, ge=1, le=200),
    sort_by: str = Query("total_suggest_amount", description="排序字段"),
    sort_order: str = Query("desc", description="排序方向: asc/desc"),
    part_code: Optional[str] = Query(None, description="配件编码筛选"),
    priority: Optional[int] = Query(None, description="优先级筛选"),
):
    """Return a page of part-level summaries for one task.

    Args:
        task_no: Task number whose summaries are listed.
        page: 1-based page number.
        page_size: Number of rows per page.
        sort_by: Column to sort on; values outside the whitelist fall back
            to ``total_suggest_amount``.
        sort_order: ``desc`` (case-insensitive) for descending; anything
            else sorts ascending.
        part_code: Optional substring match on the part code.
        priority: Optional exact match on the priority.

    Returns:
        PartSummaryListResponse with the total match count and the page of
        items.
    """
    # ``sort_by`` is interpolated into the SQL text, so it must come from
    # this whitelist; everything else is bound as a parameter.
    allowed_sort_fields = [
        "total_suggest_amount", "total_suggest_cnt", "cost_price",
        "total_avg_sales_cnt", "group_current_ratio", "part_code",
        "priority", "need_replenishment_shop_count",
        "total_storage_cnt", "shop_count", "group_post_plan_ratio",
    ]
    if sort_by not in allowed_sort_fields:
        sort_by = "total_suggest_amount"
    sort_direction = "DESC" if sort_order.lower() == "desc" else "ASC"

    where_clauses = ["task_no = %s"]
    params = [task_no]
    if part_code:
        where_clauses.append("part_code LIKE %s")
        params.append(f"%{part_code}%")
    if priority is not None:
        where_clauses.append("priority = %s")
        params.append(priority)
    where_sql = " AND ".join(where_clauses)

    offset = (page - 1) * page_size

    # Post-plan stock-to-sales ratio is computed on the fly:
    # (stock + suggested quantity) / average monthly sales.
    # NULLIF turns a zero divisor into NULL, so the ratio is NULL then.
    query_sql = f"""
        SELECT *, (
            (COALESCE(total_storage_cnt, 0) + COALESCE(total_suggest_cnt, 0))
            / NULLIF(total_avg_sales_cnt, 0)
        ) as group_post_plan_ratio
        FROM ai_replenishment_part_summary
        WHERE {where_sql}
        ORDER BY {sort_by} {sort_direction}
        LIMIT %s OFFSET %s
    """

    conn = get_connection()
    try:
        cursor = conn.cursor(dictionary=True)
        try:
            cursor.execute(
                f"SELECT COUNT(*) as total FROM ai_replenishment_part_summary WHERE {where_sql}",
                params,
            )
            total = cursor.fetchone()["total"]

            cursor.execute(query_sql, params + [page_size, offset])
            rows = cursor.fetchall()
        finally:
            cursor.close()
    finally:
        conn.close()

    return PartSummaryListResponse(
        total=total,
        page=page,
        page_size=page_size,
        items=[_part_summary_from_row(row) for row in rows],
    )

@router.get("/tasks/{task_no}/parts/{part_code}/shops")
async def get_part_shop_details(
    task_no: str,
    part_code: str,
):
    """Return the per-shop suggestion rows for one part of a task.

    Args:
        task_no: Task number to look up.
        part_code: Exact part code to look up.

    Returns:
        A dict with ``total`` (number of rows) and ``items`` (a list of
        DetailResponse, ordered by ``suggest_amount`` descending). The
        result is not paginated.
    """
    conn = get_connection()
    try:
        # The cursor is created inside the try so the connection is still
        # closed if cursor creation itself fails.
        cursor = conn.cursor(dictionary=True)
        try:
            cursor.execute(
                """
                SELECT * FROM ai_replenishment_detail
                WHERE task_no = %s AND part_code = %s
                ORDER BY suggest_amount DESC
                """,
                (task_no, part_code),
            )
            rows = cursor.fetchall()
        finally:
            cursor.close()
    finally:
        conn.close()

    # Optional ratios are compared against None explicitly: a stored 0 is
    # falsy but is still a real value and must not be reported as missing.
    items = [
        DetailResponse(
            id=row["id"],
            task_no=row["task_no"],
            shop_id=row["shop_id"],
            shop_name=row.get("shop_name"),
            part_code=row["part_code"],
            part_name=row.get("part_name"),
            unit=row.get("unit"),
            cost_price=float(row.get("cost_price") or 0),
            current_ratio=(
                float(row["current_ratio"])
                if row.get("current_ratio") is not None else None
            ),
            base_ratio=(
                float(row["base_ratio"])
                if row.get("base_ratio") is not None else None
            ),
            post_plan_ratio=(
                float(row["post_plan_ratio"])
                if row.get("post_plan_ratio") is not None else None
            ),
            valid_storage_cnt=float(row.get("valid_storage_cnt") or 0),
            avg_sales_cnt=float(row.get("avg_sales_cnt") or 0),
            suggest_cnt=row.get("suggest_cnt") or 0,
            suggest_amount=float(row.get("suggest_amount") or 0),
            suggestion_reason=row.get("suggestion_reason"),
            priority=row.get("priority") or 2,
            llm_confidence=(
                float(row["llm_confidence"])
                if row.get("llm_confidence") is not None else None
            ),
            statistics_date=row.get("statistics_date"),
        )
        for row in rows
    ]

    return {
        "total": len(items),
        "items": items,
    }