myesn

myEsn2E9

hi
github

Celery: 將失敗任務路由到 RabbitMQ 的死信隊列

說明#

Celery 是一個分佈式任務隊列,Celery 任務有自動重試的功能,本文介紹了如何將達到最大重試次數後仍然失敗的任務消息路由到 RabbitMQ 的死信隊列。

創建死信隊列#

一般一個項目配置一個死信隊列的 Exchange 就可以了,然後再根據業務配置多個死信隊列綁定到死信 Exchange。

以下是手動創建普通隊列和死信隊列的 Python 代碼:

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# 定義死信隊列
channel.queue_declare(queue='dead_letter_queue')
channel.exchange_declare(exchange='dead_letter_exchange', exchange_type='direct')
# 綁定 DLQ 到一個交換機
channel.queue_bind(exchange='dead_letter_exchange', queue='dead_letter_queue', routing_key='dead')

# 定義主隊列與 DLQ 設置
arguments = {
  'x-dead-letter-exchange': 'dead_letter_exchange',
  'x-dead-letter-routing-key': 'dead'
}
channel.queue_declare(queue='task_queue', arguments=arguments)
channel.exchange_declare(exchange='task_exchange', exchange_type='direct')
channel.queue_bind(exchange='task_exchange', queue='task_queue', routing_key='task')

或者可以將 Exchange、Queue、Routing Key 傳給 Celery,它會也會自動創建(默認行為,可以通過配置取消自動創建):

# 隊列列表
task_queues = [
    # 死信隊列在被創建後,需要從 task_queues 中移除或註釋,否則後續投遞到死信隊列中的消息又會被 celery 消費
    Queue(QUEUE_RETRY_TASK_DEADLETTER, Exchange('task_retry_deadletter_exchange', type='direct'), routing_key='task.retry.deadletter'),
    Queue(QUEUE_RETRY_TASK, Exchange('task_retry_exchange', type='direct'), routing_key='task.retry',
          queue_arguments={
              # 聲明當前隊列綁定的死信交換機
              'x-dead-letter-exchange': 'task_retry_deadletter_exchange',
              # 聲明當前隊列的死信路由key
              'x-dead-letter-routing-key': 'task.retry.deadletter'
          }
    )
]

celery_app = Celery(
    'demo',
    broker=current_config.RABBITMQ_URL,
    backend=current_config.REDIS_URL
)
celery_app.conf.update(
    task_queues=task_queues,
)

需要注意的是,在死信隊列創建完畢之後,需要從 task_queues 陣列中移除 Queue(QUEUE_RETRY_TASK_DEADLETTER..) 關於死信隊列的配置,否則在 Celery 會消費掉該死信隊列中的消息。

定義 Celery 任務#

@celery_app.task(bind=True, base=Task, name='tasks.retry', acks_on_failure_or_timeout=False)
def retry_task(self, **kwargs):
    """任務重試服務"""
    print('------retry_task-start')
    """
    retry的參數可以有: 
        exc:指定拋出的異常
        throw:重試時是否通知worker是重試任務
        eta:指定重試的時間/日期
        countdown:在多久之後重試(每多少秒重試一次,默認3分鐘) max_retries:最大重試次數(默認3次)
    """
    self.retry(exc=Exception('eeeeeee'), countdown=3, max_retries=1)

    print('------retry_task-end')

需要注意的是,必須設置 acks_on_failure_or_timeout=False,避免達到最大重試次數之後,Celery 自己報 MaxRetriesException 又自動確認消息,配置為 False 可避免自動確認,這樣消息最終會被路由到死信隊列中。

參考#

載入中......
此文章數據所有權由區塊鏈加密技術和智能合約保障僅歸創作者所有。