Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(spanner): support max_commit_delay in Spanner transactions #9299

Merged
merged 10 commits into from
Feb 8, 2024
26 changes: 26 additions & 0 deletions spanner/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4120,24 +4120,50 @@ type TransactionOptionsTestCase struct {
}

func transactionOptionsTestCases() []TransactionOptionsTestCase {
duration, _ := time.ParseDuration("100ms")
otherDuration, _ := time.ParseDuration("50ms")

return []TransactionOptionsTestCase{
{
name: "Client level",
client: &TransactionOptions{CommitOptions: CommitOptions{ReturnCommitStats: true}, TransactionTag: "testTransactionTag", CommitPriority: sppb.RequestOptions_PRIORITY_LOW},
want: &TransactionOptions{CommitOptions: CommitOptions{ReturnCommitStats: true}, TransactionTag: "testTransactionTag", CommitPriority: sppb.RequestOptions_PRIORITY_LOW},
},
{
name: "Client level with MaxCommitDelay",
client: &TransactionOptions{CommitOptions: CommitOptions{ReturnCommitStats: true, MaxCommitDelay: &duration}, TransactionTag: "testTransactionTag", CommitPriority: sppb.RequestOptions_PRIORITY_LOW},
want: &TransactionOptions{CommitOptions: CommitOptions{ReturnCommitStats: true, MaxCommitDelay: &duration}, TransactionTag: "testTransactionTag", CommitPriority: sppb.RequestOptions_PRIORITY_LOW},
},
{
name: "Write level",
client: &TransactionOptions{},
write: &TransactionOptions{CommitOptions: CommitOptions{ReturnCommitStats: true}, TransactionTag: "testTransactionTag", CommitPriority: sppb.RequestOptions_PRIORITY_LOW},
want: &TransactionOptions{CommitOptions: CommitOptions{ReturnCommitStats: true}, TransactionTag: "testTransactionTag", CommitPriority: sppb.RequestOptions_PRIORITY_LOW},
},
{
name: "Write level with MaxCommitDelay",
client: &TransactionOptions{},
write: &TransactionOptions{CommitOptions: CommitOptions{ReturnCommitStats: true, MaxCommitDelay: &duration}, TransactionTag: "testTransactionTag", CommitPriority: sppb.RequestOptions_PRIORITY_LOW},
want: &TransactionOptions{CommitOptions: CommitOptions{ReturnCommitStats: true, MaxCommitDelay: &duration}, TransactionTag: "testTransactionTag", CommitPriority: sppb.RequestOptions_PRIORITY_LOW},
},
{
name: "Write level has precedence than client level",
client: &TransactionOptions{CommitOptions: CommitOptions{ReturnCommitStats: false}, TransactionTag: "clientTransactionTag", CommitPriority: sppb.RequestOptions_PRIORITY_LOW},
write: &TransactionOptions{CommitOptions: CommitOptions{ReturnCommitStats: true}, TransactionTag: "writeTransactionTag", CommitPriority: sppb.RequestOptions_PRIORITY_MEDIUM},
want: &TransactionOptions{CommitOptions: CommitOptions{ReturnCommitStats: true}, TransactionTag: "writeTransactionTag", CommitPriority: sppb.RequestOptions_PRIORITY_MEDIUM},
},
{
name: "Write level nil MaxCommitDelay does not unset client level MaxCommitDelay",
client: &TransactionOptions{CommitOptions: CommitOptions{ReturnCommitStats: false, MaxCommitDelay: &duration}, TransactionTag: "clientTransactionTag", CommitPriority: sppb.RequestOptions_PRIORITY_LOW},
write: &TransactionOptions{CommitOptions: CommitOptions{ReturnCommitStats: true}, TransactionTag: "writeTransactionTag", CommitPriority: sppb.RequestOptions_PRIORITY_MEDIUM},
want: &TransactionOptions{CommitOptions: CommitOptions{ReturnCommitStats: true, MaxCommitDelay: &duration}, TransactionTag: "writeTransactionTag", CommitPriority: sppb.RequestOptions_PRIORITY_MEDIUM},
},
{
name: "Write level has precedence than client level MaxCommitDelay",
client: &TransactionOptions{CommitOptions: CommitOptions{ReturnCommitStats: false, MaxCommitDelay: &duration}, TransactionTag: "clientTransactionTag", CommitPriority: sppb.RequestOptions_PRIORITY_LOW},
write: &TransactionOptions{CommitOptions: CommitOptions{ReturnCommitStats: true, MaxCommitDelay: &otherDuration}, TransactionTag: "writeTransactionTag", CommitPriority: sppb.RequestOptions_PRIORITY_MEDIUM},
want: &TransactionOptions{CommitOptions: CommitOptions{ReturnCommitStats: true, MaxCommitDelay: &otherDuration}, TransactionTag: "writeTransactionTag", CommitPriority: sppb.RequestOptions_PRIORITY_MEDIUM},
},
{
name: "Read lock mode is optimistic",
client: &TransactionOptions{ReadLockMode: sppb.TransactionOptions_ReadWrite_OPTIMISTIC},
Expand Down
3 changes: 2 additions & 1 deletion spanner/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1430,7 +1430,8 @@ func TestIntegration_ReadWriteTransactionWithOptions(t *testing.T) {
}
}

txOpts := TransactionOptions{CommitOptions: CommitOptions{ReturnCommitStats: true}}
duration, _ := time.ParseDuration("100ms")
txOpts := TransactionOptions{CommitOptions: CommitOptions{ReturnCommitStats: true, MaxCommitDelay: &duration}}
resp, err := client.ReadWriteTransactionWithOptions(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
// Query Foo's balance and Bar's balance.
queryAccountByID := "SELECT Balance FROM Accounts WHERE AccountId = @p1"
Expand Down
16 changes: 15 additions & 1 deletion spanner/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"google.golang.org/grpc/status"

vkit "cloud.google.com/go/spanner/apiv1"
durationpb "google.golang.org/protobuf/types/known/durationpb"
)

// transactionID stores a transaction ID which uniquely identifies a transaction
Expand Down Expand Up @@ -1486,14 +1487,22 @@ type CommitResponse struct {
// CommitOptions provides options for committing a transaction in a database.
type CommitOptions struct {
ReturnCommitStats bool
MaxCommitDelay *time.Duration
}

// merge combines two CommitOptions that the input parameter will have higher
// order of precedence.
func (co CommitOptions) merge(opts CommitOptions) CommitOptions {
return CommitOptions{
var newOpts CommitOptions
newOpts = CommitOptions{
ReturnCommitStats: co.ReturnCommitStats || opts.ReturnCommitStats,
MaxCommitDelay: opts.MaxCommitDelay,
}

if newOpts.MaxCommitDelay == nil {
newOpts.MaxCommitDelay = co.MaxCommitDelay
}
return newOpts
}

// commit tries to commit a readwrite transaction to Cloud Spanner. It also
Expand Down Expand Up @@ -1532,6 +1541,10 @@ func (t *ReadWriteTransaction) commit(ctx context.Context, options CommitOptions
t.sh.updateLastUseTime()

var md metadata.MD
var maxCommitDelay *durationpb.Duration
if options.MaxCommitDelay != nil {
maxCommitDelay = durationpb.New(*(options.MaxCommitDelay))
}
res, e := client.Commit(contextWithOutgoingMetadata(ctx, t.sh.getMetadata(), t.disableRouteToLeader), &sppb.CommitRequest{
Session: sid,
Transaction: &sppb.CommitRequest_TransactionId{
Expand All @@ -1540,6 +1553,7 @@ func (t *ReadWriteTransaction) commit(ctx context.Context, options CommitOptions
RequestOptions: createRequestOptions(t.txOpts.CommitPriority, "", t.txOpts.TransactionTag),
Mutations: mPb,
ReturnCommitStats: options.ReturnCommitStats,
MaxCommitDelay: maxCommitDelay,
}, gax.WithGRPCOptions(grpc.Header(&md)))
if getGFELatencyMetricsFlag() && md != nil && t.ct != nil {
if err := createContextAndCaptureGFELatencyMetrics(ctx, t.ct, md, "commit"); err != nil {
Expand Down
Loading