data_service.py
2.27 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
"""
数据获取服务
从 MySQL 数据库查询库销比等数据
"""
import logging
from typing import List, Optional
from datetime import date
from decimal import Decimal
from .db import get_connection
from ..models import PartRatio
logger = logging.getLogger(__name__)
class DataService:
"""数据服务"""
def __init__(self):
self._conn = None
def _get_connection(self):
"""获取数据库连接"""
if self._conn is None or not self._conn.is_connected():
self._conn = get_connection()
return self._conn
def close(self):
"""关闭连接"""
if self._conn and self._conn.is_connected():
self._conn.close()
self._conn = None
def get_dealer_groupings(self, group_id: int) -> List[dict]:
"""
获取商家组合列表
Returns:
[{"id": 1, "name": "商家组合A", "dealer_ids": [1, 2, 3]}]
"""
conn = self._get_connection()
cursor = conn.cursor(dictionary=True)
try:
sql = """
SELECT id, region_name as name, auth_dealers
FROM artificial_region_dealer
WHERE group_id = %s
"""
cursor.execute(sql, [group_id])
rows = cursor.fetchall()
result = []
for row in rows:
dealer_ids = []
if row.get("auth_dealers"):
try:
import json
dealers = json.loads(row["auth_dealers"])
if isinstance(dealers, list):
if dealers and isinstance(dealers[0], dict):
dealer_ids = [d.get("dealerId") for d in dealers if d.get("dealerId")]
else:
dealer_ids = [int(d) for d in dealers if d]
except:
pass
result.append({
"id": row["id"],
"name": row["name"],
"dealer_ids": dealer_ids,
})
logger.info(f"查询商家组合: group_id={group_id}, count={len(result)}")
return result
finally:
cursor.close()