說明#
Celery 是一個分佈式任務隊列,Celery 任務有自動重試的功能,本文介紹了如何將達到最大重試次數後仍然失敗的任務消息路由到 RabbitMQ 的死信隊列。
創建死信隊列#
一般一個項目配置一個死信隊列的 Exchange 就可以了,然後再根據業務配置多個死信隊列綁定到死信 Exchange。
以下是手動創建普通隊列和死信隊列的 Python 代碼:
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# 定義死信隊列
channel.queue_declare(queue='dead_letter_queue')
channel.exchange_declare(exchange='dead_letter_exchange', exchange_type='direct')
# 綁定 DLQ 到一個交換機
channel.queue_bind(exchange='dead_letter_exchange', queue='dead_letter_queue', routing_key='dead')
# 定義主隊列與 DLQ 設置
arguments = {
'x-dead-letter-exchange': 'dead_letter_exchange',
'x-dead-letter-routing-key': 'dead'
}
channel.queue_declare(queue='task_queue', arguments=arguments)
channel.exchange_declare(exchange='task_exchange', exchange_type='direct')
channel.queue_bind(exchange='task_exchange', queue='task_queue', routing_key='task')
或者可以將 Exchange、Queue、Routing Key 傳給 Celery,它會也會自動創建(默認行為,可以通過配置取消自動創建):
# 隊列列表
task_queues = [
# 死信隊列在被創建後,需要從 task_queues 中移除或註釋,否則後續投遞到死信隊列中的消息又會被 celery 消費
Queue(QUEUE_RETRY_TASK_DEADLETTER, Exchange('task_retry_deadletter_exchange', type='direct'), routing_key='task.retry.deadletter'),
Queue(QUEUE_RETRY_TASK, Exchange('task_retry_exchange', type='direct'), routing_key='task.retry',
queue_arguments={
# 聲明當前隊列綁定的死信交換機
'x-dead-letter-exchange': 'task_retry_deadletter_exchange',
# 聲明當前隊列的死信路由key
'x-dead-letter-routing-key': 'task.retry.deadletter'
}
)
]
celery_app = Celery(
'demo',
broker=current_config.RABBITMQ_URL,
backend=current_config.REDIS_URL
)
celery_app.conf.update(
task_queues=task_queues,
)
需要注意的是,在死信隊列創建完畢之後,需要從 task_queues
陣列中移除 Queue(QUEUE_RETRY_TASK_DEADLETTER..)
關於死信隊列的配置,否則在 Celery 會消費掉該死信隊列中的消息。
定義 Celery 任務#
@celery_app.task(bind=True, base=Task, name='tasks.retry', acks_on_failure_or_timeout=False)
def retry_task(self, **kwargs):
"""任務重試服務"""
print('------retry_task-start')
"""
retry的參數可以有:
exc:指定拋出的異常
throw:重試時是否通知worker是重試任務
eta:指定重試的時間/日期
countdown:在多久之後重試(每多少秒重試一次,默認3分鐘) max_retries:最大重試次數(默認3次)
"""
self.retry(exc=Exception('eeeeeee'), countdown=3, max_retries=1)
print('------retry_task-end')
需要注意的是,必須設置 acks_on_failure_or_timeout=False
,避免達到最大重試次數之後,Celery 自己報 MaxRetriesException 又自動確認消息,配置為 False
可避免自動確認,這樣消息最終會被路由到死信隊列中。