一文读懂Celery与RabbitMQ分布式任务调度系统

Celery结合RabbitMQ,可用于AI任务的异步处理、分布式计算、任务调度,广泛应用于模型推理、训练管理、数据预处理等场景。RabbitMQ 负责任务分发,Celery Worker 并行执行,提高系统吞吐量,支持负载均衡、任务重试、定时调度,确保AI任务高效稳定运行,适用于大规模计算和高并发请求处理。

一、Clerey的核心组件

Celery是一个异步任务队列/作业队列,基于分布式消息传递。它专注于实时操作,但也支持任务调度。作为一个任务队列,Celery的主要功能是将工作分配到多个工作节点,实现任务的异步执行。

  1. Broker (消息中间件)

    • 用于传递消息,接收并存储任务
    • 常用的Broker有RabbitMQ和Redis
    • RabbitMQ功能完善,适合生产环境
    • Redis设置简单,适合开发和小型应用
  2. Worker (工作节点)

    • 执行任务的进程
    • 可以在不同的机器上部署多个Worker
    • 分配任务到多个Worker实现负载均衡
  3. Backend (结果存储)

    • 存储任务执行结果
    • 常用的Backend也包括Redis和RabbitMQ
    • 也可使用MySQL、MongoDB等数据库
  4. Beat (定时任务)

    • 调度定时任务的组件
    • 类似于Linux的crontab
    • 发送定时任务到Broker队列

工作流程

  1. 客户端通过API将任务发送给Broker
  2. Broker将任务存入队列
  3. Worker从Broker获取任务并执行
  4. 执行结果存入Backend
  5. 客户端可以从Backend获取结果

特点和优势

  1. 分布式执行:任务可在多台机器上分布执行
  2. 高可用性:单个Worker故障不影响整个系统
  3. 灵活性:支持多种Broker和Backend
  4. 扩展性:可以通过增加Worker动态扩展处理能力
  5. 任务监控:提供Flower等监控工具
  6. 重试机制:任务失败后可以自动重试
  7. 任务优先级:可以设置任务优先级
  8. 并发控制:可控制每个Worker的并发任务数
  9. 资源限制:可限制CPU和内存使用

常见应用场景

  1. 异步任务处理:发送邮件、图像处理等耗时操作
  2. 定时任务:定期数据备份、报表生成
  3. 大规模数据处理:数据清洗、ETL任务
  4. API限流:控制API调用频率
  5. 任务工作流:编排复杂的任务流程

二、celery常用消息队列组件

  • Redis(常见,易部署,但不适合高吞吐任务)
  • Amazon SQS
  • RabbitMQ:Celery 默认推荐
  • Kafka(支持 Celery,但不如 RabbitMQ 原生)
  • 数据库(如 MySQL/PostgreSQL)(效率较低,不推荐)
功能 Celery + RabbitMQ Celery + Redis Celery + Kafka
任务持久化 ✅ 持久化存储 ❌ 内存存储,丢失风险 ✅ 可持久化
任务调度 ✅ 高效调度 ✅ 但吞吐低 🚫 需额外支持
适合高并发 🚫 不适合高吞吐
可靠性 ✅ 消息确认机制 ❌ 容易丢失任务

结论: 一般生产环境使用 Celery + RabbitMQ 的搭配比较多

三、RabbitMQ Celery 中的角色

RabbitMQ 充当 消息代理(Broker),主要用于:

  • 任务队列:Celery 把任务发布到 RabbitMQ 的队列中,等待 worker 处理。
  • 任务分发:RabbitMQ 负责将任务消息分发给 Celery worker 进程,实现异步任务执行。
  • 任务重试:如果 worker 失败,RabbitMQ 可以重新投递任务,确保可靠性。

Celery 任务执行流程(RabbitMQ 作为 Broker)

  1. 任务发布
    • 客户端(producer)使用 Celery 发送一个任务(task)。
    • 任务通过 Broker(RabbitMQ) 存入队列。
  2. 任务消费
    • Celery worker 订阅 RabbitMQ 队列,获取任务。
    • worker 从 RabbitMQ 取出任务,并执行。
  3. 结果存储(可选)
    • 任务执行结果可以存入 Redis、数据库等,方便查询。

使用 RabbitMQ 作为Broker优势:

1. 解耦

  • 生产者和消费者不需要直接交互,而是通过 RabbitMQ 进行通信,降低了模块之间的耦合度,提高了系统的可维护性和可扩展性。

2. 异步处理 & 提高吞吐量

  • 生产者可以快速将消息发送到队列,消费者可以异步处理,避免阻塞,提高系统整体吞吐能力。

3. 流量削峰 & 限流

  • 在高并发场景下,RabbitMQ 可以作为缓冲层,避免数据库或下游系统被瞬时流量压垮。

4. 可靠性 & 持久化

  • 通过 消息确认(ACK)、持久化存储、死信队列(DLX) 等机制,RabbitMQ 可以防止消息丢失,提高系统可靠性。

5. 支持多种消息路由模式

  • 简单队列模式(点对点)
  • 发布/订阅模式(广播)
  • 路由模式(基于路由键分发)
  • 主题模式(基于通配符匹配的订阅)
  • RPC模式(远程调用)

6. 多语言支持

  • 兼容多种编程语言(Python、Java、Go、C++ 等),易于集成到不同技术栈的系统中。

7. 分布式 & 高可用

  • 支持 集群部署、主从复制、镜像队列,保证服务高可用。

8. 消息优先级 & TTL

  • 可以设置 消息的优先级,让高优先级的消息先被消费。
  • 可以设置 消息TTL(过期时间),避免无用的消息占用队列资源。

9. 插件生态丰富

  • 支持 Web 管理界面、监控插件、延迟队列等功能,方便运维管理。

10. 轻量级 & 易部署

  • RabbitMQ 基于 Erlang 开发,性能强劲,同时占用资源较少,易于部署和维护。

高吞吐、异步处理、消息可靠性,RabbitMQ 是一个不错的选择。当场景需要 超高吞吐(百万级QPS),Kafka 会更适合。

# 四、安装Celery、RabbitMQ

第0步:先安装celery

1
pip install celery

第一步:更新包列表

1
2
sudo apt-get update
sudo apt-get upgrade

第二步:安装Erlang

RabbitMQ是基于Erlang语言开发的,所以首先需要安装Erlang: 添加Erlang仓库

1
2
3
sudo apt-get install software-properties-common apt-transport-https
wget -O- https://packages.erlang-solutions.com/ubuntu/erlang_solutions.asc | sudo apt-key add -
sudo add-apt-repository "deb https://packages.erlang-solutions.com/ubuntu $(lsb_release -cs) contrib"

更新包列表

1
sudo apt-get update

安装Erlang

1
sudo apt-get install erlang

第三步:安装RabbitMQ

添加RabbitMQ仓库

1
2
wget -O- https://www.rabbitmq.com/rabbitmq-release-signing-key.asc | sudo apt-key add -
sudo add-apt-repository "deb https://dl.bintray.com/rabbitmq/debian $(lsb_release -cs) main"

更新包列表

1
sudo apt-get update

安装RabbitMQ

1
sudo apt-get install rabbitmq-server

第四步:启动rabbitMQ

1
2
3
4
5
# 启动服务
sudo systemctl start rabbitmq-server

# 设置开机自启
sudo systemctl enable rabbitmq-server

查看队列状态

1
2
sudo systemctl status rabbitmq-server
sudo rabbitmqctl list_queues

五、简单的RabbitMQ测试脚本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# celeryconfig.py - Celery配置文件
from datetime import timedelta

# 时区设置
timezone = 'Asia/Shanghai'

# 任务序列化格式
task_serializer = 'json'
accept_content = ['json']
result_serializer = 'json'

# 定义周期性任务
beat_schedule = {
'add-every-30-seconds': {
'task': 'tasks.add',
'schedule': timedelta(seconds=30),
'args': (16, 16),
},
'multiply-every-minute': {
'task': 'tasks.multiply',
'schedule': timedelta(minutes=1),
'args': (4, 5),
},
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# tasks.py - 任务定义文件
from celery import Celery

# 创建Celery实例,指定消息代理(RabbitMQ)和结果后端
app = Celery('tasks',
broker='amqp://guest:guest@localhost:5672//',
backend='rpc://')

# 任务装饰器,将函数注册为Celery任务
@app.task
def add(x, y):
return x + y

@app.task
def multiply(x, y):
return x * y

# 带有重试机制的任务示例
@app.task(bind=True, max_retries=3, default_retry_delay=5)
def fetch_data(self, url):
try:
# 这里应该是获取数据的实际代码
# 为了示例,我们模拟一个成功的返回
return f"Data fetched from {url}"
except Exception as exc:
# 发生错误时重试
self.retry(exc=exc)

# 周期性任务 - 在celery beat配置中定义
# 需要在celeryconfig.py中配置
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
# test_tasks.py - 测试任务的脚本
from tasks import add, multiply, fetch_data
import time

def test_simple_tasks():
# 异步调用任务
print("启动任务...")

# 调用add任务
result1 = add.delay(4, 4)
# 调用multiply任务
result2 = multiply.delay(5, 6)
# 调用fetch_data任务
result3 = fetch_data.delay("https://example.com")

# 等待任务完成并获取结果
print("等待任务结果...")

# 获取add任务结果
while not result1.ready():
time.sleep(0.1)
print(f"Add结果: {result1.get()}")

# 获取multiply任务结果
while not result2.ready():
time.sleep(0.1)
print(f"Multiply结果: {result2.get()}")

# 获取fetch_data任务结果
while not result3.ready():
time.sleep(0.1)
print(f"Fetch data结果: {result3.get()}")

def test_scheduled_task():
# 这个函数演示如何手动测试计划任务
# 实际上,计划任务是由celery beat自动调度的
print("模拟计划任务...")
result = add.apply_async(args=[10, 20])

# 等待任务完成
print("等待计划任务结果...")
while not result.ready():
time.sleep(0.1)
print(f"计划任务结果: {result.get()}")

if __name__ == "__main__":
# 测试简单任务
test_simple_tasks()

# 测试计划任务
test_scheduled_task()

启动方式

  • 确保已经启动RabbitMQ服务器

  • bash1启动Celery worker:

    1
    celery -A tasks worker --loglevel=info
  • bash2启动Celery beat (用于周期性任务):

    1
    celery -A tasks beat --loglevel=info
  • bash3运行测试脚本:

    1
    python test_tasks.py

一文读懂Celery与RabbitMQ分布式任务调度系统
https://linxkon.github.io/一文读懂Celery与RabbitMQ分布式任务调度系统.html
作者
linxkon
发布于
2025年3月1日
许可协议