""" 定时任务 使用 APScheduler 实现每日凌晨执行 """ import logging import argparse from datetime import date from apscheduler.schedulers.blocking import BlockingScheduler from apscheduler.triggers.cron import CronTrigger from ..config import get_settings from ..agent import ReplenishmentAgent logger = logging.getLogger(__name__) def run_replenishment_task(): """执行补货建议任务""" logger.info("="*50) logger.info("开始执行 AI 补货建议定时任务") logger.info("="*50) try: agent = ReplenishmentAgent() # 默认配置 - 可从数据库读取 group_id = 2 agent.run_for_all_groupings( group_id=group_id, ) logger.info("="*50) logger.info("AI 补货建议定时任务执行完成") logger.info("="*50) except Exception as e: logger.error(f"定时任务执行失败: {e}", exc_info=True) raise def start_scheduler(): """启动定时调度""" settings = get_settings() scheduler = BlockingScheduler() # 添加定时任务 trigger = CronTrigger( hour=settings.scheduler_cron_hour, minute=settings.scheduler_cron_minute, ) scheduler.add_job( run_replenishment_task, trigger=trigger, id="replenishment_task", name="AI 补货建议任务", replace_existing=True, ) logger.info( f"定时任务已配置: 每日 {settings.scheduler_cron_hour:02d}:{settings.scheduler_cron_minute:02d} 执行" ) try: logger.info("调度器启动...") scheduler.start() except (KeyboardInterrupt, SystemExit): logger.info("调度器停止") scheduler.shutdown() def main(): """CLI 入口""" parser = argparse.ArgumentParser(description="AI 补货建议定时任务") parser.add_argument( "--run-once", action="store_true", help="立即执行一次(不启动调度器)", ) parser.add_argument( "--group-id", type=int, default=2, help="集团ID (默认: 2)", ) parser.add_argument( "--dealer-grouping-id", type=int, help="指定商家组合ID (可选)", ) args = parser.parse_args() # 配置日志 logging.basicConfig( level=logging.INFO, format="%(asctime)s [%(levelname)s] %(name)s: %(message)s", ) if args.run_once: logger.info("单次执行模式") if args.dealer_grouping_id: # 指定商家组合 from ..services import DataService agent = ReplenishmentAgent() data_service = DataService() try: groupings = data_service.get_dealer_groupings(args.group_id) grouping = next((g for g in groupings if g["id"] == args.dealer_grouping_id), None) if not grouping: logger.error(f"未找到商家组合: {args.dealer_grouping_id}") return # 直接使用预计划数据,不需要 shop_ids agent.run( group_id=args.group_id, dealer_grouping_id=grouping["id"], dealer_grouping_name=grouping["name"], ) finally: data_service.close() else: # 所有商家组合 run_replenishment_task() else: start_scheduler() if __name__ == "__main__": main()