Celery任务队列基本笔记

场景

Web服务中的某些请求可能耗时颇长,并且我们难以控制响应时间时间,比如我们调用了第三方的服务(如电子邮件服务、文档解析服务等),我们无法加快第三方服务的返回同时也不能简单地跳过/忽略这些调用。

这种需要执行耗时不确定的操作的场景,我们可以将任务添加到消息/任务队列中,并立即响应用户,由另外的“消费者”进程/线程去处理耗时任务。

可能的场景:

  • 发送电子邮件、短信通知

  • 图像、视频处理

  • 生成报告

  • 调用机器学习模型

Celery

特性

Python开发社区中最受欢迎的任务队列框架为Celery,其具有很多优点:

  • 高效率:官方介绍,Celery单进程可在一分钟内处理百万级的任务

  • 分布式执行:Celery支持将任务分发到多个计算机上,以实现分布式执行,从而提高处理能力和性能

  • 生产者-消费者模型:Celery采用典型的生产者和消费者模型。生产者发送任务到消息队列,消费者从消息队列中取任务进行执行。

  • 异步任务处理:通过将任务放入队列中,Celery可以实现任务的异步执行,避免阻塞主应用程序的进程

  • 支持计划执行任务(替代crontab)

架构

  1. 任务(Task):需要执行的任务,通常异步执行。支持定时任务(Periodic Task):定时执行的任务,可以间隔执行,支持crontab表达式。

  2. 消息中间件(Broker):用于存储和传递任务消息。常用的中间件有RabbitMQ、Redis等。

  3. 任务执行单元/工作节点(Worker):负责从消息队列中获取任务并执行它们。通常使用多线程技术如Eventlet、gevent等来提高并发性能。

  4. 任务结果存储(Task Result Store):用于存储任务执行的结果,确保任务结果可以被查询和回调。可选Redis/RebbitMQ/MySQL/……

  5. 任务(Task)

  6. 定时任务调度(Beat):用于触发定时任务。从配置文件或数据库读取定时任务配置,定期将待执行任务写入消息中间件。

概念

任务、任务池

  • @app.task

  • @shared_task

  • 任务池(TaskPool)

定时任务

Celery Beat进程管理和调度定时任务。

Periodic Tasks — Celery 5.4.0 documentation

Scheduler

  • django admin后台添加定时任务

  • 系统启动时自动注册定时任务

  • 运行时动态添加

  • ……

Q:任务有多少种状态?

Tasks — Celery 5.4.0 documentation

  • PENDING

  • STARTED

  • SUCCESS

  • FAILURE

  • RETRY

  • REVOKE

Q:worker如何消费消息中间件的任务?

轮询还是订阅?

标记状态为START 任务完成ACK

Q:如何保证任务不会被多个worker重复执行?

高并发场景下,可能出现同一个任务被多个worker重复消费的问题,目前官方未提供任务防重复执行和幂等性保障的特性。可以自行实现或使用第三方类库完成任务去重。

  • celery_once

GitHub - cameronmaske/celery-once: Celery Once allows you to prevent multiple execution and queuing of celery tasks.

通过在Redis对任务按名称和参数加锁防止任务被重复执行

Q:任务执行失败怎么办?如何重新执行任务?

Q:不同任务优先级不同如何应对?

  • 队列优先级消费

  • 任务路由绑定,不同worker处理不同队列的任务

Celery中文文档

使用前置条件

  • 选择消息中间件:Redis/RebbitMQ/…。celery进程订阅消息中间件后即可消费任务队列中的任务。

  • 选择结果中间件,用于保存任务执行结果

Django中使用Celery的简单方法

  1. 安装celery库

    Django中使用celery无需安装额外的库

  2. 在Django的配置文件中配置Celery相关参数(如使用的消息中间件、结果存储中间件、用户密码、工作线程数、任务脚本搜索路径等)

  3. 编写Celery应用初始化脚本(读取配置、实例化Celery app对象)

  4. 启动Celery Worker进程:celery -A CELERY_APP worker -L INFO

    注:若是windows环境,启动时需要加参数-P eventlet

简单示例如下:

celery配置:

# django settings.py
#broker(消息中间件来接收和发送任务消息)
CELERY_BROKER_URL = 'redis://localhost:6379/1'
#backend(存储worker执行的结果)
CELERY_RESULT_BACKEND = 'redis://localhost:6379/2'

#设置时间参照,不设置默认使用的UTC时间
CELERY_TIMEZONE = 'Asia/Shanghai'
#指定任务的序列化
CELERY_TASK_SERIALIZER='json'
#指定执行结果的序列化
CELERY_RESULT_SERIALIZER='json'

CELERY_WORKER_CONCURRENCY = 2

celery初始化脚本

import os
import celery
import django
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'demo.settings')
django.setup()
app = celery.Celery('demo', include=[
    'service.tasks'

])

app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()

CELERY环境变量

BROKER_URL
CELERY_RESULT_BACKEND

Celery任务监控:Flower

Flower是一个Web版本的celery任务队列监控工具。

安装:

pip install flower

启动命令:

celery -A celery_app flower --url_prefix=flower --basic-auth=USERNAME:PASSWORD --port=5555 --loglevel=info

如果需要任务结果持久化存储,启动时添加参数persistant

Celery连接Redis哨兵

(暂略)

文档

CoolCats
CoolCats
理学学士

我的研究兴趣是时空数据分析、知识图谱、自然语言处理与服务端开发