Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(spanner): Add changes in Spanner executor for testing end to end tracing #10450

Open
wants to merge 47 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 42 commits
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
665c719
Setup OTel and add tracecontext header in Spanner requests
nareshz May 17, 2024
c4c88c0
Add x-goog-spanner-end-to-end-tracing header for requests to SpanFE
nareshz May 21, 2024
167a6b3
Merge remote-tracking branch 'root/main' into e2e-trace-optin
nareshz May 21, 2024
68a0647
Merge remote-tracking branch 'root/main' into e2e-trace-optin
nareshz May 21, 2024
5a91d0f
resolve comments
nareshz May 21, 2024
8985992
enable opentelemetry
nareshz May 24, 2024
ce42792
test
nareshz May 27, 2024
7bc7ec5
Roots CA changes
nareshz May 28, 2024
c938f8d
test
nareshz May 28, 2024
aeaadb4
changes for correct trace
nareshz May 30, 2024
eb6e7ed
Merge branch 'main' of https://github.com/googleapis/google-cloud-go …
nareshz May 30, 2024
592a6de
Merge branch 'main' into e2e-trace-optin
rahul2393 Jun 5, 2024
e61838c
Merge branch 'main' of https://github.com/googleapis/google-cloud-go …
nareshz Jun 12, 2024
6e7ba8b
Merge branch 'main' of https://github.com/googleapis/google-cloud-go …
nareshz Jun 12, 2024
61aced3
Merge branch 'e2e-trace-optin' into systest-changes
nareshz Jun 12, 2024
b420890
Merge branch 'main' of https://github.com/googleapis/google-cloud-go …
nareshz Jun 27, 2024
7344e03
changes for end to end tracing feature testing in spanner executor
nareshz Jun 27, 2024
f01af3a
Merge branch 'main' of https://github.com/googleapis/google-cloud-go …
nareshz Jun 27, 2024
4ffd0aa
fix unwanted changes
nareshz Jun 27, 2024
7691d4e
minor fix
nareshz Jun 27, 2024
ae182e9
remove unnecessary log statements
nareshz Jun 27, 2024
96c9798
add spanner span check
nareshz Jun 27, 2024
787034d
add verification of spans at grpc layer
nareshz Jul 1, 2024
7e77935
Merge branch 'main' into systest-changes
harshachinta Jul 1, 2024
308066a
Merge branch 'main' of https://github.com/googleapis/google-cloud-go …
nareshz Jul 22, 2024
689fcdb
resolve comments
nareshz Aug 8, 2024
2660d43
Merge branch 'main' of https://github.com/googleapis/google-cloud-go …
nareshz Aug 8, 2024
96644a0
rename end to end tracing to server side tracing
nareshz Aug 8, 2024
2b10bf6
minor change
nareshz Aug 8, 2024
b60ef9b
Merge branch 'main' of https://github.com/googleapis/google-cloud-go …
nareshz Aug 9, 2024
031a309
Merge branch 'main' of https://github.com/googleapis/google-cloud-go …
nareshz Aug 11, 2024
37fc4dc
Merge branch 'main' of https://github.com/googleapis/google-cloud-go …
nareshz Aug 13, 2024
352072a
resolve comments
nareshz Aug 20, 2024
f32dc98
add comment for server side tracing option
nareshz Aug 20, 2024
be209c3
Merge branch 'main' of https://github.com/googleapis/google-cloud-go …
nareshz Aug 20, 2024
8f9c8a0
Merge branch 'main' of https://github.com/googleapis/google-cloud-go …
nareshz Sep 6, 2024
759decb
Merge branch 'main' into systest-changes
harshachinta Sep 10, 2024
6defac2
resolve comments
nareshz Sep 11, 2024
22fe619
Merge branch 'main' of https://github.com/googleapis/google-cloud-go …
nareshz Sep 11, 2024
0a9d2db
remove opt-in header changes from executor pr
nareshz Sep 11, 2024
a3b0e3e
Merge branch 'main' into systest-changes
harshachinta Sep 12, 2024
52138e7
Merge branch 'main' of https://github.com/googleapis/google-cloud-go …
nareshz Sep 17, 2024
97fe4e7
temp
nareshz Sep 17, 2024
27814c0
Merge branch 'main' of https://github.com/googleapis/google-cloud-go …
nareshz Sep 30, 2024
d098d1f
changes as per the latest naming
nareshz Oct 9, 2024
e844659
Merge branch 'main' of https://github.com/googleapis/google-cloud-go …
nareshz Oct 9, 2024
ec1095f
remove unnecessary comment
nareshz Oct 9, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion spanner/test/cloudexecutor/executor/actions/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func (h *StartBatchTxnHandler) ExecuteAction(ctx context.Context) error {
return h.OutcomeSender.FinishWithError(spanner.ToSpannerError(status.Error(codes.InvalidArgument, "database path must be set for this action")))
}

client, err := spanner.NewClient(ctx, h.FlowContext.Database, h.Options...)
client, err := spanner.NewClientWithConfig(ctx, h.FlowContext.Database, spanner.ClientConfig{SessionPoolConfig: spanner.DefaultSessionPoolConfig, DisableRouteToLeader: false, EnableServerSideTracing: true}, h.Options...)
harshachinta marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return h.OutcomeSender.FinishWithError(err)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func (h *StartTxnHandler) ExecuteAction(ctx context.Context) error {
h.FlowContext.tableMetadata = metadata

// TODO(harsha) where do I close the client? defer client.Close()
client, err := spanner.NewClient(ctx, h.FlowContext.Database, h.Options...)
client, err := spanner.NewClientWithConfig(ctx, h.FlowContext.Database, spanner.ClientConfig{SessionPoolConfig: spanner.DefaultSessionPoolConfig, DisableRouteToLeader: false, EnableServerSideTracing: true}, h.Options...)
if err != nil {
return h.OutcomeSender.FinishWithError(err)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,30 +19,55 @@ package executor

import (
"context"
"sync"

"cloud.google.com/go/spanner/executor/apiv1/executorpb"
"cloud.google.com/go/spanner/test/cloudexecutor/executor/internal/inputstream"
"google.golang.org/api/option"
)

const MAX_CLOUD_TRACE_CHECK_LIMIT = 20

// CloudProxyServer holds the cloud executor server.
type CloudProxyServer struct {
serverContext context.Context
options []option.ClientOption
// members below should be set by the caller
serverContext context.Context
options []option.ClientOption
traceClientOptions []option.ClientOption
// members below represent internal state
mu sync.Mutex
cloudTraceCheckCount int
}

// NewCloudProxyServer initializes and returns a new CloudProxyServer instance.
func NewCloudProxyServer(ctx context.Context, opts []option.ClientOption) (*CloudProxyServer, error) {
return &CloudProxyServer{serverContext: ctx, options: opts}, nil
func NewCloudProxyServer(ctx context.Context, opts []option.ClientOption, traceClientOpts []option.ClientOption) (*CloudProxyServer, error) {
return &CloudProxyServer{serverContext: ctx, options: opts, traceClientOptions: traceClientOpts}, nil
}

// ExecuteActionAsync is implementation of ExecuteActionAsync in SpannerExecutorProxyServer. It's a
// streaming method in which client and server exchange SpannerActions and SpannerActionOutcomes.
func (s *CloudProxyServer) ExecuteActionAsync(inputStream executorpb.SpannerExecutorProxy_ExecuteActionAsyncServer) error {
handler := &inputstream.CloudStreamHandler{
Stream: inputStream,
ServerContext: s.serverContext,
Options: s.options,
Stream: inputStream,
ServerContext: s.serverContext,
Options: s.options,
TraceClientOptions: s.traceClientOptions,
}
func() {
s.mu.Lock()
defer s.mu.Unlock()
if s.cloudTraceCheckCount < MAX_CLOUD_TRACE_CHECK_LIMIT {
handler.CloudTraceCheckAllowed = true
}
}()

if err := handler.Execute(); err != nil {
return err
}
if handler.IsServerSideTraceCheckDone() {
s.mu.Lock()
defer s.mu.Unlock()
s.cloudTraceCheckCount++
nareshz marked this conversation as resolved.
Show resolved Hide resolved
}
return handler.Execute()
return nil
}
121 changes: 116 additions & 5 deletions spanner/test/cloudexecutor/executor/internal/inputstream/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,18 @@ import (
"fmt"
"io"
"log"
"strings"
"sync"
"time"

"cloud.google.com/go/internal/trace"
"cloud.google.com/go/spanner"
"cloud.google.com/go/spanner/executor/apiv1/executorpb"
"cloud.google.com/go/spanner/test/cloudexecutor/executor/actions"
"cloud.google.com/go/spanner/test/cloudexecutor/executor/internal/outputstream"
traceapiv1 "cloud.google.com/go/trace/apiv1"
"cloud.google.com/go/trace/apiv1/tracepb"
ottrace "go.opentelemetry.io/otel/trace"
"google.golang.org/api/option"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
Expand All @@ -37,12 +43,21 @@ import (
// actions. It maintains a state associated with the request, such as current transaction.
type CloudStreamHandler struct {
// members below should be set by the caller
Stream executorpb.SpannerExecutorProxy_ExecuteActionAsyncServer
ServerContext context.Context
Options []option.ClientOption
Stream executorpb.SpannerExecutorProxy_ExecuteActionAsyncServer
ServerContext context.Context
Options []option.ClientOption
TraceClientOptions []option.ClientOption
CloudTraceCheckAllowed bool // indicates whether cloud trace checks can be performed
// members below represent internal state
executionFlowContext *actions.ExecutionFlowContext
mu sync.Mutex // protects mutable internal state
executionFlowContext *actions.ExecutionFlowContext
mu sync.Mutex // protects mutable internal state
serverSideTraceCheckDone bool // indicates whether checks are performed to verify server side tracing
}

// IsServerSideTraceCheckDone returns whether a check was done to verify if Spanner server
// side trace are generated or not.
func (h *CloudStreamHandler) IsServerSideTraceCheckDone() bool {
return h.serverSideTraceCheckDone
}

// Execute executes the given ExecuteActions request, blocking until it's done. It takes care of
Expand All @@ -69,6 +84,11 @@ func (h *CloudStreamHandler) Execute() error {
}()

ctx := context.Background()

// Create a top-level OpenTelemetry span for streaming request.
ctx = trace.StartSpan(ctx, "go_systest_execute_actions_stream")

readOrQueryActionPresent := false
// Main loop that receives and executes actions.
for {
req, err := h.Stream.Recv()
Expand All @@ -84,16 +104,57 @@ func (h *CloudStreamHandler) Execute() error {
log.Printf("Failed to receive request from client: %v", err)
return err
}
actionType := getActionType(req.Action)
if actionType == "Read" || actionType == "Query" {
readOrQueryActionPresent = true
}

if err = h.startHandlingRequest(ctx, req); err != nil {
log.Printf("Failed to handle request %v, Client ends the stream with error: %v", req, err)
// TODO(sriharshach): should we throw the error here instead of nil?
return nil
}
}
// End the top-level OpenTelemetry span.
trace.EndSpan(ctx, nil)
log.Println("Done executing actions")

// OpenTelemetry trace created for a streaming request will be verified using Cloud Trace APIs to make sure that
// Spanner server side tracing is working. Check will be performed only if there is atleast one action of type
// "Read" or "Query" in streaming request, trace is sampled and number of checks performed are less than the max limit.
if h.CloudTraceCheckAllowed && ottrace.SpanContextFromContext(ctx).IsSampled() && readOrQueryActionPresent {
h.serverSideTraceCheckDone = true
// Create a trace client to read the traces.
traceClient, err := traceapiv1.NewClient(ctx, h.TraceClientOptions...)
if err != nil {
return fmt.Errorf("Failed to create trace client: %v", err)
}
defer func() {
// Close the trace client.
log.Println("Closing the trace client")
traceClient.Close()
}()

// Verify the end to end trace exported to Cloud Trace.
traceId := ottrace.SpanContextFromContext(ctx).TraceID().String()
if err = verifyCloudTraceExportedTraces(ctx, traceClient, traceId); err != nil {
log.Printf("Verification failed for exported traces: %v", err)
return err
}
}
return nil
}

// getActionType returns the name of action type.
func getActionType(inputAction *executorpb.SpannerAction) string {
if inputAction == nil {
return ""
}
// Here action will have output as `*executorpb.SpannerAction_Query`.
action := fmt.Sprintf("%T", inputAction.GetAction())
return strings.TrimPrefix(action, "*executorpb.SpannerAction_")
}

// startHandlingRequest takes care of the given request. It picks an actionHandler and starts
// a goroutine in which that action will be executed.
func (h *CloudStreamHandler) startHandlingRequest(ctx context.Context, req *executorpb.SpannerAsyncActionRequest) error {
Expand All @@ -115,6 +176,11 @@ func (h *CloudStreamHandler) startHandlingRequest(ctx context.Context, req *exec
return outcomeSender.FinishWithError(err)
}

// Create a span for the systest action.
actionType := getActionType(inputAction)
ctx = trace.StartSpan(ctx, fmt.Sprintf("performaction_%v", actionType))
defer func() { trace.EndSpan(ctx, err) }()

// Create a channel to receive the error from the goroutine.
errCh := make(chan error, 1)
successCh := make(chan bool, 1)
Expand Down Expand Up @@ -142,6 +208,51 @@ func (h *CloudStreamHandler) startHandlingRequest(ctx context.Context, req *exec
}
}

// verifyCloudTraceExportedTraces fetches the traces exported from client application using
// Cloud Trace API to cross verify if end to end tracing is working or not.
func verifyCloudTraceExportedTraces(ctx context.Context, traceClient *traceapiv1.Client, traceId string) error {
if traceClient == nil {
return fmt.Errorf("trace client not found")
}
// Traces may not yet be exported, sleep for sufficient time before verifying end to end traces.
duration := 10 * time.Second
log.Printf("sleeping for %v seconds before verifying end to end trace", duration)
time.Sleep(duration)

log.Printf("start verification of exported cloud trace: trace_id:%s\n", traceId)
getTraceRequest := &tracepb.GetTraceRequest{
ProjectId: "spanner-cloud-systest",
TraceId: traceId,
}
resp, err := traceClient.GetTrace(ctx, getTraceRequest)
if err != nil {
log.Printf("failed to get trace_id:%v using GetTrace api: %v", traceId, err)
return err
}
// Check if gRPC layer trace spans are present. Span names in gRPC layer contain the name of called
// Spanner method.
grpcLayerSpanPresent := false
for _, span := range resp.Spans {
if strings.Contains(span.Name, "google.spanner.v1.Spanner") {
grpcLayerSpanPresent = true
}
}
if !grpcLayerSpanPresent {
// No gRPC spans mean no call was made to Spanner.
return nil
}
spannerLayerSpanPresent := false
for _, span := range resp.Spans {
if strings.HasPrefix(span.Name, "Spanner.") {
spannerLayerSpanPresent = true
}
}
if !spannerLayerSpanPresent {
return fmt.Errorf("no server side span found for trace_id: %v", traceId)
}
return nil
}

// newActionHandler instantiates an actionHandler for executing the given action.
func (h *CloudStreamHandler) newActionHandler(action *executorpb.SpannerAction, outcomeSender *outputstream.OutcomeSender) (cloudActionHandler, error) {
if action.DatabasePath != "" {
Expand Down
87 changes: 81 additions & 6 deletions spanner/test/cloudexecutor/worker_proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ import (

"cloud.google.com/go/spanner/executor/apiv1/executorpb"
"cloud.google.com/go/spanner/test/cloudexecutor/executor"
texporter "github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/trace"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/propagation"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am worried about this dep since we are adding a direct dependency of OTEL SDK package to the main package. This will download SDK dep to customer.

Can you check if we can have a seperate dependency file for executor which now relies on main package dependency. I have done something similar for opentelemetry tests since it also had SDK dependency.

https://github.com/googleapis/google-cloud-go/tree/main/spanner/test/opentelemetry/test

"golang.org/x/oauth2"
"golang.org/x/oauth2/google"
"google.golang.org/api/option"
Expand All @@ -38,11 +42,14 @@ import (
)

var (
proxyPort = flag.String("proxy_port", "", "Proxy port to start worker proxy on.")
spannerPort = flag.String("spanner_port", "", "Port of Spanner Frontend to which to send requests.")
cert = flag.String("cert", "", "Certificate used to connect to Spanner GFE.")
serviceKeyFile = flag.String("service_key_file", "", "Service key file used to set authentication.")
ipAddress = "127.0.0.1"
proxyPort = flag.String("proxy_port", "", "Proxy port to start worker proxy on.")
spannerPort = flag.String("spanner_port", "", "Port of Spanner Frontend to which to send requests.")
cert = flag.String("cert", "", "Certificate used to connect to Spanner GFE.")
rootCert = flag.String("root_cert", "", "Root certificate used for calls to Cloud Trace.")
serviceKeyFile = flag.String("service_key_file", "", "Service key file used to set authentication.")
ipAddress = "127.0.0.1"
cloudTraceEndpoint = "staging-cloudtrace.sandbox.googleapis.com:443"
projectID = "spanner-cloud-systest"
)

func main() {
Expand All @@ -60,18 +67,34 @@ func main() {
if *cert == "" {
log.Fatalf("Certificate need to be assigned in order to start worker proxy.")
}
if *rootCert == "" {
log.Fatalf("Root certificate need to be assigned in order to start worker proxy.")
}

lis, err := net.Listen("tcp", fmt.Sprintf(":%s", *proxyPort))
if err != nil {
log.Fatal(err)
}

ctx := context.Background()

// Enable opentelemetry tracing.
os.Setenv("GOOGLE_API_GO_EXPERIMENTAL_TELEMETRY_PLATFORM_TRACING", "opentelemetry")
// Set up OpenTelemetry tracing.
traceClientOpts := getClientOptionsForCloudTrace()
tp := getOpenTelemetryTracerProvider(ctx, traceClientOpts)
defer func() { _ = tp.Shutdown(ctx) }()

// Register the tracer provider and text map propagator(to propagate trace context) globally.
otel.SetTracerProvider(tp)
otel.SetTextMapPropagator(propagation.TraceContext{})

// Create a new gRPC server
grpcServer := grpc.NewServer()

clientOptions := getClientOptionsForSysTests()
// Create a new cloud proxy server
cloudProxyServer, err := executor.NewCloudProxyServer(context.Background(), clientOptions)
cloudProxyServer, err := executor.NewCloudProxyServer(ctx, clientOptions, traceClientOpts)
if err != nil {
log.Fatalf("Creating Cloud Proxy Server failed: %v", err)
}
Expand All @@ -90,6 +113,49 @@ func main() {
}
}

// getOpenTelemetryTracerProvider sets up the OpenTelemetry by configuring exporter and sampler.
func getOpenTelemetryTracerProvider(ctx context.Context, traceClientOpts []option.ClientOption) *sdktrace.TracerProvider {
traceExporter, err := texporter.New(
texporter.WithContext(ctx),
texporter.WithTraceClientOptions(traceClientOpts),
texporter.WithProjectID(projectID),
)
if err != nil {
log.Fatalf("unable to set up tracing: %v", err)
}
return sdktrace.NewTracerProvider(
sdktrace.WithBatcher(traceExporter),
sdktrace.WithSampler(sdktrace.ParentBased(sdktrace.TraceIDRatioBased(0.01))),
)
}

// Constructs client options needed to interact with Cloud Trace APIs.
func getClientOptionsForCloudTrace() []option.ClientOption {
var traceClientOpts []option.ClientOption
traceClientOpts = append(traceClientOpts, option.WithEndpoint(cloudTraceEndpoint))
traceClientOpts = append(traceClientOpts, option.WithGRPCDialOption(grpc.WithTransportCredentials(getRootCredentials())))

const (
cloudPlatformScope = "https://www.googleapis.com/auth/cloud-platform"
traceAppendScope = "https://www.googleapis.com/auth/trace.append"
traceReadScope = "https://www.googleapis.com/auth/trace.readonly"
)

log.Println("Reading service key file in executor code for cloud trace client")
cloudSystestCredentialsJSON, err := os.ReadFile(*serviceKeyFile)
if err != nil {
log.Fatal(err)
}
tokenSource, err := google.JWTAccessTokenSourceWithScope([]byte(cloudSystestCredentialsJSON), cloudPlatformScope, traceAppendScope, traceReadScope)
if err != nil {
log.Fatal(err)
}
traceClientOpts = append(traceClientOpts, option.WithTokenSource(tokenSource))
traceClientOpts = append(traceClientOpts, option.WithCredentialsFile(*serviceKeyFile))

return traceClientOpts
}

// Constructs client options needed to run executor for systests
func getClientOptionsForSysTests() []option.ClientOption {
var options []option.ClientOption
Expand Down Expand Up @@ -146,6 +212,15 @@ func getCredentials() credentials.TransportCredentials {
return creds
}

// Fetches the root credentials for rootCert file.
func getRootCredentials() credentials.TransportCredentials {
creds, err := credentials.NewClientTLSFromFile(*rootCert, "")
if err != nil {
log.Println(err)
}
return creds
}

// Constructs client options needed to run executor on local machine
func getClientOptionsForLocalTest() []option.ClientOption {
var options []option.ClientOption
Expand Down