summary_repo.py
3.09 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
"""
配件汇总数据访问层
提供 ai_replenishment_part_summary 表的 CRUD 操作
"""
import logging
from typing import List
from ..db import get_connection
from ...models import ReplenishmentPartSummary
logger = logging.getLogger(__name__)


class SummaryRepository:
    """Data-access object for the ``ai_replenishment_part_summary`` table.

    A connection may be injected through the constructor; otherwise one is
    obtained from ``get_connection()`` the first time it is needed. The
    connection is cached on the instance and reused across calls.
    """

    # One placeholder per column except ``create_time``, which the database
    # fills in itself via NOW().
    _INSERT_SQL = """
        INSERT INTO ai_replenishment_part_summary (
            task_no, group_id, dealer_grouping_id, part_code, part_name,
            unit, cost_price, total_storage_cnt, total_avg_sales_cnt,
            group_current_ratio, total_suggest_cnt, total_suggest_amount,
            shop_count, need_replenishment_shop_count, part_decision_reason,
            priority, llm_confidence, statistics_date, create_time
        ) VALUES (
            %s, %s, %s, %s, %s, %s, %s, %s, %s, %s,
            %s, %s, %s, %s, %s, %s, %s, %s, NOW()
        )
    """

    _DELETE_BY_TASK_SQL = (
        "DELETE FROM ai_replenishment_part_summary WHERE task_no = %s"
    )

    def __init__(self, connection=None):
        """Store an optional, already-open connection to reuse."""
        self._conn = connection

    def _get_connection(self):
        """Return the cached connection, replacing it if missing or dropped."""
        if self._conn is None or not self._conn.is_connected():
            self._conn = get_connection()
        return self._conn

    def close(self) -> None:
        """Close the cached connection if it is still connected."""
        if self._conn and self._conn.is_connected():
            self._conn.close()
            self._conn = None

    @staticmethod
    def _to_row(s) -> tuple:
        """Map one summary object to the parameter tuple for ``_INSERT_SQL``."""
        # Compare against None explicitly: a ratio of 0 is a real value and
        # must be stored as 0.0, not silently turned into NULL.
        if s.group_current_ratio is None:
            ratio = None
        else:
            ratio = float(s.group_current_ratio)
        return (
            s.task_no, s.group_id, s.dealer_grouping_id, s.part_code, s.part_name,
            s.unit, float(s.cost_price), float(s.total_storage_cnt),
            float(s.total_avg_sales_cnt),
            ratio,
            s.total_suggest_cnt, float(s.total_suggest_amount),
            s.shop_count, s.need_replenishment_shop_count, s.part_decision_reason,
            s.priority, s.llm_confidence, s.statistics_date,
        )

    def save_batch(self, summaries: List["ReplenishmentPartSummary"]) -> int:
        """Insert a batch of part summaries and commit.

        Args:
            summaries: Summary records to insert. An empty or ``None`` value
                returns immediately without touching the database.

        Returns:
            The cursor's ``rowcount`` after the batch insert, or 0 when there
            was nothing to insert.

        Raises:
            Exception: Any error raised while executing or committing is
                re-raised after the transaction has been rolled back.
        """
        if not summaries:
            return 0

        conn = self._get_connection()
        cursor = conn.cursor()
        try:
            rows = [self._to_row(s) for s in summaries]
            cursor.executemany(self._INSERT_SQL, rows)
            conn.commit()
            logger.info(f"保存配件汇总: {cursor.rowcount}条")
            return cursor.rowcount
        except Exception:
            # The connection is cached and reused, so do not leave a failed
            # batch pending inside an open transaction.
            conn.rollback()
            raise
        finally:
            cursor.close()

    def delete_by_task_no(self, task_no: str) -> int:
        """Delete every part summary belonging to one task and commit.

        Args:
            task_no: Value matched against the ``task_no`` column.

        Returns:
            The cursor's ``rowcount`` after the delete.

        Raises:
            Exception: Any error raised while executing or committing is
                re-raised after the transaction has been rolled back.
        """
        conn = self._get_connection()
        cursor = conn.cursor()
        try:
            cursor.execute(self._DELETE_BY_TASK_SQL, (task_no,))
            conn.commit()
            logger.info(f"删除配件汇总: task_no={task_no}, rows={cursor.rowcount}")
            return cursor.rowcount
        except Exception:
            # Same reasoning as save_batch: keep the reused connection clean.
            conn.rollback()
            raise
        finally:
            cursor.close()