Description#
Celery is a distributed task queue, and Celery tasks have an automatic retry feature. This article describes how to route task messages that fail even after reaching the maximum retry count to RabbitMQ's dead-letter queue.
Creating a Dead-Letter Queue#
Generally, a project only needs to configure one dead-letter queue Exchange, and then bind multiple dead-letter queues to the dead-letter Exchange based on business needs.
Here is the Python code to manually create a regular queue and a dead-letter queue:
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# Define the dead-letter queue
channel.queue_declare(queue='dead_letter_queue')
channel.exchange_declare(exchange='dead_letter_exchange', exchange_type='direct')
# Bind DLQ to an exchange
channel.queue_bind(exchange='dead_letter_exchange', queue='dead_letter_queue', routing_key='dead')
# Define the main queue with DLQ settings
arguments = {
'x-dead-letter-exchange': 'dead_letter_exchange',
'x-dead-letter-routing-key': 'dead'
}
channel.queue_declare(queue='task_queue', arguments=arguments)
channel.exchange_declare(exchange='task_exchange', exchange_type='direct')
channel.queue_bind(exchange='task_exchange', queue='task_queue', routing_key='task')
Alternatively, you can pass the Exchange, Queue, and Routing Key to Celery, which will also create them automatically (default behavior, can be disabled through configuration):
# Queue list
task_queues = [
# After the dead-letter queue is created, it needs to be removed or commented out from task_queues; otherwise, messages delivered to the dead-letter queue will be consumed by Celery again.
Queue(QUEUE_RETRY_TASK_DEADLETTER, Exchange('task_retry_deadletter_exchange', type='direct'), routing_key='task.retry.deadletter'),
Queue(QUEUE_RETRY_TASK, Exchange('task_retry_exchange', type='direct'), routing_key='task.retry',
queue_arguments={
# Declare the dead-letter exchange bound to the current queue
'x-dead-letter-exchange': 'task_retry_deadletter_exchange',
# Declare the dead-letter routing key for the current queue
'x-dead-letter-routing-key': 'task.retry.deadletter'
}
)
]
celery_app = Celery(
'demo',
broker=current_config.RABBITMQ_URL,
backend=current_config.REDIS_URL
)
celery_app.conf.update(
task_queues=task_queues,
)
It is important to note that after the dead-letter queue is created, the configuration for Queue(QUEUE_RETRY_TASK_DEADLETTER..)
regarding the dead-letter queue needs to be removed from the task_queues
array; otherwise, Celery will consume messages from that dead-letter queue.
Defining Celery Tasks#
@celery_app.task(bind=True, base=Task, name='tasks.retry', acks_on_failure_or_timeout=False)
def retry_task(self, **kwargs):
"""Task retry service"""
print('------retry_task-start')
"""
The parameters for retry can include:
exc: specify the exception to be raised
throw: whether to notify the worker that this is a retry task
eta: specify the time/date for retry
countdown: how long to wait before retrying (retry every few seconds, default is 3 minutes) max_retries: maximum number of retries (default is 3)
"""
self.retry(exc=Exception('eeeeeee'), countdown=3, max_retries=1)
print('------retry_task-end')
It is important to note that acks_on_failure_or_timeout=False
must be set to prevent Celery from automatically acknowledging the message after reaching the maximum retry count and reporting a MaxRetriesException. Setting it to False
can prevent automatic acknowledgment, allowing the message to be routed to the dead-letter queue.