From 77a83fab53de3336d19a0746f5e0401e6dd16c2f Mon Sep 17 00:00:00 2001 From: Baha Aiman Date: Fri, 10 Jan 2025 19:27:02 +0000 Subject: [PATCH 1/6] fix(bigtable): Mutate groups even after first error --- bigtable/bigtable.go | 33 +++++++++-- bigtable/bigtable_test.go | 114 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 141 insertions(+), 6 deletions(-) diff --git a/bigtable/bigtable.go b/bigtable/bigtable.go index 94e21f4b6994..daf1c17ea82c 100644 --- a/bigtable/bigtable.go +++ b/bigtable/bigtable.go @@ -1013,7 +1013,8 @@ func mutationsAreRetryable(muts []*btpb.Mutation) bool { return true } -const maxMutations = 100000 +// Overriden in tests +var maxMutations = 100000 // Apply mutates a row atomically. A mutation must contain at least one // operation and at most 100000 operations. @@ -1257,10 +1258,11 @@ func (t *Table) ApplyBulk(ctx context.Context, rowKeys []string, muts []*Mutatio origEntries[i] = &entryErr{Entry: &btpb.MutateRowsRequest_Entry{RowKey: []byte(key), Mutations: mut.ops}} } + var firstGroupErr error for _, group := range groupEntries(origEntries, maxMutations) { err := t.applyGroup(ctx, group, opts...) - if err != nil { - return nil, err + if err != nil && firstGroupErr == nil { + firstGroupErr = err } } @@ -1274,7 +1276,7 @@ func (t *Table) ApplyBulk(ctx context.Context, rowKeys []string, muts []*Mutatio errs = append(errs, entry.Err) } if foundErr { - return errs, nil + return errs, firstGroupErr } return nil, nil } @@ -1343,6 +1345,7 @@ func (t *Table) doApplyBulk(ctx context.Context, entryErrs []*entryErr, headerMD stream, err := t.c.client.MutateRows(ctx, req) if err != nil { + populatePerMutationErrors(entryErrs, nil, err) return err } @@ -1355,11 +1358,22 @@ func (t *Table) doApplyBulk(ctx context.Context, entryErrs []*entryErr, headerMD *trailerMD = stream.Trailer() break } + + populatePerMutationErrors(entryErrs, res, err) if err != nil { *trailerMD = stream.Trailer() return err } + after(res) + } + return nil +} + +func populatePerMutationErrors(entryErrs []*entryErr, res *btpb.MutateRowsResponse, err error) { + // If response received from Cloud Bigtable service is not nil, + // populate per mutation errors with the Entries from response + if res != nil { for _, entry := range res.Entries { s := entry.Status if s.Code == int32(codes.OK) { @@ -1368,9 +1382,16 @@ func (t *Table) doApplyBulk(ctx context.Context, entryErrs []*entryErr, headerMD entryErrs[entry.Index].Err = status.Errorf(codes.Code(s.Code), s.Message) } } - after(res) + return + } + + // If response received from Cloud Bigtable service is nil and error received from Cloud Bigtable service is not nil + // set per mutation errors to the error + if err != nil { + for i, _ := range entryErrs { + _, entryErrs[i].Err = convertToGrpcStatusErr(err) + } } - return nil } // groupEntries groups entries into groups of a specified size without breaking up diff --git a/bigtable/bigtable_test.go b/bigtable/bigtable_test.go index 6664670ea6b0..9a22045de929 100644 --- a/bigtable/bigtable_test.go +++ b/bigtable/bigtable_test.go @@ -28,6 +28,8 @@ import ( "github.com/google/go-cmp/cmp" "google.golang.org/api/option" "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) var disableMetricsConfig = ClientConfig{MetricsProvider: NoopMetricsProvider{}} @@ -875,3 +877,115 @@ func TestMutateRowsWithAggregates_MergeToCell(t *testing.T) { t.Error() } } + +type rowKeyCheckingInterceptor struct { + grpc.ClientStream + failRow string + requestCounter *int +} + +func (i *rowKeyCheckingInterceptor) SendMsg(m interface{}) error { + *i.requestCounter = *i.requestCounter + 1 + if req, ok := m.(*btpb.MutateRowsRequest); ok { + for _, entry := range req.Entries { + if string(entry.RowKey) == i.failRow { + return status.Error(codes.InvalidArgument, "Invalid row key") + } + } + } + return i.ClientStream.SendMsg(m) +} + +func (i *rowKeyCheckingInterceptor) RecvMsg(m interface{}) error { + return i.ClientStream.RecvMsg(m) +} + +// Mutations are broken down into groups of 'maxMutations' and then MutateRowsRequest is sent to Cloud Bigtable Service +// This test validates that even if one of the group receives error, requests are sent for further groups +func TestApplyBulk_MutationsSucceedAfterGroupError(t *testing.T) { + testEnv, err := NewEmulatedEnv(IntegrationTestConfig{}) + if err != nil { + t.Fatalf("NewEmulatedEnv failed: %v", err) + } + failedRow := "row2" + reqCount := 0 + conn, err := grpc.Dial(testEnv.server.Addr, grpc.WithInsecure(), grpc.WithBlock(), + grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(100<<20), grpc.MaxCallRecvMsgSize(100<<20)), + grpc.WithStreamInterceptor(func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) { + clientStream, err := streamer(ctx, desc, cc, method, opts...) + return &rowKeyCheckingInterceptor{ + ClientStream: clientStream, + failRow: failedRow, + requestCounter: &reqCount, + }, err + }), + ) + if err != nil { + t.Fatalf("grpc.Dial failed: %v", err) + } + ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) + defer cancel() + adminClient, err := NewAdminClient(ctx, testEnv.config.Project, testEnv.config.Instance, option.WithGRPCConn(conn)) + if err != nil { + t.Fatalf("NewClient failed: %v", err) + } + defer adminClient.Close() + + tableConf := &TableConf{ + TableID: testEnv.config.Table, + ColumnFamilies: map[string]Family{ + "f": { + ValueType: AggregateType{ + Input: Int64Type{}, + Aggregator: SumAggregator{}, + }, + }, + }, + } + if err := adminClient.CreateTableFromConf(ctx, tableConf); err != nil { + t.Fatalf("CreateTable(%v) failed: %v", testEnv.config.Table, err) + } + + client, err := NewClientWithConfig(ctx, testEnv.config.Project, testEnv.config.Instance, disableMetricsConfig, option.WithGRPCConn(conn)) + if err != nil { + t.Fatalf("NewClientWithConfig failed: %v", err) + } + defer client.Close() + table := client.Open(testEnv.config.Table) + + m1 := NewMutation() + m1.AddIntToCell("f", "q", 0, 1000) + m2 := NewMutation() + m2.AddIntToCell("f", "q", 0, 2000) + + origMaxMutations := maxMutations + t.Cleanup(func() { + maxMutations = origMaxMutations + }) + maxMutations = 2 + + rowKeys := []string{"row1", "row1", failedRow, failedRow, "row3", "row3"} + _, err = table.ApplyBulk(ctx, rowKeys, []*Mutation{m1, m2, m1, m2, m1, m2}) + if err == nil { + t.Fatalf("Expected ApplyBulk to fail") + } + + wantReqCount := len(rowKeys) / maxMutations + if reqCount != wantReqCount { + t.Errorf("Number of requests got: %v, want: %v", reqCount, wantReqCount) + } + + err = table.ReadRows(ctx, RowList{"row1", failedRow, "row3"}, func(row Row) bool { + rowMutated := bytes.Equal(row["f"][0].Value, binary.BigEndian.AppendUint64([]byte{}, 3000)) + if rowMutated && row.Key() == failedRow { + t.Error("Expected mutation to fail for row " + row.Key()) + } + if !rowMutated && row.Key() != failedRow { + t.Error("Expected mutation to succeed for row " + row.Key()) + } + return true + }) + if err != nil { + t.Fatalf("ReadRows failed: %v", err) + } +} From 3f55b861ecc0c3e04d08fee86e36bc1ab13c8b86 Mon Sep 17 00:00:00 2001 From: Baha Aiman Date: Fri, 10 Jan 2025 21:49:34 +0000 Subject: [PATCH 2/6] simplify range use --- bigtable/bigtable.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bigtable/bigtable.go b/bigtable/bigtable.go index daf1c17ea82c..fc20518bd346 100644 --- a/bigtable/bigtable.go +++ b/bigtable/bigtable.go @@ -1388,7 +1388,7 @@ func populatePerMutationErrors(entryErrs []*entryErr, res *btpb.MutateRowsRespon // If response received from Cloud Bigtable service is nil and error received from Cloud Bigtable service is not nil // set per mutation errors to the error if err != nil { - for i, _ := range entryErrs { + for i := range entryErrs { _, entryErrs[i].Err = convertToGrpcStatusErr(err) } } From 3f04dd02ea81c8106be31013daa80d81dede3f32 Mon Sep 17 00:00:00 2001 From: Baha Aiman Date: Tue, 14 Jan 2025 05:59:59 +0000 Subject: [PATCH 3/6] fix unit tests --- bigtable/bigtable.go | 63 +++++++++++++++++++-------------------- bigtable/bigtable_test.go | 12 +++++--- bigtable/retry_test.go | 4 +-- 3 files changed, 40 insertions(+), 39 deletions(-) diff --git a/bigtable/bigtable.go b/bigtable/bigtable.go index fc20518bd346..e52c74001cb3 100644 --- a/bigtable/bigtable.go +++ b/bigtable/bigtable.go @@ -1260,10 +1260,17 @@ func (t *Table) ApplyBulk(ctx context.Context, rowKeys []string, muts []*Mutatio var firstGroupErr error for _, group := range groupEntries(origEntries, maxMutations) { - err := t.applyGroup(ctx, group, opts...) + err := t.applyGroup(ctx, &group, opts...) if err != nil && firstGroupErr == nil { firstGroupErr = err } + + // Populate per mutation error if top level error is not nil + if group.topLevelErr != nil { + for i := range group.entries { + group.entries[i].Err = group.topLevelErr + } + } } // All the errors are accumulated into an array and returned, interspersed with nils for successful @@ -1281,21 +1288,21 @@ func (t *Table) ApplyBulk(ctx context.Context, rowKeys []string, muts []*Mutatio return nil, nil } -func (t *Table) applyGroup(ctx context.Context, group []*entryErr, opts ...ApplyOption) (err error) { +func (t *Table) applyGroup(ctx context.Context, group *entriesGroup, opts ...ApplyOption) (err error) { attrMap := make(map[string]interface{}) mt := t.newBuiltinMetricsTracer(ctx, true) defer recordOperationCompletion(mt) err = gaxInvokeWithRecorder(ctx, mt, "MutateRows", func(ctx context.Context, headerMD, trailerMD *metadata.MD, _ gax.CallSettings) error { - attrMap["rowCount"] = len(group) + attrMap["rowCount"] = len(group.entries) trace.TracePrintf(ctx, attrMap, "Row count in ApplyBulk") err := t.doApplyBulk(ctx, group, headerMD, trailerMD, opts...) if err != nil { // We want to retry the entire request with the current group return err } - group = t.getApplyBulkRetries(group) - if len(group) > 0 && len(idempotentRetryCodes) > 0 { + group.entries = t.getApplyBulkRetries(group.entries) + if len(group.entries) > 0 && len(idempotentRetryCodes) > 0 { // We have at least one mutation that needs to be retried. // Return an arbitrary error that is retryable according to callOptions. return status.Errorf(idempotentRetryCodes[0], "Synthetic error: partial failure of ApplyBulk") @@ -1322,15 +1329,15 @@ func (t *Table) getApplyBulkRetries(entries []*entryErr) []*entryErr { } // doApplyBulk does the work of a single ApplyBulk invocation -func (t *Table) doApplyBulk(ctx context.Context, entryErrs []*entryErr, headerMD, trailerMD *metadata.MD, opts ...ApplyOption) error { +func (t *Table) doApplyBulk(ctx context.Context, group *entriesGroup, headerMD, trailerMD *metadata.MD, opts ...ApplyOption) error { after := func(res proto.Message) { for _, o := range opts { o.after(res) } } - entries := make([]*btpb.MutateRowsRequest_Entry, len(entryErrs)) - for i, entryErr := range entryErrs { + entries := make([]*btpb.MutateRowsRequest_Entry, len(group.entries)) + for i, entryErr := range group.entries { entries[i] = entryErr.Entry } req := &btpb.MutateRowsRequest{ @@ -1345,7 +1352,7 @@ func (t *Table) doApplyBulk(ctx context.Context, entryErrs []*entryErr, headerMD stream, err := t.c.client.MutateRows(ctx, req) if err != nil { - populatePerMutationErrors(entryErrs, nil, err) + _, group.topLevelErr = convertToGrpcStatusErr(err) return err } @@ -1358,53 +1365,43 @@ func (t *Table) doApplyBulk(ctx context.Context, entryErrs []*entryErr, headerMD *trailerMD = stream.Trailer() break } - - populatePerMutationErrors(entryErrs, res, err) if err != nil { *trailerMD = stream.Trailer() + _, group.topLevelErr = convertToGrpcStatusErr(err) return err } - after(res) - } - return nil -} - -func populatePerMutationErrors(entryErrs []*entryErr, res *btpb.MutateRowsResponse, err error) { - // If response received from Cloud Bigtable service is not nil, - // populate per mutation errors with the Entries from response - if res != nil { for _, entry := range res.Entries { s := entry.Status if s.Code == int32(codes.OK) { - entryErrs[entry.Index].Err = nil + group.entries[entry.Index].Err = nil } else { - entryErrs[entry.Index].Err = status.Errorf(codes.Code(s.Code), s.Message) + group.entries[entry.Index].Err = status.Errorf(codes.Code(s.Code), s.Message) } } - return + after(res) } + return nil +} - // If response received from Cloud Bigtable service is nil and error received from Cloud Bigtable service is not nil - // set per mutation errors to the error - if err != nil { - for i := range entryErrs { - _, entryErrs[i].Err = convertToGrpcStatusErr(err) - } - } +type entriesGroup struct { + entries []*entryErr + topLevelErr error // top level error either from stream.Recv or MutateRows } // groupEntries groups entries into groups of a specified size without breaking up // individual entries. -func groupEntries(entries []*entryErr, maxSize int) [][]*entryErr { +func groupEntries(entries []*entryErr, maxSize int) []entriesGroup { var ( - res [][]*entryErr + res []entriesGroup start int gmuts int ) addGroup := func(end int) { if end-start > 0 { - res = append(res, entries[start:end]) + res = append(res, entriesGroup{ + entries: entries[start:end], + }) start = end gmuts = 0 } diff --git a/bigtable/bigtable_test.go b/bigtable/bigtable_test.go index 9a22045de929..b7d50c4961ed 100644 --- a/bigtable/bigtable_test.go +++ b/bigtable/bigtable_test.go @@ -328,7 +328,11 @@ func TestGroupEntries(t *testing.T) { }, } { t.Run(test.desc, func(t *testing.T) { - if got, want := groupEntries(test.in, test.size), test.want; !cmp.Equal(mutationCounts(got), mutationCounts(want)) { + wantEntriesGroup := []entriesGroup{} + for _, w := range test.want { + wantEntriesGroup = append(wantEntriesGroup, entriesGroup{entries: w}) + } + if got, want := groupEntries(test.in, test.size), wantEntriesGroup; !cmp.Equal(mutationCounts(got), mutationCounts(want)) { t.Fatalf("[%s] want = %v, got = %v", test.desc, mutationCounts(want), mutationCounts(got)) } }) @@ -343,11 +347,11 @@ func buildEntry(numMutations int) *entryErr { return &entryErr{Entry: &btpb.MutateRowsRequest_Entry{Mutations: muts}} } -func mutationCounts(batched [][]*entryErr) []int { +func mutationCounts(batched []entriesGroup) []int { var res []int - for _, entries := range batched { + for _, group := range batched { var count int - for _, e := range entries { + for _, e := range group.entries { count += len(e.Entry.Mutations) } res = append(res, count) diff --git a/bigtable/retry_test.go b/bigtable/retry_test.go index f330dac16616..d96d376e73cd 100644 --- a/bigtable/retry_test.go +++ b/bigtable/retry_test.go @@ -354,8 +354,8 @@ func TestRetryApplyBulk_IndividualErrorsAndDeadlineExceeded(t *testing.T) { if !equalErrs(wantErr, err) { t.Fatalf("deadline exceeded error: got: %v, want: %v", err, wantErr) } - if errors != nil { - t.Errorf("deadline exceeded errors: got: %v, want: nil", err) + if errors == nil { + t.Errorf("deadline exceeded errors: got: %v, want: %v", err, []error{wantErr}) } } From 71f4745c5866049930a3b2ea76017e5e5a11f3a3 Mon Sep 17 00:00:00 2001 From: Baha Aiman Date: Wed, 15 Jan 2025 03:22:23 +0000 Subject: [PATCH 4/6] refactor code --- bigtable/bigtable.go | 76 +++++++++++++++++++-------------- bigtable/bigtable_test.go | 88 ++++++++++++++++++++++++--------------- 2 files changed, 99 insertions(+), 65 deletions(-) diff --git a/bigtable/bigtable.go b/bigtable/bigtable.go index e52c74001cb3..2336ad3d5f00 100644 --- a/bigtable/bigtable.go +++ b/bigtable/bigtable.go @@ -1228,9 +1228,14 @@ func (m *Mutation) mergeToCell(family, column string, ts Timestamp, value *btpb. type entryErr struct { Entry *btpb.MutateRowsRequest_Entry Err error + + // TopLevelErr is the error received either from + // 1. client.MutateRows + // 2. stream.Recv + TopLevelErr error } -// ApplyBulk applies multiple Mutations, up to a maximum of 100,000. +// ApplyBulk applies multiple Mutations. // Each mutation is individually applied atomically, // but the set of mutations may be applied in any order. // @@ -1259,50 +1264,57 @@ func (t *Table) ApplyBulk(ctx context.Context, rowKeys []string, muts []*Mutatio } var firstGroupErr error - for _, group := range groupEntries(origEntries, maxMutations) { - err := t.applyGroup(ctx, &group, opts...) - if err != nil && firstGroupErr == nil { - firstGroupErr = err - } - - // Populate per mutation error if top level error is not nil - if group.topLevelErr != nil { - for i := range group.entries { - group.entries[i].Err = group.topLevelErr + numFailed := 0 + groups := groupEntries(origEntries, maxMutations) + for _, group := range groups { + err := t.applyGroup(ctx, group, opts...) + if err != nil { + if firstGroupErr == nil { + firstGroupErr = err } + numFailed++ } } + if numFailed == len(groups) { + return nil, firstGroupErr + } + // All the errors are accumulated into an array and returned, interspersed with nils for successful // entries. The absence of any errors means we should return nil. var foundErr bool for _, entry := range origEntries { + if entry.Err == nil && entry.TopLevelErr != nil { + // Populate per mutation error if top level error is not nil + entry.Err = entry.TopLevelErr + } if entry.Err != nil { foundErr = true } errs = append(errs, entry.Err) } if foundErr { - return errs, firstGroupErr + return errs, nil } return nil, nil } -func (t *Table) applyGroup(ctx context.Context, group *entriesGroup, opts ...ApplyOption) (err error) { +func (t *Table) applyGroup(ctx context.Context, group []*entryErr, opts ...ApplyOption) (err error) { attrMap := make(map[string]interface{}) mt := t.newBuiltinMetricsTracer(ctx, true) defer recordOperationCompletion(mt) err = gaxInvokeWithRecorder(ctx, mt, "MutateRows", func(ctx context.Context, headerMD, trailerMD *metadata.MD, _ gax.CallSettings) error { - attrMap["rowCount"] = len(group.entries) + attrMap["rowCount"] = len(group) trace.TracePrintf(ctx, attrMap, "Row count in ApplyBulk") err := t.doApplyBulk(ctx, group, headerMD, trailerMD, opts...) if err != nil { // We want to retry the entire request with the current group return err } - group.entries = t.getApplyBulkRetries(group.entries) - if len(group.entries) > 0 && len(idempotentRetryCodes) > 0 { + // Get the entries that need to be retried + group = t.getApplyBulkRetries(group) + if len(group) > 0 && len(idempotentRetryCodes) > 0 { // We have at least one mutation that needs to be retried. // Return an arbitrary error that is retryable according to callOptions. return status.Errorf(idempotentRetryCodes[0], "Synthetic error: partial failure of ApplyBulk") @@ -1329,15 +1341,15 @@ func (t *Table) getApplyBulkRetries(entries []*entryErr) []*entryErr { } // doApplyBulk does the work of a single ApplyBulk invocation -func (t *Table) doApplyBulk(ctx context.Context, group *entriesGroup, headerMD, trailerMD *metadata.MD, opts ...ApplyOption) error { +func (t *Table) doApplyBulk(ctx context.Context, entryErrs []*entryErr, headerMD, trailerMD *metadata.MD, opts ...ApplyOption) error { after := func(res proto.Message) { for _, o := range opts { o.after(res) } } - entries := make([]*btpb.MutateRowsRequest_Entry, len(group.entries)) - for i, entryErr := range group.entries { + entries := make([]*btpb.MutateRowsRequest_Entry, len(entryErrs)) + for i, entryErr := range entryErrs { entries[i] = entryErr.Entry } req := &btpb.MutateRowsRequest{ @@ -1352,7 +1364,8 @@ func (t *Table) doApplyBulk(ctx context.Context, group *entriesGroup, headerMD, stream, err := t.c.client.MutateRows(ctx, req) if err != nil { - _, group.topLevelErr = convertToGrpcStatusErr(err) + _, topLevelErr := convertToGrpcStatusErr(err) + populateTopLevelError(entryErrs, topLevelErr) return err } @@ -1367,16 +1380,18 @@ func (t *Table) doApplyBulk(ctx context.Context, group *entriesGroup, headerMD, } if err != nil { *trailerMD = stream.Trailer() - _, group.topLevelErr = convertToGrpcStatusErr(err) + _, topLevelErr := convertToGrpcStatusErr(err) + populateTopLevelError(entryErrs, topLevelErr) return err } + populateTopLevelError(entryErrs, nil) for _, entry := range res.Entries { s := entry.Status if s.Code == int32(codes.OK) { - group.entries[entry.Index].Err = nil + entryErrs[entry.Index].Err = nil } else { - group.entries[entry.Index].Err = status.Errorf(codes.Code(s.Code), s.Message) + entryErrs[entry.Index].Err = status.Errorf(codes.Code(s.Code), s.Message) } } after(res) @@ -1384,24 +1399,23 @@ func (t *Table) doApplyBulk(ctx context.Context, group *entriesGroup, headerMD, return nil } -type entriesGroup struct { - entries []*entryErr - topLevelErr error // top level error either from stream.Recv or MutateRows +func populateTopLevelError(entries []*entryErr, topLevelErr error) { + for _, entry := range entries { + entry.TopLevelErr = topLevelErr + } } // groupEntries groups entries into groups of a specified size without breaking up // individual entries. -func groupEntries(entries []*entryErr, maxSize int) []entriesGroup { +func groupEntries(entries []*entryErr, maxSize int) [][]*entryErr { var ( - res []entriesGroup + res [][]*entryErr start int gmuts int ) addGroup := func(end int) { if end-start > 0 { - res = append(res, entriesGroup{ - entries: entries[start:end], - }) + res = append(res, entries[start:end]) start = end gmuts = 0 } diff --git a/bigtable/bigtable_test.go b/bigtable/bigtable_test.go index b7d50c4961ed..92f22e7bf45d 100644 --- a/bigtable/bigtable_test.go +++ b/bigtable/bigtable_test.go @@ -328,11 +328,7 @@ func TestGroupEntries(t *testing.T) { }, } { t.Run(test.desc, func(t *testing.T) { - wantEntriesGroup := []entriesGroup{} - for _, w := range test.want { - wantEntriesGroup = append(wantEntriesGroup, entriesGroup{entries: w}) - } - if got, want := groupEntries(test.in, test.size), wantEntriesGroup; !cmp.Equal(mutationCounts(got), mutationCounts(want)) { + if got, want := groupEntries(test.in, test.size), test.want; !cmp.Equal(mutationCounts(got), mutationCounts(want)) { t.Fatalf("[%s] want = %v, got = %v", test.desc, mutationCounts(want), mutationCounts(got)) } }) @@ -347,11 +343,11 @@ func buildEntry(numMutations int) *entryErr { return &entryErr{Entry: &btpb.MutateRowsRequest_Entry{Mutations: muts}} } -func mutationCounts(batched []entriesGroup) []int { +func mutationCounts(batched [][]*entryErr) []int { var res []int - for _, group := range batched { + for _, entries := range batched { var count int - for _, e := range group.entries { + for _, e := range entries { count += len(e.Entry.Mutations) } res = append(res, count) @@ -885,6 +881,7 @@ func TestMutateRowsWithAggregates_MergeToCell(t *testing.T) { type rowKeyCheckingInterceptor struct { grpc.ClientStream failRow string + failErr error // error to use while sending failed reponse for fail row requestCounter *int } @@ -893,7 +890,7 @@ func (i *rowKeyCheckingInterceptor) SendMsg(m interface{}) error { if req, ok := m.(*btpb.MutateRowsRequest); ok { for _, entry := range req.Entries { if string(entry.RowKey) == i.failRow { - return status.Error(codes.InvalidArgument, "Invalid row key") + return i.failErr } } } @@ -907,13 +904,16 @@ func (i *rowKeyCheckingInterceptor) RecvMsg(m interface{}) error { // Mutations are broken down into groups of 'maxMutations' and then MutateRowsRequest is sent to Cloud Bigtable Service // This test validates that even if one of the group receives error, requests are sent for further groups func TestApplyBulk_MutationsSucceedAfterGroupError(t *testing.T) { - testEnv, err := NewEmulatedEnv(IntegrationTestConfig{}) - if err != nil { - t.Fatalf("NewEmulatedEnv failed: %v", err) + testEnv, gotErr := NewEmulatedEnv(IntegrationTestConfig{}) + if gotErr != nil { + t.Fatalf("NewEmulatedEnv failed: %v", gotErr) } + + // Add interceptor to fail rows failedRow := "row2" + failErr := status.Error(codes.InvalidArgument, "Invalid row key") reqCount := 0 - conn, err := grpc.Dial(testEnv.server.Addr, grpc.WithInsecure(), grpc.WithBlock(), + conn, gotErr := grpc.Dial(testEnv.server.Addr, grpc.WithInsecure(), grpc.WithBlock(), grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(100<<20), grpc.MaxCallRecvMsgSize(100<<20)), grpc.WithStreamInterceptor(func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) { clientStream, err := streamer(ctx, desc, cc, method, opts...) @@ -921,20 +921,22 @@ func TestApplyBulk_MutationsSucceedAfterGroupError(t *testing.T) { ClientStream: clientStream, failRow: failedRow, requestCounter: &reqCount, + failErr: failErr, }, err }), ) - if err != nil { - t.Fatalf("grpc.Dial failed: %v", err) + if gotErr != nil { + t.Fatalf("grpc.Dial failed: %v", gotErr) } + + // Create client and table ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) defer cancel() - adminClient, err := NewAdminClient(ctx, testEnv.config.Project, testEnv.config.Instance, option.WithGRPCConn(conn)) - if err != nil { - t.Fatalf("NewClient failed: %v", err) + adminClient, gotErr := NewAdminClient(ctx, testEnv.config.Project, testEnv.config.Instance, option.WithGRPCConn(conn)) + if gotErr != nil { + t.Fatalf("NewClient failed: %v", gotErr) } defer adminClient.Close() - tableConf := &TableConf{ TableID: testEnv.config.Table, ColumnFamilies: map[string]Family{ @@ -949,37 +951,55 @@ func TestApplyBulk_MutationsSucceedAfterGroupError(t *testing.T) { if err := adminClient.CreateTableFromConf(ctx, tableConf); err != nil { t.Fatalf("CreateTable(%v) failed: %v", testEnv.config.Table, err) } - - client, err := NewClientWithConfig(ctx, testEnv.config.Project, testEnv.config.Instance, disableMetricsConfig, option.WithGRPCConn(conn)) - if err != nil { - t.Fatalf("NewClientWithConfig failed: %v", err) + client, gotErr := NewClientWithConfig(ctx, testEnv.config.Project, testEnv.config.Instance, disableMetricsConfig, option.WithGRPCConn(conn)) + if gotErr != nil { + t.Fatalf("NewClientWithConfig failed: %v", gotErr) } defer client.Close() table := client.Open(testEnv.config.Table) - m1 := NewMutation() - m1.AddIntToCell("f", "q", 0, 1000) - m2 := NewMutation() - m2.AddIntToCell("f", "q", 0, 2000) - + // Override maxMutations to break mutations into smaller groups origMaxMutations := maxMutations t.Cleanup(func() { maxMutations = origMaxMutations }) maxMutations = 2 + // Create mutations + m1 := NewMutation() + m1.AddIntToCell("f", "q", 0, 1000) + m2 := NewMutation() + m2.AddIntToCell("f", "q", 0, 2000) + + // Perform ApplyBulk operation and compare errors rowKeys := []string{"row1", "row1", failedRow, failedRow, "row3", "row3"} - _, err = table.ApplyBulk(ctx, rowKeys, []*Mutation{m1, m2, m1, m2, m1, m2}) - if err == nil { - t.Fatalf("Expected ApplyBulk to fail") + var wantErr error + wantErrs := []error{nil, nil, failErr, failErr, nil, nil} + gotErrs, gotErr := table.ApplyBulk(ctx, rowKeys, []*Mutation{m1, m2, m1, m2, m1, m2}) + + // Assert overall error + if !equalErrs(gotErr, wantErr) { + t.Fatalf("ApplyBulk err got: %v, want: %v", gotErr, wantErr) + } + + // Assert individual muation errors + if len(gotErrs) != len(wantErrs) { + t.Fatalf("ApplyBulk errs got: %v, want: %v", gotErrs, wantErrs) + } + for i, _ := range gotErrs { + if !equalErrs(gotErrs[i], wantErrs[i]) { + t.Errorf("#%d ApplyBulk err got: %v, want: %v", i, gotErrs[i], wantErrs[i]) + } } + // Assert number of requests sent wantReqCount := len(rowKeys) / maxMutations if reqCount != wantReqCount { t.Errorf("Number of requests got: %v, want: %v", reqCount, wantReqCount) } - err = table.ReadRows(ctx, RowList{"row1", failedRow, "row3"}, func(row Row) bool { + // Assert individual mutation apply success/failure by reading rows + gotErr = table.ReadRows(ctx, RowList{"row1", failedRow, "row3"}, func(row Row) bool { rowMutated := bytes.Equal(row["f"][0].Value, binary.BigEndian.AppendUint64([]byte{}, 3000)) if rowMutated && row.Key() == failedRow { t.Error("Expected mutation to fail for row " + row.Key()) @@ -989,7 +1009,7 @@ func TestApplyBulk_MutationsSucceedAfterGroupError(t *testing.T) { } return true }) - if err != nil { - t.Fatalf("ReadRows failed: %v", err) + if gotErr != nil { + t.Fatalf("ReadRows failed: %v", gotErr) } } From 748fe7f381bf8eb13de098e056a3ad25e45923a9 Mon Sep 17 00:00:00 2001 From: Baha Aiman Date: Wed, 15 Jan 2025 03:26:58 +0000 Subject: [PATCH 5/6] resolve vet failures --- bigtable/bigtable_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bigtable/bigtable_test.go b/bigtable/bigtable_test.go index 92f22e7bf45d..472def5c4b38 100644 --- a/bigtable/bigtable_test.go +++ b/bigtable/bigtable_test.go @@ -986,7 +986,7 @@ func TestApplyBulk_MutationsSucceedAfterGroupError(t *testing.T) { if len(gotErrs) != len(wantErrs) { t.Fatalf("ApplyBulk errs got: %v, want: %v", gotErrs, wantErrs) } - for i, _ := range gotErrs { + for i := range gotErrs { if !equalErrs(gotErrs[i], wantErrs[i]) { t.Errorf("#%d ApplyBulk err got: %v, want: %v", i, gotErrs[i], wantErrs[i]) } From b44cbd00e070596085e0dcbd4f39e34e0c18b9ed Mon Sep 17 00:00:00 2001 From: Baha Aiman Date: Wed, 15 Jan 2025 04:56:43 +0000 Subject: [PATCH 6/6] fix tests --- bigtable/bigtable.go | 12 +++++++----- bigtable/retry_test.go | 4 ++-- 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/bigtable/bigtable.go b/bigtable/bigtable.go index 2336ad3d5f00..1a2231dd9b76 100644 --- a/bigtable/bigtable.go +++ b/bigtable/bigtable.go @@ -1348,6 +1348,11 @@ func (t *Table) doApplyBulk(ctx context.Context, entryErrs []*entryErr, headerMD } } + var topLevelErr error + defer func() { + populateTopLevelError(entryErrs, topLevelErr) + }() + entries := make([]*btpb.MutateRowsRequest_Entry, len(entryErrs)) for i, entryErr := range entryErrs { entries[i] = entryErr.Entry @@ -1364,8 +1369,7 @@ func (t *Table) doApplyBulk(ctx context.Context, entryErrs []*entryErr, headerMD stream, err := t.c.client.MutateRows(ctx, req) if err != nil { - _, topLevelErr := convertToGrpcStatusErr(err) - populateTopLevelError(entryErrs, topLevelErr) + _, topLevelErr = convertToGrpcStatusErr(err) return err } @@ -1380,12 +1384,10 @@ func (t *Table) doApplyBulk(ctx context.Context, entryErrs []*entryErr, headerMD } if err != nil { *trailerMD = stream.Trailer() - _, topLevelErr := convertToGrpcStatusErr(err) - populateTopLevelError(entryErrs, topLevelErr) + _, topLevelErr = convertToGrpcStatusErr(err) return err } - populateTopLevelError(entryErrs, nil) for _, entry := range res.Entries { s := entry.Status if s.Code == int32(codes.OK) { diff --git a/bigtable/retry_test.go b/bigtable/retry_test.go index d96d376e73cd..f330dac16616 100644 --- a/bigtable/retry_test.go +++ b/bigtable/retry_test.go @@ -354,8 +354,8 @@ func TestRetryApplyBulk_IndividualErrorsAndDeadlineExceeded(t *testing.T) { if !equalErrs(wantErr, err) { t.Fatalf("deadline exceeded error: got: %v, want: %v", err, wantErr) } - if errors == nil { - t.Errorf("deadline exceeded errors: got: %v, want: %v", err, []error{wantErr}) + if errors != nil { + t.Errorf("deadline exceeded errors: got: %v, want: nil", err) } }