Skip to content

Commit

Permalink
chore: resolve merge conflicts
Browse files Browse the repository at this point in the history
  • Loading branch information
RRashmit committed Nov 7, 2024
2 parents 3262ad8 + 46ff3d8 commit eaa9291
Show file tree
Hide file tree
Showing 39 changed files with 943 additions and 1,231 deletions.
15 changes: 9 additions & 6 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,13 @@ GOBIN ?= $$(go env GOPATH)/bin
# | OS_RELEASE | OS_VERSION | NOTES |
# | ---------------- | ----------------------------------------- | -------------------------------------------------------------- |
# | amazonlinux | 2, 2023 | |
# | ubuntu | 20.04, 22.04 | |
# | ubuntu | 20.04, 22.04 24.04 | |
# | debian | bullseye-slim, bookworm-slim | |
# | centos | 7 | |
# | redhatenterprise | 7, 8, 9 | |
# | redhatenterprise | 8, 9 | |
# | rockylinux | 8, 9 | |
# | almalinux | 8, 9 | |
# | alpine | 3.16, 3.17, 3.18, 3.19 | |
# | oraclelinux | 7, 8, 9 | |
# | alpine | 3.17, 3.18, 3.19, 3.20 | |
# | oraclelinux | 8, 9 | |
# | suse | sles12sp5, sle15 | |
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
OS_RELEASE ?= ubuntu
Expand Down Expand Up @@ -151,6 +150,10 @@ build-mock-management-plane-grpc:
mkdir -p $(BUILD_DIR)/mock-management-plane-grpc
@CGO_ENABLED=0 GOARCH=$(OSARCH) GOOS=linux $(GOBUILD) -o $(BUILD_DIR)/mock-management-plane-grpc/server test/mock/grpc/cmd/main.go

build-mock-management-plane-collector:
mkdir -p $(BUILD_DIR)/mock-management-plane-collector
@CGO_ENABLED=0 GOARCH=$(OSARCH) GOOS=linux $(GOBUILD) -o $(BUILD_DIR)/mock-management-plane-collector/collector test/mock/collector/mock-collector/main.go

integration-test: $(SELECTED_PACKAGE) build-mock-management-plane-grpc
TEST_ENV="Container" CONTAINER_OS_TYPE=$(CONTAINER_OS_TYPE) BUILD_TARGET="install-agent-local" CONTAINER_NGINX_IMAGE_REGISTRY=${CONTAINER_NGINX_IMAGE_REGISTRY} \
PACKAGES_REPO=$(OSS_PACKAGES_REPO) PACKAGE_NAME=$(PACKAGE_NAME) BASE_IMAGE=$(BASE_IMAGE) DOCKERFILE_PATH=$(DOCKERFILE_PATH) IMAGE_PATH=$(IMAGE_PATH) TAG=${IMAGE_TAG} \
Expand Down Expand Up @@ -209,7 +212,7 @@ build-test-oss-image:
--build-arg ENTRY_POINT=./test/docker/entrypoint.sh

.PHONY: build-mock-collector-image
build-mock-collector-image:
build-mock-collector-image: build-mock-management-plane-collector
$(CONTAINER_BUILDENV) $(CONTAINER_CLITOOL) build -t mock-collector . \
--no-cache -f ./test/mock/collector/mock-collector/Dockerfile

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ require (
go.opentelemetry.io/collector/exporter/otlpexporter v0.108.1
go.opentelemetry.io/collector/exporter/otlphttpexporter v0.108.1
go.opentelemetry.io/collector/extension v0.108.1
go.opentelemetry.io/collector/extension/auth v0.108.1
go.opentelemetry.io/collector/filter v0.108.1
go.opentelemetry.io/collector/otelcol v0.108.1
go.opentelemetry.io/collector/pdata v1.14.1
Expand Down Expand Up @@ -268,7 +269,6 @@ require (
go.opentelemetry.io/collector/config/configretry v1.14.1 // indirect
go.opentelemetry.io/collector/config/internal v0.108.1 // indirect
go.opentelemetry.io/collector/consumer/consumerprofiles v0.108.1 // indirect
go.opentelemetry.io/collector/extension/auth v0.108.1 // indirect
go.opentelemetry.io/collector/extension/zpagesextension v0.108.1 // indirect
go.opentelemetry.io/collector/featuregate v1.14.1 // indirect
go.opentelemetry.io/collector/internal/globalgates v0.108.1 // indirect
Expand Down
32 changes: 17 additions & 15 deletions internal/bus/topics.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,21 @@
package bus

const (
AddInstancesTopic = "add-instances"
UpdatedInstancesTopic = "updated-instances"
DeletedInstancesTopic = "deleted-instances"
ResourceUpdateTopic = "resource-update"
NginxConfigUpdateTopic = "nginx-config-update"
InstanceHealthTopic = "instance-health"
ConfigUploadRequestTopic = "config-upload-request"
DataPlaneResponseTopic = "data-plane-response"
ConnectionCreatedTopic = "connection-created"
ConfigApplyRequestTopic = "config-apply-request"
WriteConfigSuccessfulTopic = "write-config-successful"
ConfigApplySuccessfulTopic = "config-apply-successful"
ConfigApplyFailedTopic = "config-apply-failed"
RollbackCompleteTopic = "rollback-complete"
RollbackWriteTopic = "rollback-write"
AddInstancesTopic = "add-instances"
UpdatedInstancesTopic = "updated-instances"
DeletedInstancesTopic = "deleted-instances"
ResourceUpdateTopic = "resource-update"
NginxConfigUpdateTopic = "nginx-config-update"
InstanceHealthTopic = "instance-health"
ConfigUploadRequestTopic = "config-upload-request"
DataPlaneResponseTopic = "data-plane-response"
ConnectionCreatedTopic = "connection-created"
ConfigApplyRequestTopic = "config-apply-request"
WriteConfigSuccessfulTopic = "write-config-successful"
ConfigApplySuccessfulTopic = "config-apply-successful"
ConfigApplyFailedTopic = "config-apply-failed"
ConfigApplyCompleteTopic = "config-apply-complete"
RollbackWriteTopic = "rollback-write"
DataPlaneHealthRequestTopic = "data-plane-health-request"
DataPlaneHealthResponseTopic = "data-plane-health-response"
)

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,10 @@ func (nls *NginxLogScraper) Scrape(_ context.Context) (pmetric.Metrics, error) {

func (nls *NginxLogScraper) Shutdown(_ context.Context) error {
nls.logger.Info("Shutting down NGINX access log scraper")
nls.cancel()

if nls.cancel != nil {
nls.cancel()
}
nls.wg.Wait()

return nls.pipe.Stop()
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

79 changes: 64 additions & 15 deletions internal/collector/otel_collector_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,9 @@ func (oc *Collector) Subscriptions() []string {
}

func (oc *Collector) handleNginxConfigUpdate(ctx context.Context, msg *bus.Message) {
oc.mu.Lock()
defer oc.mu.Unlock()

nginxConfigContext, ok := msg.Data.(*model.NginxConfigContext)
if !ok {
slog.ErrorContext(ctx, "Unable to cast message payload to *model.NginxConfigContext", "payload", msg.Data)
Expand All @@ -241,13 +244,33 @@ func (oc *Collector) handleNginxConfigUpdate(ctx context.Context, msg *bus.Messa
}

func (oc *Collector) handleResourceUpdate(ctx context.Context, msg *bus.Message) {
var reloadCollector bool
oc.mu.Lock()
defer oc.mu.Unlock()

resourceUpdateContext, ok := msg.Data.(*v1.Resource)
if !ok {
slog.ErrorContext(ctx, "Unable to cast message payload to *v1.Resource", "payload", msg.Data)
return
}

resourceProcessorUpdated := oc.updateResourceProcessor(resourceUpdateContext)
headersSetterExtensionUpdated := oc.updateHeadersSetterExtension(ctx, resourceUpdateContext)

if resourceProcessorUpdated || headersSetterExtensionUpdated {
slog.InfoContext(ctx, "Reloading OTel collector config")
err := writeCollectorConfig(oc.config.Collector)
if err != nil {
slog.ErrorContext(ctx, "Failed to write OTel Collector config", "error", err)
return
}

oc.restartCollector(ctx)
}
}

func (oc *Collector) updateResourceProcessor(resourceUpdateContext *v1.Resource) bool {
resourceProcessorUpdated := false

if oc.config.Collector.Processors.Resource == nil {
oc.config.Collector.Processors.Resource = &config.Resource{
Attributes: make([]config.ResourceAttribute, 0),
Expand All @@ -256,7 +279,7 @@ func (oc *Collector) handleResourceUpdate(ctx context.Context, msg *bus.Message)

if oc.config.Collector.Processors.Resource != nil &&
resourceUpdateContext.GetResourceId() != "" {
reloadCollector = oc.updateResourceAttributes(
resourceProcessorUpdated = oc.updateResourceAttributes(
[]config.ResourceAttribute{
{
Key: "resource.id",
Expand All @@ -267,21 +290,47 @@ func (oc *Collector) handleResourceUpdate(ctx context.Context, msg *bus.Message)
)
}

if reloadCollector {
slog.InfoContext(ctx, "Reloading OTel collector config")
err := writeCollectorConfig(oc.config.Collector)
if err != nil {
slog.ErrorContext(ctx, "Failed to write OTel Collector config", "error", err)
return
return resourceProcessorUpdated
}

func (oc *Collector) updateHeadersSetterExtension(
ctx context.Context,
resourceUpdateContext *v1.Resource,
) bool {
headersSetterExtensionUpdated := false

if oc.config.Collector.Extensions.HeadersSetter != nil &&
oc.config.Collector.Extensions.HeadersSetter.Headers != nil {
isUUIDHeaderSet := false
for _, header := range oc.config.Collector.Extensions.HeadersSetter.Headers {
if header.Key == "uuid" {
isUUIDHeaderSet = true
break
}
}

oc.restartCollector(ctx)
if !isUUIDHeaderSet {
slog.DebugContext(
ctx, "Adding uuid header to OTel collector",
"uuid", resourceUpdateContext.GetResourceId(),
)
oc.config.Collector.Extensions.HeadersSetter.Headers = append(
oc.config.Collector.Extensions.HeadersSetter.Headers,
config.Header{
Action: "insert",
Key: "uuid",
Value: resourceUpdateContext.GetResourceId(),
},
)

headersSetterExtensionUpdated = true
}
}

return headersSetterExtensionUpdated
}

func (oc *Collector) restartCollector(ctx context.Context) {
oc.mu.Lock()
defer oc.mu.Unlock()
err := oc.Close(ctx)
if err != nil {
slog.ErrorContext(ctx, "Failed to shutdown OTel Collector", "error", err)
Expand Down Expand Up @@ -404,8 +453,8 @@ func (oc *Collector) updateExistingNginxOSSReceiver(
// nolint: revive
func (oc *Collector) updateResourceAttributes(
attributesToAdd []config.ResourceAttribute,
) (reloadCollector bool) {
reloadCollector = false
) (actionUpdated bool) {
actionUpdated = false

if oc.config.Collector.Processors.Resource.Attributes != nil {
OUTER:
Expand All @@ -419,11 +468,11 @@ func (oc *Collector) updateResourceAttributes(
oc.config.Collector.Processors.Resource.Attributes,
toAdd,
)
reloadCollector = true
actionUpdated = true
}
}

return reloadCollector
return actionUpdated
}

func isOSSReceiverChanged(nginxReceiver config.NginxReceiver, nginxConfigContext *model.NginxConfigContext) bool {
Expand Down
14 changes: 14 additions & 0 deletions internal/collector/otel_collector_plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,7 @@ func TestCollector_ProcessResourceUpdateTopic(t *testing.T) {
message *bus.Message
processors config.Processors
name string
headers []config.Header
}{
{
name: "Test 1: Resource update adds resource id attribute",
Expand All @@ -255,6 +256,18 @@ func TestCollector_ProcessResourceUpdateTopic(t *testing.T) {
},
},
},
headers: []config.Header{
{
Action: "insert",
Key: "authorization",
Value: "fake-authorization",
},
{
Action: "insert",
Key: "uuid",
Value: "1234",
},
},
},
}

Expand Down Expand Up @@ -288,6 +301,7 @@ func TestCollector_ProcessResourceUpdateTopic(t *testing.T) {
)

assert.Equal(tt, test.processors, collector.config.Collector.Processors)
assert.Equal(tt, test.headers, collector.config.Collector.Extensions.HeadersSetter.Headers)
})
}
}
Expand Down
Loading

0 comments on commit eaa9291

Please sign in to comment.