Skip to content

Commit

Permalink
feat(datastore): Add read time from client while running queries (#11447
Browse files Browse the repository at this point in the history
)

* fix(datastore): Add read time from client to RunQueryRequest and RunAggregationQueryRequest

* handle transaction and client read time

* fix unit tests

* move common code to separate function
  • Loading branch information
bhshkh authored Jan 16, 2025
1 parent 6ffe32b commit bd1657f
Show file tree
Hide file tree
Showing 5 changed files with 178 additions and 38 deletions.
8 changes: 6 additions & 2 deletions datastore/datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -435,7 +435,7 @@ func (c *Client) Get(ctx context.Context, key *Key, dst interface{}) (err error)
}

var opts *pb.ReadOptions
if !c.readSettings.readTime.IsZero() {
if c.readSettings.readTimeExists() {
opts = &pb.ReadOptions{
ConsistencyType: &pb.ReadOptions_ReadTime{
// Timestamp cannot be less than microseconds accuracy. See #6938
Expand Down Expand Up @@ -470,7 +470,7 @@ func (c *Client) GetMulti(ctx context.Context, keys []*Key, dst interface{}) (er
defer func() { trace.EndSpan(ctx, err) }()

var opts *pb.ReadOptions
if c.readSettings != nil && !c.readSettings.readTime.IsZero() {
if c.readSettings.readTimeExists() {
opts = &pb.ReadOptions{
ConsistencyType: &pb.ReadOptions_ReadTime{
// Timestamp cannot be less than microseconds accuracy. See #6938
Expand Down Expand Up @@ -875,6 +875,10 @@ type readSettings struct {
readTime time.Time
}

func (rs *readSettings) readTimeExists() bool {
return rs != nil && !rs.readTime.IsZero()
}

// WithReadOptions specifies constraints for accessing documents from the database,
// e.g. at what time snapshot to read the documents.
// The client uses this value for subsequent reads, unless additional ReadOptions
Expand Down
91 changes: 84 additions & 7 deletions datastore/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -435,6 +435,52 @@ func TestIntegration_GetWithReadTime(t *testing.T) {
_ = client.Delete(ctx, k)
}

func TestIntegration_RunWithReadTime(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*20)
client := newTestClient(ctx, t)
defer cancel()
defer client.Close()

type RT struct {
TimeCreated time.Time
}

rt1 := RT{time.Now()}
k := NameKey("RT", "ReadTime", nil)

tx, err := client.NewTransaction(ctx)
if err != nil {
t.Fatal(err)
}

if _, err := tx.Put(k, &rt1); err != nil {
t.Fatalf("Transaction.Put: %v\n", err)
}

if _, err := tx.Commit(); err != nil {
t.Fatalf("Transaction.Commit: %v\n", err)
}

testutil.Retry(t, 5, time.Duration(10*time.Second), func(r *testutil.R) {
got := RT{}
tm := ReadTime(time.Now())

client.WithReadOptions(tm)

// If the Entity isn't available at the requested read time, we get
// a "datastore: no such entity" error. The ReadTime is otherwise not
// exposed in anyway in the response.
err = client.Get(ctx, k, &got)
client.Run(ctx, NewQuery("RT"))
if err != nil {
r.Errorf("client.Get: %v", err)
}
})

// Cleanup
_ = client.Delete(ctx, k)
}

func TestIntegration_TopLevelKeyLoaded(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*20)
defer cancel()
Expand Down Expand Up @@ -1225,6 +1271,8 @@ func TestIntegration_AggregationQueries(t *testing.T) {
client := newTestClient(ctx, t)
defer client.Close()

beforeCreate := time.Now().Truncate(time.Millisecond)

parent := NameKey("SQParent", keyPrefix+"AggregationQueries"+suffix, nil)
now := timeNow.Truncate(time.Millisecond).Unix()
children := []*SQChild{
Expand Down Expand Up @@ -1255,12 +1303,13 @@ func TestIntegration_AggregationQueries(t *testing.T) {
}()

testCases := []struct {
desc string
aggQuery *AggregationQuery
transactionOpts []TransactionOption
wantFailure bool
wantErrMsg string
wantAggResult AggregationResult
desc string
aggQuery *AggregationQuery
transactionOpts []TransactionOption
clientReadOptions []ReadOption
wantFailure bool
wantErrMsg string
wantAggResult AggregationResult
}{

{
Expand All @@ -1280,6 +1329,26 @@ func TestIntegration_AggregationQueries(t *testing.T) {
"count": &pb.Value{ValueType: &pb.Value_IntegerValue{IntegerValue: 5}},
},
},
{
desc: "Count success before create with client read time",
aggQuery: NewQuery("SQChild").Ancestor(parent).Filter("T=", now).Filter("I>=", 3).
NewAggregationQuery().
WithCount("count"),
clientReadOptions: []ReadOption{ReadTime(beforeCreate)},
wantAggResult: map[string]interface{}{
"count": &pb.Value{ValueType: &pb.Value_IntegerValue{IntegerValue: 0}},
},
},
{
desc: "Count success after create with client read time",
aggQuery: NewQuery("SQChild").Ancestor(parent).Filter("T=", now).Filter("I>=", 3).
NewAggregationQuery().
WithCount("count"),
clientReadOptions: []ReadOption{ReadTime(time.Now().Truncate(time.Millisecond))},
wantAggResult: map[string]interface{}{
"count": &pb.Value{ValueType: &pb.Value_IntegerValue{IntegerValue: 5}},
},
},
{
desc: "Multiple aggregations",
aggQuery: NewQuery("SQChild").Ancestor(parent).Filter("T=", now).
Expand Down Expand Up @@ -1352,8 +1421,16 @@ func TestIntegration_AggregationQueries(t *testing.T) {
}

for _, testCase := range testCases {
testClient := client
if testCase.clientReadOptions != nil {
clientWithReadTime := newTestClient(ctx, t)
clientWithReadTime.WithReadOptions(testCase.clientReadOptions...)
defer clientWithReadTime.Close()

testClient = clientWithReadTime
}
testutil.Retry(t, 10, time.Second, func(r *testutil.R) {
gotAggResult, gotErr := client.RunAggregationQuery(ctx, testCase.aggQuery)
gotAggResult, gotErr := testClient.RunAggregationQuery(ctx, testCase.aggQuery)
gotFailure := gotErr != nil

if gotFailure != testCase.wantFailure ||
Expand Down
44 changes: 30 additions & 14 deletions datastore/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"cloud.google.com/go/internal/protostruct"
"cloud.google.com/go/internal/trace"
"google.golang.org/api/iterator"
"google.golang.org/protobuf/types/known/timestamppb"
wrapperspb "google.golang.org/protobuf/types/known/wrapperspb"
)

Expand Down Expand Up @@ -501,13 +502,13 @@ func (q *Query) End(c Cursor) *Query {
}

// toRunQueryRequest converts the query to a protocol buffer.
func (q *Query) toRunQueryRequest(req *pb.RunQueryRequest) error {
func (q *Query) toRunQueryRequest(req *pb.RunQueryRequest, clientReadSettings *readSettings) error {
dst, err := q.toProto()
if err != nil {
return err
}

req.ReadOptions, err = parseQueryReadOptions(q.eventual, q.trans)
req.ReadOptions, err = parseQueryReadOptions(q.eventual, q.trans, clientReadSettings)
if err != nil {
return err
}
Expand Down Expand Up @@ -879,7 +880,7 @@ func (c *Client) run(ctx context.Context, q *Query, opts ...RunOption) *Iterator
t.req.ExplainOptions = runSettings.explainOptions
}

if err := q.toRunQueryRequest(t.req); err != nil {
if err := q.toRunQueryRequest(t.req, c.readSettings); err != nil {
t.err = err
}
return t
Expand Down Expand Up @@ -948,7 +949,7 @@ func (c *Client) RunAggregationQueryWithOptions(ctx context.Context, aq *Aggrega
defer txn.stateLockDeferUnlock()()
}

req.ReadOptions, err = parseQueryReadOptions(aq.query.eventual, txn)
req.ReadOptions, err = parseQueryReadOptions(aq.query.eventual, txn, c.readSettings)
if err != nil {
return ar, err
}
Expand Down Expand Up @@ -976,22 +977,30 @@ func (c *Client) RunAggregationQueryWithOptions(ctx context.Context, aq *Aggrega
return ar, nil
}

func validateReadOptions(eventual bool, t *Transaction) error {
if t == nil {
func validateReadOptions(eventual bool, t *Transaction, clientReadSettings *readSettings) error {
if !clientReadSettings.readTimeExists() {
if t == nil {
return nil
}
if eventual {
return errEventualConsistencyTransaction
}
if t.state == transactionStateExpired {
return errExpiredTransaction
}
return nil
}
if eventual {
return errEventualConsistencyTransaction
}
if t.state == transactionStateExpired {
return errExpiredTransaction

if t != nil || eventual {
return errEventualConsistencyTxnClientReadTime
}

return nil
}

// parseQueryReadOptions translates Query read options into protobuf format.
func parseQueryReadOptions(eventual bool, t *Transaction) (*pb.ReadOptions, error) {
err := validateReadOptions(eventual, t)
func parseQueryReadOptions(eventual bool, t *Transaction, clientReadSettings *readSettings) (*pb.ReadOptions, error) {
err := validateReadOptions(eventual, t, clientReadSettings)
if err != nil {
return nil, err
}
Expand All @@ -1004,6 +1013,13 @@ func parseQueryReadOptions(eventual bool, t *Transaction) (*pb.ReadOptions, erro
return &pb.ReadOptions{ConsistencyType: &pb.ReadOptions_ReadConsistency_{ReadConsistency: pb.ReadOptions_EVENTUAL}}, nil
}

if clientReadSettings.readTimeExists() {
return &pb.ReadOptions{
ConsistencyType: &pb.ReadOptions_ReadTime{
ReadTime: timestamppb.New(clientReadSettings.readTime),
},
}, nil
}
return nil, nil
}

Expand Down Expand Up @@ -1117,7 +1133,7 @@ func (t *Iterator) nextBatch() error {
}

var err error
t.req.ReadOptions, err = parseQueryReadOptions(t.eventual, txn)
t.req.ReadOptions, err = parseQueryReadOptions(t.eventual, txn, t.client.readSettings)
if err != nil {
return err
}
Expand Down
63 changes: 50 additions & 13 deletions datastore/query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,14 @@ import (
"sort"
"strings"
"testing"
"time"

pb "cloud.google.com/go/datastore/apiv1/datastorepb"
"cloud.google.com/go/internal/testutil"
"github.com/google/go-cmp/cmp"
"google.golang.org/grpc"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/timestamppb"
)

var (
Expand Down Expand Up @@ -723,16 +725,29 @@ func TestNamespaceQuery(t *testing.T) {
}
}

func TestReadOptions(t *testing.T) {
func TestToRunQueryRequest(t *testing.T) {
clientReadTime := time.Now()
tid := []byte{1}
for _, test := range []struct {
q *Query
crs *readSettings
want *pb.ReadOptions
}{
{
q: NewQuery(""),
want: nil,
},
{
q: NewQuery(""),
crs: &readSettings{
readTime: clientReadTime,
},
want: &pb.ReadOptions{
ConsistencyType: &pb.ReadOptions_ReadTime{
ReadTime: timestamppb.New(clientReadTime),
},
},
},
{
q: NewQuery("").Transaction(nil),
want: nil,
Expand All @@ -755,20 +770,10 @@ func TestReadOptions(t *testing.T) {
},
} {
req := &pb.RunQueryRequest{}
if err := test.q.toRunQueryRequest(req); err != nil {
if err := test.q.toRunQueryRequest(req, test.crs); err != nil {
t.Fatalf("%+v: got %v, want no error", test.q, err)
}
}
// Test errors.
for _, q := range []*Query{
NewQuery("").Transaction(&Transaction{id: nil, state: transactionStateExpired}),
NewQuery("").Transaction(&Transaction{id: tid, state: transactionStateInProgress}).EventualConsistency(),
} {
req := &pb.RunQueryRequest{}
if err := q.toRunQueryRequest(req); err == nil {
t.Errorf("%+v: got nil, wanted error", q)
}
}
}

func TestInvalidFilters(t *testing.T) {
Expand Down Expand Up @@ -956,6 +961,7 @@ func TestValidateReadOptions(t *testing.T) {
desc string
eventual bool
trans *Transaction
crs *readSettings
wantErr error
}{
{
Expand Down Expand Up @@ -991,8 +997,39 @@ func TestValidateReadOptions(t *testing.T) {
desc: "No transaction in eventual query",
eventual: true,
},
{
desc: "Eventual query with client read time",
eventual: true,
crs: &readSettings{
readTime: time.Now(),
},
wantErr: errEventualConsistencyTxnClientReadTime,
},
{
desc: "Eventual query and transaction with client read time",
eventual: true,
crs: &readSettings{
readTime: time.Now(),
},
trans: &Transaction{
id: []byte("test id"),
state: transactionStateInProgress,
},
wantErr: errEventualConsistencyTxnClientReadTime,
},
{
desc: "Transaction with client read time",
crs: &readSettings{
readTime: time.Now(),
},
trans: &Transaction{
id: []byte("test id"),
state: transactionStateInProgress,
},
wantErr: errEventualConsistencyTxnClientReadTime,
},
} {
gotErr := validateReadOptions(test.eventual, test.trans)
gotErr := validateReadOptions(test.eventual, test.trans, test.crs)
gotErrMsg := ""
if gotErr != nil {
gotErrMsg = gotErr.Error()
Expand Down
Loading

0 comments on commit bd1657f

Please sign in to comment.