From 99989457202f58d5f2715077f56be8ff8a291c84 Mon Sep 17 00:00:00 2001 From: "ryota.suzuki" Date: Sat, 11 Jan 2025 16:38:47 +0900 Subject: [PATCH 1/7] impl for trace decorator --- .../otel/tracing/context_decorator.go | 218 ++++++++++++++++++ actor/middleware/otel/tracing/span.go | 69 ++++++ 2 files changed, 287 insertions(+) create mode 100644 actor/middleware/otel/tracing/context_decorator.go create mode 100644 actor/middleware/otel/tracing/span.go diff --git a/actor/middleware/otel/tracing/context_decorator.go b/actor/middleware/otel/tracing/context_decorator.go new file mode 100644 index 00000000..885baa8b --- /dev/null +++ b/actor/middleware/otel/tracing/context_decorator.go @@ -0,0 +1,218 @@ +package tracing + +import ( + context2 "context" + "errors" + "fmt" + "github.com/asynkron/protoactor-go/actor" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/trace" + "log/slog" + "time" +) + +// ActorContext is a decorator for actor.Context that adds OpenTelemetry tracing capabilities. +type ActorContext struct { + actor.Context + withTracingSpanContext context2.Context +} + +var _ actor.Context = (*ActorContext)(nil) + +func (ac *ActorContext) Receive(envelope *actor.MessageEnvelope) { + traceExt, ok := ac.ActorSystem().Extensions.Get(extensionID).(*TraceExtension) + if !ok { + ac.Logger().Debug("TraceExtension not registered") + ac.Context.Receive(envelope) + return + } + ac.withTracingSpanContext = context2.Background() + defer func() { + ac.withTracingSpanContext = nil + }() + if envelope.Header != nil { + spanContext, err := spanContextFromMessageHeader(envelope.Header) + if errors.Is(err, ErrSpanContextNotFound) { + ac.Logger().Debug("No spanContext found", slog.Any("self", ac.Self()), slog.Any("error", err)) + } else if err != nil { + ac.Logger().Debug("Error extracting spanContext", slog.Any("self", ac.Self()), slog.Any("error", err)) + } else { + ac.withTracingSpanContext = trace.ContextWithSpanContext(ac.withTracingSpanContext, spanContext) + } + } + startSpan := func(suffix string) trace.Span { + ctx, span := traceExt.Tracer().Start(ac.withTracingSpanContext, fmt.Sprintf("message_receive/%T/%s", ac.Actor(), suffix)) + span.SetAttributes(attribute.String("ActorPID", ac.Self().String())) + span.SetAttributes(attribute.String("ActorType", fmt.Sprintf("%T", ac.Actor()))) + span.SetAttributes(attribute.String("MessageType", fmt.Sprintf("%T", envelope.Message))) + ac.withTracingSpanContext = ctx + return span + } + + switch envelope.Message.(type) { + case *actor.Started: + span := startSpan("started") + defer span.End() + case *actor.Stopping: + span := startSpan("stopping") + defer span.End() + case *actor.Stopped: + span := startSpan("stopped") + defer span.End() + default: + span := startSpan(fmt.Sprintf("%T", envelope.Message)) + defer span.End() + } + + ac.Context.Receive(envelope) +} + +func (ac *ActorContext) Send(pid *actor.PID, message interface{}) { + traceExt, ok := ac.ActorSystem().Extensions.Get(extensionID).(*TraceExtension) + if !ok { + ac.Logger().Debug("TraceExtension not registered") + ac.Context.Send(pid, message) + return + } + _, span := traceExt.Tracer().Start(ac.withTracingSpanContext, fmt.Sprintf("message_send/%T", message)) + defer span.End() + setSenderSpanAttributes(pid, message, span, ac) + envelop := messageToEnvelop(message, ac, ac.Self()) + + ac.Context.Send(pid, envelop) +} + +func (ac *ActorContext) Request(pid *actor.PID, message interface{}) { + traceExt, ok := ac.ActorSystem().Extensions.Get(extensionID).(*TraceExtension) + if !ok { + ac.Logger().Debug("TraceExtension not registered") + ac.Context.Request(pid, message) + return + } + _, span := traceExt.Tracer().Start(ac.withTracingSpanContext, fmt.Sprintf("message_request/%T", message)) + defer span.End() + setSenderSpanAttributes(pid, message, span, ac) + envelop := messageToEnvelop(message, ac, ac.Self()) + + ac.Context.Send(pid, envelop) +} + +func (ac *ActorContext) RequestWithCustomSender(pid *actor.PID, message interface{}, sender *actor.PID) { + traceExt, ok := ac.ActorSystem().Extensions.Get(extensionID).(*TraceExtension) + if !ok { + ac.Logger().Debug("TraceExtension not registered") + ac.Context.RequestWithCustomSender(pid, message, sender) + return + } + _, span := traceExt.Tracer().Start(ac.withTracingSpanContext, fmt.Sprintf("message_request_with_custom_sender/%T", message)) + defer span.End() + setSenderSpanAttributes(pid, message, span, ac) + span.SetAttributes(attribute.String("CustomSenderActorPID", sender.String())) + + envelop := messageToEnvelop(message, ac, sender) + + ac.Context.Send(pid, envelop) +} + +func messageToEnvelop(message interface{}, t *ActorContext, sender *actor.PID) *actor.MessageEnvelope { + envelop, ok := message.(*actor.MessageEnvelope) + if ok { + setSpanContextToEnvelope(trace.SpanContextFromContext(t.withTracingSpanContext), envelop) + } else { + envelop = wrapEnvelopeWithSpanContext(trace.SpanContextFromContext(t.withTracingSpanContext), message, sender) + } + return envelop +} + +func (ac *ActorContext) RequestFuture(pid *actor.PID, message interface{}, timeout time.Duration) *actor.Future { + traceExt, ok := ac.ActorSystem().Extensions.Get(extensionID).(*TraceExtension) + if !ok { + ac.Logger().Debug("TraceExtension not registered") + return ac.Context.RequestFuture(pid, message, timeout) + } + future := actor.NewFuture(ac.ActorSystem(), timeout) + _, span := traceExt.Tracer().Start(ac.withTracingSpanContext, fmt.Sprintf("message_request_future/%T", message)) + defer span.End() + setSenderSpanAttributes(pid, message, span, ac) + envelop := messageToEnvelop(message, ac, future.PID()) + + ac.Context.RequestFuture(pid, envelop, timeout) + return future + +} +func (ac *ActorContext) Respond(response interface{}) { + traceExt, ok := ac.ActorSystem().Extensions.Get(extensionID).(*TraceExtension) + if !ok { + ac.Logger().Debug("TraceExtension not registered") + ac.Context.Respond(response) + return + } + _, span := traceExt.Tracer().Start(ac.withTracingSpanContext, fmt.Sprintf("message_respond/%T", response)) + defer span.End() + setSenderSpanAttributes(ac.Sender(), response, span, ac) + envelop := messageToEnvelop(response, ac, ac.Self()) + + ac.Context.Respond(envelop) +} + +func ContextDecorator() func(next actor.ContextDecoratorFunc) actor.ContextDecoratorFunc { + return func(next actor.ContextDecoratorFunc) actor.ContextDecoratorFunc { + return func(ctx actor.Context) actor.Context { + return next(&ActorContext{ctx, nil}) + } + } +} + +func (ac *ActorContext) Spawn(props *actor.Props) *actor.PID { + traceExt, ok := ac.ActorSystem().Extensions.Get(extensionID).(*TraceExtension) + if !ok { + ac.Logger().Debug("TraceExtension not registered") + return ac.Context.Spawn(props) + } + _, span := traceExt.Tracer().Start(ac.withTracingSpanContext, "spawn") + defer span.End() + props.Configure(actor.WithContextDecorator(ContextDecorator())) + pid := ac.Context.Spawn(props) + span.SetName(fmt.Sprintf("spawn/%s", pid.Id)) + span.SetAttributes(attribute.String("SpawnActorPID", pid.String())) + return pid +} + +func (ac *ActorContext) SpawnPrefix(props *actor.Props, prefix string) *actor.PID { + traceExt, ok := ac.ActorSystem().Extensions.Get(extensionID).(*TraceExtension) + if !ok { + ac.Logger().Debug("TraceExtension not registered") + return ac.Context.SpawnPrefix(props, prefix) + } + _, span := traceExt.Tracer().Start(ac.withTracingSpanContext, "spawn") + defer span.End() + props.Configure(actor.WithContextDecorator(ContextDecorator())) + pid := ac.Context.SpawnPrefix(props, prefix) + span.SetAttributes(attribute.String("Prefix", prefix)) + + span.SetName(fmt.Sprintf("spawn/%s", pid.Id)) + span.SetAttributes(attribute.String("SpawnActorPID", pid.String())) + return pid +} + +func (ac *ActorContext) SpawnNamed(props *actor.Props, id string) (*actor.PID, error) { + traceExt, ok := ac.ActorSystem().Extensions.Get(extensionID).(*TraceExtension) + if !ok { + ac.Logger().Debug("TraceExtension not registered") + return ac.Context.SpawnNamed(props, id) + } + _, span := traceExt.Tracer().Start(ac.withTracingSpanContext, "spawn") + defer span.End() + props.Configure(actor.WithContextDecorator(ContextDecorator())) + span.SetAttributes(attribute.String("ID", id)) + pid, err := ac.Context.SpawnNamed(props, id) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + } else { + span.SetName(fmt.Sprintf("spawn/%s", pid.Id)) + span.SetAttributes(attribute.String("SpawnActorPID", pid.String())) + } + return pid, err +} diff --git a/actor/middleware/otel/tracing/span.go b/actor/middleware/otel/tracing/span.go new file mode 100644 index 00000000..ee1cc545 --- /dev/null +++ b/actor/middleware/otel/tracing/span.go @@ -0,0 +1,69 @@ +package tracing + +import ( + "encoding/hex" + "fmt" + "github.com/asynkron/protoactor-go/actor" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" +) + +var ErrSpanContextNotFound = fmt.Errorf("spanContext not found") + +func spanContextFromMessageHeader(header actor.ReadonlyMessageHeader) (trace.SpanContext, error) { + if header == nil { + return trace.SpanContext{}, ErrSpanContextNotFound + } + + gotSpanId, err := trace.SpanIDFromHex(header.Get("parent-id")) + if err != nil { + return trace.SpanContext{}, fmt.Errorf("failed to parse spanId: %v: %w", err, ErrSpanContextNotFound) + } + + gotTraceId, err := trace.TraceIDFromHex(header.Get("trace-id")) + if err != nil { + return trace.SpanContext{}, fmt.Errorf("failed to parse traceId: %v: %w", err, ErrSpanContextNotFound) + } + + tracestate, err := trace.ParseTraceState(header.Get("tracestate")) + if err != nil { + return trace.SpanContext{}, fmt.Errorf("failed to parse tracestate: %v: %w", err, ErrSpanContextNotFound) + } + + traceFlags, err := hex.DecodeString(header.Get("trace-flags")) + if err != nil { + return trace.SpanContext{}, fmt.Errorf("failed to parse traceFlags: %v: %w", err, ErrSpanContextNotFound) + } + + return trace.NewSpanContext( + trace.SpanContextConfig{ + SpanID: gotSpanId, + TraceID: gotTraceId, + TraceState: tracestate, + TraceFlags: trace.TraceFlags(traceFlags[0]), + Remote: true, + }), nil +} + +func setSenderSpanAttributes(sendTargetActorPID *actor.PID, message interface{}, span trace.Span, t *ActorContext) { + span.SetAttributes(attribute.String("SenderActorPID", t.Self().String())) + span.SetAttributes(attribute.String("SenderActorType", fmt.Sprintf("%T", t.Actor()))) + span.SetAttributes(attribute.String("TargetActorPID", sendTargetActorPID.String())) + span.SetAttributes(attribute.String("MessageType", fmt.Sprintf("%T", message))) +} + +func wrapEnvelopeWithSpanContext(spanCtx trace.SpanContext, message any, sender *actor.PID) *actor.MessageEnvelope { + envelope := &actor.MessageEnvelope{ + Message: message, + Sender: sender, + } + setSpanContextToEnvelope(spanCtx, envelope) + return envelope +} + +func setSpanContextToEnvelope(spanCtx trace.SpanContext, envelope *actor.MessageEnvelope) { + envelope.SetHeader("parent-id", spanCtx.SpanID().String()) + envelope.SetHeader("trace-id", spanCtx.TraceID().String()) + envelope.SetHeader("tracestate", spanCtx.TraceState().String()) + envelope.SetHeader("trace-flags", fmt.Sprintf("%02x", byte(spanCtx.TraceFlags()))) +} From 0761f26e29ae8801a3c457047e5dc11873c2810e Mon Sep 17 00:00:00 2001 From: "ryota.suzuki" Date: Sat, 11 Jan 2025 16:39:22 +0900 Subject: [PATCH 2/7] impl for root context middleware --- .../otel/tracing/sender_middleware.go | 63 +++++++++++++++++++ .../otel/tracing/spawn_middleware.go | 51 +++++++++++++++ 2 files changed, 114 insertions(+) create mode 100644 actor/middleware/otel/tracing/sender_middleware.go create mode 100644 actor/middleware/otel/tracing/spawn_middleware.go diff --git a/actor/middleware/otel/tracing/sender_middleware.go b/actor/middleware/otel/tracing/sender_middleware.go new file mode 100644 index 00000000..2f8bd1a3 --- /dev/null +++ b/actor/middleware/otel/tracing/sender_middleware.go @@ -0,0 +1,63 @@ +package tracing + +import ( + context2 "context" + "errors" + "fmt" + "github.com/asynkron/protoactor-go/actor" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" + "log/slog" +) + +func MapFromSpanContext(spanCtx trace.SpanContext) map[string]string { + return map[string]string{ + "parent-id": spanCtx.SpanID().String(), + "trace-id": spanCtx.TraceID().String(), + "tracestate": spanCtx.TraceState().String(), + "trace-flags": fmt.Sprintf("%02x", byte(spanCtx.TraceFlags())), + } +} + +func extractSpanContextFromSenderFuncArgs(c actor.SenderContext, envelope *actor.MessageEnvelope) (trace.SpanContext, error) { + fromCtxMessageHeader, err := spanContextFromMessageHeader(c.MessageHeader()) + if !errors.Is(err, ErrSpanContextNotFound) { + return fromCtxMessageHeader, nil + } + return spanContextFromMessageHeader(envelope.Header) +} + +func RootContextSenderMiddleware() actor.SenderMiddleware { + return func(next actor.SenderFunc) actor.SenderFunc { + return func(c actor.SenderContext, target *actor.PID, envelope *actor.MessageEnvelope) { + if _, ok := c.(*actor.RootContext); !ok { + c.Logger().Debug("Context is not a receiver context", slog.Any("self", c.Self())) + next(c, target, envelope) + return + } + + ctxWithParentSpan := context2.Background() + spanContext, err := extractSpanContextFromSenderFuncArgs(c, envelope) + if errors.Is(err, ErrSpanContextNotFound) { + c.Logger().Debug("No spanContext found", slog.Any("self", c.Self()), slog.Any("error", err)) + } else if err != nil { + c.Logger().Error("Error extracting spanContext", slog.Any("self", c.Self()), slog.Any("error", err)) + } else { + ctxWithParentSpan = trace.ContextWithSpanContext(ctxWithParentSpan, spanContext) + } + + traceExt := c.ActorSystem().Extensions.Get(extensionID).(*TraceExtension) + ctxWithCurrentSpan, span := traceExt.Tracer().Start(ctxWithParentSpan, fmt.Sprintf("message_send/%T", envelope.Message)) + defer span.End() + span.SetAttributes(attribute.String("SenderActorPID", c.Self().String())) + span.SetAttributes(attribute.String("SenderActorType", fmt.Sprintf("%T", c.Actor()))) + span.SetAttributes(attribute.String("TargetActorPID", target.String())) + span.SetAttributes(attribute.String("MessageType", fmt.Sprintf("%T", envelope.Message))) + setSpanContextToEnvelope(trace.SpanContextFromContext( + ctxWithCurrentSpan, + ), envelope) + + next(c, target, envelope) + } + } +} diff --git a/actor/middleware/otel/tracing/spawn_middleware.go b/actor/middleware/otel/tracing/spawn_middleware.go new file mode 100644 index 00000000..c1debef3 --- /dev/null +++ b/actor/middleware/otel/tracing/spawn_middleware.go @@ -0,0 +1,51 @@ +package tracing + +import ( + context2 "context" + "errors" + "fmt" + "github.com/asynkron/protoactor-go/actor" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/trace" + "log/slog" +) + +func rootContextSpawnMiddleware() actor.SpawnMiddleware { + return func(next actor.SpawnFunc) actor.SpawnFunc { + return func(actorSystem *actor.ActorSystem, id string, props *actor.Props, parentContext actor.SpawnerContext) (pid *actor.PID, e error) { + rootContext, ok := parentContext.(*actor.RootContext) + if !ok { + parentContext.Logger().Debug("Context is not rootContext", slog.Any("self", parentContext.Self())) + pid, err := next(actorSystem, id, props, parentContext) + return pid, err + } + + self := parentContext.Self() + ctxWithParentSpan := context2.Background() + + if rootContext.MessageHeader() != nil { + spanCtx, err := spanContextFromMessageHeader(rootContext.MessageHeader()) + if errors.Is(err, ErrSpanContextNotFound) { + rootContext.Logger().Debug("No spanContext found", slog.String("self", "root-context"), slog.Any("error", err)) + } else if err != nil { + rootContext.Logger().Debug("Error extracting spanContext", slog.String("self", "root-context"), slog.Any("error", err)) + } else { + ctxWithParentSpan = trace.ContextWithSpanContext(ctxWithParentSpan, spanCtx) + } + } + traceExt := actorSystem.Extensions.Get(extensionID).(*TraceExtension) + _, span := traceExt.Tracer().Start(ctxWithParentSpan, fmt.Sprintf("spawn/%s", id)) + defer span.End() + span.SetAttributes(attribute.String("SpawnActorPID", pid.String())) + pid, err := next(actorSystem, id, props, parentContext) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + actorSystem.Logger().Debug("SPAWN got error trying to spawn", slog.Any("self", self), slog.Any("actor", parentContext.Actor()), slog.Any("error", err)) + return pid, err + } + return pid, err + } + } +} From a9c91259af77dafe993ae955527ffb5e48eeefa3 Mon Sep 17 00:00:00 2001 From: "ryota.suzuki" Date: Sat, 11 Jan 2025 16:43:23 +0900 Subject: [PATCH 3/7] impl extension --- actor/middleware/otel/tracing/extension.go | 34 ++++++++++++++++++++++ 1 file changed, 34 insertions(+) create mode 100644 actor/middleware/otel/tracing/extension.go diff --git a/actor/middleware/otel/tracing/extension.go b/actor/middleware/otel/tracing/extension.go new file mode 100644 index 00000000..0ab3d38d --- /dev/null +++ b/actor/middleware/otel/tracing/extension.go @@ -0,0 +1,34 @@ +package tracing + +import ( + "github.com/asynkron/protoactor-go/extensions" + "go.opentelemetry.io/otel/trace" +) + +var extensionID = extensions.NextExtensionID() + +type TraceExtension struct { + trace.TracerProvider +} + +func (ext *TraceExtension) Tracer() trace.Tracer { + return ext.TracerProvider.Tracer("protoactor") +} + +func NewTraceExtension( + provider trace.TracerProvider, +) *TraceExtension { + return &TraceExtension{ + provider, + } +} + +func (ext *TraceExtension) Enabled() bool { + return true +} + +func (ext *TraceExtension) ExtensionID() extensions.ExtensionID { + return extensionID +} + +var _ extensions.Extension = &TraceExtension{} From 10f03be32c9d38c103f611a9bdb6d85a38ed75e9 Mon Sep 17 00:00:00 2001 From: "ryota.suzuki" Date: Sat, 11 Jan 2025 16:43:33 +0900 Subject: [PATCH 4/7] impl middleware propagation --- .../otel/tracing/middleware_propagation.go | 14 ++++++++++++++ 1 file changed, 14 insertions(+) create mode 100644 actor/middleware/otel/tracing/middleware_propagation.go diff --git a/actor/middleware/otel/tracing/middleware_propagation.go b/actor/middleware/otel/tracing/middleware_propagation.go new file mode 100644 index 00000000..0b1cf8cb --- /dev/null +++ b/actor/middleware/otel/tracing/middleware_propagation.go @@ -0,0 +1,14 @@ +package tracing + +import ( + "github.com/asynkron/protoactor-go/actor" + "github.com/asynkron/protoactor-go/actor/middleware/propagator" +) + +func RootContextSpawnMiddleware() actor.SpawnMiddleware { + return propagator.New(). + WithItselfForwarded(). + WithSpawnMiddleware(rootContextSpawnMiddleware()). + WithContextDecorator(ContextDecorator()). + SpawnMiddleware +} From 254f0faa09ba6a8464aa63d3b9b98293a36bc96c Mon Sep 17 00:00:00 2001 From: "ryota.suzuki" Date: Sat, 11 Jan 2025 16:51:07 +0900 Subject: [PATCH 5/7] impl easy root context trace helper --- actor/middleware/otel/tracing/span.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/actor/middleware/otel/tracing/span.go b/actor/middleware/otel/tracing/span.go index ee1cc545..13b706e8 100644 --- a/actor/middleware/otel/tracing/span.go +++ b/actor/middleware/otel/tracing/span.go @@ -67,3 +67,8 @@ func setSpanContextToEnvelope(spanCtx trace.SpanContext, envelope *actor.Message envelope.SetHeader("tracestate", spanCtx.TraceState().String()) envelope.SetHeader("trace-flags", fmt.Sprintf("%02x", byte(spanCtx.TraceFlags()))) } + +// TraceableRootContext creates a RootContext with tracing capabilities +func TraceableRootContext(rootContext actor.RootContext, spanContext trace.SpanContext) *actor.RootContext { + return rootContext.Copy().WithSenderMiddleware(RootContextSenderMiddleware()).WithSpawnMiddleware(RootContextSpawnMiddleware()).WithHeaders(MapFromSpanContext(spanContext)) +} From b68ef11dbd1b0ea67968ea9662daf9bec51120d6 Mon Sep 17 00:00:00 2001 From: "ryota.suzuki" Date: Sat, 11 Jan 2025 17:00:36 +0900 Subject: [PATCH 6/7] add example --- examples/opentelemetry-trace/go.mod | 42 ++++++++++ examples/opentelemetry-trace/go.sum | 113 +++++++++++++++++++++++++++ examples/opentelemetry-trace/main.go | 57 ++++++++++++++ 3 files changed, 212 insertions(+) create mode 100644 examples/opentelemetry-trace/go.mod create mode 100644 examples/opentelemetry-trace/go.sum create mode 100644 examples/opentelemetry-trace/main.go diff --git a/examples/opentelemetry-trace/go.mod b/examples/opentelemetry-trace/go.mod new file mode 100644 index 00000000..5662cef7 --- /dev/null +++ b/examples/opentelemetry-trace/go.mod @@ -0,0 +1,42 @@ +module opentelemetry-trace + +go 1.22.0 + +toolchain go1.23.1 + +replace github.com/asynkron/protoactor-go => ../.. + +require ( + github.com/asynkron/goconsole v0.0.0-20160504192649-bfa12eebf716 + github.com/asynkron/protoactor-go v0.0.0-20240822202345-3c0e61ca19c9 + go.opentelemetry.io/otel v1.33.0 + go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.33.0 + +) + +require ( + github.com/Workiva/go-datastructures v1.1.5 // indirect + github.com/beorn7/perks v1.0.1 // indirect + github.com/cespare/xxhash/v2 v2.2.0 // indirect + github.com/emirpasic/gods v1.18.1 // indirect + github.com/go-logr/logr v1.4.2 // indirect + github.com/go-logr/stdr v1.2.2 // indirect + github.com/golang/protobuf v1.5.4 // indirect + github.com/google/uuid v1.6.0 // indirect + github.com/lithammer/shortuuid/v4 v4.0.0 // indirect + github.com/lmittmann/tint v1.0.3 // indirect + github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect + github.com/orcaman/concurrent-map v1.0.0 // indirect + github.com/prometheus/client_golang v1.17.0 // indirect + github.com/prometheus/client_model v0.5.0 // indirect + github.com/prometheus/common v0.44.0 // indirect + github.com/prometheus/procfs v0.11.1 // indirect + github.com/twmb/murmur3 v1.1.8 // indirect + go.opentelemetry.io/auto/sdk v1.1.0 // indirect + go.opentelemetry.io/otel/metric v1.33.0 // indirect + go.opentelemetry.io/otel/sdk v1.33.0 // indirect + go.opentelemetry.io/otel/sdk/metric v1.33.0 // indirect + go.opentelemetry.io/otel/trace v1.33.0 // indirect + golang.org/x/sys v0.28.0 // indirect + google.golang.org/protobuf v1.33.0 // indirect +) diff --git a/examples/opentelemetry-trace/go.sum b/examples/opentelemetry-trace/go.sum new file mode 100644 index 00000000..d7b41af7 --- /dev/null +++ b/examples/opentelemetry-trace/go.sum @@ -0,0 +1,113 @@ +github.com/Workiva/go-datastructures v1.1.3 h1:LRdRrug9tEuKk7TGfz/sct5gjVj44G9pfqDt4qm7ghw= +github.com/Workiva/go-datastructures v1.1.3/go.mod h1:1yZL+zfsztete+ePzZz/Zb1/t5BnDuE2Ya2MMGhzP6A= +github.com/Workiva/go-datastructures v1.1.5 h1:5YfhQ4ry7bZc2Mc7R0YZyYwpf5c6t1cEFvdAhd6Mkf4= +github.com/Workiva/go-datastructures v1.1.5/go.mod h1:1yZL+zfsztete+ePzZz/Zb1/t5BnDuE2Ya2MMGhzP6A= +github.com/asynkron/goconsole v0.0.0-20160504192649-bfa12eebf716 h1:SgyG4sXkrlalMoCfp20LiNPNhfJS7ez3opNdtihIxPc= +github.com/asynkron/goconsole v0.0.0-20160504192649-bfa12eebf716/go.mod h1:/zSlF0T2ArAsTG6SVu8d8qlK+19jjudjA3wWsxnGFHg= +github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= +github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= +github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= +github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= +github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/emirpasic/gods v1.18.1 h1:FXtiHYKDGKCW2KzwZKx0iC0PQmdlorYgdFG9jPXJ1Bc= +github.com/emirpasic/gods v1.18.1/go.mod h1:8tpGGwCnJ5H4r6BWwaV6OrWmMoPhUl5jm/FMNAnJvWQ= +github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= +github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= +github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= +github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg= +github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= +github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= +github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= +github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/lithammer/shortuuid/v4 v4.0.0 h1:QRbbVkfgNippHOS8PXDkti4NaWeyYfcBTHtw7k08o4c= +github.com/lithammer/shortuuid/v4 v4.0.0/go.mod h1:Zs8puNcrvf2rV9rTH51ZLLcj7ZXqQI3lv67aw4KiB1Y= +github.com/lmittmann/tint v1.0.3 h1:W5PHeA2D8bBJVvabNfQD/XW9HPLZK1XoPZH0cq8NouQ= +github.com/lmittmann/tint v1.0.3/go.mod h1:HIS3gSy7qNwGCj+5oRjAutErFBl4BzdQP6cJZ0NfMwE= +github.com/matttproud/golang_protobuf_extensions v1.0.4 h1:mmDVorXM7PCGKw94cs5zkfA9PSy5pEvNWRP0ET0TIVo= +github.com/matttproud/golang_protobuf_extensions v1.0.4/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4= +github.com/orcaman/concurrent-map v1.0.0 h1:I/2A2XPCb4IuQWcQhBhSwGfiuybl/J0ev9HDbW65HOY= +github.com/orcaman/concurrent-map v1.0.0/go.mod h1:Lu3tH6HLW3feq74c2GC+jIMS/K2CFcDWnWD9XkenwhI= +github.com/philhofer/fwd v1.1.1/go.mod h1:gk3iGcWd9+svBvR0sR+KPcfE+RNWozjowpeBVG3ZVNU= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= +github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/prometheus/client_golang v1.17.0 h1:rl2sfwZMtSthVU752MqfjQozy7blglC+1SOtjMAMh+Q= +github.com/prometheus/client_golang v1.17.0/go.mod h1:VeL+gMmOAxkS2IqfCq0ZmHSL+LjWfWDUmp1mBz9JgUY= +github.com/prometheus/client_model v0.5.0 h1:VQw1hfvPvk3Uv6Qf29VrPF32JB6rtbgI6cYPYQjL0Qw= +github.com/prometheus/client_model v0.5.0/go.mod h1:dTiFglRmd66nLR9Pv9f0mZi7B7fk5Pm3gvsjB5tr+kI= +github.com/prometheus/common v0.44.0 h1:+5BrQJwiBB9xsMygAB3TNvpQKOwlkc25LbISbrdOOfY= +github.com/prometheus/common v0.44.0/go.mod h1:ofAIvZbQ1e/nugmZGz4/qCb9Ap1VoSTIO7x0VV9VvuY= +github.com/prometheus/procfs v0.11.1 h1:xRC8Iq1yyca5ypa9n1EZnWZkt7dwcoRPQwX/5gwaUuI= +github.com/prometheus/procfs v0.11.1/go.mod h1:eesXgaPo1q7lBpVMoMy0ZOFTth9hBn4W/y0/p/ScXhY= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.5.0 h1:1zr/of2m5FGMsad5YfcqgdqdWrIhu+EBEJRhR1U7z/c= +github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= +github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/tinylib/msgp v1.1.5/go.mod h1:eQsjooMTnV42mHu917E26IogZ2930nFyBQdofk10Udg= +github.com/ttacon/chalk v0.0.0-20160626202418-22c06c80ed31/go.mod h1:onvgF043R+lC5RZ8IT9rBXDaEDnpnw/Cl+HFiw+v/7Q= +github.com/twmb/murmur3 v1.1.8 h1:8Yt9taO/WN3l08xErzjeschgZU2QSrwm1kclYq+0aRg= +github.com/twmb/murmur3 v1.1.8/go.mod h1:Qq/R7NUyOfr65zD+6Q5IHKsJLwP7exErjN6lyyq3OSQ= +github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA= +go.opentelemetry.io/auto/sdk v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A= +go.opentelemetry.io/otel v1.33.0 h1:/FerN9bax5LoK51X/sI0SVYrjSE0/yUL7DpxW4K3FWw= +go.opentelemetry.io/otel v1.33.0/go.mod h1:SUUkR6csvUQl+yjReHu5uM3EtVV7MBm5FHKRlNx4I8I= +go.opentelemetry.io/otel/exporters/prometheus v0.44.0 h1:08qeJgaPC0YEBu2PQMbqU3rogTlyzpjhCI2b58Yn00w= +go.opentelemetry.io/otel/exporters/prometheus v0.44.0/go.mod h1:ERL2uIeBtg4TxZdojHUwzZfIFlUIjZtxubT5p4h1Gjg= +go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.33.0 h1:W5AWUn/IVe8RFb5pZx1Uh9Laf/4+Qmm4kJL5zPuvR+0= +go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.33.0/go.mod h1:mzKxJywMNBdEX8TSJais3NnsVZUaJ+bAy6UxPTng2vk= +go.opentelemetry.io/otel/metric v1.33.0 h1:r+JOocAyeRVXD8lZpjdQjzMadVZp2M4WmQ+5WtEnklQ= +go.opentelemetry.io/otel/metric v1.33.0/go.mod h1:L9+Fyctbp6HFTddIxClbQkjtubW6O9QS3Ann/M82u6M= +go.opentelemetry.io/otel/sdk v1.33.0 h1:iax7M131HuAm9QkZotNHEfstof92xM+N8sr3uHXc2IM= +go.opentelemetry.io/otel/sdk v1.33.0/go.mod h1:A1Q5oi7/9XaMlIWzPSxLRWOI8nG3FnzHJNbiENQuihM= +go.opentelemetry.io/otel/sdk/metric v1.21.0 h1:smhI5oD714d6jHE6Tie36fPx4WDFIg+Y6RfAY4ICcR0= +go.opentelemetry.io/otel/sdk/metric v1.21.0/go.mod h1:FJ8RAsoPGv/wYMgBdUJXOm+6pzFY3YdljnXtv1SBE8Q= +go.opentelemetry.io/otel/sdk/metric v1.33.0 h1:Gs5VK9/WUJhNXZgn8MR6ITatvAmKeIuCtNbsP3JkNqU= +go.opentelemetry.io/otel/sdk/metric v1.33.0/go.mod h1:dL5ykHZmm1B1nVRk9dDjChwDmt81MjVp3gLkQRwKf/Q= +go.opentelemetry.io/otel/trace v1.33.0 h1:cCJuF7LRjUFso9LPnEAHJDB2pqzp+hbO8eu1qqW2d/s= +go.opentelemetry.io/otel/trace v1.33.0/go.mod h1:uIcdVUZMpTAmz0tI1z04GoVSezK37CbGV4fr1f2nBck= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= +golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA= +golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20201022035929-9cf592e881e9/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= +google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= +google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI= +google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/examples/opentelemetry-trace/main.go b/examples/opentelemetry-trace/main.go new file mode 100644 index 00000000..ebd1ce7c --- /dev/null +++ b/examples/opentelemetry-trace/main.go @@ -0,0 +1,57 @@ +package main + +import ( + context2 "context" + "fmt" + "github.com/asynkron/protoactor-go/actor/middleware/otel/tracing" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/exporters/stdout/stdouttrace" + "time" + + console "github.com/asynkron/goconsole" + "github.com/asynkron/protoactor-go/actor" + sdktrace "go.opentelemetry.io/otel/sdk/trace" +) + +type ( + hello struct{ Who string } + helloActor struct{} +) + +func (state *helloActor) Receive(context actor.Context) { + switch msg := context.Message().(type) { + case *hello: + fmt.Printf("Hello %s\n", msg.Who) + } +} + +func main() { + traceConsoleExporter, err := stdouttrace.New() + if err != nil { + panic(err) + } + + traceProvider := sdktrace.NewTracerProvider( + sdktrace.WithBatcher(traceConsoleExporter), + ) + defer func() { + err := traceProvider.Shutdown(context2.Background()) + if err != nil { + fmt.Printf("failed to shutdown trace provider: %v\n", err) + } + }() + + system := actor.NewActorSystem() + system.Extensions.Register(tracing.NewTraceExtension(traceProvider)) + + props := actor.PropsFromProducer(func() actor.Actor { + return &helloActor{} + }) + + otel.SetTracerProvider(traceProvider) + + pid := system.Root.Spawn(props) + system.Root.Request(pid, &hello{Who: "with tracing"}) + time.Sleep(100 * time.Millisecond) + _, _ = console.ReadLine() +} From a53537e3fabf27b3785b07442bb87d4cc12d3f9e Mon Sep 17 00:00:00 2001 From: "ryota.suzuki" Date: Sat, 11 Jan 2025 17:13:33 +0900 Subject: [PATCH 7/7] fix extract Extension --- .../otel/tracing/context_decorator.go | 18 +++++++++--------- actor/middleware/otel/tracing/extension.go | 9 +++++++++ .../otel/tracing/sender_middleware.go | 8 +++++++- actor/middleware/otel/tracing/span.go | 8 ++++++-- .../otel/tracing/spawn_middleware.go | 9 ++++++++- examples/opentelemetry-trace/go.mod | 3 ++- examples/opentelemetry-trace/go.sum | 11 ----------- examples/opentelemetry-trace/main.go | 7 ++++--- 8 files changed, 45 insertions(+), 28 deletions(-) diff --git a/actor/middleware/otel/tracing/context_decorator.go b/actor/middleware/otel/tracing/context_decorator.go index 885baa8b..1c9afcf9 100644 --- a/actor/middleware/otel/tracing/context_decorator.go +++ b/actor/middleware/otel/tracing/context_decorator.go @@ -21,7 +21,7 @@ type ActorContext struct { var _ actor.Context = (*ActorContext)(nil) func (ac *ActorContext) Receive(envelope *actor.MessageEnvelope) { - traceExt, ok := ac.ActorSystem().Extensions.Get(extensionID).(*TraceExtension) + traceExt, ok := ExtensionFromActorSystem(ac.ActorSystem()) if !ok { ac.Logger().Debug("TraceExtension not registered") ac.Context.Receive(envelope) @@ -69,7 +69,7 @@ func (ac *ActorContext) Receive(envelope *actor.MessageEnvelope) { } func (ac *ActorContext) Send(pid *actor.PID, message interface{}) { - traceExt, ok := ac.ActorSystem().Extensions.Get(extensionID).(*TraceExtension) + traceExt, ok := ExtensionFromActorSystem(ac.ActorSystem()) if !ok { ac.Logger().Debug("TraceExtension not registered") ac.Context.Send(pid, message) @@ -84,7 +84,7 @@ func (ac *ActorContext) Send(pid *actor.PID, message interface{}) { } func (ac *ActorContext) Request(pid *actor.PID, message interface{}) { - traceExt, ok := ac.ActorSystem().Extensions.Get(extensionID).(*TraceExtension) + traceExt, ok := ExtensionFromActorSystem(ac.ActorSystem()) if !ok { ac.Logger().Debug("TraceExtension not registered") ac.Context.Request(pid, message) @@ -99,7 +99,7 @@ func (ac *ActorContext) Request(pid *actor.PID, message interface{}) { } func (ac *ActorContext) RequestWithCustomSender(pid *actor.PID, message interface{}, sender *actor.PID) { - traceExt, ok := ac.ActorSystem().Extensions.Get(extensionID).(*TraceExtension) + traceExt, ok := ExtensionFromActorSystem(ac.ActorSystem()) if !ok { ac.Logger().Debug("TraceExtension not registered") ac.Context.RequestWithCustomSender(pid, message, sender) @@ -126,7 +126,7 @@ func messageToEnvelop(message interface{}, t *ActorContext, sender *actor.PID) * } func (ac *ActorContext) RequestFuture(pid *actor.PID, message interface{}, timeout time.Duration) *actor.Future { - traceExt, ok := ac.ActorSystem().Extensions.Get(extensionID).(*TraceExtension) + traceExt, ok := ExtensionFromActorSystem(ac.ActorSystem()) if !ok { ac.Logger().Debug("TraceExtension not registered") return ac.Context.RequestFuture(pid, message, timeout) @@ -142,7 +142,7 @@ func (ac *ActorContext) RequestFuture(pid *actor.PID, message interface{}, timeo } func (ac *ActorContext) Respond(response interface{}) { - traceExt, ok := ac.ActorSystem().Extensions.Get(extensionID).(*TraceExtension) + traceExt, ok := ExtensionFromActorSystem(ac.ActorSystem()) if !ok { ac.Logger().Debug("TraceExtension not registered") ac.Context.Respond(response) @@ -165,7 +165,7 @@ func ContextDecorator() func(next actor.ContextDecoratorFunc) actor.ContextDecor } func (ac *ActorContext) Spawn(props *actor.Props) *actor.PID { - traceExt, ok := ac.ActorSystem().Extensions.Get(extensionID).(*TraceExtension) + traceExt, ok := ExtensionFromActorSystem(ac.ActorSystem()) if !ok { ac.Logger().Debug("TraceExtension not registered") return ac.Context.Spawn(props) @@ -180,7 +180,7 @@ func (ac *ActorContext) Spawn(props *actor.Props) *actor.PID { } func (ac *ActorContext) SpawnPrefix(props *actor.Props, prefix string) *actor.PID { - traceExt, ok := ac.ActorSystem().Extensions.Get(extensionID).(*TraceExtension) + traceExt, ok := ExtensionFromActorSystem(ac.ActorSystem()) if !ok { ac.Logger().Debug("TraceExtension not registered") return ac.Context.SpawnPrefix(props, prefix) @@ -197,7 +197,7 @@ func (ac *ActorContext) SpawnPrefix(props *actor.Props, prefix string) *actor.PI } func (ac *ActorContext) SpawnNamed(props *actor.Props, id string) (*actor.PID, error) { - traceExt, ok := ac.ActorSystem().Extensions.Get(extensionID).(*TraceExtension) + traceExt, ok := ExtensionFromActorSystem(ac.ActorSystem()) if !ok { ac.Logger().Debug("TraceExtension not registered") return ac.Context.SpawnNamed(props, id) diff --git a/actor/middleware/otel/tracing/extension.go b/actor/middleware/otel/tracing/extension.go index 0ab3d38d..5b5a82f1 100644 --- a/actor/middleware/otel/tracing/extension.go +++ b/actor/middleware/otel/tracing/extension.go @@ -1,6 +1,7 @@ package tracing import ( + "github.com/asynkron/protoactor-go/actor" "github.com/asynkron/protoactor-go/extensions" "go.opentelemetry.io/otel/trace" ) @@ -15,6 +16,14 @@ func (ext *TraceExtension) Tracer() trace.Tracer { return ext.TracerProvider.Tracer("protoactor") } +func ExtensionFromActorSystem(system *actor.ActorSystem) (*TraceExtension, bool) { + t, ok := system.Extensions.Get(extensionID).(*TraceExtension) + if !ok { + return nil, false + } + return t, true +} + func NewTraceExtension( provider trace.TracerProvider, ) *TraceExtension { diff --git a/actor/middleware/otel/tracing/sender_middleware.go b/actor/middleware/otel/tracing/sender_middleware.go index 2f8bd1a3..a75975a9 100644 --- a/actor/middleware/otel/tracing/sender_middleware.go +++ b/actor/middleware/otel/tracing/sender_middleware.go @@ -36,6 +36,13 @@ func RootContextSenderMiddleware() actor.SenderMiddleware { return } + traceExt, ok := ExtensionFromActorSystem(c.ActorSystem()) + if !ok { + c.Logger().Debug("TraceExtension not registered") + next(c, target, envelope) + return + } + ctxWithParentSpan := context2.Background() spanContext, err := extractSpanContextFromSenderFuncArgs(c, envelope) if errors.Is(err, ErrSpanContextNotFound) { @@ -46,7 +53,6 @@ func RootContextSenderMiddleware() actor.SenderMiddleware { ctxWithParentSpan = trace.ContextWithSpanContext(ctxWithParentSpan, spanContext) } - traceExt := c.ActorSystem().Extensions.Get(extensionID).(*TraceExtension) ctxWithCurrentSpan, span := traceExt.Tracer().Start(ctxWithParentSpan, fmt.Sprintf("message_send/%T", envelope.Message)) defer span.End() span.SetAttributes(attribute.String("SenderActorPID", c.Self().String())) diff --git a/actor/middleware/otel/tracing/span.go b/actor/middleware/otel/tracing/span.go index 13b706e8..2bed7c2e 100644 --- a/actor/middleware/otel/tracing/span.go +++ b/actor/middleware/otel/tracing/span.go @@ -69,6 +69,10 @@ func setSpanContextToEnvelope(spanCtx trace.SpanContext, envelope *actor.Message } // TraceableRootContext creates a RootContext with tracing capabilities -func TraceableRootContext(rootContext actor.RootContext, spanContext trace.SpanContext) *actor.RootContext { - return rootContext.Copy().WithSenderMiddleware(RootContextSenderMiddleware()).WithSpawnMiddleware(RootContextSpawnMiddleware()).WithHeaders(MapFromSpanContext(spanContext)) +func TraceableRootContext(rootContext *actor.RootContext) *actor.RootContext { + return rootContext.Copy().WithSenderMiddleware(RootContextSenderMiddleware()).WithSpawnMiddleware(RootContextSpawnMiddleware()) +} + +func WithSpanRootContext(rootContext *actor.RootContext, span trace.Span) *actor.RootContext { + return rootContext.Copy().WithHeaders(MapFromSpanContext(span.SpanContext())) } diff --git a/actor/middleware/otel/tracing/spawn_middleware.go b/actor/middleware/otel/tracing/spawn_middleware.go index c1debef3..144465bf 100644 --- a/actor/middleware/otel/tracing/spawn_middleware.go +++ b/actor/middleware/otel/tracing/spawn_middleware.go @@ -14,6 +14,13 @@ import ( func rootContextSpawnMiddleware() actor.SpawnMiddleware { return func(next actor.SpawnFunc) actor.SpawnFunc { return func(actorSystem *actor.ActorSystem, id string, props *actor.Props, parentContext actor.SpawnerContext) (pid *actor.PID, e error) { + traceExt, ok := ExtensionFromActorSystem(actorSystem) + if !ok { + actorSystem.Logger().Debug("TraceExtension not registered") + pid, err := next(actorSystem, id, props, parentContext) + return pid, err + } + rootContext, ok := parentContext.(*actor.RootContext) if !ok { parentContext.Logger().Debug("Context is not rootContext", slog.Any("self", parentContext.Self())) @@ -34,7 +41,7 @@ func rootContextSpawnMiddleware() actor.SpawnMiddleware { ctxWithParentSpan = trace.ContextWithSpanContext(ctxWithParentSpan, spanCtx) } } - traceExt := actorSystem.Extensions.Get(extensionID).(*TraceExtension) + _, span := traceExt.Tracer().Start(ctxWithParentSpan, fmt.Sprintf("spawn/%s", id)) defer span.End() span.SetAttributes(attribute.String("SpawnActorPID", pid.String())) diff --git a/examples/opentelemetry-trace/go.mod b/examples/opentelemetry-trace/go.mod index 5662cef7..a3cca71b 100644 --- a/examples/opentelemetry-trace/go.mod +++ b/examples/opentelemetry-trace/go.mod @@ -11,6 +11,7 @@ require ( github.com/asynkron/protoactor-go v0.0.0-20240822202345-3c0e61ca19c9 go.opentelemetry.io/otel v1.33.0 go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.33.0 + go.opentelemetry.io/otel/sdk v1.33.0 ) @@ -33,8 +34,8 @@ require ( github.com/prometheus/procfs v0.11.1 // indirect github.com/twmb/murmur3 v1.1.8 // indirect go.opentelemetry.io/auto/sdk v1.1.0 // indirect + go.opentelemetry.io/otel/exporters/prometheus v0.44.0 // indirect go.opentelemetry.io/otel/metric v1.33.0 // indirect - go.opentelemetry.io/otel/sdk v1.33.0 // indirect go.opentelemetry.io/otel/sdk/metric v1.33.0 // indirect go.opentelemetry.io/otel/trace v1.33.0 // indirect golang.org/x/sys v0.28.0 // indirect diff --git a/examples/opentelemetry-trace/go.sum b/examples/opentelemetry-trace/go.sum index d7b41af7..a7a7e82a 100644 --- a/examples/opentelemetry-trace/go.sum +++ b/examples/opentelemetry-trace/go.sum @@ -1,5 +1,3 @@ -github.com/Workiva/go-datastructures v1.1.3 h1:LRdRrug9tEuKk7TGfz/sct5gjVj44G9pfqDt4qm7ghw= -github.com/Workiva/go-datastructures v1.1.3/go.mod h1:1yZL+zfsztete+ePzZz/Zb1/t5BnDuE2Ya2MMGhzP6A= github.com/Workiva/go-datastructures v1.1.5 h1:5YfhQ4ry7bZc2Mc7R0YZyYwpf5c6t1cEFvdAhd6Mkf4= github.com/Workiva/go-datastructures v1.1.5/go.mod h1:1yZL+zfsztete+ePzZz/Zb1/t5BnDuE2Ya2MMGhzP6A= github.com/asynkron/goconsole v0.0.0-20160504192649-bfa12eebf716 h1:SgyG4sXkrlalMoCfp20LiNPNhfJS7ez3opNdtihIxPc= @@ -19,12 +17,8 @@ github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ4 github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= -github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= -github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg= -github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= -github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= @@ -73,8 +67,6 @@ go.opentelemetry.io/otel/metric v1.33.0 h1:r+JOocAyeRVXD8lZpjdQjzMadVZp2M4WmQ+5W go.opentelemetry.io/otel/metric v1.33.0/go.mod h1:L9+Fyctbp6HFTddIxClbQkjtubW6O9QS3Ann/M82u6M= go.opentelemetry.io/otel/sdk v1.33.0 h1:iax7M131HuAm9QkZotNHEfstof92xM+N8sr3uHXc2IM= go.opentelemetry.io/otel/sdk v1.33.0/go.mod h1:A1Q5oi7/9XaMlIWzPSxLRWOI8nG3FnzHJNbiENQuihM= -go.opentelemetry.io/otel/sdk/metric v1.21.0 h1:smhI5oD714d6jHE6Tie36fPx4WDFIg+Y6RfAY4ICcR0= -go.opentelemetry.io/otel/sdk/metric v1.21.0/go.mod h1:FJ8RAsoPGv/wYMgBdUJXOm+6pzFY3YdljnXtv1SBE8Q= go.opentelemetry.io/otel/sdk/metric v1.33.0 h1:Gs5VK9/WUJhNXZgn8MR6ITatvAmKeIuCtNbsP3JkNqU= go.opentelemetry.io/otel/sdk/metric v1.33.0/go.mod h1:dL5ykHZmm1B1nVRk9dDjChwDmt81MjVp3gLkQRwKf/Q= go.opentelemetry.io/otel/trace v1.33.0 h1:cCJuF7LRjUFso9LPnEAHJDB2pqzp+hbO8eu1qqW2d/s= @@ -101,10 +93,7 @@ golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtn golang.org/x/tools v0.0.0-20201022035929-9cf592e881e9/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= -google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI= google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/examples/opentelemetry-trace/main.go b/examples/opentelemetry-trace/main.go index ebd1ce7c..e76a03f3 100644 --- a/examples/opentelemetry-trace/main.go +++ b/examples/opentelemetry-trace/main.go @@ -34,6 +34,7 @@ func main() { traceProvider := sdktrace.NewTracerProvider( sdktrace.WithBatcher(traceConsoleExporter), ) + defer func() { err := traceProvider.Shutdown(context2.Background()) if err != nil { @@ -47,11 +48,11 @@ func main() { props := actor.PropsFromProducer(func() actor.Actor { return &helloActor{} }) - + root := tracing.TraceableRootContext(system.Root) otel.SetTracerProvider(traceProvider) - pid := system.Root.Spawn(props) - system.Root.Request(pid, &hello{Who: "with tracing"}) + pid := root.Spawn(props) + root.Request(pid, &hello{Who: "with tracing"}) time.Sleep(100 * time.Millisecond) _, _ = console.ReadLine() }