Skip to content

Commit

Permalink
Incorporate PR review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
ashking94 committed Oct 21, 2023
1 parent c691e08 commit da539ef
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -66,6 +67,12 @@ public class RemoteSegmentTransferTracker extends RemoteTransferTracker {
*/
private volatile long remoteRefreshTimeMs;

/**
* This is the time of first local refresh after the last successful remote refresh. When the remote store is in
* sync with local refresh, this will be reset to -1.
*/
private volatile long remoteRefreshStartTimeMs = -1;

/**
* The refresh time(clock) of most recent remote refresh.
*/
Expand Down Expand Up @@ -136,7 +143,7 @@ public RemoteSegmentTransferTracker(
}

public static long currentTimeMsUsingSystemNanos() {
return System.nanoTime() / 1_000_000L;
return TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
}

@Override
Expand Down Expand Up @@ -184,13 +191,17 @@ public void updateLocalRefreshTimeAndSeqNo() {
}

// Visible for testing
void updateLocalRefreshTimeMs(long localRefreshTimeMs) {
synchronized void updateLocalRefreshTimeMs(long localRefreshTimeMs) {
assert localRefreshTimeMs >= this.localRefreshTimeMs : "newLocalRefreshTimeMs="
+ localRefreshTimeMs
+ " < "
+ "currentLocalRefreshTimeMs="
+ this.localRefreshTimeMs;
boolean isRemoteInSyncBeforeLocalRefresh = this.localRefreshTimeMs == this.remoteRefreshTimeMs;
this.localRefreshTimeMs = localRefreshTimeMs;
if (isRemoteInSyncBeforeLocalRefresh) {
this.remoteRefreshStartTimeMs = localRefreshTimeMs;
}
}

private void updateLocalRefreshClockTimeMs(long localRefreshClockTimeMs) {
Expand Down Expand Up @@ -219,13 +230,18 @@ long getRemoteRefreshClockTimeMs() {
return remoteRefreshClockTimeMs;
}

public void updateRemoteRefreshTimeMs(long remoteRefreshTimeMs) {
assert remoteRefreshTimeMs >= this.remoteRefreshTimeMs : "newRemoteRefreshTimeMs="
+ remoteRefreshTimeMs
public synchronized void updateRemoteRefreshTimeMs(long refreshTimeMs) {
assert refreshTimeMs >= this.remoteRefreshTimeMs : "newRemoteRefreshTimeMs="
+ refreshTimeMs
+ " < "
+ "currentRemoteRefreshTimeMs="
+ this.remoteRefreshTimeMs;
this.remoteRefreshTimeMs = remoteRefreshTimeMs;
this.remoteRefreshTimeMs = refreshTimeMs;
// When multiple refreshes have failed, there is a possibility that retry is ongoing while another refresh gets
// triggered. After the segments have been uploaded and before the below code runs, the updateLocalRefreshTimeAndSeqNo
// method is triggered, which will update the local localRefreshTimeMs. Now, the lag would basically become the
// time since the last refresh happened locally.
this.remoteRefreshStartTimeMs = refreshTimeMs == this.localRefreshTimeMs ? -1 : this.localRefreshTimeMs;
}

public void updateRemoteRefreshClockTimeMs(long remoteRefreshClockTimeMs) {
Expand All @@ -242,10 +258,9 @@ public long getRefreshSeqNoLag() {

public long getTimeMsLag() {
if (remoteRefreshTimeMs == localRefreshTimeMs) {
logger.info("remoteRefreshTimeMs={} localRefreshTimeMs={}", remoteRefreshTimeMs, localRefreshTimeMs);
return 0;
}
return currentTimeMsUsingSystemNanos() - localRefreshTimeMs;
return currentTimeMsUsingSystemNanos() - remoteRefreshStartTimeMs;
}

public long getBytesLag() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,17 +177,14 @@ private TimeLagValidator(RemoteStorePressureSettings pressureSettings) {
@Override
public boolean validate(RemoteSegmentTransferTracker pressureTracker, ShardId shardId) {
if (pressureTracker.getRefreshSeqNoLag() <= 1) {
logger.info("not ready");
return true;
}
if (pressureTracker.isUploadTimeMovingAverageReady() == false) {
logger.info("upload time moving average is not ready");
return true;
}
long timeLag = pressureTracker.getTimeMsLag();
double dynamicTimeLagThreshold = pressureTracker.getUploadTimeMovingAverage() * pressureSettings
.getUploadTimeLagVarianceFactor();
logger.info("timeLag={} dynamicTimeLagThreshold={}", timeLag, dynamicTimeLagThreshold);
return timeLag <= dynamicTimeLagThreshold;
}

Expand Down

0 comments on commit da539ef

Please sign in to comment.