diff --git a/spanner/test/cloudexecutor/executor/actions/batch.go b/spanner/test/cloudexecutor/executor/actions/batch.go index cbfe464981f4..00129117c944 100644 --- a/spanner/test/cloudexecutor/executor/actions/batch.go +++ b/spanner/test/cloudexecutor/executor/actions/batch.go @@ -50,7 +50,7 @@ func (h *StartBatchTxnHandler) ExecuteAction(ctx context.Context) error { return h.OutcomeSender.FinishWithError(spanner.ToSpannerError(status.Error(codes.InvalidArgument, "database path must be set for this action"))) } - client, err := spanner.NewClient(ctx, h.FlowContext.Database, h.Options...) + client, err := spanner.NewClientWithConfig(ctx, h.FlowContext.Database, spanner.ClientConfig{SessionPoolConfig: spanner.DefaultSessionPoolConfig, DisableRouteToLeader: false, EnableEndToEndTracing: true}, h.Options...) if err != nil { return h.OutcomeSender.FinishWithError(err) } diff --git a/spanner/test/cloudexecutor/executor/actions/transaction.go b/spanner/test/cloudexecutor/executor/actions/transaction.go index dc5ae5244be4..ff9ab5aa18f7 100644 --- a/spanner/test/cloudexecutor/executor/actions/transaction.go +++ b/spanner/test/cloudexecutor/executor/actions/transaction.go @@ -54,7 +54,7 @@ func (h *StartTxnHandler) ExecuteAction(ctx context.Context) error { h.FlowContext.tableMetadata = metadata // TODO(harsha) where do I close the client? defer client.Close() - client, err := spanner.NewClient(ctx, h.FlowContext.Database, h.Options...) + client, err := spanner.NewClientWithConfig(ctx, h.FlowContext.Database, spanner.ClientConfig{SessionPoolConfig: spanner.DefaultSessionPoolConfig, DisableRouteToLeader: false, EnableEndToEndTracing: true}, h.Options...) if err != nil { return h.OutcomeSender.FinishWithError(err) } diff --git a/spanner/test/cloudexecutor/executor/executor_proxy_server_impl.go b/spanner/test/cloudexecutor/executor/executor_proxy_server_impl.go index 25e4da6c79e2..452ca85e4fdb 100644 --- a/spanner/test/cloudexecutor/executor/executor_proxy_server_impl.go +++ b/spanner/test/cloudexecutor/executor/executor_proxy_server_impl.go @@ -19,30 +19,55 @@ package executor import ( "context" + "sync" "cloud.google.com/go/spanner/executor/apiv1/executorpb" "cloud.google.com/go/spanner/test/cloudexecutor/executor/internal/inputstream" "google.golang.org/api/option" ) +const MAX_CLOUD_TRACE_CHECK_LIMIT = 20 + // CloudProxyServer holds the cloud executor server. type CloudProxyServer struct { - serverContext context.Context - options []option.ClientOption + // members below should be set by the caller + serverContext context.Context + options []option.ClientOption + traceClientOptions []option.ClientOption + // members below represent internal state + mu sync.Mutex + cloudTraceCheckCount int } // NewCloudProxyServer initializes and returns a new CloudProxyServer instance. -func NewCloudProxyServer(ctx context.Context, opts []option.ClientOption) (*CloudProxyServer, error) { - return &CloudProxyServer{serverContext: ctx, options: opts}, nil +func NewCloudProxyServer(ctx context.Context, opts []option.ClientOption, traceClientOpts []option.ClientOption) (*CloudProxyServer, error) { + return &CloudProxyServer{serverContext: ctx, options: opts, traceClientOptions: traceClientOpts}, nil } // ExecuteActionAsync is implementation of ExecuteActionAsync in SpannerExecutorProxyServer. It's a // streaming method in which client and server exchange SpannerActions and SpannerActionOutcomes. func (s *CloudProxyServer) ExecuteActionAsync(inputStream executorpb.SpannerExecutorProxy_ExecuteActionAsyncServer) error { handler := &inputstream.CloudStreamHandler{ - Stream: inputStream, - ServerContext: s.serverContext, - Options: s.options, + Stream: inputStream, + ServerContext: s.serverContext, + Options: s.options, + TraceClientOptions: s.traceClientOptions, + } + func() { + s.mu.Lock() + defer s.mu.Unlock() + if s.cloudTraceCheckCount < MAX_CLOUD_TRACE_CHECK_LIMIT { + handler.CloudTraceCheckAllowed = true + } + }() + + if err := handler.Execute(); err != nil { + return err + } + if handler.IsServerSideTraceCheckDone() { + s.mu.Lock() + defer s.mu.Unlock() + s.cloudTraceCheckCount++ } - return handler.Execute() + return nil } diff --git a/spanner/test/cloudexecutor/executor/internal/inputstream/handler.go b/spanner/test/cloudexecutor/executor/internal/inputstream/handler.go index f2c74f374f8b..92836997b06a 100644 --- a/spanner/test/cloudexecutor/executor/internal/inputstream/handler.go +++ b/spanner/test/cloudexecutor/executor/internal/inputstream/handler.go @@ -22,12 +22,18 @@ import ( "fmt" "io" "log" + "strings" "sync" + "time" + "cloud.google.com/go/internal/trace" "cloud.google.com/go/spanner" "cloud.google.com/go/spanner/executor/apiv1/executorpb" "cloud.google.com/go/spanner/test/cloudexecutor/executor/actions" "cloud.google.com/go/spanner/test/cloudexecutor/executor/internal/outputstream" + traceapiv1 "cloud.google.com/go/trace/apiv1" + "cloud.google.com/go/trace/apiv1/tracepb" + ottrace "go.opentelemetry.io/otel/trace" "google.golang.org/api/option" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -37,12 +43,21 @@ import ( // actions. It maintains a state associated with the request, such as current transaction. type CloudStreamHandler struct { // members below should be set by the caller - Stream executorpb.SpannerExecutorProxy_ExecuteActionAsyncServer - ServerContext context.Context - Options []option.ClientOption + Stream executorpb.SpannerExecutorProxy_ExecuteActionAsyncServer + ServerContext context.Context + Options []option.ClientOption + TraceClientOptions []option.ClientOption + CloudTraceCheckAllowed bool // indicates whether cloud trace checks can be performed // members below represent internal state - executionFlowContext *actions.ExecutionFlowContext - mu sync.Mutex // protects mutable internal state + executionFlowContext *actions.ExecutionFlowContext + mu sync.Mutex // protects mutable internal state + serverSideTraceCheckDone bool // indicates whether checks are performed to verify server side tracing +} + +// IsServerSideTraceCheckDone returns whether a check was done to verify if Spanner server +// side trace are generated or not. +func (h *CloudStreamHandler) IsServerSideTraceCheckDone() bool { + return h.serverSideTraceCheckDone } // Execute executes the given ExecuteActions request, blocking until it's done. It takes care of @@ -69,6 +84,11 @@ func (h *CloudStreamHandler) Execute() error { }() ctx := context.Background() + + // Create a top-level OpenTelemetry span for streaming request. + ctx = trace.StartSpan(ctx, "go_systest_execute_actions_stream") + + readOrQueryActionPresent := false // Main loop that receives and executes actions. for { req, err := h.Stream.Recv() @@ -84,16 +104,57 @@ func (h *CloudStreamHandler) Execute() error { log.Printf("Failed to receive request from client: %v", err) return err } + actionType := getActionType(req.Action) + if actionType == "Read" || actionType == "Query" { + readOrQueryActionPresent = true + } + if err = h.startHandlingRequest(ctx, req); err != nil { log.Printf("Failed to handle request %v, Client ends the stream with error: %v", req, err) // TODO(sriharshach): should we throw the error here instead of nil? return nil } } + // End the top-level OpenTelemetry span. + trace.EndSpan(ctx, nil) log.Println("Done executing actions") + + // OpenTelemetry trace created for a streaming request will be verified using Cloud Trace APIs to make sure that + // Spanner server side tracing is working. Check will be performed only if there is atleast one action of type + // "Read" or "Query" in streaming request, trace is sampled and number of checks performed are less than the max limit. + if h.CloudTraceCheckAllowed && ottrace.SpanContextFromContext(ctx).IsSampled() && readOrQueryActionPresent { + h.serverSideTraceCheckDone = true + // Create a trace client to read the traces. + traceClient, err := traceapiv1.NewClient(ctx, h.TraceClientOptions...) + if err != nil { + return fmt.Errorf("Failed to create trace client: %v", err) + } + defer func() { + // Close the trace client. + log.Println("Closing the trace client") + traceClient.Close() + }() + + // Verify the end to end trace exported to Cloud Trace. + traceId := ottrace.SpanContextFromContext(ctx).TraceID().String() + if err = verifyCloudTraceExportedTraces(ctx, traceClient, traceId); err != nil { + log.Printf("Verification failed for exported traces: %v", err) + return err + } + } return nil } +// getActionType returns the name of action type. +func getActionType(inputAction *executorpb.SpannerAction) string { + if inputAction == nil { + return "" + } + // Here action will have output as `*executorpb.SpannerAction_Query`. + action := fmt.Sprintf("%T", inputAction.GetAction()) + return strings.TrimPrefix(action, "*executorpb.SpannerAction_") +} + // startHandlingRequest takes care of the given request. It picks an actionHandler and starts // a goroutine in which that action will be executed. func (h *CloudStreamHandler) startHandlingRequest(ctx context.Context, req *executorpb.SpannerAsyncActionRequest) error { @@ -115,6 +176,11 @@ func (h *CloudStreamHandler) startHandlingRequest(ctx context.Context, req *exec return outcomeSender.FinishWithError(err) } + // Create a span for the systest action. + actionType := getActionType(inputAction) + ctx = trace.StartSpan(ctx, fmt.Sprintf("performaction_%v", actionType)) + defer func() { trace.EndSpan(ctx, err) }() + // Create a channel to receive the error from the goroutine. errCh := make(chan error, 1) successCh := make(chan bool, 1) @@ -142,6 +208,51 @@ func (h *CloudStreamHandler) startHandlingRequest(ctx context.Context, req *exec } } +// verifyCloudTraceExportedTraces fetches the traces exported from client application using +// Cloud Trace API to cross verify if end to end tracing is working or not. +func verifyCloudTraceExportedTraces(ctx context.Context, traceClient *traceapiv1.Client, traceId string) error { + if traceClient == nil { + return fmt.Errorf("trace client not found") + } + // Traces may not yet be exported, sleep for sufficient time before verifying end to end traces. + duration := 10 * time.Second + log.Printf("sleeping for %v seconds before verifying end to end trace", duration) + time.Sleep(duration) + + log.Printf("start verification of exported cloud trace: trace_id:%s\n", traceId) + getTraceRequest := &tracepb.GetTraceRequest{ + ProjectId: "spanner-cloud-systest", + TraceId: traceId, + } + resp, err := traceClient.GetTrace(ctx, getTraceRequest) + if err != nil { + log.Printf("failed to get trace_id:%v using GetTrace api: %v", traceId, err) + return err + } + // Check if gRPC layer trace spans are present. Span names in gRPC layer contain the name of called + // Spanner method. + grpcLayerSpanPresent := false + for _, span := range resp.Spans { + if strings.Contains(span.Name, "google.spanner.v1.Spanner") { + grpcLayerSpanPresent = true + } + } + if !grpcLayerSpanPresent { + // No gRPC spans mean no call was made to Spanner. + return nil + } + spannerLayerSpanPresent := false + for _, span := range resp.Spans { + if strings.HasPrefix(span.Name, "Spanner.") { + spannerLayerSpanPresent = true + } + } + if !spannerLayerSpanPresent { + return fmt.Errorf("no server side span found for trace_id: %v", traceId) + } + return nil +} + // newActionHandler instantiates an actionHandler for executing the given action. func (h *CloudStreamHandler) newActionHandler(action *executorpb.SpannerAction, outcomeSender *outputstream.OutcomeSender) (cloudActionHandler, error) { if action.DatabasePath != "" { diff --git a/spanner/test/cloudexecutor/worker_proxy.go b/spanner/test/cloudexecutor/worker_proxy.go index 52ec2270ecdf..a3f3b48d72ad 100644 --- a/spanner/test/cloudexecutor/worker_proxy.go +++ b/spanner/test/cloudexecutor/worker_proxy.go @@ -28,6 +28,10 @@ import ( "cloud.google.com/go/spanner/executor/apiv1/executorpb" "cloud.google.com/go/spanner/test/cloudexecutor/executor" + texporter "github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/trace" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/propagation" + sdktrace "go.opentelemetry.io/otel/sdk/trace" "golang.org/x/oauth2" "golang.org/x/oauth2/google" "google.golang.org/api/option" @@ -38,11 +42,14 @@ import ( ) var ( - proxyPort = flag.String("proxy_port", "", "Proxy port to start worker proxy on.") - spannerPort = flag.String("spanner_port", "", "Port of Spanner Frontend to which to send requests.") - cert = flag.String("cert", "", "Certificate used to connect to Spanner GFE.") - serviceKeyFile = flag.String("service_key_file", "", "Service key file used to set authentication.") - ipAddress = "127.0.0.1" + proxyPort = flag.String("proxy_port", "", "Proxy port to start worker proxy on.") + spannerPort = flag.String("spanner_port", "", "Port of Spanner Frontend to which to send requests.") + cert = flag.String("cert", "", "Certificate used to connect to Spanner GFE.") + rootCert = flag.String("root_cert", "", "Root certificate used for calls to Cloud Trace.") + serviceKeyFile = flag.String("service_key_file", "", "Service key file used to set authentication.") + ipAddress = "127.0.0.1" + cloudTraceEndpoint = "staging-cloudtrace.sandbox.googleapis.com:443" + projectID = "spanner-cloud-systest" ) func main() { @@ -60,18 +67,34 @@ func main() { if *cert == "" { log.Fatalf("Certificate need to be assigned in order to start worker proxy.") } + if *rootCert == "" { + log.Fatalf("Root certificate need to be assigned in order to start worker proxy.") + } lis, err := net.Listen("tcp", fmt.Sprintf(":%s", *proxyPort)) if err != nil { log.Fatal(err) } + ctx := context.Background() + + // Enable opentelemetry tracing. + os.Setenv("GOOGLE_API_GO_EXPERIMENTAL_TELEMETRY_PLATFORM_TRACING", "opentelemetry") + // Set up OpenTelemetry tracing. + traceClientOpts := getClientOptionsForCloudTrace() + tp := getOpenTelemetryTracerProvider(ctx, traceClientOpts) + defer func() { _ = tp.Shutdown(ctx) }() + + // Register the tracer provider and text map propagator(to propagate trace context) globally. + otel.SetTracerProvider(tp) + otel.SetTextMapPropagator(propagation.TraceContext{}) + // Create a new gRPC server grpcServer := grpc.NewServer() clientOptions := getClientOptionsForSysTests() // Create a new cloud proxy server - cloudProxyServer, err := executor.NewCloudProxyServer(context.Background(), clientOptions) + cloudProxyServer, err := executor.NewCloudProxyServer(ctx, clientOptions, traceClientOpts) if err != nil { log.Fatalf("Creating Cloud Proxy Server failed: %v", err) } @@ -90,6 +113,49 @@ func main() { } } +// getOpenTelemetryTracerProvider sets up the OpenTelemetry by configuring exporter and sampler. +func getOpenTelemetryTracerProvider(ctx context.Context, traceClientOpts []option.ClientOption) *sdktrace.TracerProvider { + traceExporter, err := texporter.New( + texporter.WithContext(ctx), + texporter.WithTraceClientOptions(traceClientOpts), + texporter.WithProjectID(projectID), + ) + if err != nil { + log.Fatalf("unable to set up tracing: %v", err) + } + return sdktrace.NewTracerProvider( + sdktrace.WithBatcher(traceExporter), + sdktrace.WithSampler(sdktrace.ParentBased(sdktrace.TraceIDRatioBased(0.01))), + ) +} + +// Constructs client options needed to interact with Cloud Trace APIs. +func getClientOptionsForCloudTrace() []option.ClientOption { + var traceClientOpts []option.ClientOption + traceClientOpts = append(traceClientOpts, option.WithEndpoint(cloudTraceEndpoint)) + traceClientOpts = append(traceClientOpts, option.WithGRPCDialOption(grpc.WithTransportCredentials(getRootCredentials()))) + + const ( + cloudPlatformScope = "https://www.googleapis.com/auth/cloud-platform" + traceAppendScope = "https://www.googleapis.com/auth/trace.append" + traceReadScope = "https://www.googleapis.com/auth/trace.readonly" + ) + + log.Println("Reading service key file in executor code for cloud trace client") + cloudSystestCredentialsJSON, err := os.ReadFile(*serviceKeyFile) + if err != nil { + log.Fatal(err) + } + tokenSource, err := google.JWTAccessTokenSourceWithScope([]byte(cloudSystestCredentialsJSON), cloudPlatformScope, traceAppendScope, traceReadScope) + if err != nil { + log.Fatal(err) + } + traceClientOpts = append(traceClientOpts, option.WithTokenSource(tokenSource)) + traceClientOpts = append(traceClientOpts, option.WithCredentialsFile(*serviceKeyFile)) + + return traceClientOpts +} + // Constructs client options needed to run executor for systests func getClientOptionsForSysTests() []option.ClientOption { var options []option.ClientOption @@ -143,6 +209,17 @@ func getCredentials() credentials.TransportCredentials { if err != nil { log.Println(err) } + fmt.Printf("CAcert credentials: %v", creds) + return creds +} + +// Fetches the root credentials for rootCert file. +func getRootCredentials() credentials.TransportCredentials { + creds, err := credentials.NewClientTLSFromFile(*rootCert, "") + if err != nil { + log.Println(err) + } + fmt.Printf("Root credentials: %v", creds) return creds }