From 211c0beb7f118849bb9d5f23af0cd0750d47b8db Mon Sep 17 00:00:00 2001 From: Mattie Fu Date: Sun, 17 Dec 2023 15:24:44 -0500 Subject: [PATCH] add a flag for retry info and tests --- .../data/v2/stub/EnhancedBigtableStub.java | 102 ++++-- .../v2/stub/EnhancedBigtableStubSettings.java | 31 ++ .../bigtable/gaxx/retrying/ApiException.java | 2 +- .../retrying/RetryInfoRetryAlgorithm.java | 15 + .../EnhancedBigtableStubSettingsTest.java | 55 +++- .../bigtable/data/v2/stub/RetryInfoTest.java | 298 +++++++++++++++++- 6 files changed, 470 insertions(+), 33 deletions(-) diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java index 4a20a5799f..88a4cfdc16 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java @@ -106,6 +106,7 @@ import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsRetryCompletedCallable; import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsUserCallable; import com.google.cloud.bigtable.data.v2.stub.readrows.RowMergingCallable; +import com.google.cloud.bigtable.gaxx.retrying.ApiResultRetryAlgorithm; import com.google.cloud.bigtable.gaxx.retrying.RetryInfoRetryAlgorithm; import com.google.common.base.MoreObjects; import com.google.common.base.Preconditions; @@ -484,9 +485,14 @@ public Map extract(ReadRowsRequest readRowsRequest) { ServerStreamingCallable retrying1 = new ReadRowsRetryCompletedCallable<>(withBigtableTracer); - ServerStreamingCallable retrying2 = - com.google.cloud.bigtable.gaxx.retrying.Callables.retrying( - retrying1, innerSettings, clientContext); + ServerStreamingCallable retrying2; + if (settings.getEnableRetryInfo()) { + retrying2 = + com.google.cloud.bigtable.gaxx.retrying.Callables.retrying( + retrying1, innerSettings, clientContext); + } else { + retrying2 = Callables.retrying(retrying1, innerSettings, clientContext); + } return new FilterMarkerRowsCallable<>(retrying2, rowAdapter); } @@ -568,9 +574,15 @@ public Map extract( UnaryCallable> withBigtableTracer = new BigtableTracerUnaryCallable<>(withStatsHeaders); - UnaryCallable> retryable = - com.google.cloud.bigtable.gaxx.retrying.Callables.retrying( - withBigtableTracer, settings.sampleRowKeysSettings(), clientContext); + UnaryCallable> retryable; + if (settings.getEnableRetryInfo()) { + retryable = + com.google.cloud.bigtable.gaxx.retrying.Callables.retrying( + withBigtableTracer, settings.sampleRowKeysSettings(), clientContext); + } else { + retryable = + Callables.retrying(withBigtableTracer, settings.sampleRowKeysSettings(), clientContext); + } return createUserFacingUnaryCallable( methodName, new SampleRowKeysCallable(retryable, requestContext)); @@ -608,9 +620,15 @@ public Map extract(MutateRowRequest mutateRowRequest) { UnaryCallable withBigtableTracer = new BigtableTracerUnaryCallable<>(withStatsHeaders); - UnaryCallable retrying = - com.google.cloud.bigtable.gaxx.retrying.Callables.retrying( - withBigtableTracer, settings.mutateRowSettings(), clientContext); + UnaryCallable retrying; + if (settings.getEnableRetryInfo()) { + retrying = + com.google.cloud.bigtable.gaxx.retrying.Callables.retrying( + withBigtableTracer, settings.mutateRowSettings(), clientContext); + } else { + retrying = + Callables.retrying(withBigtableTracer, settings.mutateRowSettings(), clientContext); + } return createUserFacingUnaryCallable( methodName, new MutateRowCallable(retrying, requestContext)); @@ -763,11 +781,18 @@ public Map extract(MutateRowsRequest mutateRowsRequest) { ServerStreamingCallable withBigtableTracer = new BigtableTracerStreamingCallable<>(convertException); - RetryAlgorithm retryAlgorithm = - new RetryAlgorithm<>( - new RetryInfoRetryAlgorithm<>(), - new ExponentialRetryAlgorithm( - settings.bulkMutateRowsSettings().getRetrySettings(), clientContext.getClock())); + RetryAlgorithm retryAlgorithm; + ExponentialRetryAlgorithm exponentialRetryAlgorithm = + new ExponentialRetryAlgorithm( + settings.bulkMutateRowsSettings().getRetrySettings(), clientContext.getClock()); + if (settings.getEnableRetryInfo()) { + retryAlgorithm = + new RetryAlgorithm<>(new RetryInfoRetryAlgorithm<>(), exponentialRetryAlgorithm); + } else { + retryAlgorithm = + new RetryAlgorithm<>(new ApiResultRetryAlgorithm<>(), exponentialRetryAlgorithm); + } + RetryingExecutorWithContext retryingExecutor = new ScheduledRetryingExecutor<>(retryAlgorithm, clientContext.getExecutor()); @@ -812,10 +837,18 @@ public Map extract( UnaryCallable withBigtableTracer = new BigtableTracerUnaryCallable<>(withStatsHeaders); - UnaryCallable retrying = - com.google.cloud.bigtable.gaxx.retrying.Callables.retrying( - withBigtableTracer, settings.checkAndMutateRowSettings(), clientContext); + UnaryCallable retrying; + if (settings.getEnableRetryInfo()) { + + retrying = + com.google.cloud.bigtable.gaxx.retrying.Callables.retrying( + withBigtableTracer, settings.checkAndMutateRowSettings(), clientContext); + } else { + retrying = + Callables.retrying( + withBigtableTracer, settings.checkAndMutateRowSettings(), clientContext); + } return createUserFacingUnaryCallable( methodName, new CheckAndMutateRowCallable(retrying, requestContext)); } @@ -854,9 +887,16 @@ public Map extract(ReadModifyWriteRowRequest request) { UnaryCallable withBigtableTracer = new BigtableTracerUnaryCallable<>(withStatsHeaders); - UnaryCallable retrying = - com.google.cloud.bigtable.gaxx.retrying.Callables.retrying( - withBigtableTracer, settings.readModifyWriteRowSettings(), clientContext); + UnaryCallable retrying; + if (settings.getEnableRetryInfo()) { + retrying = + com.google.cloud.bigtable.gaxx.retrying.Callables.retrying( + withBigtableTracer, settings.readModifyWriteRowSettings(), clientContext); + } else { + retrying = + Callables.retrying( + withBigtableTracer, settings.readModifyWriteRowSettings(), clientContext); + } return createUserFacingUnaryCallable( methodName, new ReadModifyWriteRowCallable(retrying, requestContext)); @@ -935,8 +975,14 @@ public Map extract( ServerStreamingCallable withBigtableTracer = new BigtableTracerStreamingCallable<>(watched); - ServerStreamingCallable retrying = - Callables.retrying(withBigtableTracer, innerSettings, clientContext); + ServerStreamingCallable retrying; + if (settings.getEnableRetryInfo()) { + retrying = + com.google.cloud.bigtable.gaxx.retrying.Callables.retrying( + withBigtableTracer, innerSettings, clientContext); + } else { + retrying = Callables.retrying(withBigtableTracer, innerSettings, clientContext); + } SpanName span = getSpanName("GenerateInitialChangeStreamPartitions"); ServerStreamingCallable traced = @@ -1013,8 +1059,16 @@ public Map extract( ServerStreamingCallable withBigtableTracer = new BigtableTracerStreamingCallable<>(watched); - ServerStreamingCallable readChangeStreamCallable = - Callables.retrying(withBigtableTracer, innerSettings, clientContext); + ServerStreamingCallable readChangeStreamCallable; + + if (settings.getEnableRetryInfo()) { + readChangeStreamCallable = + com.google.cloud.bigtable.gaxx.retrying.Callables.retrying( + withBigtableTracer, innerSettings, clientContext); + } else { + readChangeStreamCallable = + Callables.retrying(withBigtableTracer, innerSettings, clientContext); + } ServerStreamingCallable readChangeStreamUserCallable = diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java index cffd9c85df..142f83826c 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java @@ -211,6 +211,7 @@ public class EnhancedBigtableStubSettings extends StubSettings primedTableIds; private final Map jwtAudienceMapping; + private final boolean enableRetryInfo; private final ServerStreamingCallSettings readRowsSettings; private final UnaryCallSettings readRowSettings; @@ -252,6 +253,7 @@ private EnhancedBigtableStubSettings(Builder builder) { isRefreshingChannel = builder.isRefreshingChannel; primedTableIds = builder.primedTableIds; jwtAudienceMapping = builder.jwtAudienceMapping; + enableRetryInfo = builder.enableRetryInfo; // Per method settings. readRowsSettings = builder.readRowsSettings.build(); @@ -313,6 +315,14 @@ public Map getJwtAudienceMapping() { return jwtAudienceMapping; } + /** + * Gets if RetryInfo is enabled. If true, client bases retry decision and back off time on server + * returned RetryInfo value. Otherwise, client uses {@link RetrySettings}. + */ + public boolean getEnableRetryInfo() { + return enableRetryInfo; + } + /** Returns a builder for the default ChannelProvider for this service. */ public static InstantiatingGrpcChannelProvider.Builder defaultGrpcTransportProviderBuilder() { return BigtableStubSettings.defaultGrpcTransportProviderBuilder() @@ -595,6 +605,7 @@ public static class Builder extends StubSettings.Builder primedTableIds; private Map jwtAudienceMapping; + private boolean enableRetryInfo; private final ServerStreamingCallSettings.Builder readRowsSettings; private final UnaryCallSettings.Builder readRowSettings; @@ -627,6 +638,7 @@ private Builder() { primedTableIds = ImmutableList.of(); jwtAudienceMapping = DEFAULT_JWT_AUDIENCE_MAPPING; setCredentialsProvider(defaultCredentialsProviderBuilder().build()); + this.enableRetryInfo = true; // Defaults provider BigtableStubSettings.Builder baseDefaults = BigtableStubSettings.newBuilder(); @@ -745,6 +757,7 @@ private Builder(EnhancedBigtableStubSettings settings) { isRefreshingChannel = settings.isRefreshingChannel; primedTableIds = settings.primedTableIds; jwtAudienceMapping = settings.jwtAudienceMapping; + enableRetryInfo = settings.enableRetryInfo; // Per method settings. readRowsSettings = settings.readRowsSettings.toBuilder(); @@ -893,6 +906,23 @@ public Map getJwtAudienceMapping() { return jwtAudienceMapping; } + /** + * Sets if RetryInfo is enabled. If true, client bases retry decision and back off time on + * server returned RetryInfo value. Otherwise, client uses {@link RetrySettings}. + */ + public Builder setEnableRetryInfo(boolean enableRetryInfo) { + this.enableRetryInfo = enableRetryInfo; + return this; + } + + /** + * Gets if RetryInfo is enabled. If true, client bases retry decision and back off time on + * server returned RetryInfo value. Otherwise, client uses {@link RetrySettings}. + */ + public boolean getEnableRetryInfo() { + return enableRetryInfo; + } + /** Returns the builder for the settings used for calls to readRows. */ public ServerStreamingCallSettings.Builder readRowsSettings() { return readRowsSettings; @@ -1019,6 +1049,7 @@ public String toString() { .add("isRefreshingChannel", isRefreshingChannel) .add("primedTableIds", primedTableIds) .add("jwtAudienceMapping", jwtAudienceMapping) + .add("enableRetryInfo", enableRetryInfo) .add("readRowsSettings", readRowsSettings) .add("readRowSettings", readRowSettings) .add("sampleRowKeysSettings", sampleRowKeysSettings) diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/retrying/ApiException.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/retrying/ApiException.java index d2f676c33b..b13452934d 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/retrying/ApiException.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/retrying/ApiException.java @@ -1,5 +1,5 @@ /* - * Copyright 2018 Google LLC + * Copyright 2023 Google LLC * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/retrying/RetryInfoRetryAlgorithm.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/retrying/RetryInfoRetryAlgorithm.java index e18898559c..1fde962903 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/retrying/RetryInfoRetryAlgorithm.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/retrying/RetryInfoRetryAlgorithm.java @@ -1,3 +1,18 @@ +/* + * Copyright 2023 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package com.google.cloud.bigtable.gaxx.retrying; import com.google.api.core.InternalApi; diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettingsTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettingsTest.java index fbd6442e0c..6ffeeb2cfa 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettingsTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettingsTest.java @@ -77,6 +77,7 @@ public void settingsAreNotLostTest() { CredentialsProvider credentialsProvider = Mockito.mock(CredentialsProvider.class); WatchdogProvider watchdogProvider = Mockito.mock(WatchdogProvider.class); Duration watchdogInterval = Duration.ofSeconds(12); + boolean enableRetryInfo = false; EnhancedBigtableStubSettings.Builder builder = EnhancedBigtableStubSettings.newBuilder() @@ -87,7 +88,8 @@ public void settingsAreNotLostTest() { .setEndpoint(endpoint) .setCredentialsProvider(credentialsProvider) .setStreamWatchdogProvider(watchdogProvider) - .setStreamWatchdogCheckInterval(watchdogInterval); + .setStreamWatchdogCheckInterval(watchdogInterval) + .setEnableRetryInfo(enableRetryInfo); verifyBuilder( builder, @@ -98,7 +100,8 @@ public void settingsAreNotLostTest() { endpoint, credentialsProvider, watchdogProvider, - watchdogInterval); + watchdogInterval, + enableRetryInfo); verifySettings( builder.build(), projectId, @@ -108,7 +111,8 @@ public void settingsAreNotLostTest() { endpoint, credentialsProvider, watchdogProvider, - watchdogInterval); + watchdogInterval, + enableRetryInfo); verifyBuilder( builder.build().toBuilder(), projectId, @@ -118,7 +122,8 @@ public void settingsAreNotLostTest() { endpoint, credentialsProvider, watchdogProvider, - watchdogInterval); + watchdogInterval, + enableRetryInfo); } private void verifyBuilder( @@ -130,7 +135,8 @@ private void verifyBuilder( String endpoint, CredentialsProvider credentialsProvider, WatchdogProvider watchdogProvider, - Duration watchdogInterval) { + Duration watchdogInterval, + boolean enableRetryInfo) { assertThat(builder.getProjectId()).isEqualTo(projectId); assertThat(builder.getInstanceId()).isEqualTo(instanceId); assertThat(builder.getAppProfileId()).isEqualTo(appProfileId); @@ -139,6 +145,7 @@ private void verifyBuilder( assertThat(builder.getCredentialsProvider()).isEqualTo(credentialsProvider); assertThat(builder.getStreamWatchdogProvider()).isSameInstanceAs(watchdogProvider); assertThat(builder.getStreamWatchdogCheckInterval()).isEqualTo(watchdogInterval); + assertThat(builder.getEnableRetryInfo()).isEqualTo(enableRetryInfo); } private void verifySettings( @@ -150,7 +157,8 @@ private void verifySettings( String endpoint, CredentialsProvider credentialsProvider, WatchdogProvider watchdogProvider, - Duration watchdogInterval) { + Duration watchdogInterval, + boolean enableRetryInfo) { assertThat(settings.getProjectId()).isEqualTo(projectId); assertThat(settings.getInstanceId()).isEqualTo(instanceId); assertThat(settings.getAppProfileId()).isEqualTo(appProfileId); @@ -159,6 +167,7 @@ private void verifySettings( assertThat(settings.getCredentialsProvider()).isEqualTo(credentialsProvider); assertThat(settings.getStreamWatchdogProvider()).isSameInstanceAs(watchdogProvider); assertThat(settings.getStreamWatchdogCheckInterval()).isEqualTo(watchdogInterval); + assertThat(settings.getEnableRetryInfo()).isEqualTo(enableRetryInfo); } @Test @@ -781,6 +790,39 @@ public void isRefreshingChannelFalseValueTest() { assertThat(builder.build().toBuilder().isRefreshingChannel()).isFalse(); } + @Test + public void enableRetryInfoDefaultValueTest() throws IOException { + String dummyProjectId = "my-project"; + String dummyInstanceId = "my-instance"; + CredentialsProvider credentialsProvider = Mockito.mock(CredentialsProvider.class); + Mockito.when(credentialsProvider.getCredentials()).thenReturn(new FakeCredentials()); + EnhancedBigtableStubSettings.Builder builder = + EnhancedBigtableStubSettings.newBuilder() + .setProjectId(dummyProjectId) + .setInstanceId(dummyInstanceId) + .setCredentialsProvider(credentialsProvider); + assertThat(builder.getEnableRetryInfo()).isTrue(); + assertThat(builder.build().getEnableRetryInfo()).isTrue(); + assertThat(builder.build().toBuilder().getEnableRetryInfo()).isTrue(); + } + + @Test + public void enableRetryInfoFalseValueTest() throws IOException { + String dummyProjectId = "my-project"; + String dummyInstanceId = "my-instance"; + CredentialsProvider credentialsProvider = Mockito.mock(CredentialsProvider.class); + Mockito.when(credentialsProvider.getCredentials()).thenReturn(new FakeCredentials()); + EnhancedBigtableStubSettings.Builder builder = + EnhancedBigtableStubSettings.newBuilder() + .setProjectId(dummyProjectId) + .setInstanceId(dummyInstanceId) + .setEnableRetryInfo(false) + .setCredentialsProvider(credentialsProvider); + assertThat(builder.getEnableRetryInfo()).isFalse(); + assertThat(builder.build().getEnableRetryInfo()).isFalse(); + assertThat(builder.build().toBuilder().getEnableRetryInfo()).isFalse(); + } + static final String[] SETTINGS_LIST = { "projectId", "instanceId", @@ -788,6 +830,7 @@ public void isRefreshingChannelFalseValueTest() { "isRefreshingChannel", "primedTableIds", "jwtAudienceMapping", + "enableRetryInfo", "readRowsSettings", "readRowSettings", "sampleRowKeysSettings", diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/RetryInfoTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/RetryInfoTest.java index a853829e63..41037ae067 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/RetryInfoTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/RetryInfoTest.java @@ -28,10 +28,14 @@ import com.google.bigtable.v2.BigtableGrpc; import com.google.bigtable.v2.CheckAndMutateRowRequest; import com.google.bigtable.v2.CheckAndMutateRowResponse; +import com.google.bigtable.v2.GenerateInitialChangeStreamPartitionsRequest; +import com.google.bigtable.v2.GenerateInitialChangeStreamPartitionsResponse; import com.google.bigtable.v2.MutateRowRequest; import com.google.bigtable.v2.MutateRowResponse; import com.google.bigtable.v2.MutateRowsRequest; import com.google.bigtable.v2.MutateRowsResponse; +import com.google.bigtable.v2.ReadChangeStreamRequest; +import com.google.bigtable.v2.ReadChangeStreamResponse; import com.google.bigtable.v2.ReadModifyWriteRowRequest; import com.google.bigtable.v2.ReadModifyWriteRowResponse; import com.google.bigtable.v2.ReadRowsRequest; @@ -43,8 +47,10 @@ import com.google.cloud.bigtable.data.v2.models.BulkMutation; import com.google.cloud.bigtable.data.v2.models.ConditionalRowMutation; import com.google.cloud.bigtable.data.v2.models.Filters; +import com.google.cloud.bigtable.data.v2.models.MutateRowsException; import com.google.cloud.bigtable.data.v2.models.Mutation; import com.google.cloud.bigtable.data.v2.models.Query; +import com.google.cloud.bigtable.data.v2.models.ReadChangeStreamQuery; import com.google.cloud.bigtable.data.v2.models.ReadModifyWriteRow; import com.google.cloud.bigtable.data.v2.models.RowMutation; import com.google.cloud.bigtable.data.v2.models.RowMutationEntry; @@ -73,6 +79,7 @@ public class RetryInfoTest { private FakeBigtableService service; private BigtableDataClient client; + private BigtableDataSettings.Builder settings; private AtomicInteger attemptCounter = new AtomicInteger(); private Duration delay = Duration.newBuilder().setSeconds(1).setNanos(0).build(); @@ -82,7 +89,7 @@ public void setUp() throws IOException { service = new FakeBigtableService(); serverRule.getServiceRegistry().addService(service); - BigtableDataSettings.Builder settings = + settings = BigtableDataSettings.newBuilder() .setProjectId("fake-project") .setInstanceId("fake-instance") @@ -122,6 +129,30 @@ public void testReadRowNonRetryableErrorWithRetryInfo() { assertThat(stopwatch.elapsed()).isAtLeast(java.time.Duration.ofSeconds(delay.getSeconds())); } + @Test + public void testReadRowDisableRetryInfo() throws IOException { + settings.stubSettings().setEnableRetryInfo(false); + + try (BigtableDataClient client = BigtableDataClient.create(settings.build())) { + createRetryableExceptionWithDelay(delay); + Stopwatch stopwatch = Stopwatch.createStarted(); + client.readRow("table", "row"); + stopwatch.stop(); + + assertThat(attemptCounter.get()).isEqualTo(2); + assertThat(stopwatch.elapsed()).isLessThan(java.time.Duration.ofSeconds(delay.getSeconds())); + + attemptCounter.set(0); + ApiException exception = createNonRetryableExceptionWithDelay(delay); + try { + client.readRow("table", "row"); + } catch (ApiException e) { + assertThat(e.getStatusCode()).isEqualTo(exception.getStatusCode()); + } + assertThat(attemptCounter.get()).isEqualTo(1); + } + } + @Test public void testReadRows() { createRetryableExceptionWithDelay(delay); @@ -146,6 +177,30 @@ public void testReadRowsNonRetraybleErrorWithRetryInfo() { assertThat(stopwatch.elapsed()).isAtLeast(java.time.Duration.ofSeconds(delay.getSeconds())); } + @Test + public void testReadRowsDisableRetryInfo() throws IOException { + settings.stubSettings().setEnableRetryInfo(false); + + try (BigtableDataClient client = BigtableDataClient.create(settings.build())) { + createRetryableExceptionWithDelay(delay); + Stopwatch stopwatch = Stopwatch.createStarted(); + client.readRows(Query.create("table")).iterator().hasNext(); + stopwatch.stop(); + + assertThat(attemptCounter.get()).isEqualTo(2); + assertThat(stopwatch.elapsed()).isLessThan(java.time.Duration.ofSeconds(delay.getSeconds())); + + attemptCounter.set(0); + ApiException exception = createNonRetryableExceptionWithDelay(delay); + try { + client.readRows(Query.create("table")).iterator().hasNext(); + } catch (ApiException e) { + assertThat(e.getStatusCode()).isEqualTo(exception.getStatusCode()); + } + assertThat(attemptCounter.get()).isEqualTo(1); + } + } + @Test public void testMutateRows() { createRetryableExceptionWithDelay(delay); @@ -176,6 +231,35 @@ public void testMutateRowsNonRetryableErrorWithRetryInfo() { assertThat(stopwatch.elapsed()).isAtLeast(java.time.Duration.ofSeconds(delay.getSeconds())); } + @Test + public void testMutateRowsDisableRetryInfo() throws IOException { + settings.stubSettings().setEnableRetryInfo(false); + + try (BigtableDataClient client = BigtableDataClient.create(settings.build())) { + createRetryableExceptionWithDelay(delay); + Stopwatch stopwatch = Stopwatch.createStarted(); + client.bulkMutateRows( + BulkMutation.create("fake-table") + .add(RowMutationEntry.create("row-key-1").setCell("cf", "q", "v"))); + stopwatch.stop(); + + assertThat(attemptCounter.get()).isEqualTo(2); + assertThat(stopwatch.elapsed()).isLessThan(java.time.Duration.ofSeconds(delay.getSeconds())); + + attemptCounter.set(0); + ApiException exception = createNonRetryableExceptionWithDelay(delay); + try { + client.bulkMutateRows( + BulkMutation.create("fake-table") + .add(RowMutationEntry.create("row-key-1").setCell("cf", "q", "v"))); + } catch (ApiException e) { + assertThat(((MutateRowsException) e).getFailedMutations().get(0).getError().getStatusCode()) + .isEqualTo(exception.getStatusCode()); + } + assertThat(attemptCounter.get()).isEqualTo(1); + } + } + @Test public void testMutateRow() { createRetryableExceptionWithDelay(delay); @@ -200,6 +284,30 @@ public void testMutateRowNonRetryableErrorWithRetryInfo() { assertThat(stopwatch.elapsed()).isAtLeast(java.time.Duration.ofSeconds(1)); } + @Test + public void testMutateRowDisableRetryInfo() throws IOException { + settings.stubSettings().setEnableRetryInfo(false); + + try (BigtableDataClient client = BigtableDataClient.create(settings.build())) { + createRetryableExceptionWithDelay(delay); + Stopwatch stopwatch = Stopwatch.createStarted(); + client.mutateRow(RowMutation.create("table", "key").setCell("cf", "q", "v")); + stopwatch.stop(); + + assertThat(attemptCounter.get()).isEqualTo(2); + assertThat(stopwatch.elapsed()).isLessThan(java.time.Duration.ofSeconds(delay.getSeconds())); + + attemptCounter.set(0); + ApiException exception = createNonRetryableExceptionWithDelay(delay); + try { + client.mutateRow(RowMutation.create("table", "key").setCell("cf", "q", "v")); + } catch (ApiException e) { + assertThat(e.getStatusCode()).isEqualTo(exception.getStatusCode()); + } + assertThat(attemptCounter.get()).isEqualTo(1); + } + } + @Test public void testSampleRowKeys() { createRetryableExceptionWithDelay(delay); @@ -226,6 +334,30 @@ public void testSampleRowKeysNonRetryableErrorWithRetryInfo() { assertThat(stopwatch.elapsed()).isAtLeast(java.time.Duration.ofSeconds(delay.getSeconds())); } + @Test + public void testSampleRowKeysDisableRetryInfo() throws IOException { + settings.stubSettings().setEnableRetryInfo(false); + + try (BigtableDataClient client = BigtableDataClient.create(settings.build())) { + createRetryableExceptionWithDelay(delay); + Stopwatch stopwatch = Stopwatch.createStarted(); + client.sampleRowKeys("table"); + stopwatch.stop(); + + assertThat(attemptCounter.get()).isEqualTo(2); + assertThat(stopwatch.elapsed()).isLessThan(java.time.Duration.ofSeconds(delay.getSeconds())); + + attemptCounter.set(0); + ApiException exception = createNonRetryableExceptionWithDelay(delay); + try { + client.sampleRowKeys("table"); + } catch (ApiException e) { + assertThat(e.getStatusCode()).isEqualTo(exception.getStatusCode()); + } + assertThat(attemptCounter.get()).isEqualTo(1); + } + } + @Test public void testCheckAndMutateRow() { createRetryableExceptionWithDelay(delay); @@ -241,6 +373,24 @@ public void testCheckAndMutateRow() { assertThat(stopwatch.elapsed()).isAtLeast(java.time.Duration.ofSeconds(delay.getSeconds())); } + @Test + public void testCheckAndMutateDisableRetryInfo() throws IOException { + settings.stubSettings().setEnableRetryInfo(false); + + try (BigtableDataClient client = BigtableDataClient.create(settings.build())) { + ApiException exception = createNonRetryableExceptionWithDelay(delay); + try { + client.checkAndMutateRow( + ConditionalRowMutation.create("table", "key") + .condition(Filters.FILTERS.value().regex("old-value")) + .then(Mutation.create().setCell("cf", "q", "v"))); + } catch (ApiException e) { + assertThat(e.getStatusCode()).isEqualTo(exception.getStatusCode()); + } + assertThat(attemptCounter.get()).isEqualTo(1); + } + } + @Test public void testReadModifyWrite() { createRetryableExceptionWithDelay(delay); @@ -253,6 +403,117 @@ public void testReadModifyWrite() { assertThat(stopwatch.elapsed()).isAtLeast(java.time.Duration.ofSeconds(delay.getSeconds())); } + @Test + public void testReadModifyWriteDisableRetryInfo() throws IOException { + settings.stubSettings().setEnableRetryInfo(false); + + try (BigtableDataClient client = BigtableDataClient.create(settings.build())) { + ApiException exception = createNonRetryableExceptionWithDelay(delay); + try { + client.readModifyWriteRow(ReadModifyWriteRow.create("table", "row").append("cf", "q", "v")); + } catch (ApiException e) { + assertThat(e.getStatusCode()).isEqualTo(exception.getStatusCode()); + } + assertThat(attemptCounter.get()).isEqualTo(1); + } + } + + @Test + public void testReadChangeStream() { + createRetryableExceptionWithDelay(delay); + + Stopwatch stopwatch = Stopwatch.createStarted(); + client.readChangeStream(ReadChangeStreamQuery.create("table")).iterator().hasNext(); + stopwatch.stop(); + + assertThat(attemptCounter.get()).isEqualTo(2); + assertThat(stopwatch.elapsed()).isAtLeast(java.time.Duration.ofSeconds(delay.getSeconds())); + } + + @Test + public void testReadChangeStreamNonRetryableErrorWithRetryInfo() { + createNonRetryableExceptionWithDelay(delay); + + Stopwatch stopwatch = Stopwatch.createStarted(); + client.readChangeStream(ReadChangeStreamQuery.create("table")).iterator().hasNext(); + stopwatch.stop(); + + assertThat(attemptCounter.get()).isEqualTo(2); + assertThat(stopwatch.elapsed()).isAtLeast(java.time.Duration.ofSeconds(delay.getSeconds())); + } + + @Test + public void testReadChangeStreamDisableRetryInfo() throws IOException { + settings.stubSettings().setEnableRetryInfo(false); + + try (BigtableDataClient client = BigtableDataClient.create(settings.build())) { + createRetryableExceptionWithDelay(delay); + Stopwatch stopwatch = Stopwatch.createStarted(); + client.readChangeStream(ReadChangeStreamQuery.create("table")).iterator().hasNext(); + stopwatch.stop(); + + assertThat(attemptCounter.get()).isEqualTo(2); + assertThat(stopwatch.elapsed()).isLessThan(java.time.Duration.ofSeconds(delay.getSeconds())); + + attemptCounter.set(0); + ApiException exception = createNonRetryableExceptionWithDelay(delay); + try { + client.readChangeStream(ReadChangeStreamQuery.create("table")).iterator().hasNext(); + } catch (ApiException e) { + assertThat(e.getStatusCode()).isEqualTo(exception.getStatusCode()); + } + assertThat(attemptCounter.get()).isEqualTo(1); + } + } + + @Test + public void testGenerateInitialChangeStreamPartition() { + createRetryableExceptionWithDelay(delay); + + Stopwatch stopwatch = Stopwatch.createStarted(); + client.generateInitialChangeStreamPartitions("table").iterator().hasNext(); + stopwatch.stop(); + + assertThat(attemptCounter.get()).isEqualTo(2); + assertThat(stopwatch.elapsed()).isAtLeast(java.time.Duration.ofSeconds(delay.getSeconds())); + } + + @Test + public void testGenerateInitialChangeStreamPartitionNonRetryableError() { + createNonRetryableExceptionWithDelay(delay); + + Stopwatch stopwatch = Stopwatch.createStarted(); + client.generateInitialChangeStreamPartitions("table").iterator().hasNext(); + stopwatch.stop(); + + assertThat(attemptCounter.get()).isEqualTo(2); + assertThat(stopwatch.elapsed()).isAtLeast(java.time.Duration.ofSeconds(delay.getSeconds())); + } + + @Test + public void testGenerateInitialChangeStreamPartitionDisableRetryInfo() throws IOException { + settings.stubSettings().setEnableRetryInfo(false); + + try (BigtableDataClient client = BigtableDataClient.create(settings.build())) { + createRetryableExceptionWithDelay(delay); + Stopwatch stopwatch = Stopwatch.createStarted(); + client.generateInitialChangeStreamPartitions("table").iterator().hasNext(); + stopwatch.stop(); + + assertThat(attemptCounter.get()).isEqualTo(2); + assertThat(stopwatch.elapsed()).isLessThan(java.time.Duration.ofSeconds(delay.getSeconds())); + + attemptCounter.set(0); + ApiException exception = createNonRetryableExceptionWithDelay(delay); + try { + client.generateInitialChangeStreamPartitions("table").iterator().hasNext(); + } catch (ApiException e) { + assertThat(e.getStatusCode()).isEqualTo(exception.getStatusCode()); + } + assertThat(attemptCounter.get()).isEqualTo(1); + } + } + private void createRetryableExceptionWithDelay(Duration delay) { Metadata trailers = new Metadata(); RetryInfo retryInfo = RetryInfo.newBuilder().setRetryDelay(delay).build(); @@ -267,7 +528,7 @@ private void createRetryableExceptionWithDelay(Duration delay) { service.expectations.add(exception); } - private void createNonRetryableExceptionWithDelay(Duration delay) { + private ApiException createNonRetryableExceptionWithDelay(Duration delay) { Metadata trailers = new Metadata(); RetryInfo retryInfo = RetryInfo.newBuilder() @@ -282,6 +543,8 @@ private void createNonRetryableExceptionWithDelay(Duration delay) { false); service.expectations.add(exception); + + return exception; } private class FakeBigtableService extends BigtableGrpc.BigtableImplBase { @@ -370,5 +633,36 @@ public void readModifyWriteRow( responseObserver.onError(expectedRpc); } } + + @Override + public void generateInitialChangeStreamPartitions( + GenerateInitialChangeStreamPartitionsRequest request, + StreamObserver responseObserver) { + attemptCounter.incrementAndGet(); + if (expectations.isEmpty()) { + responseObserver.onNext(GenerateInitialChangeStreamPartitionsResponse.getDefaultInstance()); + responseObserver.onCompleted(); + } else { + Exception expectedRpc = expectations.poll(); + responseObserver.onError(expectedRpc); + } + } + + @Override + public void readChangeStream( + ReadChangeStreamRequest request, + StreamObserver responseObserver) { + attemptCounter.incrementAndGet(); + if (expectations.isEmpty()) { + responseObserver.onNext( + ReadChangeStreamResponse.newBuilder() + .setCloseStream(ReadChangeStreamResponse.CloseStream.getDefaultInstance()) + .build()); + responseObserver.onCompleted(); + } else { + Exception expectedRpc = expectations.poll(); + responseObserver.onError(expectedRpc); + } + } } }