Celery: Route failed tasks to RabbitMQ's dead letter queue

Description#

Celery is a distributed task queue, and Celery tasks have an automatic retry feature. This article describes how to route task messages that fail even after reaching the maximum retry count to RabbitMQ's dead-letter queue.

Creating a Dead-Letter Queue#

Generally, a project only needs to configure a single dead-letter exchange, and then bind as many dead-letter queues to it as the business requires (a sketch of this pattern follows the example below).

Here is the Python code to manually create a regular queue and a dead-letter queue:

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# Declare the dead-letter queue and the dead-letter exchange, then bind them
channel.queue_declare(queue='dead_letter_queue')
channel.exchange_declare(exchange='dead_letter_exchange', exchange_type='direct')
channel.queue_bind(exchange='dead_letter_exchange', queue='dead_letter_queue', routing_key='dead')

# Declare the main queue; the x-dead-letter-* arguments tell RabbitMQ where to
# route messages that are rejected (or expire) on this queue
arguments = {
    'x-dead-letter-exchange': 'dead_letter_exchange',
    'x-dead-letter-routing-key': 'dead'
}
channel.queue_declare(queue='task_queue', arguments=arguments)
channel.exchange_declare(exchange='task_exchange', exchange_type='direct')
channel.queue_bind(exchange='task_exchange', queue='task_queue', routing_key='task')

connection.close()
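
As a sketch of the "one dead-letter exchange, multiple dead-letter queues" pattern mentioned above, the snippet below binds several business-specific dead-letter queues to the same exchange. The email_*/report_* queue and routing-key names are made up for the example:

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# A single dead-letter exchange shared by the whole project
channel.exchange_declare(exchange='dead_letter_exchange', exchange_type='direct')

for name in ('email', 'report'):
    # One dead-letter queue per business domain, all bound to the same exchange
    channel.queue_declare(queue=f'{name}_dead_letter_queue')
    channel.queue_bind(exchange='dead_letter_exchange',
                       queue=f'{name}_dead_letter_queue',
                       routing_key=f'{name}.dead')

    # Each business queue points at the shared exchange but uses its own routing key,
    # so its dead letters end up in its own dead-letter queue
    channel.queue_declare(queue=f'{name}_queue', arguments={
        'x-dead-letter-exchange': 'dead_letter_exchange',
        'x-dead-letter-routing-key': f'{name}.dead',
    })

connection.close()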

Alternatively, you can pass the Exchange, Queue, and Routing Key declarations to Celery, which creates them automatically by default (this behavior can be disabled through configuration, as shown after the example below):

from celery import Celery
from kombu import Exchange, Queue

# QUEUE_RETRY_TASK and QUEUE_RETRY_TASK_DEADLETTER are queue-name constants defined elsewhere in the project (not shown)

# Queue list
task_queues = [
    # After the dead-letter queue has been created, remove or comment out this entry;
    # otherwise messages delivered to the dead-letter queue will be consumed by Celery again.
    Queue(QUEUE_RETRY_TASK_DEADLETTER, Exchange('task_retry_deadletter_exchange', type='direct'),
          routing_key='task.retry.deadletter'),
    Queue(QUEUE_RETRY_TASK, Exchange('task_retry_exchange', type='direct'), routing_key='task.retry',
          queue_arguments={
              # Dead-letter exchange bound to this queue
              'x-dead-letter-exchange': 'task_retry_deadletter_exchange',
              # Dead-letter routing key for this queue
              'x-dead-letter-routing-key': 'task.retry.deadletter'
          }),
]

celery_app = Celery(
    'demo',
    broker=current_config.RABBITMQ_URL,   # RabbitMQ broker URL from the project's config
    backend=current_config.REDIS_URL      # Redis result backend URL from the project's config
)
celery_app.conf.update(
    task_queues=task_queues,
)
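
The automatic creation mentioned above is related to Celery's task_create_missing_queues setting (enabled by default), which controls whether queues that are routed to but not listed in task_queues are created on the fly. Below is a minimal sketch that turns it off and, as a hypothetical illustration, adds a task_routes entry sending the task named 'tasks.retry' (defined in the next section) to the retry queue:

celery_app.conf.update(
    # Do not automatically create queues that aren't listed in task_queues
    task_create_missing_queues=False,
    # Hypothetical routing: deliver 'tasks.retry' messages to the retry queue declared above
    task_routes={
        'tasks.retry': {
            'queue': QUEUE_RETRY_TASK,
            'routing_key': 'task.retry',
        },
    },
)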

It is important to note that once the dead-letter queue has been created, the Queue(QUEUE_RETRY_TASK_DEADLETTER, ...) entry must be removed from (or commented out of) the task_queues list; otherwise Celery will consume messages from the dead-letter queue itself, defeating its purpose.
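
Concretely, after the first run has declared everything, the steady-state task_queues might look like this, with the dead-letter entry kept only as a comment:

task_queues = [
    # Declared once so RabbitMQ creates it, then commented out so Celery no longer consumes it:
    # Queue(QUEUE_RETRY_TASK_DEADLETTER, Exchange('task_retry_deadletter_exchange', type='direct'),
    #       routing_key='task.retry.deadletter'),
    Queue(QUEUE_RETRY_TASK, Exchange('task_retry_exchange', type='direct'), routing_key='task.retry',
          queue_arguments={
              'x-dead-letter-exchange': 'task_retry_deadletter_exchange',
              'x-dead-letter-routing-key': 'task.retry.deadletter',
          }),
]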

Defining Celery Tasks#

from celery import Task


@celery_app.task(bind=True, base=Task, name='tasks.retry', acks_on_failure_or_timeout=False)
def retry_task(self, **kwargs):
    """Task retry demo.

    self.retry() accepts, among others:
        exc: the exception to report once max_retries has been exceeded
        throw: whether to re-raise the Retry exception so the worker knows the task is being retried
        eta: an absolute date/time at which to retry
        countdown: seconds to wait before retrying (defaults to default_retry_delay, i.e. 3 minutes)
        max_retries: maximum number of retries (default is 3)
    """
    print('------retry_task-start')

    # self.retry() raises a Retry exception, so the line below is never reached.
    self.retry(exc=Exception('eeeeeee'), countdown=3, max_retries=1)

    print('------retry_task-end')

It is important to set acks_on_failure_or_timeout=False. Without it, Celery automatically acknowledges the message once the maximum retry count is reached and MaxRetriesExceededError is raised, so the message is simply removed from the queue. With acknowledgment on failure disabled, the failed message is rejected instead and RabbitMQ routes it to the dead-letter queue. (In practice this option only takes effect together with late acknowledgment, i.e. acks_late=True on the task or task_acks_late=True globally, because with early acknowledgment the message is acked before the task even runs.)
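
To exercise the whole setup, you can send the task and then peek at the dead-letter queue directly with pika once the retries are exhausted. This is only a sketch: send_task is given the queue explicitly here, and the queue-name constants are the ones from the configuration above:

import pika

# Enqueue the retry task, routing it explicitly to the retry queue declared earlier
celery_app.send_task('tasks.retry', kwargs={}, queue=QUEUE_RETRY_TASK, routing_key='task.retry')

# ... wait for the worker to exhaust the retries ...

# Peek at the dead-letter queue to confirm the message was dead-lettered
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
method, properties, body = channel.basic_get(queue=QUEUE_RETRY_TASK_DEADLETTER, auto_ack=False)
if method is not None:
    print('dead-lettered message:', body)
    channel.basic_nack(method.delivery_tag, requeue=True)  # leave the message in the queue
connection.close()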
