Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(spanner): Add changes in Spanner executor for testing end to end tracing #10450

Open
wants to merge 47 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 24 commits
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
665c719
Setup OTel and add tracecontext header in Spanner requests
nareshz May 17, 2024
c4c88c0
Add x-goog-spanner-end-to-end-tracing header for requests to SpanFE
nareshz May 21, 2024
167a6b3
Merge remote-tracking branch 'root/main' into e2e-trace-optin
nareshz May 21, 2024
68a0647
Merge remote-tracking branch 'root/main' into e2e-trace-optin
nareshz May 21, 2024
5a91d0f
resolve comments
nareshz May 21, 2024
8985992
enable opentelemetry
nareshz May 24, 2024
ce42792
test
nareshz May 27, 2024
7bc7ec5
Roots CA changes
nareshz May 28, 2024
c938f8d
test
nareshz May 28, 2024
aeaadb4
changes for correct trace
nareshz May 30, 2024
eb6e7ed
Merge branch 'main' of https://github.com/googleapis/google-cloud-go …
nareshz May 30, 2024
592a6de
Merge branch 'main' into e2e-trace-optin
rahul2393 Jun 5, 2024
e61838c
Merge branch 'main' of https://github.com/googleapis/google-cloud-go …
nareshz Jun 12, 2024
6e7ba8b
Merge branch 'main' of https://github.com/googleapis/google-cloud-go …
nareshz Jun 12, 2024
61aced3
Merge branch 'e2e-trace-optin' into systest-changes
nareshz Jun 12, 2024
b420890
Merge branch 'main' of https://github.com/googleapis/google-cloud-go …
nareshz Jun 27, 2024
7344e03
changes for end to end tracing feature testing in spanner executor
nareshz Jun 27, 2024
f01af3a
Merge branch 'main' of https://github.com/googleapis/google-cloud-go …
nareshz Jun 27, 2024
4ffd0aa
fix unwanted changes
nareshz Jun 27, 2024
7691d4e
minor fix
nareshz Jun 27, 2024
ae182e9
remove unnecessary log statements
nareshz Jun 27, 2024
96c9798
add spanner span check
nareshz Jun 27, 2024
787034d
add verification of spans at grpc layer
nareshz Jul 1, 2024
7e77935
Merge branch 'main' into systest-changes
harshachinta Jul 1, 2024
308066a
Merge branch 'main' of https://github.com/googleapis/google-cloud-go …
nareshz Jul 22, 2024
689fcdb
resolve comments
nareshz Aug 8, 2024
2660d43
Merge branch 'main' of https://github.com/googleapis/google-cloud-go …
nareshz Aug 8, 2024
96644a0
rename end to end tracing to server side tracing
nareshz Aug 8, 2024
2b10bf6
minor change
nareshz Aug 8, 2024
b60ef9b
Merge branch 'main' of https://github.com/googleapis/google-cloud-go …
nareshz Aug 9, 2024
031a309
Merge branch 'main' of https://github.com/googleapis/google-cloud-go …
nareshz Aug 11, 2024
37fc4dc
Merge branch 'main' of https://github.com/googleapis/google-cloud-go …
nareshz Aug 13, 2024
352072a
resolve comments
nareshz Aug 20, 2024
f32dc98
add comment for server side tracing option
nareshz Aug 20, 2024
be209c3
Merge branch 'main' of https://github.com/googleapis/google-cloud-go …
nareshz Aug 20, 2024
8f9c8a0
Merge branch 'main' of https://github.com/googleapis/google-cloud-go …
nareshz Sep 6, 2024
759decb
Merge branch 'main' into systest-changes
harshachinta Sep 10, 2024
6defac2
resolve comments
nareshz Sep 11, 2024
22fe619
Merge branch 'main' of https://github.com/googleapis/google-cloud-go …
nareshz Sep 11, 2024
0a9d2db
remove opt-in header changes from executor pr
nareshz Sep 11, 2024
a3b0e3e
Merge branch 'main' into systest-changes
harshachinta Sep 12, 2024
52138e7
Merge branch 'main' of https://github.com/googleapis/google-cloud-go …
nareshz Sep 17, 2024
97fe4e7
temp
nareshz Sep 17, 2024
27814c0
Merge branch 'main' of https://github.com/googleapis/google-cloud-go …
nareshz Sep 30, 2024
d098d1f
changes as per the latest naming
nareshz Oct 9, 2024
e844659
Merge branch 'main' of https://github.com/googleapis/google-cloud-go …
nareshz Oct 9, 2024
ec1095f
remove unnecessary comment
nareshz Oct 9, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions spanner/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@ const (

requestsCompressionHeader = "x-response-encoding"

// endToEndTracingHeader is the name of the metadata header if client
// has opted-in for the creation of trace spans on the Spanner layer.
endToEndTracingHeader = "x-goog-spanner-end-to-end-tracing"

// numChannels is the default value for NumChannels of client.
numChannels = 4
)
Expand Down Expand Up @@ -331,6 +335,12 @@ type ClientConfig struct {
DirectedReadOptions *sppb.DirectedReadOptions

OpenTelemetryMeterProvider metric.MeterProvider

// EnableEndToEndTracing indicates whether end to end tracing is enabled or not.
// If it is enabled, trace spans will be created at Spanner layer.
//
// Default: false
EnableEndToEndTracing bool
}

type openTelemetryConfig struct {
Expand Down Expand Up @@ -444,6 +454,9 @@ func newClientWithConfig(ctx context.Context, database string, config ClientConf
if config.Compression == gzip.Name {
md.Append(requestsCompressionHeader, gzip.Name)
}
if config.EnableEndToEndTracing {
md.Append(endToEndTracingHeader, "true")
}

// Create a session client.
sc := newSessionClient(pool, database, config.UserAgent, sessionLabels, config.DatabaseRole, config.DisableRouteToLeader, md, config.BatchTimeout, config.Logger, config.CallOptions)
Expand Down
46 changes: 46 additions & 0 deletions spanner/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4059,6 +4059,52 @@ func TestClient_WithGRPCConnectionPoolAndNumChannels_Misconfigured(t *testing.T)
}
}

func TestClient_WithEndToEndTracingHeader(t *testing.T) {
t.Parallel()

server, opts, serverTeardown := NewMockedSpannerInMemTestServer(t)
defer serverTeardown()

wantEndToEndTracing := true
config := ClientConfig{EnableEndToEndTracing: wantEndToEndTracing}
client, err := makeClientWithConfig(context.Background(), "projects/p/instances/i/databases/d", config, server.ServerAddress, opts...)
if err != nil {
t.Fatalf("failed to get a client: %v", err)
}
gotEndToEndTracing := false
for _, val := range client.sc.md.Get(endToEndTracingHeader) {
if val == "true" {
gotEndToEndTracing = true
}
}
if gotEndToEndTracing != wantEndToEndTracing {
t.Fatalf("mismatch in client configuration for property EnableEndToEndTracing: got %v, want %v", gotEndToEndTracing, wantEndToEndTracing)
}
}

func TestClient_WithoutEndToEndTracingHeader(t *testing.T) {
t.Parallel()

server, opts, serverTeardown := NewMockedSpannerInMemTestServer(t)
defer serverTeardown()

wantEndToEndTracing := false
config := ClientConfig{EnableEndToEndTracing: wantEndToEndTracing}
client, err := makeClientWithConfig(context.Background(), "projects/p/instances/i/databases/d", config, server.ServerAddress, opts...)
if err != nil {
t.Fatalf("failed to get a client: %v", err)
}
gotEndToEndTracing := false
for _, val := range client.sc.md.Get(endToEndTracingHeader) {
if val == "true" {
gotEndToEndTracing = true
}
}
if gotEndToEndTracing != wantEndToEndTracing {
t.Fatalf("mismatch in client configuration for property EnableEndToEndTracing: got %v, want %v", gotEndToEndTracing, wantEndToEndTracing)
}
}

func TestClient_WithCustomBatchTimeout(t *testing.T) {
t.Parallel()

Expand Down
18 changes: 13 additions & 5 deletions spanner/test/cloudexecutor/executor/actions/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"cloud.google.com/go/spanner/executor/apiv1/executorpb"
"cloud.google.com/go/spanner/test/cloudexecutor/executor/internal/outputstream"
"cloud.google.com/go/spanner/test/cloudexecutor/executor/internal/utility"
trace "cloud.google.com/go/trace/apiv1"
"google.golang.org/api/option"
spb "google.golang.org/genproto/googleapis/rpc/status"
"google.golang.org/grpc/codes"
Expand All @@ -31,10 +32,11 @@ import (

// StartBatchTxnHandler holds the necessary components and options required for start batch transaction action.
type StartBatchTxnHandler struct {
Action *executorpb.StartBatchTransactionAction
FlowContext *ExecutionFlowContext
OutcomeSender *outputstream.OutcomeSender
Options []option.ClientOption
Action *executorpb.StartBatchTransactionAction
FlowContext *ExecutionFlowContext
OutcomeSender *outputstream.OutcomeSender
Options []option.ClientOption
TraceClientOptions []option.ClientOption
nareshz marked this conversation as resolved.
Show resolved Hide resolved
}

// ExecuteAction that starts a batch transaction
Expand All @@ -50,10 +52,16 @@ func (h *StartBatchTxnHandler) ExecuteAction(ctx context.Context) error {
return h.OutcomeSender.FinishWithError(spanner.ToSpannerError(status.Error(codes.InvalidArgument, "database path must be set for this action")))
}

client, err := spanner.NewClient(ctx, h.FlowContext.Database, h.Options...)
client, err := spanner.NewClientWithConfig(ctx, h.FlowContext.Database, spanner.ClientConfig{SessionPoolConfig: spanner.DefaultSessionPoolConfig, DisableRouteToLeader: false, EnableEndToEndTracing: true}, h.Options...)
if err != nil {
return h.OutcomeSender.FinishWithError(err)
}
// Create a trace client to read the traces from Cloud Trace.
traceClient, err := trace.NewClient(ctx, h.TraceClientOptions...)
nareshz marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return h.OutcomeSender.FinishWithError(err)
}
h.FlowContext.TraceClient = traceClient
var txn *spanner.BatchReadOnlyTransaction
if h.Action.GetBatchTxnTime() != nil {
timestamp := time.Unix(h.Action.GetBatchTxnTime().Seconds, int64(h.Action.GetBatchTxnTime().Nanos))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"cloud.google.com/go/spanner"
"cloud.google.com/go/spanner/executor/apiv1/executorpb"
"cloud.google.com/go/spanner/test/cloudexecutor/executor/internal/utility"
trace "cloud.google.com/go/trace/apiv1"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
Expand All @@ -49,6 +50,7 @@ type ExecutionFlowContext struct {
roTxn *spanner.ReadOnlyTransaction // Current read-only transaction
batchTxn *spanner.BatchReadOnlyTransaction // Current batch read-only transaction
DbClient *spanner.Client // Current database client
TraceClient *trace.Client // Current trace client
tableMetadata *utility.TableMetadataHelper // If in a txn (except batch), this has metadata info about table columns
numPendingReads int64 // Number of pending read/query actions.
readAborted bool // Indicate whether there's a read/query action got aborted and the transaction need to be reset.
Expand Down
18 changes: 13 additions & 5 deletions spanner/test/cloudexecutor/executor/actions/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"cloud.google.com/go/spanner/executor/apiv1/executorpb"
"cloud.google.com/go/spanner/test/cloudexecutor/executor/internal/outputstream"
"cloud.google.com/go/spanner/test/cloudexecutor/executor/internal/utility"
trace "cloud.google.com/go/trace/apiv1"
"google.golang.org/api/option"
spb "google.golang.org/genproto/googleapis/rpc/status"
"google.golang.org/grpc/codes"
Expand All @@ -33,10 +34,11 @@ import (

// StartTxnHandler holds the necessary components and options required for start transaction action.
type StartTxnHandler struct {
Action *executorpb.StartTransactionAction
FlowContext *ExecutionFlowContext
OutcomeSender *outputstream.OutcomeSender
Options []option.ClientOption
Action *executorpb.StartTransactionAction
FlowContext *ExecutionFlowContext
OutcomeSender *outputstream.OutcomeSender
Options []option.ClientOption
TraceClientOptions []option.ClientOption
}

// ExecuteAction that starts a read-write or read-only transaction.
Expand All @@ -54,11 +56,17 @@ func (h *StartTxnHandler) ExecuteAction(ctx context.Context) error {
h.FlowContext.tableMetadata = metadata

// TODO(harsha) where do I close the client? defer client.Close()
client, err := spanner.NewClient(ctx, h.FlowContext.Database, h.Options...)
client, err := spanner.NewClientWithConfig(ctx, h.FlowContext.Database, spanner.ClientConfig{SessionPoolConfig: spanner.DefaultSessionPoolConfig, DisableRouteToLeader: false, EnableEndToEndTracing: true}, h.Options...)
if err != nil {
return h.OutcomeSender.FinishWithError(err)
}
h.FlowContext.DbClient = client
// Create a trace client to read the traces.
traceClient, err := trace.NewClient(ctx, h.TraceClientOptions...)
if err != nil {
return h.OutcomeSender.FinishWithError(err)
}
h.FlowContext.TraceClient = traceClient
if h.FlowContext.isTransactionActiveLocked() {
return h.OutcomeSender.FinishWithError(spanner.ToSpannerError(status.Error(codes.InvalidArgument, "already in a transaction")))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,24 +25,36 @@ import (
"google.golang.org/api/option"
)

const MAX_CLOUD_TRACE_CHECK_LIMIT = 10

// CloudProxyServer holds the cloud executor server.
type CloudProxyServer struct {
serverContext context.Context
options []option.ClientOption
// members below should be set by the caller
serverContext context.Context
options []option.ClientOption
traceClientOptions []option.ClientOption
// members below represent internal state
cloudTraceCheckCount int
}

// NewCloudProxyServer initializes and returns a new CloudProxyServer instance.
func NewCloudProxyServer(ctx context.Context, opts []option.ClientOption) (*CloudProxyServer, error) {
return &CloudProxyServer{serverContext: ctx, options: opts}, nil
func NewCloudProxyServer(ctx context.Context, opts []option.ClientOption, traceClientOpts []option.ClientOption) (*CloudProxyServer, error) {
return &CloudProxyServer{serverContext: ctx, options: opts, traceClientOptions: traceClientOpts, cloudTraceCheckCount: 0}, nil
nareshz marked this conversation as resolved.
Show resolved Hide resolved
}

// ExecuteActionAsync is implementation of ExecuteActionAsync in SpannerExecutorProxyServer. It's a
// streaming method in which client and server exchange SpannerActions and SpannerActionOutcomes.
func (s *CloudProxyServer) ExecuteActionAsync(inputStream executorpb.SpannerExecutorProxy_ExecuteActionAsyncServer) error {
handler := &inputstream.CloudStreamHandler{
Stream: inputStream,
ServerContext: s.serverContext,
Options: s.options,
Stream: inputStream,
ServerContext: s.serverContext,
Options: s.options,
TraceClientOptions: s.traceClientOptions,
CloudTraceCheckAllowed: (s.cloudTraceCheckCount < MAX_CLOUD_TRACE_CHECK_LIMIT),
harshachinta marked this conversation as resolved.
Show resolved Hide resolved
}
if err := handler.Execute(); err != nil {
return err
}
return handler.Execute()
s.cloudTraceCheckCount += handler.GetCompletedCloudTraceCheckCount()
return nil
}
Loading
Loading