Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Remote State] fix lock release before deletion is completed #10611

Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1072,7 +1072,8 @@
* @param clusterUUID uuid of cluster state to refer to in remote
* @param manifestsToRetain no of latest manifest files to keep in remote
*/
private void deleteStaleClusterMetadata(String clusterName, String clusterUUID, int manifestsToRetain) {
// public for testing
public void deleteStaleClusterMetadata(String clusterName, String clusterUUID, int manifestsToRetain) {
linuxpi marked this conversation as resolved.
Show resolved Hide resolved
if (deleteStaleMetadataRunning.compareAndSet(false, true) == false) {
logger.info("Delete stale cluster metadata task is already in progress.");
return;
Expand Down Expand Up @@ -1109,8 +1110,9 @@
}
}
);
} finally {
} catch (Exception e) {

Check warning on line 1113 in server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java#L1113

Added line #L1113 was not covered by tests
deleteStaleMetadataRunning.set(false);
throw e;

Check warning on line 1115 in server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java#L1115

Added line #L1115 was not covered by tests
}
}

Expand Down Expand Up @@ -1190,7 +1192,7 @@
public void deleteStaleClusterUUIDs(ClusterState clusterState, ClusterMetadataManifest committedManifest) {
threadpool.executor(ThreadPool.Names.REMOTE_PURGE).execute(() -> {
String clusterName = clusterState.getClusterName().value();
logger.info("Deleting stale cluster UUIDs data from remote [{}]", clusterName);
logger.debug("Deleting stale cluster UUIDs data from remote [{}]", clusterName);
Set<String> allClustersUUIDsInRemote;
try {
allClustersUUIDsInRemote = new HashSet<>(getAllClusterUUIDs(clusterState.getClusterName().value()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

import org.mockito.ArgumentCaptor;
Expand All @@ -73,6 +76,7 @@
import static org.opensearch.gateway.remote.RemoteClusterStateService.MANIFEST_CURRENT_CODEC_VERSION;
import static org.opensearch.gateway.remote.RemoteClusterStateService.MANIFEST_FILE_PREFIX;
import static org.opensearch.gateway.remote.RemoteClusterStateService.METADATA_FILE_PREFIX;
import static org.opensearch.gateway.remote.RemoteClusterStateService.RETAINED_MANIFESTS;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT;
Expand Down Expand Up @@ -1004,6 +1008,36 @@ public void testFileNames() {
assertThat(splittedName[3], is("P"));
}

public void testSingleConcurrentExecutionOfStaleManifestCleanup() throws Exception {
BlobContainer blobContainer = mock(BlobContainer.class);
BlobPath blobPath = new BlobPath().add("random-path");
when((blobStoreRepository.basePath())).thenReturn(blobPath);
when(blobStore.blobContainer(any())).thenReturn(blobContainer);

CountDownLatch latch = new CountDownLatch(1);
AtomicInteger callCount = new AtomicInteger(0);
doAnswer(invocation -> {
callCount.incrementAndGet();
if (latch.await(5000, TimeUnit.SECONDS) == false) {
throw new Exception("Timed out waiting for delete task queuing to complete");
}
return null;
}).when(blobContainer)
.listBlobsByPrefixInSortedOrder(
any(String.class),
any(int.class),
any(BlobContainer.BlobNameSortOrder.class),
any(ActionListener.class)
);

remoteClusterStateService.start();
remoteClusterStateService.deleteStaleClusterMetadata("cluster-name", "cluster-uuid", RETAINED_MANIFESTS);
remoteClusterStateService.deleteStaleClusterMetadata("cluster-name", "cluster-uuid", RETAINED_MANIFESTS);

latch.countDown();
assertBusy(() -> assertEquals(1, callCount.get()));
}

private void mockObjectsForGettingPreviousClusterUUID(Map<String, String> clusterUUIDsPointers) throws IOException {
final BlobPath blobPath = mock(BlobPath.class);
when((blobStoreRepository.basePath())).thenReturn(blobPath);
Expand Down
Loading