Skip to content

Commit

Permalink
Merge branch 'config-apply-determine-file-diff' into update-config-ap…
Browse files Browse the repository at this point in the history
…ply-diagrams
  • Loading branch information
aphralG committed Nov 6, 2024
2 parents 946a5cf + 50c0ab7 commit 79c0241
Show file tree
Hide file tree
Showing 20 changed files with 138 additions and 746 deletions.
6 changes: 5 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,10 @@ build-mock-management-plane-grpc:
mkdir -p $(BUILD_DIR)/mock-management-plane-grpc
@CGO_ENABLED=0 GOARCH=$(OSARCH) GOOS=linux $(GOBUILD) -o $(BUILD_DIR)/mock-management-plane-grpc/server test/mock/grpc/cmd/main.go

build-mock-management-plane-collector:
mkdir -p $(BUILD_DIR)/mock-management-plane-collector
@CGO_ENABLED=0 GOARCH=$(OSARCH) GOOS=linux $(GOBUILD) -o $(BUILD_DIR)/mock-management-plane-collector/collector test/mock/collector/mock-collector/main.go

integration-test: $(SELECTED_PACKAGE) build-mock-management-plane-grpc
TEST_ENV="Container" CONTAINER_OS_TYPE=$(CONTAINER_OS_TYPE) BUILD_TARGET="install-agent-local" CONTAINER_NGINX_IMAGE_REGISTRY=${CONTAINER_NGINX_IMAGE_REGISTRY} \
PACKAGES_REPO=$(OSS_PACKAGES_REPO) PACKAGE_NAME=$(PACKAGE_NAME) BASE_IMAGE=$(BASE_IMAGE) DOCKERFILE_PATH=$(DOCKERFILE_PATH) IMAGE_PATH=$(IMAGE_PATH) TAG=${IMAGE_TAG} \
Expand Down Expand Up @@ -209,7 +213,7 @@ build-test-oss-image:
--build-arg ENTRY_POINT=./test/docker/entrypoint.sh

.PHONY: build-mock-collector-image
build-mock-collector-image:
build-mock-collector-image: build-mock-management-plane-collector
$(CONTAINER_BUILDENV) $(CONTAINER_CLITOOL) build -t mock-collector . \
--no-cache -f ./test/mock/collector/mock-collector/Dockerfile

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ require (
go.opentelemetry.io/collector/exporter/otlpexporter v0.108.1
go.opentelemetry.io/collector/exporter/otlphttpexporter v0.108.1
go.opentelemetry.io/collector/extension v0.108.1
go.opentelemetry.io/collector/extension/auth v0.108.1
go.opentelemetry.io/collector/filter v0.108.1
go.opentelemetry.io/collector/otelcol v0.108.1
go.opentelemetry.io/collector/pdata v1.14.1
Expand Down Expand Up @@ -268,7 +269,6 @@ require (
go.opentelemetry.io/collector/config/configretry v1.14.1 // indirect
go.opentelemetry.io/collector/config/internal v0.108.1 // indirect
go.opentelemetry.io/collector/consumer/consumerprofiles v0.108.1 // indirect
go.opentelemetry.io/collector/extension/auth v0.108.1 // indirect
go.opentelemetry.io/collector/extension/zpagesextension v0.108.1 // indirect
go.opentelemetry.io/collector/featuregate v1.14.1 // indirect
go.opentelemetry.io/collector/internal/globalgates v0.108.1 // indirect
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,10 @@ func (nls *NginxLogScraper) Scrape(_ context.Context) (pmetric.Metrics, error) {

func (nls *NginxLogScraper) Shutdown(_ context.Context) error {
nls.logger.Info("Shutting down NGINX access log scraper")
nls.cancel()

if nls.cancel != nil {
nls.cancel()
}
nls.wg.Wait()

return nls.pipe.Stop()
Expand Down
79 changes: 64 additions & 15 deletions internal/collector/otel_collector_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,9 @@ func (oc *Collector) Subscriptions() []string {
}

func (oc *Collector) handleNginxConfigUpdate(ctx context.Context, msg *bus.Message) {
oc.mu.Lock()
defer oc.mu.Unlock()

nginxConfigContext, ok := msg.Data.(*model.NginxConfigContext)
if !ok {
slog.ErrorContext(ctx, "Unable to cast message payload to *model.NginxConfigContext", "payload", msg.Data)
Expand All @@ -241,13 +244,33 @@ func (oc *Collector) handleNginxConfigUpdate(ctx context.Context, msg *bus.Messa
}

func (oc *Collector) handleResourceUpdate(ctx context.Context, msg *bus.Message) {
var reloadCollector bool
oc.mu.Lock()
defer oc.mu.Unlock()

resourceUpdateContext, ok := msg.Data.(*v1.Resource)
if !ok {
slog.ErrorContext(ctx, "Unable to cast message payload to *v1.Resource", "payload", msg.Data)
return
}

resourceProcessorUpdated := oc.updateResourceProcessor(resourceUpdateContext)
headersSetterExtensionUpdated := oc.updateHeadersSetterExtension(ctx, resourceUpdateContext)

if resourceProcessorUpdated || headersSetterExtensionUpdated {
slog.InfoContext(ctx, "Reloading OTel collector config")
err := writeCollectorConfig(oc.config.Collector)
if err != nil {
slog.ErrorContext(ctx, "Failed to write OTel Collector config", "error", err)
return
}

oc.restartCollector(ctx)
}
}

func (oc *Collector) updateResourceProcessor(resourceUpdateContext *v1.Resource) bool {
resourceProcessorUpdated := false

if oc.config.Collector.Processors.Resource == nil {
oc.config.Collector.Processors.Resource = &config.Resource{
Attributes: make([]config.ResourceAttribute, 0),
Expand All @@ -256,7 +279,7 @@ func (oc *Collector) handleResourceUpdate(ctx context.Context, msg *bus.Message)

if oc.config.Collector.Processors.Resource != nil &&
resourceUpdateContext.GetResourceId() != "" {
reloadCollector = oc.updateResourceAttributes(
resourceProcessorUpdated = oc.updateResourceAttributes(
[]config.ResourceAttribute{
{
Key: "resource.id",
Expand All @@ -267,21 +290,47 @@ func (oc *Collector) handleResourceUpdate(ctx context.Context, msg *bus.Message)
)
}

if reloadCollector {
slog.InfoContext(ctx, "Reloading OTel collector config")
err := writeCollectorConfig(oc.config.Collector)
if err != nil {
slog.ErrorContext(ctx, "Failed to write OTel Collector config", "error", err)
return
return resourceProcessorUpdated
}

func (oc *Collector) updateHeadersSetterExtension(
ctx context.Context,
resourceUpdateContext *v1.Resource,
) bool {
headersSetterExtensionUpdated := false

if oc.config.Collector.Extensions.HeadersSetter != nil &&
oc.config.Collector.Extensions.HeadersSetter.Headers != nil {
isUUIDHeaderSet := false
for _, header := range oc.config.Collector.Extensions.HeadersSetter.Headers {
if header.Key == "uuid" {
isUUIDHeaderSet = true
break
}
}

oc.restartCollector(ctx)
if !isUUIDHeaderSet {
slog.DebugContext(
ctx, "Adding uuid header to OTel collector",
"uuid", resourceUpdateContext.GetResourceId(),
)
oc.config.Collector.Extensions.HeadersSetter.Headers = append(
oc.config.Collector.Extensions.HeadersSetter.Headers,
config.Header{
Action: "insert",
Key: "uuid",
Value: resourceUpdateContext.GetResourceId(),
},
)

headersSetterExtensionUpdated = true
}
}

return headersSetterExtensionUpdated
}

func (oc *Collector) restartCollector(ctx context.Context) {
oc.mu.Lock()
defer oc.mu.Unlock()
err := oc.Close(ctx)
if err != nil {
slog.ErrorContext(ctx, "Failed to shutdown OTel Collector", "error", err)
Expand Down Expand Up @@ -404,8 +453,8 @@ func (oc *Collector) updateExistingNginxOSSReceiver(
// nolint: revive
func (oc *Collector) updateResourceAttributes(
attributesToAdd []config.ResourceAttribute,
) (reloadCollector bool) {
reloadCollector = false
) (actionUpdated bool) {
actionUpdated = false

if oc.config.Collector.Processors.Resource.Attributes != nil {
OUTER:
Expand All @@ -419,11 +468,11 @@ func (oc *Collector) updateResourceAttributes(
oc.config.Collector.Processors.Resource.Attributes,
toAdd,
)
reloadCollector = true
actionUpdated = true
}
}

return reloadCollector
return actionUpdated
}

func isOSSReceiverChanged(nginxReceiver config.NginxReceiver, nginxConfigContext *model.NginxConfigContext) bool {
Expand Down
14 changes: 14 additions & 0 deletions internal/collector/otel_collector_plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,7 @@ func TestCollector_ProcessResourceUpdateTopic(t *testing.T) {
message *bus.Message
processors config.Processors
name string
headers []config.Header
}{
{
name: "Test 1: Resource update adds resource id attribute",
Expand All @@ -255,6 +256,18 @@ func TestCollector_ProcessResourceUpdateTopic(t *testing.T) {
},
},
},
headers: []config.Header{
{
Action: "insert",
Key: "authorization",
Value: "fake-authorization",
},
{
Action: "insert",
Key: "uuid",
Value: "1234",
},
},
},
}

Expand Down Expand Up @@ -288,6 +301,7 @@ func TestCollector_ProcessResourceUpdateTopic(t *testing.T) {
)

assert.Equal(tt, test.processors, collector.config.Collector.Processors)
assert.Equal(tt, test.headers, collector.config.Collector.Extensions.HeadersSetter.Headers)
})
}
}
Expand Down
17 changes: 13 additions & 4 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -576,8 +576,11 @@ func processOtlpReceivers(tlsConfig *OtlpTLSConfig) error {
}

func resolveExtensions() Extensions {
var health *Health
var headersSetter *HeadersSetter

if isHealthExtensionSet() {
health := &Health{
health = &Health{
Server: &ServerConfig{
Host: viperInstance.GetString(CollectorExtensionsHealthServerHostKey),
Port: viperInstance.GetInt(CollectorExtensionsHealthServerPortKey),
Expand All @@ -594,13 +597,19 @@ func resolveExtensions() Extensions {
ServerName: viperInstance.GetString(CollectorExtensionsHealthTLSServerNameKey),
}
}
}

return Extensions{
Health: health,
if viperInstance.IsSet(CollectorExtensionsHeadersSetterKey) {
err := resolveMapStructure(CollectorExtensionsHeadersSetterKey, &headersSetter)
if err != nil {
headersSetter = nil
}
}

return Extensions{}
return Extensions{
Health: health,
HeadersSetter: headersSetter,
}
}

func isHealthExtensionSet() bool {
Expand Down
1 change: 1 addition & 0 deletions internal/config/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ var (
CollectorExtensionsHealthTLSKeyKey = pre(CollectorExtensionsHealthTLSKey) + "key"
CollectorExtensionsHealthTLSServerNameKey = pre(CollectorExtensionsHealthTLSKey) + "server_name"
CollectorExtensionsHealthTLSSkipVerifyKey = pre(CollectorExtensionsHealthTLSKey) + "skip_verify"
CollectorExtensionsHeadersSetterKey = pre(CollectorExtensionsKey) + "headers_setter"
CollectorReceiversKey = pre(CollectorRootKey) + "receivers"
CollectorLogKey = pre(CollectorRootKey) + "log"
CollectorLogLevelKey = pre(CollectorLogKey) + "level"
Expand Down
2 changes: 1 addition & 1 deletion internal/file/file_manager_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -426,7 +426,7 @@ func (fms *FileManagerService) DetermineFileActions(currentFiles, modifiedFiles
} else if file.GetFileMeta().GetHash() != currentFile.GetFileMeta().GetHash() {
fileContent, readErr := os.ReadFile(fileName)
if readErr != nil {
return nil, nil, fmt.Errorf("error generating hash for file %s, error: %w", fileName, readErr)
return nil, nil, fmt.Errorf("error reading file %s, error: %w", fileName, readErr)
}
file.Action = &updateAction
fileContents[fileName] = fileContent
Expand Down
19 changes: 5 additions & 14 deletions internal/file/file_plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ func TestFilePlugin_Process_ConfigApplyRequestTopic(t *testing.T) {
assert.True(t, ok)
case test.configApplyStatus == model.RollbackRequired:
assert.Equal(t, bus.DataPlaneResponseTopic, messages[0].Topic)
assert.Len(t, messages, 3)
assert.Len(t, messages, 2)
dataPlaneResponse, ok := messages[0].Data.(*mpi.DataPlaneResponse)
assert.True(t, ok)
assert.Equal(
Expand All @@ -190,20 +190,11 @@ func TestFilePlugin_Process_ConfigApplyRequestTopic(t *testing.T) {
assert.Equal(t, mpi.CommandResponse_COMMAND_STATUS_FAILURE,
dataPlaneResponse.GetCommandResponse().GetStatus())
case test.configApplyStatus == model.NoChange:
assert.Len(t, messages, 2)

dataPlaneResponse, ok := messages[0].Data.(*mpi.DataPlaneResponse)
assert.True(t, ok)
assert.Equal(t, bus.DataPlaneResponseTopic, messages[0].Topic)
assert.Equal(
t,
mpi.CommandResponse_COMMAND_STATUS_OK,
dataPlaneResponse.GetCommandResponse().GetStatus(),
)
assert.Len(t, messages, 1)

response, ok := messages[1].Data.(*mpi.DataPlaneResponse)
response, ok := messages[0].Data.(*mpi.DataPlaneResponse)
assert.True(t, ok)
assert.Equal(t, bus.ConfigApplySuccessfulTopic, messages[1].Topic)
assert.Equal(t, bus.ConfigApplySuccessfulTopic, messages[0].Topic)
assert.Equal(
t,
mpi.CommandResponse_COMMAND_STATUS_OK,
Expand All @@ -212,7 +203,7 @@ func TestFilePlugin_Process_ConfigApplyRequestTopic(t *testing.T) {
case test.message == nil:
assert.Empty(t, messages)
default:
assert.Len(t, messages, 2)
assert.Len(t, messages, 1)
dataPlaneResponse, ok := messages[0].Data.(*mpi.DataPlaneResponse)
assert.True(t, ok)
assert.Equal(
Expand Down
4 changes: 2 additions & 2 deletions internal/watcher/watcher_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func (w *Watcher) Process(ctx context.Context, msg *bus.Message) {
case bus.ConfigApplySuccessfulTopic:
w.handleConfigApplySuccess(ctx, msg)
case bus.ConfigApplyCompleteTopic:
w.handleRollbackComplete(ctx, msg)
w.handleConfigApplyComplete(ctx, msg)
default:
slog.DebugContext(ctx, "Watcher plugin unknown topic", "topic", msg.Topic)
}
Expand Down Expand Up @@ -171,7 +171,7 @@ func (w *Watcher) handleConfigApplySuccess(ctx context.Context, msg *bus.Message
w.instanceWatcherService.ReparseConfig(ctx, instanceID)
}

func (w *Watcher) handleRollbackComplete(ctx context.Context, msg *bus.Message) {
func (w *Watcher) handleConfigApplyComplete(ctx context.Context, msg *bus.Message) {
response, ok := msg.Data.(*mpi.DataPlaneResponse)
if !ok {
slog.ErrorContext(ctx, "Unable to cast message payload to *mpi.DataPlaneResponse", "payload",
Expand Down
4 changes: 2 additions & 2 deletions test/integration/grpc_management_plane_api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -450,8 +450,8 @@ func clearManagementPlaneResponses(t *testing.T) {

client := resty.New()

url := fmt.Sprintf("http://%s/api/v1/responses/delete", mockManagementPlaneAPIAddress)
resp, err := client.R().EnableTrace().Get(url)
url := fmt.Sprintf("http://%s/api/v1/responses", mockManagementPlaneAPIAddress)
resp, err := client.R().EnableTrace().Delete(url)

require.NoError(t, err)
assert.Equal(t, http.StatusOK, resp.StatusCode())
Expand Down
6 changes: 3 additions & 3 deletions test/mock/collector/mock-collector/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
FROM golang:bookworm

WORKDIR /mock-collector
COPY ./test/mock/collector/mock-collector/ ./
WORKDIR /mock-management-plane-collector
COPY ./build/mock-management-plane-collector ./

CMD ["go", "run", "main.go"]
CMD ["/mock-management-plane-collector/collector"]
4 changes: 3 additions & 1 deletion test/mock/collector/mock-collector/auth/auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,16 @@ var (
)

type HeadersCheck struct {
AuthenticatorID component.ID `mapstructure:"authenticator"`
logger *zap.SugaredLogger
AuthenticatorID component.ID `mapstructure:"authenticator"`
}

type Option func(*HeadersCheck)

// Ensure that the authenticator implements the auth.Server interface.
var _ auth.Server = (*HeadersCheck)(nil)

// nolint: ireturn
func NewFactory() extension.Factory {
return extension.NewFactory(
aType,
Expand All @@ -55,6 +56,7 @@ func (a *HeadersCheck) Authenticate(ctx context.Context, headers map[string][]st
return ctx, nil
}

// nolint: ireturn
func CreateAuthExtensionFunc(
_ context.Context,
setting extension.Settings,
Expand Down
1 change: 1 addition & 0 deletions test/mock/collector/mock-collector/auth/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ type Config struct {
AuthenticatorID component.ID `mapstructure:",squash"`
}

// nolint: ireturn
func CreateDefaultConfig() component.Config {
return &Config{
AuthenticatorID: HeadersCheckID,
Expand Down
Loading

0 comments on commit 79c0241

Please sign in to comment.