Skip to content

Commit

Permalink
Incoporate PR review feedback
Browse files Browse the repository at this point in the history
Signed-off-by: Ashish Singh <[email protected]>
  • Loading branch information
ashking94 committed Apr 22, 2024
1 parent a57054b commit a58d712
Show file tree
Hide file tree
Showing 10 changed files with 278 additions and 171 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,11 @@

import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.core.action.ActionListener;
import org.opensearch.threadpool.ThreadPool;

import java.util.List;
import java.util.Objects;
import java.util.concurrent.ExecutorService;

/**
* Hook for running code that needs to be executed before the upload of index metadata. Here we have introduced a hook
Expand All @@ -20,7 +23,16 @@
*
* @opensearch.internal
*/
public interface IndexMetadataUploadListener {
public abstract class IndexMetadataUploadListener {

private final ExecutorService executorService;

public IndexMetadataUploadListener(ThreadPool threadPool, String threadPoolName) {
Objects.requireNonNull(threadPool);
Objects.requireNonNull(threadPoolName);
assert ThreadPool.THREAD_POOL_TYPES.containsKey(threadPoolName) && ThreadPool.Names.SAME.equals(threadPoolName) == false;
this.executorService = threadPool.executor(threadPoolName);
}

/**
* Runs before the new index upload of index metadata (or first time upload). The caller is expected to trigger
Expand All @@ -29,7 +41,9 @@ public interface IndexMetadataUploadListener {
* @param indexMetadataList list of index metadata of new indexes (or first time index metadata upload).
* @param actionListener listener to be invoked on success or failure.
*/
void beforeNewIndexUpload(List<IndexMetadata> indexMetadataList, ActionListener<Void> actionListener);
public final void onNewIndexUpload(List<IndexMetadata> indexMetadataList, ActionListener<Void> actionListener) {
executorService.execute(() -> doOnNewIndexUpload(indexMetadataList, actionListener));
}

String getThreadpoolName();
protected abstract void doOnNewIndexUpload(List<IndexMetadata> indexMetadataList, ActionListener<Void> actionListener);
}
Original file line number Diff line number Diff line change
Expand Up @@ -532,17 +532,13 @@ private void invokeIndexMetadataUploadListeners(
List<Exception> exceptionList
) {
for (IndexMetadataUploadListener listener : indexMetadataUploadListeners) {
// We are submitting the task for async execution to ensure that we are not blocking the cluster state upload
String listenerName = listener.getClass().getSimpleName();
String threadPoolName = listener.getThreadpoolName();
assert ThreadPool.THREAD_POOL_TYPES.containsKey(threadPoolName) && ThreadPool.Names.SAME.equals(threadPoolName) == false;
threadpool.executor(threadPoolName).execute(() -> {
listener.beforeNewIndexUpload(
newIndexMetadataList,
getIndexMetadataUploadActionListener(newIndexMetadataList, latch, exceptionList, listenerName)
);
});
listener.onNewIndexUpload(
newIndexMetadataList,
getIndexMetadataUploadActionListener(newIndexMetadataList, latch, exceptionList, listenerName)
);
}

}

private ActionListener<Void> getIndexMetadataUploadActionListener(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@
import org.opensearch.node.remotestore.RemoteStoreNodeAttribute;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.Repository;
import org.opensearch.repositories.blobstore.BlobStoreFormat;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.repositories.blobstore.ConfigBlobStoreFormat;
import org.opensearch.threadpool.ThreadPool;

import java.io.IOException;
Expand Down Expand Up @@ -57,9 +57,11 @@
* @opensearch.internal
*/
@ExperimentalApi
public class RemoteIndexPathUploader implements IndexMetadataUploadListener {
public class RemoteIndexPathUploader extends IndexMetadataUploadListener {

public static final BlobStoreFormat<RemoteIndexPath> REMOTE_INDEX_PATH_FORMAT = new BlobStoreFormat<>(RemoteIndexPath.FILE_NAME_FORMAT);
public static final ConfigBlobStoreFormat<RemoteIndexPath> REMOTE_INDEX_PATH_FORMAT = new ConfigBlobStoreFormat<>(
RemoteIndexPath.FILE_NAME_FORMAT
);

private static final String TIMEOUT_EXCEPTION_MSG = "Timed out waiting while uploading remote index path file for indexes=%s";
private static final String UPLOAD_EXCEPTION_MSG = "Exception occurred while uploading remote index paths for indexes=%s";
Expand All @@ -79,7 +81,13 @@ public class RemoteIndexPathUploader implements IndexMetadataUploadListener {
private BlobStoreRepository translogRepository;
private BlobStoreRepository segmentRepository;

public RemoteIndexPathUploader(Settings settings, Supplier<RepositoriesService> repositoriesService, ClusterSettings clusterSettings) {
public RemoteIndexPathUploader(
ThreadPool threadPool,
Settings settings,
Supplier<RepositoriesService> repositoriesService,
ClusterSettings clusterSettings
) {
super(threadPool, ThreadPool.Names.GENERIC);
this.settings = Objects.requireNonNull(settings);
this.repositoriesService = Objects.requireNonNull(repositoriesService);
isRemoteDataAttributePresent = isRemoteDataAttributePresent(settings);
Expand All @@ -91,12 +99,7 @@ public RemoteIndexPathUploader(Settings settings, Supplier<RepositoriesService>
}

@Override
public String getThreadpoolName() {
return ThreadPool.Names.GENERIC;
}

@Override
public void beforeNewIndexUpload(List<IndexMetadata> indexMetadataList, ActionListener<Void> actionListener) {
protected void doOnNewIndexUpload(List<IndexMetadata> indexMetadataList, ActionListener<Void> actionListener) {
if (isRemoteDataAttributePresent == false) {
logger.trace("Skipping beforeNewIndexUpload as there are no remote indexes");
actionListener.onResponse(null);
Expand All @@ -105,24 +108,16 @@ public void beforeNewIndexUpload(List<IndexMetadata> indexMetadataList, ActionLi

long startTime = System.nanoTime();
boolean success = false;
List<IndexMetadata> eligibleList = indexMetadataList.stream().filter(this::requiresPathUpload).collect(Collectors.toList());
String indexNames = eligibleList.stream().map(IndexMetadata::getIndex).map(Index::toString).collect(Collectors.joining(","));
int latchCount = eligibleList.size() * (isTranslogSegmentRepoSame ? 1 : 2);
CountDownLatch latch = new CountDownLatch(latchCount);
List<Exception> exceptionList = Collections.synchronizedList(new ArrayList<>(latchCount));
try {
List<IndexMetadata> eligibleList = indexMetadataList.stream().filter(this::requiresPathUpload).collect(Collectors.toList());
int latchCount = eligibleList.size() * (isTranslogSegmentRepoSame ? 1 : 2);
CountDownLatch latch = new CountDownLatch(latchCount);
List<Exception> exceptionList = Collections.synchronizedList(new ArrayList<>(latchCount));
for (IndexMetadata indexMetadata : eligibleList) {
try {
writeIndexPathAsync(indexMetadata, latch, exceptionList);
} catch (IOException exception) {
RemoteStateTransferException ex = new RemoteStateTransferException(
String.format(Locale.ROOT, UPLOAD_EXCEPTION_MSG, List.of(indexMetadata.getIndex().getName()))
);
exceptionList.forEach(ex::addSuppressed);
actionListener.onFailure(ex);
return;
}
writeIndexPathAsync(indexMetadata, latch, exceptionList);
}
String indexNames = eligibleList.stream().map(IndexMetadata::getIndex).map(Index::toString).collect(Collectors.joining(","));

logger.trace(new ParameterizedMessage("Remote index path upload started for {}", indexNames));

try {
Expand Down Expand Up @@ -153,17 +148,21 @@ public void beforeNewIndexUpload(List<IndexMetadata> indexMetadataList, ActionLi
}
success = true;
actionListener.onResponse(null);
} catch (Exception ex) {
} catch (Exception exception) {
RemoteStateTransferException ex = new RemoteStateTransferException(
String.format(Locale.ROOT, UPLOAD_EXCEPTION_MSG, indexNames),

Check warning on line 153 in server/src/main/java/org/opensearch/index/remote/RemoteIndexPathUploader.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/remote/RemoteIndexPathUploader.java#L151-L153

Added lines #L151 - L153 were not covered by tests
exception
);
exceptionList.forEach(ex::addSuppressed);
actionListener.onFailure(ex);

Check warning on line 157 in server/src/main/java/org/opensearch/index/remote/RemoteIndexPathUploader.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/remote/RemoteIndexPathUploader.java#L156-L157

Added lines #L156 - L157 were not covered by tests
throw ex;
} finally {
long tookTimeNs = System.nanoTime() - startTime;
logger.trace(new ParameterizedMessage("executed beforeNewIndexUpload status={} tookTimeNs={}", success, tookTimeNs));
}

}

private void writeIndexPathAsync(IndexMetadata idxMD, CountDownLatch latch, List<Exception> exceptionList) throws IOException {
private void writeIndexPathAsync(IndexMetadata idxMD, CountDownLatch latch, List<Exception> exceptionList) {
if (isTranslogSegmentRepoSame) {
// If the repositories are same, then we need to upload a single file containing paths for both translog and segments.
writePathToRemoteStore(idxMD, translogRepository, latch, exceptionList, COMBINED_PATH);
Expand All @@ -181,7 +180,7 @@ private void writePathToRemoteStore(
CountDownLatch latch,
List<Exception> exceptionList,
Map<RemoteStoreEnums.DataCategory, List<RemoteStoreEnums.DataType>> pathCreationMap
) throws IOException {
) {
Map<String, String> remoteCustomData = idxMD.getCustomData(IndexMetadata.REMOTE_STORE_CUSTOM_KEY);
RemoteStoreEnums.PathType pathType = RemoteStoreEnums.PathType.valueOf(remoteCustomData.get(RemoteStoreEnums.PathType.NAME));
RemoteStoreEnums.PathHashAlgorithm hashAlgorithm = RemoteStoreEnums.PathHashAlgorithm.valueOf(
Expand All @@ -191,13 +190,20 @@ private void writePathToRemoteStore(
int shardCount = idxMD.getNumberOfShards();
BlobPath basePath = repository.basePath();
BlobContainer blobContainer = repository.blobStore().blobContainer(basePath.add(RemoteIndexPath.DIR));
REMOTE_INDEX_PATH_FORMAT.writeAsyncWithUrgentPriority(
new RemoteIndexPath(indexUUID, shardCount, basePath, pathType, hashAlgorithm, pathCreationMap),
blobContainer,
indexUUID,
getUploadPathLatchedActionListener(idxMD, latch, exceptionList, pathCreationMap)
);

ActionListener<Void> actionListener = getUploadPathLatchedActionListener(idxMD, latch, exceptionList, pathCreationMap);
try {
REMOTE_INDEX_PATH_FORMAT.writeAsyncWithUrgentPriority(
new RemoteIndexPath(indexUUID, shardCount, basePath, pathType, hashAlgorithm, pathCreationMap),
blobContainer,
indexUUID,
actionListener
);
} catch (IOException ioException) {
RemoteStateTransferException ex = new RemoteStateTransferException(
String.format(Locale.ROOT, UPLOAD_EXCEPTION_MSG, List.of(idxMD.getIndex().getName()))

Check warning on line 203 in server/src/main/java/org/opensearch/index/remote/RemoteIndexPathUploader.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/remote/RemoteIndexPathUploader.java#L201-L203

Added lines #L201 - L203 were not covered by tests
);
actionListener.onFailure(ioException);

Check warning on line 205 in server/src/main/java/org/opensearch/index/remote/RemoteIndexPathUploader.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/remote/RemoteIndexPathUploader.java#L205

Added line #L205 was not covered by tests
}
}

private Repository validateAndGetRepository(String repoSetting) {
Expand Down
1 change: 1 addition & 0 deletions server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -730,6 +730,7 @@ protected Node(
final RemoteIndexPathUploader remoteIndexPathUploader;
if (isRemoteStoreClusterStateEnabled(settings)) {
remoteIndexPathUploader = new RemoteIndexPathUploader(

Check warning on line 732 in server/src/main/java/org/opensearch/node/Node.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/node/Node.java#L732

Added line #L732 was not covered by tests
threadPool,
settings,
repositoriesServiceReference::get,
clusterService.getClusterSettings()

Check warning on line 736 in server/src/main/java/org/opensearch/node/Node.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/node/Node.java#L735-L736

Added lines #L735 - L736 were not covered by tests
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.repositories.blobstore;

import org.apache.lucene.codecs.CodecUtil;
import org.apache.lucene.store.OutputStreamIndexOutput;
import org.opensearch.common.blobstore.BlobContainer;
import org.opensearch.common.io.stream.BytesStreamOutput;
import org.opensearch.common.lucene.store.IndexOutputOutputStream;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.core.compress.Compressor;
import org.opensearch.core.xcontent.MediaTypeRegistry;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.core.xcontent.XContentBuilder;

import java.io.IOException;
import java.io.OutputStream;
import java.util.Locale;
import java.util.Objects;

/**
* Provides common methods, variables that can be used by the implementors.
*
* @opensearch.internal
*/
public class AbstractBlobStoreFormat<T extends ToXContent> {

private static final int BUFFER_SIZE = 4096;

private final String blobNameFormat;

private final boolean skipHeaderFooter;

/**
* @param blobNameFormat format of the blobname in {@link String#format} format
*/
public AbstractBlobStoreFormat(String blobNameFormat, boolean skipHeaderFooter) {
this.blobNameFormat = blobNameFormat;
this.skipHeaderFooter = skipHeaderFooter;
}

protected String blobName(String name) {
return String.format(Locale.ROOT, blobNameFormat, name);
}

/**
* Writes blob with resolving the blob name using {@link #blobName} method.
* <p>
* The blob will optionally by compressed.
*
* @param obj object to be serialized
* @param blobContainer blob container
* @param name blob name
* @param compressor whether to use compression
* @param params ToXContent params
* @param codec codec used
* @param version version used
*/
protected void write(
final T obj,
final BlobContainer blobContainer,
final String name,
final Compressor compressor,
final ToXContent.Params params,
XContentType xContentType,
String codec,
Integer version
) throws IOException {
final String blobName = blobName(name);
final BytesReference bytes = serialize(obj, blobName, compressor, params, xContentType, codec, version);
blobContainer.writeBlob(blobName, bytes.streamInput(), bytes.length(), false);
}

public BytesReference serialize(
final T obj,
final String blobName,
final Compressor compressor,
final ToXContent.Params params,
XContentType xContentType,
String codec,
Integer version
) throws IOException {
assert skipHeaderFooter || (Objects.nonNull(codec) && Objects.nonNull(version));
try (BytesStreamOutput outputStream = new BytesStreamOutput()) {
try (
OutputStreamIndexOutput indexOutput = new OutputStreamIndexOutput(
"ChecksumBlobStoreFormat.writeBlob(blob=\"" + blobName + "\")",
blobName,
outputStream,
BUFFER_SIZE
)
) {
if (skipHeaderFooter == false) {
CodecUtil.writeHeader(indexOutput, codec, version);
}
try (OutputStream indexOutputOutputStream = new IndexOutputOutputStream(indexOutput) {
@Override
public void close() {
// this is important since some of the XContentBuilders write bytes on close.
// in order to write the footer we need to prevent closing the actual index input.
}
};
XContentBuilder builder = MediaTypeRegistry.contentBuilder(
xContentType,
compressor.threadLocalOutputStream(indexOutputOutputStream)
)
) {
builder.startObject();
obj.toXContent(builder, params);
builder.endObject();
}
if (skipHeaderFooter == false) {
CodecUtil.writeFooter(indexOutput);
}
}
return outputStream.bytes();
}
}

protected String getBlobNameFormat() {
return blobNameFormat;
}
}
Loading

0 comments on commit a58d712

Please sign in to comment.