fix(bigtable): Mutate groups even after first error #11434

Open
wants to merge 8 commits into base: main
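In short: ApplyBulk previously stopped after the first group of mutations whose request failed, returning that error and never sending the remaining groups. With this change the remaining groups are still sent, entries from a failed group carry that group's error in the per-entry error slice, and a top-level error is returned only when every group fails. A rough caller-side sketch of that behavior (applyAll, tbl, ctx and the mutation slices are placeholder names, not taken from this PR):

import (
	"context"
	"log"

	"cloud.google.com/go/bigtable"
)

// applyAll is a hypothetical caller illustrating the semantics this PR targets.
func applyAll(ctx context.Context, tbl *bigtable.Table, rowKeys []string, muts []*bigtable.Mutation) {
	errs, err := tbl.ApplyBulk(ctx, rowKeys, muts)
	if err != nil {
		// With this change, a non-nil top-level error means every group failed;
		// the first group's error is returned.
		log.Printf("ApplyBulk failed entirely: %v", err)
		return
	}
	for i, e := range errs {
		if e != nil {
			// Entries from a failed group carry that group's error; the other
			// groups were still sent instead of the call stopping early.
			log.Printf("row %q was not applied: %v", rowKeys[i], e)
		}
	}
}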
42 changes: 38 additions & 4 deletions bigtable/bigtable.go
@@ -1013,7 +1013,8 @@ func mutationsAreRetryable(muts []*btpb.Mutation) bool {
return true
}

const maxMutations = 100000
// Overridden in tests
var maxMutations = 100000

// Apply mutates a row atomically. A mutation must contain at least one
// operation and at most 100000 operations.
@@ -1227,9 +1228,14 @@ func (m *Mutation) mergeToCell(family, column string, ts Timestamp, value *btpb.
type entryErr struct {
Entry *btpb.MutateRowsRequest_Entry
Err error

// TopLevelErr is the error received either from
// 1. client.MutateRows
// 2. stream.Recv
TopLevelErr error
}

// ApplyBulk applies multiple Mutations, up to a maximum of 100,000.
// ApplyBulk applies multiple Mutations.
// Each mutation is individually applied atomically,
// but the set of mutations may be applied in any order.
//
@@ -1257,17 +1263,31 @@ func (t *Table) ApplyBulk(ctx context.Context, rowKeys []string, muts []*Mutatio
origEntries[i] = &entryErr{Entry: &btpb.MutateRowsRequest_Entry{RowKey: []byte(key), Mutations: mut.ops}}
}

for _, group := range groupEntries(origEntries, maxMutations) {
var firstGroupErr error
numFailed := 0
groups := groupEntries(origEntries, maxMutations)
for _, group := range groups {
err := t.applyGroup(ctx, group, opts...)
if err != nil {
return nil, err
if firstGroupErr == nil {
firstGroupErr = err
}
numFailed++
}
}

if numFailed == len(groups) {
return nil, firstGroupErr
}

// All the errors are accumulated into an array and returned, interspersed with nils for successful
// entries. The absence of any errors means we should return nil.
var foundErr bool
for _, entry := range origEntries {
if entry.Err == nil && entry.TopLevelErr != nil {
// Populate per mutation error if top level error is not nil
entry.Err = entry.TopLevelErr
}
if entry.Err != nil {
foundErr = true
}
@@ -1292,6 +1312,7 @@ func (t *Table) applyGroup(ctx context.Context, group []*entryErr, opts ...Apply
// We want to retry the entire request with the current group
return err
}
// Get the entries that need to be retried
group = t.getApplyBulkRetries(group)
if len(group) > 0 && len(idempotentRetryCodes) > 0 {
// We have at least one mutation that needs to be retried.
@@ -1327,6 +1348,11 @@ func (t *Table) doApplyBulk(ctx context.Context, entryErrs []*entryErr, headerMD
}
}

var topLevelErr error
defer func() {
populateTopLevelError(entryErrs, topLevelErr)
}()

entries := make([]*btpb.MutateRowsRequest_Entry, len(entryErrs))
for i, entryErr := range entryErrs {
entries[i] = entryErr.Entry
@@ -1343,6 +1369,7 @@ func (t *Table) doApplyBulk(ctx context.Context, entryErrs []*entryErr, headerMD

stream, err := t.c.client.MutateRows(ctx, req)
if err != nil {
_, topLevelErr = convertToGrpcStatusErr(err)
return err
}

@@ -1357,6 +1384,7 @@ func (t *Table) doApplyBulk(ctx context.Context, entryErrs []*entryErr, headerMD
}
if err != nil {
*trailerMD = stream.Trailer()
_, topLevelErr = convertToGrpcStatusErr(err)
return err
}

@@ -1373,6 +1401,12 @@ func (t *Table) doApplyBulk(ctx context.Context, entryErrs []*entryErr, headerMD
return nil
}

func populateTopLevelError(entries []*entryErr, topLevelErr error) {
for _, entry := range entries {
entry.TopLevelErr = topLevelErr
}
}

// groupEntries groups entries into groups of a specified size without breaking up
// individual entries.
func groupEntries(entries []*entryErr, maxSize int) [][]*entryErr {
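The grouping helper referenced above has its body collapsed in this diff; per its comment and the maxMutations cap, it packs entries into groups whose combined mutation count stays within the limit without splitting an individual entry. A hypothetical sketch of that kind of grouping, reusing the entryErr type shown earlier (this is not the library's actual implementation):

// groupEntriesSketch is a stand-in illustrating the behavior described by the
// comment on groupEntries: fill each group until adding the next entry would
// push the total mutation count past maxSize, then start a new group.
func groupEntriesSketch(entries []*entryErr, maxSize int) [][]*entryErr {
	var groups [][]*entryErr
	var current []*entryErr
	size := 0
	for _, e := range entries {
		n := len(e.Entry.Mutations)
		if len(current) > 0 && size+n > maxSize {
			groups = append(groups, current)
			current, size = nil, 0
		}
		current = append(current, e)
		size += n
	}
	if len(current) > 0 {
		groups = append(groups, current)
	}
	return groups
}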
138 changes: 138 additions & 0 deletions bigtable/bigtable_test.go
@@ -28,6 +28,8 @@ import (
"github.com/google/go-cmp/cmp"
"google.golang.org/api/option"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

var disableMetricsConfig = ClientConfig{MetricsProvider: NoopMetricsProvider{}}
@@ -875,3 +877,139 @@ func TestMutateRowsWithAggregates_MergeToCell(t *testing.T) {
t.Error()
}
}

type rowKeyCheckingInterceptor struct {
grpc.ClientStream
failRow string
failErr error // error to use while sending a failed response for the fail row
requestCounter *int
}

func (i *rowKeyCheckingInterceptor) SendMsg(m interface{}) error {
*i.requestCounter = *i.requestCounter + 1
if req, ok := m.(*btpb.MutateRowsRequest); ok {
for _, entry := range req.Entries {
if string(entry.RowKey) == i.failRow {
return i.failErr
}
}
}
return i.ClientStream.SendMsg(m)
}

func (i *rowKeyCheckingInterceptor) RecvMsg(m interface{}) error {
return i.ClientStream.RecvMsg(m)
}

// Mutations are broken down into groups of at most 'maxMutations' mutations, and a MutateRowsRequest is sent to the Cloud Bigtable service for each group.
// This test validates that even if one of the groups receives an error, requests are still sent for the remaining groups.
func TestApplyBulk_MutationsSucceedAfterGroupError(t *testing.T) {
testEnv, gotErr := NewEmulatedEnv(IntegrationTestConfig{})
if gotErr != nil {
t.Fatalf("NewEmulatedEnv failed: %v", gotErr)
}

// Add interceptor to fail rows
failedRow := "row2"
failErr := status.Error(codes.InvalidArgument, "Invalid row key")
reqCount := 0
conn, gotErr := grpc.Dial(testEnv.server.Addr, grpc.WithInsecure(), grpc.WithBlock(),
grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(100<<20), grpc.MaxCallRecvMsgSize(100<<20)),
grpc.WithStreamInterceptor(func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
clientStream, err := streamer(ctx, desc, cc, method, opts...)
return &rowKeyCheckingInterceptor{
ClientStream: clientStream,
failRow: failedRow,
requestCounter: &reqCount,
failErr: failErr,
}, err
}),
)
if gotErr != nil {
t.Fatalf("grpc.Dial failed: %v", gotErr)
}

// Create client and table
ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
defer cancel()
adminClient, gotErr := NewAdminClient(ctx, testEnv.config.Project, testEnv.config.Instance, option.WithGRPCConn(conn))
if gotErr != nil {
t.Fatalf("NewClient failed: %v", gotErr)
}
defer adminClient.Close()
tableConf := &TableConf{
TableID: testEnv.config.Table,
ColumnFamilies: map[string]Family{
"f": {
ValueType: AggregateType{
Input: Int64Type{},
Aggregator: SumAggregator{},
},
},
},
}
if err := adminClient.CreateTableFromConf(ctx, tableConf); err != nil {
t.Fatalf("CreateTable(%v) failed: %v", testEnv.config.Table, err)
}
client, gotErr := NewClientWithConfig(ctx, testEnv.config.Project, testEnv.config.Instance, disableMetricsConfig, option.WithGRPCConn(conn))
if gotErr != nil {
t.Fatalf("NewClientWithConfig failed: %v", gotErr)
}
defer client.Close()
table := client.Open(testEnv.config.Table)

// Override maxMutations to break mutations into smaller groups
origMaxMutations := maxMutations
t.Cleanup(func() {
maxMutations = origMaxMutations
})
maxMutations = 2

// Create mutations
m1 := NewMutation()
m1.AddIntToCell("f", "q", 0, 1000)
m2 := NewMutation()
m2.AddIntToCell("f", "q", 0, 2000)

// Perform ApplyBulk operation and compare errors
rowKeys := []string{"row1", "row1", failedRow, failedRow, "row3", "row3"}
var wantErr error
wantErrs := []error{nil, nil, failErr, failErr, nil, nil}
gotErrs, gotErr := table.ApplyBulk(ctx, rowKeys, []*Mutation{m1, m2, m1, m2, m1, m2})

// Assert overall error
if !equalErrs(gotErr, wantErr) {
t.Fatalf("ApplyBulk err got: %v, want: %v", gotErr, wantErr)
}

// Assert individual mutation errors
if len(gotErrs) != len(wantErrs) {
t.Fatalf("ApplyBulk errs got: %v, want: %v", gotErrs, wantErrs)
}
for i := range gotErrs {
if !equalErrs(gotErrs[i], wantErrs[i]) {
t.Errorf("#%d ApplyBulk err got: %v, want: %v", i, gotErrs[i], wantErrs[i])
}
}

// Assert number of requests sent
wantReqCount := len(rowKeys) / maxMutations
if reqCount != wantReqCount {
t.Errorf("Number of requests got: %v, want: %v", reqCount, wantReqCount)
}

// Assert individual mutation apply success/failure by reading rows
gotErr = table.ReadRows(ctx, RowList{"row1", failedRow, "row3"}, func(row Row) bool {
rowMutated := bytes.Equal(row["f"][0].Value, binary.BigEndian.AppendUint64([]byte{}, 3000))
if rowMutated && row.Key() == failedRow {
t.Error("Expected mutation to fail for row " + row.Key())
}
if !rowMutated && row.Key() != failedRow {
t.Error("Expected mutation to succeed for row " + row.Key())
}
return true
})
if gotErr != nil {
t.Fatalf("ReadRows failed: %v", gotErr)
}
}
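For reference, the request-count assertion above follows directly from the grouping: with maxMutations overridden to 2 and each entry carrying a single mutation, the six row keys split into three groups, and the failing group no longer prevents the third group from being sent. A small sketch of that arithmetic, with values copied from the test above:

// Groups formed in this test: {row1, row1}, {row2, row2}, {row3, row3}.
// The second group's SendMsg returns failErr, but the first and third groups
// are still sent, so the interceptor counts three MutateRows requests; the
// assertion passing also implies the failed group was not retried.
rowKeys := []string{"row1", "row1", "row2", "row2", "row3", "row3"}
maxMutations := 2
wantReqCount := len(rowKeys) / maxMutations // 6 / 2 = 3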