Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Impl otel trace middleware #1137

Draft
wants to merge 7 commits into
base: dev
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
218 changes: 218 additions & 0 deletions actor/middleware/otel/tracing/context_decorator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,218 @@
package tracing

import (
context2 "context"
"errors"
"fmt"
"github.com/asynkron/protoactor-go/actor"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
"log/slog"
"time"
)

// ActorContext is a decorator for actor.Context that adds OpenTelemetry tracing capabilities.
type ActorContext struct {
actor.Context
withTracingSpanContext context2.Context
}

var _ actor.Context = (*ActorContext)(nil)

func (ac *ActorContext) Receive(envelope *actor.MessageEnvelope) {
traceExt, ok := ExtensionFromActorSystem(ac.ActorSystem())
if !ok {
ac.Logger().Debug("TraceExtension not registered")
ac.Context.Receive(envelope)
return
}
ac.withTracingSpanContext = context2.Background()
defer func() {
ac.withTracingSpanContext = nil
}()
if envelope.Header != nil {
spanContext, err := spanContextFromMessageHeader(envelope.Header)
if errors.Is(err, ErrSpanContextNotFound) {
ac.Logger().Debug("No spanContext found", slog.Any("self", ac.Self()), slog.Any("error", err))
} else if err != nil {
ac.Logger().Debug("Error extracting spanContext", slog.Any("self", ac.Self()), slog.Any("error", err))
} else {
ac.withTracingSpanContext = trace.ContextWithSpanContext(ac.withTracingSpanContext, spanContext)
}
}
startSpan := func(suffix string) trace.Span {
ctx, span := traceExt.Tracer().Start(ac.withTracingSpanContext, fmt.Sprintf("message_receive/%T/%s", ac.Actor(), suffix))
span.SetAttributes(attribute.String("ActorPID", ac.Self().String()))
span.SetAttributes(attribute.String("ActorType", fmt.Sprintf("%T", ac.Actor())))
span.SetAttributes(attribute.String("MessageType", fmt.Sprintf("%T", envelope.Message)))
ac.withTracingSpanContext = ctx
return span
}

switch envelope.Message.(type) {
case *actor.Started:
span := startSpan("started")
defer span.End()
case *actor.Stopping:
span := startSpan("stopping")
defer span.End()
case *actor.Stopped:
span := startSpan("stopped")
defer span.End()
default:
span := startSpan(fmt.Sprintf("%T", envelope.Message))
defer span.End()
}

ac.Context.Receive(envelope)
}

func (ac *ActorContext) Send(pid *actor.PID, message interface{}) {
traceExt, ok := ExtensionFromActorSystem(ac.ActorSystem())
if !ok {
ac.Logger().Debug("TraceExtension not registered")
ac.Context.Send(pid, message)
return
}
_, span := traceExt.Tracer().Start(ac.withTracingSpanContext, fmt.Sprintf("message_send/%T", message))
defer span.End()
setSenderSpanAttributes(pid, message, span, ac)
envelop := messageToEnvelop(message, ac, ac.Self())

ac.Context.Send(pid, envelop)
}

func (ac *ActorContext) Request(pid *actor.PID, message interface{}) {
traceExt, ok := ExtensionFromActorSystem(ac.ActorSystem())
if !ok {
ac.Logger().Debug("TraceExtension not registered")
ac.Context.Request(pid, message)
return
}
_, span := traceExt.Tracer().Start(ac.withTracingSpanContext, fmt.Sprintf("message_request/%T", message))
defer span.End()
setSenderSpanAttributes(pid, message, span, ac)
envelop := messageToEnvelop(message, ac, ac.Self())

ac.Context.Send(pid, envelop)
}

func (ac *ActorContext) RequestWithCustomSender(pid *actor.PID, message interface{}, sender *actor.PID) {
traceExt, ok := ExtensionFromActorSystem(ac.ActorSystem())
if !ok {
ac.Logger().Debug("TraceExtension not registered")
ac.Context.RequestWithCustomSender(pid, message, sender)
return
}
_, span := traceExt.Tracer().Start(ac.withTracingSpanContext, fmt.Sprintf("message_request_with_custom_sender/%T", message))
defer span.End()
setSenderSpanAttributes(pid, message, span, ac)
span.SetAttributes(attribute.String("CustomSenderActorPID", sender.String()))

envelop := messageToEnvelop(message, ac, sender)

ac.Context.Send(pid, envelop)
}

func messageToEnvelop(message interface{}, t *ActorContext, sender *actor.PID) *actor.MessageEnvelope {
envelop, ok := message.(*actor.MessageEnvelope)
if ok {
setSpanContextToEnvelope(trace.SpanContextFromContext(t.withTracingSpanContext), envelop)
} else {
envelop = wrapEnvelopeWithSpanContext(trace.SpanContextFromContext(t.withTracingSpanContext), message, sender)
}
return envelop
}

func (ac *ActorContext) RequestFuture(pid *actor.PID, message interface{}, timeout time.Duration) *actor.Future {
traceExt, ok := ExtensionFromActorSystem(ac.ActorSystem())
if !ok {
ac.Logger().Debug("TraceExtension not registered")
return ac.Context.RequestFuture(pid, message, timeout)
}
future := actor.NewFuture(ac.ActorSystem(), timeout)
_, span := traceExt.Tracer().Start(ac.withTracingSpanContext, fmt.Sprintf("message_request_future/%T", message))
defer span.End()
setSenderSpanAttributes(pid, message, span, ac)
envelop := messageToEnvelop(message, ac, future.PID())

ac.Context.RequestFuture(pid, envelop, timeout)
return future

}
func (ac *ActorContext) Respond(response interface{}) {
traceExt, ok := ExtensionFromActorSystem(ac.ActorSystem())
if !ok {
ac.Logger().Debug("TraceExtension not registered")
ac.Context.Respond(response)
return
}
_, span := traceExt.Tracer().Start(ac.withTracingSpanContext, fmt.Sprintf("message_respond/%T", response))
defer span.End()
setSenderSpanAttributes(ac.Sender(), response, span, ac)
envelop := messageToEnvelop(response, ac, ac.Self())

ac.Context.Respond(envelop)
}

func ContextDecorator() func(next actor.ContextDecoratorFunc) actor.ContextDecoratorFunc {
return func(next actor.ContextDecoratorFunc) actor.ContextDecoratorFunc {
return func(ctx actor.Context) actor.Context {
return next(&ActorContext{ctx, nil})
}
}
}

func (ac *ActorContext) Spawn(props *actor.Props) *actor.PID {
traceExt, ok := ExtensionFromActorSystem(ac.ActorSystem())
if !ok {
ac.Logger().Debug("TraceExtension not registered")
return ac.Context.Spawn(props)
}
_, span := traceExt.Tracer().Start(ac.withTracingSpanContext, "spawn")
defer span.End()
props.Configure(actor.WithContextDecorator(ContextDecorator()))
pid := ac.Context.Spawn(props)
span.SetName(fmt.Sprintf("spawn/%s", pid.Id))
span.SetAttributes(attribute.String("SpawnActorPID", pid.String()))
return pid
}

func (ac *ActorContext) SpawnPrefix(props *actor.Props, prefix string) *actor.PID {
traceExt, ok := ExtensionFromActorSystem(ac.ActorSystem())
if !ok {
ac.Logger().Debug("TraceExtension not registered")
return ac.Context.SpawnPrefix(props, prefix)
}
_, span := traceExt.Tracer().Start(ac.withTracingSpanContext, "spawn")
defer span.End()
props.Configure(actor.WithContextDecorator(ContextDecorator()))
pid := ac.Context.SpawnPrefix(props, prefix)
span.SetAttributes(attribute.String("Prefix", prefix))

span.SetName(fmt.Sprintf("spawn/%s", pid.Id))
span.SetAttributes(attribute.String("SpawnActorPID", pid.String()))
return pid
}

func (ac *ActorContext) SpawnNamed(props *actor.Props, id string) (*actor.PID, error) {
traceExt, ok := ExtensionFromActorSystem(ac.ActorSystem())
if !ok {
ac.Logger().Debug("TraceExtension not registered")
return ac.Context.SpawnNamed(props, id)
}
_, span := traceExt.Tracer().Start(ac.withTracingSpanContext, "spawn")
defer span.End()
props.Configure(actor.WithContextDecorator(ContextDecorator()))
span.SetAttributes(attribute.String("ID", id))
pid, err := ac.Context.SpawnNamed(props, id)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
} else {
span.SetName(fmt.Sprintf("spawn/%s", pid.Id))
span.SetAttributes(attribute.String("SpawnActorPID", pid.String()))
}
return pid, err
}
43 changes: 43 additions & 0 deletions actor/middleware/otel/tracing/extension.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package tracing

import (
"github.com/asynkron/protoactor-go/actor"
"github.com/asynkron/protoactor-go/extensions"
"go.opentelemetry.io/otel/trace"
)

var extensionID = extensions.NextExtensionID()

type TraceExtension struct {
trace.TracerProvider
}

func (ext *TraceExtension) Tracer() trace.Tracer {
return ext.TracerProvider.Tracer("protoactor")
}

func ExtensionFromActorSystem(system *actor.ActorSystem) (*TraceExtension, bool) {
t, ok := system.Extensions.Get(extensionID).(*TraceExtension)
if !ok {
return nil, false
}
return t, true
}

func NewTraceExtension(
provider trace.TracerProvider,
) *TraceExtension {
return &TraceExtension{
provider,
}
}

func (ext *TraceExtension) Enabled() bool {
return true
}

func (ext *TraceExtension) ExtensionID() extensions.ExtensionID {
return extensionID
}

var _ extensions.Extension = &TraceExtension{}
14 changes: 14 additions & 0 deletions actor/middleware/otel/tracing/middleware_propagation.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package tracing

import (
"github.com/asynkron/protoactor-go/actor"
"github.com/asynkron/protoactor-go/actor/middleware/propagator"
)

func RootContextSpawnMiddleware() actor.SpawnMiddleware {
return propagator.New().
WithItselfForwarded().
WithSpawnMiddleware(rootContextSpawnMiddleware()).
WithContextDecorator(ContextDecorator()).
SpawnMiddleware
}
69 changes: 69 additions & 0 deletions actor/middleware/otel/tracing/sender_middleware.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package tracing

import (
context2 "context"
"errors"
"fmt"
"github.com/asynkron/protoactor-go/actor"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"log/slog"
)

func MapFromSpanContext(spanCtx trace.SpanContext) map[string]string {
return map[string]string{
"parent-id": spanCtx.SpanID().String(),
"trace-id": spanCtx.TraceID().String(),
"tracestate": spanCtx.TraceState().String(),
"trace-flags": fmt.Sprintf("%02x", byte(spanCtx.TraceFlags())),
}
}

func extractSpanContextFromSenderFuncArgs(c actor.SenderContext, envelope *actor.MessageEnvelope) (trace.SpanContext, error) {
fromCtxMessageHeader, err := spanContextFromMessageHeader(c.MessageHeader())
if !errors.Is(err, ErrSpanContextNotFound) {
return fromCtxMessageHeader, nil
}
return spanContextFromMessageHeader(envelope.Header)
}

func RootContextSenderMiddleware() actor.SenderMiddleware {
return func(next actor.SenderFunc) actor.SenderFunc {
return func(c actor.SenderContext, target *actor.PID, envelope *actor.MessageEnvelope) {
if _, ok := c.(*actor.RootContext); !ok {
c.Logger().Debug("Context is not a receiver context", slog.Any("self", c.Self()))
next(c, target, envelope)
return
}

traceExt, ok := ExtensionFromActorSystem(c.ActorSystem())
if !ok {
c.Logger().Debug("TraceExtension not registered")
next(c, target, envelope)
return
}

ctxWithParentSpan := context2.Background()
spanContext, err := extractSpanContextFromSenderFuncArgs(c, envelope)
if errors.Is(err, ErrSpanContextNotFound) {
c.Logger().Debug("No spanContext found", slog.Any("self", c.Self()), slog.Any("error", err))
} else if err != nil {
c.Logger().Error("Error extracting spanContext", slog.Any("self", c.Self()), slog.Any("error", err))
} else {
ctxWithParentSpan = trace.ContextWithSpanContext(ctxWithParentSpan, spanContext)
}

ctxWithCurrentSpan, span := traceExt.Tracer().Start(ctxWithParentSpan, fmt.Sprintf("message_send/%T", envelope.Message))
defer span.End()
span.SetAttributes(attribute.String("SenderActorPID", c.Self().String()))
span.SetAttributes(attribute.String("SenderActorType", fmt.Sprintf("%T", c.Actor())))
span.SetAttributes(attribute.String("TargetActorPID", target.String()))
span.SetAttributes(attribute.String("MessageType", fmt.Sprintf("%T", envelope.Message)))
setSpanContextToEnvelope(trace.SpanContextFromContext(
ctxWithCurrentSpan,
), envelope)

next(c, target, envelope)
}
}
}
Loading