myesn

myEsn2E9

hi
github

Celery: 失敗したタスクをRabbitMQのデッドレターキューにルーティングする

説明#

Celery は分散タスクキューで、Celery タスクには自動再試行機能があります。本記事では、最大再試行回数に達した後も失敗したタスクメッセージを RabbitMQ の死信キューにルーティングする方法を紹介します。

死信キューの作成#

一般的に、プロジェクトには 1 つの死信キューのエクスチェンジを設定すればよく、その後、ビジネスに応じて複数の死信キューを死信エクスチェンジにバインドします。

以下は、通常のキューと死信キューを手動で作成するための Python コードです:

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# 死信キューを定義
channel.queue_declare(queue='dead_letter_queue')
channel.exchange_declare(exchange='dead_letter_exchange', exchange_type='direct')
# DLQをエクスチェンジにバインド
channel.queue_bind(exchange='dead_letter_exchange', queue='dead_letter_queue', routing_key='dead')

# DLQ設定を持つメインキューを定義
arguments = {
  'x-dead-letter-exchange': 'dead_letter_exchange',
  'x-dead-letter-routing-key': 'dead'
}
channel.queue_declare(queue='task_queue', arguments=arguments)
channel.exchange_declare(exchange='task_exchange', exchange_type='direct')
channel.queue_bind(exchange='task_exchange', queue='task_queue', routing_key='task')

または、エクスチェンジ、キュー、ルーティングキーを Celery に渡すこともでき、Celery が自動的に作成します(デフォルトの動作で、設定によって自動作成を無効にできます):

# キューリスト
task_queues = [
    # 死信キューが作成された後は、task_queues から削除またはコメントアウトする必要があります。そうしないと、後続の死信キューへのメッセージが再び Celery に消費されます。
    Queue(QUEUE_RETRY_TASK_DEADLETTER, Exchange('task_retry_deadletter_exchange', type='direct'), routing_key='task.retry.deadletter'),
    Queue(QUEUE_RETRY_TASK, Exchange('task_retry_exchange', type='direct'), routing_key='task.retry',
          queue_arguments={
              # 現在のキューがバインドされている死信エクスチェンジを宣言
              'x-dead-letter-exchange': 'task_retry_deadletter_exchange',
              # 現在のキューの死信ルーティングキーを宣言
              'x-dead-letter-routing-key': 'task.retry.deadletter'
          }
    )
]

celery_app = Celery(
    'demo',
    broker=current_config.RABBITMQ_URL,
    backend=current_config.REDIS_URL
)
celery_app.conf.update(
    task_queues=task_queues,
)

注意が必要なのは、死信キューが作成された後は、task_queues 配列から Queue(QUEUE_RETRY_TASK_DEADLETTER..) の死信キューに関する設定を削除する必要があります。そうしないと、Celery がその死信キュー内のメッセージを消費してしまいます。

Celery タスクの定義#

@celery_app.task(bind=True, base=Task, name='tasks.retry', acks_on_failure_or_timeout=False)
def retry_task(self, **kwargs):
    """タスク再試行サービス"""
    print('------retry_task-start')
    """
    retryのパラメータには以下があります: 
        exc:指定された例外
        throw:再試行時にワーカーに再試行タスクであることを通知するかどうか
        eta:再試行の時間/日付を指定
        countdown:どれくらい後に再試行するか(何秒ごとに再試行するか、デフォルトは3分) max_retries:最大再試行回数(デフォルトは3回)
    """
    self.retry(exc=Exception('eeeeeee'), countdown=3, max_retries=1)

    print('------retry_task-end')

注意が必要なのは、必ず acks_on_failure_or_timeout=False を設定することです。最大再試行回数に達した後、Celery が自動的に MaxRetriesException を報告し、メッセージを自動確認するのを防ぐために、False に設定することで自動確認を回避し、メッセージが最終的に死信キューにルーティングされるようにします。

参考#

読み込み中...
文章は、創作者によって署名され、ブロックチェーンに安全に保存されています。