本帖最后由 阿秋361 于 2023-11-04 09:58 编辑
一、环境依赖系统:windows10 python: python==3.9.0 djnago==3.2.0 APScheduler==3.10.1 二、django中的配置1、创建utils包,在包里面创建schedulers包 utils/schedulers/task.py #1、设置 Django 环境,就可以导入项目的模型类这些了 import os os.environ.setdefault("DJANGO_SETTINGS_MODULE", "项目根目录名.settings") import django django.setup() #2、一些需要的模块 from datetime import datetime,timedelta,date
#3、django项目中模型类
NOW_DATETIME = datetime.strftime(datetime.now(),'%Y-%m-%d %H:%M:%S') NOW_DATE = date.today().strftime('%Y-%m-%d')
def example_interval(): ''' 每隔一段固定时间就执行一次 :return: ''' print('interval',NOW_DATETIME)
def example_cron(): ''' 在每天的固定时间执行 :return: ''' print('cron,凌晨开始执行的定时任务')
def example_date(): ''' 在指定日期执行一次,就执行一次 :return: ''' print(f'date,指定日期执行一次:{NOW_DATETIME}')
utils/schedulers/scheduler.py # 2、导入所需的调度器类和触发器类 from apscheduler.jobstores.memory import MemoryJobStore from apscheduler.jobstores.redis import RedisJobStore from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.triggers.interval import IntervalTrigger from apscheduler.triggers.cron import CronTrigger from apscheduler.triggers.date import DateTrigger from pytz import timezone from threading import RLock from django.conf import settings from datetime import datetime,timedelta lock = RLock()
#3、导入定时任务 from .task import example_interval #例子,时间间隔,每隔一段时间执行 from .task import example_cron #指定时间执行,在指定时间点执行 from .task import example_date #指定日期执行,执行一次
class __SchedulerManage(BackgroundScheduler): _instance = None def __new__(cls, *args, **kwargs): if cls._instance: return cls._instance with lock: if cls._instance: return cls._instance cls._instance = super().__new__(cls) return cls._instance
def __init__(self): super().__init__() # 1、设置时区 self.timezone = timezone(settings.TIME_ZONE) # 2、使用内存存储定时任务信息 jobstore_redis = RedisJobStore(host='localhost', port=6379, db=0, password='redis密码') jobstore_memory = MemoryJobStore() self.add_jobstore(jobstore_memory) # 3、添加任务 self.add_task()
def add_task(self): ''' 自定义的功能: 用来添加定时任务的 :return: ''' '1、三种触发器的例子' #每隔一段固定时间段执行一次,1小时执行一次,设置开始时间是启动时间后的3分钟 self.add_job(example_interval, trigger=IntervalTrigger(hours=1,start_date=datetime.now()+timedelta(minutes=3)), id='example_interval', replace_existing=True) #设置每天的11:03:10 执行一次 self.add_job(example_cron,trigger=CronTrigger(hour=11,minute=3,second=10),id='example_cron',replace_existing=True) #设置在2023-08-10 11:03:01执行一次,只执行一次 self.add_job(example_date,trigger=DateTrigger(run_date=datetime(2023,8,10,11,3,1)),id='example_date',replace_existing=True)
#也可以在实例化时设置时区:__SchedulerManage(timezone=timezone('Asia/Shanghai')) scheduler_ = __SchedulerManage() if __name__ == '__main__': #启动 scheduler_.start() 或者 scheduler_() 两种方式都ok scheduler_()
utils/schedulers/__init__.py
2、项目配置文件settings.py ####配置定时任务 #启动定时任务 from utils.schedulers import scheduler_ scheduler_.start()
注:若转载请注明大神论坛来源(本贴地址)与作者信息。
|