diff --git a/Makefile.packaging b/Makefile.packaging index 000ab6c1d8..35bc95bf8e 100644 --- a/Makefile.packaging +++ b/Makefile.packaging @@ -10,7 +10,7 @@ BINARY_PATH := $(BUILD_DIR)/$(BINARY_NAME) GPG_PUBLIC_KEY := .key PACKAGE_BUILD ?= 1 PACKAGE_VERSION := $(shell echo ${VERSION} | tr -d 'v')-$(PACKAGE_BUILD) -APK_PACKAGE_VERSION := $(shell echo ${VERSION} | tr -d 'v')_$(PACKAGE_BUILD) +APK_PACKAGE_VERSION := $(shell echo ${VERSION} | tr -d 'v').$(PACKAGE_BUILD) TARBALL_NAME := $(PACKAGE_PREFIX)v3.tar.gz DEB_DISTROS ?= ubuntu-noble-24.04 ubuntu-jammy-22.04 ubuntu-focal-20.04 debian-bookworm-12 debian-bullseye-11 diff --git a/go.mod b/go.mod index 065a39177d..6d3a17c73d 100644 --- a/go.mod +++ b/go.mod @@ -33,6 +33,7 @@ require ( github.com/open-telemetry/opentelemetry-collector-contrib/processor/resourceprocessor v0.114.0 github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor v0.114.0 github.com/open-telemetry/opentelemetry-collector-contrib/receiver/hostmetricsreceiver v0.114.0 + github.com/open-telemetry/opentelemetry-collector-contrib/receiver/tcplogreceiver v0.114.0 github.com/open-telemetry/opentelemetry-collector-contrib/testbed v0.114.0 github.com/shirou/gopsutil/v4 v4.24.10 github.com/spf13/pflag v1.0.5 diff --git a/go.sum b/go.sum index d32ecd72a4..4af474b03f 100644 --- a/go.sum +++ b/go.sum @@ -474,6 +474,8 @@ github.com/open-telemetry/opentelemetry-collector-contrib/receiver/prometheusrec github.com/open-telemetry/opentelemetry-collector-contrib/receiver/prometheusreceiver v0.114.0/go.mod h1:T1p6ShTr8farkE4qUB2TyGUIvRSN3s17D0qY7rMqCRM= github.com/open-telemetry/opentelemetry-collector-contrib/receiver/syslogreceiver v0.114.0 h1:chiIs7XGNSoptd0VgVR912NFogB8srpuVzgeVM9xW0w= github.com/open-telemetry/opentelemetry-collector-contrib/receiver/syslogreceiver v0.114.0/go.mod h1:cHIBJ01AgmkTUN2FgXz9NU8LbnLGUR2uxssCdiqwV4w= +github.com/open-telemetry/opentelemetry-collector-contrib/receiver/tcplogreceiver v0.114.0 h1:C1487YIMVBuJ8ixBLChyIl8VHlF8Ir6l2hfeYNNPPLM= +github.com/open-telemetry/opentelemetry-collector-contrib/receiver/tcplogreceiver v0.114.0/go.mod h1:+SvBS7Xu+pI67e5outljqWDMt+bIu9B6XON0nM4PIhY= github.com/open-telemetry/opentelemetry-collector-contrib/receiver/zipkinreceiver v0.114.0 h1:E686MeQcQ+a3Q47A/xAc3Nk6Qdz8wHcBLMJ3Y8bNKi0= github.com/open-telemetry/opentelemetry-collector-contrib/receiver/zipkinreceiver v0.114.0/go.mod h1:zkQAapuNRobj7GY8kKRal+2EYkAMWmZ1KMysUrQI48A= github.com/open-telemetry/opentelemetry-collector-contrib/testbed v0.114.0 h1:/4hKXCQaD78y6cFA4xXMfZhh86/RNwrjyr6xnhFs5AA= diff --git a/internal/backoff/backoff.go b/internal/backoff/backoff.go index c525fbe775..0ceef8c730 100644 --- a/internal/backoff/backoff.go +++ b/internal/backoff/backoff.go @@ -48,7 +48,7 @@ import ( // Information from https://pkg.go.dev/github.com/cenkalti/backoff/v4#section-readme func WaitUntil( ctx context.Context, - backoffSettings *config.CommonSettings, + backoffSettings *config.BackOff, operation backoff.Operation, ) error { eb := backoff.NewExponentialBackOff() @@ -68,7 +68,7 @@ func WaitUntil( // nolint: ireturn func WaitUntilWithData[T any]( ctx context.Context, - backoffSettings *config.CommonSettings, + backoffSettings *config.BackOff, operation backoff.OperationWithData[T], ) (T, error) { backoffWithContext := Context(ctx, backoffSettings) @@ -77,7 +77,7 @@ func WaitUntilWithData[T any]( } // nolint: ireturn -func Context(ctx context.Context, backoffSettings *config.CommonSettings) backoff.BackOffContext { +func Context(ctx context.Context, backoffSettings *config.BackOff) backoff.BackOffContext { eb := backoff.NewExponentialBackOff() eb.InitialInterval = backoffSettings.InitialInterval eb.MaxInterval = backoffSettings.MaxInterval diff --git a/internal/backoff/backoff_test.go b/internal/backoff/backoff_test.go index 43e794292a..60cbc75a9b 100644 --- a/internal/backoff/backoff_test.go +++ b/internal/backoff/backoff_test.go @@ -82,14 +82,16 @@ func TestWaitUntil(t *testing.T) { for _, test := range tests { invocations = 0 - settings := &config.CommonSettings{ - InitialInterval: test.initialInterval, - MaxInterval: test.maxInterval, - MaxElapsedTime: test.maxElapsedTime, - RandomizationFactor: config.DefBackoffRandomizationFactor, - Multiplier: config.DefBackoffMultiplier, + settings := &config.Client{ + Backoff: &config.BackOff{ + InitialInterval: test.initialInterval, + MaxInterval: test.maxInterval, + MaxElapsedTime: test.maxElapsedTime, + RandomizationFactor: config.DefBackoffRandomizationFactor, + Multiplier: config.DefBackoffMultiplier, + }, } - result := WaitUntil(test.context, settings, test.operation) + result := WaitUntil(test.context, settings.Backoff, test.operation) if test.expectedError { assert.Errorf(t, result, test.name) @@ -164,7 +166,7 @@ func TestWaitUntilWithData(t *testing.T) { } for _, test := range tests { - settings := &config.CommonSettings{ + settings := &config.BackOff{ InitialInterval: test.initialInterval, MaxInterval: test.maxInterval, MaxElapsedTime: test.maxElapsedTime, @@ -185,7 +187,7 @@ func TestWaitUntilWithData(t *testing.T) { } func TestContext(t *testing.T) { - settings := &config.CommonSettings{ + settings := &config.BackOff{ InitialInterval: 10 * time.Millisecond, MaxInterval: 10 * time.Millisecond, MaxElapsedTime: 10 * time.Millisecond, diff --git a/internal/collector/factories.go b/internal/collector/factories.go index 92fa823ccc..b7bb0d93e3 100644 --- a/internal/collector/factories.go +++ b/internal/collector/factories.go @@ -20,6 +20,7 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/processor/resourceprocessor" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/hostmetricsreceiver" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/tcplogreceiver" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/connector" "go.opentelemetry.io/collector/exporter" @@ -98,6 +99,7 @@ func createReceiverFactories() (map[component.Type]receiver.Factory, error) { hostmetricsreceiver.NewFactory(), nginxreceiver.NewFactory(), nginxplusreceiver.NewFactory(), + tcplogreceiver.NewFactory(), } return receiver.MakeFactoryMap(receiverList...) diff --git a/internal/collector/factories_test.go b/internal/collector/factories_test.go index 339c99af62..140a5f873c 100644 --- a/internal/collector/factories_test.go +++ b/internal/collector/factories_test.go @@ -18,7 +18,7 @@ func TestOTelComponentFactoriesDefault(t *testing.T) { require.NoError(t, err, "OTelComponentFactories should not return an error") assert.NotNil(t, factories, "factories should not be nil") - assert.Len(t, factories.Receivers, 4) + assert.Len(t, factories.Receivers, 5) assert.Len(t, factories.Processors, 8) assert.Len(t, factories.Exporters, 4) assert.Len(t, factories.Extensions, 3) diff --git a/internal/collector/otel_collector_plugin.go b/internal/collector/otel_collector_plugin.go index e91fbcee5e..cb3429be63 100644 --- a/internal/collector/otel_collector_plugin.go +++ b/internal/collector/otel_collector_plugin.go @@ -26,6 +26,16 @@ import ( const ( maxTimeToWaitForShutdown = 30 * time.Second filePermission = 0o600 + // To conform to the rfc3164 spec the timestamp in the logs need to be formatted correctly. + // Here are some examples of what the timestamp conversions look like. + // Notice how if the day begins with a zero that the zero is replaced with an empty space. + + // 2024-11-06T17:19:24+00:00 ---> Nov 6 17:19:24 + // 2024-11-16T17:19:24+00:00 ---> Nov 16 17:19:24 + timestampConversionExpression = `'EXPR(let timestamp = split(split(body, ">")[1], " ")[0]; ` + + `let newTimestamp = timestamp matches "(\\d{4})-(\\d{2})-(0\\d{1})T(\\d{2}):(\\d{2}):(\\d{2}).*" ` + + `? date(timestamp).Format("Jan 2 15:04:05") : date(timestamp).Format("Jan 02 15:04:05"); ` + + `split(body, ">")[0] + ">" + newTimestamp + " " + split(body, " ", 2)[1])'` ) type ( @@ -193,9 +203,9 @@ func (oc *Collector) Close(ctx context.Context) error { oc.service.Shutdown() oc.cancel() - settings := oc.config.Common + settings := oc.config.Client.Backoff settings.MaxElapsedTime = maxTimeToWaitForShutdown - err := backoff.WaitUntil(ctx, oc.config.Common, func() error { + err := backoff.WaitUntil(ctx, settings, func() error { if oc.service.GetState() == otelcol.StateClosed { return nil } @@ -243,7 +253,7 @@ func (oc *Collector) handleNginxConfigUpdate(ctx context.Context, msg *bus.Messa return } - reloadCollector := oc.checkForNewNginxReceivers(nginxConfigContext) + reloadCollector := oc.checkForNewReceivers(nginxConfigContext) if reloadCollector { slog.InfoContext(ctx, "Reloading OTel collector config") @@ -368,7 +378,7 @@ func (oc *Collector) restartCollector(ctx context.Context) { } } -func (oc *Collector) checkForNewNginxReceivers(nginxConfigContext *model.NginxConfigContext) bool { +func (oc *Collector) checkForNewReceivers(nginxConfigContext *model.NginxConfigContext) bool { nginxReceiverFound, reloadCollector := oc.updateExistingNginxPlusReceiver(nginxConfigContext) if !nginxReceiverFound && nginxConfigContext.PlusAPI.URL != "" { @@ -406,6 +416,11 @@ func (oc *Collector) checkForNewNginxReceivers(nginxConfigContext *model.NginxCo } } + tcplogReceiversFound := oc.updateTcplogReceivers(nginxConfigContext) + if tcplogReceiversFound { + reloadCollector = true + } + return reloadCollector } @@ -476,6 +491,110 @@ func (oc *Collector) updateExistingNginxOSSReceiver( return nginxReceiverFound, reloadCollector } +func (oc *Collector) updateTcplogReceivers(nginxConfigContext *model.NginxConfigContext) bool { + newTcplogReceiverAdded := false + if nginxConfigContext.NAPSysLogServers != nil { + napLoop: + for _, napSysLogServer := range nginxConfigContext.NAPSysLogServers { + if oc.doesTcplogReceiverAlreadyExist(napSysLogServer) { + continue napLoop + } + + oc.config.Collector.Receivers.TcplogReceivers = append( + oc.config.Collector.Receivers.TcplogReceivers, + config.TcplogReceiver{ + ListenAddress: napSysLogServer, + Operators: []config.Operator{ + { + Type: "add", + Fields: map[string]string{ + "field": "body", + "value": timestampConversionExpression, + }, + }, + { + Type: "syslog_parser", + Fields: map[string]string{ + "protocol": "rfc3164", + }, + }, + { + Type: "remove", + Fields: map[string]string{ + "field": "attributes.message", + }, + }, + { + Type: "add", + Fields: map[string]string{ + "field": "resource[\"instance.id\"]", + "value": nginxConfigContext.InstanceID, + }, + }, + }, + }, + ) + + newTcplogReceiverAdded = true + } + } + + tcplogReceiverDeleted := oc.areNapReceiversDeleted(nginxConfigContext) + + return newTcplogReceiverAdded || tcplogReceiverDeleted +} + +func (oc *Collector) areNapReceiversDeleted(nginxConfigContext *model.NginxConfigContext) bool { + listenAddressesToBeDeleted := oc.getConfigDeletedNapReceivers(nginxConfigContext) + if len(listenAddressesToBeDeleted) != 0 { + oc.deleteNapReceivers(listenAddressesToBeDeleted) + return true + } + + return false +} + +func (oc *Collector) deleteNapReceivers(listenAddressesToBeDeleted map[string]bool) { + filteredReceivers := (oc.config.Collector.Receivers.TcplogReceivers)[:0] + for _, receiver := range oc.config.Collector.Receivers.TcplogReceivers { + if !listenAddressesToBeDeleted[receiver.ListenAddress] { + filteredReceivers = append(filteredReceivers, receiver) + } + } + oc.config.Collector.Receivers.TcplogReceivers = filteredReceivers +} + +func (oc *Collector) getConfigDeletedNapReceivers(nginxConfigContext *model.NginxConfigContext) map[string]bool { + elements := make(map[string]bool) + + for _, tcplogReceiver := range oc.config.Collector.Receivers.TcplogReceivers { + elements[tcplogReceiver.ListenAddress] = true + } + + if nginxConfigContext.NAPSysLogServers != nil { + addressesToDelete := make(map[string]bool) + for _, napAddress := range nginxConfigContext.NAPSysLogServers { + if !elements[napAddress] { + addressesToDelete[napAddress] = true + } + } + + return addressesToDelete + } + + return elements +} + +func (oc *Collector) doesTcplogReceiverAlreadyExist(listenAddress string) bool { + for _, tcplogReceiver := range oc.config.Collector.Receivers.TcplogReceivers { + if listenAddress == tcplogReceiver.ListenAddress { + return true + } + } + + return false +} + // nolint: revive func (oc *Collector) updateResourceAttributes( attributesToAdd []config.ResourceAttribute, diff --git a/internal/collector/otel_collector_plugin_test.go b/internal/collector/otel_collector_plugin_test.go index cbe0c808a1..48db96981f 100644 --- a/internal/collector/otel_collector_plugin_test.go +++ b/internal/collector/otel_collector_plugin_test.go @@ -9,6 +9,7 @@ import ( "context" "errors" "fmt" + "path/filepath" "testing" "github.com/nginx/agent/v3/test/protos" @@ -26,6 +27,8 @@ import ( ) func TestCollector_New(t *testing.T) { + tmpDir := t.TempDir() + tests := []struct { config *config.Config expectedError error @@ -56,7 +59,7 @@ func TestCollector_New(t *testing.T) { name: "Successful initialization", config: &config.Config{ Collector: &config.Collector{ - Log: &config.Log{Path: "/tmp/test.log"}, + Log: &config.Log{Path: filepath.Join(tmpDir, "test.log")}, }, }, expectedError: nil, @@ -79,6 +82,8 @@ func TestCollector_New(t *testing.T) { } func TestCollector_Init(t *testing.T) { + tmpDir := t.TempDir() + tests := []struct { name string expectedLog string @@ -104,7 +109,7 @@ func TestCollector_Init(t *testing.T) { logBuf := &bytes.Buffer{} stub.StubLoggerWith(logBuf) - conf.Collector.Log = &config.Log{Path: "/tmp/test.log"} + conf.Collector.Log = &config.Log{Path: filepath.Join(tmpDir, "test.log")} if tt.expectedError { conf.Collector.Receivers = config.Receivers{} @@ -714,6 +719,60 @@ func TestCollector_updateResourceAttributes(t *testing.T) { } } +func TestCollector_updateTcplogReceivers(t *testing.T) { + conf := types.OTelConfig(t) + conf.Collector.Log.Path = "" + conf.Collector.Processors.Batch = nil + conf.Collector.Processors.Attribute = nil + conf.Collector.Processors.Resource = nil + + collector, err := New(conf) + require.NoError(t, err) + + nginxConfigContext := &model.NginxConfigContext{ + NAPSysLogServers: []string{ + "localhost:151", + }, + } + + assert.Empty(t, conf.Collector.Receivers.TcplogReceivers) + + t.Run("Test 1: New TcplogReceiver added", func(tt *testing.T) { + tcplogReceiverAdded := collector.updateTcplogReceivers(nginxConfigContext) + + assert.True(tt, tcplogReceiverAdded) + assert.Len(tt, conf.Collector.Receivers.TcplogReceivers, 1) + assert.Equal(tt, "localhost:151", conf.Collector.Receivers.TcplogReceivers[0].ListenAddress) + assert.Len(tt, conf.Collector.Receivers.TcplogReceivers[0].Operators, 4) + }) + + // Calling updateTcplogReceivers shouldn't update the TcplogReceivers slice + // since there is already a receiver with the same ListenAddress + t.Run("Test 2: TcplogReceiver already exists", func(tt *testing.T) { + tcplogReceiverAdded := collector.updateTcplogReceivers(nginxConfigContext) + assert.False(t, tcplogReceiverAdded) + assert.Len(t, conf.Collector.Receivers.TcplogReceivers, 1) + assert.Equal(t, "localhost:151", conf.Collector.Receivers.TcplogReceivers[0].ListenAddress) + assert.Len(t, conf.Collector.Receivers.TcplogReceivers[0].Operators, 4) + }) + + t.Run("Test 3: TcplogReceiver deleted", func(tt *testing.T) { + tcplogReceiverDeleted := collector.updateTcplogReceivers(&model.NginxConfigContext{}) + assert.True(t, tcplogReceiverDeleted) + assert.Empty(t, conf.Collector.Receivers.TcplogReceivers) + }) + + t.Run("Test 4: New tcplogReceiver added and deleted another", func(tt *testing.T) { + tcplogReceiverDeleted := collector.updateTcplogReceivers(&model.NginxConfigContext{NAPSysLogServers: []string{ + "localhost:152", + }}) + assert.True(t, tcplogReceiverDeleted) + assert.Len(t, conf.Collector.Receivers.TcplogReceivers, 1) + assert.Equal(t, "localhost:152", conf.Collector.Receivers.TcplogReceivers[0].ListenAddress) + assert.Len(t, conf.Collector.Receivers.TcplogReceivers[0].Operators, 4) + }) +} + func createFakeCollector() *typesfakes.FakeCollectorInterface { fakeCollector := &typesfakes.FakeCollectorInterface{} fakeCollector.RunStub = func(ctx context.Context) error { return nil } diff --git a/internal/collector/otelcol.tmpl b/internal/collector/otelcol.tmpl index e91f11eb8d..35c7f88abb 100644 --- a/internal/collector/otelcol.tmpl +++ b/internal/collector/otelcol.tmpl @@ -80,6 +80,17 @@ receivers: location: "{{- .PlusAPI.Location -}}" collection_interval: 10s {{- end }} +{{- range $index, $tcplogReceiver := .Receivers.TcplogReceivers }} + tcplog/{{$index}}: + listen_address: "{{- .ListenAddress -}}" + operators: +{{- range $index, $operator := .Operators }} + - type: {{.Type}} +{{- range $key, $value := .Fields }} + {{$key}}: {{$value}} +{{- end }} +{{- end }} +{{- end }} processors: {{- if ne .Processors.Resource nil }} @@ -203,6 +214,7 @@ service: {{- end}} {{- end}} pipelines: + {{- if or (ne .Receivers.HostMetrics nil) (gt (len .Receivers.OtlpReceivers) 0) (gt (len .Receivers.NginxReceivers) 0) (gt (len .Receivers.NginxPlusReceivers) 0) }} metrics: receivers: {{- if ne .Receivers.HostMetrics nil }} @@ -241,3 +253,27 @@ service: {{- if ne .Exporters.Debug nil }} - debug {{- end }} + {{- end }} + {{- if gt (len .Receivers.TcplogReceivers) 0 }} + logs: + receivers: + {{- range $index, $tcplogReceiver := .Receivers.TcplogReceivers }} + - tcplog/{{$index}} + {{- end }} + processors: + {{- if ne .Processors.Resource nil }} + {{- if .Processors.Resource.Attributes }} + - resource + {{- end }} + {{- end }} + {{- if ne .Processors.Batch nil }} + - batch + {{- end }} + exporters: + {{- range $index, $otlpExporter := .Exporters.OtlpExporters }} + - otlp/{{$index}} + {{- end }} + {{- if ne .Exporters.Debug nil }} + - debug + {{- end }} + {{- end }} diff --git a/internal/collector/settings_test.go b/internal/collector/settings_test.go index e7946d7ff0..3d10890c7f 100644 --- a/internal/collector/settings_test.go +++ b/internal/collector/settings_test.go @@ -49,9 +49,20 @@ func TestConfigProviderSettings(t *testing.T) { } func TestTemplateWrite(t *testing.T) { + tmpDir := t.TempDir() + cfg := types.AgentConfig() - actualConfPath := filepath.Join("/tmp/", "nginx-agent-otelcol-test.yaml") + actualConfPath := filepath.Join(tmpDir, "nginx-agent-otelcol-test.yaml") cfg.Collector.ConfigPath = actualConfPath + cfg.Collector.Processors.Resource = &config.Resource{ + Attributes: []config.ResourceAttribute{ + { + Key: "resource.id", + Action: "add", + Value: "12345", + }, + }, + } cfg.Collector.Exporters.PrometheusExporter = &config.PrometheusExporter{ Server: &config.ServerConfig{ @@ -104,6 +115,27 @@ func TestTemplateWrite(t *testing.T) { }, }) + cfg.Collector.Receivers.TcplogReceivers = []config.TcplogReceiver{ + { + ListenAddress: "localhost:151", + Operators: []config.Operator{ + { + Type: "add", + Fields: map[string]string{ + "field": "body", + "value": `EXPR(split(body, ",")[0])`, + }, + }, + { + Type: "remove", + Fields: map[string]string{ + "field": "attributes.message", + }, + }, + }, + }, + } + cfg.Collector.Extensions.HeadersSetter = &config.HeadersSetter{ Headers: []config.Header{ { diff --git a/internal/command/command_service.go b/internal/command/command_service.go index 5c991e76d2..bb7e92bf00 100644 --- a/internal/command/command_service.go +++ b/internal/command/command_service.go @@ -11,7 +11,6 @@ import ( "log/slog" "sync" "sync/atomic" - "time" "github.com/cenkalti/backoff/v4" @@ -29,20 +28,21 @@ import ( var _ commandService = (*CommandService)(nil) const ( - retryInterval = 5 * time.Second createConnectionMaxElapsedTime = 0 ) type ( CommandService struct { - commandServiceClient mpi.CommandServiceClient - subscribeClient mpi.CommandService_SubscribeClient - agentConfig *config.Config - isConnected *atomic.Bool - subscribeCancel context.CancelFunc - subscribeChannel chan *mpi.ManagementPlaneRequest - subscribeMutex sync.Mutex - subscribeClientMutex sync.Mutex + commandServiceClient mpi.CommandServiceClient + subscribeClient mpi.CommandService_SubscribeClient + agentConfig *config.Config + isConnected *atomic.Bool + subscribeCancel context.CancelFunc + subscribeChannel chan *mpi.ManagementPlaneRequest + configApplyRequestQueue map[string][]*mpi.ManagementPlaneRequest // key is the instance ID + subscribeMutex sync.Mutex + subscribeClientMutex sync.Mutex + configApplyRequestQueueMutex sync.Mutex } ) @@ -56,10 +56,11 @@ func NewCommandService( isConnected.Store(false) commandService := &CommandService{ - commandServiceClient: commandServiceClient, - agentConfig: agentConfig, - isConnected: isConnected, - subscribeChannel: subscribeChannel, + commandServiceClient: commandServiceClient, + agentConfig: agentConfig, + isConnected: isConnected, + subscribeChannel: subscribeChannel, + configApplyRequestQueue: make(map[string][]*mpi.ManagementPlaneRequest), } var subscribeCtx context.Context @@ -95,7 +96,7 @@ func (cs *CommandService) UpdateDataPlaneStatus( Resource: resource, } - backOffCtx, backoffCancel := context.WithTimeout(ctx, cs.agentConfig.Common.MaxElapsedTime) + backOffCtx, backoffCancel := context.WithTimeout(ctx, cs.agentConfig.Client.Backoff.MaxElapsedTime) defer backoffCancel() sendDataPlaneStatus := func() (*mpi.UpdateDataPlaneStatusResponse, error) { @@ -119,7 +120,7 @@ func (cs *CommandService) UpdateDataPlaneStatus( response, err := backoff.RetryWithData( sendDataPlaneStatus, - backoffHelpers.Context(backOffCtx, cs.agentConfig.Common), + backoffHelpers.Context(backOffCtx, cs.agentConfig.Client.Backoff), ) if err != nil { return err @@ -145,12 +146,12 @@ func (cs *CommandService) UpdateDataPlaneHealth(ctx context.Context, instanceHea InstanceHealths: instanceHealths, } - backOffCtx, backoffCancel := context.WithTimeout(ctx, cs.agentConfig.Common.MaxElapsedTime) + backOffCtx, backoffCancel := context.WithTimeout(ctx, cs.agentConfig.Client.Backoff.MaxElapsedTime) defer backoffCancel() response, err := backoff.RetryWithData( cs.dataPlaneHealthCallback(ctx, request), - backoffHelpers.Context(backOffCtx, cs.agentConfig.Common), + backoffHelpers.Context(backOffCtx, cs.agentConfig.Client.Backoff), ) if err != nil { return err @@ -164,12 +165,17 @@ func (cs *CommandService) UpdateDataPlaneHealth(ctx context.Context, instanceHea func (cs *CommandService) SendDataPlaneResponse(ctx context.Context, response *mpi.DataPlaneResponse) error { slog.DebugContext(ctx, "Sending data plane response", "response", response) - backOffCtx, backoffCancel := context.WithTimeout(ctx, cs.agentConfig.Common.MaxElapsedTime) + backOffCtx, backoffCancel := context.WithTimeout(ctx, cs.agentConfig.Client.Backoff.MaxElapsedTime) defer backoffCancel() + err := cs.handleConfigApplyResponse(ctx, response) + if err != nil { + return err + } + return backoff.Retry( cs.sendDataPlaneResponseCallback(ctx, response), - backoffHelpers.Context(backOffCtx, cs.agentConfig.Common), + backoffHelpers.Context(backOffCtx, cs.agentConfig.Client.Backoff), ) } @@ -184,12 +190,12 @@ func (cs *CommandService) CancelSubscription(ctx context.Context) { } func (cs *CommandService) subscribe(ctx context.Context) { - commonSettings := &config.CommonSettings{ - InitialInterval: cs.agentConfig.Common.InitialInterval, - MaxInterval: cs.agentConfig.Common.MaxInterval, + commonSettings := &config.BackOff{ + InitialInterval: cs.agentConfig.Client.Backoff.InitialInterval, + MaxInterval: cs.agentConfig.Client.Backoff.MaxInterval, MaxElapsedTime: createConnectionMaxElapsedTime, - RandomizationFactor: cs.agentConfig.Common.RandomizationFactor, - Multiplier: cs.agentConfig.Common.Multiplier, + RandomizationFactor: cs.agentConfig.Client.Backoff.RandomizationFactor, + Multiplier: cs.agentConfig.Client.Backoff.Multiplier, } for { @@ -223,12 +229,12 @@ func (cs *CommandService) CreateConnection( Resource: resource, } - commonSettings := &config.CommonSettings{ - InitialInterval: cs.agentConfig.Common.InitialInterval, - MaxInterval: cs.agentConfig.Common.MaxInterval, + commonSettings := &config.BackOff{ + InitialInterval: cs.agentConfig.Client.Backoff.InitialInterval, + MaxInterval: cs.agentConfig.Client.Backoff.MaxInterval, MaxElapsedTime: createConnectionMaxElapsedTime, - RandomizationFactor: cs.agentConfig.Common.RandomizationFactor, - Multiplier: cs.agentConfig.Common.Multiplier, + RandomizationFactor: cs.agentConfig.Client.Backoff.RandomizationFactor, + Multiplier: cs.agentConfig.Client.Backoff.Multiplier, } slog.DebugContext(ctx, "Sending create connection request", "request", request) @@ -273,6 +279,81 @@ func (cs *CommandService) sendDataPlaneResponseCallback( } } +func (cs *CommandService) handleConfigApplyResponse( + ctx context.Context, + response *mpi.DataPlaneResponse, +) error { + cs.configApplyRequestQueueMutex.Lock() + defer cs.configApplyRequestQueueMutex.Unlock() + + isConfigApplyResponse := false + var indexOfConfigApplyRequest int + + for index, configApplyRequest := range cs.configApplyRequestQueue[response.GetInstanceId()] { + if configApplyRequest.GetMessageMeta().GetCorrelationId() == response.GetMessageMeta().GetCorrelationId() { + indexOfConfigApplyRequest = index + isConfigApplyResponse = true + + break + } + } + + if isConfigApplyResponse { + err := cs.sendResponseForQueuedConfigApplyRequests(ctx, response, indexOfConfigApplyRequest) + if err != nil { + return err + } + } + + return nil +} + +func (cs *CommandService) sendResponseForQueuedConfigApplyRequests( + ctx context.Context, + response *mpi.DataPlaneResponse, + indexOfConfigApplyRequest int, +) error { + instanceID := response.GetInstanceId() + for i := 0; i < indexOfConfigApplyRequest; i++ { + newResponse := response + + newResponse.GetMessageMeta().MessageId = proto.GenerateMessageID() + + request := cs.configApplyRequestQueue[instanceID][i] + newResponse.GetMessageMeta().CorrelationId = request.GetMessageMeta().GetCorrelationId() + + slog.DebugContext( + ctx, + "Sending data plane response for queued config apply request", + "response", newResponse, + ) + + backOffCtx, backoffCancel := context.WithTimeout(ctx, cs.agentConfig.Client.Backoff.MaxElapsedTime) + + err := backoff.Retry( + cs.sendDataPlaneResponseCallback(ctx, newResponse), + backoffHelpers.Context(backOffCtx, cs.agentConfig.Client.Backoff), + ) + if err != nil { + slog.ErrorContext(ctx, "Failed to send data plane response", "error", err) + backoffCancel() + + return err + } + + backoffCancel() + } + + cs.configApplyRequestQueue[instanceID] = cs.configApplyRequestQueue[instanceID][indexOfConfigApplyRequest+1:] + slog.DebugContext(ctx, "Removed config apply requests from queue", "queue", cs.configApplyRequestQueue[instanceID]) + + if len(cs.configApplyRequestQueue[instanceID]) > 0 { + cs.subscribeChannel <- cs.configApplyRequestQueue[instanceID][len(cs.configApplyRequestQueue[instanceID])-1] + } + + return nil +} + // Retry callback for sending a data plane health status to the Management Plane. func (cs *CommandService) dataPlaneHealthCallback( ctx context.Context, @@ -335,7 +416,25 @@ func (cs *CommandService) receiveCallback(ctx context.Context) func() error { return recvError } - cs.subscribeChannel <- request + switch request.GetRequest().(type) { + case *mpi.ManagementPlaneRequest_ConfigApplyRequest: + cs.configApplyRequestQueueMutex.Lock() + defer cs.configApplyRequestQueueMutex.Unlock() + + instanceID := request.GetConfigApplyRequest().GetOverview().GetConfigVersion().GetInstanceId() + cs.configApplyRequestQueue[instanceID] = append(cs.configApplyRequestQueue[instanceID], request) + if len(cs.configApplyRequestQueue[instanceID]) == 1 { + cs.subscribeChannel <- request + } else { + slog.DebugContext( + ctx, + "Config apply request is already in progress, queuing new config apply request", + "request", request, + ) + } + default: + cs.subscribeChannel <- request + } return nil } diff --git a/internal/command/command_service_test.go b/internal/command/command_service_test.go index 9534414dbd..fc7f58139b 100644 --- a/internal/command/command_service_test.go +++ b/internal/command/command_service_test.go @@ -10,9 +10,13 @@ import ( "context" "errors" "log/slog" + "sync" "testing" "time" + "github.com/google/uuid" + "google.golang.org/protobuf/types/known/timestamppb" + "github.com/nginx/agent/v3/internal/logger" "github.com/nginx/agent/v3/test/helpers" "github.com/nginx/agent/v3/test/stub" @@ -37,9 +41,41 @@ func (*FakeSubscribeClient) Send(*mpi.DataPlaneResponse) error { // nolint: nilnil func (*FakeSubscribeClient) Recv() (*mpi.ManagementPlaneRequest, error) { + time.Sleep(1 * time.Second) + return nil, nil } +type FakeConfigApplySubscribeClient struct { + grpc.ClientStream +} + +func (*FakeConfigApplySubscribeClient) Send(*mpi.DataPlaneResponse) error { + return nil +} + +// nolint: nilnil +func (*FakeConfigApplySubscribeClient) Recv() (*mpi.ManagementPlaneRequest, error) { + protos.CreateManagementPlaneRequest() + return &mpi.ManagementPlaneRequest{ + MessageMeta: &mpi.MessageMeta{ + MessageId: "1", + CorrelationId: "123", + Timestamp: timestamppb.Now(), + }, + Request: &mpi.ManagementPlaneRequest_ConfigApplyRequest{ + ConfigApplyRequest: &mpi.ConfigApplyRequest{ + Overview: &mpi.FileOverview{ + ConfigVersion: &mpi.ConfigVersion{ + InstanceId: "12314", + Version: "4215432", + }, + }, + }, + }, + }, nil +} + func TestCommandService_NewCommandService(t *testing.T) { ctx := context.Background() commandServiceClient := &v1fakes.FakeCommandServiceClient{} @@ -61,6 +97,46 @@ func TestCommandService_NewCommandService(t *testing.T) { ) } +func TestCommandService_receiveCallback_configApplyRequest(t *testing.T) { + ctx := context.Background() + fakeSubscribeClient := &FakeConfigApplySubscribeClient{} + + commandServiceClient := &v1fakes.FakeCommandServiceClient{} + commandServiceClient.SubscribeReturns(fakeSubscribeClient, nil) + + subscribeChannel := make(chan *mpi.ManagementPlaneRequest) + + commandService := NewCommandService( + ctx, + commandServiceClient, + types.AgentConfig(), + subscribeChannel, + ) + + defer commandService.CancelSubscription(ctx) + + var wg sync.WaitGroup + + wg.Add(1) + go func() { + requestFromChannel := <-subscribeChannel + assert.NotNil(t, requestFromChannel) + wg.Done() + }() + + assert.Eventually( + t, + func() bool { return commandServiceClient.SubscribeCallCount() > 0 }, + 2*time.Second, + 10*time.Millisecond, + ) + + commandService.configApplyRequestQueueMutex.Lock() + defer commandService.configApplyRequestQueueMutex.Unlock() + assert.Len(t, commandService.configApplyRequestQueue, 1) + wg.Wait() +} + func TestCommandService_UpdateDataPlaneStatus(t *testing.T) { ctx := context.Background() @@ -193,3 +269,123 @@ func TestCommandService_SendDataPlaneResponse(t *testing.T) { require.NoError(t, err) } + +func TestCommandService_SendDataPlaneResponse_configApplyRequest(t *testing.T) { + ctx := context.Background() + commandServiceClient := &v1fakes.FakeCommandServiceClient{} + subscribeClient := &FakeSubscribeClient{} + subscribeChannel := make(chan *mpi.ManagementPlaneRequest) + + commandService := NewCommandService( + ctx, + commandServiceClient, + types.AgentConfig(), + subscribeChannel, + ) + + defer commandService.CancelSubscription(ctx) + + request1 := &mpi.ManagementPlaneRequest{ + MessageMeta: &mpi.MessageMeta{ + MessageId: "1", + CorrelationId: "123", + Timestamp: timestamppb.Now(), + }, + Request: &mpi.ManagementPlaneRequest_ConfigApplyRequest{ + ConfigApplyRequest: &mpi.ConfigApplyRequest{ + Overview: &mpi.FileOverview{ + Files: []*mpi.File{}, + ConfigVersion: &mpi.ConfigVersion{ + InstanceId: "12314", + Version: "4215432", + }, + }, + }, + }, + } + + request2 := &mpi.ManagementPlaneRequest{ + MessageMeta: &mpi.MessageMeta{ + MessageId: "2", + CorrelationId: "1232", + Timestamp: timestamppb.Now(), + }, + Request: &mpi.ManagementPlaneRequest_ConfigApplyRequest{ + ConfigApplyRequest: &mpi.ConfigApplyRequest{ + Overview: &mpi.FileOverview{ + Files: []*mpi.File{}, + ConfigVersion: &mpi.ConfigVersion{ + InstanceId: "12314", + Version: "4215432", + }, + }, + }, + }, + } + + request3 := &mpi.ManagementPlaneRequest{ + MessageMeta: &mpi.MessageMeta{ + MessageId: "3", + CorrelationId: "1233", + Timestamp: timestamppb.Now(), + }, + Request: &mpi.ManagementPlaneRequest_ConfigApplyRequest{ + ConfigApplyRequest: &mpi.ConfigApplyRequest{ + Overview: &mpi.FileOverview{ + Files: []*mpi.File{}, + ConfigVersion: &mpi.ConfigVersion{ + InstanceId: "12314", + Version: "4215432", + }, + }, + }, + }, + } + + commandService.configApplyRequestQueueMutex.Lock() + commandService.configApplyRequestQueue = map[string][]*mpi.ManagementPlaneRequest{ + "12314": { + request1, + request2, + request3, + }, + } + commandService.configApplyRequestQueueMutex.Unlock() + + commandService.subscribeClientMutex.Lock() + commandService.subscribeClient = subscribeClient + commandService.subscribeClientMutex.Unlock() + + var wg sync.WaitGroup + + wg.Add(1) + go func() { + requestFromChannel := <-subscribeChannel + assert.Equal(t, request3, requestFromChannel) + wg.Done() + }() + + err := commandService.SendDataPlaneResponse( + ctx, + &mpi.DataPlaneResponse{ + MessageMeta: &mpi.MessageMeta{ + MessageId: uuid.NewString(), + CorrelationId: "1232", + Timestamp: timestamppb.Now(), + }, + CommandResponse: &mpi.CommandResponse{ + Status: mpi.CommandResponse_COMMAND_STATUS_OK, + Message: "Success", + }, + InstanceId: "12314", + }, + ) + + require.NoError(t, err) + + commandService.configApplyRequestQueueMutex.Lock() + defer commandService.configApplyRequestQueueMutex.Unlock() + assert.Len(t, commandService.configApplyRequestQueue, 1) + assert.Equal(t, request3, commandService.configApplyRequestQueue["12314"][0]) + wg.Wait() +} diff --git a/internal/config/config.go b/internal/config/config.go index 4bd76679ca..23d6f7a05d 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -14,7 +14,6 @@ import ( "path/filepath" "slices" "strings" - "time" selfsignedcerts "github.com/nginx/agent/v3/pkg/tls" uuidLibrary "github.com/nginx/agent/v3/pkg/uuid" @@ -101,7 +100,6 @@ func ResolveConfig() (*Config, error) { AllowedDirectories: allowedDirs, Collector: collector, Command: resolveCommand(), - Common: resolveCommon(), Watchers: resolveWatchers(), Features: viperInstance.GetStringSlice(FeaturesKey), } @@ -155,7 +153,6 @@ func registerFlags() { "collection or error monitoring", ) - fs.Duration(ClientTimeoutKey, time.Minute, "Client timeout") fs.StringSlice(AllowedDirectoriesKey, DefaultAllowedDirectories(), "A comma-separated list of paths that you want to grant NGINX Agent read/write access to") @@ -178,24 +175,6 @@ func registerFlags() { "How often the NGINX Agent will check for file changes.", ) - fs.Int( - ClientMaxMessageSizeKey, - DefMaxMessageSize, - "The value used, if not 0, for both max_message_send_size and max_message_receive_size", - ) - - fs.Int( - ClientMaxMessageReceiveSizeKey, - DefMaxMessageRecieveSize, - "Updates the client grpc setting MaxRecvMsgSize with the specific value in MB.", - ) - - fs.Int( - ClientMaxMessageSendSizeKey, - DefMaxMessageSendSize, - "Updates the client grpc setting MaxSendMsgSize with the specific value in MB.", - ) - fs.StringSlice( FeaturesKey, DefaultFeatures(), @@ -204,6 +183,7 @@ func registerFlags() { registerCommandFlags(fs) registerCollectorFlags(fs) + registerClientFlags(fs) fs.SetNormalizeFunc(normalizeFunc) @@ -218,6 +198,76 @@ func registerFlags() { }) } +func registerClientFlags(fs *flag.FlagSet) { + // HTTP Flags + fs.Duration( + ClientHTTPTimeoutKey, + DefHTTPTimeout, + "The client HTTP Timeout, value in seconds") + + // Backoff Flags + fs.Duration( + ClientBackoffInitialIntervalKey, + DefBackoffInitialInterval, + "The client backoff initial interval, value in seconds") + + fs.Duration( + ClientBackoffMaxIntervalKey, + DefBackoffMaxInterval, + "The client backoff max interval, value in seconds") + + fs.Duration( + ClientBackoffMaxElapsedTimeKey, + DefBackoffMaxElapsedTime, + "The client backoff max elapsed time, value in seconds") + + fs.Float64( + ClientBackoffRandomizationFactorKey, + DefBackoffRandomizationFactor, + "The client backoff randomization factor, value float") + + fs.Float64( + ClientBackoffMultiplierKey, + DefBackoffMultiplier, + "The client backoff multiplier, value float") + + // GRPC Flags + fs.Duration( + ClientKeepAliveTimeoutKey, + DefGRPCKeepAliveTimeout, + "Updates the client grpc setting, KeepAlive Timeout with the specific value in seconds.", + ) + + fs.Duration( + ClientKeepAliveTimeKey, + DefGRPCKeepAliveTime, + "Updates the client grpc setting, KeepAlive Time with the specific value in seconds.", + ) + + fs.Bool( + ClientKeepAlivePermitWithoutStreamKey, + DefGRPCKeepAlivePermitWithoutStream, + "Update the client grpc setting, KeepAlive PermitWithoutStream value") + + fs.Int( + ClientGRPCMaxMessageSizeKey, + DefMaxMessageSize, + "The value used, if not 0, for both max_message_send_size and max_message_receive_size", + ) + + fs.Int( + ClientGRPCMaxMessageReceiveSizeKey, + DefMaxMessageRecieveSize, + "Updates the client grpc setting MaxRecvMsgSize with the specific value in MB.", + ) + + fs.Int( + ClientGRPCMaxMessageSendSizeKey, + DefMaxMessageSendSize, + "Updates the client grpc setting MaxSendMsgSize with the specific value in MB.", + ) +} + func registerCommandFlags(fs *flag.FlagSet) { fs.String( CommandServerHostKey, @@ -418,12 +468,26 @@ func resolveDataPlaneConfig() *DataPlaneConfig { func resolveClient() *Client { return &Client{ - Timeout: viperInstance.GetDuration(ClientTimeoutKey), - Time: viperInstance.GetDuration(ClientTimeKey), - PermitWithoutStream: viperInstance.GetBool(ClientPermitWithoutStreamKey), - MaxMessageSize: viperInstance.GetInt(ClientMaxMessageSizeKey), - MaxMessageRecieveSize: viperInstance.GetInt(ClientMaxMessageReceiveSizeKey), - MaxMessageSendSize: viperInstance.GetInt(ClientMaxMessageSendSizeKey), + HTTP: &HTTP{ + Timeout: viperInstance.GetDuration(ClientHTTPTimeoutKey), + }, + Grpc: &GRPC{ + KeepAlive: &KeepAlive{ + Timeout: viperInstance.GetDuration(ClientKeepAliveTimeoutKey), + Time: viperInstance.GetDuration(ClientKeepAliveTimeKey), + PermitWithoutStream: viperInstance.GetBool(ClientKeepAlivePermitWithoutStreamKey), + }, + MaxMessageSize: viperInstance.GetInt(ClientGRPCMaxMessageSizeKey), + MaxMessageReceiveSize: viperInstance.GetInt(ClientGRPCMaxMessageReceiveSizeKey), + MaxMessageSendSize: viperInstance.GetInt(ClientGRPCMaxMessageSendSizeKey), + }, + Backoff: &BackOff{ + InitialInterval: viperInstance.GetDuration(ClientBackoffInitialIntervalKey), + MaxInterval: viperInstance.GetDuration(ClientBackoffMaxIntervalKey), + MaxElapsedTime: viperInstance.GetDuration(ClientBackoffMaxElapsedTimeKey), + RandomizationFactor: viperInstance.GetFloat64(ClientBackoffRandomizationFactorKey), + Multiplier: viperInstance.GetFloat64(ClientBackoffMultiplierKey), + }, } } @@ -686,16 +750,6 @@ func arePrometheusExportTLSSettingsSet() bool { viperInstance.IsSet(CollectorPrometheusExporterTLSServerNameKey) } -func resolveCommon() *CommonSettings { - return &CommonSettings{ - InitialInterval: DefBackoffInitialInterval, - MaxInterval: DefBackoffMaxInterval, - MaxElapsedTime: DefBackoffMaxElapsedTime, - RandomizationFactor: DefBackoffRandomizationFactor, - Multiplier: DefBackoffMultiplier, - } -} - func resolveWatchers() *Watchers { return &Watchers{ InstanceWatcher: InstanceWatcher{ diff --git a/internal/config/config_test.go b/internal/config/config_test.go index 9cfb9d36e6..70501d76ff 100644 --- a/internal/config/config_test.go +++ b/internal/config/config_test.go @@ -73,7 +73,23 @@ func TestResolveConfig(t *testing.T) { assert.NotEmpty(t, actual.Collector.Exporters) assert.NotEmpty(t, actual.Collector.Extensions) - assert.Equal(t, 10*time.Second, actual.Client.Timeout) + // Client GRPC Settings + assert.Equal(t, 15*time.Second, actual.Client.Grpc.KeepAlive.Timeout) + assert.Equal(t, 10*time.Second, actual.Client.Grpc.KeepAlive.Time) + assert.False(t, actual.Client.Grpc.KeepAlive.PermitWithoutStream) + assert.Equal(t, 1048575, actual.Client.Grpc.MaxMessageSize) + assert.Equal(t, 1048575, actual.Client.Grpc.MaxMessageReceiveSize) + assert.Equal(t, 1048575, actual.Client.Grpc.MaxMessageSendSize) + + // Client HTTP Settings + assert.Equal(t, 15*time.Second, actual.Client.HTTP.Timeout) + + // Client Backoff Settings + assert.Equal(t, 200*time.Millisecond, actual.Client.Backoff.InitialInterval) + assert.Equal(t, 10*time.Second, actual.Client.Backoff.MaxInterval) + assert.Equal(t, 25*time.Second, actual.Client.Backoff.MaxElapsedTime) + assert.InDelta(t, 1.5, actual.Client.Backoff.RandomizationFactor, 0.01) + assert.InDelta(t, 2.5, actual.Client.Backoff.Multiplier, 0.01) assert.Equal(t, allowedDir, @@ -101,12 +117,35 @@ func TestRegisterFlags(t *testing.T) { t.Setenv("NGINX_AGENT_PROCESS_MONITOR_MONITORING_FREQUENCY", "10s") t.Setenv("NGINX_AGENT_DATA_PLANE_API_HOST", "example.com") t.Setenv("NGINX_AGENT_DATA_PLANE_API_PORT", "9090") - t.Setenv("NGINX_AGENT_CLIENT_TIMEOUT", "10s") + t.Setenv("NGINX_AGENT_CLIENT_GRPC_KEEPALIVE_TIMEOUT", "10s") registerFlags() assert.Equal(t, "warn", viperInstance.GetString(LogLevelKey)) assert.Equal(t, "/var/log/test/agent.log", viperInstance.GetString(LogPathKey)) - assert.Equal(t, 10*time.Second, viperInstance.GetDuration(ClientTimeoutKey)) + assert.Equal(t, 10*time.Second, viperInstance.GetDuration(ClientKeepAliveTimeoutKey)) + + checkDefaultsClientValues(t, viperInstance) +} + +func checkDefaultsClientValues(t *testing.T, viperInstance *viper.Viper) { + t.Helper() + + assert.Equal(t, DefHTTPTimeout, viperInstance.GetDuration(ClientHTTPTimeoutKey)) + + assert.Equal(t, DefBackoffInitialInterval, viperInstance.GetDuration(ClientBackoffInitialIntervalKey)) + assert.Equal(t, DefBackoffMaxInterval, viperInstance.GetDuration(ClientBackoffMaxIntervalKey)) + assert.InDelta(t, DefBackoffRandomizationFactor, viperInstance.GetFloat64(ClientBackoffRandomizationFactorKey), + 0.01) + assert.InDelta(t, DefBackoffMultiplier, viperInstance.GetFloat64(ClientBackoffMultiplierKey), 0.01) + assert.Equal(t, DefBackoffMaxElapsedTime, viperInstance.GetDuration(ClientBackoffMaxElapsedTimeKey)) + + assert.Equal(t, DefGRPCKeepAliveTimeout, viperInstance.GetDuration(ClientKeepAliveTimeoutKey)) + assert.Equal(t, DefGRPCKeepAliveTime, viperInstance.GetDuration(ClientKeepAliveTimeKey)) + assert.Equal(t, DefGRPCKeepAlivePermitWithoutStream, viperInstance.GetBool(ClientKeepAlivePermitWithoutStreamKey)) + + assert.Equal(t, DefMaxMessageSize, viperInstance.GetInt(ClientGRPCMaxMessageSizeKey)) + assert.Equal(t, DefMaxMessageRecieveSize, viperInstance.GetInt(ClientGRPCMaxMessageReceiveSizeKey)) + assert.Equal(t, DefMaxMessageSendSize, viperInstance.GetInt(ClientGRPCMaxMessageSendSizeKey)) } func TestSeekFileInPaths(t *testing.T) { @@ -140,7 +179,7 @@ func TestLoadPropertiesFromFile(t *testing.T) { assert.Equal(t, "debug", viperInstance.GetString(LogLevelKey)) assert.Equal(t, "./", viperInstance.GetString(LogPathKey)) - assert.Equal(t, 10*time.Second, viperInstance.GetDuration(ClientTimeoutKey)) + assert.Equal(t, 15*time.Second, viperInstance.GetDuration(ClientKeepAliveTimeoutKey)) err = loadPropertiesFromFile("./testdata/unknown.conf") require.Error(t, err) @@ -165,10 +204,10 @@ func TestResolveLog(t *testing.T) { func TestResolveClient(t *testing.T) { viperInstance = viper.NewWithOptions(viper.KeyDelimiter(KeyDelimiter)) - viperInstance.Set(ClientTimeoutKey, time.Hour) + viperInstance.Set(ClientKeepAliveTimeoutKey, time.Hour) result := resolveClient() - assert.Equal(t, time.Hour, result.Timeout) + assert.Equal(t, time.Hour, result.Grpc.KeepAlive.Timeout) } func TestResolveCollector(t *testing.T) { @@ -269,18 +308,26 @@ func TestClient(t *testing.T) { viperInstance = viper.NewWithOptions(viper.KeyDelimiter(KeyDelimiter)) expected := getAgentConfig().Client - viperInstance.Set(ClientMaxMessageSizeKey, expected.MaxMessageSize) - viperInstance.Set(ClientPermitWithoutStreamKey, expected.PermitWithoutStream) - viperInstance.Set(ClientTimeKey, expected.Time) - viperInstance.Set(ClientTimeoutKey, expected.Timeout) + viperInstance.Set(ClientGRPCMaxMessageSizeKey, expected.Grpc.MaxMessageSize) + viperInstance.Set(ClientKeepAlivePermitWithoutStreamKey, expected.Grpc.KeepAlive.PermitWithoutStream) + viperInstance.Set(ClientKeepAliveTimeKey, expected.Grpc.KeepAlive.Time) + viperInstance.Set(ClientKeepAliveTimeoutKey, expected.Grpc.KeepAlive.Timeout) + + viperInstance.Set(ClientHTTPTimeoutKey, expected.HTTP.Timeout) + + viperInstance.Set(ClientBackoffMaxIntervalKey, expected.Backoff.MaxInterval) + viperInstance.Set(ClientBackoffMultiplierKey, expected.Backoff.Multiplier) + viperInstance.Set(ClientBackoffMaxElapsedTimeKey, expected.Backoff.MaxElapsedTime) + viperInstance.Set(ClientBackoffInitialIntervalKey, expected.Backoff.InitialInterval) + viperInstance.Set(ClientBackoffRandomizationFactorKey, expected.Backoff.RandomizationFactor) // root keys for sections are set appropriately - assert.True(t, viperInstance.IsSet(ClientMaxMessageSizeKey)) - assert.False(t, viperInstance.IsSet(ClientMaxMessageReceiveSizeKey)) - assert.False(t, viperInstance.IsSet(ClientMaxMessageSendSizeKey)) + assert.True(t, viperInstance.IsSet(ClientGRPCMaxMessageSizeKey)) + assert.False(t, viperInstance.IsSet(ClientGRPCMaxMessageReceiveSizeKey)) + assert.False(t, viperInstance.IsSet(ClientGRPCMaxMessageSendSizeKey)) - viperInstance.Set(ClientMaxMessageReceiveSizeKey, expected.MaxMessageRecieveSize) - viperInstance.Set(ClientMaxMessageSendSizeKey, expected.MaxMessageSendSize) + viperInstance.Set(ClientGRPCMaxMessageReceiveSizeKey, expected.Grpc.MaxMessageReceiveSize) + viperInstance.Set(ClientGRPCMaxMessageSendSizeKey, expected.Grpc.MaxMessageSendSize) result := resolveClient() @@ -294,12 +341,26 @@ func getAgentConfig() *Config { Path: "", Log: &Log{}, Client: &Client{ - Timeout: 5 * time.Second, - Time: 4 * time.Second, - PermitWithoutStream: true, - MaxMessageSize: 1, - MaxMessageRecieveSize: 20, - MaxMessageSendSize: 40, + HTTP: &HTTP{ + Timeout: 10 * time.Second, + }, + Grpc: &GRPC{ + KeepAlive: &KeepAlive{ + Timeout: 5 * time.Second, + Time: 4 * time.Second, + PermitWithoutStream: true, + }, + MaxMessageSize: 1, + MaxMessageReceiveSize: 20, + MaxMessageSendSize: 40, + }, + Backoff: &BackOff{ + InitialInterval: 500 * time.Millisecond, + MaxInterval: 5 * time.Second, + MaxElapsedTime: 30 * time.Second, + RandomizationFactor: 0.5, + Multiplier: 1.5, + }, }, AllowedDirectories: []string{ "/etc/nginx", "/usr/local/etc/nginx", "/var/run/nginx", "/var/log/nginx", "/usr/share/nginx/modules", diff --git a/internal/config/defaults.go b/internal/config/defaults.go index 526ea914c9..916c61f8af 100644 --- a/internal/config/defaults.go +++ b/internal/config/defaults.go @@ -26,16 +26,25 @@ const ( DefCommandTLSSkipVerifyKey = false DefCommandTLServerNameKey = "" + // Client GRPC Settings DefMaxMessageSize = 0 // 0 = unset DefMaxMessageRecieveSize = 4194304 // default 4 MB DefMaxMessageSendSize = math.MaxInt32 - // Backoff defaults - DefBackoffInitialInterval = 50 * time.Millisecond - DefBackoffRandomizationFactor = 0.1 // the value is 0 <= and < 1 + // Client HTTP Settings + DefHTTPTimeout = 10 * time.Second + + // Client GRPC Keep Alive Settings + DefGRPCKeepAliveTimeout = 10 * time.Second + DefGRPCKeepAliveTime = 20 * time.Second + DefGRPCKeepAlivePermitWithoutStream = true + + // Client Backoff defaults + DefBackoffInitialInterval = 500 * time.Millisecond + DefBackoffRandomizationFactor = 0.5 // the value is 0 <= and < 1 DefBackoffMultiplier = 1.5 - DefBackoffMaxInterval = 200 * time.Millisecond - DefBackoffMaxElapsedTime = 3 * time.Second + DefBackoffMaxInterval = 5 * time.Second + DefBackoffMaxElapsedTime = 30 * time.Second // Watcher defaults DefInstanceWatcherMonitoringFrequency = 5 * time.Second @@ -63,14 +72,6 @@ const ( DefCollectorExtensionsHealthTLSCAPath = "" DefCollectorExtensionsHealthTLSSkipVerify = false DefCollectorExtensionsHealthTLServerNameKey = "" - - DefCollectorPrometheusExporterServerHost = "" - DefCollectorPrometheusExporterServerPort = 0 - DefCollectorPrometheusExporterTLSCertPath = "" - DefCollectorPrometheusExporterTLSKeyPath = "" - DefCollectorPrometheusExporterTLSCAPath = "" - DefCollectorPrometheusExporterTLSSkipVerify = false - DefCollectorPrometheusExporterTLServerNameKey = "" ) func DefaultFeatures() []string { diff --git a/internal/config/flags.go b/internal/config/flags.go index 8cfa72b97a..33fcd04e1e 100644 --- a/internal/config/flags.go +++ b/internal/config/flags.go @@ -26,12 +26,22 @@ const ( var ( // child flags saved as vars to enable easier prefixing. - ClientPermitWithoutStreamKey = pre(ClientRootKey) + "permit_without_stream" - ClientTimeKey = pre(ClientRootKey) + "time" - ClientTimeoutKey = pre(ClientRootKey) + "timeout" - ClientMaxMessageSendSizeKey = pre(ClientRootKey) + "max_message_send_size" - ClientMaxMessageReceiveSizeKey = pre(ClientRootKey) + "max_message_receive_size" - ClientMaxMessageSizeKey = pre(ClientRootKey) + "max_message_size" + GrpcKeepAlive = pre(ClientRootKey) + "grpc_keepalive" + ClientKeepAlivePermitWithoutStreamKey = pre(GrpcKeepAlive) + "permit_without_stream" + ClientKeepAliveTimeKey = pre(GrpcKeepAlive) + "time" + ClientKeepAliveTimeoutKey = pre(GrpcKeepAlive) + "timeout" + + ClientHTTPTimeoutKey = pre(ClientRootKey) + "http_timeout" + ClientGRPCMaxMessageSendSizeKey = pre(ClientRootKey) + "grpc_max_message_send_size" + ClientGRPCMaxMessageReceiveSizeKey = pre(ClientRootKey) + "grpc_max_message_receive_size" + ClientGRPCMaxMessageSizeKey = pre(ClientRootKey) + "grpc_max_message_size" + + ClientBackoffInitialIntervalKey = pre(ClientRootKey) + "backoff_initial_interval" + ClientBackoffMaxIntervalKey = pre(ClientRootKey) + "backoff_max_interval" + ClientBackoffMaxElapsedTimeKey = pre(ClientRootKey) + "backoff_max_elapsed_time" + ClientBackoffRandomizationFactorKey = pre(ClientRootKey) + "backoff_randomization_factor" + ClientBackoffMultiplierKey = pre(ClientRootKey) + "backoff_multiplier" + CollectorConfigPathKey = pre(CollectorRootKey) + "config_path" CollectorExportersKey = pre(CollectorRootKey) + "exporters" CollectorAttributeProcessorKey = pre(CollectorProcessorsKey) + "attribute" diff --git a/internal/config/testdata/nginx-agent.conf b/internal/config/testdata/nginx-agent.conf index 70fa0aa3cb..e63ece84fb 100644 --- a/internal/config/testdata/nginx-agent.conf +++ b/internal/config/testdata/nginx-agent.conf @@ -17,9 +17,24 @@ data_plane_config: exclude_logs: - /var/log/nginx/error.log - /var/log/nginx/access.log -client: - timeout: 10s - +client: + http: + timeout: 15s + grpc: + keepalive: + timeout: 15s + time: 10s + permit_without_stream: false + max_message_size: 1048575 + max_message_receive_size: 1048575 + max_message_send_size: 1048575 + backoff: + initial_interval: 200ms + max_interval: 10s + max_elapsed_time: 25s + randomization_factor: 1.5 + multiplier: 2.5 + collector: config_path: "/etc/nginx-agent/nginx-agent-otelcol.yaml" receivers: diff --git a/internal/config/types.go b/internal/config/types.go index f532dad376..b02a0602dd 100644 --- a/internal/config/types.go +++ b/internal/config/types.go @@ -38,7 +38,6 @@ type ( Client *Client `yaml:"-" mapstructure:"client"` Collector *Collector `yaml:"-" mapstructure:"collector"` File *File `yaml:"-" mapstructure:"file"` - Common *CommonSettings `yaml:"-"` Watchers *Watchers `yaml:"-"` Version string `yaml:"-"` Path string `yaml:"-"` @@ -63,16 +62,38 @@ type ( } Client struct { - Timeout time.Duration `yaml:"-" mapstructure:"timeout"` - Time time.Duration `yaml:"-" mapstructure:"time"` - PermitWithoutStream bool `yaml:"-" mapstructure:"permit_without_stream"` + HTTP *HTTP `yaml:"http" mapstructure:"http"` + Grpc *GRPC `yaml:"grpc" mapstructure:"grpc"` + Backoff *BackOff `yaml:"backoff" mapstructure:"backoff"` + } + + HTTP struct { + Timeout time.Duration `yaml:"-" mapstructure:"timeout"` + } + + BackOff struct { + InitialInterval time.Duration `yaml:"-" mapstructure:"initial_interval"` + MaxInterval time.Duration `yaml:"-" mapstructure:"max_interval"` + MaxElapsedTime time.Duration `yaml:"-" mapstructure:"max_elapsed_time"` + RandomizationFactor float64 `yaml:"-" mapstructure:"randomization_factor"` + Multiplier float64 `yaml:"-" mapstructure:"multiplier"` + } + + GRPC struct { + KeepAlive *KeepAlive `yaml:"-" mapstructure:"target"` // if MaxMessageSize is size set then we use that value, // otherwise MaxMessageRecieveSize and MaxMessageSendSize for individual settings MaxMessageSize int `yaml:"-" mapstructure:"max_message_size"` - MaxMessageRecieveSize int `yaml:"-" mapstructure:"max_message_receive_size"` + MaxMessageReceiveSize int `yaml:"-" mapstructure:"max_message_receive_size"` MaxMessageSendSize int `yaml:"-" mapstructure:"max_message_send_size"` } + KeepAlive struct { + Timeout time.Duration `yaml:"-" mapstructure:"timeout"` + Time time.Duration `yaml:"-" mapstructure:"time"` + PermitWithoutStream bool `yaml:"-" mapstructure:"permit_without_stream"` + } + Collector struct { ConfigPath string `yaml:"-" mapstructure:"config_path"` Log *Log `yaml:"-" mapstructure:"log"` @@ -164,6 +185,7 @@ type ( OtlpReceivers []OtlpReceiver `yaml:"-" mapstructure:"otlp_receivers"` NginxReceivers []NginxReceiver `yaml:"-" mapstructure:"nginx_receivers"` NginxPlusReceivers []NginxPlusReceiver `yaml:"-" mapstructure:"nginx_plus_receivers"` + TcplogReceivers []TcplogReceiver `yaml:"-" mapstructure:"tcplog_receivers"` } OtlpReceiver struct { @@ -172,6 +194,19 @@ type ( OtlpTLSConfig *OtlpTLSConfig `yaml:"-" mapstructure:"tls"` } + TcplogReceiver struct { + ListenAddress string `yaml:"-" mapstructure:"listen_address"` + Operators []Operator `yaml:"-" mapstructure:"operators"` + } + + // There are many types of operators with different field names so we use a generic map to store the fields. + // See here for more info: + // https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/pkg/stanza/docs/operators/README.md + Operator struct { + Fields map[string]string `yaml:"-" mapstructure:"fields"` + Type string `yaml:"-" mapstructure:"type"` + } + NginxReceiver struct { InstanceID string `yaml:"-" mapstructure:"instance_id"` StubStatus APIDetails `yaml:"-" mapstructure:"api_details"` @@ -213,13 +248,6 @@ type ( MemoryScraper struct{} NetworkScraper struct{} - GRPC struct { - Target string `yaml:"-" mapstructure:"target"` - ConnTimeout time.Duration `yaml:"-" mapstructure:"connection_timeout"` - MinConnTimeout time.Duration `yaml:"-" mapstructure:"minimum_connection_timeout"` - BackoffDelay time.Duration `yaml:"-" mapstructure:"backoff_delay"` - } - Command struct { Server *ServerConfig `yaml:"-" mapstructure:"server"` Auth *AuthConfig `yaml:"-" mapstructure:"auth"` @@ -259,14 +287,6 @@ type ( Location string `yaml:"-" mapstructure:"location"` } - CommonSettings struct { - InitialInterval time.Duration `yaml:"-" mapstructure:"initial_interval"` - MaxInterval time.Duration `yaml:"-" mapstructure:"max_interval"` - MaxElapsedTime time.Duration `yaml:"-" mapstructure:"max_elapsed_time"` - RandomizationFactor float64 `yaml:"-" mapstructure:"randomization_factor"` - Multiplier float64 `yaml:"-" mapstructure:"multiplier"` - } - Watchers struct { InstanceWatcher InstanceWatcher `yaml:"-" mapstructure:"instance_watcher"` InstanceHealthWatcher InstanceHealthWatcher `yaml:"-" mapstructure:"instance_health_watcher"` @@ -352,9 +372,14 @@ func (c *Config) AreReceiversConfigured() bool { } return c.Collector.Receivers.NginxPlusReceivers != nil || + len(c.Collector.Receivers.NginxPlusReceivers) > 0 || c.Collector.Receivers.OtlpReceivers != nil || + len(c.Collector.Receivers.OtlpReceivers) > 0 || c.Collector.Receivers.NginxReceivers != nil || - c.Collector.Receivers.HostMetrics != nil + len(c.Collector.Receivers.NginxReceivers) > 0 || + c.Collector.Receivers.HostMetrics != nil || + c.Collector.Receivers.TcplogReceivers != nil || + len(c.Collector.Receivers.TcplogReceivers) > 0 } func isAllowedDir(dir string, allowedDirs []string) bool { diff --git a/internal/file/file_manager_service.go b/internal/file/file_manager_service.go index af9f782b76..6d9bb1114d 100644 --- a/internal/file/file_manager_service.go +++ b/internal/file/file_manager_service.go @@ -117,7 +117,7 @@ func (fms *FileManagerService) UpdateOverview( }, } - backOffCtx, backoffCancel := context.WithTimeout(newCtx, fms.agentConfig.Common.MaxElapsedTime) + backOffCtx, backoffCancel := context.WithTimeout(newCtx, fms.agentConfig.Client.Backoff.MaxElapsedTime) defer backoffCancel() sendUpdateOverview := func() (*mpi.UpdateOverviewResponse, error) { @@ -146,7 +146,7 @@ func (fms *FileManagerService) UpdateOverview( response, err := backoff.RetryWithData( sendUpdateOverview, - backoffHelpers.Context(backOffCtx, fms.agentConfig.Common), + backoffHelpers.Context(backOffCtx, fms.agentConfig.Client.Backoff), ) if err != nil { return err @@ -230,7 +230,7 @@ func (fms *FileManagerService) UpdateFile( }, } - backOffCtx, backoffCancel := context.WithTimeout(ctx, fms.agentConfig.Common.MaxElapsedTime) + backOffCtx, backoffCancel := context.WithTimeout(ctx, fms.agentConfig.Client.Backoff.MaxElapsedTime) defer backoffCancel() sendUpdateFile := func() (*mpi.UpdateFileResponse, error) { @@ -256,7 +256,8 @@ func (fms *FileManagerService) UpdateFile( return response, nil } - response, err := backoff.RetryWithData(sendUpdateFile, backoffHelpers.Context(backOffCtx, fms.agentConfig.Common)) + response, err := backoff.RetryWithData(sendUpdateFile, backoffHelpers.Context(backOffCtx, + fms.agentConfig.Client.Backoff)) if err != nil { return err } @@ -375,7 +376,7 @@ func (fms *FileManagerService) executeFileActions(ctx context.Context) error { } func (fms *FileManagerService) fileUpdate(ctx context.Context, file *mpi.File) error { - backOffCtx, backoffCancel := context.WithTimeout(ctx, fms.agentConfig.Common.MaxElapsedTime) + backOffCtx, backoffCancel := context.WithTimeout(ctx, fms.agentConfig.Client.Backoff.MaxElapsedTime) defer backoffCancel() getFile := func() (*mpi.GetFileResponse, error) { @@ -391,7 +392,7 @@ func (fms *FileManagerService) fileUpdate(ctx context.Context, file *mpi.File) e getFileResp, getFileErr := backoff.RetryWithData( getFile, - backoffHelpers.Context(backOffCtx, fms.agentConfig.Common), + backoffHelpers.Context(backOffCtx, fms.agentConfig.Client.Backoff), ) if getFileErr != nil { diff --git a/internal/grpc/grpc.go b/internal/grpc/grpc.go index 4a558cad66..513ccd2497 100644 --- a/internal/grpc/grpc.go +++ b/internal/grpc/grpc.go @@ -178,21 +178,21 @@ func GetDialOptions(agentConfig *config.Config, resourceID string) []grpc.DialOp sendRecOpts := []grpc.DialOption{} if agentConfig.Client != nil { - if agentConfig.Client.MaxMessageSize != 0 { + if agentConfig.Client.Grpc.MaxMessageSize != 0 { sendRecOpts = append(sendRecOpts, grpc.WithDefaultCallOptions( - grpc.MaxCallRecvMsgSize(agentConfig.Client.MaxMessageSize), - grpc.MaxCallSendMsgSize(agentConfig.Client.MaxMessageSize), + grpc.MaxCallRecvMsgSize(agentConfig.Client.Grpc.MaxMessageSize), + grpc.MaxCallSendMsgSize(agentConfig.Client.Grpc.MaxMessageSize), )) } else { sendRecOpts = append(sendRecOpts, grpc.WithDefaultCallOptions( - grpc.MaxCallRecvMsgSize(agentConfig.Client.MaxMessageRecieveSize), - grpc.MaxCallSendMsgSize(agentConfig.Client.MaxMessageSendSize), + grpc.MaxCallRecvMsgSize(agentConfig.Client.Grpc.MaxMessageReceiveSize), + grpc.MaxCallSendMsgSize(agentConfig.Client.Grpc.MaxMessageSendSize), )) } keepAlive := keepalive.ClientParameters{ - Time: agentConfig.Client.Time, - Timeout: agentConfig.Client.Timeout, - PermitWithoutStream: agentConfig.Client.PermitWithoutStream, + Time: agentConfig.Client.Grpc.KeepAlive.Time, + Timeout: agentConfig.Client.Grpc.KeepAlive.Timeout, + PermitWithoutStream: agentConfig.Client.Grpc.KeepAlive.PermitWithoutStream, } sendRecOpts = append(sendRecOpts, diff --git a/internal/model/config.go b/internal/model/config.go index 15f0bbfbdb..8307940283 100644 --- a/internal/model/config.go +++ b/internal/model/config.go @@ -12,12 +12,13 @@ import ( ) type NginxConfigContext struct { - StubStatus *APIDetails - PlusAPI *APIDetails - InstanceID string - Files []*v1.File - AccessLogs []*AccessLog - ErrorLogs []*ErrorLog + StubStatus *APIDetails + PlusAPI *APIDetails + InstanceID string + Files []*v1.File + AccessLogs []*AccessLog + ErrorLogs []*ErrorLog + NAPSysLogServers []string } type APIDetails struct { @@ -57,6 +58,10 @@ func (ncc *NginxConfigContext) Equal(otherNginxConfigContext *NginxConfigContext return false } + if !reflect.DeepEqual(ncc.NAPSysLogServers, otherNginxConfigContext.NAPSysLogServers) { + return false + } + return true } diff --git a/internal/resource/resource_plugin_test.go b/internal/resource/resource_plugin_test.go index ed130052ac..1687bb3f07 100644 --- a/internal/resource/resource_plugin_test.go +++ b/internal/resource/resource_plugin_test.go @@ -337,7 +337,7 @@ func TestResource_Process_APIAction_UpdateHTTPUpstreams(t *testing.T) { upstreams []client.UpstreamServer }{ { - name: "Test 1: Success, Update Http Upstream Servers", + name: "Test 1: Success, Update HTTP Upstream Servers", message: &bus.Message{ Topic: bus.APIActionRequestTopic, Data: protos.CreatAPIActionRequestNginxPlusUpdateHTTPServers("test_upstream", @@ -361,7 +361,7 @@ func TestResource_Process_APIAction_UpdateHTTPUpstreams(t *testing.T) { expectedLog: "Successfully updated http upstream", }, { - name: "Test 2: Fail, Update Http Upstream Servers", + name: "Test 2: Fail, Update HTTP Upstream Servers", message: &bus.Message{ Topic: bus.APIActionRequestTopic, Data: protos.CreatAPIActionRequestNginxPlusUpdateHTTPServers("test_upstream", diff --git a/internal/watcher/instance/nginx_config_parser.go b/internal/watcher/instance/nginx_config_parser.go index 34beb8a97c..505c41098e 100644 --- a/internal/watcher/instance/nginx_config_parser.go +++ b/internal/watcher/instance/nginx_config_parser.go @@ -16,6 +16,7 @@ import ( "net/http" "os" "path/filepath" + "regexp" "slices" "strconv" "strings" @@ -93,6 +94,8 @@ func (ncp *NginxConfigParser) createNginxConfigContext( instance *mpi.Instance, payload *crossplane.Payload, ) (*model.NginxConfigContext, error) { + napSyslogServersFound := make(map[string]bool) + nginxConfigContext := &model.NginxConfigContext{ InstanceID: instance.GetInstanceMeta().GetInstanceId(), PlusAPI: &model.APIDetails{ @@ -133,6 +136,23 @@ func (ncp *NginxConfigParser) createNginxConfigContext( case "ssl_certificate", "proxy_ssl_certificate", "ssl_client_certificate", "ssl_trusted_certificate": sslCertFile := ncp.sslCert(ctx, directive.Args[0], rootDir) nginxConfigContext.Files = append(nginxConfigContext.Files, sslCertFile) + case "app_protect_security_log": + if len(directive.Args) > 1 { + syslogArg := directive.Args[1] + re := regexp.MustCompile(`syslog:server=([\S]+)`) + matches := re.FindStringSubmatch(syslogArg) + if len(matches) > 1 { + syslogServer := matches[1] + if !napSyslogServersFound[syslogServer] { + nginxConfigContext.NAPSysLogServers = append( + nginxConfigContext.NAPSysLogServers, + syslogServer, + ) + napSyslogServersFound[syslogServer] = true + slog.DebugContext(ctx, "Found NAP syslog server", "address", syslogServer) + } + } + } } return nil @@ -462,7 +482,7 @@ func (ncp *NginxConfigParser) pingAPIEndpoint(ctx context.Context, statusAPIDeta if strings.HasPrefix(listen, "unix:") { httpClient = ncp.SocketClient(strings.TrimPrefix(listen, "unix:")) } else { - httpClient = http.Client{Timeout: ncp.agentConfig.Client.Timeout} + httpClient = http.Client{Timeout: ncp.agentConfig.Client.Grpc.KeepAlive.Timeout} } req, err := http.NewRequestWithContext(ctx, http.MethodGet, statusAPI, nil) if err != nil { @@ -654,7 +674,7 @@ func (ncp *NginxConfigParser) isPort(value string) bool { func (ncp *NginxConfigParser) SocketClient(socketPath string) http.Client { return http.Client{ - Timeout: ncp.agentConfig.Client.Timeout, + Timeout: ncp.agentConfig.Client.Grpc.KeepAlive.Timeout, Transport: &http.Transport{ DialContext: func(_ context.Context, _, _ string) (net.Conn, error) { return net.Dial("unix", socketPath) diff --git a/internal/watcher/instance/nginx_config_parser_test.go b/internal/watcher/instance/nginx_config_parser_test.go index b37b4b3cc4..2c5f632d8e 100644 --- a/internal/watcher/instance/nginx_config_parser_test.go +++ b/internal/watcher/instance/nginx_config_parser_test.go @@ -294,16 +294,19 @@ func TestNginxConfigParser_Parse(t *testing.T) { require.NoError(t, err) tests := []struct { - instance *mpi.Instance - name string + instance *mpi.Instance + name string + syslogServers []string }{ { - name: "Test 1: Valid response", - instance: protos.GetNginxOssInstance([]string{}), + name: "Test 1: Valid response", + instance: protos.GetNginxOssInstance([]string{}), + syslogServers: []string{"127.0.0.1:1515"}, }, { - name: "Test 2: Error response", - instance: protos.GetNginxPlusInstance([]string{}), + name: "Test 2: Error response", + instance: protos.GetNginxPlusInstance([]string{}), + syslogServers: []string{"127.0.0.1:1515"}, }, } @@ -315,7 +318,9 @@ func TestNginxConfigParser_Parse(t *testing.T) { ltsvAccessLog.Name(), errorLog.Name(), test.instance.GetInstanceMeta().GetInstanceId(), + test.syslogServers, ) + expectedConfigContext.Files = append(expectedConfigContext.Files, &mpi.File{ FileMeta: fileMeta, }) diff --git a/test/config/collector/test-opentelemetry-collector-agent.yaml b/test/config/collector/test-opentelemetry-collector-agent.yaml index 9b103b67f1..7181cfc68b 100644 --- a/test/config/collector/test-opentelemetry-collector-agent.yaml +++ b/test/config/collector/test-opentelemetry-collector-agent.yaml @@ -25,8 +25,21 @@ receivers: access_logs: - log_format: "$remote_addr - $remote_user [$time_local] \"$request\" $status $body_bytes_sent \"$http_referer\" \"$http_user_agent\" \"$http_x_forwarded_for\"\"$upstream_cache_status\"" file_path: "/var/log/nginx/access-custom.conf" + tcplog/0: + listen_address: "localhost:151" + operators: + - type: add + field: body + value: EXPR(split(body, ",")[0]) + - type: remove + field: attributes.message processors: + resource: + attributes: + - key: resource.id + action: add + value: 12345 batch: send_batch_size: 8192 timeout: 200ms @@ -82,8 +95,18 @@ service: - otlp/0 - nginx/123 processors: + - resource - batch exporters: - otlp/0 - prometheus - debug + logs: + receivers: + - tcplog/0 + processors: + - resource + - batch + exporters: + - otlp/0 + - debug diff --git a/test/config/nginx/nginx-with-multiple-access-logs.conf b/test/config/nginx/nginx-with-multiple-access-logs.conf index 83582169a6..4d22a456e9 100644 --- a/test/config/nginx/nginx-with-multiple-access-logs.conf +++ b/test/config/nginx/nginx-with-multiple-access-logs.conf @@ -4,6 +4,7 @@ worker_processes auto; error_log %s notice; pid /var/run/nginx.pid; +load_module modules/ngx_http_app_protect_module.so; events { worker_connections 1024; @@ -33,5 +34,7 @@ http { server { access_log %s ltsv; + + app_protect_security_log "/etc/app_protect/conf/log_default.json" syslog:server=127.0.0.1:1515; } } diff --git a/test/mock/collector/otel-collector.yaml b/test/mock/collector/otel-collector.yaml index 1fb86a2560..b1e801a9cc 100644 --- a/test/mock/collector/otel-collector.yaml +++ b/test/mock/collector/otel-collector.yaml @@ -11,6 +11,10 @@ exporters: resource_to_telemetry_conversion: enabled: true add_metric_suffixes: false + debug: + verbosity: detailed + sampling_initial: 5 + sampling_thereafter: 200 processors: batch: @@ -28,3 +32,7 @@ service: receivers: [otlp] processors: [batch] exporters: [prometheus] + logs: + receivers: [otlp] + processors: [batch] + exporters: [debug] diff --git a/test/mock/grpc/cmd/main.go b/test/mock/grpc/cmd/main.go index 620b869f3a..1b0fcc7615 100644 --- a/test/mock/grpc/cmd/main.go +++ b/test/mock/grpc/cmd/main.go @@ -54,9 +54,9 @@ func main() { agentConfig.Command.Server.Port = portInt agentConfig.Command.Auth = nil agentConfig.Command.TLS = nil - agentConfig.Common.MaxElapsedTime = *sleepDuration - agentConfig.Client.MaxMessageRecieveSize = 4194304 - agentConfig.Client.MaxMessageSendSize = math.MaxInt + agentConfig.Client.Backoff.MaxElapsedTime = *sleepDuration + agentConfig.Client.Grpc.MaxMessageReceiveSize = 4194304 + agentConfig.Client.Grpc.MaxMessageSendSize = math.MaxInt newLogger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{ Level: logger.GetLogLevel(*logLevel), diff --git a/test/mock/grpc/mock_management_server.go b/test/mock/grpc/mock_management_server.go index 45ada71c64..8459c71da1 100644 --- a/test/mock/grpc/mock_management_server.go +++ b/test/mock/grpc/mock_management_server.go @@ -158,15 +158,14 @@ func getServerOptions(agentConfig *config.Config) []grpc.ServerOption { } if agentConfig.Client != nil { - if agentConfig.Client.MaxMessageSize != 0 { - slog.Info("grpc MaxMessageSize") - opts = append(opts, grpc.MaxSendMsgSize(agentConfig.Client.MaxMessageSize), - grpc.MaxRecvMsgSize(agentConfig.Client.MaxMessageSize), + if agentConfig.Client.Grpc.MaxMessageSize != 0 { + opts = append(opts, grpc.MaxSendMsgSize(agentConfig.Client.Grpc.MaxMessageSize), + grpc.MaxRecvMsgSize(agentConfig.Client.Grpc.MaxMessageSize), ) } else { // both are defulted to math.MaxInt for ServerOption - opts = append(opts, grpc.MaxSendMsgSize(agentConfig.Client.MaxMessageSendSize), - grpc.MaxRecvMsgSize(agentConfig.Client.MaxMessageRecieveSize), + opts = append(opts, grpc.MaxSendMsgSize(agentConfig.Client.Grpc.MaxMessageSendSize), + grpc.MaxRecvMsgSize(agentConfig.Client.Grpc.MaxMessageReceiveSize), ) } } @@ -217,10 +216,10 @@ func createListener(apiAddress string, agentConfig *config.Config) (net.Listener func reportHealth(healthcheck *health.Server, agentConfig *config.Config) { var sleep time.Duration var serverName string - if agentConfig.Common == nil { + if agentConfig.Client.Backoff == nil { sleep = maxElapsedTime } else { - sleep = agentConfig.Common.MaxElapsedTime + sleep = agentConfig.Client.Backoff.MaxElapsedTime } if agentConfig.Command.TLS == nil { diff --git a/test/model/config.go b/test/model/config.go index e20759e7db..6aca42b449 100644 --- a/test/model/config.go +++ b/test/model/config.go @@ -19,12 +19,14 @@ func GetConfigContext() *model.NginxConfigContext { } } +// nolint: revive func GetConfigContextWithNames( accessLogName, combinedAccessLogName, ltsvAccessLogName, errorLogName string, instanceID string, + syslogServers []string, ) *model.NginxConfigContext { return &model.NginxConfigContext{ StubStatus: &model.APIDetails{ @@ -61,6 +63,7 @@ func GetConfigContextWithNames( Permissions: "0600", }, }, - InstanceID: instanceID, + InstanceID: instanceID, + NAPSysLogServers: syslogServers, } } diff --git a/test/types/config.go b/test/types/config.go index 681aa0cc02..60ec80e37b 100644 --- a/test/types/config.go +++ b/test/types/config.go @@ -20,6 +20,8 @@ const ( clientTime = 50 * time.Second clientTimeout = 5 * time.Second + clientHTTPTimeout = 5 * time.Second + commonInitialInterval = 100 * time.Microsecond commonMaxInterval = 1000 * time.Microsecond commonMaxElapsedTime = 10 * time.Millisecond @@ -37,9 +39,23 @@ func AgentConfig() *config.Config { Path: "/etc/nginx-agent", Log: &config.Log{}, Client: &config.Client{ - Timeout: clientTimeout, - Time: clientTime, - PermitWithoutStream: clientPermitWithoutStream, + HTTP: &config.HTTP{ + Timeout: clientHTTPTimeout, + }, + Grpc: &config.GRPC{ + KeepAlive: &config.KeepAlive{ + Timeout: clientTimeout, + Time: clientTime, + PermitWithoutStream: clientPermitWithoutStream, + }, + }, + Backoff: &config.BackOff{ + InitialInterval: commonInitialInterval, + MaxInterval: commonMaxInterval, + MaxElapsedTime: commonMaxElapsedTime, + RandomizationFactor: commonRandomizationFactor, + Multiplier: commonMultiplier, + }, }, AllowedDirectories: []string{"/tmp/"}, Collector: &config.Collector{ @@ -128,13 +144,6 @@ func AgentConfig() *config.Config { }, }, File: &config.File{}, - Common: &config.CommonSettings{ - InitialInterval: commonInitialInterval, - MaxInterval: commonMaxInterval, - MaxElapsedTime: commonMaxElapsedTime, - RandomizationFactor: commonRandomizationFactor, - Multiplier: commonMultiplier, - }, DataPlaneConfig: &config.DataPlaneConfig{ Nginx: &config.NginxDataPlaneConfig{ TreatWarningsAsErrors: true,