说明#
Celery 是一个分布式任务队列,Celery 任务有自动重试的功能,本文介绍了如何将达到最大重试次数后仍然失败的任务消息路由到 RabbitMQ 的死信队列。
创建死信队列#
一般一个项目配置一个死信队列的 Exchange 就可以了,然后再根据业务配置多个死信队列绑定到死信 Exchange。
以下是手动创建普通队列和死信队列的 Python 代码:
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# Define the dead-letter queue
channel.queue_declare(queue='dead_letter_queue')
channel.exchange_declare(exchange='dead_letter_exchange', exchange_type='direct')
# Bind DLQ to an exchange
channel.queue_bind(exchange='dead_letter_exchange', queue='dead_letter_queue', routing_key='dead')
# Define the main queue with DLQ settings
arguments = {
'x-dead-letter-exchange': 'dead_letter_exchange',
'x-dead-letter-routing-key': 'dead'
}
channel.queue_declare(queue='task_queue', arguments=arguments)
channel.exchange_declare(exchange='task_exchange', exchange_type='direct')
channel.queue_bind(exchange='task_exchange', queue='task_queue', routing_key='task')
或者可以将 Exchange、Queue、Routing Key 传给 Celery,它会也会自动创建(默认行为,可以通过配置取消自动创建):
# 队列列表
task_queues = [
# 死信队列在被创建后,需要从 task_queues 中移除或注释,否则后续投递到死信队列中的消息又会被 celery 消费
Queue(QUEUE_RETRY_TASK_DEADLETTER, Exchange('task_retry_deadletter_exchange', type='direct'), routing_key='task.retry.deadletter'),
Queue(QUEUE_RETRY_TASK, Exchange('task_retry_exchange', type='direct'), routing_key='task.retry',
queue_arguments={
# 声明当前队列绑定的死信交换机
'x-dead-letter-exchange': 'task_retry_deadletter_exchange',
# 声明当前队列的死信路由key
'x-dead-letter-routing-key': 'task.retry.deadletter'
}
)
]
celery_app = Celery(
'demo',
broker=current_config.RABBITMQ_URL,
backend=current_config.REDIS_URL
)
celery_app.conf.update(
task_queues=task_queues,
)
需要注意的是,在死信队列创建完毕之后,需要从 task_queues
数组中移除 Queue(QUEUE_RETRY_TASK_DEADLETTER..)
关于死信队列的配置,否则在 Celery 会消费掉该死信队列中的消息。
定义 Celery 任务#
@celery_app.task(bind=True, base=Task, name='tasks.retry', acks_on_failure_or_timeout=False)
def retry_task(self, **kwargs):
"""任务重试服务"""
print('------retry_task-start')
"""
retry的参数可以有:
exc:指定抛出的异常
throw:重试时是否通知worker是重试任务
eta:指定重试的时间/日期
countdown:在多久之后重试(每多少秒重试一次,默认3分钟) max_retries:最大重试次数(默认3次)
"""
self.retry(exc=Exception('eeeeeee'), countdown=3, max_retries=1)
print('------retry_task-end')
需要注意的是,必须设置 acks_on_failure_or_timeout=False
,避免达到最大重试次数之后,Celery 自己报 MaxRetriesException 又自动确认消息,配置为 False
可避免自动确认,这样消息最终会被路由到死信队列中。