From 3f2588bb1fd38daf6569fae2d3ae72a7b8328224 Mon Sep 17 00:00:00 2001 From: Radovan Zvoncek Date: Wed, 19 Jun 2024 16:26:29 +0300 Subject: [PATCH] Make gRPC service report backups as FAILED if lose their callback future --- medusa/backup_manager.py | 19 ++++++++++++++----- medusa/service/grpc/server.py | 19 +++++++++++++++++++ tests/backup_man_test.py | 7 ++++++- tests/backup_node_test.py | 1 + tests/service/grpc/server_test.py | 1 + 5 files changed, 41 insertions(+), 6 deletions(-) diff --git a/medusa/backup_manager.py b/medusa/backup_manager.py index 7abdcca4a..841e2ca8b 100644 --- a/medusa/backup_manager.py +++ b/medusa/backup_manager.py @@ -112,9 +112,9 @@ def register_backup(backup_name, is_async, overwrite_existing=True): if overwrite_existing: if not BackupMan.__clean(backup_name): logging.error(f"Registered backup name {backup_name} cleanup failed prior to re-register.") - - BackupMan.__instance.__backups[backup_name] = [None, BackupMan.STATUS_UNKNOWN, is_async] - logging.info("Registered backup id {}".format(backup_name)) + else: + BackupMan.__instance.__backups[backup_name] = [None, BackupMan.STATUS_UNKNOWN, is_async] + logging.info("Registered backup id {}".format(backup_name)) # Caller can decide how long to wait for a result using the registered backup future returned. # A future is returned (for async mode), otherwise None (for non-async mode). @@ -128,8 +128,10 @@ def get_backup_future(backup_name): with lock: backup_state = BackupMan.__instance.__backups[backup_name] if backup_state: - logging.debug("Returning backup future for id: {}".format(backup_name)) - return backup_state[BackupMan.__IDX_FUTURE] + future = backup_state[BackupMan.__IDX_FUTURE] + if future is not None and not future.done(): + return backup_state[BackupMan.__IDX_FUTURE] + raise RuntimeError(f'Backup future not found or already completed for id: {backup_name} {future}') raise RuntimeError('Backup not located for id: {}'.format(backup_name)) @@ -152,6 +154,13 @@ def remove_all_backups(): is_all_cleanup_successful = True else: for backup_name in list(BackupMan.__instance.__backups): + try: + future = BackupMan.get_backup_future(backup_name) + if future is not None and not future.done(): + future.cancel() + except RuntimeError: + # the future was not there, so there's nothing to cancel + pass if not BackupMan.__clean(backup_name): is_all_cleanup_successful = False BackupMan.__instance.__backups = None diff --git a/medusa/service/grpc/server.py b/medusa/service/grpc/server.py index c47d47e90..e33769d0d 100644 --- a/medusa/service/grpc/server.py +++ b/medusa/service/grpc/server.py @@ -190,11 +190,20 @@ def BackupStatus(self, request, context): else: response.finishTime = "" BackupMan.register_backup(request.backupName, is_async=False, overwrite_existing=False) + # determine backup state status = BackupMan.STATUS_UNKNOWN if backup.started: status = BackupMan.STATUS_IN_PROGRESS if backup.finished: status = BackupMan.STATUS_SUCCESS + if status == BackupMan.STATUS_IN_PROGRESS: + # if the backup is in progress, check if we have the future waiting for its completion + try: + if BackupMan.get_backup_future(request.backupName) is None: + status = BackupMan.STATUS_FAILED + except RuntimeError: + # if we don't, then something bad happened (eg we restarted), so it's a failure + status = BackupMan.STATUS_FAILED BackupMan.update_backup_status(request.backupName, status) # record the status record_status_in_response(response, request.backupName) @@ -340,6 +349,13 @@ def get_backup_summary(backup): summary.finishTime = backup.finished summary.status = medusa_pb2.StatusType.SUCCESS + if summary.status == medusa_pb2.StatusType.IN_PROGRESS: + try: + if BackupMan.get_backup_future(backup.name) is None: + summary.status = medusa_pb2.StatusType.FAILED + except RuntimeError: + summary.status = medusa_pb2.StatusType.FAILED + summary.totalNodes = len(backup.tokenmap) summary.finishedNodes = len(backup.complete_nodes()) @@ -356,6 +372,9 @@ def get_backup_summary(backup): # Callback function for recording unique backup results def record_backup_info(future): + if future.cancelled(): + return + try: logging.info("Recording async backup information.") if future.exception(): diff --git a/tests/backup_man_test.py b/tests/backup_man_test.py index 1601ab6fa..8859b1608 100644 --- a/tests/backup_man_test.py +++ b/tests/backup_man_test.py @@ -58,7 +58,8 @@ def test_set_backup_future_missing_name(self): def test_register_backup_sync_mode(self): BackupMan.register_backup("test_backup_id", is_async=False) self.assertEqual(BackupMan.STATUS_UNKNOWN, BackupMan.get_backup_status("test_backup_id")) - self.assertEqual(None, BackupMan.get_backup_future("test_backup_id")) + with self.assertRaises(RuntimeError): + BackupMan.get_backup_future("test_backup_id") BackupMan.update_backup_status("test_backup_id", BackupMan.STATUS_SUCCESS) self.assertEqual(BackupMan.STATUS_SUCCESS, BackupMan.get_backup_status("test_backup_id")) @@ -66,6 +67,7 @@ def test_register_backup_sync_mode(self): def test_register_backup_async_mode(self): backup_id = "test_backup_id" mock_future = Mock(concurrent.futures.Future) + mock_future.done = lambda: False BackupMan.register_backup(backup_id, is_async=True) BackupMan.set_backup_future(backup_id, mock_future) stored_future = BackupMan.get_backup_future(backup_id) @@ -76,6 +78,7 @@ def test_register_backup_async_mode(self): backup_id_2 = "test_backup_id_2" mock_future_2 = Mock(concurrent.futures.Future) + mock_future_2.done = lambda: False BackupMan.register_backup(backup_id_2, is_async=True) BackupMan.set_backup_future(backup_id_2, mock_future_2) @@ -91,7 +94,9 @@ def test_register_backup_duplicate(self): # Self-healing of detected duplicate, clean and reset w/ new expected backup_id_1 = "test_backup_id" mock_future_1 = Mock(concurrent.futures.Future) + mock_future_1.done = lambda: False mock_future_2 = Mock(concurrent.futures.Future) + mock_future_2.done = lambda: False BackupMan.register_backup(backup_id_1, is_async=True) BackupMan.set_backup_future(backup_id_1, mock_future_1) self.assertEqual(BackupMan.get_backup_future(backup_id_1), mock_future_1) diff --git a/tests/backup_node_test.py b/tests/backup_node_test.py index 02d76248d..324ba0739 100644 --- a/tests/backup_node_test.py +++ b/tests/backup_node_test.py @@ -72,6 +72,7 @@ def test_handle_backup_async(self, mock_start_backup, mock_storage, mock_cassand backup_name_arg=test_backup_name, stagger_time=None, enable_md5_checks_flag=False, mode="differential") mock_future_instance = MagicMock() + mock_future_instance.done = lambda: False mock_callback = MagicMock() mock_future_instance.result.return_value = {"foo": "bar"} backup_future.add_done_callback(mock_callback) diff --git a/tests/service/grpc/server_test.py b/tests/service/grpc/server_test.py index 133748f46..fc99b82ee 100644 --- a/tests/service/grpc/server_test.py +++ b/tests/service/grpc/server_test.py @@ -109,6 +109,7 @@ def test_get_known_incomplete_backup(self): "node1": {"tokens": [-1094266504216117253], "is_up": True, "rack": "r1", "dc": "dc1"}, "node2": {"tokens": [1094266504216117253], "is_up": True, "rack": "r1", "dc": "dc1"} } + BackupMan.remove_backup('backup1') BackupMan.register_backup('backup1', True) BackupMan.update_backup_status('backup1', BackupMan.STATUS_IN_PROGRESS)