diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/CookiesHolder.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/CookiesHolder.java index 88daad5464..2c71d6a528 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/CookiesHolder.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/CookiesHolder.java @@ -38,8 +38,7 @@ class CookiesHolder { static CookiesHolder fromCallOptions(CallOptions options) { // CookiesHolder should be added by CookiesServerStreamingCallable and // CookiesUnaryCallable for most methods. However, methods like PingAndWarm - // or ReadChangeStream doesn't support routing cookie, in which case this - // method will return null. + // doesn't support routing cookie, in which case this will return null. return options.getOption(COOKIES_HOLDER_KEY); } diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java index ff37ced33d..705b3027ed 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java @@ -938,7 +938,10 @@ public Map extract( ServerStreamingCallable traced = new TracedServerStreamingCallable<>(retrying, clientContext.getTracerFactory(), span); - return traced.withDefaultCallContext(clientContext.getDefaultCallContext()); + ServerStreamingCallable withCookie = + new CookiesServerStreamingCallable<>(traced); + + return withCookie.withDefaultCallContext(clientContext.getDefaultCallContext()); } /** @@ -1018,7 +1021,10 @@ public Map extract( new TracedServerStreamingCallable<>( readChangeStreamUserCallable, clientContext.getTracerFactory(), span); - return traced.withDefaultCallContext(clientContext.getDefaultCallContext()); + ServerStreamingCallable withCookie = + new CookiesServerStreamingCallable<>(traced); + + return withCookie.withDefaultCallContext(clientContext.getDefaultCallContext()); } /** diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/CookiesHolderTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/CookiesHolderTest.java index 41182564ab..bad40e7423 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/CookiesHolderTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/CookiesHolderTest.java @@ -22,10 +22,14 @@ import com.google.bigtable.v2.BigtableGrpc; import com.google.bigtable.v2.CheckAndMutateRowRequest; import com.google.bigtable.v2.CheckAndMutateRowResponse; +import com.google.bigtable.v2.GenerateInitialChangeStreamPartitionsRequest; +import com.google.bigtable.v2.GenerateInitialChangeStreamPartitionsResponse; import com.google.bigtable.v2.MutateRowRequest; import com.google.bigtable.v2.MutateRowResponse; import com.google.bigtable.v2.MutateRowsRequest; import com.google.bigtable.v2.MutateRowsResponse; +import com.google.bigtable.v2.ReadChangeStreamRequest; +import com.google.bigtable.v2.ReadChangeStreamResponse; import com.google.bigtable.v2.ReadModifyWriteRowRequest; import com.google.bigtable.v2.ReadModifyWriteRowResponse; import com.google.bigtable.v2.ReadRowsRequest; @@ -39,6 +43,7 @@ import com.google.cloud.bigtable.data.v2.models.ConditionalRowMutation; import com.google.cloud.bigtable.data.v2.models.Mutation; import com.google.cloud.bigtable.data.v2.models.Query; +import com.google.cloud.bigtable.data.v2.models.ReadChangeStreamQuery; import com.google.cloud.bigtable.data.v2.models.ReadModifyWriteRow; import com.google.cloud.bigtable.data.v2.models.RowMutation; import com.google.cloud.bigtable.data.v2.models.RowMutationEntry; @@ -52,7 +57,6 @@ import io.grpc.StatusRuntimeException; import io.grpc.stub.StreamObserver; import java.util.ArrayList; -import java.util.Arrays; import java.util.HashSet; import java.util.List; import java.util.Set; @@ -309,36 +313,46 @@ public void testNoCookieSucceedSampleRowKeys() { } @Test - public void testAllMethodsAreCalled() { + public void testAllMethodsAreCalled() throws InterruptedException { // This test ensures that all methods respect the retry cookie except for the ones that are // explicitly added to the methods list. It requires that any newly method is exercised in this // test. // This is enforced by introspecting grpc method descriptors. client.readRows(Query.create("fake-table")).iterator().hasNext(); + fakeService.count.set(0); client.mutateRow(RowMutation.create("fake-table", "key").setCell("cf", "q", "v")); + fakeService.count.set(0); client.bulkMutateRows( BulkMutation.create("fake-table") .add(RowMutationEntry.create("key").setCell("cf", "q", "v"))); + fakeService.count.set(0); client.sampleRowKeys("fake-table"); + fakeService.count.set(0); client.checkAndMutateRow( ConditionalRowMutation.create("fake-table", "key") .then(Mutation.create().setCell("cf", "q", "v"))); + fakeService.count.set(0); client.readModifyWriteRow( ReadModifyWriteRow.create("fake-table", "key").append("cf", "q", "v")); + fakeService.count.set(0); + client.generateInitialChangeStreamPartitions("fake-table").iterator().hasNext(); + + fakeService.count.set(0); + client.readChangeStream(ReadChangeStreamQuery.create("fake-table")).iterator().hasNext(); + Set expected = BigtableGrpc.getServiceDescriptor().getMethods().stream() .map(MethodDescriptor::getBareMethodName) .collect(Collectors.toSet()); // Exclude methods that are not supported by routing cookie - methods.addAll( - Arrays.asList("PingAndWarm", "GenerateInitialChangeStreamPartitions", "ReadChangeStream")); + methods.add("PingAndWarm"); assertThat(methods).containsExactlyElementsIn(expected); } @@ -443,6 +457,39 @@ public void readModifyWriteRow( responseObserver.onCompleted(); } + @Override + public void readChangeStream( + ReadChangeStreamRequest request, + StreamObserver responseObserver) { + if (count.getAndIncrement() < 1) { + Metadata trailers = new Metadata(); + maybePopulateCookie(trailers, "readChangeStream"); + StatusRuntimeException exception = new StatusRuntimeException(Status.UNAVAILABLE, trailers); + responseObserver.onError(exception); + return; + } + responseObserver.onNext( + ReadChangeStreamResponse.newBuilder() + .setCloseStream(ReadChangeStreamResponse.CloseStream.getDefaultInstance()) + .build()); + responseObserver.onCompleted(); + } + + @Override + public void generateInitialChangeStreamPartitions( + GenerateInitialChangeStreamPartitionsRequest request, + StreamObserver responseObserver) { + if (count.getAndIncrement() < 1) { + Metadata trailers = new Metadata(); + maybePopulateCookie(trailers, "generateInitialChangeStreamPartitions"); + StatusRuntimeException exception = new StatusRuntimeException(Status.UNAVAILABLE, trailers); + responseObserver.onError(exception); + return; + } + responseObserver.onNext(GenerateInitialChangeStreamPartitionsResponse.getDefaultInstance()); + responseObserver.onCompleted(); + } + private void maybePopulateCookie(Metadata trailers, String label) { if (returnCookie) { trailers.put(ROUTING_COOKIE_1, label);