Skip to content

Commit

Permalink
add a flag for retry info and tests
Browse files Browse the repository at this point in the history
  • Loading branch information
mutianf committed Dec 17, 2023
1 parent 12598a4 commit 211c0be
Show file tree
Hide file tree
Showing 6 changed files with 470 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@
import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsRetryCompletedCallable;
import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsUserCallable;
import com.google.cloud.bigtable.data.v2.stub.readrows.RowMergingCallable;
import com.google.cloud.bigtable.gaxx.retrying.ApiResultRetryAlgorithm;
import com.google.cloud.bigtable.gaxx.retrying.RetryInfoRetryAlgorithm;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
Expand Down Expand Up @@ -484,9 +485,14 @@ public Map<String, String> extract(ReadRowsRequest readRowsRequest) {
ServerStreamingCallable<ReadRowsRequest, RowT> retrying1 =
new ReadRowsRetryCompletedCallable<>(withBigtableTracer);

ServerStreamingCallable<ReadRowsRequest, RowT> retrying2 =
com.google.cloud.bigtable.gaxx.retrying.Callables.retrying(
retrying1, innerSettings, clientContext);
ServerStreamingCallable<ReadRowsRequest, RowT> retrying2;
if (settings.getEnableRetryInfo()) {
retrying2 =
com.google.cloud.bigtable.gaxx.retrying.Callables.retrying(
retrying1, innerSettings, clientContext);
} else {
retrying2 = Callables.retrying(retrying1, innerSettings, clientContext);
}

return new FilterMarkerRowsCallable<>(retrying2, rowAdapter);
}
Expand Down Expand Up @@ -568,9 +574,15 @@ public Map<String, String> extract(
UnaryCallable<SampleRowKeysRequest, List<SampleRowKeysResponse>> withBigtableTracer =
new BigtableTracerUnaryCallable<>(withStatsHeaders);

UnaryCallable<SampleRowKeysRequest, List<SampleRowKeysResponse>> retryable =
com.google.cloud.bigtable.gaxx.retrying.Callables.retrying(
withBigtableTracer, settings.sampleRowKeysSettings(), clientContext);
UnaryCallable<SampleRowKeysRequest, List<SampleRowKeysResponse>> retryable;
if (settings.getEnableRetryInfo()) {
retryable =
com.google.cloud.bigtable.gaxx.retrying.Callables.retrying(
withBigtableTracer, settings.sampleRowKeysSettings(), clientContext);
} else {
retryable =
Callables.retrying(withBigtableTracer, settings.sampleRowKeysSettings(), clientContext);
}

return createUserFacingUnaryCallable(
methodName, new SampleRowKeysCallable(retryable, requestContext));
Expand Down Expand Up @@ -608,9 +620,15 @@ public Map<String, String> extract(MutateRowRequest mutateRowRequest) {
UnaryCallable<MutateRowRequest, MutateRowResponse> withBigtableTracer =
new BigtableTracerUnaryCallable<>(withStatsHeaders);

UnaryCallable<MutateRowRequest, MutateRowResponse> retrying =
com.google.cloud.bigtable.gaxx.retrying.Callables.retrying(
withBigtableTracer, settings.mutateRowSettings(), clientContext);
UnaryCallable<MutateRowRequest, MutateRowResponse> retrying;
if (settings.getEnableRetryInfo()) {
retrying =
com.google.cloud.bigtable.gaxx.retrying.Callables.retrying(
withBigtableTracer, settings.mutateRowSettings(), clientContext);
} else {
retrying =
Callables.retrying(withBigtableTracer, settings.mutateRowSettings(), clientContext);
}

return createUserFacingUnaryCallable(
methodName, new MutateRowCallable(retrying, requestContext));
Expand Down Expand Up @@ -763,11 +781,18 @@ public Map<String, String> extract(MutateRowsRequest mutateRowsRequest) {
ServerStreamingCallable<MutateRowsRequest, MutateRowsResponse> withBigtableTracer =
new BigtableTracerStreamingCallable<>(convertException);

RetryAlgorithm<Void> retryAlgorithm =
new RetryAlgorithm<>(
new RetryInfoRetryAlgorithm<>(),
new ExponentialRetryAlgorithm(
settings.bulkMutateRowsSettings().getRetrySettings(), clientContext.getClock()));
RetryAlgorithm<Void> retryAlgorithm;
ExponentialRetryAlgorithm exponentialRetryAlgorithm =
new ExponentialRetryAlgorithm(
settings.bulkMutateRowsSettings().getRetrySettings(), clientContext.getClock());
if (settings.getEnableRetryInfo()) {
retryAlgorithm =
new RetryAlgorithm<>(new RetryInfoRetryAlgorithm<>(), exponentialRetryAlgorithm);
} else {
retryAlgorithm =
new RetryAlgorithm<>(new ApiResultRetryAlgorithm<>(), exponentialRetryAlgorithm);
}

RetryingExecutorWithContext<Void> retryingExecutor =
new ScheduledRetryingExecutor<>(retryAlgorithm, clientContext.getExecutor());

Expand Down Expand Up @@ -812,10 +837,18 @@ public Map<String, String> extract(
UnaryCallable<CheckAndMutateRowRequest, CheckAndMutateRowResponse> withBigtableTracer =
new BigtableTracerUnaryCallable<>(withStatsHeaders);

UnaryCallable<CheckAndMutateRowRequest, CheckAndMutateRowResponse> retrying =
com.google.cloud.bigtable.gaxx.retrying.Callables.retrying(
withBigtableTracer, settings.checkAndMutateRowSettings(), clientContext);
UnaryCallable<CheckAndMutateRowRequest, CheckAndMutateRowResponse> retrying;
if (settings.getEnableRetryInfo()) {

retrying =
com.google.cloud.bigtable.gaxx.retrying.Callables.retrying(
withBigtableTracer, settings.checkAndMutateRowSettings(), clientContext);

} else {
retrying =
Callables.retrying(
withBigtableTracer, settings.checkAndMutateRowSettings(), clientContext);
}
return createUserFacingUnaryCallable(
methodName, new CheckAndMutateRowCallable(retrying, requestContext));
}
Expand Down Expand Up @@ -854,9 +887,16 @@ public Map<String, String> extract(ReadModifyWriteRowRequest request) {
UnaryCallable<ReadModifyWriteRowRequest, ReadModifyWriteRowResponse> withBigtableTracer =
new BigtableTracerUnaryCallable<>(withStatsHeaders);

UnaryCallable<ReadModifyWriteRowRequest, ReadModifyWriteRowResponse> retrying =
com.google.cloud.bigtable.gaxx.retrying.Callables.retrying(
withBigtableTracer, settings.readModifyWriteRowSettings(), clientContext);
UnaryCallable<ReadModifyWriteRowRequest, ReadModifyWriteRowResponse> retrying;
if (settings.getEnableRetryInfo()) {
retrying =
com.google.cloud.bigtable.gaxx.retrying.Callables.retrying(
withBigtableTracer, settings.readModifyWriteRowSettings(), clientContext);
} else {
retrying =
Callables.retrying(
withBigtableTracer, settings.readModifyWriteRowSettings(), clientContext);
}

return createUserFacingUnaryCallable(
methodName, new ReadModifyWriteRowCallable(retrying, requestContext));
Expand Down Expand Up @@ -935,8 +975,14 @@ public Map<String, String> extract(
ServerStreamingCallable<String, ByteStringRange> withBigtableTracer =
new BigtableTracerStreamingCallable<>(watched);

ServerStreamingCallable<String, ByteStringRange> retrying =
Callables.retrying(withBigtableTracer, innerSettings, clientContext);
ServerStreamingCallable<String, ByteStringRange> retrying;
if (settings.getEnableRetryInfo()) {
retrying =
com.google.cloud.bigtable.gaxx.retrying.Callables.retrying(
withBigtableTracer, innerSettings, clientContext);
} else {
retrying = Callables.retrying(withBigtableTracer, innerSettings, clientContext);
}

SpanName span = getSpanName("GenerateInitialChangeStreamPartitions");
ServerStreamingCallable<String, ByteStringRange> traced =
Expand Down Expand Up @@ -1013,8 +1059,16 @@ public Map<String, String> extract(
ServerStreamingCallable<ReadChangeStreamRequest, ChangeStreamRecordT> withBigtableTracer =
new BigtableTracerStreamingCallable<>(watched);

ServerStreamingCallable<ReadChangeStreamRequest, ChangeStreamRecordT> readChangeStreamCallable =
Callables.retrying(withBigtableTracer, innerSettings, clientContext);
ServerStreamingCallable<ReadChangeStreamRequest, ChangeStreamRecordT> readChangeStreamCallable;

if (settings.getEnableRetryInfo()) {
readChangeStreamCallable =
com.google.cloud.bigtable.gaxx.retrying.Callables.retrying(
withBigtableTracer, innerSettings, clientContext);
} else {
readChangeStreamCallable =
Callables.retrying(withBigtableTracer, innerSettings, clientContext);
}

ServerStreamingCallable<ReadChangeStreamQuery, ChangeStreamRecordT>
readChangeStreamUserCallable =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,7 @@ public class EnhancedBigtableStubSettings extends StubSettings<EnhancedBigtableS
private final boolean isRefreshingChannel;
private ImmutableList<String> primedTableIds;
private final Map<String, String> jwtAudienceMapping;
private final boolean enableRetryInfo;

private final ServerStreamingCallSettings<Query, Row> readRowsSettings;
private final UnaryCallSettings<Query, Row> readRowSettings;
Expand Down Expand Up @@ -252,6 +253,7 @@ private EnhancedBigtableStubSettings(Builder builder) {
isRefreshingChannel = builder.isRefreshingChannel;
primedTableIds = builder.primedTableIds;
jwtAudienceMapping = builder.jwtAudienceMapping;
enableRetryInfo = builder.enableRetryInfo;

// Per method settings.
readRowsSettings = builder.readRowsSettings.build();
Expand Down Expand Up @@ -313,6 +315,14 @@ public Map<String, String> getJwtAudienceMapping() {
return jwtAudienceMapping;
}

/**
* Gets if RetryInfo is enabled. If true, client bases retry decision and back off time on server
* returned RetryInfo value. Otherwise, client uses {@link RetrySettings}.
*/
public boolean getEnableRetryInfo() {
return enableRetryInfo;
}

/** Returns a builder for the default ChannelProvider for this service. */
public static InstantiatingGrpcChannelProvider.Builder defaultGrpcTransportProviderBuilder() {
return BigtableStubSettings.defaultGrpcTransportProviderBuilder()
Expand Down Expand Up @@ -595,6 +605,7 @@ public static class Builder extends StubSettings.Builder<EnhancedBigtableStubSet
private boolean isRefreshingChannel;
private ImmutableList<String> primedTableIds;
private Map<String, String> jwtAudienceMapping;
private boolean enableRetryInfo;

private final ServerStreamingCallSettings.Builder<Query, Row> readRowsSettings;
private final UnaryCallSettings.Builder<Query, Row> readRowSettings;
Expand Down Expand Up @@ -627,6 +638,7 @@ private Builder() {
primedTableIds = ImmutableList.of();
jwtAudienceMapping = DEFAULT_JWT_AUDIENCE_MAPPING;
setCredentialsProvider(defaultCredentialsProviderBuilder().build());
this.enableRetryInfo = true;

// Defaults provider
BigtableStubSettings.Builder baseDefaults = BigtableStubSettings.newBuilder();
Expand Down Expand Up @@ -745,6 +757,7 @@ private Builder(EnhancedBigtableStubSettings settings) {
isRefreshingChannel = settings.isRefreshingChannel;
primedTableIds = settings.primedTableIds;
jwtAudienceMapping = settings.jwtAudienceMapping;
enableRetryInfo = settings.enableRetryInfo;

// Per method settings.
readRowsSettings = settings.readRowsSettings.toBuilder();
Expand Down Expand Up @@ -893,6 +906,23 @@ public Map<String, String> getJwtAudienceMapping() {
return jwtAudienceMapping;
}

/**
* Sets if RetryInfo is enabled. If true, client bases retry decision and back off time on
* server returned RetryInfo value. Otherwise, client uses {@link RetrySettings}.
*/
public Builder setEnableRetryInfo(boolean enableRetryInfo) {
this.enableRetryInfo = enableRetryInfo;
return this;
}

/**
* Gets if RetryInfo is enabled. If true, client bases retry decision and back off time on
* server returned RetryInfo value. Otherwise, client uses {@link RetrySettings}.
*/
public boolean getEnableRetryInfo() {
return enableRetryInfo;
}

/** Returns the builder for the settings used for calls to readRows. */
public ServerStreamingCallSettings.Builder<Query, Row> readRowsSettings() {
return readRowsSettings;
Expand Down Expand Up @@ -1019,6 +1049,7 @@ public String toString() {
.add("isRefreshingChannel", isRefreshingChannel)
.add("primedTableIds", primedTableIds)
.add("jwtAudienceMapping", jwtAudienceMapping)
.add("enableRetryInfo", enableRetryInfo)
.add("readRowsSettings", readRowsSettings)
.add("readRowSettings", readRowSettings)
.add("sampleRowKeysSettings", sampleRowKeysSettings)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018 Google LLC
* Copyright 2023 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,18 @@
/*
* Copyright 2023 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.bigtable.gaxx.retrying;

import com.google.api.core.InternalApi;
Expand Down
Loading

0 comments on commit 211c0be

Please sign in to comment.