tasks.py
3.59 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
"""
定时任务
使用 APScheduler 实现每日凌晨执行
"""
import logging
import argparse
from datetime import date
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.triggers.cron import CronTrigger
from ..config import get_settings
from ..agent import ReplenishmentAgent
logger = logging.getLogger(__name__)
def run_replenishment_task():
"""执行补货建议任务"""
logger.info("="*50)
logger.info("开始执行 AI 补货建议定时任务")
logger.info("="*50)
try:
agent = ReplenishmentAgent()
# 默认配置 - 可从数据库读取
group_id = 2
agent.run_for_all_groupings(
group_id=group_id,
)
logger.info("="*50)
logger.info("AI 补货建议定时任务执行完成")
logger.info("="*50)
except Exception as e:
logger.error(f"定时任务执行失败: {e}", exc_info=True)
raise
def start_scheduler():
"""启动定时调度"""
settings = get_settings()
scheduler = BlockingScheduler()
# 添加定时任务
trigger = CronTrigger(
hour=settings.scheduler_cron_hour,
minute=settings.scheduler_cron_minute,
)
scheduler.add_job(
run_replenishment_task,
trigger=trigger,
id="replenishment_task",
name="AI 补货建议任务",
replace_existing=True,
)
logger.info(
f"定时任务已配置: 每日 {settings.scheduler_cron_hour:02d}:{settings.scheduler_cron_minute:02d} 执行"
)
try:
logger.info("调度器启动...")
scheduler.start()
except (KeyboardInterrupt, SystemExit):
logger.info("调度器停止")
scheduler.shutdown()
def main():
"""CLI 入口"""
parser = argparse.ArgumentParser(description="AI 补货建议定时任务")
parser.add_argument(
"--run-once",
action="store_true",
help="立即执行一次(不启动调度器)",
)
parser.add_argument(
"--group-id",
type=int,
default=2,
help="集团ID (默认: 2)",
)
parser.add_argument(
"--dealer-grouping-id",
type=int,
help="指定商家组合ID (可选)",
)
args = parser.parse_args()
# 配置日志
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
)
if args.run_once:
logger.info("单次执行模式")
if args.dealer_grouping_id:
# 指定商家组合
from ..services import DataService
agent = ReplenishmentAgent()
data_service = DataService()
try:
groupings = data_service.get_dealer_groupings(args.group_id)
grouping = next((g for g in groupings if g["id"] == args.dealer_grouping_id), None)
if not grouping:
logger.error(f"未找到商家组合: {args.dealer_grouping_id}")
return
# 直接使用预计划数据,不需要 shop_ids
agent.run(
group_id=args.group_id,
dealer_grouping_id=grouping["id"],
dealer_grouping_name=grouping["name"],
)
finally:
data_service.close()
else:
# 所有商家组合
run_replenishment_task()
else:
start_scheduler()
if __name__ == "__main__":
main()