説明#
Celery は分散タスクキューで、Celery タスクには自動再試行機能があります。本記事では、最大再試行回数に達した後も失敗したタスクメッセージを RabbitMQ の死信キューにルーティングする方法を紹介します。
死信キューの作成#
一般的に、プロジェクトには 1 つの死信キューのエクスチェンジを設定すればよく、その後、ビジネスに応じて複数の死信キューを死信エクスチェンジにバインドします。
以下は、通常のキューと死信キューを手動で作成するための Python コードです:
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# 死信キューを定義
channel.queue_declare(queue='dead_letter_queue')
channel.exchange_declare(exchange='dead_letter_exchange', exchange_type='direct')
# DLQをエクスチェンジにバインド
channel.queue_bind(exchange='dead_letter_exchange', queue='dead_letter_queue', routing_key='dead')
# DLQ設定を持つメインキューを定義
arguments = {
'x-dead-letter-exchange': 'dead_letter_exchange',
'x-dead-letter-routing-key': 'dead'
}
channel.queue_declare(queue='task_queue', arguments=arguments)
channel.exchange_declare(exchange='task_exchange', exchange_type='direct')
channel.queue_bind(exchange='task_exchange', queue='task_queue', routing_key='task')
または、エクスチェンジ、キュー、ルーティングキーを Celery に渡すこともでき、Celery が自動的に作成します(デフォルトの動作で、設定によって自動作成を無効にできます):
# キューリスト
task_queues = [
# 死信キューが作成された後は、task_queues から削除またはコメントアウトする必要があります。そうしないと、後続の死信キューへのメッセージが再び Celery に消費されます。
Queue(QUEUE_RETRY_TASK_DEADLETTER, Exchange('task_retry_deadletter_exchange', type='direct'), routing_key='task.retry.deadletter'),
Queue(QUEUE_RETRY_TASK, Exchange('task_retry_exchange', type='direct'), routing_key='task.retry',
queue_arguments={
# 現在のキューがバインドされている死信エクスチェンジを宣言
'x-dead-letter-exchange': 'task_retry_deadletter_exchange',
# 現在のキューの死信ルーティングキーを宣言
'x-dead-letter-routing-key': 'task.retry.deadletter'
}
)
]
celery_app = Celery(
'demo',
broker=current_config.RABBITMQ_URL,
backend=current_config.REDIS_URL
)
celery_app.conf.update(
task_queues=task_queues,
)
注意が必要なのは、死信キューが作成された後は、task_queues
配列から Queue(QUEUE_RETRY_TASK_DEADLETTER..)
の死信キューに関する設定を削除する必要があります。そうしないと、Celery がその死信キュー内のメッセージを消費してしまいます。
Celery タスクの定義#
@celery_app.task(bind=True, base=Task, name='tasks.retry', acks_on_failure_or_timeout=False)
def retry_task(self, **kwargs):
"""タスク再試行サービス"""
print('------retry_task-start')
"""
retryのパラメータには以下があります:
exc:指定された例外
throw:再試行時にワーカーに再試行タスクであることを通知するかどうか
eta:再試行の時間/日付を指定
countdown:どれくらい後に再試行するか(何秒ごとに再試行するか、デフォルトは3分) max_retries:最大再試行回数(デフォルトは3回)
"""
self.retry(exc=Exception('eeeeeee'), countdown=3, max_retries=1)
print('------retry_task-end')
注意が必要なのは、必ず acks_on_failure_or_timeout=False
を設定することです。最大再試行回数に達した後、Celery が自動的に MaxRetriesException を報告し、メッセージを自動確認するのを防ぐために、False
に設定することで自動確認を回避し、メッセージが最終的に死信キューにルーティングされるようにします。