Skip to content

Commit

Permalink
feat(bigtable): initial support for change streams in bttest
Browse files Browse the repository at this point in the history
Implement the GenerateInitialChangeStreamPartitions and ReadChangeStream
BigtableServer methods, and record and serve change streams for tables
that are created with a ChangeStreamConfig specified.

Note: this change does not implement sending Heartbeat records from
ReadChangeStream, and the retention period specified by the client is
not honored. These are left as TODOs.
  • Loading branch information
adg committed Dec 28, 2023
1 parent 5d0313f commit a75763b
Show file tree
Hide file tree
Showing 2 changed files with 425 additions and 1 deletion.
167 changes: 166 additions & 1 deletion bigtable/bttest/inmem.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"net"
"regexp"
"sort"
"strconv"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -1167,6 +1168,28 @@ func applyMutations(tbl *table, r *row, muts []*btpb.Mutation, fs map[string]*co
delete(r.families, fampre)
}
}

if tbl.changeRetention != 0 {
mutsCopy := make([]*btpb.Mutation, len(muts))
for i := range muts {
m := proto.Clone(muts[i]).(*btpb.Mutation)
if sc := m.GetSetCell(); sc != nil {
if sc.TimestampMicros == -1 {
sc.TimestampMicros = serverTime
}
}
mutsCopy[i] = m
}
tbl.changeMu.Lock()
tbl.changes = append(tbl.changes, &changeStreamEntry{
rowKey: r.key,
commit: time.Unix(0, serverTime*1e3),
muts: mutsCopy,
})
tbl.changeCond.Broadcast()
tbl.changeMu.Unlock()
}

return nil
}

Expand Down Expand Up @@ -1332,6 +1355,132 @@ func (s *server) SampleRowKeys(req *btpb.SampleRowKeysRequest, stream btpb.Bigta
return err
}

// GenerateInitialChangeStreamPartitions sends exactly one response on srv,
// carrying the partition produced by initialStreamPartition, and returns the
// result of that send. The request itself is not inspected.
func (s *server) GenerateInitialChangeStreamPartitions(req *btpb.GenerateInitialChangeStreamPartitionsRequest, srv btpb.Bigtable_GenerateInitialChangeStreamPartitionsServer) error {
	resp := &btpb.GenerateInitialChangeStreamPartitionsResponse{}
	resp.Partition = initialStreamPartition()
	return srv.Send(resp)
}

// initialStreamPartition builds the only stream partition this server hands
// out: a single partition covering the entire keyspace, expressed as a row
// range with an empty closed start key and an empty open end key.
// A fresh message is returned on every call.
func initialStreamPartition() *btpb.StreamPartition {
	wholeKeyspace := &btpb.RowRange{}
	wholeKeyspace.StartKey = &btpb.RowRange_StartKeyClosed{}
	wholeKeyspace.EndKey = &btpb.RowRange_EndKeyOpen{}
	return &btpb.StreamPartition{RowRange: wholeKeyspace}
}

// continuationTokenPrefix is the fixed prefix of every continuation token
// produced and accepted by ReadChangeStream. The remainder of the token is a
// base-10 index into the table's recorded changes.
const continuationTokenPrefix = "changesIdx="

// ReadChangeStream streams the changes recorded for a table to the client,
// one DataChange record per recorded entry, in the order they were recorded.
//
// The request must name an existing table whose change retention is non-zero
// and must carry the partition returned by initialStreamPartition; otherwise
// a NotFound or InvalidArgument status is returned.
//
// The starting position is chosen from req.StartFrom:
//   - unset: the first recorded change;
//   - StartTime: the first change whose commit time is not before it;
//   - ContinuationTokens: exactly one token for the initial partition, of the
//     form continuationTokenPrefix followed by an index into the recorded
//     changes. The index may equal the current number of changes (that is the
//     token attached to the most recent change); the stream then simply waits
//     for the next change.
//
// If req.EndTime is set, the first change committed after it ends the stream
// with a CloseStream record. Otherwise the call blocks waiting for further
// changes until the stream's context is done, and returns the context error.
func (s *server) ReadChangeStream(req *btpb.ReadChangeStreamRequest, srv btpb.Bigtable_ReadChangeStreamServer) error {
	// TODO: send heartbeats
	// TODO: honor retention period

	s.mu.Lock()
	tbl, ok := s.tables[req.TableName]
	s.mu.Unlock()
	if !ok {
		return status.Errorf(codes.NotFound, "table %q not found", req.TableName)
	}
	if tbl.changeRetention == 0 {
		return status.Errorf(codes.InvalidArgument, "table %q does not have change streams enabled", req.TableName)
	}
	if !proto.Equal(req.Partition, initialStreamPartition()) {
		return status.Errorf(codes.InvalidArgument, "unexpected partition")
	}

	changesIdx := 0 // Index into tbl.changes of the next entry to send.

	switch st := req.StartFrom.(type) {
	case *btpb.ReadChangeStreamRequest_StartTime:
		// Advance changesIdx to the first entry in tbl.changes that is
		// not before the requested start time.
		start := st.StartTime.AsTime()
		tbl.changeMu.Lock()
		for changesIdx < len(tbl.changes) {
			if !tbl.changes[changesIdx].commit.Before(start) {
				break
			}
			changesIdx++
		}
		tbl.changeMu.Unlock()
	case *btpb.ReadChangeStreamRequest_ContinuationTokens:
		// Parse and validate the continuation token.
		toks := st.ContinuationTokens.GetTokens()
		if len(toks) != 1 {
			return status.Errorf(codes.InvalidArgument, "expected exactly one continuation token, got %d", len(toks))
		}
		tok := toks[0]
		if !proto.Equal(tok.Partition, initialStreamPartition()) {
			return status.Errorf(codes.InvalidArgument, "continuation token has unexpected partition")
		}
		idxStr, ok := strings.CutPrefix(tok.Token, continuationTokenPrefix)
		if !ok {
			return status.Errorf(codes.InvalidArgument, "malformed continuation token")
		}
		// Parse at the platform's int width so the conversion below can
		// never silently truncate a huge value into a valid-looking index.
		idx, err := strconv.ParseInt(idxStr, 10, strconv.IntSize)
		if err != nil {
			return status.Errorf(codes.InvalidArgument, "malformed continuation token: %v", err)
		}
		changesIdx = int(idx)
		tbl.changeMu.Lock()
		n := len(tbl.changes)
		tbl.changeMu.Unlock()
		// Tokens encode the index of the entry *after* the one they were
		// sent with, so the token of the newest change equals n. It must be
		// accepted: a fully caught-up client resumes by waiting for entry n.
		if changesIdx < 0 || changesIdx > n {
			return status.Errorf(codes.InvalidArgument, "malformed continuation token: out of range")
		}
	}

	ctx := srv.Context()
	// When the client goes away, broadcast on the condition
	// to unblock the wait loop below.
	go func() {
		<-ctx.Done()
		// Broadcast while holding changeMu. The wait loop checks ctx.Err()
		// and calls Wait with the mutex held, so taking it here guarantees
		// the wakeup cannot slip in between that check and the Wait and be
		// lost, which would leave the handler blocked after cancellation.
		tbl.changeMu.Lock()
		tbl.changeCond.Broadcast()
		tbl.changeMu.Unlock()
	}()
	for {
		// Get, or wait for, the latest change to send to the caller.
		tbl.changeMu.Lock()
		for changesIdx >= len(tbl.changes) && ctx.Err() == nil {
			tbl.changeCond.Wait()
		}
		if err := ctx.Err(); err != nil {
			tbl.changeMu.Unlock()
			return err
		}
		ch := tbl.changes[changesIdx]
		tbl.changeMu.Unlock()
		changesIdx++

		if req.EndTime != nil && ch.commit.After(req.EndTime.AsTime()) {
			// The stream has ended; send CloseStream.
			return srv.Send(&btpb.ReadChangeStreamResponse{
				StreamRecord: &btpb.ReadChangeStreamResponse_CloseStream_{
					CloseStream: &btpb.ReadChangeStreamResponse_CloseStream{},
				},
			})
		}

		dc := &btpb.ReadChangeStreamResponse_DataChange{
			Type:            btpb.ReadChangeStreamResponse_DataChange_USER,
			RowKey:          []byte(ch.rowKey),
			CommitTimestamp: timestamppb.New(ch.commit),
			Done:            true,
			// changesIdx was already advanced, so the token names the next
			// entry to deliver when the client resumes.
			Token:                 fmt.Sprintf(continuationTokenPrefix+"%d", changesIdx),
			EstimatedLowWatermark: timestamppb.New(ch.commit),
		}
		dc.Chunks = make([]*btpb.ReadChangeStreamResponse_MutationChunk, 0, len(ch.muts))
		for _, m := range ch.muts {
			dc.Chunks = append(dc.Chunks, &btpb.ReadChangeStreamResponse_MutationChunk{Mutation: m})
		}
		if err := srv.Send(&btpb.ReadChangeStreamResponse{
			StreamRecord: &btpb.ReadChangeStreamResponse_DataChange_{DataChange: dc},
		}); err != nil {
			return err
		}
	}
}

// needGC is invoked whenever the server needs gcloop running.
func (s *server) needGC() {
s.mu.Lock()
Expand Down Expand Up @@ -1376,6 +1525,17 @@ type table struct {
families map[string]*columnFamily // keyed by plain family name
rows *btree.BTree // indexed by row key
isProtected bool // whether this table has deletion protection

changeRetention time.Duration // If non-zero, change streams are enabled.
changeMu sync.Mutex // Guards the fields below.
changeCond *sync.Cond // Signalled when changes is mutated.
changes []*changeStreamEntry
}

// changeStreamEntry is one recorded change for a table with change streams
// enabled. applyMutations appends an entry per call, and ReadChangeStream
// turns each entry into a DataChange record.
type changeStreamEntry struct {
// rowKey is the key of the row the mutations were applied to.
rowKey string
// commit is the server time of the mutation, converted from microseconds.
commit time.Time
// muts holds clones of the applied mutations; SetCell timestamps of -1
// are replaced with the server time before the entry is stored.
muts []*btpb.Mutation
}

// btreeDegree is the degree passed to btree.New when a table's row index is
// created.
const btreeDegree = 16
Expand All @@ -1393,12 +1553,17 @@ func newTable(ctr *btapb.CreateTableRequest) *table {
c++
}
}
return &table{
tbl := &table{
families: fams,
counter: c,
rows: btree.New(btreeDegree),
isProtected: ctr.GetTable().GetDeletionProtection(),
}
if csc := ctr.GetTable().GetChangeStreamConfig(); csc != nil {
tbl.changeRetention = csc.RetentionPeriod.AsDuration()
tbl.changeCond = sync.NewCond(&tbl.changeMu)
}
return tbl
}

func (t *table) validTimestamp(ts int64) bool {
Expand Down
Loading

0 comments on commit a75763b

Please sign in to comment.