From 32a9c71fea28f5b31631dce055a540fe4dbf87a1 Mon Sep 17 00:00:00 2001 From: Jens Troeger Date: Fri, 20 Dec 2024 17:51:36 +1000 Subject: [PATCH] =?UTF-8?q?fix:=20make=20sure=20RabbitMQ=20broker=E2=80=99?= =?UTF-8?q?s=20enqueue()=20attempts=20the=20correct=20number=20of=20preset?= =?UTF-8?q?=20retries?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- dramatiq/brokers/rabbitmq.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/dramatiq/brokers/rabbitmq.py b/dramatiq/brokers/rabbitmq.py index de6c0657..acc5f57e 100644 --- a/dramatiq/brokers/rabbitmq.py +++ b/dramatiq/brokers/rabbitmq.py @@ -246,8 +246,9 @@ def declare_queue(self, queue_name, *, ensure=False): self._ensure_queue(queue_name) def _ensure_queue(self, queue_name): - attempts = 1 + attempts = 0 while True: + attempts += 1 try: if queue_name in self.queues_pending: self._declare_queue(queue_name) @@ -262,7 +263,6 @@ def _ensure_queue(self, queue_name): # caller may initiate new ones of each. del self.connection - attempts += 1 if attempts > MAX_DECLARE_ATTEMPTS: raise ConnectionClosed(e) from None @@ -321,8 +321,9 @@ def enqueue(self, message, *, delay=None): }, ) - attempts = 1 + attempts = 0 while True: + attempts += 1 try: self.declare_queue(canonical_queue_name, ensure=True) self.logger.debug("Enqueueing message %r on queue %r.", message.message_id, queue_name) @@ -351,7 +352,6 @@ def enqueue(self, message, *, delay=None): if getattr(e, "reply_code", None) == 404: self.queues.remove(q_name(queue_name)) - attempts += 1 if attempts > MAX_ENQUEUE_ATTEMPTS: raise ConnectionClosed(e) from None