From b3d5cc567662dd1a58fee79436dcfde146a78ec6 Mon Sep 17 00:00:00 2001 From: Yuguang Wang Date: Wed, 24 May 2023 13:46:09 +0800 Subject: [PATCH] Explicitly pass in the lock id This patch get rid of `reset` in favor of `release` for locks. --- openlcs/libs/exceptions.py | 5 +++-- openlcs/libs/redis.py | 18 ++++++++++++++---- .../management/commands/flush_task_locks.py | 4 ++-- 3 files changed, 19 insertions(+), 8 deletions(-) diff --git a/openlcs/libs/exceptions.py b/openlcs/libs/exceptions.py index 2e48dc4a..bdbe2488 100644 --- a/openlcs/libs/exceptions.py +++ b/openlcs/libs/exceptions.py @@ -17,10 +17,11 @@ class UnsupportedPriority(OpenLCSException): class TaskResubmissionException(OpenLCSException): - """Raised when tasks with identical name/args are submited simultaneously. + """Raised when tasks with identical name/args are submitted + simultaneously. This is enforced by acquiring a dedicated lock for each submitted task, lock won't be released unless tasks are finished, meaning no more tasks - with identical name/args are allowed to be submited again. + with identical name/args are allowed to be submitted again. """ pass # pylint: disable=unnecessary-pass diff --git a/openlcs/libs/redis.py b/openlcs/libs/redis.py index c2e125f8..3d958df7 100644 --- a/openlcs/libs/redis.py +++ b/openlcs/libs/redis.py @@ -1,12 +1,17 @@ import hashlib import json +import logging from redis import Redis from redis_lock import Lock +from redis_lock import NotAcquired from openlcsd.celeryconfig import broker_url from openlcsd.celeryconfig import task_time_limit from .constants import TASK_IDENTITY_PREFIX # noqa: E402 +logger = logging.getLogger(__name__) +logging.basicConfig(level=logging.INFO) + # Based upon singleton's lock generation mechanism with some simplification. def generate_lock_key( @@ -73,7 +78,12 @@ def release_lock_for_key(self, lock_key: str, lock_id=None) -> None: when you want to release the lock in a different place other than where the lock is acquired. Defaults to None. """ - lock = self.get_lock(lock_key) - # FIXME: Forcibly deletes the lock. Need to figure out why - # lock.release() won't work. - lock.reset() + lock = self.get_lock(lock_key, lock_id) + lock_repr = f"{lock._name}(id: {lock.id})" + logger.info("Start to release %s...", lock_repr) + try: + lock.release() + except NotAcquired: + logger.info("Lock %s not acquired.", lock_repr) + else: + logger.info("Lock %s released.", lock_repr) diff --git a/openlcs/tasks/management/commands/flush_task_locks.py b/openlcs/tasks/management/commands/flush_task_locks.py index a382bf5c..707556de 100644 --- a/openlcs/tasks/management/commands/flush_task_locks.py +++ b/openlcs/tasks/management/commands/flush_task_locks.py @@ -18,7 +18,6 @@ def add_arguments(self, parser): action='store_true', help='Show actions to perform without changing anything.') - def handle(self, *args, **options): prefix = options['prefix'] dry_run = options['dry_run'] @@ -31,7 +30,8 @@ def handle(self, *args, **options): if keys: if dry_run: self.stdout.write('Dry run mode, will delete following keys:') - self.stdout.write('\n'.join(keys)) + self.stdout.write( + '\n'.join(map(lambda k: k.decode('utf-8'), keys))) else: redis_client.delete(*keys) self.stdout.write(