Skip to content

Commit

Permalink
Merge branch 'v3' into config-apply-determine-file-diff
Browse files Browse the repository at this point in the history
  • Loading branch information
aphralG committed Nov 6, 2024
2 parents 48a054c + f4a0803 commit 50c0ab7
Show file tree
Hide file tree
Showing 15 changed files with 127 additions and 726 deletions.
6 changes: 5 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,10 @@ build-mock-management-plane-grpc:
mkdir -p $(BUILD_DIR)/mock-management-plane-grpc
@CGO_ENABLED=0 GOARCH=$(OSARCH) GOOS=linux $(GOBUILD) -o $(BUILD_DIR)/mock-management-plane-grpc/server test/mock/grpc/cmd/main.go

build-mock-management-plane-collector:
mkdir -p $(BUILD_DIR)/mock-management-plane-collector
@CGO_ENABLED=0 GOARCH=$(OSARCH) GOOS=linux $(GOBUILD) -o $(BUILD_DIR)/mock-management-plane-collector/collector test/mock/collector/mock-collector/main.go

integration-test: $(SELECTED_PACKAGE) build-mock-management-plane-grpc
TEST_ENV="Container" CONTAINER_OS_TYPE=$(CONTAINER_OS_TYPE) BUILD_TARGET="install-agent-local" CONTAINER_NGINX_IMAGE_REGISTRY=${CONTAINER_NGINX_IMAGE_REGISTRY} \
PACKAGES_REPO=$(OSS_PACKAGES_REPO) PACKAGE_NAME=$(PACKAGE_NAME) BASE_IMAGE=$(BASE_IMAGE) DOCKERFILE_PATH=$(DOCKERFILE_PATH) IMAGE_PATH=$(IMAGE_PATH) TAG=${IMAGE_TAG} \
Expand Down Expand Up @@ -209,7 +213,7 @@ build-test-oss-image:
--build-arg ENTRY_POINT=./test/docker/entrypoint.sh

.PHONY: build-mock-collector-image
build-mock-collector-image:
build-mock-collector-image: build-mock-management-plane-collector
$(CONTAINER_BUILDENV) $(CONTAINER_CLITOOL) build -t mock-collector . \
--no-cache -f ./test/mock/collector/mock-collector/Dockerfile

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ require (
go.opentelemetry.io/collector/exporter/otlpexporter v0.108.1
go.opentelemetry.io/collector/exporter/otlphttpexporter v0.108.1
go.opentelemetry.io/collector/extension v0.108.1
go.opentelemetry.io/collector/extension/auth v0.108.1
go.opentelemetry.io/collector/filter v0.108.1
go.opentelemetry.io/collector/otelcol v0.108.1
go.opentelemetry.io/collector/pdata v1.14.1
Expand Down Expand Up @@ -268,7 +269,6 @@ require (
go.opentelemetry.io/collector/config/configretry v1.14.1 // indirect
go.opentelemetry.io/collector/config/internal v0.108.1 // indirect
go.opentelemetry.io/collector/consumer/consumerprofiles v0.108.1 // indirect
go.opentelemetry.io/collector/extension/auth v0.108.1 // indirect
go.opentelemetry.io/collector/extension/zpagesextension v0.108.1 // indirect
go.opentelemetry.io/collector/featuregate v1.14.1 // indirect
go.opentelemetry.io/collector/internal/globalgates v0.108.1 // indirect
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,10 @@ func (nls *NginxLogScraper) Scrape(_ context.Context) (pmetric.Metrics, error) {

func (nls *NginxLogScraper) Shutdown(_ context.Context) error {
nls.logger.Info("Shutting down NGINX access log scraper")
nls.cancel()

if nls.cancel != nil {
nls.cancel()
}
nls.wg.Wait()

return nls.pipe.Stop()
Expand Down
79 changes: 64 additions & 15 deletions internal/collector/otel_collector_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,9 @@ func (oc *Collector) Subscriptions() []string {
}

func (oc *Collector) handleNginxConfigUpdate(ctx context.Context, msg *bus.Message) {
oc.mu.Lock()
defer oc.mu.Unlock()

nginxConfigContext, ok := msg.Data.(*model.NginxConfigContext)
if !ok {
slog.ErrorContext(ctx, "Unable to cast message payload to *model.NginxConfigContext", "payload", msg.Data)
Expand All @@ -241,13 +244,33 @@ func (oc *Collector) handleNginxConfigUpdate(ctx context.Context, msg *bus.Messa
}

func (oc *Collector) handleResourceUpdate(ctx context.Context, msg *bus.Message) {
var reloadCollector bool
oc.mu.Lock()
defer oc.mu.Unlock()

resourceUpdateContext, ok := msg.Data.(*v1.Resource)
if !ok {
slog.ErrorContext(ctx, "Unable to cast message payload to *v1.Resource", "payload", msg.Data)
return
}

resourceProcessorUpdated := oc.updateResourceProcessor(resourceUpdateContext)
headersSetterExtensionUpdated := oc.updateHeadersSetterExtension(ctx, resourceUpdateContext)

if resourceProcessorUpdated || headersSetterExtensionUpdated {
slog.InfoContext(ctx, "Reloading OTel collector config")
err := writeCollectorConfig(oc.config.Collector)
if err != nil {
slog.ErrorContext(ctx, "Failed to write OTel Collector config", "error", err)
return
}

oc.restartCollector(ctx)
}
}

func (oc *Collector) updateResourceProcessor(resourceUpdateContext *v1.Resource) bool {
resourceProcessorUpdated := false

if oc.config.Collector.Processors.Resource == nil {
oc.config.Collector.Processors.Resource = &config.Resource{
Attributes: make([]config.ResourceAttribute, 0),
Expand All @@ -256,7 +279,7 @@ func (oc *Collector) handleResourceUpdate(ctx context.Context, msg *bus.Message)

if oc.config.Collector.Processors.Resource != nil &&
resourceUpdateContext.GetResourceId() != "" {
reloadCollector = oc.updateResourceAttributes(
resourceProcessorUpdated = oc.updateResourceAttributes(
[]config.ResourceAttribute{
{
Key: "resource.id",
Expand All @@ -267,21 +290,47 @@ func (oc *Collector) handleResourceUpdate(ctx context.Context, msg *bus.Message)
)
}

if reloadCollector {
slog.InfoContext(ctx, "Reloading OTel collector config")
err := writeCollectorConfig(oc.config.Collector)
if err != nil {
slog.ErrorContext(ctx, "Failed to write OTel Collector config", "error", err)
return
return resourceProcessorUpdated
}

func (oc *Collector) updateHeadersSetterExtension(
ctx context.Context,
resourceUpdateContext *v1.Resource,
) bool {
headersSetterExtensionUpdated := false

if oc.config.Collector.Extensions.HeadersSetter != nil &&
oc.config.Collector.Extensions.HeadersSetter.Headers != nil {
isUUIDHeaderSet := false
for _, header := range oc.config.Collector.Extensions.HeadersSetter.Headers {
if header.Key == "uuid" {
isUUIDHeaderSet = true
break
}
}

oc.restartCollector(ctx)
if !isUUIDHeaderSet {
slog.DebugContext(
ctx, "Adding uuid header to OTel collector",
"uuid", resourceUpdateContext.GetResourceId(),
)
oc.config.Collector.Extensions.HeadersSetter.Headers = append(
oc.config.Collector.Extensions.HeadersSetter.Headers,
config.Header{
Action: "insert",
Key: "uuid",
Value: resourceUpdateContext.GetResourceId(),
},
)

headersSetterExtensionUpdated = true
}
}

return headersSetterExtensionUpdated
}

func (oc *Collector) restartCollector(ctx context.Context) {
oc.mu.Lock()
defer oc.mu.Unlock()
err := oc.Close(ctx)
if err != nil {
slog.ErrorContext(ctx, "Failed to shutdown OTel Collector", "error", err)
Expand Down Expand Up @@ -404,8 +453,8 @@ func (oc *Collector) updateExistingNginxOSSReceiver(
// nolint: revive
func (oc *Collector) updateResourceAttributes(
attributesToAdd []config.ResourceAttribute,
) (reloadCollector bool) {
reloadCollector = false
) (actionUpdated bool) {
actionUpdated = false

if oc.config.Collector.Processors.Resource.Attributes != nil {
OUTER:
Expand All @@ -419,11 +468,11 @@ func (oc *Collector) updateResourceAttributes(
oc.config.Collector.Processors.Resource.Attributes,
toAdd,
)
reloadCollector = true
actionUpdated = true
}
}

return reloadCollector
return actionUpdated
}

func isOSSReceiverChanged(nginxReceiver config.NginxReceiver, nginxConfigContext *model.NginxConfigContext) bool {
Expand Down
14 changes: 14 additions & 0 deletions internal/collector/otel_collector_plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,7 @@ func TestCollector_ProcessResourceUpdateTopic(t *testing.T) {
message *bus.Message
processors config.Processors
name string
headers []config.Header
}{
{
name: "Test 1: Resource update adds resource id attribute",
Expand All @@ -255,6 +256,18 @@ func TestCollector_ProcessResourceUpdateTopic(t *testing.T) {
},
},
},
headers: []config.Header{
{
Action: "insert",
Key: "authorization",
Value: "fake-authorization",
},
{
Action: "insert",
Key: "uuid",
Value: "1234",
},
},
},
}

Expand Down Expand Up @@ -288,6 +301,7 @@ func TestCollector_ProcessResourceUpdateTopic(t *testing.T) {
)

assert.Equal(tt, test.processors, collector.config.Collector.Processors)
assert.Equal(tt, test.headers, collector.config.Collector.Extensions.HeadersSetter.Headers)
})
}
}
Expand Down
17 changes: 13 additions & 4 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -576,8 +576,11 @@ func processOtlpReceivers(tlsConfig *OtlpTLSConfig) error {
}

func resolveExtensions() Extensions {
var health *Health
var headersSetter *HeadersSetter

if isHealthExtensionSet() {
health := &Health{
health = &Health{
Server: &ServerConfig{
Host: viperInstance.GetString(CollectorExtensionsHealthServerHostKey),
Port: viperInstance.GetInt(CollectorExtensionsHealthServerPortKey),
Expand All @@ -594,13 +597,19 @@ func resolveExtensions() Extensions {
ServerName: viperInstance.GetString(CollectorExtensionsHealthTLSServerNameKey),
}
}
}

return Extensions{
Health: health,
if viperInstance.IsSet(CollectorExtensionsHeadersSetterKey) {
err := resolveMapStructure(CollectorExtensionsHeadersSetterKey, &headersSetter)
if err != nil {
headersSetter = nil
}
}

return Extensions{}
return Extensions{
Health: health,
HeadersSetter: headersSetter,
}
}

func isHealthExtensionSet() bool {
Expand Down
1 change: 1 addition & 0 deletions internal/config/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ var (
CollectorExtensionsHealthTLSKeyKey = pre(CollectorExtensionsHealthTLSKey) + "key"
CollectorExtensionsHealthTLSServerNameKey = pre(CollectorExtensionsHealthTLSKey) + "server_name"
CollectorExtensionsHealthTLSSkipVerifyKey = pre(CollectorExtensionsHealthTLSKey) + "skip_verify"
CollectorExtensionsHeadersSetterKey = pre(CollectorExtensionsKey) + "headers_setter"
CollectorReceiversKey = pre(CollectorRootKey) + "receivers"
CollectorLogKey = pre(CollectorRootKey) + "log"
CollectorLogLevelKey = pre(CollectorLogKey) + "level"
Expand Down
6 changes: 3 additions & 3 deletions test/mock/collector/mock-collector/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
FROM golang:bookworm

WORKDIR /mock-collector
COPY ./test/mock/collector/mock-collector/ ./
WORKDIR /mock-management-plane-collector
COPY ./build/mock-management-plane-collector ./

CMD ["go", "run", "main.go"]
CMD ["/mock-management-plane-collector/collector"]
4 changes: 3 additions & 1 deletion test/mock/collector/mock-collector/auth/auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,16 @@ var (
)

type HeadersCheck struct {
AuthenticatorID component.ID `mapstructure:"authenticator"`
logger *zap.SugaredLogger
AuthenticatorID component.ID `mapstructure:"authenticator"`
}

type Option func(*HeadersCheck)

// Ensure that the authenticator implements the auth.Server interface.
var _ auth.Server = (*HeadersCheck)(nil)

// nolint: ireturn
func NewFactory() extension.Factory {
return extension.NewFactory(
aType,
Expand All @@ -55,6 +56,7 @@ func (a *HeadersCheck) Authenticate(ctx context.Context, headers map[string][]st
return ctx, nil
}

// nolint: ireturn
func CreateAuthExtensionFunc(
_ context.Context,
setting extension.Settings,
Expand Down
1 change: 1 addition & 0 deletions test/mock/collector/mock-collector/auth/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ type Config struct {
AuthenticatorID component.ID `mapstructure:",squash"`
}

// nolint: ireturn
func CreateDefaultConfig() component.Config {
return &Config{
AuthenticatorID: HeadersCheckID,
Expand Down
Loading

0 comments on commit 50c0ab7

Please sign in to comment.