Skip to content

Commit

Permalink
[Remote Store] Fix stuck segments upload issue
Browse files Browse the repository at this point in the history
Signed-off-by: Ashish Singh <[email protected]>
  • Loading branch information
ashking94 committed Nov 1, 2023
1 parent 8673fa9 commit ee651e7
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ public long getRefreshSeqNoLag() {
}

public long getTimeMsLag() {
if (remoteRefreshTimeMs == localRefreshTimeMs) {
if (remoteRefreshTimeMs == localRefreshTimeMs || bytesLag == 0) {
return 0;
}
return currentTimeMsUsingSystemNanos() - remoteRefreshStartTimeMs;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -389,15 +389,21 @@ private void uploadNewSegments(Collection<String> localSegmentsPostRefresh, Acti
// Initializing listener here to ensure that the stats increment operations are thread-safe
UploadListener statsListener = createUploadListener();
ActionListener<Void> aggregatedListener = ActionListener.wrap(resp -> {
statsListener.onSuccess(src);
batchUploadListener.onResponse(resp);
try {
statsListener.onSuccess(src);
} finally {
batchUploadListener.onResponse(resp);
}
}, ex -> {
logger.warn(() -> new ParameterizedMessage("Exception: [{}] while uploading segment files", ex), ex);
if (ex instanceof CorruptIndexException) {
indexShard.failShard(ex.getMessage(), ex);
try {
if (ex instanceof CorruptIndexException) {
indexShard.failShard(ex.getMessage(), ex);
}
statsListener.onFailure(src);
} finally {
batchUploadListener.onFailure(ex);
}
statsListener.onFailure(src);
batchUploadListener.onFailure(ex);
});
statsListener.beforeUpload(src);
remoteDirectory.copyFrom(storeDirectory, src, IOContext.DEFAULT, aggregatedListener);
Expand Down Expand Up @@ -532,22 +538,34 @@ private UploadListener createUploadListener() {
@Override
public void beforeUpload(String file) {
// Start tracking the upload bytes started
segmentTracker.addUploadBytesStarted(segmentTracker.getLatestLocalFileNameLengthMap().get(file));
if (segmentTracker.getLatestLocalFileNameLengthMap().containsKey(file)) {
segmentTracker.addUploadBytesStarted(segmentTracker.getLatestLocalFileNameLengthMap().get(file));
} else {
logger.warn("beforeUpload {} missing from segment tracker", file);
}
uploadStartTime = System.currentTimeMillis();
}

@Override
public void onSuccess(String file) {
// Track upload success
segmentTracker.addUploadBytesSucceeded(segmentTracker.getLatestLocalFileNameLengthMap().get(file));
if (segmentTracker.getLatestLocalFileNameLengthMap().containsKey(file)) {
segmentTracker.addUploadBytesSucceeded(segmentTracker.getLatestLocalFileNameLengthMap().get(file));
} else {
logger.warn("onSuccess {} missing from segment tracker", file);
}
segmentTracker.addToLatestUploadedFiles(file);
segmentTracker.addUploadTimeInMillis(Math.max(1, System.currentTimeMillis() - uploadStartTime));
}

@Override
public void onFailure(String file) {
// Track upload failure
segmentTracker.addUploadBytesFailed(segmentTracker.getLatestLocalFileNameLengthMap().get(file));
if (segmentTracker.getLatestLocalFileNameLengthMap().containsKey(file)) {
segmentTracker.addUploadBytesFailed(segmentTracker.getLatestLocalFileNameLengthMap().get(file));
} else {
logger.warn("onFailure {} missing from segment tracker", file);
}
segmentTracker.addUploadTimeInMillis(Math.max(1, System.currentTimeMillis() - uploadStartTime));
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.opensearch.index.remote.RemoteSegmentTransferTracker.currentTimeMsUsingSystemNanos;
Expand Down Expand Up @@ -149,6 +150,7 @@ public void testComputeTimeLagOnUpdate() throws InterruptedException {
Thread.sleep(1);
transferTracker.updateLocalRefreshTimeMs(currentTimeMsUsingSystemNanos());

transferTracker.updateLatestLocalFileNameLengthMap(List.of("test"), k -> 1L);
// Sleep for 100ms and then the lag should be within 100ms +/- 20ms
Thread.sleep(100);
assertTrue(Math.abs(transferTracker.getTimeMsLag() - 100) <= 20);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.opensearch.threadpool.ThreadPool;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Matcher;
Expand Down Expand Up @@ -100,6 +101,8 @@ public void testValidateSegmentUploadLag() throws InterruptedException {
while (currentTimeMsUsingSystemNanos() - localRefreshTimeMs <= 20 * avg) {
Thread.sleep((long) (4 * avg));
}

pressureTracker.updateLatestLocalFileNameLengthMap(List.of("test"), k -> 1L);
Exception e = assertThrows(OpenSearchRejectedExecutionException.class, () -> pressureService.validateSegmentsUploadLag(shardId));
String regex = "^rejected execution on primary shard:\\[index]\\[0] due to remote segments lagging behind "
+ "local segments.time_lag:[0-9]{2,3} ms dynamic_time_lag_threshold:95\\.0 ms$";
Expand Down

0 comments on commit ee651e7

Please sign in to comment.