detail_repo.py
3.65 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
"""
补货明细数据访问层
提供 ai_replenishment_detail 表的 CRUD 操作
"""
import logging
from typing import List
from ..db import get_connection
from ...models import ReplenishmentDetail
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
class DetailRepository:
    """Data-access object for the ``ai_replenishment_detail`` table.

    A connection may be injected through the constructor; otherwise one is
    obtained lazily from ``get_connection()`` the first time it is needed.
    """

    def __init__(self, connection=None):
        """Remember an optional, already-open connection to reuse."""
        self._conn = connection

    def _get_connection(self):
        """Return a usable connection, opening a new one when necessary.

        A new connection is requested from ``get_connection()`` when none is
        cached or when the cached one reports it is no longer connected.
        """
        if self._conn is None or not self._conn.is_connected():
            self._conn = get_connection()
        return self._conn

    def close(self):
        """Close the cached connection (if still connected) and forget it."""
        if self._conn and self._conn.is_connected():
            self._conn.close()
        # Always drop the reference: _get_connection() reopens on demand, and
        # a dead connection object is of no further use.
        self._conn = None

    @staticmethod
    def _optional_float(value):
        """Convert *value* to ``float``, passing ``None`` through unchanged.

        Only ``None`` maps to ``None``; a zero value is real data and must be
        stored as ``0.0`` rather than as SQL NULL.
        """
        return None if value is None else float(value)

    def save_batch(self, details: List[ReplenishmentDetail]) -> int:
        """Insert a batch of replenishment detail rows in one transaction.

        Args:
            details: Detail records to insert. An empty or ``None`` value is
                a no-op and does not touch the database.

        Returns:
            The cursor's ``rowcount`` after the batch insert (0 for an empty
            batch).

        Raises:
            Exception: Whatever the driver raises while executing or
                committing; the transaction is rolled back before the
                exception propagates.
        """
        if not details:
            return 0

        conn = self._get_connection()
        cursor = conn.cursor()
        try:
            sql = """
                INSERT INTO ai_replenishment_detail (
                    task_no, group_id, dealer_grouping_id, brand_grouping_id,
                    shop_id, shop_name, part_code, part_name, unit, cost_price,
                    current_ratio, base_ratio, post_plan_ratio,
                    valid_storage_cnt, avg_sales_cnt, suggest_cnt, suggest_amount,
                    suggestion_reason, priority, llm_confidence, statistics_date, create_time
                ) VALUES (
                    %s, %s, %s, %s, %s, %s, %s, %s, %s, %s,
                    %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, NOW()
                )
            """
            to_float = self._optional_float
            # One tuple per row, in the same order as the column list above;
            # create_time is filled by NOW() on the server side.
            values = [
                (
                    d.task_no, d.group_id, d.dealer_grouping_id, d.brand_grouping_id,
                    d.shop_id, d.shop_name, d.part_code, d.part_name, d.unit,
                    float(d.cost_price),
                    # The three ratios are optional; a ratio of 0 is kept as 0.0.
                    to_float(d.current_ratio),
                    to_float(d.base_ratio),
                    to_float(d.post_plan_ratio),
                    float(d.valid_storage_cnt), float(d.avg_sales_cnt),
                    d.suggest_cnt, float(d.suggest_amount),
                    d.suggestion_reason, d.priority, d.llm_confidence, d.statistics_date,
                )
                for d in details
            ]
            cursor.executemany(sql, values)
            conn.commit()
            logger.info(f"保存补货明细: {cursor.rowcount}条")
            return cursor.rowcount
        except Exception:
            # Do not leave a half-finished transaction on the cached
            # connection, which later calls will reuse.
            conn.rollback()
            raise
        finally:
            cursor.close()

    def delete_by_task_no(self, task_no: str) -> int:
        """Delete every detail row that belongs to the given task.

        Args:
            task_no: Task number whose rows should be removed.

        Returns:
            The cursor's ``rowcount`` after the delete.

        Raises:
            Exception: Whatever the driver raises while executing or
                committing; the transaction is rolled back before the
                exception propagates.
        """
        conn = self._get_connection()
        cursor = conn.cursor()
        try:
            sql = "DELETE FROM ai_replenishment_detail WHERE task_no = %s"
            cursor.execute(sql, (task_no,))
            conn.commit()
            logger.info(f"删除补货明细: task_no={task_no}, rows={cursor.rowcount}")
            return cursor.rowcount
        except Exception:
            # Keep the cached connection free of a dangling transaction.
            conn.rollback()
            raise
        finally:
            cursor.close()

    def find_by_task_no(self, task_no: str) -> List[ReplenishmentDetail]:
        """Fetch all detail rows for a task.

        Args:
            task_no: Task number to look up.

        Returns:
            One ``ReplenishmentDetail`` per matching row, built by passing the
            row's column/value mapping as keyword arguments.
        """
        conn = self._get_connection()
        # dictionary=True yields rows as dicts so they can be splatted below.
        cursor = conn.cursor(dictionary=True)
        try:
            sql = "SELECT * FROM ai_replenishment_detail WHERE task_no = %s"
            cursor.execute(sql, (task_no,))
            rows = cursor.fetchall()
            return [ReplenishmentDetail(**row) for row in rows]
        finally:
            cursor.close()