tasks.py 3.59 KB
"""
定时任务
使用 APScheduler 实现每日凌晨执行
"""

import logging
import argparse
from datetime import date

from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.triggers.cron import CronTrigger

from ..config import get_settings
from ..agent import ReplenishmentAgent

logger = logging.getLogger(__name__)


def run_replenishment_task():
    """执行补货建议任务"""
    logger.info("="*50)
    logger.info("开始执行 AI 补货建议定时任务")
    logger.info("="*50)
    
    try:
        agent = ReplenishmentAgent()
        
        # 默认配置 - 可从数据库读取
        group_id = 2
        
        agent.run_for_all_groupings(
            group_id=group_id,
        )
        
        logger.info("="*50)
        logger.info("AI 补货建议定时任务执行完成")
        logger.info("="*50)
        
    except Exception as e:
        logger.error(f"定时任务执行失败: {e}", exc_info=True)
        raise


def start_scheduler():
    """启动定时调度"""
    settings = get_settings()
    
    scheduler = BlockingScheduler()
    
    # 添加定时任务
    trigger = CronTrigger(
        hour=settings.scheduler_cron_hour,
        minute=settings.scheduler_cron_minute,
    )
    
    scheduler.add_job(
        run_replenishment_task,
        trigger=trigger,
        id="replenishment_task",
        name="AI 补货建议任务",
        replace_existing=True,
    )
    
    logger.info(
        f"定时任务已配置: 每日 {settings.scheduler_cron_hour:02d}:{settings.scheduler_cron_minute:02d} 执行"
    )
    
    try:
        logger.info("调度器启动...")
        scheduler.start()
    except (KeyboardInterrupt, SystemExit):
        logger.info("调度器停止")
        scheduler.shutdown()


def main():
    """CLI 入口"""
    parser = argparse.ArgumentParser(description="AI 补货建议定时任务")
    parser.add_argument(
        "--run-once",
        action="store_true",
        help="立即执行一次(不启动调度器)",
    )
    parser.add_argument(
        "--group-id",
        type=int,
        default=2,
        help="集团ID (默认: 2)",
    )
    parser.add_argument(
        "--dealer-grouping-id",
        type=int,
        help="指定商家组合ID (可选)",
    )
    
    args = parser.parse_args()
    
    # 配置日志
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
    )
    
    if args.run_once:
        logger.info("单次执行模式")
        
        if args.dealer_grouping_id:
            # 指定商家组合
            from ..services import DataService
            
            agent = ReplenishmentAgent()
            data_service = DataService()
            
            try:
                groupings = data_service.get_dealer_groupings(args.group_id)
                grouping = next((g for g in groupings if g["id"] == args.dealer_grouping_id), None)
                
                if not grouping:
                    logger.error(f"未找到商家组合: {args.dealer_grouping_id}")
                    return
                
                # 直接使用预计划数据,不需要 shop_ids
                agent.run(
                    group_id=args.group_id,
                    dealer_grouping_id=grouping["id"],
                    dealer_grouping_name=grouping["name"],
                )
                
            finally:
                data_service.close()
        else:
            # 所有商家组合
            run_replenishment_task()
    else:
        start_scheduler()


if __name__ == "__main__":
    main()