From 201e631f893b1edacdd5760c1d180b212dc9e38a Mon Sep 17 00:00:00 2001 From: Mattie Fu Date: Mon, 18 Dec 2023 18:14:15 -0500 Subject: [PATCH] feat: add a flag to add / remove routing cookie from callable chain (#2032) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Test and rollback plan: [go/cbt-routing-cookie-rollback](http://goto.google.com/cbt-routing-cookie-rollback) Thank you for opening a Pull Request! Before submitting your PR, there are a few things you can do to make sure it goes smoothly: - [ ] Make sure to open an issue as a [bug/issue](https://togithub.com/googleapis/java-bigtable/issues/new/choose) before writing your code! That way we can discuss the change, evaluate designs, and agree on the general idea - [ ] Ensure the tests and linter pass - [ ] Code coverage does not decrease (if any source code was changed) - [ ] Appropriate docs were updated (if necessary) - [ ] Rollback plan is reviewed and LGTMed Fixes # ☕️ If you write sample code, please follow the [samples format]( https://togithub.com/GoogleCloudPlatform/java-docs-samples/blob/main/SAMPLE_FORMAT.md). --- .../bigtable/data/v2/stub/CookiesHolder.java | 8 +- .../data/v2/stub/EnhancedBigtableStub.java | 90 ++++--- .../v2/stub/EnhancedBigtableStubSettings.java | 35 +++ .../cloud/bigtable/data/v2/it/ReadIT.java | 2 + .../data/v2/stub/CookiesHolderTest.java | 224 ++++++++++++++++-- .../EnhancedBigtableStubSettingsTest.java | 55 ++++- .../v2/stub/EnhancedBigtableStubTest.java | 1 + 7 files changed, 351 insertions(+), 64 deletions(-) diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/CookiesHolder.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/CookiesHolder.java index 7d7ca6a029..7a153cfd5f 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/CookiesHolder.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/CookiesHolder.java @@ -55,14 +55,14 @@ Metadata injectCookiesInRequestHeaders(Metadata headers) { * COOKIE_KEY_PREFIX to cookies. Values in trailers will override the value set in initial * metadata for the same keys. */ - void extractCookiesFromMetadata(@Nullable Metadata trailers) { - if (trailers == null) { + void extractCookiesFromMetadata(@Nullable Metadata metadata) { + if (metadata == null) { return; } - for (String key : trailers.keys()) { + for (String key : metadata.keys()) { if (key.startsWith(COOKIE_KEY_PREFIX)) { Metadata.Key metadataKey = Metadata.Key.of(key, Metadata.ASCII_STRING_MARSHALLER); - String value = trailers.get(metadataKey); + String value = metadata.get(metadataKey); cookies.put(metadataKey, value); } } diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java index 705b3027ed..f1339d083e 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java @@ -37,6 +37,7 @@ import com.google.api.gax.rpc.RequestParamsExtractor; import com.google.api.gax.rpc.ServerStreamingCallSettings; import com.google.api.gax.rpc.ServerStreamingCallable; +import com.google.api.gax.rpc.UnaryCallSettings; import com.google.api.gax.rpc.UnaryCallable; import com.google.api.gax.tracing.OpencensusTracerFactory; import com.google.api.gax.tracing.SpanName; @@ -185,11 +186,14 @@ public static EnhancedBigtableStubSettings finalizeSettings( // workaround JWT audience issues patchCredentials(builder); - // patch cookies interceptor - InstantiatingGrpcChannelProvider.Builder transportProvider = null; - if (builder.getTransportChannelProvider() instanceof InstantiatingGrpcChannelProvider) { - transportProvider = - ((InstantiatingGrpcChannelProvider) builder.getTransportChannelProvider()).toBuilder(); + InstantiatingGrpcChannelProvider.Builder transportProvider = + builder.getTransportChannelProvider() instanceof InstantiatingGrpcChannelProvider + ? ((InstantiatingGrpcChannelProvider) builder.getTransportChannelProvider()).toBuilder() + : null; + + if (builder.getEnableRoutingCookie() && transportProvider != null) { + // TODO: this also need to be added to BigtableClientFactory + // patch cookies interceptor transportProvider.setInterceptorProvider(() -> ImmutableList.of(new CookiesInterceptor())); } @@ -371,11 +375,7 @@ public ServerStreamingCallable createReadRowsCallable( new TracedServerStreamingCallable<>( readRowsUserCallable, clientContext.getTracerFactory(), span); - // CookieHolder needs to be injected to the CallOptions outside of retries, otherwise retry - // attempts won't see a CookieHolder. - ServerStreamingCallable withCookie = new CookiesServerStreamingCallable<>(traced); - - return withCookie.withDefaultCallContext(clientContext.getDefaultCallContext()); + return traced.withDefaultCallContext(clientContext.getDefaultCallContext()); } /** @@ -411,9 +411,7 @@ public UnaryCallable createReadRowCallable(RowAdapter new TracedUnaryCallable<>( firstRow, clientContext.getTracerFactory(), getSpanName("ReadRow")); - UnaryCallable withCookie = new CookiesUnaryCallable<>(traced); - - return withCookie.withDefaultCallContext(clientContext.getDefaultCallContext()); + return traced.withDefaultCallContext(clientContext.getDefaultCallContext()); } /** @@ -485,7 +483,7 @@ public Map extract(ReadRowsRequest readRowsRequest) { new ReadRowsRetryCompletedCallable<>(withBigtableTracer); ServerStreamingCallable retrying2 = - Callables.retrying(retrying1, innerSettings, clientContext); + withRetries(retrying1, innerSettings); return new FilterMarkerRowsCallable<>(retrying2, rowAdapter); } @@ -568,7 +566,7 @@ public Map extract( new BigtableTracerUnaryCallable<>(withStatsHeaders); UnaryCallable> retryable = - Callables.retrying(withBigtableTracer, settings.sampleRowKeysSettings(), clientContext); + withRetries(withBigtableTracer, settings.sampleRowKeysSettings()); return createUserFacingUnaryCallable( methodName, new SampleRowKeysCallable(retryable, requestContext)); @@ -607,7 +605,7 @@ public Map extract(MutateRowRequest mutateRowRequest) { new BigtableTracerUnaryCallable<>(withStatsHeaders); UnaryCallable retrying = - Callables.retrying(withBigtableTracer, settings.mutateRowSettings(), clientContext); + withRetries(withBigtableTracer, settings.mutateRowSettings()); return createUserFacingUnaryCallable( methodName, new MutateRowCallable(retrying, requestContext)); @@ -631,11 +629,17 @@ public Map extract(MutateRowRequest mutateRowRequest) { private UnaryCallable createBulkMutateRowsCallable() { UnaryCallable baseCallable = createMutateRowsBaseCallable(); + UnaryCallable withCookie = baseCallable; + + if (settings.getEnableRoutingCookie()) { + withCookie = new CookiesUnaryCallable<>(baseCallable); + } + UnaryCallable flowControlCallable = null; if (settings.bulkMutateRowsSettings().isLatencyBasedThrottlingEnabled()) { flowControlCallable = new DynamicFlowControlCallable( - baseCallable, + withCookie, bulkMutationFlowController, bulkMutationDynamicFlowControlStats, settings.bulkMutateRowsSettings().getTargetRpcLatencyMs(), @@ -643,7 +647,7 @@ private UnaryCallable createBulkMutateRowsCallable() { } UnaryCallable userFacing = new BulkMutateRowsUserFacingCallable( - flowControlCallable != null ? flowControlCallable : baseCallable, requestContext); + flowControlCallable != null ? flowControlCallable : withCookie, requestContext); SpanName spanName = getSpanName("MutateRows"); @@ -654,9 +658,7 @@ private UnaryCallable createBulkMutateRowsCallable() { new TracedUnaryCallable<>( tracedBatcherUnaryCallable, clientContext.getTracerFactory(), spanName); - UnaryCallable withCookie = new CookiesUnaryCallable<>(traced); - - return withCookie.withDefaultCallContext(clientContext.getDefaultCallContext()); + return traced.withDefaultCallContext(clientContext.getDefaultCallContext()); } /** @@ -810,7 +812,7 @@ public Map extract( new BigtableTracerUnaryCallable<>(withStatsHeaders); UnaryCallable retrying = - Callables.retrying(withBigtableTracer, settings.checkAndMutateRowSettings(), clientContext); + withRetries(withBigtableTracer, settings.checkAndMutateRowSettings()); return createUserFacingUnaryCallable( methodName, new CheckAndMutateRowCallable(retrying, requestContext)); @@ -851,8 +853,7 @@ public Map extract(ReadModifyWriteRowRequest request) { new BigtableTracerUnaryCallable<>(withStatsHeaders); UnaryCallable retrying = - Callables.retrying( - withBigtableTracer, settings.readModifyWriteRowSettings(), clientContext); + withRetries(withBigtableTracer, settings.readModifyWriteRowSettings()); return createUserFacingUnaryCallable( methodName, new ReadModifyWriteRowCallable(retrying, requestContext)); @@ -932,16 +933,13 @@ public Map extract( new BigtableTracerStreamingCallable<>(watched); ServerStreamingCallable retrying = - Callables.retrying(withBigtableTracer, innerSettings, clientContext); + withRetries(withBigtableTracer, innerSettings); SpanName span = getSpanName("GenerateInitialChangeStreamPartitions"); ServerStreamingCallable traced = new TracedServerStreamingCallable<>(retrying, clientContext.getTracerFactory(), span); - ServerStreamingCallable withCookie = - new CookiesServerStreamingCallable<>(traced); - - return withCookie.withDefaultCallContext(clientContext.getDefaultCallContext()); + return traced.withDefaultCallContext(clientContext.getDefaultCallContext()); } /** @@ -1010,7 +1008,7 @@ public Map extract( new BigtableTracerStreamingCallable<>(watched); ServerStreamingCallable readChangeStreamCallable = - Callables.retrying(withBigtableTracer, innerSettings, clientContext); + withRetries(withBigtableTracer, innerSettings); ServerStreamingCallable readChangeStreamUserCallable = @@ -1021,10 +1019,7 @@ public Map extract( new TracedServerStreamingCallable<>( readChangeStreamUserCallable, clientContext.getTracerFactory(), span); - ServerStreamingCallable withCookie = - new CookiesServerStreamingCallable<>(traced); - - return withCookie.withDefaultCallContext(clientContext.getDefaultCallContext()); + return traced.withDefaultCallContext(clientContext.getDefaultCallContext()); } /** @@ -1037,11 +1032,7 @@ private UnaryCallable createUserFacin UnaryCallable traced = new TracedUnaryCallable<>(inner, clientContext.getTracerFactory(), getSpanName(methodName)); - // CookieHolder needs to be injected to the CallOptions outside of retries, otherwise retry - // attempts won't see a CookieHolder. - UnaryCallable withCookie = new CookiesUnaryCallable<>(traced); - - return withCookie.withDefaultCallContext(clientContext.getDefaultCallContext()); + return traced.withDefaultCallContext(clientContext.getDefaultCallContext()); } private UnaryCallable createPingAndWarmCallable() { @@ -1062,6 +1053,27 @@ public Map extract(PingAndWarmRequest request) { Collections.emptySet()); return pingAndWarm.withDefaultCallContext(clientContext.getDefaultCallContext()); } + + private UnaryCallable withRetries( + UnaryCallable innerCallable, UnaryCallSettings unaryCallSettings) { + UnaryCallable retrying = + Callables.retrying(innerCallable, unaryCallSettings, clientContext); + if (settings.getEnableRoutingCookie()) { + return new CookiesUnaryCallable<>(retrying); + } + return retrying; + } + + private ServerStreamingCallable withRetries( + ServerStreamingCallable innerCallable, + ServerStreamingCallSettings serverStreamingCallSettings) { + ServerStreamingCallable retrying = + Callables.retrying(innerCallable, serverStreamingCallSettings, clientContext); + if (settings.getEnableRoutingCookie()) { + return new CookiesServerStreamingCallable<>(retrying); + } + return retrying; + } // // diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java index cffd9c85df..64f44bb52f 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java @@ -15,6 +15,7 @@ */ package com.google.cloud.bigtable.data.v2.stub; +import com.google.api.core.BetaApi; import com.google.api.core.InternalApi; import com.google.api.gax.batching.BatchingCallSettings; import com.google.api.gax.batching.BatchingSettings; @@ -211,6 +212,7 @@ public class EnhancedBigtableStubSettings extends StubSettings primedTableIds; private final Map jwtAudienceMapping; + private final boolean enableRoutingCookie; private final ServerStreamingCallSettings readRowsSettings; private final UnaryCallSettings readRowSettings; @@ -252,6 +254,7 @@ private EnhancedBigtableStubSettings(Builder builder) { isRefreshingChannel = builder.isRefreshingChannel; primedTableIds = builder.primedTableIds; jwtAudienceMapping = builder.jwtAudienceMapping; + enableRoutingCookie = builder.enableRoutingCookie; // Per method settings. readRowsSettings = builder.readRowsSettings.build(); @@ -313,6 +316,15 @@ public Map getJwtAudienceMapping() { return jwtAudienceMapping; } + /** + * Gets if routing cookie is enabled. If true, client will retry a request with extra metadata + * server sent back. + */ + @BetaApi("Routing cookie is not currently stable and may change in the future") + public boolean getEnableRoutingCookie() { + return enableRoutingCookie; + } + /** Returns a builder for the default ChannelProvider for this service. */ public static InstantiatingGrpcChannelProvider.Builder defaultGrpcTransportProviderBuilder() { return BigtableStubSettings.defaultGrpcTransportProviderBuilder() @@ -595,6 +607,7 @@ public static class Builder extends StubSettings.Builder primedTableIds; private Map jwtAudienceMapping; + private boolean enableRoutingCookie; private final ServerStreamingCallSettings.Builder readRowsSettings; private final UnaryCallSettings.Builder readRowSettings; @@ -627,6 +640,7 @@ private Builder() { primedTableIds = ImmutableList.of(); jwtAudienceMapping = DEFAULT_JWT_AUDIENCE_MAPPING; setCredentialsProvider(defaultCredentialsProviderBuilder().build()); + this.enableRoutingCookie = true; // Defaults provider BigtableStubSettings.Builder baseDefaults = BigtableStubSettings.newBuilder(); @@ -745,6 +759,7 @@ private Builder(EnhancedBigtableStubSettings settings) { isRefreshingChannel = settings.isRefreshingChannel; primedTableIds = settings.primedTableIds; jwtAudienceMapping = settings.jwtAudienceMapping; + enableRoutingCookie = settings.enableRoutingCookie; // Per method settings. readRowsSettings = settings.readRowsSettings.toBuilder(); @@ -893,6 +908,25 @@ public Map getJwtAudienceMapping() { return jwtAudienceMapping; } + /** + * Sets if routing cookie is enabled. If true, client will retry a request with extra metadata + * server sent back. + */ + @BetaApi("Routing cookie is not currently stable and may change in the future") + public Builder setEnableRoutingCookie(boolean enableRoutingCookie) { + this.enableRoutingCookie = enableRoutingCookie; + return this; + } + + /** + * Gets if routing cookie is enabled. If true, client will retry a request with extra metadata + * server sent back. + */ + @BetaApi("Routing cookie is not currently stable and may change in the future") + public boolean getEnableRoutingCookie() { + return enableRoutingCookie; + } + /** Returns the builder for the settings used for calls to readRows. */ public ServerStreamingCallSettings.Builder readRowsSettings() { return readRowsSettings; @@ -1019,6 +1053,7 @@ public String toString() { .add("isRefreshingChannel", isRefreshingChannel) .add("primedTableIds", primedTableIds) .add("jwtAudienceMapping", jwtAudienceMapping) + .add("enableRoutingCookie", enableRoutingCookie) .add("readRowsSettings", readRowsSettings) .add("readRowSettings", readRowSettings) .add("sampleRowKeysSettings", sampleRowKeysSettings) diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/it/ReadIT.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/it/ReadIT.java index 0fa7eb10bd..6578dbad24 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/it/ReadIT.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/it/ReadIT.java @@ -61,6 +61,7 @@ import java.util.concurrent.atomic.AtomicReference; import org.junit.Before; import org.junit.ClassRule; +import org.junit.Ignore; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -322,6 +323,7 @@ public void reversed() { } @Test + @Ignore("Test taking too long to run, ignore for now") public void reversedWithForcedResumption() throws IOException, InterruptedException { assume() .withMessage("reverse scans are not supported in the emulator") diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/CookiesHolderTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/CookiesHolderTest.java index 5dac053523..44ceaa737d 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/CookiesHolderTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/CookiesHolderTest.java @@ -37,10 +37,12 @@ import com.google.bigtable.v2.ReadRowsResponse; import com.google.bigtable.v2.SampleRowKeysRequest; import com.google.bigtable.v2.SampleRowKeysResponse; +import com.google.bigtable.v2.StreamContinuationToken; import com.google.cloud.bigtable.data.v2.BigtableDataClient; import com.google.cloud.bigtable.data.v2.BigtableDataSettings; import com.google.cloud.bigtable.data.v2.FakeServiceBuilder; import com.google.cloud.bigtable.data.v2.models.BulkMutation; +import com.google.cloud.bigtable.data.v2.models.ChangeStreamRecord; import com.google.cloud.bigtable.data.v2.models.ConditionalRowMutation; import com.google.cloud.bigtable.data.v2.models.Mutation; import com.google.cloud.bigtable.data.v2.models.Query; @@ -58,6 +60,7 @@ import io.grpc.Status; import io.grpc.StatusRuntimeException; import io.grpc.stub.StreamObserver; +import java.io.IOException; import java.util.ArrayList; import java.util.HashSet; import java.util.List; @@ -77,12 +80,18 @@ public class CookiesHolderTest { Metadata.Key.of("x-goog-cbt-cookie-routing", Metadata.ASCII_STRING_MARSHALLER); private static final Metadata.Key ROUTING_COOKIE_2 = Metadata.Key.of("x-goog-cbt-cookie-random", Metadata.ASCII_STRING_MARSHALLER); + private static final Metadata.Key ROUTING_COOKIE_HEADER = + Metadata.Key.of("x-goog-cbt-cookie-header", Metadata.ASCII_STRING_MARSHALLER); private static final Metadata.Key BAD_KEY = Metadata.Key.of("x-goog-cbt-not-cookie", Metadata.ASCII_STRING_MARSHALLER); + + private static final String testHeaderCookie = "header-cookie"; private static final String testCookie = "test-routing-cookie"; + private static final String routingCookie1Header = "should-be-overridden"; private Server server; private final FakeService fakeService = new FakeService(); + private BigtableDataSettings.Builder settings; private BigtableDataClient client; private final List serverMetadata = new ArrayList<>(); @@ -101,7 +110,16 @@ public ServerCall.Listener interceptCall( if (metadata.containsKey(ROUTING_COOKIE_1)) { methods.add(serverCall.getMethodDescriptor().getBareMethodName()); } - return serverCallHandler.startCall(serverCall, metadata); + return serverCallHandler.startCall( + new ForwardingServerCall.SimpleForwardingServerCall(serverCall) { + @Override + public void sendHeaders(Metadata responseHeaders) { + responseHeaders.put(ROUTING_COOKIE_HEADER, testHeaderCookie); + responseHeaders.put(ROUTING_COOKIE_1, routingCookie1Header); + super.sendHeaders(responseHeaders); + } + }, + metadata); } }; @@ -138,6 +156,8 @@ public ServerCall.Listener interceptCall( .build()) .setRetryableCodes(StatusCode.Code.UNAVAILABLE); + this.settings = settings; + client = BigtableDataClient.create(settings.build()); } @@ -161,7 +181,13 @@ public void testReadRows() { Metadata lastMetadata = serverMetadata.get(fakeService.count.get() - 1); assertThat(lastMetadata) - .containsAtLeast(ROUTING_COOKIE_1.name(), "readRows", ROUTING_COOKIE_2.name(), testCookie); + .containsAtLeast( + ROUTING_COOKIE_1.name(), + "readRows", + ROUTING_COOKIE_2.name(), + testCookie, + ROUTING_COOKIE_HEADER.name(), + testHeaderCookie); assertThat(lastMetadata).doesNotContainKeys(BAD_KEY.name()); serverMetadata.clear(); @@ -177,7 +203,13 @@ public void testReadRow() { Metadata lastMetadata = serverMetadata.get(fakeService.count.get() - 1); assertThat(lastMetadata) - .containsAtLeast(ROUTING_COOKIE_1.name(), "readRows", ROUTING_COOKIE_2.name(), testCookie); + .containsAtLeast( + ROUTING_COOKIE_1.name(), + "readRows", + ROUTING_COOKIE_2.name(), + testCookie, + ROUTING_COOKIE_HEADER.name(), + testHeaderCookie); assertThat(lastMetadata).doesNotContainKeys(BAD_KEY.name()); serverMetadata.clear(); @@ -196,7 +228,12 @@ public void testMutateRows() { assertThat(lastMetadata) .containsAtLeast( - ROUTING_COOKIE_1.name(), "mutateRows", ROUTING_COOKIE_2.name(), testCookie); + ROUTING_COOKIE_1.name(), + "mutateRows", + ROUTING_COOKIE_2.name(), + testCookie, + ROUTING_COOKIE_HEADER.name(), + testHeaderCookie); assertThat(lastMetadata).doesNotContainKeys(BAD_KEY.name()); serverMetadata.clear(); @@ -212,7 +249,13 @@ public void testMutateRow() { Metadata lastMetadata = serverMetadata.get(fakeService.count.get() - 1); assertThat(lastMetadata) - .containsAtLeast(ROUTING_COOKIE_1.name(), "mutateRow", ROUTING_COOKIE_2.name(), testCookie); + .containsAtLeast( + ROUTING_COOKIE_1.name(), + "mutateRow", + ROUTING_COOKIE_2.name(), + testCookie, + ROUTING_COOKIE_HEADER.name(), + testHeaderCookie); assertThat(lastMetadata).doesNotContainKeys(BAD_KEY.name()); serverMetadata.clear(); @@ -230,7 +273,58 @@ public void testSampleRowKeys() { assertThat(lastMetadata) .containsAtLeast( - ROUTING_COOKIE_1.name(), "sampleRowKeys", ROUTING_COOKIE_2.name(), testCookie); + ROUTING_COOKIE_1.name(), + "sampleRowKeys", + ROUTING_COOKIE_2.name(), + testCookie, + ROUTING_COOKIE_HEADER.name(), + testHeaderCookie); + assertThat(lastMetadata).doesNotContainKeys(BAD_KEY.name()); + + serverMetadata.clear(); + } + + @Test + public void testReadChangeStream() { + for (ChangeStreamRecord record : + client.readChangeStream(ReadChangeStreamQuery.create("table"))) {} + + assertThat(fakeService.count.get()).isGreaterThan(1); + assertThat(serverMetadata).hasSize(fakeService.count.get()); + + Metadata lastMetadata = serverMetadata.get(fakeService.count.get() - 1); + + assertThat(lastMetadata) + .containsAtLeast( + ROUTING_COOKIE_1.name(), + "readChangeStream", + ROUTING_COOKIE_2.name(), + testCookie, + ROUTING_COOKIE_HEADER.name(), + testHeaderCookie); + assertThat(lastMetadata).doesNotContainKeys(BAD_KEY.name()); + + serverMetadata.clear(); + } + + @Test + public void testGenerateInitialChangeStreamPartition() { + client.generateInitialChangeStreamPartitions("table").iterator().hasNext(); + + assertThat(fakeService.count.get()).isGreaterThan(1); + assertThat(serverMetadata).hasSize(fakeService.count.get()); + + Metadata lastMetadata = serverMetadata.get(fakeService.count.get() - 1); + + // generateInitialChangeStreamPartition uses SimpleStreamResumptionStrategy which means + // it can't resume from the middle of the stream. So we are not able to send a header + // for error responses. + assertThat(lastMetadata) + .containsAtLeast( + ROUTING_COOKIE_1.name(), + "generateInitialChangeStreamPartitions", + ROUTING_COOKIE_2.name(), + testCookie); assertThat(lastMetadata).doesNotContainKeys(BAD_KEY.name()); serverMetadata.clear(); @@ -247,7 +341,9 @@ public void testNoCookieSucceedReadRows() { Metadata lastMetadata = serverMetadata.get(fakeService.count.get() - 1); - assertThat(lastMetadata).doesNotContainKeys(ROUTING_COOKIE_1.name(), ROUTING_COOKIE_2.name()); + assertThat(lastMetadata).doesNotContainKeys(ROUTING_COOKIE_2.name()); + // Should contain initial metadata + assertThat(lastMetadata).containsAtLeast(ROUTING_COOKIE_1.name(), routingCookie1Header); serverMetadata.clear(); } @@ -263,8 +359,8 @@ public void testNoCookieSucceedReadRow() { Metadata lastMetadata = serverMetadata.get(fakeService.count.get() - 1); - assertThat(lastMetadata) - .doesNotContainKeys(ROUTING_COOKIE_1.name(), ROUTING_COOKIE_2.name(), BAD_KEY.name()); + assertThat(lastMetadata).doesNotContainKeys(ROUTING_COOKIE_2.name(), BAD_KEY.name()); + assertThat(lastMetadata).containsAtLeast(ROUTING_COOKIE_1.name(), routingCookie1Header); serverMetadata.clear(); } @@ -282,8 +378,8 @@ public void testNoCookieSucceedMutateRows() { Metadata lastMetadata = serverMetadata.get(fakeService.count.get() - 1); - assertThat(lastMetadata) - .doesNotContainKeys(ROUTING_COOKIE_1.name(), ROUTING_COOKIE_2.name(), BAD_KEY.name()); + assertThat(lastMetadata).doesNotContainKeys(ROUTING_COOKIE_2.name(), BAD_KEY.name()); + assertThat(lastMetadata).containsAtLeast(ROUTING_COOKIE_1.name(), routingCookie1Header); serverMetadata.clear(); } @@ -299,8 +395,8 @@ public void testNoCookieSucceedMutateRow() { Metadata lastMetadata = serverMetadata.get(fakeService.count.get() - 1); - assertThat(lastMetadata) - .doesNotContainKeys(ROUTING_COOKIE_1.name(), ROUTING_COOKIE_2.name(), BAD_KEY.name()); + assertThat(lastMetadata).doesNotContainKeys(ROUTING_COOKIE_2.name(), BAD_KEY.name()); + assertThat(lastMetadata).containsAtLeast(ROUTING_COOKIE_1.name(), routingCookie1Header); serverMetadata.clear(); } @@ -316,6 +412,43 @@ public void testNoCookieSucceedSampleRowKeys() { Metadata lastMetadata = serverMetadata.get(fakeService.count.get() - 1); + assertThat(lastMetadata).doesNotContainKeys(ROUTING_COOKIE_2.name(), BAD_KEY.name()); + assertThat(lastMetadata).containsAtLeast(ROUTING_COOKIE_1.name(), routingCookie1Header); + + serverMetadata.clear(); + } + + @Test + public void testNoCookieSucceedReadChangeStream() { + fakeService.returnCookie = false; + + for (ChangeStreamRecord record : + client.readChangeStream(ReadChangeStreamQuery.create("table"))) {} + + assertThat(fakeService.count.get()).isGreaterThan(1); + assertThat(serverMetadata).hasSize(fakeService.count.get()); + + Metadata lastMetadata = serverMetadata.get(fakeService.count.get() - 1); + + assertThat(lastMetadata).doesNotContainKeys(ROUTING_COOKIE_2.name(), BAD_KEY.name()); + assertThat(lastMetadata).containsAtLeast(ROUTING_COOKIE_1.name(), routingCookie1Header); + + serverMetadata.clear(); + + serverMetadata.clear(); + } + + @Test + public void testNoCookieSucceedGenerateInitialChangeStreamParition() { + fakeService.returnCookie = false; + + client.generateInitialChangeStreamPartitions("table").iterator().hasNext(); + + assertThat(fakeService.count.get()).isGreaterThan(1); + assertThat(serverMetadata).hasSize(fakeService.count.get()); + + Metadata lastMetadata = serverMetadata.get(fakeService.count.get() - 1); + assertThat(lastMetadata) .doesNotContainKeys(ROUTING_COOKIE_1.name(), ROUTING_COOKIE_2.name(), BAD_KEY.name()); @@ -379,7 +512,7 @@ public void sendHeaders(Metadata headers) { } @Test - public void testAllMethodsAreCalled() throws InterruptedException { + public void testAllMethodsAreCalled() { // This test ensures that all methods respect the retry cookie except for the ones that are // explicitly added to the methods list. It requires that any newly method is exercised in this // test. This is enforced by introspecting grpc method descriptors. @@ -409,7 +542,8 @@ public void testAllMethodsAreCalled() throws InterruptedException { client.generateInitialChangeStreamPartitions("fake-table").iterator().hasNext(); fakeService.count.set(0); - client.readChangeStream(ReadChangeStreamQuery.create("fake-table")).iterator().hasNext(); + for (ChangeStreamRecord record : + client.readChangeStream(ReadChangeStreamQuery.create("fake-table"))) {} Set expected = BigtableGrpc.getServiceDescriptor().getMethods().stream() @@ -422,6 +556,55 @@ public void testAllMethodsAreCalled() throws InterruptedException { assertThat(methods).containsExactlyElementsIn(expected); } + @Test + public void testDisableRoutingCookie() throws IOException { + // This test disables routing cookie in the client settings and ensures that none of the routing + // cookie + // is added. + settings.stubSettings().setEnableRoutingCookie(false); + try (BigtableDataClient client = BigtableDataClient.create(settings.build())) { + client.readRows(Query.create("fake-table")).iterator().hasNext(); + assertThat(fakeService.count.get()).isEqualTo(2); + fakeService.count.set(0); + + client.mutateRow(RowMutation.create("fake-table", "key").setCell("cf", "q", "v")); + assertThat(fakeService.count.get()).isEqualTo(2); + fakeService.count.set(0); + + client.bulkMutateRows( + BulkMutation.create("fake-table") + .add(RowMutationEntry.create("key").setCell("cf", "q", "v"))); + assertThat(fakeService.count.get()).isEqualTo(2); + fakeService.count.set(0); + + client.sampleRowKeys("fake-table"); + assertThat(fakeService.count.get()).isEqualTo(2); + fakeService.count.set(0); + + client.checkAndMutateRow( + ConditionalRowMutation.create("fake-table", "key") + .then(Mutation.create().setCell("cf", "q", "v"))); + assertThat(fakeService.count.get()).isEqualTo(2); + fakeService.count.set(0); + + client.readModifyWriteRow( + ReadModifyWriteRow.create("fake-table", "key").append("cf", "q", "v")); + assertThat(fakeService.count.get()).isEqualTo(2); + fakeService.count.set(0); + + client.generateInitialChangeStreamPartitions("fake-table").iterator().hasNext(); + assertThat(fakeService.count.get()).isEqualTo(2); + fakeService.count.set(0); + + for (ChangeStreamRecord record : + client.readChangeStream(ReadChangeStreamQuery.create("fake-table"))) {} + + assertThat(fakeService.count.get()).isEqualTo(2); + + assertThat(methods).isEmpty(); + } + } + static class FakeService extends BigtableGrpc.BigtableImplBase { private boolean returnCookie = true; @@ -448,6 +631,7 @@ public void mutateRow( if (count.getAndIncrement() < 1) { Metadata trailers = new Metadata(); maybePopulateCookie(trailers, "mutateRow"); + responseObserver.onNext(MutateRowResponse.getDefaultInstance()); StatusRuntimeException exception = new StatusRuntimeException(Status.UNAVAILABLE, trailers); responseObserver.onError(exception); return; @@ -462,6 +646,7 @@ public void mutateRows( if (count.getAndIncrement() < 1) { Metadata trailers = new Metadata(); maybePopulateCookie(trailers, "mutateRows"); + responseObserver.onNext(MutateRowsResponse.getDefaultInstance()); StatusRuntimeException exception = new StatusRuntimeException(Status.UNAVAILABLE, trailers); responseObserver.onError(exception); return; @@ -479,6 +664,7 @@ public void sampleRowKeys( if (count.getAndIncrement() < 1) { Metadata trailers = new Metadata(); maybePopulateCookie(trailers, "sampleRowKeys"); + responseObserver.onNext(SampleRowKeysResponse.getDefaultInstance()); StatusRuntimeException exception = new StatusRuntimeException(Status.UNAVAILABLE, trailers); responseObserver.onError(exception); return; @@ -524,6 +710,14 @@ public void readChangeStream( if (count.getAndIncrement() < 1) { Metadata trailers = new Metadata(); maybePopulateCookie(trailers, "readChangeStream"); + responseObserver.onNext( + ReadChangeStreamResponse.newBuilder() + .setHeartbeat( + ReadChangeStreamResponse.Heartbeat.newBuilder() + .setContinuationToken( + StreamContinuationToken.newBuilder().setToken("a").build()) + .build()) + .build()); StatusRuntimeException exception = new StatusRuntimeException(Status.UNAVAILABLE, trailers); responseObserver.onError(exception); return; diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettingsTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettingsTest.java index fbd6442e0c..2c05ca4ee8 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettingsTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettingsTest.java @@ -77,6 +77,7 @@ public void settingsAreNotLostTest() { CredentialsProvider credentialsProvider = Mockito.mock(CredentialsProvider.class); WatchdogProvider watchdogProvider = Mockito.mock(WatchdogProvider.class); Duration watchdogInterval = Duration.ofSeconds(12); + boolean enableRoutingCookie = false; EnhancedBigtableStubSettings.Builder builder = EnhancedBigtableStubSettings.newBuilder() @@ -87,7 +88,8 @@ public void settingsAreNotLostTest() { .setEndpoint(endpoint) .setCredentialsProvider(credentialsProvider) .setStreamWatchdogProvider(watchdogProvider) - .setStreamWatchdogCheckInterval(watchdogInterval); + .setStreamWatchdogCheckInterval(watchdogInterval) + .setEnableRoutingCookie(enableRoutingCookie); verifyBuilder( builder, @@ -98,7 +100,8 @@ public void settingsAreNotLostTest() { endpoint, credentialsProvider, watchdogProvider, - watchdogInterval); + watchdogInterval, + enableRoutingCookie); verifySettings( builder.build(), projectId, @@ -108,7 +111,8 @@ public void settingsAreNotLostTest() { endpoint, credentialsProvider, watchdogProvider, - watchdogInterval); + watchdogInterval, + enableRoutingCookie); verifyBuilder( builder.build().toBuilder(), projectId, @@ -118,7 +122,8 @@ public void settingsAreNotLostTest() { endpoint, credentialsProvider, watchdogProvider, - watchdogInterval); + watchdogInterval, + enableRoutingCookie); } private void verifyBuilder( @@ -130,7 +135,8 @@ private void verifyBuilder( String endpoint, CredentialsProvider credentialsProvider, WatchdogProvider watchdogProvider, - Duration watchdogInterval) { + Duration watchdogInterval, + boolean enableRoutingCookie) { assertThat(builder.getProjectId()).isEqualTo(projectId); assertThat(builder.getInstanceId()).isEqualTo(instanceId); assertThat(builder.getAppProfileId()).isEqualTo(appProfileId); @@ -139,6 +145,7 @@ private void verifyBuilder( assertThat(builder.getCredentialsProvider()).isEqualTo(credentialsProvider); assertThat(builder.getStreamWatchdogProvider()).isSameInstanceAs(watchdogProvider); assertThat(builder.getStreamWatchdogCheckInterval()).isEqualTo(watchdogInterval); + assertThat(builder.getEnableRoutingCookie()).isEqualTo(enableRoutingCookie); } private void verifySettings( @@ -150,7 +157,8 @@ private void verifySettings( String endpoint, CredentialsProvider credentialsProvider, WatchdogProvider watchdogProvider, - Duration watchdogInterval) { + Duration watchdogInterval, + boolean enableRoutingCookie) { assertThat(settings.getProjectId()).isEqualTo(projectId); assertThat(settings.getInstanceId()).isEqualTo(instanceId); assertThat(settings.getAppProfileId()).isEqualTo(appProfileId); @@ -159,6 +167,7 @@ private void verifySettings( assertThat(settings.getCredentialsProvider()).isEqualTo(credentialsProvider); assertThat(settings.getStreamWatchdogProvider()).isSameInstanceAs(watchdogProvider); assertThat(settings.getStreamWatchdogCheckInterval()).isEqualTo(watchdogInterval); + assertThat(settings.getEnableRoutingCookie()).isEqualTo(enableRoutingCookie); } @Test @@ -781,6 +790,39 @@ public void isRefreshingChannelFalseValueTest() { assertThat(builder.build().toBuilder().isRefreshingChannel()).isFalse(); } + @Test + public void routingCookieIsEnabled() throws IOException { + String dummyProjectId = "my-project"; + String dummyInstanceId = "my-instance"; + CredentialsProvider credentialsProvider = Mockito.mock(CredentialsProvider.class); + Mockito.when(credentialsProvider.getCredentials()).thenReturn(new FakeCredentials()); + EnhancedBigtableStubSettings.Builder builder = + EnhancedBigtableStubSettings.newBuilder() + .setProjectId(dummyProjectId) + .setInstanceId(dummyInstanceId) + .setCredentialsProvider(credentialsProvider); + assertThat(builder.getEnableRoutingCookie()).isTrue(); + assertThat(builder.build().getEnableRoutingCookie()).isTrue(); + assertThat(builder.build().toBuilder().getEnableRoutingCookie()).isTrue(); + } + + @Test + public void routingCookieFalseValueSet() throws IOException { + String dummyProjectId = "my-project"; + String dummyInstanceId = "my-instance"; + CredentialsProvider credentialsProvider = Mockito.mock(CredentialsProvider.class); + Mockito.when(credentialsProvider.getCredentials()).thenReturn(new FakeCredentials()); + EnhancedBigtableStubSettings.Builder builder = + EnhancedBigtableStubSettings.newBuilder() + .setProjectId(dummyProjectId) + .setInstanceId(dummyInstanceId) + .setEnableRoutingCookie(false) + .setCredentialsProvider(credentialsProvider); + assertThat(builder.getEnableRoutingCookie()).isFalse(); + assertThat(builder.build().getEnableRoutingCookie()).isFalse(); + assertThat(builder.build().toBuilder().getEnableRoutingCookie()).isFalse(); + } + static final String[] SETTINGS_LIST = { "projectId", "instanceId", @@ -788,6 +830,7 @@ public void isRefreshingChannelFalseValueTest() { "isRefreshingChannel", "primedTableIds", "jwtAudienceMapping", + "enableRoutingCookie", "readRowsSettings", "readRowSettings", "sampleRowKeysSettings", diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubTest.java index e36eb1a8a9..eacf145bcb 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubTest.java @@ -549,6 +549,7 @@ public void testBulkMutationFlowControlFeatureFlagIsNotSet() throws Exception { FeatureFlags featureFlags = FeatureFlags.parseFrom(decodedFlags); assertThat(featureFlags.getMutateRowsRateLimit()).isFalse(); assertThat(featureFlags.getMutateRowsRateLimit2()).isFalse(); + stub.close(); } @Test