Skip to content

Commit

Permalink
Explicitly pass in the lock id
Browse files Browse the repository at this point in the history
This patch get rid of `reset` in favor of `release` for locks.
  • Loading branch information
rhyw committed May 29, 2023
1 parent d3526f3 commit b3d5cc5
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 8 deletions.
5 changes: 3 additions & 2 deletions openlcs/libs/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,11 @@ class UnsupportedPriority(OpenLCSException):


class TaskResubmissionException(OpenLCSException):
"""Raised when tasks with identical name/args are submited simultaneously.
"""Raised when tasks with identical name/args are submitted
simultaneously.
This is enforced by acquiring a dedicated lock for each submitted task,
lock won't be released unless tasks are finished, meaning no more tasks
with identical name/args are allowed to be submited again.
with identical name/args are allowed to be submitted again.
"""
pass # pylint: disable=unnecessary-pass
18 changes: 14 additions & 4 deletions openlcs/libs/redis.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
import hashlib
import json
import logging
from redis import Redis
from redis_lock import Lock
from redis_lock import NotAcquired

from openlcsd.celeryconfig import broker_url
from openlcsd.celeryconfig import task_time_limit
from .constants import TASK_IDENTITY_PREFIX # noqa: E402

logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)


# Based upon singleton's lock generation mechanism with some simplification.
def generate_lock_key(
Expand Down Expand Up @@ -73,7 +78,12 @@ def release_lock_for_key(self, lock_key: str, lock_id=None) -> None:
when you want to release the lock in a different place other
than where the lock is acquired. Defaults to None.
"""
lock = self.get_lock(lock_key)
# FIXME: Forcibly deletes the lock. Need to figure out why
# lock.release() won't work.
lock.reset()
lock = self.get_lock(lock_key, lock_id)
lock_repr = f"{lock._name}(id: {lock.id})"
logger.info("Start to release %s...", lock_repr)
try:
lock.release()
except NotAcquired:
logger.info("Lock %s not acquired.", lock_repr)
else:
logger.info("Lock %s released.", lock_repr)
4 changes: 2 additions & 2 deletions openlcs/tasks/management/commands/flush_task_locks.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ def add_arguments(self, parser):
action='store_true',
help='Show actions to perform without changing anything.')


def handle(self, *args, **options):
prefix = options['prefix']
dry_run = options['dry_run']
Expand All @@ -31,7 +30,8 @@ def handle(self, *args, **options):
if keys:
if dry_run:
self.stdout.write('Dry run mode, will delete following keys:')
self.stdout.write('\n'.join(keys))
self.stdout.write(
'\n'.join(map(lambda k: k.decode('utf-8'), keys)))
else:
redis_client.delete(*keys)
self.stdout.write(
Expand Down

0 comments on commit b3d5cc5

Please sign in to comment.