Celery任务队列基本笔记
场景
Web服务中的某些请求可能耗时颇长,并且我们难以控制响应时间时间,比如我们调用了第三方的服务(如电子邮件服务、文档解析服务等),我们无法加快第三方服务的返回同时也不能简单地跳过/忽略这些调用。
这种需要执行耗时不确定的操作的场景,我们可以将任务添加到消息/任务队列中,并立即响应用户,由另外的“消费者”进程/线程去处理耗时任务。
可能的场景:
发送电子邮件、短信通知
图像、视频处理
生成报告
调用机器学习模型
Celery
特性
Python开发社区中最受欢迎的任务队列框架为Celery,其具有很多优点:
高效率:官方介绍,Celery单进程可在一分钟内处理百万级的任务
分布式执行:Celery支持将任务分发到多个计算机上,以实现分布式执行,从而提高处理能力和性能
生产者-消费者模型:Celery采用典型的生产者和消费者模型。生产者发送任务到消息队列,消费者从消息队列中取任务进行执行。
异步任务处理:通过将任务放入队列中,Celery可以实现任务的异步执行,避免阻塞主应用程序的进程
支持计划执行任务(替代crontab)
架构
任务(Task):需要执行的任务,通常异步执行。支持定时任务(Periodic Task):定时执行的任务,可以间隔执行,支持crontab表达式。
消息中间件(Broker):用于存储和传递任务消息。常用的中间件有RabbitMQ、Redis等。
任务执行单元/工作节点(Worker):负责从消息队列中获取任务并执行它们。通常使用多线程技术如Eventlet、gevent等来提高并发性能。
任务结果存储(Task Result Store):用于存储任务执行的结果,确保任务结果可以被查询和回调。可选Redis/RebbitMQ/MySQL/……
任务(Task)
定时任务调度(Beat):用于触发定时任务。从配置文件或数据库读取定时任务配置,定期将待执行任务写入消息中间件。
概念
任务、任务池
@app.task
@shared_task
任务池(TaskPool)
定时任务
Celery Beat进程管理和调度定时任务。
Periodic Tasks — Celery 5.4.0 documentation
Scheduler
django admin后台添加定时任务
系统启动时自动注册定时任务
运行时动态添加
……
Q:任务有多少种状态?
Tasks — Celery 5.4.0 documentation
PENDING
STARTED
SUCCESS
FAILURE
RETRY
REVOKE
Q:worker如何消费消息中间件的任务?
轮询还是订阅?
标记状态为START 任务完成ACK
Q:如何保证任务不会被多个worker重复执行?
高并发场景下,可能出现同一个任务被多个worker重复消费的问题,目前官方未提供任务防重复执行和幂等性保障的特性。可以自行实现或使用第三方类库完成任务去重。
- celery_once
通过在Redis对任务按名称和参数加锁防止任务被重复执行
Q:任务执行失败怎么办?如何重新执行任务?
Q:不同任务优先级不同如何应对?
队列优先级消费
任务路由绑定,不同worker处理不同队列的任务
使用前置条件
选择消息中间件:Redis/RebbitMQ/…。celery进程订阅消息中间件后即可消费任务队列中的任务。
选择结果中间件,用于保存任务执行结果
Django中使用Celery的简单方法
安装celery库
Django中使用celery无需安装额外的库
在Django的配置文件中配置Celery相关参数(如使用的消息中间件、结果存储中间件、用户密码、工作线程数、任务脚本搜索路径等)
编写Celery应用初始化脚本(读取配置、实例化Celery app对象)
启动Celery Worker进程:
celery -A CELERY_APP worker -L INFO
注:若是windows环境,启动时需要加参数-P eventlet
简单示例如下:
celery配置:
# django settings.py
#broker(消息中间件来接收和发送任务消息)
CELERY_BROKER_URL = 'redis://localhost:6379/1'
#backend(存储worker执行的结果)
CELERY_RESULT_BACKEND = 'redis://localhost:6379/2'
#设置时间参照,不设置默认使用的UTC时间
CELERY_TIMEZONE = 'Asia/Shanghai'
#指定任务的序列化
CELERY_TASK_SERIALIZER='json'
#指定执行结果的序列化
CELERY_RESULT_SERIALIZER='json'
CELERY_WORKER_CONCURRENCY = 2
celery初始化脚本
import os
import celery
import django
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'demo.settings')
django.setup()
app = celery.Celery('demo', include=[
'service.tasks'
])
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()
CELERY环境变量
BROKER_URL
CELERY_RESULT_BACKEND
Celery任务监控:Flower
Flower是一个Web版本的celery任务队列监控工具。
安装:
pip install flower
启动命令:
celery -A celery_app flower --url_prefix=flower --basic-auth=USERNAME:PASSWORD --port=5555 --loglevel=info
如果需要任务结果持久化存储,启动时添加参数persistant
Celery连接Redis哨兵
(暂略)