Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add UUID header OTel collector #911

Merged
merged 8 commits into from
Nov 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,10 @@ build-mock-management-plane-grpc:
mkdir -p $(BUILD_DIR)/mock-management-plane-grpc
@CGO_ENABLED=0 GOARCH=$(OSARCH) GOOS=linux $(GOBUILD) -o $(BUILD_DIR)/mock-management-plane-grpc/server test/mock/grpc/cmd/main.go

build-mock-management-plane-collector:
mkdir -p $(BUILD_DIR)/mock-management-plane-collector
@CGO_ENABLED=0 GOARCH=$(OSARCH) GOOS=linux $(GOBUILD) -o $(BUILD_DIR)/mock-management-plane-collector/collector test/mock/collector/mock-collector/main.go

integration-test: $(SELECTED_PACKAGE) build-mock-management-plane-grpc
TEST_ENV="Container" CONTAINER_OS_TYPE=$(CONTAINER_OS_TYPE) BUILD_TARGET="install-agent-local" CONTAINER_NGINX_IMAGE_REGISTRY=${CONTAINER_NGINX_IMAGE_REGISTRY} \
PACKAGES_REPO=$(OSS_PACKAGES_REPO) PACKAGE_NAME=$(PACKAGE_NAME) BASE_IMAGE=$(BASE_IMAGE) DOCKERFILE_PATH=$(DOCKERFILE_PATH) IMAGE_PATH=$(IMAGE_PATH) TAG=${IMAGE_TAG} \
Expand Down Expand Up @@ -209,7 +213,7 @@ build-test-oss-image:
--build-arg ENTRY_POINT=./test/docker/entrypoint.sh

.PHONY: build-mock-collector-image
build-mock-collector-image:
build-mock-collector-image: build-mock-management-plane-collector
$(CONTAINER_BUILDENV) $(CONTAINER_CLITOOL) build -t mock-collector . \
--no-cache -f ./test/mock/collector/mock-collector/Dockerfile

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ require (
go.opentelemetry.io/collector/exporter/otlpexporter v0.108.1
go.opentelemetry.io/collector/exporter/otlphttpexporter v0.108.1
go.opentelemetry.io/collector/extension v0.108.1
go.opentelemetry.io/collector/extension/auth v0.108.1
go.opentelemetry.io/collector/filter v0.108.1
go.opentelemetry.io/collector/otelcol v0.108.1
go.opentelemetry.io/collector/pdata v1.14.1
Expand Down Expand Up @@ -268,7 +269,6 @@ require (
go.opentelemetry.io/collector/config/configretry v1.14.1 // indirect
go.opentelemetry.io/collector/config/internal v0.108.1 // indirect
go.opentelemetry.io/collector/consumer/consumerprofiles v0.108.1 // indirect
go.opentelemetry.io/collector/extension/auth v0.108.1 // indirect
go.opentelemetry.io/collector/extension/zpagesextension v0.108.1 // indirect
go.opentelemetry.io/collector/featuregate v1.14.1 // indirect
go.opentelemetry.io/collector/internal/globalgates v0.108.1 // indirect
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,10 @@ func (nls *NginxLogScraper) Scrape(_ context.Context) (pmetric.Metrics, error) {

func (nls *NginxLogScraper) Shutdown(_ context.Context) error {
nls.logger.Info("Shutting down NGINX access log scraper")
nls.cancel()

if nls.cancel != nil {
nls.cancel()
}
nls.wg.Wait()

return nls.pipe.Stop()
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

79 changes: 64 additions & 15 deletions internal/collector/otel_collector_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,9 @@ func (oc *Collector) Subscriptions() []string {
}

func (oc *Collector) handleNginxConfigUpdate(ctx context.Context, msg *bus.Message) {
oc.mu.Lock()
defer oc.mu.Unlock()

nginxConfigContext, ok := msg.Data.(*model.NginxConfigContext)
if !ok {
slog.ErrorContext(ctx, "Unable to cast message payload to *model.NginxConfigContext", "payload", msg.Data)
Expand All @@ -241,13 +244,33 @@ func (oc *Collector) handleNginxConfigUpdate(ctx context.Context, msg *bus.Messa
}

func (oc *Collector) handleResourceUpdate(ctx context.Context, msg *bus.Message) {
var reloadCollector bool
oc.mu.Lock()
defer oc.mu.Unlock()

resourceUpdateContext, ok := msg.Data.(*v1.Resource)
if !ok {
slog.ErrorContext(ctx, "Unable to cast message payload to *v1.Resource", "payload", msg.Data)
return
}

resourceProcessorUpdated := oc.updateResourceProcessor(resourceUpdateContext)
headersSetterExtensionUpdated := oc.updateHeadersSetterExtension(ctx, resourceUpdateContext)

if resourceProcessorUpdated || headersSetterExtensionUpdated {
slog.InfoContext(ctx, "Reloading OTel collector config")
err := writeCollectorConfig(oc.config.Collector)
if err != nil {
slog.ErrorContext(ctx, "Failed to write OTel Collector config", "error", err)
return
}

oc.restartCollector(ctx)
}
}

func (oc *Collector) updateResourceProcessor(resourceUpdateContext *v1.Resource) bool {
resourceProcessorUpdated := false

if oc.config.Collector.Processors.Resource == nil {
oc.config.Collector.Processors.Resource = &config.Resource{
Attributes: make([]config.ResourceAttribute, 0),
Expand All @@ -256,7 +279,7 @@ func (oc *Collector) handleResourceUpdate(ctx context.Context, msg *bus.Message)

if oc.config.Collector.Processors.Resource != nil &&
resourceUpdateContext.GetResourceId() != "" {
reloadCollector = oc.updateResourceAttributes(
resourceProcessorUpdated = oc.updateResourceAttributes(
[]config.ResourceAttribute{
{
Key: "resource.id",
Expand All @@ -267,21 +290,47 @@ func (oc *Collector) handleResourceUpdate(ctx context.Context, msg *bus.Message)
)
}

if reloadCollector {
slog.InfoContext(ctx, "Reloading OTel collector config")
err := writeCollectorConfig(oc.config.Collector)
if err != nil {
slog.ErrorContext(ctx, "Failed to write OTel Collector config", "error", err)
return
return resourceProcessorUpdated
}

func (oc *Collector) updateHeadersSetterExtension(
ctx context.Context,
resourceUpdateContext *v1.Resource,
) bool {
headersSetterExtensionUpdated := false

if oc.config.Collector.Extensions.HeadersSetter != nil &&
oc.config.Collector.Extensions.HeadersSetter.Headers != nil {
isUUIDHeaderSet := false
for _, header := range oc.config.Collector.Extensions.HeadersSetter.Headers {
if header.Key == "uuid" {
isUUIDHeaderSet = true
break
}
}

oc.restartCollector(ctx)
if !isUUIDHeaderSet {
slog.DebugContext(
ctx, "Adding uuid header to OTel collector",
"uuid", resourceUpdateContext.GetResourceId(),
)
oc.config.Collector.Extensions.HeadersSetter.Headers = append(
oc.config.Collector.Extensions.HeadersSetter.Headers,
config.Header{
Action: "insert",
Key: "uuid",
Value: resourceUpdateContext.GetResourceId(),
},
)

headersSetterExtensionUpdated = true
}
}

return headersSetterExtensionUpdated
}

func (oc *Collector) restartCollector(ctx context.Context) {
oc.mu.Lock()
sean-breen marked this conversation as resolved.
Show resolved Hide resolved
defer oc.mu.Unlock()
err := oc.Close(ctx)
if err != nil {
slog.ErrorContext(ctx, "Failed to shutdown OTel Collector", "error", err)
Expand Down Expand Up @@ -404,8 +453,8 @@ func (oc *Collector) updateExistingNginxOSSReceiver(
// nolint: revive
func (oc *Collector) updateResourceAttributes(
attributesToAdd []config.ResourceAttribute,
) (reloadCollector bool) {
reloadCollector = false
) (actionUpdated bool) {
actionUpdated = false

if oc.config.Collector.Processors.Resource.Attributes != nil {
OUTER:
Expand All @@ -419,11 +468,11 @@ func (oc *Collector) updateResourceAttributes(
oc.config.Collector.Processors.Resource.Attributes,
toAdd,
)
reloadCollector = true
actionUpdated = true
}
}

return reloadCollector
return actionUpdated
}

func isOSSReceiverChanged(nginxReceiver config.NginxReceiver, nginxConfigContext *model.NginxConfigContext) bool {
Expand Down
14 changes: 14 additions & 0 deletions internal/collector/otel_collector_plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,7 @@ func TestCollector_ProcessResourceUpdateTopic(t *testing.T) {
message *bus.Message
processors config.Processors
name string
headers []config.Header
}{
{
name: "Test 1: Resource update adds resource id attribute",
Expand All @@ -255,6 +256,18 @@ func TestCollector_ProcessResourceUpdateTopic(t *testing.T) {
},
},
},
headers: []config.Header{
{
Action: "insert",
Key: "authorization",
Value: "fake-authorization",
},
{
Action: "insert",
Key: "uuid",
Value: "1234",
},
},
},
}

Expand Down Expand Up @@ -288,6 +301,7 @@ func TestCollector_ProcessResourceUpdateTopic(t *testing.T) {
)

assert.Equal(tt, test.processors, collector.config.Collector.Processors)
assert.Equal(tt, test.headers, collector.config.Collector.Extensions.HeadersSetter.Headers)
})
}
}
Expand Down
17 changes: 13 additions & 4 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -576,8 +576,11 @@ func processOtlpReceivers(tlsConfig *OtlpTLSConfig) error {
}

func resolveExtensions() Extensions {
var health *Health
var headersSetter *HeadersSetter

if isHealthExtensionSet() {
health := &Health{
health = &Health{
Server: &ServerConfig{
Host: viperInstance.GetString(CollectorExtensionsHealthServerHostKey),
Port: viperInstance.GetInt(CollectorExtensionsHealthServerPortKey),
Expand All @@ -594,13 +597,19 @@ func resolveExtensions() Extensions {
ServerName: viperInstance.GetString(CollectorExtensionsHealthTLSServerNameKey),
}
}
}

return Extensions{
Health: health,
if viperInstance.IsSet(CollectorExtensionsHeadersSetterKey) {
err := resolveMapStructure(CollectorExtensionsHeadersSetterKey, &headersSetter)
if err != nil {
headersSetter = nil
}
}

return Extensions{}
return Extensions{
Health: health,
HeadersSetter: headersSetter,
}
}

func isHealthExtensionSet() bool {
Expand Down
1 change: 1 addition & 0 deletions internal/config/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ var (
CollectorExtensionsHealthTLSKeyKey = pre(CollectorExtensionsHealthTLSKey) + "key"
CollectorExtensionsHealthTLSServerNameKey = pre(CollectorExtensionsHealthTLSKey) + "server_name"
CollectorExtensionsHealthTLSSkipVerifyKey = pre(CollectorExtensionsHealthTLSKey) + "skip_verify"
CollectorExtensionsHeadersSetterKey = pre(CollectorExtensionsKey) + "headers_setter"
CollectorReceiversKey = pre(CollectorRootKey) + "receivers"
CollectorLogKey = pre(CollectorRootKey) + "log"
CollectorLogLevelKey = pre(CollectorLogKey) + "level"
Expand Down
6 changes: 3 additions & 3 deletions test/mock/collector/mock-collector/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
FROM golang:bookworm

WORKDIR /mock-collector
COPY ./test/mock/collector/mock-collector/ ./
WORKDIR /mock-management-plane-collector
COPY ./build/mock-management-plane-collector ./

CMD ["go", "run", "main.go"]
CMD ["/mock-management-plane-collector/collector"]
4 changes: 3 additions & 1 deletion test/mock/collector/mock-collector/auth/auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,16 @@ var (
)

type HeadersCheck struct {
AuthenticatorID component.ID `mapstructure:"authenticator"`
logger *zap.SugaredLogger
AuthenticatorID component.ID `mapstructure:"authenticator"`
}

type Option func(*HeadersCheck)

// Ensure that the authenticator implements the auth.Server interface.
var _ auth.Server = (*HeadersCheck)(nil)

// nolint: ireturn
func NewFactory() extension.Factory {
return extension.NewFactory(
aType,
Expand All @@ -55,6 +56,7 @@ func (a *HeadersCheck) Authenticate(ctx context.Context, headers map[string][]st
return ctx, nil
}

// nolint: ireturn
func CreateAuthExtensionFunc(
_ context.Context,
setting extension.Settings,
Expand Down
1 change: 1 addition & 0 deletions test/mock/collector/mock-collector/auth/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ type Config struct {
AuthenticatorID component.ID `mapstructure:",squash"`
}

// nolint: ireturn
func CreateDefaultConfig() component.Config {
return &Config{
AuthenticatorID: HeadersCheckID,
Expand Down
Loading
Loading