一文读懂Celery与RabbitMQ分布式任务调度系统
Celery结合RabbitMQ,可用于AI任务的异步处理、分布式计算、任务调度,广泛应用于模型推理、训练管理、数据预处理等场景。RabbitMQ 负责任务分发,Celery Worker 并行执行,提高系统吞吐量,支持负载均衡、任务重试、定时调度,确保AI任务高效稳定运行,适用于大规模计算和高并发请求处理。
一、Clerey的核心组件
Celery是一个异步任务队列/作业队列,基于分布式消息传递。它专注于实时操作,但也支持任务调度。作为一个任务队列,Celery的主要功能是将工作分配到多个工作节点,实现任务的异步执行。
Broker (消息中间件)
- 用于传递消息,接收并存储任务
- 常用的Broker有RabbitMQ和Redis
- RabbitMQ功能完善,适合生产环境
- Redis设置简单,适合开发和小型应用
Worker (工作节点)
- 执行任务的进程
- 可以在不同的机器上部署多个Worker
- 分配任务到多个Worker实现负载均衡
Backend (结果存储)
- 存储任务执行结果
- 常用的Backend也包括Redis和RabbitMQ
- 也可使用MySQL、MongoDB等数据库
Beat (定时任务)
- 调度定时任务的组件
- 类似于Linux的crontab
- 发送定时任务到Broker队列
工作流程
- 客户端通过API将任务发送给Broker
- Broker将任务存入队列
- Worker从Broker获取任务并执行
- 执行结果存入Backend
- 客户端可以从Backend获取结果
特点和优势
- 分布式执行:任务可在多台机器上分布执行
- 高可用性:单个Worker故障不影响整个系统
- 灵活性:支持多种Broker和Backend
- 扩展性:可以通过增加Worker动态扩展处理能力
- 任务监控:提供Flower等监控工具
- 重试机制:任务失败后可以自动重试
- 任务优先级:可以设置任务优先级
- 并发控制:可控制每个Worker的并发任务数
- 资源限制:可限制CPU和内存使用
常见应用场景
- 异步任务处理:发送邮件、图像处理等耗时操作
- 定时任务:定期数据备份、报表生成
- 大规模数据处理:数据清洗、ETL任务
- API限流:控制API调用频率
- 任务工作流:编排复杂的任务流程
二、celery常用消息队列组件
- Redis(常见,易部署,但不适合高吞吐任务)
- Amazon SQS
- RabbitMQ:Celery 默认推荐
- Kafka(支持 Celery,但不如 RabbitMQ 原生)
- 数据库(如 MySQL/PostgreSQL)(效率较低,不推荐)
功能 | Celery + RabbitMQ | Celery + Redis | Celery + Kafka |
---|---|---|---|
任务持久化 | ✅ 持久化存储 | ❌ 内存存储,丢失风险 | ✅ 可持久化 |
任务调度 | ✅ 高效调度 | ✅ 但吞吐低 | 🚫 需额外支持 |
适合高并发 | ✅ | 🚫 不适合高吞吐 | ✅ |
可靠性 | ✅ 消息确认机制 | ❌ 容易丢失任务 | ✅ |
结论: 一般生产环境使用 Celery + RabbitMQ 的搭配比较多
三、RabbitMQ 在 Celery 中的角色
RabbitMQ 充当 消息代理(Broker),主要用于:
- 任务队列:Celery 把任务发布到 RabbitMQ 的队列中,等待 worker 处理。
- 任务分发:RabbitMQ 负责将任务消息分发给 Celery worker 进程,实现异步任务执行。
- 任务重试:如果 worker 失败,RabbitMQ 可以重新投递任务,确保可靠性。
Celery 任务执行流程(RabbitMQ 作为 Broker)
- 任务发布:
- 客户端(producer)使用 Celery 发送一个任务(task)。
- 任务通过 Broker(RabbitMQ) 存入队列。
- 任务消费:
- Celery worker 订阅 RabbitMQ 队列,获取任务。
- worker 从 RabbitMQ 取出任务,并执行。
- 结果存储(可选):
- 任务执行结果可以存入 Redis、数据库等,方便查询。
使用 RabbitMQ 作为Broker优势:
1. 解耦
- 生产者和消费者不需要直接交互,而是通过 RabbitMQ 进行通信,降低了模块之间的耦合度,提高了系统的可维护性和可扩展性。
2. 异步处理 & 提高吞吐量
- 生产者可以快速将消息发送到队列,消费者可以异步处理,避免阻塞,提高系统整体吞吐能力。
3. 流量削峰 & 限流
- 在高并发场景下,RabbitMQ 可以作为缓冲层,避免数据库或下游系统被瞬时流量压垮。
4. 可靠性 & 持久化
- 通过 消息确认(ACK)、持久化存储、死信队列(DLX) 等机制,RabbitMQ 可以防止消息丢失,提高系统可靠性。
5. 支持多种消息路由模式
- 简单队列模式(点对点)
- 发布/订阅模式(广播)
- 路由模式(基于路由键分发)
- 主题模式(基于通配符匹配的订阅)
- RPC模式(远程调用)
6. 多语言支持
- 兼容多种编程语言(Python、Java、Go、C++ 等),易于集成到不同技术栈的系统中。
7. 分布式 & 高可用
- 支持 集群部署、主从复制、镜像队列,保证服务高可用。
8. 消息优先级 & TTL
- 可以设置
消息的优先级,让高优先级的消息先被消费。
- 可以设置 消息TTL(过期时间),避免无用的消息占用队列资源。
9. 插件生态丰富
- 支持 Web 管理界面、监控插件、延迟队列等功能,方便运维管理。
10. 轻量级 & 易部署
- RabbitMQ 基于 Erlang 开发,性能强劲,同时占用资源较少,易于部署和维护。
高吞吐、异步处理、消息可靠性,RabbitMQ 是一个不错的选择。当场景需要 超高吞吐(百万级QPS),Kafka 会更适合。
# 四、安装Celery、RabbitMQ
第0步:先安装celery
1 |
|
第一步:更新包列表
1 |
|
第二步:安装Erlang
RabbitMQ是基于Erlang语言开发的,所以首先需要安装Erlang: 添加Erlang仓库
1 |
|
更新包列表
1 |
|
安装Erlang
1 |
|
第三步:安装RabbitMQ
添加RabbitMQ仓库 1
2wget -O- https://www.rabbitmq.com/rabbitmq-release-signing-key.asc | sudo apt-key add -
sudo add-apt-repository "deb https://dl.bintray.com/rabbitmq/debian $(lsb_release -cs) main"
更新包列表
1 |
|
安装RabbitMQ
1 |
|
第四步:启动rabbitMQ
1 |
|
查看队列状态
1 |
|
五、简单的RabbitMQ测试脚本
1 |
|
1 |
|
1 |
|
启动方式
确保已经启动RabbitMQ服务器
bash1启动Celery worker:
1
celery -A tasks worker --loglevel=info
bash2启动Celery beat (用于周期性任务):
1
celery -A tasks beat --loglevel=info
bash3运行测试脚本:
1
python test_tasks.py
一文读懂Celery与RabbitMQ分布式任务调度系统
https://linxkon.github.io/一文读懂Celery与RabbitMQ分布式任务调度系统.html