From 665c719ed5d6f041c0f6e8fa07b554c439eba416 Mon Sep 17 00:00:00 2001 From: Naresh Chaudhary Date: Fri, 17 May 2024 06:12:58 +0000 Subject: [PATCH 01/24] Setup OTel and add tracecontext header in Spanner requests --- .../executor/internal/inputstream/handler.go | 33 +++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/spanner/test/cloudexecutor/executor/internal/inputstream/handler.go b/spanner/test/cloudexecutor/executor/internal/inputstream/handler.go index 7582307ca2a6..b47f2fe0c8c1 100644 --- a/spanner/test/cloudexecutor/executor/internal/inputstream/handler.go +++ b/spanner/test/cloudexecutor/executor/internal/inputstream/handler.go @@ -24,10 +24,15 @@ import ( "log" "sync" + texporter "github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/trace" + "cloud.google.com/go/spanner" "cloud.google.com/go/spanner/executor/apiv1/executorpb" "cloud.google.com/go/spanner/test/cloudexecutor/executor/actions" "cloud.google.com/go/spanner/test/cloudexecutor/executor/internal/outputstream" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/propagation" + sdktrace "go.opentelemetry.io/otel/sdk/trace" "google.golang.org/api/option" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -111,6 +116,34 @@ func (h *CloudStreamHandler) startHandlingRequest(ctx context.Context, req *exec return outcomeSender.FinishWithError(err) } + // Setup trace context propagation. + tc := propagation.TraceContext{} + // Register the TraceContext propagator globally. + otel.SetTextMapPropagator(tc) + + // Set up OTel tracing. + traceExporter, err := texporter.New( + texporter.WithContext(ctx), + texporter.WithProjectID("spanner-cloud-systest"), + texporter.WithTraceClientOptions(h.Options), + ) + if err != nil { + return outcomeSender.FinishWithError(fmt.Errorf("unable to set up tracing: %v", err)) + } + tp := sdktrace.NewTracerProvider( + sdktrace.WithBatcher(traceExporter), + sdktrace.WithSampler(sdktrace.AlwaysSample()), + ) + defer func() { _ = tp.Shutdown(ctx) }() + + otel.SetTracerProvider(tp) + + tracer := tp.Tracer("nareshz-test.com/trace") + + // Create a span for the systest action. + ctx, span := tracer.Start(ctx, "systestactiontrace") + defer span.End() + // Create a channel to receive the error from the goroutine. errCh := make(chan error, 1) successCh := make(chan bool, 1) From c4c88c00463bc46802d8a64bc68636c4bff85240 Mon Sep 17 00:00:00 2001 From: Naresh Chaudhary Date: Tue, 21 May 2024 07:49:38 +0000 Subject: [PATCH 02/24] Add x-goog-spanner-end-to-end-tracing header for requests to SpanFE --- spanner/client.go | 13 ++++++++++++ spanner/client_test.go | 46 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 59 insertions(+) diff --git a/spanner/client.go b/spanner/client.go index b0b26be7ead8..0267e95b0660 100644 --- a/spanner/client.go +++ b/spanner/client.go @@ -63,6 +63,10 @@ const ( requestsCompressionHeader = "x-response-encoding" + // endToEndTracingHeader is the name of the metadata header if client + // has opted-in for the creation of trace span's on the Spanner layer. + endToEndTracingHeader = "x-goog-spanner-end-to-end-tracing" + // numChannels is the default value for NumChannels of client. numChannels = 4 ) @@ -331,6 +335,12 @@ type ClientConfig struct { DirectedReadOptions *sppb.DirectedReadOptions OpenTelemetryMeterProvider metric.MeterProvider + + // EnableEndToEndTracing indicates whether end to end tracing is enabled or not. + // If it is enabled, trace span's will be created at Spanner layer. + // + // Default: false + EnableEndToEndTracing bool } type openTelemetryConfig struct { @@ -444,6 +454,9 @@ func newClientWithConfig(ctx context.Context, database string, config ClientConf if config.Compression == gzip.Name { md.Append(requestsCompressionHeader, gzip.Name) } + if config.EnableEndToEndTracing { + md.Append(endToEndTracingHeader, "true") + } // Create a session client. sc := newSessionClient(pool, database, config.UserAgent, sessionLabels, config.DatabaseRole, config.DisableRouteToLeader, md, config.BatchTimeout, config.Logger, config.CallOptions) diff --git a/spanner/client_test.go b/spanner/client_test.go index 61915b82b6a6..7d010169c7ac 100644 --- a/spanner/client_test.go +++ b/spanner/client_test.go @@ -3955,6 +3955,52 @@ func TestClient_WithGRPCConnectionPoolAndNumChannels_Misconfigured(t *testing.T) } } +func TestClient_WithEndToEndTracingHeader(t *testing.T) { + t.Parallel() + + server, opts, serverTeardown := NewMockedSpannerInMemTestServer(t) + defer serverTeardown() + + wantEndToEndTracing := true + config := ClientConfig{EnableEndToEndTracing: wantEndToEndTracing} + client, err := makeClientWithConfig(context.Background(), "projects/p/instances/i/databases/d", config, server.ServerAddress, opts...) + if err != nil { + t.Fatalf("failed to get a client: %v", err) + } + gotEndToEndTracing := false + for _, val := range client.sc.md.Get(endToEndTracingHeader) { + if val == "true" { + gotEndToEndTracing = true + } + } + if gotEndToEndTracing != wantEndToEndTracing { + t.Fatalf("mismatch in client configuration for property EnableEndToEndTracing: got %v, want %v", gotEndToEndTracing, wantEndToEndTracing) + } +} + +func TestClient_WithoutEndToEndTracingHeader(t *testing.T) { + t.Parallel() + + server, opts, serverTeardown := NewMockedSpannerInMemTestServer(t) + defer serverTeardown() + + wantEndToEndTracing := false + config := ClientConfig{EnableEndToEndTracing: wantEndToEndTracing} + client, err := makeClientWithConfig(context.Background(), "projects/p/instances/i/databases/d", config, server.ServerAddress, opts...) + if err != nil { + t.Fatalf("failed to get a client: %v", err) + } + gotEndToEndTracing := false + for _, val := range client.sc.md.Get(endToEndTracingHeader) { + if val == "true" { + gotEndToEndTracing = true + } + } + if gotEndToEndTracing != wantEndToEndTracing { + t.Fatalf("mismatch in client configuration for property EnableEndToEndTracing: got %v, want %v", gotEndToEndTracing, wantEndToEndTracing) + } +} + func TestClient_WithCustomBatchTimeout(t *testing.T) { t.Parallel() From 5a91d0f6461e998a20340a9ca9289f408041d86c Mon Sep 17 00:00:00 2001 From: Naresh Chaudhary Date: Tue, 21 May 2024 17:28:36 +0000 Subject: [PATCH 03/24] resolve comments --- spanner/client.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/spanner/client.go b/spanner/client.go index 0267e95b0660..5fa6299d0a13 100644 --- a/spanner/client.go +++ b/spanner/client.go @@ -64,7 +64,7 @@ const ( requestsCompressionHeader = "x-response-encoding" // endToEndTracingHeader is the name of the metadata header if client - // has opted-in for the creation of trace span's on the Spanner layer. + // has opted-in for the creation of trace spans on the Spanner layer. endToEndTracingHeader = "x-goog-spanner-end-to-end-tracing" // numChannels is the default value for NumChannels of client. @@ -337,7 +337,7 @@ type ClientConfig struct { OpenTelemetryMeterProvider metric.MeterProvider // EnableEndToEndTracing indicates whether end to end tracing is enabled or not. - // If it is enabled, trace span's will be created at Spanner layer. + // If it is enabled, trace spans will be created at Spanner layer. // // Default: false EnableEndToEndTracing bool From 898599280adf0ffe2d836d812fbe97b85f057b58 Mon Sep 17 00:00:00 2001 From: Naresh Chaudhary Date: Fri, 24 May 2024 05:26:17 +0000 Subject: [PATCH 04/24] enable opentelemetry --- .../cloudexecutor/executor/internal/inputstream/handler.go | 4 ++++ spanner/test/cloudexecutor/worker_proxy.go | 7 +++++++ 2 files changed, 11 insertions(+) diff --git a/spanner/test/cloudexecutor/executor/internal/inputstream/handler.go b/spanner/test/cloudexecutor/executor/internal/inputstream/handler.go index b47f2fe0c8c1..04ce43e796a8 100644 --- a/spanner/test/cloudexecutor/executor/internal/inputstream/handler.go +++ b/spanner/test/cloudexecutor/executor/internal/inputstream/handler.go @@ -121,11 +121,15 @@ func (h *CloudStreamHandler) startHandlingRequest(ctx context.Context, req *exec // Register the TraceContext propagator globally. otel.SetTextMapPropagator(tc) + clientOpts := h.Options[1:] + clientOpts = append([]option.ClientOption{option.WithEndpoint("staging-cloudtrace.sandbox.googleapis.com:443")}, clientOpts...) + // Set up OTel tracing. traceExporter, err := texporter.New( texporter.WithContext(ctx), texporter.WithProjectID("spanner-cloud-systest"), texporter.WithTraceClientOptions(h.Options), + texporter.WithDestinationProjectQuota(), ) if err != nil { return outcomeSender.FinishWithError(fmt.Errorf("unable to set up tracing: %v", err)) diff --git a/spanner/test/cloudexecutor/worker_proxy.go b/spanner/test/cloudexecutor/worker_proxy.go index 52ec2270ecdf..da360188a063 100644 --- a/spanner/test/cloudexecutor/worker_proxy.go +++ b/spanner/test/cloudexecutor/worker_proxy.go @@ -26,6 +26,7 @@ import ( "os" "strings" + "cloud.google.com/go/internal/trace" "cloud.google.com/go/spanner/executor/apiv1/executorpb" "cloud.google.com/go/spanner/test/cloudexecutor/executor" "golang.org/x/oauth2" @@ -66,6 +67,12 @@ func main() { log.Fatal(err) } + // Enable opentelemetry tracing. + os.Setenv("GOOGLE_API_GO_EXPERIMENTAL_TELEMETRY_PLATFORM_TRACING", "opentelemetry") + trace.SetOpenTelemetryTracingEnabledField(true) + + log.Printf("opentelemetry tracing enabled: %v", trace.IsOpenTelemetryTracingEnabled()) + // Create a new gRPC server grpcServer := grpc.NewServer() From ce427921467b01ce3eb1f04e6ec92fc38cf6cfff Mon Sep 17 00:00:00 2001 From: Naresh Chaudhary Date: Mon, 27 May 2024 10:22:36 +0000 Subject: [PATCH 05/24] test --- .../executor/internal/inputstream/handler.go | 10 ++++++---- spanner/test/cloudexecutor/worker_proxy.go | 5 ++++- 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/spanner/test/cloudexecutor/executor/internal/inputstream/handler.go b/spanner/test/cloudexecutor/executor/internal/inputstream/handler.go index 04ce43e796a8..0108dc06f2d9 100644 --- a/spanner/test/cloudexecutor/executor/internal/inputstream/handler.go +++ b/spanner/test/cloudexecutor/executor/internal/inputstream/handler.go @@ -23,6 +23,7 @@ import ( "io" "log" "sync" + "time" texporter "github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/trace" @@ -121,15 +122,16 @@ func (h *CloudStreamHandler) startHandlingRequest(ctx context.Context, req *exec // Register the TraceContext propagator globally. otel.SetTextMapPropagator(tc) - clientOpts := h.Options[1:] - clientOpts = append([]option.ClientOption{option.WithEndpoint("staging-cloudtrace.sandbox.googleapis.com:443")}, clientOpts...) + // Add the custom endpoint option + clientOpts := append([]option.ClientOption{option.WithEndpoint("cloudtrace.googleapis.com:443")}, h.Options[1:]...) + log.Printf("clientOpts : %v", clientOpts) // Set up OTel tracing. traceExporter, err := texporter.New( texporter.WithContext(ctx), + texporter.WithTraceClientOptions(clientOpts), texporter.WithProjectID("spanner-cloud-systest"), - texporter.WithTraceClientOptions(h.Options), - texporter.WithDestinationProjectQuota(), + texporter.WithTimeout(time.Duration(600*time.Second)), ) if err != nil { return outcomeSender.FinishWithError(fmt.Errorf("unable to set up tracing: %v", err)) diff --git a/spanner/test/cloudexecutor/worker_proxy.go b/spanner/test/cloudexecutor/worker_proxy.go index da360188a063..a6f2142f5c9c 100644 --- a/spanner/test/cloudexecutor/worker_proxy.go +++ b/spanner/test/cloudexecutor/worker_proxy.go @@ -106,14 +106,17 @@ func getClientOptionsForSysTests() []option.ClientOption { const ( spannerAdminScope = "https://www.googleapis.com/auth/spanner.admin" spannerDataScope = "https://www.googleapis.com/auth/spanner.data" + traceAppendScope = "https://www.googleapis.com/auth/trace.append" ) log.Println("Reading service key file in executor code") cloudSystestCredentialsJSON, err := os.ReadFile(*serviceKeyFile) + fileContents := string(cloudSystestCredentialsJSON) + log.Printf("serviceKeyFile contents: %v", fileContents) if err != nil { log.Fatal(err) } - config, err := google.JWTConfigFromJSON([]byte(cloudSystestCredentialsJSON), spannerAdminScope, spannerDataScope) + config, err := google.JWTConfigFromJSON([]byte(cloudSystestCredentialsJSON), spannerAdminScope, spannerDataScope, traceAppendScope) if err != nil { log.Println(err) } From 7bc7ec517bd21e2f608683ddcc4462ea59174bcf Mon Sep 17 00:00:00 2001 From: Naresh Chaudhary Date: Tue, 28 May 2024 10:41:36 +0000 Subject: [PATCH 06/24] Roots CA changes --- .../executor/internal/inputstream/handler.go | 55 +++++++++++++++++-- spanner/test/cloudexecutor/worker_proxy.go | 11 ++-- trace/apiv2/trace_client.go | 1 + 3 files changed, 57 insertions(+), 10 deletions(-) diff --git a/spanner/test/cloudexecutor/executor/internal/inputstream/handler.go b/spanner/test/cloudexecutor/executor/internal/inputstream/handler.go index 0108dc06f2d9..ad31fb373c86 100644 --- a/spanner/test/cloudexecutor/executor/internal/inputstream/handler.go +++ b/spanner/test/cloudexecutor/executor/internal/inputstream/handler.go @@ -22,10 +22,12 @@ import ( "fmt" "io" "log" + "os" "sync" "time" texporter "github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/trace" + "golang.org/x/oauth2/google" "cloud.google.com/go/spanner" "cloud.google.com/go/spanner/executor/apiv1/executorpb" @@ -35,7 +37,9 @@ import ( "go.opentelemetry.io/otel/propagation" sdktrace "go.opentelemetry.io/otel/sdk/trace" "google.golang.org/api/option" + "google.golang.org/grpc" "google.golang.org/grpc/codes" + "google.golang.org/grpc/credentials" "google.golang.org/grpc/status" ) @@ -96,6 +100,46 @@ func (h *CloudStreamHandler) Execute() error { return nil } +// getCloudTraceClientOptions returns the client options for creating Cloud Trace API client. +func getCloudTraceClientOptions() ([]option.ClientOption, error) { + // Read the service key file. + serviceKeyFile := os.Getenv("GOOGLE_APPLICATION_CREDENTIALS") + cloudSystestCredentialsJSON, err := os.ReadFile(serviceKeyFile) + if err != nil { + return nil, err + } + fileContents := string(cloudSystestCredentialsJSON) + log.Printf("serviceKeyFile contents: %v\n", fileContents) + + var traceClientOpts []option.ClientOption + traceClientOpts = append(traceClientOpts, option.WithEndpoint("cloudtrace.googleapis.com:443")) + + // perRPCCredentials, err := oauth.NewJWTAccessFromKey(cloudSystestCredentialsJSON) + // if err != nil { + // return outcomeSender.FinishWithError(err) + // } + rootCertFilePath := os.Getenv("ROOT_CERTIFICATE_FILE_PATH") + fmt.Printf("rootCertFilePath: %v\n", rootCertFilePath) + creds, err := credentials.NewClientTLSFromFile(rootCertFilePath, "") + if err != nil { + return nil, err + } + fmt.Printf("root CA: %v\n", creds) + traceClientOpts = append(traceClientOpts, option.WithGRPCDialOption(grpc.WithTransportCredentials(creds))) + + const ( + cloudPlatformScope = "https://www.googleapis.com/auth/cloud-platform" + traceAppendScope = "https://www.googleapis.com/auth/trace.append" + ) + tokenSource, err := google.JWTAccessTokenSourceWithScope([]byte(cloudSystestCredentialsJSON), cloudPlatformScope, traceAppendScope) + if err != nil { + return nil, err + } + traceClientOpts = append(traceClientOpts, option.WithTokenSource(tokenSource)) + traceClientOpts = append(traceClientOpts, option.WithCredentialsFile(serviceKeyFile)) + return traceClientOpts, nil +} + // startHandlingRequest takes care of the given request. It picks an actionHandler and starts // a goroutine in which that action will be executed. func (h *CloudStreamHandler) startHandlingRequest(ctx context.Context, req *executorpb.SpannerAsyncActionRequest) error { @@ -122,14 +166,15 @@ func (h *CloudStreamHandler) startHandlingRequest(ctx context.Context, req *exec // Register the TraceContext propagator globally. otel.SetTextMapPropagator(tc) - // Add the custom endpoint option - clientOpts := append([]option.ClientOption{option.WithEndpoint("cloudtrace.googleapis.com:443")}, h.Options[1:]...) - log.Printf("clientOpts : %v", clientOpts) - + traceClientOpts, err := getCloudTraceClientOptions() + if err != nil { + return outcomeSender.FinishWithError(err) + } + fmt.Printf("traceClientOpts: %v\n", traceClientOpts) // Set up OTel tracing. traceExporter, err := texporter.New( texporter.WithContext(ctx), - texporter.WithTraceClientOptions(clientOpts), + texporter.WithTraceClientOptions(traceClientOpts), texporter.WithProjectID("spanner-cloud-systest"), texporter.WithTimeout(time.Duration(600*time.Second)), ) diff --git a/spanner/test/cloudexecutor/worker_proxy.go b/spanner/test/cloudexecutor/worker_proxy.go index a6f2142f5c9c..94d512f47059 100644 --- a/spanner/test/cloudexecutor/worker_proxy.go +++ b/spanner/test/cloudexecutor/worker_proxy.go @@ -42,6 +42,7 @@ var ( proxyPort = flag.String("proxy_port", "", "Proxy port to start worker proxy on.") spannerPort = flag.String("spanner_port", "", "Port of Spanner Frontend to which to send requests.") cert = flag.String("cert", "", "Certificate used to connect to Spanner GFE.") + rootCert = flag.String("root_cert", "", "Root certificate used for calls to Cloud Trace endpoint.") serviceKeyFile = flag.String("service_key_file", "", "Service key file used to set authentication.") ipAddress = "127.0.0.1" ) @@ -70,9 +71,12 @@ func main() { // Enable opentelemetry tracing. os.Setenv("GOOGLE_API_GO_EXPERIMENTAL_TELEMETRY_PLATFORM_TRACING", "opentelemetry") trace.SetOpenTelemetryTracingEnabledField(true) - log.Printf("opentelemetry tracing enabled: %v", trace.IsOpenTelemetryTracingEnabled()) + // Set the default GOOGLE_APPLICATION_CREDENTIALS. + os.Setenv("GOOGLE_APPLICATION_CREDENTIALS", *serviceKeyFile) + os.Setenv("ROOT_CERTIFICATE_FILE_PATH", *rootCert) + // Create a new gRPC server grpcServer := grpc.NewServer() @@ -106,17 +110,14 @@ func getClientOptionsForSysTests() []option.ClientOption { const ( spannerAdminScope = "https://www.googleapis.com/auth/spanner.admin" spannerDataScope = "https://www.googleapis.com/auth/spanner.data" - traceAppendScope = "https://www.googleapis.com/auth/trace.append" ) log.Println("Reading service key file in executor code") cloudSystestCredentialsJSON, err := os.ReadFile(*serviceKeyFile) - fileContents := string(cloudSystestCredentialsJSON) - log.Printf("serviceKeyFile contents: %v", fileContents) if err != nil { log.Fatal(err) } - config, err := google.JWTConfigFromJSON([]byte(cloudSystestCredentialsJSON), spannerAdminScope, spannerDataScope, traceAppendScope) + config, err := google.JWTConfigFromJSON([]byte(cloudSystestCredentialsJSON), spannerAdminScope, spannerDataScope) if err != nil { log.Println(err) } diff --git a/trace/apiv2/trace_client.go b/trace/apiv2/trace_client.go index b4709e28cdf4..bcde8822fcf1 100755 --- a/trace/apiv2/trace_client.go +++ b/trace/apiv2/trace_client.go @@ -344,6 +344,7 @@ func (c *gRPCClient) BatchWriteSpans(ctx context.Context, req *tracepb.BatchWrit err := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error { var err error _, err = c.client.BatchWriteSpans(ctx, req, settings.GRPC...) + fmt.Printf("error in BatchWriteSpans: %v, with opts: %v\n", err, opts) return err }, opts...) return err From c938f8d83b12e97d3f786bb2509e938c31c56636 Mon Sep 17 00:00:00 2001 From: Naresh Chaudhary Date: Tue, 28 May 2024 13:02:30 +0000 Subject: [PATCH 07/24] test --- .../executor/internal/inputstream/handler.go | 47 +++++++++++++++++-- 1 file changed, 43 insertions(+), 4 deletions(-) diff --git a/spanner/test/cloudexecutor/executor/internal/inputstream/handler.go b/spanner/test/cloudexecutor/executor/internal/inputstream/handler.go index ad31fb373c86..2fe391425408 100644 --- a/spanner/test/cloudexecutor/executor/internal/inputstream/handler.go +++ b/spanner/test/cloudexecutor/executor/internal/inputstream/handler.go @@ -112,7 +112,7 @@ func getCloudTraceClientOptions() ([]option.ClientOption, error) { log.Printf("serviceKeyFile contents: %v\n", fileContents) var traceClientOpts []option.ClientOption - traceClientOpts = append(traceClientOpts, option.WithEndpoint("cloudtrace.googleapis.com:443")) + traceClientOpts = append(traceClientOpts, option.WithEndpoint("staging-cloudtrace.sandbox.googleapis.com:443")) // perRPCCredentials, err := oauth.NewJWTAccessFromKey(cloudSystestCredentialsJSON) // if err != nil { @@ -160,6 +160,7 @@ func (h *CloudStreamHandler) startHandlingRequest(ctx context.Context, req *exec if err != nil { return outcomeSender.FinishWithError(err) } + actionHandlerType := actionHandlerType(inputAction) // Setup trace context propagation. tc := propagation.TraceContext{} @@ -183,16 +184,16 @@ func (h *CloudStreamHandler) startHandlingRequest(ctx context.Context, req *exec } tp := sdktrace.NewTracerProvider( sdktrace.WithBatcher(traceExporter), - sdktrace.WithSampler(sdktrace.AlwaysSample()), + sdktrace.WithSampler(sdktrace.TraceIDRatioBased(0.1)), ) defer func() { _ = tp.Shutdown(ctx) }() otel.SetTracerProvider(tp) - tracer := tp.Tracer("nareshz-test.com/trace") + tracer := tp.Tracer("nareshz-systest.com/trace") // Create a span for the systest action. - ctx, span := tracer.Start(ctx, "systestactiontrace") + ctx, span := tracer.Start(ctx, fmt.Sprintf("systestaction_%v", actionHandlerType)) defer span.End() // Create a channel to receive the error from the goroutine. @@ -222,6 +223,44 @@ func (h *CloudStreamHandler) startHandlingRequest(ctx context.Context, req *exec } } +// newActionHandler instantiates an actionHandler for executing the given action. +func actionHandlerType(action *executorpb.SpannerAction) string { + switch action.GetAction().(type) { + case *executorpb.SpannerAction_Start: + return "SpannerAction_Start" + case *executorpb.SpannerAction_Finish: + return "SpannerAction_Finish" + case *executorpb.SpannerAction_Admin: + return "SpannerAction_Admin" + case *executorpb.SpannerAction_Read: + return "SpannerAction_Read" + case *executorpb.SpannerAction_Query: + return "SpannerAction_Query" + case *executorpb.SpannerAction_Mutation: + return "SpannerAction_Mutation" + case *executorpb.SpannerAction_Write: + return "SpannerAction_Write" + case *executorpb.SpannerAction_Dml: + return "SpannerAction_Dml" + case *executorpb.SpannerAction_StartBatchTxn: + return "SpannerAction_StartBatchTxn" + case *executorpb.SpannerAction_GenerateDbPartitionsRead: + return "SpannerAction_GenerateDbPartitionsRead" + case *executorpb.SpannerAction_GenerateDbPartitionsQuery: + return "SpannerAction_GenerateDbPartitionsQuery" + case *executorpb.SpannerAction_ExecutePartition: + return "SpannerAction_ExecutePartition" + case *executorpb.SpannerAction_PartitionedUpdate: + return "SpannerAction_PartitionedUpdate" + case *executorpb.SpannerAction_CloseBatchTxn: + return "SpannerAction_CloseBatchTxn" + case *executorpb.SpannerAction_BatchDml: + return "SpannerAction_BatchDml" + default: + return "SpannerAction_default" + } +} + // newActionHandler instantiates an actionHandler for executing the given action. func (h *CloudStreamHandler) newActionHandler(action *executorpb.SpannerAction, outcomeSender *outputstream.OutcomeSender) (cloudActionHandler, error) { if action.DatabasePath != "" { From aeaadb40ed102300d0883969dc9a47aa315a73ee Mon Sep 17 00:00:00 2001 From: Naresh Chaudhary Date: Thu, 30 May 2024 10:15:04 +0000 Subject: [PATCH 08/24] changes for correct trace --- .../executor/internal/inputstream/handler.go | 79 +------------------ spanner/test/cloudexecutor/worker_proxy.go | 70 +++++++++++++++- trace/apiv2/trace_client.go | 2 - 3 files changed, 67 insertions(+), 84 deletions(-) diff --git a/spanner/test/cloudexecutor/executor/internal/inputstream/handler.go b/spanner/test/cloudexecutor/executor/internal/inputstream/handler.go index 2fe391425408..8d6217221cc0 100644 --- a/spanner/test/cloudexecutor/executor/internal/inputstream/handler.go +++ b/spanner/test/cloudexecutor/executor/internal/inputstream/handler.go @@ -22,24 +22,15 @@ import ( "fmt" "io" "log" - "os" "sync" - "time" - - texporter "github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/trace" - "golang.org/x/oauth2/google" "cloud.google.com/go/spanner" "cloud.google.com/go/spanner/executor/apiv1/executorpb" "cloud.google.com/go/spanner/test/cloudexecutor/executor/actions" "cloud.google.com/go/spanner/test/cloudexecutor/executor/internal/outputstream" "go.opentelemetry.io/otel" - "go.opentelemetry.io/otel/propagation" - sdktrace "go.opentelemetry.io/otel/sdk/trace" "google.golang.org/api/option" - "google.golang.org/grpc" "google.golang.org/grpc/codes" - "google.golang.org/grpc/credentials" "google.golang.org/grpc/status" ) @@ -100,46 +91,6 @@ func (h *CloudStreamHandler) Execute() error { return nil } -// getCloudTraceClientOptions returns the client options for creating Cloud Trace API client. -func getCloudTraceClientOptions() ([]option.ClientOption, error) { - // Read the service key file. - serviceKeyFile := os.Getenv("GOOGLE_APPLICATION_CREDENTIALS") - cloudSystestCredentialsJSON, err := os.ReadFile(serviceKeyFile) - if err != nil { - return nil, err - } - fileContents := string(cloudSystestCredentialsJSON) - log.Printf("serviceKeyFile contents: %v\n", fileContents) - - var traceClientOpts []option.ClientOption - traceClientOpts = append(traceClientOpts, option.WithEndpoint("staging-cloudtrace.sandbox.googleapis.com:443")) - - // perRPCCredentials, err := oauth.NewJWTAccessFromKey(cloudSystestCredentialsJSON) - // if err != nil { - // return outcomeSender.FinishWithError(err) - // } - rootCertFilePath := os.Getenv("ROOT_CERTIFICATE_FILE_PATH") - fmt.Printf("rootCertFilePath: %v\n", rootCertFilePath) - creds, err := credentials.NewClientTLSFromFile(rootCertFilePath, "") - if err != nil { - return nil, err - } - fmt.Printf("root CA: %v\n", creds) - traceClientOpts = append(traceClientOpts, option.WithGRPCDialOption(grpc.WithTransportCredentials(creds))) - - const ( - cloudPlatformScope = "https://www.googleapis.com/auth/cloud-platform" - traceAppendScope = "https://www.googleapis.com/auth/trace.append" - ) - tokenSource, err := google.JWTAccessTokenSourceWithScope([]byte(cloudSystestCredentialsJSON), cloudPlatformScope, traceAppendScope) - if err != nil { - return nil, err - } - traceClientOpts = append(traceClientOpts, option.WithTokenSource(tokenSource)) - traceClientOpts = append(traceClientOpts, option.WithCredentialsFile(serviceKeyFile)) - return traceClientOpts, nil -} - // startHandlingRequest takes care of the given request. It picks an actionHandler and starts // a goroutine in which that action will be executed. func (h *CloudStreamHandler) startHandlingRequest(ctx context.Context, req *executorpb.SpannerAsyncActionRequest) error { @@ -162,35 +113,7 @@ func (h *CloudStreamHandler) startHandlingRequest(ctx context.Context, req *exec } actionHandlerType := actionHandlerType(inputAction) - // Setup trace context propagation. - tc := propagation.TraceContext{} - // Register the TraceContext propagator globally. - otel.SetTextMapPropagator(tc) - - traceClientOpts, err := getCloudTraceClientOptions() - if err != nil { - return outcomeSender.FinishWithError(err) - } - fmt.Printf("traceClientOpts: %v\n", traceClientOpts) - // Set up OTel tracing. - traceExporter, err := texporter.New( - texporter.WithContext(ctx), - texporter.WithTraceClientOptions(traceClientOpts), - texporter.WithProjectID("spanner-cloud-systest"), - texporter.WithTimeout(time.Duration(600*time.Second)), - ) - if err != nil { - return outcomeSender.FinishWithError(fmt.Errorf("unable to set up tracing: %v", err)) - } - tp := sdktrace.NewTracerProvider( - sdktrace.WithBatcher(traceExporter), - sdktrace.WithSampler(sdktrace.TraceIDRatioBased(0.1)), - ) - defer func() { _ = tp.Shutdown(ctx) }() - - otel.SetTracerProvider(tp) - - tracer := tp.Tracer("nareshz-systest.com/trace") + tracer := otel.Tracer("nareshz-systest.com/trace") // Create a span for the systest action. ctx, span := tracer.Start(ctx, fmt.Sprintf("systestaction_%v", actionHandlerType)) diff --git a/spanner/test/cloudexecutor/worker_proxy.go b/spanner/test/cloudexecutor/worker_proxy.go index 94d512f47059..80f9460dacac 100644 --- a/spanner/test/cloudexecutor/worker_proxy.go +++ b/spanner/test/cloudexecutor/worker_proxy.go @@ -25,10 +25,15 @@ import ( "net" "os" "strings" + "time" "cloud.google.com/go/internal/trace" "cloud.google.com/go/spanner/executor/apiv1/executorpb" "cloud.google.com/go/spanner/test/cloudexecutor/executor" + texporter "github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/trace" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/propagation" + sdktrace "go.opentelemetry.io/otel/sdk/trace" "golang.org/x/oauth2" "golang.org/x/oauth2/google" "google.golang.org/api/option" @@ -62,27 +67,53 @@ func main() { if *cert == "" { log.Fatalf("Certificate need to be assigned in order to start worker proxy.") } + if *rootCert == "" { + log.Fatalf("Root certificate need to be assigned in order to start worker proxy.") + } lis, err := net.Listen("tcp", fmt.Sprintf(":%s", *proxyPort)) if err != nil { log.Fatal(err) } + ctx := context.Background() + // Enable opentelemetry tracing. os.Setenv("GOOGLE_API_GO_EXPERIMENTAL_TELEMETRY_PLATFORM_TRACING", "opentelemetry") trace.SetOpenTelemetryTracingEnabledField(true) log.Printf("opentelemetry tracing enabled: %v", trace.IsOpenTelemetryTracingEnabled()) - // Set the default GOOGLE_APPLICATION_CREDENTIALS. - os.Setenv("GOOGLE_APPLICATION_CREDENTIALS", *serviceKeyFile) - os.Setenv("ROOT_CERTIFICATE_FILE_PATH", *rootCert) + traceClientOpts, err := getClientOptionsForCloudTrace() + if err != nil { + log.Fatalf("Setting OpenTelemetry failed: %v", err) + } + fmt.Printf("traceClientOpts: %v\n", traceClientOpts) + // Set up OTel tracing. + traceExporter, err := texporter.New( + texporter.WithContext(ctx), + texporter.WithTraceClientOptions(traceClientOpts), + texporter.WithProjectID("spanner-cloud-systest"), + texporter.WithTimeout(time.Duration(600*time.Second)), + ) + if err != nil { + log.Fatalf("unable to set up tracing: %v", err) + } + tp := sdktrace.NewTracerProvider( + sdktrace.WithBatcher(traceExporter), + sdktrace.WithSampler(sdktrace.ParentBased(sdktrace.TraceIDRatioBased(0.01))), + ) + defer func() { _ = tp.Shutdown(ctx) }() + + otel.SetTracerProvider(tp) + // Register the TraceContext propagator to send traceparent header. + otel.SetTextMapPropagator(propagation.TraceContext{}) // Create a new gRPC server grpcServer := grpc.NewServer() clientOptions := getClientOptionsForSysTests() // Create a new cloud proxy server - cloudProxyServer, err := executor.NewCloudProxyServer(context.Background(), clientOptions) + cloudProxyServer, err := executor.NewCloudProxyServer(ctx, clientOptions) if err != nil { log.Fatalf("Creating Cloud Proxy Server failed: %v", err) } @@ -101,6 +132,37 @@ func main() { } } +// getClientOptionsForCloudTrace returns the client options for creating Cloud Trace API client. +func getClientOptionsForCloudTrace() ([]option.ClientOption, error) { + var traceClientOpts []option.ClientOption + traceClientOpts = append(traceClientOpts, option.WithEndpoint("staging-cloudtrace.sandbox.googleapis.com:443")) + + // Read the credentials from roots.pem file. + creds, err := credentials.NewClientTLSFromFile(*rootCert, "") + if err != nil { + return nil, err + } + traceClientOpts = append(traceClientOpts, option.WithGRPCDialOption(grpc.WithTransportCredentials(creds))) + + const ( + cloudPlatformScope = "https://www.googleapis.com/auth/cloud-platform" + traceAppendScope = "https://www.googleapis.com/auth/trace.append" + ) + // Read the service key file. + cloudSystestCredentialsJSON, err := os.ReadFile(*serviceKeyFile) + if err != nil { + return nil, err + } + + tokenSource, err := google.JWTAccessTokenSourceWithScope([]byte(cloudSystestCredentialsJSON), cloudPlatformScope, traceAppendScope) + if err != nil { + return nil, err + } + traceClientOpts = append(traceClientOpts, option.WithTokenSource(tokenSource)) + traceClientOpts = append(traceClientOpts, option.WithCredentialsFile(*serviceKeyFile)) + return traceClientOpts, nil +} + // Constructs client options needed to run executor for systests func getClientOptionsForSysTests() []option.ClientOption { var options []option.ClientOption diff --git a/trace/apiv2/trace_client.go b/trace/apiv2/trace_client.go index bcde8822fcf1..6eced9638431 100755 --- a/trace/apiv2/trace_client.go +++ b/trace/apiv2/trace_client.go @@ -337,14 +337,12 @@ func (c *restClient) Connection() *grpc.ClientConn { } func (c *gRPCClient) BatchWriteSpans(ctx context.Context, req *tracepb.BatchWriteSpansRequest, opts ...gax.CallOption) error { hds := []string{"x-goog-request-params", fmt.Sprintf("%s=%v", "name", url.QueryEscape(req.GetName()))} - hds = append(c.xGoogHeaders, hds...) ctx = gax.InsertMetadataIntoOutgoingContext(ctx, hds...) opts = append((*c.CallOptions).BatchWriteSpans[0:len((*c.CallOptions).BatchWriteSpans):len((*c.CallOptions).BatchWriteSpans)], opts...) err := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error { var err error _, err = c.client.BatchWriteSpans(ctx, req, settings.GRPC...) - fmt.Printf("error in BatchWriteSpans: %v, with opts: %v\n", err, opts) return err }, opts...) return err From 7344e03ba3b12375005abcd8302f0b7a9e9a48cd Mon Sep 17 00:00:00 2001 From: Naresh Chaudhary Date: Thu, 27 Jun 2024 09:32:37 +0000 Subject: [PATCH 09/24] changes for end to end tracing feature testing in spanner executor --- spanner/session.go | 2 +- .../cloudexecutor/executor/actions/batch.go | 18 +- .../actions/execution_flow_context.go | 2 + .../executor/actions/transaction.go | 18 +- .../executor/executor_proxy_server_impl.go | 28 ++- .../executor/internal/inputstream/handler.go | 177 +++++++++++------- spanner/test/cloudexecutor/worker_proxy.go | 96 +++++----- 7 files changed, 205 insertions(+), 136 deletions(-) diff --git a/spanner/session.go b/spanner/session.go index b6b8996790fc..76d611fe262d 100644 --- a/spanner/session.go +++ b/spanner/session.go @@ -926,10 +926,10 @@ func (p *sessionPool) newSessionHandle(s *session) (sh *sessionHandle) { if p.TrackSessionHandles || p.ActionOnInactiveTransaction == Warn || p.ActionOnInactiveTransaction == WarnAndClose || p.ActionOnInactiveTransaction == Close { p.mu.Lock() sh.trackedSessionHandle = p.trackedSessionHandles.PushBack(sh) + p.mu.Unlock() if p.TrackSessionHandles { sh.stack = debug.Stack() } - p.mu.Unlock() } return sh } diff --git a/spanner/test/cloudexecutor/executor/actions/batch.go b/spanner/test/cloudexecutor/executor/actions/batch.go index cbfe464981f4..64d3f310d495 100644 --- a/spanner/test/cloudexecutor/executor/actions/batch.go +++ b/spanner/test/cloudexecutor/executor/actions/batch.go @@ -23,6 +23,7 @@ import ( "cloud.google.com/go/spanner/executor/apiv1/executorpb" "cloud.google.com/go/spanner/test/cloudexecutor/executor/internal/outputstream" "cloud.google.com/go/spanner/test/cloudexecutor/executor/internal/utility" + trace "cloud.google.com/go/trace/apiv1" "google.golang.org/api/option" spb "google.golang.org/genproto/googleapis/rpc/status" "google.golang.org/grpc/codes" @@ -31,10 +32,11 @@ import ( // StartBatchTxnHandler holds the necessary components and options required for start batch transaction action. type StartBatchTxnHandler struct { - Action *executorpb.StartBatchTransactionAction - FlowContext *ExecutionFlowContext - OutcomeSender *outputstream.OutcomeSender - Options []option.ClientOption + Action *executorpb.StartBatchTransactionAction + FlowContext *ExecutionFlowContext + OutcomeSender *outputstream.OutcomeSender + Options []option.ClientOption + TraceClientOptions []option.ClientOption } // ExecuteAction that starts a batch transaction @@ -50,10 +52,16 @@ func (h *StartBatchTxnHandler) ExecuteAction(ctx context.Context) error { return h.OutcomeSender.FinishWithError(spanner.ToSpannerError(status.Error(codes.InvalidArgument, "database path must be set for this action"))) } - client, err := spanner.NewClient(ctx, h.FlowContext.Database, h.Options...) + client, err := spanner.NewClientWithConfig(ctx, h.FlowContext.Database, spanner.ClientConfig{SessionPoolConfig: spanner.DefaultSessionPoolConfig, DisableRouteToLeader: false, EnableEndToEndTracing: true}, h.Options...) + if err != nil { + return h.OutcomeSender.FinishWithError(err) + } + // Create a trace client to read the traces from Cloud Trace. + traceClient, err := trace.NewClient(ctx, h.TraceClientOptions...) if err != nil { return h.OutcomeSender.FinishWithError(err) } + h.FlowContext.TraceClient = traceClient var txn *spanner.BatchReadOnlyTransaction if h.Action.GetBatchTxnTime() != nil { timestamp := time.Unix(h.Action.GetBatchTxnTime().Seconds, int64(h.Action.GetBatchTxnTime().Nanos)) diff --git a/spanner/test/cloudexecutor/executor/actions/execution_flow_context.go b/spanner/test/cloudexecutor/executor/actions/execution_flow_context.go index fd6cf27736fb..c63eaa4518d7 100644 --- a/spanner/test/cloudexecutor/executor/actions/execution_flow_context.go +++ b/spanner/test/cloudexecutor/executor/actions/execution_flow_context.go @@ -24,6 +24,7 @@ import ( "cloud.google.com/go/spanner" "cloud.google.com/go/spanner/executor/apiv1/executorpb" "cloud.google.com/go/spanner/test/cloudexecutor/executor/internal/utility" + trace "cloud.google.com/go/trace/apiv1" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" ) @@ -49,6 +50,7 @@ type ExecutionFlowContext struct { roTxn *spanner.ReadOnlyTransaction // Current read-only transaction batchTxn *spanner.BatchReadOnlyTransaction // Current batch read-only transaction DbClient *spanner.Client // Current database client + TraceClient *trace.Client // Current trace client tableMetadata *utility.TableMetadataHelper // If in a txn (except batch), this has metadata info about table columns numPendingReads int64 // Number of pending read/query actions. readAborted bool // Indicate whether there's a read/query action got aborted and the transaction need to be reset. diff --git a/spanner/test/cloudexecutor/executor/actions/transaction.go b/spanner/test/cloudexecutor/executor/actions/transaction.go index dc5ae5244be4..680f8f4ef78d 100644 --- a/spanner/test/cloudexecutor/executor/actions/transaction.go +++ b/spanner/test/cloudexecutor/executor/actions/transaction.go @@ -24,6 +24,7 @@ import ( "cloud.google.com/go/spanner/executor/apiv1/executorpb" "cloud.google.com/go/spanner/test/cloudexecutor/executor/internal/outputstream" "cloud.google.com/go/spanner/test/cloudexecutor/executor/internal/utility" + trace "cloud.google.com/go/trace/apiv1" "google.golang.org/api/option" spb "google.golang.org/genproto/googleapis/rpc/status" "google.golang.org/grpc/codes" @@ -33,10 +34,11 @@ import ( // StartTxnHandler holds the necessary components and options required for start transaction action. type StartTxnHandler struct { - Action *executorpb.StartTransactionAction - FlowContext *ExecutionFlowContext - OutcomeSender *outputstream.OutcomeSender - Options []option.ClientOption + Action *executorpb.StartTransactionAction + FlowContext *ExecutionFlowContext + OutcomeSender *outputstream.OutcomeSender + Options []option.ClientOption + TraceClientOptions []option.ClientOption } // ExecuteAction that starts a read-write or read-only transaction. @@ -54,11 +56,17 @@ func (h *StartTxnHandler) ExecuteAction(ctx context.Context) error { h.FlowContext.tableMetadata = metadata // TODO(harsha) where do I close the client? defer client.Close() - client, err := spanner.NewClient(ctx, h.FlowContext.Database, h.Options...) + client, err := spanner.NewClientWithConfig(ctx, h.FlowContext.Database, spanner.ClientConfig{SessionPoolConfig: spanner.DefaultSessionPoolConfig, DisableRouteToLeader: false, EnableEndToEndTracing: true}, h.Options...) if err != nil { return h.OutcomeSender.FinishWithError(err) } h.FlowContext.DbClient = client + // Create a trace client to read the traces. + traceClient, err := trace.NewClient(ctx, h.TraceClientOptions...) + if err != nil { + return h.OutcomeSender.FinishWithError(err) + } + h.FlowContext.TraceClient = traceClient if h.FlowContext.isTransactionActiveLocked() { return h.OutcomeSender.FinishWithError(spanner.ToSpannerError(status.Error(codes.InvalidArgument, "already in a transaction"))) } diff --git a/spanner/test/cloudexecutor/executor/executor_proxy_server_impl.go b/spanner/test/cloudexecutor/executor/executor_proxy_server_impl.go index 25e4da6c79e2..712552346e41 100644 --- a/spanner/test/cloudexecutor/executor/executor_proxy_server_impl.go +++ b/spanner/test/cloudexecutor/executor/executor_proxy_server_impl.go @@ -25,24 +25,36 @@ import ( "google.golang.org/api/option" ) +const MAX_CLOUD_TRACE_CHECK_LIMIT = 20 + // CloudProxyServer holds the cloud executor server. type CloudProxyServer struct { - serverContext context.Context - options []option.ClientOption + // members below should be set by the caller + serverContext context.Context + options []option.ClientOption + traceClientOptions []option.ClientOption + // members below represent internal state + cloudTraceCheckCount int } // NewCloudProxyServer initializes and returns a new CloudProxyServer instance. -func NewCloudProxyServer(ctx context.Context, opts []option.ClientOption) (*CloudProxyServer, error) { - return &CloudProxyServer{serverContext: ctx, options: opts}, nil +func NewCloudProxyServer(ctx context.Context, opts []option.ClientOption, traceClientOpts []option.ClientOption) (*CloudProxyServer, error) { + return &CloudProxyServer{serverContext: ctx, options: opts, traceClientOptions: traceClientOpts, cloudTraceCheckCount: 0}, nil } // ExecuteActionAsync is implementation of ExecuteActionAsync in SpannerExecutorProxyServer. It's a // streaming method in which client and server exchange SpannerActions and SpannerActionOutcomes. func (s *CloudProxyServer) ExecuteActionAsync(inputStream executorpb.SpannerExecutorProxy_ExecuteActionAsyncServer) error { handler := &inputstream.CloudStreamHandler{ - Stream: inputStream, - ServerContext: s.serverContext, - Options: s.options, + Stream: inputStream, + ServerContext: s.serverContext, + Options: s.options, + TraceClientOptions: s.traceClientOptions, + CloudTraceCheckAllowed: s.cloudTraceCheckCount < MAX_CLOUD_TRACE_CHECK_LIMIT, + } + err := handler.Execute() + if err != nil { + s.cloudTraceCheckCount += handler.GetCompletedCloudTraceCheckCount() } - return handler.Execute() + return err } diff --git a/spanner/test/cloudexecutor/executor/internal/inputstream/handler.go b/spanner/test/cloudexecutor/executor/internal/inputstream/handler.go index 7743ba7003c2..cfde337a02c5 100644 --- a/spanner/test/cloudexecutor/executor/internal/inputstream/handler.go +++ b/spanner/test/cloudexecutor/executor/internal/inputstream/handler.go @@ -24,26 +24,42 @@ import ( "log" "sync" + "cloud.google.com/go/internal/trace" "cloud.google.com/go/spanner" "cloud.google.com/go/spanner/executor/apiv1/executorpb" "cloud.google.com/go/spanner/test/cloudexecutor/executor/actions" "cloud.google.com/go/spanner/test/cloudexecutor/executor/internal/outputstream" - "go.opentelemetry.io/otel" + "cloud.google.com/go/trace/apiv1/tracepb" + ottrace "go.opentelemetry.io/otel/trace" "google.golang.org/api/option" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" ) +const MAX_TRACE_CHECKS_PER_STREAMING_REQUEST = 5 + // CloudStreamHandler handles a streaming ExecuteActions request by performing incoming // actions. It maintains a state associated with the request, such as current transaction. type CloudStreamHandler struct { // members below should be set by the caller - Stream executorpb.SpannerExecutorProxy_ExecuteActionAsyncServer - ServerContext context.Context - Options []option.ClientOption + Stream executorpb.SpannerExecutorProxy_ExecuteActionAsyncServer + ServerContext context.Context + Options []option.ClientOption + TraceClientOptions []option.ClientOption + CloudTraceCheckAllowed bool // indicates whether cloud trace checks can be performed // members below represent internal state executionFlowContext *actions.ExecutionFlowContext mu sync.Mutex // protects mutable internal state + exportedTraces []string // traces that will be checked using cloud trace api +} + +// GetCompletedCloudTraceCheckCount returns the number of Cloud Trace checks performed for +// streaming ExecuteActions request. +func (h *CloudStreamHandler) GetCompletedCloudTraceCheckCount() int { + if h.executionFlowContext.TraceClient == nil { + return 0 + } + return len(h.exportedTraces) } // Execute executes the given ExecuteActions request, blocking until it's done. It takes care of @@ -74,11 +90,22 @@ func (h *CloudStreamHandler) Execute() error { for { req, err := h.Stream.Recv() if err == io.EOF { + // Verify the traces exported to Cloud Trace. + if h.CloudTraceCheckAllowed { + if err = h.verifyCloudTraceExportedTraces(ctx); err != nil { + log.Printf("Verification failed for exported traces: %v", err) + return err + } + } log.Println("Client called Done, half-closed the stream") if h.executionFlowContext != nil && h.executionFlowContext.DbClient != nil { log.Println("Closing the client object in execution flow context") h.executionFlowContext.DbClient.Close() } + if h.executionFlowContext != nil && h.executionFlowContext.TraceClient != nil { + log.Println("Closing the trace client in execution flow context") + h.executionFlowContext.TraceClient.Close() + } break } if err != nil { @@ -111,17 +138,24 @@ func (h *CloudStreamHandler) startHandlingRequest(ctx context.Context, req *exec } // Get a new action handler based on the input action. - actionHandler, err := h.newActionHandler(inputAction, outcomeSender) + actionType, actionHandler, err := h.newActionHandler(inputAction, outcomeSender) if err != nil { return outcomeSender.FinishWithError(err) } - actionHandlerType := actionHandlerType(inputAction) - - tracer := otel.Tracer("nareshz-systest.com/trace") + if !h.CloudTraceCheckAllowed { + trace.SetOpenTelemetryTracingEnabledField(false) + } // Create a span for the systest action. - ctx, span := tracer.Start(ctx, fmt.Sprintf("systestaction_%v", actionHandlerType)) - defer span.End() + ctx = trace.StartSpan(ctx, fmt.Sprintf("systestaction_%v", actionType)) + defer func() { trace.EndSpan(ctx, err) }() + + if trace.IsOpenTelemetryTracingEnabled() && h.CloudTraceCheckAllowed { + spanContext := ottrace.SpanContextFromContext(ctx) + if spanContext.IsSampled() && len(h.exportedTraces) < MAX_TRACE_CHECKS_PER_STREAMING_REQUEST && actionType != "Admin" && actionType != "Mutation" { + h.exportedTraces = append(h.exportedTraces, spanContext.TraceID().String()) + } + } // Create a channel to receive the error from the goroutine. errCh := make(chan error, 1) @@ -150,59 +184,61 @@ func (h *CloudStreamHandler) startHandlingRequest(ctx context.Context, req *exec } } -// newActionHandler instantiates an actionHandler for executing the given action. -func actionHandlerType(action *executorpb.SpannerAction) string { - switch action.GetAction().(type) { - case *executorpb.SpannerAction_Start: - return "SpannerAction_Start" - case *executorpb.SpannerAction_Finish: - return "SpannerAction_Finish" - case *executorpb.SpannerAction_Admin: - return "SpannerAction_Admin" - case *executorpb.SpannerAction_Read: - return "SpannerAction_Read" - case *executorpb.SpannerAction_Query: - return "SpannerAction_Query" - case *executorpb.SpannerAction_Mutation: - return "SpannerAction_Mutation" - case *executorpb.SpannerAction_Write: - return "SpannerAction_Write" - case *executorpb.SpannerAction_Dml: - return "SpannerAction_Dml" - case *executorpb.SpannerAction_StartBatchTxn: - return "SpannerAction_StartBatchTxn" - case *executorpb.SpannerAction_GenerateDbPartitionsRead: - return "SpannerAction_GenerateDbPartitionsRead" - case *executorpb.SpannerAction_GenerateDbPartitionsQuery: - return "SpannerAction_GenerateDbPartitionsQuery" - case *executorpb.SpannerAction_ExecutePartition: - return "SpannerAction_ExecutePartition" - case *executorpb.SpannerAction_PartitionedUpdate: - return "SpannerAction_PartitionedUpdate" - case *executorpb.SpannerAction_CloseBatchTxn: - return "SpannerAction_CloseBatchTxn" - case *executorpb.SpannerAction_BatchDml: - return "SpannerAction_BatchDml" - default: - return "SpannerAction_default" +// verifyCloudTraceExportedTraces fetches the traces exported from client application using +// Cloud Trace API to cross verify if end to end tracing is working or not. +func (h *CloudStreamHandler) verifyCloudTraceExportedTraces(ctx context.Context) error { + if len(h.exportedTraces) == 0 { + return nil } + if h.executionFlowContext.TraceClient == nil { + log.Println("trace client not found") + return nil + } + log.Printf("start verification of exported cloud traces: len:%v, trace_ids:%v\n", len(h.exportedTraces), h.exportedTraces) + // time.Sleep(10 * time.Second) + for _, traceId := range h.exportedTraces { + getTraceRequest := &tracepb.GetTraceRequest{ + ProjectId: "spanner-cloud-systest", + TraceId: traceId, + } + resp, err := h.executionFlowContext.TraceClient.GetTrace(ctx, getTraceRequest) + if err != nil { + log.Printf("error while calling GetTrace api: %v", err) + return err + } + spanNamesCommaSeperated := "" + for i, span := range resp.Spans { + if i > 0 { + spanNamesCommaSeperated = spanNamesCommaSeperated + "," + span.Name + } else { + spanNamesCommaSeperated = span.Name + } + } + log.Printf("Trace_id:%v Spans:%v\n", traceId, spanNamesCommaSeperated) + if len(resp.Spans) == 0 { + return fmt.Errorf("no trace found with trace_id: %v", traceId) + } + } + log.Println("all traces were exported successfully") + return nil } // newActionHandler instantiates an actionHandler for executing the given action. -func (h *CloudStreamHandler) newActionHandler(action *executorpb.SpannerAction, outcomeSender *outputstream.OutcomeSender) (cloudActionHandler, error) { +func (h *CloudStreamHandler) newActionHandler(action *executorpb.SpannerAction, outcomeSender *outputstream.OutcomeSender) (string, cloudActionHandler, error) { if action.DatabasePath != "" { h.executionFlowContext.Database = action.DatabasePath } switch action.GetAction().(type) { case *executorpb.SpannerAction_Start: - return &actions.StartTxnHandler{ - Action: action.GetStart(), - FlowContext: h.executionFlowContext, - OutcomeSender: outcomeSender, - Options: h.Options, + return "Start", &actions.StartTxnHandler{ + Action: action.GetStart(), + FlowContext: h.executionFlowContext, + OutcomeSender: outcomeSender, + Options: h.Options, + TraceClientOptions: h.TraceClientOptions, }, nil case *executorpb.SpannerAction_Finish: - return &actions.FinishTxnHandler{ + return "Finish", &actions.FinishTxnHandler{ Action: action.GetFinish(), FlowContext: h.executionFlowContext, OutcomeSender: outcomeSender, @@ -214,82 +250,83 @@ func (h *CloudStreamHandler) newActionHandler(action *executorpb.SpannerAction, OutcomeSender: outcomeSender, Options: h.Options, } - return adminAction, nil + return "Admin", adminAction, nil case *executorpb.SpannerAction_Read: - return &actions.ReadActionHandler{ + return "Read", &actions.ReadActionHandler{ Action: action.GetRead(), FlowContext: h.executionFlowContext, OutcomeSender: outcomeSender, }, nil case *executorpb.SpannerAction_Query: - return &actions.QueryActionHandler{ + return "Query", &actions.QueryActionHandler{ Action: action.GetQuery(), FlowContext: h.executionFlowContext, OutcomeSender: outcomeSender, }, nil case *executorpb.SpannerAction_Mutation: - return &actions.MutationActionHandler{ + return "Mutation", &actions.MutationActionHandler{ Action: action.GetMutation(), FlowContext: h.executionFlowContext, OutcomeSender: outcomeSender, }, nil case *executorpb.SpannerAction_Write: - return &actions.WriteActionHandler{ + return "Write", &actions.WriteActionHandler{ Action: action.GetWrite().GetMutation(), FlowContext: h.executionFlowContext, OutcomeSender: outcomeSender, }, nil case *executorpb.SpannerAction_Dml: - return &actions.DmlActionHandler{ + return "Dml", &actions.DmlActionHandler{ Action: action.GetDml(), FlowContext: h.executionFlowContext, OutcomeSender: outcomeSender, }, nil case *executorpb.SpannerAction_StartBatchTxn: - return &actions.StartBatchTxnHandler{ - Action: action.GetStartBatchTxn(), - FlowContext: h.executionFlowContext, - OutcomeSender: outcomeSender, - Options: h.Options, + return "StartBatchTxn", &actions.StartBatchTxnHandler{ + Action: action.GetStartBatchTxn(), + FlowContext: h.executionFlowContext, + OutcomeSender: outcomeSender, + Options: h.Options, + TraceClientOptions: h.TraceClientOptions, }, nil case *executorpb.SpannerAction_GenerateDbPartitionsRead: - return &actions.PartitionReadActionHandler{ + return "GenerateDbPartitionsRead", &actions.PartitionReadActionHandler{ Action: action.GetGenerateDbPartitionsRead(), FlowContext: h.executionFlowContext, OutcomeSender: outcomeSender, }, nil case *executorpb.SpannerAction_GenerateDbPartitionsQuery: - return &actions.PartitionQueryActionHandler{ + return "GenerateDbPartitionsQuery", &actions.PartitionQueryActionHandler{ Action: action.GetGenerateDbPartitionsQuery(), FlowContext: h.executionFlowContext, OutcomeSender: outcomeSender, }, nil case *executorpb.SpannerAction_ExecutePartition: - return &actions.ExecutePartition{ + return "ExecutePartition", &actions.ExecutePartition{ Action: action.GetExecutePartition(), FlowContext: h.executionFlowContext, OutcomeSender: outcomeSender, }, nil case *executorpb.SpannerAction_PartitionedUpdate: - return &actions.PartitionedUpdate{ + return "PartitionedUpdate", &actions.PartitionedUpdate{ Action: action.GetPartitionedUpdate(), FlowContext: h.executionFlowContext, OutcomeSender: outcomeSender, }, nil case *executorpb.SpannerAction_CloseBatchTxn: - return &actions.CloseBatchTxnHandler{ + return "CloseBatchTxn", &actions.CloseBatchTxnHandler{ Action: action.GetCloseBatchTxn(), FlowContext: h.executionFlowContext, OutcomeSender: outcomeSender, }, nil case *executorpb.SpannerAction_BatchDml: - return &actions.BatchDmlHandler{ + return "BatchDml", &actions.BatchDmlHandler{ Action: action.GetBatchDml(), FlowContext: h.executionFlowContext, OutcomeSender: outcomeSender, }, nil default: - return nil, status.Error(codes.Unimplemented, fmt.Sprintf("not implemented yet %T", action.GetAction())) + return "", nil, status.Error(codes.Unimplemented, fmt.Sprintf("not implemented yet %T", action.GetAction())) } } diff --git a/spanner/test/cloudexecutor/worker_proxy.go b/spanner/test/cloudexecutor/worker_proxy.go index 80f9460dacac..550720f16d19 100644 --- a/spanner/test/cloudexecutor/worker_proxy.go +++ b/spanner/test/cloudexecutor/worker_proxy.go @@ -25,9 +25,7 @@ import ( "net" "os" "strings" - "time" - "cloud.google.com/go/internal/trace" "cloud.google.com/go/spanner/executor/apiv1/executorpb" "cloud.google.com/go/spanner/test/cloudexecutor/executor" texporter "github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/trace" @@ -44,12 +42,14 @@ import ( ) var ( - proxyPort = flag.String("proxy_port", "", "Proxy port to start worker proxy on.") - spannerPort = flag.String("spanner_port", "", "Port of Spanner Frontend to which to send requests.") - cert = flag.String("cert", "", "Certificate used to connect to Spanner GFE.") - rootCert = flag.String("root_cert", "", "Root certificate used for calls to Cloud Trace endpoint.") - serviceKeyFile = flag.String("service_key_file", "", "Service key file used to set authentication.") - ipAddress = "127.0.0.1" + proxyPort = flag.String("proxy_port", "", "Proxy port to start worker proxy on.") + spannerPort = flag.String("spanner_port", "", "Port of Spanner Frontend to which to send requests.") + cert = flag.String("cert", "", "Certificate used to connect to Spanner GFE.") + rootCert = flag.String("root_cert", "", "Root certificate used for calls to Cloud Trace.") + serviceKeyFile = flag.String("service_key_file", "", "Service key file used to set authentication.") + ipAddress = "127.0.0.1" + cloudTraceEndpoint = "staging-cloudtrace.sandbox.googleapis.com:443" + projectID = "spanner-cloud-systest" ) func main() { @@ -80,32 +80,13 @@ func main() { // Enable opentelemetry tracing. os.Setenv("GOOGLE_API_GO_EXPERIMENTAL_TELEMETRY_PLATFORM_TRACING", "opentelemetry") - trace.SetOpenTelemetryTracingEnabledField(true) - log.Printf("opentelemetry tracing enabled: %v", trace.IsOpenTelemetryTracingEnabled()) - - traceClientOpts, err := getClientOptionsForCloudTrace() - if err != nil { - log.Fatalf("Setting OpenTelemetry failed: %v", err) - } - fmt.Printf("traceClientOpts: %v\n", traceClientOpts) - // Set up OTel tracing. - traceExporter, err := texporter.New( - texporter.WithContext(ctx), - texporter.WithTraceClientOptions(traceClientOpts), - texporter.WithProjectID("spanner-cloud-systest"), - texporter.WithTimeout(time.Duration(600*time.Second)), - ) - if err != nil { - log.Fatalf("unable to set up tracing: %v", err) - } - tp := sdktrace.NewTracerProvider( - sdktrace.WithBatcher(traceExporter), - sdktrace.WithSampler(sdktrace.ParentBased(sdktrace.TraceIDRatioBased(0.01))), - ) + // Set up OpenTelemetry tracing. + traceClientOpts := getClientOptionsForCloudTrace() + tp := getOpenTelemetryTracerProvider(ctx, traceClientOpts) defer func() { _ = tp.Shutdown(ctx) }() + // Register the tracer provider and text map propagator(to propagate trace context) globally. otel.SetTracerProvider(tp) - // Register the TraceContext propagator to send traceparent header. otel.SetTextMapPropagator(propagation.TraceContext{}) // Create a new gRPC server @@ -113,7 +94,7 @@ func main() { clientOptions := getClientOptionsForSysTests() // Create a new cloud proxy server - cloudProxyServer, err := executor.NewCloudProxyServer(ctx, clientOptions) + cloudProxyServer, err := executor.NewCloudProxyServer(ctx, clientOptions, traceClientOpts) if err != nil { log.Fatalf("Creating Cloud Proxy Server failed: %v", err) } @@ -132,35 +113,47 @@ func main() { } } -// getClientOptionsForCloudTrace returns the client options for creating Cloud Trace API client. -func getClientOptionsForCloudTrace() ([]option.ClientOption, error) { - var traceClientOpts []option.ClientOption - traceClientOpts = append(traceClientOpts, option.WithEndpoint("staging-cloudtrace.sandbox.googleapis.com:443")) - - // Read the credentials from roots.pem file. - creds, err := credentials.NewClientTLSFromFile(*rootCert, "") +// getOpenTelemetryTracerProvider sets up the OpenTelemetry by configuring exporter and sampler. +func getOpenTelemetryTracerProvider(ctx context.Context, traceClientOpts []option.ClientOption) *sdktrace.TracerProvider { + traceExporter, err := texporter.New( + texporter.WithContext(ctx), + texporter.WithTraceClientOptions(traceClientOpts), + texporter.WithProjectID(projectID), + ) if err != nil { - return nil, err + log.Fatalf("unable to set up tracing: %v", err) } - traceClientOpts = append(traceClientOpts, option.WithGRPCDialOption(grpc.WithTransportCredentials(creds))) + return sdktrace.NewTracerProvider( + sdktrace.WithBatcher(traceExporter), + sdktrace.WithSampler(sdktrace.ParentBased(sdktrace.TraceIDRatioBased(0.01))), + ) +} + +// Constructs client options needed to interact with Cloud Trace APIs. +func getClientOptionsForCloudTrace() []option.ClientOption { + var traceClientOpts []option.ClientOption + traceClientOpts = append(traceClientOpts, option.WithEndpoint(cloudTraceEndpoint)) + traceClientOpts = append(traceClientOpts, option.WithGRPCDialOption(grpc.WithTransportCredentials(getRootCredentials()))) const ( cloudPlatformScope = "https://www.googleapis.com/auth/cloud-platform" traceAppendScope = "https://www.googleapis.com/auth/trace.append" + traceReadScope = "https://www.googleapis.com/auth/trace.readonly" ) - // Read the service key file. + + log.Println("Reading service key file in executor code for cloud trace client") cloudSystestCredentialsJSON, err := os.ReadFile(*serviceKeyFile) if err != nil { - return nil, err + log.Fatal(err) } - - tokenSource, err := google.JWTAccessTokenSourceWithScope([]byte(cloudSystestCredentialsJSON), cloudPlatformScope, traceAppendScope) + tokenSource, err := google.JWTAccessTokenSourceWithScope([]byte(cloudSystestCredentialsJSON), cloudPlatformScope, traceAppendScope, traceReadScope) if err != nil { - return nil, err + log.Fatal(err) } traceClientOpts = append(traceClientOpts, option.WithTokenSource(tokenSource)) traceClientOpts = append(traceClientOpts, option.WithCredentialsFile(*serviceKeyFile)) - return traceClientOpts, nil + + return traceClientOpts } // Constructs client options needed to run executor for systests @@ -219,6 +212,15 @@ func getCredentials() credentials.TransportCredentials { return creds } +// Fetches the root credentials for rootCert file. +func getRootCredentials() credentials.TransportCredentials { + creds, err := credentials.NewClientTLSFromFile(*rootCert, "") + if err != nil { + log.Println(err) + } + return creds +} + // Constructs client options needed to run executor on local machine func getClientOptionsForLocalTest() []option.ClientOption { var options []option.ClientOption From 4ffd0aa2fcfa0808f04d03cbb593276d2e7f8bf0 Mon Sep 17 00:00:00 2001 From: Naresh Chaudhary Date: Thu, 27 Jun 2024 09:39:49 +0000 Subject: [PATCH 10/24] fix unwanted changes --- spanner/session.go | 2 +- trace/apiv2/trace_client.go | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/spanner/session.go b/spanner/session.go index 76d611fe262d..b6b8996790fc 100644 --- a/spanner/session.go +++ b/spanner/session.go @@ -926,10 +926,10 @@ func (p *sessionPool) newSessionHandle(s *session) (sh *sessionHandle) { if p.TrackSessionHandles || p.ActionOnInactiveTransaction == Warn || p.ActionOnInactiveTransaction == WarnAndClose || p.ActionOnInactiveTransaction == Close { p.mu.Lock() sh.trackedSessionHandle = p.trackedSessionHandles.PushBack(sh) - p.mu.Unlock() if p.TrackSessionHandles { sh.stack = debug.Stack() } + p.mu.Unlock() } return sh } diff --git a/trace/apiv2/trace_client.go b/trace/apiv2/trace_client.go index 682083f973ae..8f2e3417c442 100755 --- a/trace/apiv2/trace_client.go +++ b/trace/apiv2/trace_client.go @@ -344,6 +344,7 @@ func (c *restClient) Connection() *grpc.ClientConn { func (c *gRPCClient) BatchWriteSpans(ctx context.Context, req *tracepb.BatchWriteSpansRequest, opts ...gax.CallOption) error { hds := []string{"x-goog-request-params", fmt.Sprintf("%s=%v", "name", url.QueryEscape(req.GetName()))} hds = append(c.xGoogHeaders, hds...) + ctx = gax.InsertMetadataIntoOutgoingContext(ctx, hds...) opts = append((*c.CallOptions).BatchWriteSpans[0:len((*c.CallOptions).BatchWriteSpans):len((*c.CallOptions).BatchWriteSpans)], opts...) err := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error { From 7691d4ed46165ceeea31de33ccf6e8a2cedf312c Mon Sep 17 00:00:00 2001 From: Naresh Chaudhary Date: Thu, 27 Jun 2024 09:41:52 +0000 Subject: [PATCH 11/24] minor fix --- trace/apiv2/trace_client.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/trace/apiv2/trace_client.go b/trace/apiv2/trace_client.go index 8f2e3417c442..05881bda7cf6 100755 --- a/trace/apiv2/trace_client.go +++ b/trace/apiv2/trace_client.go @@ -343,8 +343,8 @@ func (c *restClient) Connection() *grpc.ClientConn { } func (c *gRPCClient) BatchWriteSpans(ctx context.Context, req *tracepb.BatchWriteSpansRequest, opts ...gax.CallOption) error { hds := []string{"x-goog-request-params", fmt.Sprintf("%s=%v", "name", url.QueryEscape(req.GetName()))} + hds = append(c.xGoogHeaders, hds...) - ctx = gax.InsertMetadataIntoOutgoingContext(ctx, hds...) opts = append((*c.CallOptions).BatchWriteSpans[0:len((*c.CallOptions).BatchWriteSpans):len((*c.CallOptions).BatchWriteSpans)], opts...) err := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error { From ae182e9e93f62cee1638ef30f6a7d441fe261858 Mon Sep 17 00:00:00 2001 From: Naresh Chaudhary Date: Thu, 27 Jun 2024 09:48:42 +0000 Subject: [PATCH 12/24] remove unnecessary log statements --- .../executor/internal/inputstream/handler.go | 12 +----------- 1 file changed, 1 insertion(+), 11 deletions(-) diff --git a/spanner/test/cloudexecutor/executor/internal/inputstream/handler.go b/spanner/test/cloudexecutor/executor/internal/inputstream/handler.go index cfde337a02c5..37b3e8255d9c 100644 --- a/spanner/test/cloudexecutor/executor/internal/inputstream/handler.go +++ b/spanner/test/cloudexecutor/executor/internal/inputstream/handler.go @@ -203,23 +203,13 @@ func (h *CloudStreamHandler) verifyCloudTraceExportedTraces(ctx context.Context) } resp, err := h.executionFlowContext.TraceClient.GetTrace(ctx, getTraceRequest) if err != nil { - log.Printf("error while calling GetTrace api: %v", err) + log.Printf("failed to get trace_id %v using GetTrace api: %v", traceId, err) return err } - spanNamesCommaSeperated := "" - for i, span := range resp.Spans { - if i > 0 { - spanNamesCommaSeperated = spanNamesCommaSeperated + "," + span.Name - } else { - spanNamesCommaSeperated = span.Name - } - } - log.Printf("Trace_id:%v Spans:%v\n", traceId, spanNamesCommaSeperated) if len(resp.Spans) == 0 { return fmt.Errorf("no trace found with trace_id: %v", traceId) } } - log.Println("all traces were exported successfully") return nil } From 96c9798cfb5e49959463227d2649c589172f6182 Mon Sep 17 00:00:00 2001 From: Naresh Chaudhary Date: Thu, 27 Jun 2024 20:57:38 +0000 Subject: [PATCH 13/24] add spanner span check --- .../executor/executor_proxy_server_impl.go | 12 ++++----- .../executor/internal/inputstream/handler.go | 25 +++++++++++-------- 2 files changed, 20 insertions(+), 17 deletions(-) diff --git a/spanner/test/cloudexecutor/executor/executor_proxy_server_impl.go b/spanner/test/cloudexecutor/executor/executor_proxy_server_impl.go index 712552346e41..6ec0ef688f6b 100644 --- a/spanner/test/cloudexecutor/executor/executor_proxy_server_impl.go +++ b/spanner/test/cloudexecutor/executor/executor_proxy_server_impl.go @@ -25,7 +25,7 @@ import ( "google.golang.org/api/option" ) -const MAX_CLOUD_TRACE_CHECK_LIMIT = 20 +const MAX_CLOUD_TRACE_CHECK_LIMIT = 10 // CloudProxyServer holds the cloud executor server. type CloudProxyServer struct { @@ -50,11 +50,11 @@ func (s *CloudProxyServer) ExecuteActionAsync(inputStream executorpb.SpannerExec ServerContext: s.serverContext, Options: s.options, TraceClientOptions: s.traceClientOptions, - CloudTraceCheckAllowed: s.cloudTraceCheckCount < MAX_CLOUD_TRACE_CHECK_LIMIT, + CloudTraceCheckAllowed: (s.cloudTraceCheckCount < MAX_CLOUD_TRACE_CHECK_LIMIT), } - err := handler.Execute() - if err != nil { - s.cloudTraceCheckCount += handler.GetCompletedCloudTraceCheckCount() + if err := handler.Execute(); err != nil { + return err } - return err + s.cloudTraceCheckCount += handler.GetCompletedCloudTraceCheckCount() + return nil } diff --git a/spanner/test/cloudexecutor/executor/internal/inputstream/handler.go b/spanner/test/cloudexecutor/executor/internal/inputstream/handler.go index 37b3e8255d9c..f06f968415db 100644 --- a/spanner/test/cloudexecutor/executor/internal/inputstream/handler.go +++ b/spanner/test/cloudexecutor/executor/internal/inputstream/handler.go @@ -22,7 +22,9 @@ import ( "fmt" "io" "log" + "strings" "sync" + "time" "cloud.google.com/go/internal/trace" "cloud.google.com/go/spanner" @@ -143,18 +145,13 @@ func (h *CloudStreamHandler) startHandlingRequest(ctx context.Context, req *exec return outcomeSender.FinishWithError(err) } - if !h.CloudTraceCheckAllowed { - trace.SetOpenTelemetryTracingEnabledField(false) - } // Create a span for the systest action. ctx = trace.StartSpan(ctx, fmt.Sprintf("systestaction_%v", actionType)) defer func() { trace.EndSpan(ctx, err) }() - if trace.IsOpenTelemetryTracingEnabled() && h.CloudTraceCheckAllowed { - spanContext := ottrace.SpanContextFromContext(ctx) - if spanContext.IsSampled() && len(h.exportedTraces) < MAX_TRACE_CHECKS_PER_STREAMING_REQUEST && actionType != "Admin" && actionType != "Mutation" { - h.exportedTraces = append(h.exportedTraces, spanContext.TraceID().String()) - } + spanContext := ottrace.SpanContextFromContext(ctx) + if h.CloudTraceCheckAllowed && spanContext.IsSampled() && len(h.exportedTraces) < MAX_TRACE_CHECKS_PER_STREAMING_REQUEST && (actionType == "Read" || actionType == "Query" || actionType == "Dml") { + h.exportedTraces = append(h.exportedTraces, spanContext.TraceID().String()) } // Create a channel to receive the error from the goroutine. @@ -195,7 +192,7 @@ func (h *CloudStreamHandler) verifyCloudTraceExportedTraces(ctx context.Context) return nil } log.Printf("start verification of exported cloud traces: len:%v, trace_ids:%v\n", len(h.exportedTraces), h.exportedTraces) - // time.Sleep(10 * time.Second) + time.Sleep(20 * time.Second) for _, traceId := range h.exportedTraces { getTraceRequest := &tracepb.GetTraceRequest{ ProjectId: "spanner-cloud-systest", @@ -206,8 +203,14 @@ func (h *CloudStreamHandler) verifyCloudTraceExportedTraces(ctx context.Context) log.Printf("failed to get trace_id %v using GetTrace api: %v", traceId, err) return err } - if len(resp.Spans) == 0 { - return fmt.Errorf("no trace found with trace_id: %v", traceId) + spannerLayerSpanPresent := false + for _, span := range resp.Spans { + if strings.Contains(span.Name, "/Spanner.") { + spannerLayerSpanPresent = true + } + } + if !spannerLayerSpanPresent { + return fmt.Errorf("no internal span found for trace_id: %v", traceId) } } return nil From 787034db57529f1b6c8b8199f159521c78235fa9 Mon Sep 17 00:00:00 2001 From: Naresh Chaudhary Date: Mon, 1 Jul 2024 16:51:45 +0000 Subject: [PATCH 14/24] add verification of spans at grpc layer --- .../executor/internal/inputstream/handler.go | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/spanner/test/cloudexecutor/executor/internal/inputstream/handler.go b/spanner/test/cloudexecutor/executor/internal/inputstream/handler.go index f06f968415db..3e7c8857e3b7 100644 --- a/spanner/test/cloudexecutor/executor/internal/inputstream/handler.go +++ b/spanner/test/cloudexecutor/executor/internal/inputstream/handler.go @@ -192,7 +192,7 @@ func (h *CloudStreamHandler) verifyCloudTraceExportedTraces(ctx context.Context) return nil } log.Printf("start verification of exported cloud traces: len:%v, trace_ids:%v\n", len(h.exportedTraces), h.exportedTraces) - time.Sleep(20 * time.Second) + time.Sleep(10 * time.Second) for _, traceId := range h.exportedTraces { getTraceRequest := &tracepb.GetTraceRequest{ ProjectId: "spanner-cloud-systest", @@ -203,6 +203,17 @@ func (h *CloudStreamHandler) verifyCloudTraceExportedTraces(ctx context.Context) log.Printf("failed to get trace_id %v using GetTrace api: %v", traceId, err) return err } + // Check if gRPC layer trace spans are present. Spans in gRPC layer have method + // name called in span name. + grpcLayerSpanPresent := false + for _, span := range resp.Spans { + if strings.Contains(span.Name, "google.spanner.v1.Spanner") { + grpcLayerSpanPresent = true + } + } + if !grpcLayerSpanPresent { + continue + } spannerLayerSpanPresent := false for _, span := range resp.Spans { if strings.Contains(span.Name, "/Spanner.") { From 689fcdb6596af47c400890cb05d063fc1bf9267a Mon Sep 17 00:00:00 2001 From: Naresh Chaudhary Date: Thu, 8 Aug 2024 07:57:07 +0000 Subject: [PATCH 15/24] resolve comments --- .../cloudexecutor/executor/actions/batch.go | 16 +- .../executor/actions/transaction.go | 16 +- .../executor/executor_proxy_server_impl.go | 6 +- .../executor/internal/inputstream/handler.go | 216 ++++++++++-------- 4 files changed, 131 insertions(+), 123 deletions(-) diff --git a/spanner/test/cloudexecutor/executor/actions/batch.go b/spanner/test/cloudexecutor/executor/actions/batch.go index 64d3f310d495..00129117c944 100644 --- a/spanner/test/cloudexecutor/executor/actions/batch.go +++ b/spanner/test/cloudexecutor/executor/actions/batch.go @@ -23,7 +23,6 @@ import ( "cloud.google.com/go/spanner/executor/apiv1/executorpb" "cloud.google.com/go/spanner/test/cloudexecutor/executor/internal/outputstream" "cloud.google.com/go/spanner/test/cloudexecutor/executor/internal/utility" - trace "cloud.google.com/go/trace/apiv1" "google.golang.org/api/option" spb "google.golang.org/genproto/googleapis/rpc/status" "google.golang.org/grpc/codes" @@ -32,11 +31,10 @@ import ( // StartBatchTxnHandler holds the necessary components and options required for start batch transaction action. type StartBatchTxnHandler struct { - Action *executorpb.StartBatchTransactionAction - FlowContext *ExecutionFlowContext - OutcomeSender *outputstream.OutcomeSender - Options []option.ClientOption - TraceClientOptions []option.ClientOption + Action *executorpb.StartBatchTransactionAction + FlowContext *ExecutionFlowContext + OutcomeSender *outputstream.OutcomeSender + Options []option.ClientOption } // ExecuteAction that starts a batch transaction @@ -56,12 +54,6 @@ func (h *StartBatchTxnHandler) ExecuteAction(ctx context.Context) error { if err != nil { return h.OutcomeSender.FinishWithError(err) } - // Create a trace client to read the traces from Cloud Trace. - traceClient, err := trace.NewClient(ctx, h.TraceClientOptions...) - if err != nil { - return h.OutcomeSender.FinishWithError(err) - } - h.FlowContext.TraceClient = traceClient var txn *spanner.BatchReadOnlyTransaction if h.Action.GetBatchTxnTime() != nil { timestamp := time.Unix(h.Action.GetBatchTxnTime().Seconds, int64(h.Action.GetBatchTxnTime().Nanos)) diff --git a/spanner/test/cloudexecutor/executor/actions/transaction.go b/spanner/test/cloudexecutor/executor/actions/transaction.go index 680f8f4ef78d..ff9ab5aa18f7 100644 --- a/spanner/test/cloudexecutor/executor/actions/transaction.go +++ b/spanner/test/cloudexecutor/executor/actions/transaction.go @@ -24,7 +24,6 @@ import ( "cloud.google.com/go/spanner/executor/apiv1/executorpb" "cloud.google.com/go/spanner/test/cloudexecutor/executor/internal/outputstream" "cloud.google.com/go/spanner/test/cloudexecutor/executor/internal/utility" - trace "cloud.google.com/go/trace/apiv1" "google.golang.org/api/option" spb "google.golang.org/genproto/googleapis/rpc/status" "google.golang.org/grpc/codes" @@ -34,11 +33,10 @@ import ( // StartTxnHandler holds the necessary components and options required for start transaction action. type StartTxnHandler struct { - Action *executorpb.StartTransactionAction - FlowContext *ExecutionFlowContext - OutcomeSender *outputstream.OutcomeSender - Options []option.ClientOption - TraceClientOptions []option.ClientOption + Action *executorpb.StartTransactionAction + FlowContext *ExecutionFlowContext + OutcomeSender *outputstream.OutcomeSender + Options []option.ClientOption } // ExecuteAction that starts a read-write or read-only transaction. @@ -61,12 +59,6 @@ func (h *StartTxnHandler) ExecuteAction(ctx context.Context) error { return h.OutcomeSender.FinishWithError(err) } h.FlowContext.DbClient = client - // Create a trace client to read the traces. - traceClient, err := trace.NewClient(ctx, h.TraceClientOptions...) - if err != nil { - return h.OutcomeSender.FinishWithError(err) - } - h.FlowContext.TraceClient = traceClient if h.FlowContext.isTransactionActiveLocked() { return h.OutcomeSender.FinishWithError(spanner.ToSpannerError(status.Error(codes.InvalidArgument, "already in a transaction"))) } diff --git a/spanner/test/cloudexecutor/executor/executor_proxy_server_impl.go b/spanner/test/cloudexecutor/executor/executor_proxy_server_impl.go index 6ec0ef688f6b..26fc1a5de034 100644 --- a/spanner/test/cloudexecutor/executor/executor_proxy_server_impl.go +++ b/spanner/test/cloudexecutor/executor/executor_proxy_server_impl.go @@ -25,7 +25,7 @@ import ( "google.golang.org/api/option" ) -const MAX_CLOUD_TRACE_CHECK_LIMIT = 10 +const MAX_CLOUD_TRACE_CHECK_LIMIT = 20 // CloudProxyServer holds the cloud executor server. type CloudProxyServer struct { @@ -55,6 +55,8 @@ func (s *CloudProxyServer) ExecuteActionAsync(inputStream executorpb.SpannerExec if err := handler.Execute(); err != nil { return err } - s.cloudTraceCheckCount += handler.GetCompletedCloudTraceCheckCount() + if handler.IsServerSideTraceCheckDone() { + s.cloudTraceCheckCount++ + } return nil } diff --git a/spanner/test/cloudexecutor/executor/internal/inputstream/handler.go b/spanner/test/cloudexecutor/executor/internal/inputstream/handler.go index 3e7c8857e3b7..dd8a2cece4d6 100644 --- a/spanner/test/cloudexecutor/executor/internal/inputstream/handler.go +++ b/spanner/test/cloudexecutor/executor/internal/inputstream/handler.go @@ -31,6 +31,7 @@ import ( "cloud.google.com/go/spanner/executor/apiv1/executorpb" "cloud.google.com/go/spanner/test/cloudexecutor/executor/actions" "cloud.google.com/go/spanner/test/cloudexecutor/executor/internal/outputstream" + traceapiv1 "cloud.google.com/go/trace/apiv1" "cloud.google.com/go/trace/apiv1/tracepb" ottrace "go.opentelemetry.io/otel/trace" "google.golang.org/api/option" @@ -38,8 +39,6 @@ import ( "google.golang.org/grpc/status" ) -const MAX_TRACE_CHECKS_PER_STREAMING_REQUEST = 5 - // CloudStreamHandler handles a streaming ExecuteActions request by performing incoming // actions. It maintains a state associated with the request, such as current transaction. type CloudStreamHandler struct { @@ -50,18 +49,15 @@ type CloudStreamHandler struct { TraceClientOptions []option.ClientOption CloudTraceCheckAllowed bool // indicates whether cloud trace checks can be performed // members below represent internal state - executionFlowContext *actions.ExecutionFlowContext - mu sync.Mutex // protects mutable internal state - exportedTraces []string // traces that will be checked using cloud trace api + executionFlowContext *actions.ExecutionFlowContext + mu sync.Mutex // protects mutable internal state + serverSideTraceCheckDone bool // indicates whether checks are performed to verify server side tracing } -// GetCompletedCloudTraceCheckCount returns the number of Cloud Trace checks performed for -// streaming ExecuteActions request. -func (h *CloudStreamHandler) GetCompletedCloudTraceCheckCount() int { - if h.executionFlowContext.TraceClient == nil { - return 0 - } - return len(h.exportedTraces) +// IsServerSideTraceCheckDone returns whether a check was done to verify if Spanner server +// side trace are generated or not. +func (h *CloudStreamHandler) IsServerSideTraceCheckDone() bool { + return h.serverSideTraceCheckDone } // Execute executes the given ExecuteActions request, blocking until it's done. It takes care of @@ -74,56 +70,92 @@ func (h *CloudStreamHandler) Execute() error { spanner.UseNumberWithJSONDecoderEncoder(true) var c *actions.ExecutionFlowContext - func() { + ctx := context.Background() + err := func() error { h.mu.Lock() defer h.mu.Unlock() c = &actions.ExecutionFlowContext{} + // Create a trace client to read the traces. + traceClient, err := traceapiv1.NewClient(ctx, h.TraceClientOptions...) + if err != nil { + return fmt.Errorf("Error creating trace client: %v", err) + } + c.TraceClient = traceClient h.executionFlowContext = c + return nil }() + if err != nil { + return err + } + + // Create a top-level OpenTelemetry span for streaming request. + ctx = trace.StartSpan(ctx, "go_systest_execute_actions_stream") - // In case this function returns abruptly, or client misbehaves, make sure to dispose of - // transactions. defer func() { + trace.EndSpan(ctx, err) + // Close the trace client. + log.Println("Closing the trace client in execution flow context") + c.TraceClient.Close() + + // In case this function returns abruptly, or client misbehaves, make sure to dispose of + // transactions. c.CloseOpenTransactions() }() - ctx := context.Background() + performServerSideTraceCheck := false // Main loop that receives and executes actions. for { req, err := h.Stream.Recv() if err == io.EOF { - // Verify the traces exported to Cloud Trace. - if h.CloudTraceCheckAllowed { - if err = h.verifyCloudTraceExportedTraces(ctx); err != nil { - log.Printf("Verification failed for exported traces: %v", err) - return err - } - } log.Println("Client called Done, half-closed the stream") if h.executionFlowContext != nil && h.executionFlowContext.DbClient != nil { log.Println("Closing the client object in execution flow context") h.executionFlowContext.DbClient.Close() } - if h.executionFlowContext != nil && h.executionFlowContext.TraceClient != nil { - log.Println("Closing the trace client in execution flow context") - h.executionFlowContext.TraceClient.Close() - } break } if err != nil { log.Printf("Failed to receive request from client: %v", err) return err } + // OpenTelemetry trace created for a streaming request will be verified using Cloud Trace APIs to make sure that + // Spanner server side tracing is working. Check will be performed only if there is atleast one action of type + // "Read" or "Query" in streaming request and number of checks performed are less than the max limit. + actionType := getActionType(req.Action) + spanContext := ottrace.SpanContextFromContext(ctx) + if h.CloudTraceCheckAllowed && spanContext.IsSampled() && (actionType == "Read" || actionType == "Query") { + performServerSideTraceCheck = true + } + if err = h.startHandlingRequest(ctx, req); err != nil { log.Printf("Failed to handle request %v, Client ends the stream with error: %v", req, err) // TODO(sriharshach): should we throw the error here instead of nil? return nil } } + h.serverSideTraceCheckDone = performServerSideTraceCheck + if performServerSideTraceCheck { + // Verify the end to end trace exported to Cloud Trace. + traceId := ottrace.SpanContextFromContext(ctx).TraceID().String() + if err = h.verifyCloudTraceExportedTraces(ctx, traceId); err != nil { + log.Printf("Verification failed for exported traces: %v", err) + return err + } + } log.Println("Done executing actions") return nil } +// getActionType returns the name of action type. +func getActionType(inputAction *executorpb.SpannerAction) string { + if inputAction == nil { + return "" + } + // Here action will have output as `*executorpb.SpannerAction_Query`. + action := fmt.Sprintf("%T", inputAction.GetAction()) + return strings.TrimPrefix(action, "*executorpb.SpannerAction_") +} + // startHandlingRequest takes care of the given request. It picks an actionHandler and starts // a goroutine in which that action will be executed. func (h *CloudStreamHandler) startHandlingRequest(ctx context.Context, req *executorpb.SpannerAsyncActionRequest) error { @@ -140,20 +172,16 @@ func (h *CloudStreamHandler) startHandlingRequest(ctx context.Context, req *exec } // Get a new action handler based on the input action. - actionType, actionHandler, err := h.newActionHandler(inputAction, outcomeSender) + actionHandler, err := h.newActionHandler(inputAction, outcomeSender) if err != nil { return outcomeSender.FinishWithError(err) } // Create a span for the systest action. - ctx = trace.StartSpan(ctx, fmt.Sprintf("systestaction_%v", actionType)) + actionType := getActionType(inputAction) + ctx = trace.StartSpan(ctx, fmt.Sprintf("performaction_%v", actionType)) defer func() { trace.EndSpan(ctx, err) }() - spanContext := ottrace.SpanContextFromContext(ctx) - if h.CloudTraceCheckAllowed && spanContext.IsSampled() && len(h.exportedTraces) < MAX_TRACE_CHECKS_PER_STREAMING_REQUEST && (actionType == "Read" || actionType == "Query" || actionType == "Dml") { - h.exportedTraces = append(h.exportedTraces, spanContext.TraceID().String()) - } - // Create a channel to receive the error from the goroutine. errCh := make(chan error, 1) successCh := make(chan bool, 1) @@ -181,68 +209,63 @@ func (h *CloudStreamHandler) startHandlingRequest(ctx context.Context, req *exec } } -// verifyCloudTraceExportedTraces fetches the traces exported from client application using +// verifyCloudTraceExportedTraces fetches the traces exported from client application using // Cloud Trace API to cross verify if end to end tracing is working or not. -func (h *CloudStreamHandler) verifyCloudTraceExportedTraces(ctx context.Context) error { - if len(h.exportedTraces) == 0 { - return nil - } +func (h *CloudStreamHandler) verifyCloudTraceExportedTraces(ctx context.Context, traceId string) error { if h.executionFlowContext.TraceClient == nil { - log.Println("trace client not found") - return nil + return fmt.Errorf("trace client not found") } - log.Printf("start verification of exported cloud traces: len:%v, trace_ids:%v\n", len(h.exportedTraces), h.exportedTraces) time.Sleep(10 * time.Second) - for _, traceId := range h.exportedTraces { - getTraceRequest := &tracepb.GetTraceRequest{ - ProjectId: "spanner-cloud-systest", - TraceId: traceId, - } - resp, err := h.executionFlowContext.TraceClient.GetTrace(ctx, getTraceRequest) - if err != nil { - log.Printf("failed to get trace_id %v using GetTrace api: %v", traceId, err) - return err - } - // Check if gRPC layer trace spans are present. Spans in gRPC layer have method - // name called in span name. - grpcLayerSpanPresent := false - for _, span := range resp.Spans { - if strings.Contains(span.Name, "google.spanner.v1.Spanner") { - grpcLayerSpanPresent = true - } - } - if !grpcLayerSpanPresent { - continue - } - spannerLayerSpanPresent := false - for _, span := range resp.Spans { - if strings.Contains(span.Name, "/Spanner.") { - spannerLayerSpanPresent = true - } + log.Printf("start verification of exported cloud trace: trace_id:%s\n", traceId) + + getTraceRequest := &tracepb.GetTraceRequest{ + ProjectId: "spanner-cloud-systest", + TraceId: traceId, + } + resp, err := h.executionFlowContext.TraceClient.GetTrace(ctx, getTraceRequest) + if err != nil { + log.Printf("failed to get trace_id:%v using GetTrace api: %v", traceId, err) + return err + } + // Check if gRPC layer trace spans are present. Span names in gRPC layer contain the name of called + // Spanner method. + grpcLayerSpanPresent := false + for _, span := range resp.Spans { + if strings.Contains(span.Name, "google.spanner.v1.Spanner") { + grpcLayerSpanPresent = true } - if !spannerLayerSpanPresent { - return fmt.Errorf("no internal span found for trace_id: %v", traceId) + } + if !grpcLayerSpanPresent { + // No gRPC spans mean no call was made to Spanner. + return nil + } + spannerLayerSpanPresent := false + for _, span := range resp.Spans { + if strings.HasPrefix(span.Name, "Spanner.") { + spannerLayerSpanPresent = true } } + if !spannerLayerSpanPresent { + return fmt.Errorf("no internal span found for trace_id: %v", traceId) + } return nil } // newActionHandler instantiates an actionHandler for executing the given action. -func (h *CloudStreamHandler) newActionHandler(action *executorpb.SpannerAction, outcomeSender *outputstream.OutcomeSender) (string, cloudActionHandler, error) { +func (h *CloudStreamHandler) newActionHandler(action *executorpb.SpannerAction, outcomeSender *outputstream.OutcomeSender) (cloudActionHandler, error) { if action.DatabasePath != "" { h.executionFlowContext.Database = action.DatabasePath } switch action.GetAction().(type) { case *executorpb.SpannerAction_Start: - return "Start", &actions.StartTxnHandler{ - Action: action.GetStart(), - FlowContext: h.executionFlowContext, - OutcomeSender: outcomeSender, - Options: h.Options, - TraceClientOptions: h.TraceClientOptions, + return &actions.StartTxnHandler{ + Action: action.GetStart(), + FlowContext: h.executionFlowContext, + OutcomeSender: outcomeSender, + Options: h.Options, }, nil case *executorpb.SpannerAction_Finish: - return "Finish", &actions.FinishTxnHandler{ + return &actions.FinishTxnHandler{ Action: action.GetFinish(), FlowContext: h.executionFlowContext, OutcomeSender: outcomeSender, @@ -254,83 +277,82 @@ func (h *CloudStreamHandler) newActionHandler(action *executorpb.SpannerAction, OutcomeSender: outcomeSender, Options: h.Options, } - return "Admin", adminAction, nil + return adminAction, nil case *executorpb.SpannerAction_Read: - return "Read", &actions.ReadActionHandler{ + return &actions.ReadActionHandler{ Action: action.GetRead(), FlowContext: h.executionFlowContext, OutcomeSender: outcomeSender, }, nil case *executorpb.SpannerAction_Query: - return "Query", &actions.QueryActionHandler{ + return &actions.QueryActionHandler{ Action: action.GetQuery(), FlowContext: h.executionFlowContext, OutcomeSender: outcomeSender, }, nil case *executorpb.SpannerAction_Mutation: - return "Mutation", &actions.MutationActionHandler{ + return &actions.MutationActionHandler{ Action: action.GetMutation(), FlowContext: h.executionFlowContext, OutcomeSender: outcomeSender, }, nil case *executorpb.SpannerAction_Write: - return "Write", &actions.WriteActionHandler{ + return &actions.WriteActionHandler{ Action: action.GetWrite().GetMutation(), FlowContext: h.executionFlowContext, OutcomeSender: outcomeSender, }, nil case *executorpb.SpannerAction_Dml: - return "Dml", &actions.DmlActionHandler{ + return &actions.DmlActionHandler{ Action: action.GetDml(), FlowContext: h.executionFlowContext, OutcomeSender: outcomeSender, }, nil case *executorpb.SpannerAction_StartBatchTxn: - return "StartBatchTxn", &actions.StartBatchTxnHandler{ - Action: action.GetStartBatchTxn(), - FlowContext: h.executionFlowContext, - OutcomeSender: outcomeSender, - Options: h.Options, - TraceClientOptions: h.TraceClientOptions, + return &actions.StartBatchTxnHandler{ + Action: action.GetStartBatchTxn(), + FlowContext: h.executionFlowContext, + OutcomeSender: outcomeSender, + Options: h.Options, }, nil case *executorpb.SpannerAction_GenerateDbPartitionsRead: - return "GenerateDbPartitionsRead", &actions.PartitionReadActionHandler{ + return &actions.PartitionReadActionHandler{ Action: action.GetGenerateDbPartitionsRead(), FlowContext: h.executionFlowContext, OutcomeSender: outcomeSender, }, nil case *executorpb.SpannerAction_GenerateDbPartitionsQuery: - return "GenerateDbPartitionsQuery", &actions.PartitionQueryActionHandler{ + return &actions.PartitionQueryActionHandler{ Action: action.GetGenerateDbPartitionsQuery(), FlowContext: h.executionFlowContext, OutcomeSender: outcomeSender, }, nil case *executorpb.SpannerAction_ExecutePartition: - return "ExecutePartition", &actions.ExecutePartition{ + return &actions.ExecutePartition{ Action: action.GetExecutePartition(), FlowContext: h.executionFlowContext, OutcomeSender: outcomeSender, }, nil case *executorpb.SpannerAction_PartitionedUpdate: - return "PartitionedUpdate", &actions.PartitionedUpdate{ + return &actions.PartitionedUpdate{ Action: action.GetPartitionedUpdate(), FlowContext: h.executionFlowContext, OutcomeSender: outcomeSender, }, nil case *executorpb.SpannerAction_CloseBatchTxn: - return "CloseBatchTxn", &actions.CloseBatchTxnHandler{ + return &actions.CloseBatchTxnHandler{ Action: action.GetCloseBatchTxn(), FlowContext: h.executionFlowContext, OutcomeSender: outcomeSender, }, nil case *executorpb.SpannerAction_BatchDml: - return "BatchDml", &actions.BatchDmlHandler{ + return &actions.BatchDmlHandler{ Action: action.GetBatchDml(), FlowContext: h.executionFlowContext, OutcomeSender: outcomeSender, }, nil default: - return "", nil, status.Error(codes.Unimplemented, fmt.Sprintf("not implemented yet %T", action.GetAction())) + return nil, status.Error(codes.Unimplemented, fmt.Sprintf("not implemented yet %T", action.GetAction())) } } From 96644a03ddac242075d3e91c43f35db020e5bcd2 Mon Sep 17 00:00:00 2001 From: Naresh Chaudhary Date: Thu, 8 Aug 2024 08:12:46 +0000 Subject: [PATCH 16/24] rename end to end tracing to server side tracing --- spanner/client.go | 12 ++++++------ spanner/client_test.go | 32 ++++++++++++++++---------------- 2 files changed, 22 insertions(+), 22 deletions(-) diff --git a/spanner/client.go b/spanner/client.go index 608aa0774428..55702da70051 100644 --- a/spanner/client.go +++ b/spanner/client.go @@ -63,9 +63,9 @@ const ( requestsCompressionHeader = "x-response-encoding" - // endToEndTracingHeader is the name of the metadata header if client + // serverSideTracingHeader is the name of the metadata header if client // has opted-in for the creation of trace spans on the Spanner layer. - endToEndTracingHeader = "x-goog-spanner-end-to-end-tracing" + serverSideTracingHeader = "x-goog-spanner-end-to-end-tracing" // numChannels is the default value for NumChannels of client. numChannels = 4 @@ -336,11 +336,11 @@ type ClientConfig struct { OpenTelemetryMeterProvider metric.MeterProvider - // EnableEndToEndTracing indicates whether end to end tracing is enabled or not. + // EnableServerSideTracing indicates whether server side tracing is enabled or not. // If it is enabled, trace spans will be created at Spanner layer. // // Default: false - EnableEndToEndTracing bool + EnableServerSideTracing bool } type openTelemetryConfig struct { @@ -456,8 +456,8 @@ func newClientWithConfig(ctx context.Context, database string, config ClientConf if config.Compression == gzip.Name { md.Append(requestsCompressionHeader, gzip.Name) } - if config.EnableEndToEndTracing { - md.Append(endToEndTracingHeader, "true") + if config.EnableServerSideTracing { + md.Append(serverSideTracingHeader, "true") } // Create a session client. diff --git a/spanner/client_test.go b/spanner/client_test.go index 170170106560..26e8a595a94e 100644 --- a/spanner/client_test.go +++ b/spanner/client_test.go @@ -4368,49 +4368,49 @@ func TestClient_WithGRPCConnectionPoolAndNumChannels_Misconfigured(t *testing.T) } } -func TestClient_WithEndToEndTracingHeader(t *testing.T) { +func TestClient_WithServerSideTracingHeader(t *testing.T) { t.Parallel() server, opts, serverTeardown := NewMockedSpannerInMemTestServer(t) defer serverTeardown() - wantEndToEndTracing := true - config := ClientConfig{EnableEndToEndTracing: wantEndToEndTracing} + wantServerSideTracing := true + config := ClientConfig{EnableServerSideTracing: wantServerSideTracing} client, err := makeClientWithConfig(context.Background(), "projects/p/instances/i/databases/d", config, server.ServerAddress, opts...) if err != nil { t.Fatalf("failed to get a client: %v", err) } - gotEndToEndTracing := false - for _, val := range client.sc.md.Get(endToEndTracingHeader) { + gotServerSideTracing := false + for _, val := range client.sc.md.Get(serverSideTracingHeader) { if val == "true" { - gotEndToEndTracing = true + gotServerSideTracing = true } } - if gotEndToEndTracing != wantEndToEndTracing { - t.Fatalf("mismatch in client configuration for property EnableEndToEndTracing: got %v, want %v", gotEndToEndTracing, wantEndToEndTracing) + if gotServerSideTracing != wantServerSideTracing { + t.Fatalf("mismatch in client configuration for property EnableServerSideTracing: got %v, want %v", gotServerSideTracing, wantServerSideTracing) } } -func TestClient_WithoutEndToEndTracingHeader(t *testing.T) { +func TestClient_WithoutServerSideTracingHeader(t *testing.T) { t.Parallel() server, opts, serverTeardown := NewMockedSpannerInMemTestServer(t) defer serverTeardown() - wantEndToEndTracing := false - config := ClientConfig{EnableEndToEndTracing: wantEndToEndTracing} + wantServerSideTracing := false + config := ClientConfig{EnableServerSideTracing: wantServerSideTracing} client, err := makeClientWithConfig(context.Background(), "projects/p/instances/i/databases/d", config, server.ServerAddress, opts...) if err != nil { t.Fatalf("failed to get a client: %v", err) } - gotEndToEndTracing := false - for _, val := range client.sc.md.Get(endToEndTracingHeader) { + gotServerSideTracing := false + for _, val := range client.sc.md.Get(serverSideTracingHeader) { if val == "true" { - gotEndToEndTracing = true + gotServerSideTracing = true } } - if gotEndToEndTracing != wantEndToEndTracing { - t.Fatalf("mismatch in client configuration for property EnableEndToEndTracing: got %v, want %v", gotEndToEndTracing, wantEndToEndTracing) + if gotServerSideTracing != wantServerSideTracing { + t.Fatalf("mismatch in client configuration for property EnableServerSideTracing: got %v, want %v", gotServerSideTracing, wantServerSideTracing) } } From 2b10bf66a3b88dcfd399b734c04f878dc47556b9 Mon Sep 17 00:00:00 2001 From: Naresh Chaudhary Date: Thu, 8 Aug 2024 08:15:08 +0000 Subject: [PATCH 17/24] minor change --- spanner/test/cloudexecutor/executor/actions/batch.go | 2 +- spanner/test/cloudexecutor/executor/actions/transaction.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/spanner/test/cloudexecutor/executor/actions/batch.go b/spanner/test/cloudexecutor/executor/actions/batch.go index 00129117c944..baec432b4727 100644 --- a/spanner/test/cloudexecutor/executor/actions/batch.go +++ b/spanner/test/cloudexecutor/executor/actions/batch.go @@ -50,7 +50,7 @@ func (h *StartBatchTxnHandler) ExecuteAction(ctx context.Context) error { return h.OutcomeSender.FinishWithError(spanner.ToSpannerError(status.Error(codes.InvalidArgument, "database path must be set for this action"))) } - client, err := spanner.NewClientWithConfig(ctx, h.FlowContext.Database, spanner.ClientConfig{SessionPoolConfig: spanner.DefaultSessionPoolConfig, DisableRouteToLeader: false, EnableEndToEndTracing: true}, h.Options...) + client, err := spanner.NewClientWithConfig(ctx, h.FlowContext.Database, spanner.ClientConfig{SessionPoolConfig: spanner.DefaultSessionPoolConfig, DisableRouteToLeader: false, EnableServerSideTracing: true}, h.Options...) if err != nil { return h.OutcomeSender.FinishWithError(err) } diff --git a/spanner/test/cloudexecutor/executor/actions/transaction.go b/spanner/test/cloudexecutor/executor/actions/transaction.go index ff9ab5aa18f7..126f647864ed 100644 --- a/spanner/test/cloudexecutor/executor/actions/transaction.go +++ b/spanner/test/cloudexecutor/executor/actions/transaction.go @@ -54,7 +54,7 @@ func (h *StartTxnHandler) ExecuteAction(ctx context.Context) error { h.FlowContext.tableMetadata = metadata // TODO(harsha) where do I close the client? defer client.Close() - client, err := spanner.NewClientWithConfig(ctx, h.FlowContext.Database, spanner.ClientConfig{SessionPoolConfig: spanner.DefaultSessionPoolConfig, DisableRouteToLeader: false, EnableEndToEndTracing: true}, h.Options...) + client, err := spanner.NewClientWithConfig(ctx, h.FlowContext.Database, spanner.ClientConfig{SessionPoolConfig: spanner.DefaultSessionPoolConfig, DisableRouteToLeader: false, EnableServerSideTracing: true}, h.Options...) if err != nil { return h.OutcomeSender.FinishWithError(err) } From 352072ad1518b059f4d2dff0f97c27a636a1e874 Mon Sep 17 00:00:00 2001 From: Naresh Chaudhary Date: Tue, 20 Aug 2024 07:55:10 +0000 Subject: [PATCH 18/24] resolve comments --- .../actions/execution_flow_context.go | 2 - .../executor/executor_proxy_server_impl.go | 6 +- .../executor/internal/inputstream/handler.go | 78 ++++++++++--------- 3 files changed, 45 insertions(+), 41 deletions(-) diff --git a/spanner/test/cloudexecutor/executor/actions/execution_flow_context.go b/spanner/test/cloudexecutor/executor/actions/execution_flow_context.go index c63eaa4518d7..fd6cf27736fb 100644 --- a/spanner/test/cloudexecutor/executor/actions/execution_flow_context.go +++ b/spanner/test/cloudexecutor/executor/actions/execution_flow_context.go @@ -24,7 +24,6 @@ import ( "cloud.google.com/go/spanner" "cloud.google.com/go/spanner/executor/apiv1/executorpb" "cloud.google.com/go/spanner/test/cloudexecutor/executor/internal/utility" - trace "cloud.google.com/go/trace/apiv1" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" ) @@ -50,7 +49,6 @@ type ExecutionFlowContext struct { roTxn *spanner.ReadOnlyTransaction // Current read-only transaction batchTxn *spanner.BatchReadOnlyTransaction // Current batch read-only transaction DbClient *spanner.Client // Current database client - TraceClient *trace.Client // Current trace client tableMetadata *utility.TableMetadataHelper // If in a txn (except batch), this has metadata info about table columns numPendingReads int64 // Number of pending read/query actions. readAborted bool // Indicate whether there's a read/query action got aborted and the transaction need to be reset. diff --git a/spanner/test/cloudexecutor/executor/executor_proxy_server_impl.go b/spanner/test/cloudexecutor/executor/executor_proxy_server_impl.go index 26fc1a5de034..15052d007eae 100644 --- a/spanner/test/cloudexecutor/executor/executor_proxy_server_impl.go +++ b/spanner/test/cloudexecutor/executor/executor_proxy_server_impl.go @@ -19,6 +19,7 @@ package executor import ( "context" + "sync" "cloud.google.com/go/spanner/executor/apiv1/executorpb" "cloud.google.com/go/spanner/test/cloudexecutor/executor/internal/inputstream" @@ -34,12 +35,13 @@ type CloudProxyServer struct { options []option.ClientOption traceClientOptions []option.ClientOption // members below represent internal state + mu sync.Mutex cloudTraceCheckCount int } // NewCloudProxyServer initializes and returns a new CloudProxyServer instance. func NewCloudProxyServer(ctx context.Context, opts []option.ClientOption, traceClientOpts []option.ClientOption) (*CloudProxyServer, error) { - return &CloudProxyServer{serverContext: ctx, options: opts, traceClientOptions: traceClientOpts, cloudTraceCheckCount: 0}, nil + return &CloudProxyServer{serverContext: ctx, options: opts, traceClientOptions: traceClientOpts}, nil } // ExecuteActionAsync is implementation of ExecuteActionAsync in SpannerExecutorProxyServer. It's a @@ -56,6 +58,8 @@ func (s *CloudProxyServer) ExecuteActionAsync(inputStream executorpb.SpannerExec return err } if handler.IsServerSideTraceCheckDone() { + s.mu.Lock() + defer s.mu.Unlock() s.cloudTraceCheckCount++ } return nil diff --git a/spanner/test/cloudexecutor/executor/internal/inputstream/handler.go b/spanner/test/cloudexecutor/executor/internal/inputstream/handler.go index dd8a2cece4d6..2563dc0e854f 100644 --- a/spanner/test/cloudexecutor/executor/internal/inputstream/handler.go +++ b/spanner/test/cloudexecutor/executor/internal/inputstream/handler.go @@ -70,39 +70,25 @@ func (h *CloudStreamHandler) Execute() error { spanner.UseNumberWithJSONDecoderEncoder(true) var c *actions.ExecutionFlowContext - ctx := context.Background() - err := func() error { + func() { h.mu.Lock() defer h.mu.Unlock() c = &actions.ExecutionFlowContext{} - // Create a trace client to read the traces. - traceClient, err := traceapiv1.NewClient(ctx, h.TraceClientOptions...) - if err != nil { - return fmt.Errorf("Error creating trace client: %v", err) - } - c.TraceClient = traceClient h.executionFlowContext = c - return nil }() - if err != nil { - return err - } - - // Create a top-level OpenTelemetry span for streaming request. - ctx = trace.StartSpan(ctx, "go_systest_execute_actions_stream") + // In case this function returns abruptly, or client misbehaves, make sure to dispose of + // transactions. defer func() { - trace.EndSpan(ctx, err) - // Close the trace client. - log.Println("Closing the trace client in execution flow context") - c.TraceClient.Close() - - // In case this function returns abruptly, or client misbehaves, make sure to dispose of - // transactions. c.CloseOpenTransactions() }() - performServerSideTraceCheck := false + ctx := context.Background() + + // Create a top-level OpenTelemetry span for streaming request. + ctx = trace.StartSpan(ctx, "go_systest_execute_actions_stream") + + readOrQueryActionPresent := false // Main loop that receives and executes actions. for { req, err := h.Stream.Recv() @@ -118,13 +104,9 @@ func (h *CloudStreamHandler) Execute() error { log.Printf("Failed to receive request from client: %v", err) return err } - // OpenTelemetry trace created for a streaming request will be verified using Cloud Trace APIs to make sure that - // Spanner server side tracing is working. Check will be performed only if there is atleast one action of type - // "Read" or "Query" in streaming request and number of checks performed are less than the max limit. actionType := getActionType(req.Action) - spanContext := ottrace.SpanContextFromContext(ctx) - if h.CloudTraceCheckAllowed && spanContext.IsSampled() && (actionType == "Read" || actionType == "Query") { - performServerSideTraceCheck = true + if actionType == "Read" || actionType == "Query" { + readOrQueryActionPresent = true } if err = h.startHandlingRequest(ctx, req); err != nil { @@ -133,11 +115,28 @@ func (h *CloudStreamHandler) Execute() error { return nil } } - h.serverSideTraceCheckDone = performServerSideTraceCheck - if performServerSideTraceCheck { + // End the top-level OpenTelemetry span. + trace.EndSpan(ctx, nil) + + // OpenTelemetry trace created for a streaming request will be verified using Cloud Trace APIs to make sure that + // Spanner server side tracing is working. Check will be performed only if there is atleast one action of type + // "Read" or "Query" in streaming request, trace is sampled and number of checks performed are less than the max limit. + if h.CloudTraceCheckAllowed && ottrace.SpanContextFromContext(ctx).IsSampled() && readOrQueryActionPresent { + h.serverSideTraceCheckDone = true + // Create a trace client to read the traces. + traceClient, err := traceapiv1.NewClient(ctx, h.TraceClientOptions...) + if err != nil { + return fmt.Errorf("Failed to create trace client: %v", err) + } + defer func() { + // Close the trace client. + log.Println("Closing the trace client") + traceClient.Close() + }() + // Verify the end to end trace exported to Cloud Trace. traceId := ottrace.SpanContextFromContext(ctx).TraceID().String() - if err = h.verifyCloudTraceExportedTraces(ctx, traceId); err != nil { + if err = verifyCloudTraceExportedTraces(ctx, traceClient, traceId); err != nil { log.Printf("Verification failed for exported traces: %v", err) return err } @@ -211,18 +210,21 @@ func (h *CloudStreamHandler) startHandlingRequest(ctx context.Context, req *exec // verifyCloudTraceExportedTraces fetches the traces exported from client application using // Cloud Trace API to cross verify if end to end tracing is working or not. -func (h *CloudStreamHandler) verifyCloudTraceExportedTraces(ctx context.Context, traceId string) error { - if h.executionFlowContext.TraceClient == nil { +func verifyCloudTraceExportedTraces(ctx context.Context, traceClient *traceapiv1.Client, traceId string) error { + if traceClient == nil { return fmt.Errorf("trace client not found") } - time.Sleep(10 * time.Second) - log.Printf("start verification of exported cloud trace: trace_id:%s\n", traceId) + // Traces may not yet be exported, sleep for sufficient time before verifying end to end traces. + duration := 10 * time.Second + log.Printf("sleeping for %v seconds before verifying end to end trace", duration) + time.Sleep(duration) + log.Printf("start verification of exported cloud trace: trace_id:%s\n", traceId) getTraceRequest := &tracepb.GetTraceRequest{ ProjectId: "spanner-cloud-systest", TraceId: traceId, } - resp, err := h.executionFlowContext.TraceClient.GetTrace(ctx, getTraceRequest) + resp, err := traceClient.GetTrace(ctx, getTraceRequest) if err != nil { log.Printf("failed to get trace_id:%v using GetTrace api: %v", traceId, err) return err @@ -246,7 +248,7 @@ func (h *CloudStreamHandler) verifyCloudTraceExportedTraces(ctx context.Context, } } if !spannerLayerSpanPresent { - return fmt.Errorf("no internal span found for trace_id: %v", traceId) + return fmt.Errorf("no server side span found for trace_id: %v", traceId) } return nil } From f32dc9802860c2c326b0f09785b3023616553e7a Mon Sep 17 00:00:00 2001 From: Naresh Chaudhary Date: Tue, 20 Aug 2024 10:09:26 +0000 Subject: [PATCH 19/24] add comment for server side tracing option --- spanner/client.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/spanner/client.go b/spanner/client.go index 55702da70051..52ac1bcf60ba 100644 --- a/spanner/client.go +++ b/spanner/client.go @@ -337,7 +337,9 @@ type ClientConfig struct { OpenTelemetryMeterProvider metric.MeterProvider // EnableServerSideTracing indicates whether server side tracing is enabled or not. - // If it is enabled, trace spans will be created at Spanner layer. + // If it is enabled, trace spans will be created at Spanner layer. Enabling server + // side tracing requires OpenTelemetry to be set up properly. Simply enabling this + // option won't generate server side traces. // // Default: false EnableServerSideTracing bool From 6defac2be45b26c42143ac76ead085d1dfd01f67 Mon Sep 17 00:00:00 2001 From: Naresh Chaudhary Date: Wed, 11 Sep 2024 06:11:45 +0000 Subject: [PATCH 20/24] resolve comments --- .../executor/executor_proxy_server_impl.go | 17 ++++++++++++----- .../executor/internal/inputstream/handler.go | 2 +- 2 files changed, 13 insertions(+), 6 deletions(-) diff --git a/spanner/test/cloudexecutor/executor/executor_proxy_server_impl.go b/spanner/test/cloudexecutor/executor/executor_proxy_server_impl.go index 15052d007eae..452ca85e4fdb 100644 --- a/spanner/test/cloudexecutor/executor/executor_proxy_server_impl.go +++ b/spanner/test/cloudexecutor/executor/executor_proxy_server_impl.go @@ -48,12 +48,19 @@ func NewCloudProxyServer(ctx context.Context, opts []option.ClientOption, traceC // streaming method in which client and server exchange SpannerActions and SpannerActionOutcomes. func (s *CloudProxyServer) ExecuteActionAsync(inputStream executorpb.SpannerExecutorProxy_ExecuteActionAsyncServer) error { handler := &inputstream.CloudStreamHandler{ - Stream: inputStream, - ServerContext: s.serverContext, - Options: s.options, - TraceClientOptions: s.traceClientOptions, - CloudTraceCheckAllowed: (s.cloudTraceCheckCount < MAX_CLOUD_TRACE_CHECK_LIMIT), + Stream: inputStream, + ServerContext: s.serverContext, + Options: s.options, + TraceClientOptions: s.traceClientOptions, } + func() { + s.mu.Lock() + defer s.mu.Unlock() + if s.cloudTraceCheckCount < MAX_CLOUD_TRACE_CHECK_LIMIT { + handler.CloudTraceCheckAllowed = true + } + }() + if err := handler.Execute(); err != nil { return err } diff --git a/spanner/test/cloudexecutor/executor/internal/inputstream/handler.go b/spanner/test/cloudexecutor/executor/internal/inputstream/handler.go index 2563dc0e854f..92836997b06a 100644 --- a/spanner/test/cloudexecutor/executor/internal/inputstream/handler.go +++ b/spanner/test/cloudexecutor/executor/internal/inputstream/handler.go @@ -117,6 +117,7 @@ func (h *CloudStreamHandler) Execute() error { } // End the top-level OpenTelemetry span. trace.EndSpan(ctx, nil) + log.Println("Done executing actions") // OpenTelemetry trace created for a streaming request will be verified using Cloud Trace APIs to make sure that // Spanner server side tracing is working. Check will be performed only if there is atleast one action of type @@ -141,7 +142,6 @@ func (h *CloudStreamHandler) Execute() error { return err } } - log.Println("Done executing actions") return nil } From 0a9d2db57e1519202422b1073611426898bfc825 Mon Sep 17 00:00:00 2001 From: Naresh Chaudhary Date: Wed, 11 Sep 2024 07:22:49 +0000 Subject: [PATCH 21/24] remove opt-in header changes from executor pr --- spanner/client.go | 15 -------------- spanner/client_test.go | 46 ------------------------------------------ 2 files changed, 61 deletions(-) diff --git a/spanner/client.go b/spanner/client.go index ceb90df57ad5..5d3d078a5b0d 100644 --- a/spanner/client.go +++ b/spanner/client.go @@ -63,10 +63,6 @@ const ( requestsCompressionHeader = "x-response-encoding" - // serverSideTracingHeader is the name of the metadata header if client - // has opted-in for the creation of trace spans on the Spanner layer. - serverSideTracingHeader = "x-goog-spanner-end-to-end-tracing" - // numChannels is the default value for NumChannels of client. numChannels = 4 ) @@ -335,14 +331,6 @@ type ClientConfig struct { DirectedReadOptions *sppb.DirectedReadOptions OpenTelemetryMeterProvider metric.MeterProvider - - // EnableServerSideTracing indicates whether server side tracing is enabled or not. - // If it is enabled, trace spans will be created at Spanner layer. Enabling server - // side tracing requires OpenTelemetry to be set up properly. Simply enabling this - // option won't generate server side traces. - // - // Default: false - EnableServerSideTracing bool } type openTelemetryConfig struct { @@ -458,9 +446,6 @@ func newClientWithConfig(ctx context.Context, database string, config ClientConf if config.Compression == gzip.Name { md.Append(requestsCompressionHeader, gzip.Name) } - if config.EnableServerSideTracing { - md.Append(serverSideTracingHeader, "true") - } // Create a session client. sc := newSessionClient(pool, database, config.UserAgent, sessionLabels, config.DatabaseRole, config.DisableRouteToLeader, md, config.BatchTimeout, config.Logger, config.CallOptions) diff --git a/spanner/client_test.go b/spanner/client_test.go index 18a8ff77c3f2..20891a5a1519 100644 --- a/spanner/client_test.go +++ b/spanner/client_test.go @@ -4393,52 +4393,6 @@ func TestClient_WithGRPCConnectionPoolAndNumChannels_Misconfigured(t *testing.T) } } -func TestClient_WithServerSideTracingHeader(t *testing.T) { - t.Parallel() - - server, opts, serverTeardown := NewMockedSpannerInMemTestServer(t) - defer serverTeardown() - - wantServerSideTracing := true - config := ClientConfig{EnableServerSideTracing: wantServerSideTracing} - client, err := makeClientWithConfig(context.Background(), "projects/p/instances/i/databases/d", config, server.ServerAddress, opts...) - if err != nil { - t.Fatalf("failed to get a client: %v", err) - } - gotServerSideTracing := false - for _, val := range client.sc.md.Get(serverSideTracingHeader) { - if val == "true" { - gotServerSideTracing = true - } - } - if gotServerSideTracing != wantServerSideTracing { - t.Fatalf("mismatch in client configuration for property EnableServerSideTracing: got %v, want %v", gotServerSideTracing, wantServerSideTracing) - } -} - -func TestClient_WithoutServerSideTracingHeader(t *testing.T) { - t.Parallel() - - server, opts, serverTeardown := NewMockedSpannerInMemTestServer(t) - defer serverTeardown() - - wantServerSideTracing := false - config := ClientConfig{EnableServerSideTracing: wantServerSideTracing} - client, err := makeClientWithConfig(context.Background(), "projects/p/instances/i/databases/d", config, server.ServerAddress, opts...) - if err != nil { - t.Fatalf("failed to get a client: %v", err) - } - gotServerSideTracing := false - for _, val := range client.sc.md.Get(serverSideTracingHeader) { - if val == "true" { - gotServerSideTracing = true - } - } - if gotServerSideTracing != wantServerSideTracing { - t.Fatalf("mismatch in client configuration for property EnableServerSideTracing: got %v, want %v", gotServerSideTracing, wantServerSideTracing) - } -} - func TestClient_WithCustomBatchTimeout(t *testing.T) { t.Parallel() From 97fe4e78db5b47980bbb49cd2bb8310cfd996f03 Mon Sep 17 00:00:00 2001 From: Naresh Chaudhary Date: Tue, 17 Sep 2024 06:55:37 +0000 Subject: [PATCH 22/24] temp --- spanner/test/cloudexecutor/executor/actions/admin.go | 1 + spanner/test/cloudexecutor/worker_proxy.go | 2 ++ 2 files changed, 3 insertions(+) diff --git a/spanner/test/cloudexecutor/executor/actions/admin.go b/spanner/test/cloudexecutor/executor/actions/admin.go index d9421fe68e63..4651c07e30da 100644 --- a/spanner/test/cloudexecutor/executor/actions/admin.go +++ b/spanner/test/cloudexecutor/executor/actions/admin.go @@ -261,6 +261,7 @@ func executeListCloudInstances(ctx context.Context, action *executorpb.ListCloud // execute action that lists cloud instance configs. func executeListInstanceConfigs(ctx context.Context, action *executorpb.ListCloudInstanceConfigsAction, h *ExecutionFlowContext, opts []option.ClientOption, o *outputstream.OutcomeSender) error { log.Printf("listing instance configs: %v", action) + log.Printf("executeListInstanceConfigs options: %v", opts) projectID := action.GetProjectId() instanceAdminClient, err := instance.NewInstanceAdminClient(ctx, opts...) if err != nil { diff --git a/spanner/test/cloudexecutor/worker_proxy.go b/spanner/test/cloudexecutor/worker_proxy.go index 550720f16d19..a3f3b48d72ad 100644 --- a/spanner/test/cloudexecutor/worker_proxy.go +++ b/spanner/test/cloudexecutor/worker_proxy.go @@ -209,6 +209,7 @@ func getCredentials() credentials.TransportCredentials { if err != nil { log.Println(err) } + fmt.Printf("CAcert credentials: %v", creds) return creds } @@ -218,6 +219,7 @@ func getRootCredentials() credentials.TransportCredentials { if err != nil { log.Println(err) } + fmt.Printf("Root credentials: %v", creds) return creds } From d098d1f8122c7be93a94b21781f627f2f1f65cb1 Mon Sep 17 00:00:00 2001 From: Naresh Chaudhary Date: Wed, 9 Oct 2024 17:30:52 +0000 Subject: [PATCH 23/24] changes as per the latest naming --- spanner/test/cloudexecutor/executor/actions/batch.go | 2 +- spanner/test/cloudexecutor/executor/actions/transaction.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/spanner/test/cloudexecutor/executor/actions/batch.go b/spanner/test/cloudexecutor/executor/actions/batch.go index baec432b4727..00129117c944 100644 --- a/spanner/test/cloudexecutor/executor/actions/batch.go +++ b/spanner/test/cloudexecutor/executor/actions/batch.go @@ -50,7 +50,7 @@ func (h *StartBatchTxnHandler) ExecuteAction(ctx context.Context) error { return h.OutcomeSender.FinishWithError(spanner.ToSpannerError(status.Error(codes.InvalidArgument, "database path must be set for this action"))) } - client, err := spanner.NewClientWithConfig(ctx, h.FlowContext.Database, spanner.ClientConfig{SessionPoolConfig: spanner.DefaultSessionPoolConfig, DisableRouteToLeader: false, EnableServerSideTracing: true}, h.Options...) + client, err := spanner.NewClientWithConfig(ctx, h.FlowContext.Database, spanner.ClientConfig{SessionPoolConfig: spanner.DefaultSessionPoolConfig, DisableRouteToLeader: false, EnableEndToEndTracing: true}, h.Options...) if err != nil { return h.OutcomeSender.FinishWithError(err) } diff --git a/spanner/test/cloudexecutor/executor/actions/transaction.go b/spanner/test/cloudexecutor/executor/actions/transaction.go index 126f647864ed..ff9ab5aa18f7 100644 --- a/spanner/test/cloudexecutor/executor/actions/transaction.go +++ b/spanner/test/cloudexecutor/executor/actions/transaction.go @@ -54,7 +54,7 @@ func (h *StartTxnHandler) ExecuteAction(ctx context.Context) error { h.FlowContext.tableMetadata = metadata // TODO(harsha) where do I close the client? defer client.Close() - client, err := spanner.NewClientWithConfig(ctx, h.FlowContext.Database, spanner.ClientConfig{SessionPoolConfig: spanner.DefaultSessionPoolConfig, DisableRouteToLeader: false, EnableServerSideTracing: true}, h.Options...) + client, err := spanner.NewClientWithConfig(ctx, h.FlowContext.Database, spanner.ClientConfig{SessionPoolConfig: spanner.DefaultSessionPoolConfig, DisableRouteToLeader: false, EnableEndToEndTracing: true}, h.Options...) if err != nil { return h.OutcomeSender.FinishWithError(err) } From ec1095f6478e453c6c56ebf11feadea717802f45 Mon Sep 17 00:00:00 2001 From: Naresh Chaudhary Date: Wed, 9 Oct 2024 17:39:54 +0000 Subject: [PATCH 24/24] remove unnecessary comment --- spanner/test/cloudexecutor/executor/actions/admin.go | 1 - 1 file changed, 1 deletion(-) diff --git a/spanner/test/cloudexecutor/executor/actions/admin.go b/spanner/test/cloudexecutor/executor/actions/admin.go index 4651c07e30da..d9421fe68e63 100644 --- a/spanner/test/cloudexecutor/executor/actions/admin.go +++ b/spanner/test/cloudexecutor/executor/actions/admin.go @@ -261,7 +261,6 @@ func executeListCloudInstances(ctx context.Context, action *executorpb.ListCloud // execute action that lists cloud instance configs. func executeListInstanceConfigs(ctx context.Context, action *executorpb.ListCloudInstanceConfigsAction, h *ExecutionFlowContext, opts []option.ClientOption, o *outputstream.OutcomeSender) error { log.Printf("listing instance configs: %v", action) - log.Printf("executeListInstanceConfigs options: %v", opts) projectID := action.GetProjectId() instanceAdminClient, err := instance.NewInstanceAdminClient(ctx, opts...) if err != nil {