From bd1657f7f3d1bc82af7bf504ca23c5095812403b Mon Sep 17 00:00:00 2001 From: Baha Aiman Date: Wed, 15 Jan 2025 18:32:36 -0800 Subject: [PATCH] feat(datastore): Add read time from client while running queries (#11447) * fix(datastore): Add read time from client to RunQueryRequest and RunAggregationQueryRequest * handle transaction and client read time * fix unit tests * move common code to separate function --- datastore/datastore.go | 8 ++- datastore/integration_test.go | 91 ++++++++++++++++++++++++++++++++--- datastore/query.go | 44 +++++++++++------ datastore/query_test.go | 63 +++++++++++++++++++----- datastore/transaction.go | 10 +++- 5 files changed, 178 insertions(+), 38 deletions(-) diff --git a/datastore/datastore.go b/datastore/datastore.go index 4ee71cc18b04..64aef7a1aa65 100644 --- a/datastore/datastore.go +++ b/datastore/datastore.go @@ -435,7 +435,7 @@ func (c *Client) Get(ctx context.Context, key *Key, dst interface{}) (err error) } var opts *pb.ReadOptions - if !c.readSettings.readTime.IsZero() { + if c.readSettings.readTimeExists() { opts = &pb.ReadOptions{ ConsistencyType: &pb.ReadOptions_ReadTime{ // Timestamp cannot be less than microseconds accuracy. See #6938 @@ -470,7 +470,7 @@ func (c *Client) GetMulti(ctx context.Context, keys []*Key, dst interface{}) (er defer func() { trace.EndSpan(ctx, err) }() var opts *pb.ReadOptions - if c.readSettings != nil && !c.readSettings.readTime.IsZero() { + if c.readSettings.readTimeExists() { opts = &pb.ReadOptions{ ConsistencyType: &pb.ReadOptions_ReadTime{ // Timestamp cannot be less than microseconds accuracy. See #6938 @@ -875,6 +875,10 @@ type readSettings struct { readTime time.Time } +func (rs *readSettings) readTimeExists() bool { + return rs != nil && !rs.readTime.IsZero() +} + // WithReadOptions specifies constraints for accessing documents from the database, // e.g. at what time snapshot to read the documents. // The client uses this value for subsequent reads, unless additional ReadOptions diff --git a/datastore/integration_test.go b/datastore/integration_test.go index d2ebd836b82e..b93d7f3dc8e9 100644 --- a/datastore/integration_test.go +++ b/datastore/integration_test.go @@ -435,6 +435,52 @@ func TestIntegration_GetWithReadTime(t *testing.T) { _ = client.Delete(ctx, k) } +func TestIntegration_RunWithReadTime(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*20) + client := newTestClient(ctx, t) + defer cancel() + defer client.Close() + + type RT struct { + TimeCreated time.Time + } + + rt1 := RT{time.Now()} + k := NameKey("RT", "ReadTime", nil) + + tx, err := client.NewTransaction(ctx) + if err != nil { + t.Fatal(err) + } + + if _, err := tx.Put(k, &rt1); err != nil { + t.Fatalf("Transaction.Put: %v\n", err) + } + + if _, err := tx.Commit(); err != nil { + t.Fatalf("Transaction.Commit: %v\n", err) + } + + testutil.Retry(t, 5, time.Duration(10*time.Second), func(r *testutil.R) { + got := RT{} + tm := ReadTime(time.Now()) + + client.WithReadOptions(tm) + + // If the Entity isn't available at the requested read time, we get + // a "datastore: no such entity" error. The ReadTime is otherwise not + // exposed in anyway in the response. + err = client.Get(ctx, k, &got) + client.Run(ctx, NewQuery("RT")) + if err != nil { + r.Errorf("client.Get: %v", err) + } + }) + + // Cleanup + _ = client.Delete(ctx, k) +} + func TestIntegration_TopLevelKeyLoaded(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*20) defer cancel() @@ -1225,6 +1271,8 @@ func TestIntegration_AggregationQueries(t *testing.T) { client := newTestClient(ctx, t) defer client.Close() + beforeCreate := time.Now().Truncate(time.Millisecond) + parent := NameKey("SQParent", keyPrefix+"AggregationQueries"+suffix, nil) now := timeNow.Truncate(time.Millisecond).Unix() children := []*SQChild{ @@ -1255,12 +1303,13 @@ func TestIntegration_AggregationQueries(t *testing.T) { }() testCases := []struct { - desc string - aggQuery *AggregationQuery - transactionOpts []TransactionOption - wantFailure bool - wantErrMsg string - wantAggResult AggregationResult + desc string + aggQuery *AggregationQuery + transactionOpts []TransactionOption + clientReadOptions []ReadOption + wantFailure bool + wantErrMsg string + wantAggResult AggregationResult }{ { @@ -1280,6 +1329,26 @@ func TestIntegration_AggregationQueries(t *testing.T) { "count": &pb.Value{ValueType: &pb.Value_IntegerValue{IntegerValue: 5}}, }, }, + { + desc: "Count success before create with client read time", + aggQuery: NewQuery("SQChild").Ancestor(parent).Filter("T=", now).Filter("I>=", 3). + NewAggregationQuery(). + WithCount("count"), + clientReadOptions: []ReadOption{ReadTime(beforeCreate)}, + wantAggResult: map[string]interface{}{ + "count": &pb.Value{ValueType: &pb.Value_IntegerValue{IntegerValue: 0}}, + }, + }, + { + desc: "Count success after create with client read time", + aggQuery: NewQuery("SQChild").Ancestor(parent).Filter("T=", now).Filter("I>=", 3). + NewAggregationQuery(). + WithCount("count"), + clientReadOptions: []ReadOption{ReadTime(time.Now().Truncate(time.Millisecond))}, + wantAggResult: map[string]interface{}{ + "count": &pb.Value{ValueType: &pb.Value_IntegerValue{IntegerValue: 5}}, + }, + }, { desc: "Multiple aggregations", aggQuery: NewQuery("SQChild").Ancestor(parent).Filter("T=", now). @@ -1352,8 +1421,16 @@ func TestIntegration_AggregationQueries(t *testing.T) { } for _, testCase := range testCases { + testClient := client + if testCase.clientReadOptions != nil { + clientWithReadTime := newTestClient(ctx, t) + clientWithReadTime.WithReadOptions(testCase.clientReadOptions...) + defer clientWithReadTime.Close() + + testClient = clientWithReadTime + } testutil.Retry(t, 10, time.Second, func(r *testutil.R) { - gotAggResult, gotErr := client.RunAggregationQuery(ctx, testCase.aggQuery) + gotAggResult, gotErr := testClient.RunAggregationQuery(ctx, testCase.aggQuery) gotFailure := gotErr != nil if gotFailure != testCase.wantFailure || diff --git a/datastore/query.go b/datastore/query.go index f5dd66d81236..0b241c81ca2e 100644 --- a/datastore/query.go +++ b/datastore/query.go @@ -29,6 +29,7 @@ import ( "cloud.google.com/go/internal/protostruct" "cloud.google.com/go/internal/trace" "google.golang.org/api/iterator" + "google.golang.org/protobuf/types/known/timestamppb" wrapperspb "google.golang.org/protobuf/types/known/wrapperspb" ) @@ -501,13 +502,13 @@ func (q *Query) End(c Cursor) *Query { } // toRunQueryRequest converts the query to a protocol buffer. -func (q *Query) toRunQueryRequest(req *pb.RunQueryRequest) error { +func (q *Query) toRunQueryRequest(req *pb.RunQueryRequest, clientReadSettings *readSettings) error { dst, err := q.toProto() if err != nil { return err } - req.ReadOptions, err = parseQueryReadOptions(q.eventual, q.trans) + req.ReadOptions, err = parseQueryReadOptions(q.eventual, q.trans, clientReadSettings) if err != nil { return err } @@ -879,7 +880,7 @@ func (c *Client) run(ctx context.Context, q *Query, opts ...RunOption) *Iterator t.req.ExplainOptions = runSettings.explainOptions } - if err := q.toRunQueryRequest(t.req); err != nil { + if err := q.toRunQueryRequest(t.req, c.readSettings); err != nil { t.err = err } return t @@ -948,7 +949,7 @@ func (c *Client) RunAggregationQueryWithOptions(ctx context.Context, aq *Aggrega defer txn.stateLockDeferUnlock()() } - req.ReadOptions, err = parseQueryReadOptions(aq.query.eventual, txn) + req.ReadOptions, err = parseQueryReadOptions(aq.query.eventual, txn, c.readSettings) if err != nil { return ar, err } @@ -976,22 +977,30 @@ func (c *Client) RunAggregationQueryWithOptions(ctx context.Context, aq *Aggrega return ar, nil } -func validateReadOptions(eventual bool, t *Transaction) error { - if t == nil { +func validateReadOptions(eventual bool, t *Transaction, clientReadSettings *readSettings) error { + if !clientReadSettings.readTimeExists() { + if t == nil { + return nil + } + if eventual { + return errEventualConsistencyTransaction + } + if t.state == transactionStateExpired { + return errExpiredTransaction + } return nil } - if eventual { - return errEventualConsistencyTransaction - } - if t.state == transactionStateExpired { - return errExpiredTransaction + + if t != nil || eventual { + return errEventualConsistencyTxnClientReadTime } + return nil } // parseQueryReadOptions translates Query read options into protobuf format. -func parseQueryReadOptions(eventual bool, t *Transaction) (*pb.ReadOptions, error) { - err := validateReadOptions(eventual, t) +func parseQueryReadOptions(eventual bool, t *Transaction, clientReadSettings *readSettings) (*pb.ReadOptions, error) { + err := validateReadOptions(eventual, t, clientReadSettings) if err != nil { return nil, err } @@ -1004,6 +1013,13 @@ func parseQueryReadOptions(eventual bool, t *Transaction) (*pb.ReadOptions, erro return &pb.ReadOptions{ConsistencyType: &pb.ReadOptions_ReadConsistency_{ReadConsistency: pb.ReadOptions_EVENTUAL}}, nil } + if clientReadSettings.readTimeExists() { + return &pb.ReadOptions{ + ConsistencyType: &pb.ReadOptions_ReadTime{ + ReadTime: timestamppb.New(clientReadSettings.readTime), + }, + }, nil + } return nil, nil } @@ -1117,7 +1133,7 @@ func (t *Iterator) nextBatch() error { } var err error - t.req.ReadOptions, err = parseQueryReadOptions(t.eventual, txn) + t.req.ReadOptions, err = parseQueryReadOptions(t.eventual, txn, t.client.readSettings) if err != nil { return err } diff --git a/datastore/query_test.go b/datastore/query_test.go index 414583c7acdc..f3ef6b720def 100644 --- a/datastore/query_test.go +++ b/datastore/query_test.go @@ -22,12 +22,14 @@ import ( "sort" "strings" "testing" + "time" pb "cloud.google.com/go/datastore/apiv1/datastorepb" "cloud.google.com/go/internal/testutil" "github.com/google/go-cmp/cmp" "google.golang.org/grpc" "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/known/timestamppb" ) var ( @@ -723,16 +725,29 @@ func TestNamespaceQuery(t *testing.T) { } } -func TestReadOptions(t *testing.T) { +func TestToRunQueryRequest(t *testing.T) { + clientReadTime := time.Now() tid := []byte{1} for _, test := range []struct { q *Query + crs *readSettings want *pb.ReadOptions }{ { q: NewQuery(""), want: nil, }, + { + q: NewQuery(""), + crs: &readSettings{ + readTime: clientReadTime, + }, + want: &pb.ReadOptions{ + ConsistencyType: &pb.ReadOptions_ReadTime{ + ReadTime: timestamppb.New(clientReadTime), + }, + }, + }, { q: NewQuery("").Transaction(nil), want: nil, @@ -755,20 +770,10 @@ func TestReadOptions(t *testing.T) { }, } { req := &pb.RunQueryRequest{} - if err := test.q.toRunQueryRequest(req); err != nil { + if err := test.q.toRunQueryRequest(req, test.crs); err != nil { t.Fatalf("%+v: got %v, want no error", test.q, err) } } - // Test errors. - for _, q := range []*Query{ - NewQuery("").Transaction(&Transaction{id: nil, state: transactionStateExpired}), - NewQuery("").Transaction(&Transaction{id: tid, state: transactionStateInProgress}).EventualConsistency(), - } { - req := &pb.RunQueryRequest{} - if err := q.toRunQueryRequest(req); err == nil { - t.Errorf("%+v: got nil, wanted error", q) - } - } } func TestInvalidFilters(t *testing.T) { @@ -956,6 +961,7 @@ func TestValidateReadOptions(t *testing.T) { desc string eventual bool trans *Transaction + crs *readSettings wantErr error }{ { @@ -991,8 +997,39 @@ func TestValidateReadOptions(t *testing.T) { desc: "No transaction in eventual query", eventual: true, }, + { + desc: "Eventual query with client read time", + eventual: true, + crs: &readSettings{ + readTime: time.Now(), + }, + wantErr: errEventualConsistencyTxnClientReadTime, + }, + { + desc: "Eventual query and transaction with client read time", + eventual: true, + crs: &readSettings{ + readTime: time.Now(), + }, + trans: &Transaction{ + id: []byte("test id"), + state: transactionStateInProgress, + }, + wantErr: errEventualConsistencyTxnClientReadTime, + }, + { + desc: "Transaction with client read time", + crs: &readSettings{ + readTime: time.Now(), + }, + trans: &Transaction{ + id: []byte("test id"), + state: transactionStateInProgress, + }, + wantErr: errEventualConsistencyTxnClientReadTime, + }, } { - gotErr := validateReadOptions(test.eventual, test.trans) + gotErr := validateReadOptions(test.eventual, test.trans, test.crs) gotErrMsg := "" if gotErr != nil { gotErrMsg = gotErr.Error() diff --git a/datastore/transaction.go b/datastore/transaction.go index 2f86e1506973..e835291aa44d 100644 --- a/datastore/transaction.go +++ b/datastore/transaction.go @@ -35,8 +35,10 @@ const maxIndividualReqTxnRetry = 5 var ErrConcurrentTransaction = errors.New("datastore: concurrent transaction") var ( - errExpiredTransaction = errors.New("datastore: transaction expired") - errEventualConsistencyTransaction = errors.New("datastore: cannot use EventualConsistency query in a transaction") + errExpiredTransaction = errors.New("datastore: transaction expired") + errEventualConsistencyTransaction = errors.New("datastore: cannot use EventualConsistency query in a transaction") + errEventualConsistencyTxnClientReadTime = errors.New("datastore: cannot use EventualConsistency query when read time is specified on client or query is in a transaction") + errTxnClientReadTime = errors.New("datastore: cannot use query in a transaction when read time is specified on client") txnBackoff = gax.Backoff{ Initial: 20 * time.Millisecond, @@ -542,6 +544,10 @@ func (t *Transaction) Rollback() (err error) { } func (t *Transaction) parseReadOptions() (*pb.ReadOptions, error) { + if t.client != nil && t.client.readSettings.readTimeExists() { + return nil, errTxnClientReadTime + } + var opts *pb.ReadOptions switch t.state { case transactionStateExpired: