myesn

myEsn2E9

hi
github

Celery: 将失败任务路由到 RabbitMQ 的死信队列

说明#

Celery 是一个分布式任务队列,Celery 任务有自动重试的功能,本文介绍了如何将达到最大重试次数后仍然失败的任务消息路由到 RabbitMQ 的死信队列。

创建死信队列#

一般一个项目配置一个死信队列的 Exchange 就可以了,然后再根据业务配置多个死信队列绑定到死信 Exchange。

以下是手动创建普通队列和死信队列的 Python 代码:

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# Define the dead-letter queue
channel.queue_declare(queue='dead_letter_queue')
channel.exchange_declare(exchange='dead_letter_exchange', exchange_type='direct')
# Bind DLQ to an exchange
channel.queue_bind(exchange='dead_letter_exchange', queue='dead_letter_queue', routing_key='dead')

# Define the main queue with DLQ settings
arguments = {
  'x-dead-letter-exchange': 'dead_letter_exchange',
  'x-dead-letter-routing-key': 'dead'
}
channel.queue_declare(queue='task_queue', arguments=arguments)
channel.exchange_declare(exchange='task_exchange', exchange_type='direct')
channel.queue_bind(exchange='task_exchange', queue='task_queue', routing_key='task')

或者可以将 Exchange、Queue、Routing Key 传给 Celery,它会也会自动创建(默认行为,可以通过配置取消自动创建):

# 队列列表
task_queues = [
    # 死信队列在被创建后,需要从 task_queues 中移除或注释,否则后续投递到死信队列中的消息又会被 celery 消费
    Queue(QUEUE_RETRY_TASK_DEADLETTER, Exchange('task_retry_deadletter_exchange', type='direct'), routing_key='task.retry.deadletter'),
    Queue(QUEUE_RETRY_TASK, Exchange('task_retry_exchange', type='direct'), routing_key='task.retry',
          queue_arguments={
              # 声明当前队列绑定的死信交换机
              'x-dead-letter-exchange': 'task_retry_deadletter_exchange',
              # 声明当前队列的死信路由key
              'x-dead-letter-routing-key': 'task.retry.deadletter'
          }
    )
]

celery_app = Celery(
    'demo',
    broker=current_config.RABBITMQ_URL,
    backend=current_config.REDIS_URL
)
celery_app.conf.update(
    task_queues=task_queues,
)

需要注意的是,在死信队列创建完毕之后,需要从 task_queues 数组中移除 Queue(QUEUE_RETRY_TASK_DEADLETTER..) 关于死信队列的配置,否则在 Celery 会消费掉该死信队列中的消息。

定义 Celery 任务#

@celery_app.task(bind=True, base=Task, name='tasks.retry', acks_on_failure_or_timeout=False)
def retry_task(self, **kwargs):
    """任务重试服务"""
    print('------retry_task-start')
    """
    retry的参数可以有: 
        exc:指定抛出的异常
        throw:重试时是否通知worker是重试任务
        eta:指定重试的时间/日期
        countdown:在多久之后重试(每多少秒重试一次,默认3分钟) max_retries:最大重试次数(默认3次)
    """
    self.retry(exc=Exception('eeeeeee'), countdown=3, max_retries=1)

    print('------retry_task-end')

需要注意的是,必须设置 acks_on_failure_or_timeout=False,避免达到最大重试次数之后,Celery 自己报 MaxRetriesException 又自动确认消息,配置为 False 可避免自动确认,这样消息最终会被路由到死信队列中。

参考#

加载中...
此文章数据所有权由区块链加密技术和智能合约保障仅归创作者所有。