Skip to content

Commit

Permalink
add cookie to readChangeStream
Browse files Browse the repository at this point in the history
  • Loading branch information
mutianf committed Nov 16, 2023
1 parent 91989cd commit 1422950
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,7 @@ class CookiesHolder {
static CookiesHolder fromCallOptions(CallOptions options) {
// CookiesHolder should be added by CookiesServerStreamingCallable and
// CookiesUnaryCallable for most methods. However, methods like PingAndWarm
// or ReadChangeStream doesn't support routing cookie, in which case this
// method will return null.
// doesn't support routing cookie, in which case this will return null.
return options.getOption(COOKIES_HOLDER_KEY);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -938,7 +938,10 @@ public Map<String, String> extract(
ServerStreamingCallable<String, ByteStringRange> traced =
new TracedServerStreamingCallable<>(retrying, clientContext.getTracerFactory(), span);

return traced.withDefaultCallContext(clientContext.getDefaultCallContext());
ServerStreamingCallable<String, ByteStringRange> withCookie =
new CookiesServerStreamingCallable<>(traced);

return withCookie.withDefaultCallContext(clientContext.getDefaultCallContext());
}

/**
Expand Down Expand Up @@ -1018,7 +1021,10 @@ public Map<String, String> extract(
new TracedServerStreamingCallable<>(
readChangeStreamUserCallable, clientContext.getTracerFactory(), span);

return traced.withDefaultCallContext(clientContext.getDefaultCallContext());
ServerStreamingCallable<ReadChangeStreamQuery, ChangeStreamRecordT> withCookie =
new CookiesServerStreamingCallable<>(traced);

return withCookie.withDefaultCallContext(clientContext.getDefaultCallContext());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,14 @@
import com.google.bigtable.v2.BigtableGrpc;
import com.google.bigtable.v2.CheckAndMutateRowRequest;
import com.google.bigtable.v2.CheckAndMutateRowResponse;
import com.google.bigtable.v2.GenerateInitialChangeStreamPartitionsRequest;
import com.google.bigtable.v2.GenerateInitialChangeStreamPartitionsResponse;
import com.google.bigtable.v2.MutateRowRequest;
import com.google.bigtable.v2.MutateRowResponse;
import com.google.bigtable.v2.MutateRowsRequest;
import com.google.bigtable.v2.MutateRowsResponse;
import com.google.bigtable.v2.ReadChangeStreamRequest;
import com.google.bigtable.v2.ReadChangeStreamResponse;
import com.google.bigtable.v2.ReadModifyWriteRowRequest;
import com.google.bigtable.v2.ReadModifyWriteRowResponse;
import com.google.bigtable.v2.ReadRowsRequest;
Expand All @@ -39,6 +43,7 @@
import com.google.cloud.bigtable.data.v2.models.ConditionalRowMutation;
import com.google.cloud.bigtable.data.v2.models.Mutation;
import com.google.cloud.bigtable.data.v2.models.Query;
import com.google.cloud.bigtable.data.v2.models.ReadChangeStreamQuery;
import com.google.cloud.bigtable.data.v2.models.ReadModifyWriteRow;
import com.google.cloud.bigtable.data.v2.models.RowMutation;
import com.google.cloud.bigtable.data.v2.models.RowMutationEntry;
Expand All @@ -52,7 +57,6 @@
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
Expand Down Expand Up @@ -309,36 +313,46 @@ public void testNoCookieSucceedSampleRowKeys() {
}

@Test
public void testAllMethodsAreCalled() {
public void testAllMethodsAreCalled() throws InterruptedException {
// This test ensures that all methods respect the retry cookie except for the ones that are
// explicitly added to the methods list. It requires that any newly method is exercised in this
// test.
// This is enforced by introspecting grpc method descriptors.
client.readRows(Query.create("fake-table")).iterator().hasNext();

fakeService.count.set(0);
client.mutateRow(RowMutation.create("fake-table", "key").setCell("cf", "q", "v"));

fakeService.count.set(0);
client.bulkMutateRows(
BulkMutation.create("fake-table")
.add(RowMutationEntry.create("key").setCell("cf", "q", "v")));

fakeService.count.set(0);
client.sampleRowKeys("fake-table");

fakeService.count.set(0);
client.checkAndMutateRow(
ConditionalRowMutation.create("fake-table", "key")
.then(Mutation.create().setCell("cf", "q", "v")));

fakeService.count.set(0);
client.readModifyWriteRow(
ReadModifyWriteRow.create("fake-table", "key").append("cf", "q", "v"));

fakeService.count.set(0);
client.generateInitialChangeStreamPartitions("fake-table").iterator().hasNext();

fakeService.count.set(0);
client.readChangeStream(ReadChangeStreamQuery.create("fake-table")).iterator().hasNext();

Set<String> expected =
BigtableGrpc.getServiceDescriptor().getMethods().stream()
.map(MethodDescriptor::getBareMethodName)
.collect(Collectors.toSet());

// Exclude methods that are not supported by routing cookie
methods.addAll(
Arrays.asList("PingAndWarm", "GenerateInitialChangeStreamPartitions", "ReadChangeStream"));
methods.add("PingAndWarm");

assertThat(methods).containsExactlyElementsIn(expected);
}
Expand Down Expand Up @@ -443,6 +457,39 @@ public void readModifyWriteRow(
responseObserver.onCompleted();
}

@Override
public void readChangeStream(
ReadChangeStreamRequest request,
StreamObserver<ReadChangeStreamResponse> responseObserver) {
if (count.getAndIncrement() < 1) {
Metadata trailers = new Metadata();
maybePopulateCookie(trailers, "readChangeStream");
StatusRuntimeException exception = new StatusRuntimeException(Status.UNAVAILABLE, trailers);
responseObserver.onError(exception);
return;
}
responseObserver.onNext(
ReadChangeStreamResponse.newBuilder()
.setCloseStream(ReadChangeStreamResponse.CloseStream.getDefaultInstance())
.build());
responseObserver.onCompleted();
}

@Override
public void generateInitialChangeStreamPartitions(
GenerateInitialChangeStreamPartitionsRequest request,
StreamObserver<GenerateInitialChangeStreamPartitionsResponse> responseObserver) {
if (count.getAndIncrement() < 1) {
Metadata trailers = new Metadata();
maybePopulateCookie(trailers, "generateInitialChangeStreamPartitions");
StatusRuntimeException exception = new StatusRuntimeException(Status.UNAVAILABLE, trailers);
responseObserver.onError(exception);
return;
}
responseObserver.onNext(GenerateInitialChangeStreamPartitionsResponse.getDefaultInstance());
responseObserver.onCompleted();
}

private void maybePopulateCookie(Metadata trailers, String label) {
if (returnCookie) {
trailers.put(ROUTING_COOKIE_1, label);
Expand Down

0 comments on commit 1422950

Please sign in to comment.