diff --git a/server/src/main/java/org/opensearch/index/store/CompositeDirectory.java b/server/src/main/java/org/opensearch/index/store/CompositeDirectory.java index ba134ad9344f3..66128299a186b 100644 --- a/server/src/main/java/org/opensearch/index/store/CompositeDirectory.java +++ b/server/src/main/java/org/opensearch/index/store/CompositeDirectory.java @@ -15,6 +15,7 @@ import org.apache.lucene.store.IndexInput; import org.apache.lucene.store.IndexOutput; import org.apache.lucene.store.Lock; +import org.opensearch.index.store.remote.file.OnDemandCompositeBlockIndexInput; import org.opensearch.index.store.remote.filecache.CachedIndexInput; import org.opensearch.index.store.remote.filecache.FileCache; import org.opensearch.index.store.remote.utils.filetracker.FileState; @@ -29,12 +30,14 @@ public class CompositeDirectory extends FilterDirectory { private final FSDirectory localDirectory; private final RemoteStoreFileTrackerAdapter remoteStoreFileTrackerAdapter; private final FileCache fileCache; + private final FSDirectory localCacheDirectory; - public CompositeDirectory(FSDirectory localDirectory, FileCache fileCache) { + public CompositeDirectory(FSDirectory localDirectory, FSDirectory localCacheDirectory, FileCache fileCache) { super(localDirectory); this.localDirectory = localDirectory; this.fileCache = fileCache; this.remoteStoreFileTrackerAdapter = new CompositeDirectoryRemoteStoreFileTrackerAdapter(fileCache); + this.localCacheDirectory = localCacheDirectory; } public void setRemoteDirectory(Directory remoteDirectory) { @@ -102,7 +105,7 @@ public IndexInput openInput(String name, IOContext context) throws IOException { break; case REMOTE_ONLY: - // TODO - return an implementation of OnDemandBlockIndexInput where the fetchBlock method is implemented + indexInput = new OnDemandCompositeBlockIndexInput(remoteStoreFileTrackerAdapter, name, localCacheDirectory); break; } return indexInput; diff --git a/server/src/main/java/org/opensearch/index/store/CompositeDirectoryFactory.java b/server/src/main/java/org/opensearch/index/store/CompositeDirectoryFactory.java index bed6e2454574a..9f4fa5b959ed8 100644 --- a/server/src/main/java/org/opensearch/index/store/CompositeDirectoryFactory.java +++ b/server/src/main/java/org/opensearch/index/store/CompositeDirectoryFactory.java @@ -21,10 +21,12 @@ import org.opensearch.repositories.blobstore.BlobStoreRepository; import java.io.IOException; +import java.nio.file.Files; import java.nio.file.Path; import java.util.function.Supplier; public class CompositeDirectoryFactory implements IndexStorePlugin.DirectoryFactory { + private static String CACHE_LOCATION = "remote_cache"; private final Supplier repositoriesService; private final FileCache remoteStoreFileCache; @@ -36,8 +38,9 @@ public CompositeDirectoryFactory(Supplier repositoriesServi @Override public Directory newDirectory(IndexSettings indexSettings, ShardPath shardPath) throws IOException { - final Path location = shardPath.resolveIndex(); - final FSDirectory primaryDirectory = FSDirectory.open(location); - return new CompositeDirectory(primaryDirectory, remoteStoreFileCache); + final FSDirectory primaryDirectory = FSDirectory.open(shardPath.resolveIndex()); + final FSDirectory localCacheDirectory = FSDirectory.open(Files.createDirectories(shardPath.getDataPath().resolve(CACHE_LOCATION))); + localCacheDirectory.syncMetaData(); + return new CompositeDirectory(primaryDirectory, localCacheDirectory, remoteStoreFileCache); } } diff --git a/server/src/main/java/org/opensearch/index/store/CompositeDirectoryRemoteStoreFileTrackerAdapter.java b/server/src/main/java/org/opensearch/index/store/CompositeDirectoryRemoteStoreFileTrackerAdapter.java index b6ae712579375..1b205aed88dcb 100644 --- a/server/src/main/java/org/opensearch/index/store/CompositeDirectoryRemoteStoreFileTrackerAdapter.java +++ b/server/src/main/java/org/opensearch/index/store/CompositeDirectoryRemoteStoreFileTrackerAdapter.java @@ -13,10 +13,12 @@ import org.opensearch.common.blobstore.BlobContainer; import org.opensearch.index.store.remote.filecache.FileCache; import org.opensearch.index.store.remote.utils.BlobFetchRequest; +import org.opensearch.index.store.remote.utils.TransferManager; import org.opensearch.index.store.remote.utils.filetracker.FileState; import org.opensearch.index.store.remote.utils.filetracker.FileTrackingInfo; import org.opensearch.index.store.remote.utils.filetracker.FileType; +import java.io.IOException; import java.util.HashMap; import java.util.Map; @@ -36,10 +38,21 @@ public void setRemoteDirectory(Directory remoteDirectory) { this.remoteDirectory = (RemoteSegmentStoreDirectory) remoteDirectory; } + public String getUploadedFileName(String name) { + return remoteDirectory.getExistingRemoteFilename(name); + } + + public long getFileLength(String name) { + try { + return remoteDirectory.fileLength(name); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + @Override - public IndexInput fetchBlob(BlobFetchRequest blobFetchRequest) { - // TODO - This function will fetch the requested data from blobContainer - return null; + public IndexInput fetchBlob(BlobFetchRequest blobFetchRequest) throws IOException { + return new TransferManager(remoteDirectory.getDataDirectoryBlobContainer(), fileCache).fetchBlob(blobFetchRequest); } public void trackFile(String name, FileState fileState, FileType fileType) { diff --git a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java index dab99fd25b192..d1c48a3634f45 100644 --- a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java +++ b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java @@ -25,6 +25,7 @@ import org.apache.lucene.util.Version; import org.opensearch.common.UUIDs; import org.opensearch.common.annotation.PublicApi; +import org.opensearch.common.blobstore.BlobContainer; import org.opensearch.common.collect.Tuple; import org.opensearch.common.io.VersionedCodecStreamWrapper; import org.opensearch.common.logging.Loggers; @@ -134,6 +135,10 @@ public RemoteSegmentStoreDirectory( init(); } + public BlobContainer getDataDirectoryBlobContainer() { + return remoteDataDirectory.getBlobContainer(); + } + /** * Initializes the cache which keeps track of all the segment files uploaded to the remote segment store. * As this cache is specific to an instance of RemoteSegmentStoreDirectory, it is possible that cache becomes stale @@ -696,7 +701,7 @@ private String getChecksumOfLocalFile(Directory directory, String file) throws I } } - private String getExistingRemoteFilename(String localFilename) { + public String getExistingRemoteFilename(String localFilename) { if (segmentsUploadedToRemoteStore.containsKey(localFilename)) { return segmentsUploadedToRemoteStore.get(localFilename).uploadedFilename; } else { diff --git a/server/src/main/java/org/opensearch/index/store/RemoteStoreFileTrackerAdapter.java b/server/src/main/java/org/opensearch/index/store/RemoteStoreFileTrackerAdapter.java index 9f3ef8a5571d3..29f05c8dc60e8 100644 --- a/server/src/main/java/org/opensearch/index/store/RemoteStoreFileTrackerAdapter.java +++ b/server/src/main/java/org/opensearch/index/store/RemoteStoreFileTrackerAdapter.java @@ -13,8 +13,10 @@ import org.opensearch.index.store.remote.utils.filetracker.FileState; import org.opensearch.index.store.remote.utils.filetracker.FileType; +import java.io.IOException; + public interface RemoteStoreFileTrackerAdapter { - IndexInput fetchBlob(BlobFetchRequest blobFetchRequest); + IndexInput fetchBlob(BlobFetchRequest blobFetchRequest) throws IOException; void trackFile(String name, FileState fileState, FileType fileType); diff --git a/server/src/main/java/org/opensearch/index/store/remote/file/OnDemandCompositeBlockIndexInput.java b/server/src/main/java/org/opensearch/index/store/remote/file/OnDemandCompositeBlockIndexInput.java new file mode 100644 index 0000000000000..8b8975be7a46b --- /dev/null +++ b/server/src/main/java/org/opensearch/index/store/remote/file/OnDemandCompositeBlockIndexInput.java @@ -0,0 +1,93 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.store.remote.file; + +import org.apache.lucene.store.FSDirectory; +import org.apache.lucene.store.IndexInput; +import org.opensearch.index.store.CompositeDirectoryRemoteStoreFileTrackerAdapter; +import org.opensearch.index.store.RemoteStoreFileTrackerAdapter; +import org.opensearch.index.store.remote.utils.BlobFetchRequest; + +import java.io.IOException; + +public class OnDemandCompositeBlockIndexInput extends OnDemandBlockIndexInput { + + private final RemoteStoreFileTrackerAdapter remoteStoreFileTrackerAdapter; + private final String fileName; + private final Long originalFileSize; + private final FSDirectory directory; + + public OnDemandCompositeBlockIndexInput(RemoteStoreFileTrackerAdapter remoteStoreFileTrackerAdapter, String fileName, FSDirectory directory) { + this( + OnDemandBlockIndexInput.builder(). + resourceDescription("OnDemandCompositeBlockIndexInput"). + isClone(false). + offset(0L). + length(getFileLength(remoteStoreFileTrackerAdapter, fileName)), + remoteStoreFileTrackerAdapter, + fileName, + directory); + } + + public OnDemandCompositeBlockIndexInput(Builder builder, RemoteStoreFileTrackerAdapter remoteStoreFileTrackerAdapter, String fileName, FSDirectory directory) { + super(builder); + this.remoteStoreFileTrackerAdapter = remoteStoreFileTrackerAdapter; + this.directory = null; + this.fileName = fileName; + originalFileSize = getFileLength(remoteStoreFileTrackerAdapter, fileName); + } + + @Override + protected OnDemandCompositeBlockIndexInput buildSlice(String sliceDescription, long offset, long length) { + return new OnDemandCompositeBlockIndexInput( + OnDemandBlockIndexInput.builder(). + blockSizeShift(blockSizeShift). + isClone(true). + offset(this.offset + offset). + length(length). + resourceDescription(sliceDescription), + remoteStoreFileTrackerAdapter, + fileName, + directory + ); + } + + @Override + protected IndexInput fetchBlock(int blockId) throws IOException { + final String uploadedFileName = ((CompositeDirectoryRemoteStoreFileTrackerAdapter)remoteStoreFileTrackerAdapter).getUploadedFileName(fileName); + final String blockFileName = uploadedFileName + "." + blockId; + final long blockStart = getBlockStart(blockId); + final long length = getActualBlockSize(blockId); + + BlobFetchRequest blobFetchRequest = BlobFetchRequest.builder() + .position(blockStart) + .length(length) + .blobName(uploadedFileName) + .directory(directory) + .fileName(blockFileName) + .build(); + return remoteStoreFileTrackerAdapter.fetchBlob(blobFetchRequest); + } + + @Override + public OnDemandBlockIndexInput clone() { + OnDemandCompositeBlockIndexInput clone = buildSlice("clone", 0L, this.length); + // ensures that clones may be positioned at the same point as the blocked file they were cloned from + clone.cloneBlock(this); + return clone; + } + + private long getActualBlockSize(int blockId) { + return (blockId != getBlock(originalFileSize - 1)) ? blockSize : getBlockOffset(originalFileSize - 1) + 1; + } + + private static long getFileLength(RemoteStoreFileTrackerAdapter remoteStoreFileTrackerAdapter, String fileName) { + return ((CompositeDirectoryRemoteStoreFileTrackerAdapter)remoteStoreFileTrackerAdapter).getFileLength(fileName); + } +}