diff --git a/bigtable/bttest/inmem.go b/bigtable/bttest/inmem.go
index ac4182ba0ebf..fa67eeb0cc74 100644
--- a/bigtable/bttest/inmem.go
+++ b/bigtable/bttest/inmem.go
@@ -42,6 +42,7 @@ import (
 	"net"
 	"regexp"
 	"sort"
+	"strconv"
 	"strings"
 	"sync"
 	"time"
@@ -1167,6 +1168,33 @@ func applyMutations(tbl *table, r *row, muts []*btpb.Mutation, fs map[string]*co
 			delete(r.families, fampre)
 		}
 	}
+
+	// When change streams are enabled, append this commit to the table's
+	// change log. The mutations are cloned so the log does not alias the
+	// caller's messages, and a TimestampMicros of -1 is replaced with
+	// serverTime on the copy.
+	if tbl.changeRetention != 0 {
+		mutsCopy := make([]*btpb.Mutation, len(muts))
+		for i := range muts {
+			m := proto.Clone(muts[i]).(*btpb.Mutation)
+			if sc := m.GetSetCell(); sc != nil {
+				if sc.TimestampMicros == -1 {
+					sc.TimestampMicros = serverTime
+				}
+			}
+			mutsCopy[i] = m
+		}
+		tbl.changeMu.Lock()
+		tbl.changes = append(tbl.changes, &changeStreamEntry{
+			rowKey: r.key,
+			commit: time.Unix(0, serverTime*1e3),
+			muts:   mutsCopy,
+		})
+		// Wake any ReadChangeStream calls waiting for a new entry.
+		tbl.changeCond.Broadcast()
+		tbl.changeMu.Unlock()
+	}
+
 	return nil
 }
 
@@ -1332,6 +1360,163 @@ func (s *server) SampleRowKeys(req *btpb.SampleRowKeysRequest, stream btpb.Bigta
 	return err
 }
 
+// GenerateInitialChangeStreamPartitions sends a single response holding the
+// one partition this emulator uses (see initialStreamPartition). The request
+// is not inspected.
+func (s *server) GenerateInitialChangeStreamPartitions(req *btpb.GenerateInitialChangeStreamPartitionsRequest, srv btpb.Bigtable_GenerateInitialChangeStreamPartitionsServer) error {
+	return srv.Send(&btpb.GenerateInitialChangeStreamPartitionsResponse{
+		Partition: initialStreamPartition(),
+	})
+}
+
+// initialStreamPartition returns the only partition the emulator uses: a
+// single stream partition that covers the entire keyspace.
+func initialStreamPartition() *btpb.StreamPartition {
+	return &btpb.StreamPartition{
+		RowRange: &btpb.RowRange{
+			StartKey: &btpb.RowRange_StartKeyClosed{},
+			EndKey:   &btpb.RowRange_EndKeyOpen{},
+		},
+	}
+}
+
+// continuationTokenPrefix begins every continuation token. The rest of the
+// token is the decimal index into table.changes of the next entry to send.
+const continuationTokenPrefix = "changesIdx="
+
+// ReadChangeStream streams the changes recorded for req.TableName to srv, one
+// DataChange record per entry, in the order the entries were recorded.
+//
+// Reading starts at the oldest recorded change by default, at the first
+// change not before StartTime, or at the position encoded in a continuation
+// token. Once the backlog is drained the call blocks until another change is
+// recorded or the stream's context is done. If EndTime is set, the first
+// change committed after it is not sent; a CloseStream record is sent instead
+// and the call returns.
+//
+// It returns NotFound for an unknown table; InvalidArgument if the table does
+// not have change streams enabled, the partition is not the initial
+// partition, or the continuation token is malformed; and the context's error
+// once the context is done.
+func (s *server) ReadChangeStream(req *btpb.ReadChangeStreamRequest, srv btpb.Bigtable_ReadChangeStreamServer) error {
+	// TODO: send heartbeats
+	// TODO: honor retention period
+
+	s.mu.Lock()
+	tbl, ok := s.tables[req.TableName]
+	s.mu.Unlock()
+	if !ok {
+		return status.Errorf(codes.NotFound, "table %q not found", req.TableName)
+	}
+	if tbl.changeRetention == 0 {
+		return status.Errorf(codes.InvalidArgument, "table %q does not have change streams enabled", req.TableName)
+	}
+	if !proto.Equal(req.Partition, initialStreamPartition()) {
+		return status.Errorf(codes.InvalidArgument, "unexpected partition")
+	}
+
+	changesIdx := 0 // Index into tbl.changes.
+
+	switch st := req.StartFrom.(type) {
+	case *btpb.ReadChangeStreamRequest_StartTime:
+		// Advance changesIdx to the first entry in tbl.changes that is
+		// not before the requested start time.
+		start := st.StartTime.AsTime()
+		tbl.changeMu.Lock()
+		for changesIdx < len(tbl.changes) {
+			if !tbl.changes[changesIdx].commit.Before(start) {
+				break
+			}
+			changesIdx++
+		}
+		tbl.changeMu.Unlock()
+	case *btpb.ReadChangeStreamRequest_ContinuationTokens:
+		// Parse and validate the continuation token.
+		toks := st.ContinuationTokens.GetTokens()
+		if len(toks) != 1 {
+			return status.Errorf(codes.InvalidArgument, "expected exactly one continuation token, got %d", len(toks))
+		}
+		tok := toks[0]
+		if !proto.Equal(tok.GetPartition(), initialStreamPartition()) {
+			return status.Errorf(codes.InvalidArgument, "continuation token has unexpected partition")
+		}
+		idxStr, ok := strings.CutPrefix(tok.GetToken(), continuationTokenPrefix)
+		if !ok {
+			return status.Errorf(codes.InvalidArgument, "malformed continuation token")
+		}
+		// Atoi rejects values that do not fit in an int, so the index
+		// cannot be silently truncated on 32-bit platforms.
+		idx, err := strconv.Atoi(idxStr)
+		if err != nil {
+			return status.Errorf(codes.InvalidArgument, "malformed continuation token: %v", err)
+		}
+		changesIdx = idx
+		tbl.changeMu.Lock()
+		n := len(tbl.changes)
+		tbl.changeMu.Unlock()
+		// A token names the entry after the one it was sent with, so the
+		// token of the most recent change equals n. That is a valid place
+		// to resume from: the loop below waits for the next change.
+		if changesIdx < 0 || changesIdx > n {
+			return status.Errorf(codes.InvalidArgument, "malformed continuation token: out of range")
+		}
+	}
+
+	ctx := srv.Context()
+	// When the client goes away, broadcast on the condition to unblock the
+	// wait loop below. changeMu is held for the broadcast so that it cannot
+	// land between the loop's ctx.Err() check and its call to Wait, which
+	// would leave the loop blocked until the next write.
+	go func() {
+		<-ctx.Done()
+		tbl.changeMu.Lock()
+		tbl.changeCond.Broadcast()
+		tbl.changeMu.Unlock()
+	}()
+	for {
+		// Get, or wait for, the next change to send to the caller.
+		tbl.changeMu.Lock()
+		for changesIdx >= len(tbl.changes) && ctx.Err() == nil {
+			tbl.changeCond.Wait()
+		}
+		if err := ctx.Err(); err != nil {
+			tbl.changeMu.Unlock()
+			return err
+		}
+		ch := tbl.changes[changesIdx]
+		tbl.changeMu.Unlock()
+		changesIdx++
+
+		if req.EndTime != nil && ch.commit.After(req.EndTime.AsTime()) {
+			// The stream has ended; send CloseStream.
+			return srv.Send(&btpb.ReadChangeStreamResponse{
+				StreamRecord: &btpb.ReadChangeStreamResponse_CloseStream_{
+					CloseStream: &btpb.ReadChangeStreamResponse_CloseStream{},
+				},
+			})
+		}
+
+		// changesIdx already points past ch, so the token resumes at the
+		// entry that follows it.
+		dc := &btpb.ReadChangeStreamResponse_DataChange{
+			Type:                  btpb.ReadChangeStreamResponse_DataChange_USER,
+			RowKey:                []byte(ch.rowKey),
+			CommitTimestamp:       timestamppb.New(ch.commit),
+			Done:                  true,
+			Token:                 continuationTokenPrefix + strconv.Itoa(changesIdx),
+			EstimatedLowWatermark: timestamppb.New(ch.commit),
+		}
+		for _, m := range ch.muts {
+			dc.Chunks = append(dc.Chunks, &btpb.ReadChangeStreamResponse_MutationChunk{Mutation: m})
+		}
+		if err := srv.Send(&btpb.ReadChangeStreamResponse{
+			StreamRecord: &btpb.ReadChangeStreamResponse_DataChange_{DataChange: dc},
+		}); err != nil {
+			return err
+		}
+	}
+}
+
 // needGC is invoked whenever the server needs gcloop running.
 func (s *server) needGC() {
 	s.mu.Lock()
@@ -1376,6 +1561,19 @@ type table struct {
 	families    map[string]*columnFamily // keyed by plain family name
 	rows        *btree.BTree             // indexed by row key
 	isProtected bool                     // whether this table has deletion protection
+
+	changeRetention time.Duration // If non-zero, change streams are enabled.
+	changeMu        sync.Mutex    // Guards the fields below.
+	changeCond      *sync.Cond    // Signalled when changes is mutated.
+	changes         []*changeStreamEntry
+}
+
+// changeStreamEntry is one element of table.changes: the mutations that a
+// single applyMutations call applied to one row.
+type changeStreamEntry struct {
+	rowKey string           // Key of the mutated row.
+	commit time.Time        // Commit time, derived from the write's server time.
+	muts   []*btpb.Mutation // Copies of the applied mutations.
 }
 
 const btreeDegree = 16
@@ -1393,12 +1591,19 @@ func newTable(ctr *btapb.CreateTableRequest) *table {
 			c++
 		}
 	}
-	return &table{
+	tbl := &table{
 		families:    fams,
 		counter:     c,
 		rows:        btree.New(btreeDegree),
 		isProtected: ctr.GetTable().GetDeletionProtection(),
 	}
+	// Set up change-stream state only when the request carries a
+	// ChangeStreamConfig; changeCond stays nil otherwise.
+	if csc := ctr.GetTable().GetChangeStreamConfig(); csc != nil {
+		tbl.changeRetention = csc.RetentionPeriod.AsDuration()
+		tbl.changeCond = sync.NewCond(&tbl.changeMu)
+	}
+	return tbl
 }
 
 func (t *table) validTimestamp(ts int64) bool {
diff --git a/bigtable/bttest/inmem_test.go b/bigtable/bttest/inmem_test.go
index 44b7aedcc354..82e93f9df1a0 100644
--- a/bigtable/bttest/inmem_test.go
+++ b/bigtable/bttest/inmem_test.go
@@ -40,6 +40,8 @@ import (
 	"google.golang.org/protobuf/encoding/prototext"
 	"google.golang.org/protobuf/proto"
 	"google.golang.org/protobuf/testing/protocmp"
+	"google.golang.org/protobuf/types/known/durationpb"
+	"google.golang.org/protobuf/types/known/timestamppb"
 	"google.golang.org/protobuf/types/known/wrapperspb"
 )
 
@@ -2305,3 +2307,306 @@ func TestFilterRowCellsPerRowLimitFilterTruthiness(t *testing.T) {
 		}
 	}
 }
+
+// TestChangeStreams exercises GenerateInitialChangeStreamPartitions and
+// ReadChangeStream against a table created with a ChangeStreamConfig.
+func TestChangeStreams(t *testing.T) {
+	ctx := context.Background()
+	srv := &server{tables: make(map[string]*table)}
+
+	// Create a table with change streams enabled.
+	tblReq := &btapb.CreateTableRequest{
+		Parent:  "instance",
+		TableId: "table",
+		Table: &btapb.Table{
+			ColumnFamilies: map[string]*btapb.ColumnFamily{
+				"cf": {},
+			},
+			ChangeStreamConfig: &btapb.ChangeStreamConfig{
+				RetentionPeriod: durationpb.New(24 * time.Hour),
+			},
+		},
+	}
+	tbl, err := srv.CreateTable(ctx, tblReq)
+	if err != nil {
+		t.Fatalf("CreateTable: %v", err)
+	}
+
+	// Generate initial change stream partitions.
+	initReq := &btpb.GenerateInitialChangeStreamPartitionsRequest{
+		TableName: tbl.Name,
+	}
+	initResp := &fakeGenerateInitialChangeStreamPartitionsServer{}
+	if err := srv.GenerateInitialChangeStreamPartitions(initReq, initResp); err != nil {
+		t.Fatalf("GenerateInitialChangeStreamPartitions: %v", err)
+	}
+	if got := len(initResp.responses); got != 1 {
+		t.Fatalf("GenerateInitialChangeStreamPartitions: got %d responses, want 1", got)
+	}
+	initPart := initResp.responses[0].Partition
+
+	write := func(row, value string) {
+		t.Helper()
+		req := &btpb.MutateRowRequest{
+			TableName: tbl.Name,
+			RowKey:    []byte(row),
+			Mutations: []*btpb.Mutation{{
+				Mutation: &btpb.Mutation_SetCell_{SetCell: &btpb.Mutation_SetCell{
+					FamilyName:      "cf",
+					ColumnQualifier: []byte("cq"),
+					TimestampMicros: -1,
+					Value:           []byte(value),
+				}},
+			}},
+		}
+		if _, err := srv.MutateRow(ctx, req); err != nil {
+			t.Fatalf("Writing row %q: %v", row, err)
+		}
+		// Make sure that all mutations have a unique commit time.
+		time.Sleep(1 * time.Millisecond)
+	}
+
+	// Add some data, start a reader with no StartFrom, and verify that it
+	// sees all those writes.
+	write("a", "1")
+	write("b", "2")
+	r1 := readChangeStream(t, srv, &btpb.ReadChangeStreamRequest{
+		TableName: tbl.Name,
+		Partition: initPart,
+	})
+	r1.want("a", "1")
+	r1.want("b", "2")
+
+	// Note the start time and continuation token for readers that are
+	// started below.
+	startTime := r1.lastCommit.Add(1 * time.Microsecond)
+	token := r1.lastToken
+
+	// Write some more data, verify the reader sees it.
+	write("a", "3")
+	write("b", "4")
+	r1.want("a", "3")
+	r1.want("b", "4")
+
+	// Start two readers, one using the start time and another using the
+	// continuation token, and check that they see the new writes.
+	r2 := readChangeStream(t, srv, &btpb.ReadChangeStreamRequest{
+		TableName: tbl.Name,
+		Partition: initPart,
+		StartFrom: &btpb.ReadChangeStreamRequest_StartTime{
+			StartTime: timestamppb.New(startTime),
+		},
+	})
+	r3 := readChangeStream(t, srv, &btpb.ReadChangeStreamRequest{
+		TableName: tbl.Name,
+		Partition: initPart,
+		StartFrom: &btpb.ReadChangeStreamRequest_ContinuationTokens{
+			ContinuationTokens: &btpb.StreamContinuationTokens{
+				Tokens: []*btpb.StreamContinuationToken{{
+					Partition: initPart,
+					Token:     token,
+				}},
+			},
+		},
+	})
+	r2.want("a", "3")
+	r3.want("a", "3")
+	r2.want("b", "4")
+	r3.want("b", "4")
+
+	// Write another row and make sure all readers see it.
+	write("c", "5")
+	r3.want("c", "5")
+	r2.want("c", "5")
+	r1.want("c", "5")
+
+	// Stop all readers, making sure they exit cleanly.
+	r1.stop()
+	r2.stop()
+	r3.stop()
+
+	// Start another reader with a start and end time and expect it to close
+	// after sending the specified records.
+	r4 := readChangeStream(t, srv, &btpb.ReadChangeStreamRequest{
+		TableName: tbl.Name,
+		Partition: initPart,
+		StartFrom: &btpb.ReadChangeStreamRequest_StartTime{
+			StartTime: timestamppb.New(startTime),
+		},
+		// Stop just before the last mutation.
+		EndTime: timestamppb.New(r1.lastCommit.Add(-1 * time.Microsecond)),
+	})
+	r4.want("a", "3")
+	r4.want("b", "4")
+	r4.wantClose()
+
+	// The token of the most recent change points one past the end of the
+	// change log. Resuming from it must be accepted: with an already
+	// cancelled context the call should reach the wait loop and return the
+	// context's error rather than InvalidArgument.
+	resumeReq := &btpb.ReadChangeStreamRequest{
+		TableName: tbl.Name,
+		Partition: initPart,
+		StartFrom: &btpb.ReadChangeStreamRequest_ContinuationTokens{
+			ContinuationTokens: &btpb.StreamContinuationTokens{
+				Tokens: []*btpb.StreamContinuationToken{{
+					Partition: initPart,
+					Token:     r1.lastToken,
+				}},
+			},
+		},
+	}
+	cctx, cancel := context.WithCancel(ctx)
+	cancel()
+	if err := srv.ReadChangeStream(resumeReq, fakeReadChangeStreamServer{ctx: cctx}); err != context.Canceled {
+		t.Fatalf("ReadChangeStream resuming from latest token: got %v, want %v", err, context.Canceled)
+	}
+
+	// A live reader resumed from that token sees the next write.
+	r5 := readChangeStream(t, srv, resumeReq)
+	write("d", "6")
+	r5.want("d", "6")
+	r5.stop()
+}
+
+// fakeGenerateInitialChangeStreamPartitionsServer is a server stream that
+// records every response passed to Send.
+type fakeGenerateInitialChangeStreamPartitionsServer struct {
+	grpc.ServerStream
+	responses []*btpb.GenerateInitialChangeStreamPartitionsResponse
+}
+
+// Send appends resp to s.responses and always succeeds.
+func (s *fakeGenerateInitialChangeStreamPartitionsServer) Send(resp *btpb.GenerateInitialChangeStreamPartitionsResponse) error {
+	s.responses = append(s.responses, resp)
+	return nil
+}
+
+// readChangeStream runs srv.ReadChangeStream(req) in a new goroutine and
+// returns a reader for checking the responses it sends. The stream's context
+// is cancelled when the call returns or when the reader is stopped.
+func readChangeStream(t *testing.T, srv btpb.BigtableServer, req *btpb.ReadChangeStreamRequest) *changeStreamReader {
+	var (
+		ctx, cancel = context.WithCancel(context.Background())
+		responses   = make(chan *btpb.ReadChangeStreamResponse)
+		errc        = make(chan error, 1)
+	)
+	go func() {
+		defer cancel()
+		errc <- srv.ReadChangeStream(req, fakeReadChangeStreamServer{ctx: ctx, responses: responses})
+	}()
+	return &changeStreamReader{
+		t:         t,
+		ctx:       ctx,
+		cancel:    cancel,
+		responses: responses,
+		errc:      errc,
+	}
+}
+
+// changeStreamReader checks the output of one in-flight ReadChangeStream
+// call started by readChangeStream.
+type changeStreamReader struct {
+	t         *testing.T
+	ctx       context.Context
+	cancel    context.CancelFunc
+	responses <-chan *btpb.ReadChangeStreamResponse
+	errc      <-chan error
+
+	lastCommit time.Time // Commit timestamp of the last DataChange seen by want.
+	lastToken  string    // Continuation token of the last DataChange seen by want.
+}
+
+// want receives the next stream event and fails the test unless it is a
+// DataChange for row with a single SetCell mutation whose value is value.
+// It records the change's commit timestamp and token on r.
+func (r *changeStreamReader) want(row, value string) {
+	r.t.Helper()
+	select {
+	case got := <-r.responses:
+		dc := got.GetDataChange()
+		if dc == nil {
+			r.cancel()
+			r.t.Fatalf("unexpected ReadChangeStreamResponse: %+v", got)
+		}
+		r.lastCommit = dc.CommitTimestamp.AsTime()
+		r.lastToken = dc.Token
+		if got := string(dc.RowKey); got != row {
+			r.cancel()
+			r.t.Fatalf("got change for row %q, want %q", got, row)
+		}
+		if got := len(dc.Chunks); got != 1 {
+			r.cancel()
+			r.t.Fatalf("got %d chunks, want 1", got)
+		}
+		m := dc.Chunks[0].GetMutation()
+		sc := m.GetSetCell()
+		if sc == nil {
+			r.cancel()
+			r.t.Fatalf("unexpected Mutation: %+v", m)
+		}
+		if got := string(sc.Value); got != value {
+			r.cancel()
+			r.t.Fatalf("got value %q for row %q, want %q", got, row, value)
+		}
+	case err := <-r.errc:
+		r.cancel()
+		if err != nil {
+			r.t.Fatalf("ReadChangeStream: %v", err)
+		}
+		r.t.Fatalf("ReadChangeStream returned while waiting for response")
+	}
+}
+
+// wantClose receives the next stream event and fails the test unless it is
+// a CloseStream record.
+func (r *changeStreamReader) wantClose() {
+	r.t.Helper()
+	select {
+	case got := <-r.responses:
+		cs := got.GetCloseStream()
+		if cs == nil {
+			r.cancel()
+			r.t.Fatalf("unexpected ReadChangeStreamResponse: %+v", got)
+		}
+	case err := <-r.errc:
+		r.cancel()
+		if err != nil {
+			r.t.Fatalf("ReadChangeStream: %v", err)
+		}
+		r.t.Fatalf("ReadChangeStream returned while waiting for response")
+	}
+}
+
+// stop cancels the stream's context and fails the test unless
+// ReadChangeStream returns that context's error.
+func (r *changeStreamReader) stop() {
+	r.cancel()
+	if err := <-r.errc; err != r.ctx.Err() {
+		r.t.Helper()
+		r.t.Fatalf("unexpected error from ReadChangeStream: %v", err)
+	}
+}
+
+// fakeReadChangeStreamServer is a server stream that forwards responses to a
+// channel until its context is done.
+type fakeReadChangeStreamServer struct {
+	grpc.ServerStream
+
+	ctx       context.Context
+	responses chan<- *btpb.ReadChangeStreamResponse
+}
+
+// Context returns the context the fake was created with.
+func (s fakeReadChangeStreamServer) Context() context.Context { return s.ctx }
+
+// Send blocks until resp is received from s.responses or the context is
+// done, returning the context's error in the latter case.
+func (s fakeReadChangeStreamServer) Send(resp *btpb.ReadChangeStreamResponse) error {
+	select {
+	case <-s.ctx.Done():
+		return s.ctx.Err()
+	case s.responses <- resp:
+		return nil
+	}
+}