replenishment.py
4.79 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
"""
补货建议触发接口
替代原定时任务,提供接口触发补货建议生成
"""
import logging
from typing import Optional
from fastapi import APIRouter, HTTPException
from pydantic import BaseModel
from ...agent import ReplenishmentAgent
from ...services import DataService
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
# Router on which the endpoint decorators in this module register their routes.
router = APIRouter()
class InitRequest(BaseModel):
    """Request body for the replenishment initialization endpoint."""

    # Required: the group whose dealer groupings are looked up.
    group_id: int

    # Optional: restrict the run to this one dealer grouping.
    # When left as None, every dealer grouping of the group is processed.
    dealer_grouping_id: Optional[int] = None
class InitResponse(BaseModel):
    """Response body for the replenishment initialization endpoint."""

    # Outcome flag set by the endpoint.
    success: bool

    # Human-readable summary of the run.
    message: str

    # Number of dealer groupings whose agent run finished; defaults to 0.
    task_count: int = 0
def _load_dealer_groupings(group_id):
    """Fetch the dealer groupings of ``group_id`` and release the data service.

    The ``DataService`` is always closed before returning, so callers never
    hold it open while the (potentially long) agent runs execute.

    Returns:
        Whatever ``DataService.get_dealer_groupings`` returns; the endpoint
        iterates it and reads the ``"id"`` and ``"name"`` keys of each item.
    """
    data_service = DataService()
    try:
        return data_service.get_dealer_groupings(group_id)
    finally:
        data_service.close()


@router.post("/replenishment/init", response_model=InitResponse)
async def init_replenishment(req: InitRequest):
    """Trigger generation of replenishment suggestions.

    - If ``dealer_grouping_id`` is given, only that dealer grouping is
      processed.
    - Otherwise every dealer grouping under ``group_id`` is processed; a
      failure in one grouping is logged and the remaining ones still run.

    Returns:
        InitResponse: ``task_count`` is the number of groupings whose agent
        run completed without raising.

    Raises:
        HTTPException: 404 when the requested dealer grouping does not exist
            under ``group_id``; 500 for any other unexpected error.
    """
    try:
        agent = ReplenishmentAgent()
        groupings = _load_dealer_groupings(req.group_id)

        # NOTE(review): truthiness check kept on purpose -- an id of 0 is
        # treated like "not specified" and falls through to the all-groupings
        # path. Confirm whether 0 can be a real id before switching to
        # `is not None`.
        if req.dealer_grouping_id:
            grouping = next(
                (g for g in groupings if g["id"] == req.dealer_grouping_id),
                None,
            )
            if grouping is None:
                raise HTTPException(
                    status_code=404,
                    detail=f"未找到商家组合: {req.dealer_grouping_id}",
                )

            # NOTE(review): agent.run is called without `await` inside an
            # async endpoint; if it blocks, the event loop is occupied for
            # the whole run -- consider offloading to a worker thread.
            agent.run(
                group_id=req.group_id,
                dealer_grouping_id=grouping["id"],
                dealer_grouping_name=grouping["name"],
            )
            return InitResponse(
                success=True,
                message=f"商家组合 [{grouping['name']}] 补货建议生成完成",
                task_count=1,
            )

        task_count = 0
        for grouping in groupings:
            try:
                agent.run(
                    group_id=req.group_id,
                    dealer_grouping_id=grouping["id"],
                    dealer_grouping_name=grouping["name"],
                )
            except Exception as e:
                # Best effort: one failing grouping must not abort the rest.
                logger.exception(
                    "商家组合执行失败: %s, error=%s", grouping["name"], e
                )
            else:
                task_count += 1

        return InitResponse(
            success=True,
            message=f"补货建议生成完成,共处理 {task_count}/{len(groupings)} 个商家组合",
            task_count=task_count,
        )
    except HTTPException:
        # Deliberate HTTP errors (e.g. the 404 above) pass through untouched.
        raise
    except Exception as e:
        logger.exception("补货建议初始化失败: %s", e)
        raise HTTPException(status_code=500, detail=str(e)) from e
# ============ 单种配件补货接口 ============
class SinglePartRequest(BaseModel):
    """Request body for submitting a single-part replenishment task."""

    # Code of the part to replenish; forwarded to the queue's submit call.
    part_code: str

    # Group identifier; forwarded to the queue's submit call.
    group_id: int

    # Dealer grouping identifier (required here); forwarded to the queue's
    # submit call.
    dealer_grouping_id: int
class SinglePartResponse(BaseModel):
    """Response body for a single-part replenishment submission."""

    # Outcome flag set by the endpoint.
    success: bool

    # Human-readable summary of the submission.
    message: str

    # Identifier returned by the queue's submit call; None by default.
    queue_id: Optional[int] = None
@router.post("/replenishment/single-part", response_model=SinglePartResponse)
async def submit_single_part(req: SinglePartRequest):
    """Submit a replenishment task for a single part.

    The task is handed to the single-part queue and the endpoint returns as
    soon as the submission is accepted. Queue state can be inspected through
    ``/replenishment/single-part/status``.

    Returns:
        SinglePartResponse: carries the ``queue_id`` returned by the queue.

    Raises:
        HTTPException: 500 when the submission fails unexpectedly; an
            ``HTTPException`` raised further down is passed through as-is.
    """
    try:
        # Function-scope import, kept as in the original.
        # NOTE(review): presumably avoids an import cycle -- confirm.
        from ...services.single_part_queue import get_single_part_queue

        queue = get_single_part_queue()
        queue_id = await queue.submit(
            part_code=req.part_code,
            group_id=req.group_id,
            dealer_grouping_id=req.dealer_grouping_id,
        )
        return SinglePartResponse(
            success=True,
            message=f"已提交单种配件补货任务: {req.part_code}",
            queue_id=queue_id,
        )
    except HTTPException:
        # Do not rewrite deliberate HTTP errors into a generic 500.
        raise
    except Exception as e:
        logger.exception("单配件补货任务提交失败: %s", e)
        raise HTTPException(status_code=500, detail=str(e)) from e
@router.get("/replenishment/single-part/status")
async def get_single_part_status():
    """Return the current status of the single-part replenishment queue.

    Returns:
        Whatever the queue's ``get_status()`` returns, passed through
        unchanged.

    Raises:
        HTTPException: 500 when the status lookup fails unexpectedly; an
            ``HTTPException`` raised further down is passed through as-is.
    """
    try:
        # Function-scope import, kept as in the original.
        # NOTE(review): presumably avoids an import cycle -- confirm.
        from ...services.single_part_queue import get_single_part_queue

        queue = get_single_part_queue()
        return queue.get_status()
    except HTTPException:
        # Do not rewrite deliberate HTTP errors into a generic 500.
        raise
    except Exception as e:
        logger.exception("查询队列状态失败: %s", e)
        raise HTTPException(status_code=500, detail=str(e)) from e