diff --git a/dramatiq/brokers/rabbitmq.py b/dramatiq/brokers/rabbitmq.py index de6c0657..acc5f57e 100644 --- a/dramatiq/brokers/rabbitmq.py +++ b/dramatiq/brokers/rabbitmq.py @@ -246,8 +246,9 @@ def declare_queue(self, queue_name, *, ensure=False): self._ensure_queue(queue_name) def _ensure_queue(self, queue_name): - attempts = 1 + attempts = 0 while True: + attempts += 1 try: if queue_name in self.queues_pending: self._declare_queue(queue_name) @@ -262,7 +263,6 @@ def _ensure_queue(self, queue_name): # caller may initiate new ones of each. del self.connection - attempts += 1 if attempts > MAX_DECLARE_ATTEMPTS: raise ConnectionClosed(e) from None @@ -321,8 +321,9 @@ def enqueue(self, message, *, delay=None): }, ) - attempts = 1 + attempts = 0 while True: + attempts += 1 try: self.declare_queue(canonical_queue_name, ensure=True) self.logger.debug("Enqueueing message %r on queue %r.", message.message_id, queue_name) @@ -351,7 +352,6 @@ def enqueue(self, message, *, delay=None): if getattr(e, "reply_code", None) == 404: self.queues.remove(q_name(queue_name)) - attempts += 1 if attempts > MAX_ENQUEUE_ATTEMPTS: raise ConnectionClosed(e) from None