diff --git a/internal/collector/otelcol.tmpl b/internal/collector/otelcol.tmpl index de2f49d832..b91879166d 100644 --- a/internal/collector/otelcol.tmpl +++ b/internal/collector/otelcol.tmpl @@ -43,6 +43,9 @@ receivers: processors: {{- if ne .Processors.Batch nil }} batch: + send_batch_size: {{ .Processors.Batch.SendBatchSize }} + timeout: {{ .Processors.Batch.Timeout }} + send_batch_max_size: {{ .Processors.Batch.SendBatchMaxSize }} {{- end }} exporters: diff --git a/internal/config/config.go b/internal/config/config.go index 66e7806a85..e1f170a728 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -209,27 +209,6 @@ func registerFlags() { "How often the NGINX Agent will check for file changes.", ) - fs.String( - CollectorConfigPathKey, - DefCollectorConfigPath, - "The path to the Opentelemetry Collector configuration file.", - ) - - fs.String( - CollectorLogLevelKey, - DefCollectorLogLevel, - `The desired verbosity level for logging messages from nginx-agent OTel collector. - Available options, in order of severity from highest to lowest, are: - ERROR, WARN, INFO and DEBUG.`, - ) - - fs.String( - CollectorLogPathKey, - DefCollectorLogPath, - `The path to output OTel collector log messages to. - If the default path doesn't exist, log messages are output to stdout/stderr.`, - ) - fs.Int( ClientMaxMessageSizeKey, DefMaxMessageSize, @@ -248,6 +227,8 @@ func registerFlags() { "Updates the client grpc setting MaxSendMsgSize with the specific value in MB.", ) + registerCollectorFlags(fs) + fs.SetNormalizeFunc(normalizeFunc) fs.VisitAll(func(flag *flag.Flag) { @@ -261,6 +242,47 @@ func registerFlags() { }) } +func registerCollectorFlags(fs *flag.FlagSet) { + fs.String( + CollectorConfigPathKey, + DefCollectorConfigPath, + "The path to the Opentelemetry Collector configuration file.", + ) + + fs.String( + CollectorLogLevelKey, + DefCollectorLogLevel, + `The desired verbosity level for logging messages from nginx-agent OTel collector. + Available options, in order of severity from highest to lowest, are: + ERROR, WARN, INFO and DEBUG.`, + ) + + fs.String( + CollectorLogPathKey, + DefCollectorLogPath, + `The path to output OTel collector log messages to. + If the default path doesn't exist, log messages are output to stdout/stderr.`, + ) + + fs.Uint32( + CollectorBatchProcessorSendBatchSizeKey, + DefCollectorBatchProcessorSendBatchSize, + `Number of metric data points after which a batch will be sent regardless of the timeout.`, + ) + + fs.Uint32( + CollectorBatchProcessorSendBatchMaxSizeKey, + DefCollectorBatchProcessorSendBatchMaxSize, + `The upper limit of the batch size.`, + ) + + fs.Duration( + CollectorBatchProcessorTimeoutKey, + DefCollectorBatchProcessorTimeout, + `Time duration after which a batch will be sent regardless of size.`, + ) +} + func seekFileInPaths(fileName string, directories ...string) (string, error) { for _, directory := range directories { f := filepath.Join(directory, fileName) @@ -346,7 +368,6 @@ func resolveCollector(allowedDirs []string) (*Collector, error) { var ( err error exporters Exporters - processors Processors receivers Receivers healthCheck ServerConfig log Log @@ -355,7 +376,6 @@ func resolveCollector(allowedDirs []string) (*Collector, error) { err = errors.Join( err, resolveMapStructure(CollectorExportersKey, &exporters), - resolveMapStructure(CollectorProcessorsKey, &processors), resolveMapStructure(CollectorReceiversKey, &receivers), resolveMapStructure(CollectorHealthKey, &healthCheck), resolveMapStructure(CollectorLogKey, &log), @@ -375,7 +395,7 @@ func resolveCollector(allowedDirs []string) (*Collector, error) { col := &Collector{ ConfigPath: viperInstance.GetString(CollectorConfigPathKey), Exporters: exporters, - Processors: processors, + Processors: resolveProcessors(), Receivers: receivers, Health: &healthCheck, Log: &log, @@ -389,6 +409,19 @@ func resolveCollector(allowedDirs []string) (*Collector, error) { return col, nil } +func resolveProcessors() Processors { + processors := Processors{} + + if viperInstance.IsSet(CollectorBatchProcessorKey) { + processors.Batch = &Batch{} + processors.Batch.SendBatchSize = viperInstance.GetUint32(CollectorBatchProcessorSendBatchSizeKey) + processors.Batch.SendBatchMaxSize = viperInstance.GetUint32(CollectorBatchProcessorSendBatchMaxSizeKey) + processors.Batch.Timeout = viperInstance.GetDuration(CollectorBatchProcessorTimeoutKey) + } + + return processors +} + func resolveCommand() *Command { if !viperInstance.IsSet(CommandRootKey) { return nil diff --git a/internal/config/config_test.go b/internal/config/config_test.go index b5adb2ecbd..f7a4363453 100644 --- a/internal/config/config_test.go +++ b/internal/config/config_test.go @@ -171,46 +171,40 @@ func TestResolveClient(t *testing.T) { func TestResolveCollector(t *testing.T) { testDefault := getAgentConfig() - tests := []struct { - expected *Collector - name string - errMsg string - shouldErr bool - }{ - { - name: "Test 1: Happy path", - expected: testDefault.Collector, - }, - { - name: "Test 2: Non allowed path", - expected: &Collector{ - ConfigPath: "/path/to/secret", - }, - shouldErr: true, - errMsg: "collector path /path/to/secret not allowed", - }, - } - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - viperInstance = viper.NewWithOptions(viper.KeyDelimiter(KeyDelimiter)) - viperInstance.Set(CollectorConfigPathKey, test.expected.ConfigPath) - viperInstance.Set(CollectorReceiversKey, test.expected.Receivers) - viperInstance.Set(CollectorProcessorsKey, test.expected.Processors) - viperInstance.Set(CollectorExportersKey, test.expected.Exporters) - viperInstance.Set(CollectorHealthKey, test.expected.Health) - viperInstance.Set(CollectorLogKey, test.expected.Log) - - actual, err := resolveCollector(testDefault.AllowedDirectories) - if test.shouldErr { - require.Error(t, err) - assert.Contains(t, err.Error(), test.errMsg) - } else { - require.NoError(t, err) - assert.Equal(t, test.expected, actual) - } - }) - } + t.Run("Test 1: Happy path", func(t *testing.T) { + expected := testDefault.Collector + + viperInstance = viper.NewWithOptions(viper.KeyDelimiter(KeyDelimiter)) + viperInstance.Set(CollectorConfigPathKey, expected.ConfigPath) + viperInstance.Set(CollectorReceiversKey, expected.Receivers) + viperInstance.Set(CollectorBatchProcessorKey, expected.Processors.Batch) + viperInstance.Set(CollectorBatchProcessorSendBatchSizeKey, expected.Processors.Batch.SendBatchSize) + viperInstance.Set(CollectorBatchProcessorSendBatchMaxSizeKey, expected.Processors.Batch.SendBatchMaxSize) + viperInstance.Set(CollectorBatchProcessorTimeoutKey, expected.Processors.Batch.Timeout) + viperInstance.Set(CollectorExportersKey, expected.Exporters) + viperInstance.Set(CollectorHealthKey, expected.Health) + viperInstance.Set(CollectorLogKey, expected.Log) + + actual, err := resolveCollector(testDefault.AllowedDirectories) + require.NoError(t, err) + assert.Equal(t, expected, actual) + }) + + t.Run("Test 2: Non allowed path", func(t *testing.T) { + expected := &Collector{ + ConfigPath: "/path/to/secret", + } + errMsg := "collector path /path/to/secret not allowed" + + viperInstance = viper.NewWithOptions(viper.KeyDelimiter(KeyDelimiter)) + viperInstance.Set(CollectorConfigPathKey, expected.ConfigPath) + + _, err := resolveCollector(testDefault.AllowedDirectories) + + require.Error(t, err) + assert.Contains(t, err.Error(), errMsg) + }) } func TestCommand(t *testing.T) { @@ -330,7 +324,11 @@ func getAgentConfig() *Config { }, }, Processors: Processors{ - Batch: &Batch{}, + Batch: &Batch{ + SendBatchMaxSize: DefCollectorBatchProcessorSendBatchMaxSize, + SendBatchSize: DefCollectorBatchProcessorSendBatchSize, + Timeout: DefCollectorBatchProcessorTimeout, + }, }, Receivers: Receivers{ OtlpReceivers: []OtlpReceiver{ diff --git a/internal/config/defaults.go b/internal/config/defaults.go index 0b9aedf310..b1a66f7d0f 100644 --- a/internal/config/defaults.go +++ b/internal/config/defaults.go @@ -45,4 +45,8 @@ const ( DefMaxMessageRecieveSize = 4194304 // math.MaxInt32 DefMaxMessageSendSize = math.MaxInt32 + + DefCollectorBatchProcessorSendBatchSize = 8192 + DefCollectorBatchProcessorSendBatchMaxSize = 0 + DefCollectorBatchProcessorTimeout = 200 * time.Millisecond ) diff --git a/internal/config/flags.go b/internal/config/flags.go index 83f132c0cb..a8dd803d9c 100644 --- a/internal/config/flags.go +++ b/internal/config/flags.go @@ -25,38 +25,40 @@ const ( var ( // child flags saved as vars to enable easier prefixing. - ClientPermitWithoutStreamKey = pre(ClientRootKey) + "permit_without_stream" - ClientTimeKey = pre(ClientRootKey) + "time" - ClientTimeoutKey = pre(ClientRootKey) + "timeout" - ClientMaxMessageSendSizeKey = pre(ClientRootKey) + "max_message_send_size" - ClientMaxMessageRecieveSizeKey = pre(ClientRootKey) + "max_message_receive_size" - ClientMaxMessageSizeKey = pre(ClientRootKey) + "max_message_size" - CollectorConfigPathKey = pre(CollectorRootKey) + "config_path" - CollectorExportersKey = pre(CollectorRootKey) + "exporters" - CollectorProcessorsKey = pre(CollectorRootKey) + "processors" - CollectorHealthKey = pre(CollectorRootKey) + "health" - CollectorReceiversKey = pre(CollectorRootKey) + "receivers" - CollectorLogKey = pre(CollectorRootKey) + "log" - CollectorLogLevelKey = pre(CollectorLogKey) + "level" - CollectorLogPathKey = pre(CollectorLogKey) + "path" - CommandAuthKey = pre(CommandRootKey) + "auth" - CommandAuthTokenKey = pre(CommandAuthKey) + "token" - CommandServerHostKey = pre(CommandServerKey) + "host" - CommandServerKey = pre(CommandRootKey) + "server" - CommandServerPortKey = pre(CommandServerKey) + "port" - CommandServerTypeKey = pre(CommandServerKey) + "type" - CommandTLSCaKey = pre(CommandTLSKey) + "ca" - CommandTLSCertKey = pre(CommandTLSKey) + "cert" - CommandTLSKey = pre(CommandRootKey) + "tls" - CommandTLSKeyKey = pre(CommandTLSKey) + "key" - CommandTLSServerNameKey = pre(CommandRootKey) + "server_name" - CommandTLSSkipVerifyKey = pre(CommandTLSKey) + "skip_verify" - LogLevelKey = pre(LogLevelRootKey) + "level" - LogPathKey = pre(LogLevelRootKey) + "path" - NginxReloadMonitoringPeriodKey = pre(DataPlaneConfigRootKey, "nginx") + "reload_monitoring_period" - NginxTreatWarningsAsErrorsKey = pre(DataPlaneConfigRootKey, "nginx") + "treat_warnings_as_error" - OTLPExportURLKey = pre(CollectorRootKey) + "otlp_export_url" - OTLPReceiverURLKey = pre(CollectorRootKey) + "otlp_receiver_url" + ClientPermitWithoutStreamKey = pre(ClientRootKey) + "permit_without_stream" + ClientTimeKey = pre(ClientRootKey) + "time" + ClientTimeoutKey = pre(ClientRootKey) + "timeout" + ClientMaxMessageSendSizeKey = pre(ClientRootKey) + "max_message_send_size" + ClientMaxMessageRecieveSizeKey = pre(ClientRootKey) + "max_message_receive_size" + ClientMaxMessageSizeKey = pre(ClientRootKey) + "max_message_size" + CollectorConfigPathKey = pre(CollectorRootKey) + "config_path" + CollectorExportersKey = pre(CollectorRootKey) + "exporters" + CollectorProcessorsKey = pre(CollectorRootKey) + "processors" + CollectorBatchProcessorKey = pre(CollectorProcessorsKey) + "batch" + CollectorBatchProcessorSendBatchSizeKey = pre(CollectorBatchProcessorKey) + "send_batch_size" + CollectorBatchProcessorSendBatchMaxSizeKey = pre(CollectorBatchProcessorKey) + "send_batch_max_size" + CollectorBatchProcessorTimeoutKey = pre(CollectorBatchProcessorKey) + "timeout" + CollectorHealthKey = pre(CollectorRootKey) + "health" + CollectorReceiversKey = pre(CollectorRootKey) + "receivers" + CollectorLogKey = pre(CollectorRootKey) + "log" + CollectorLogLevelKey = pre(CollectorLogKey) + "level" + CollectorLogPathKey = pre(CollectorLogKey) + "path" + CommandAuthKey = pre(CommandRootKey) + "auth" + CommandAuthTokenKey = pre(CommandAuthKey) + "token" + CommandServerHostKey = pre(CommandServerKey) + "host" + CommandServerKey = pre(CommandRootKey) + "server" + CommandServerPortKey = pre(CommandServerKey) + "port" + CommandServerTypeKey = pre(CommandServerKey) + "type" + CommandTLSCaKey = pre(CommandTLSKey) + "ca" + CommandTLSCertKey = pre(CommandTLSKey) + "cert" + CommandTLSKey = pre(CommandRootKey) + "tls" + CommandTLSKeyKey = pre(CommandTLSKey) + "key" + CommandTLSServerNameKey = pre(CommandRootKey) + "server_name" + CommandTLSSkipVerifyKey = pre(CommandTLSKey) + "skip_verify" + LogLevelKey = pre(LogLevelRootKey) + "level" + LogPathKey = pre(LogLevelRootKey) + "path" + NginxReloadMonitoringPeriodKey = pre(DataPlaneConfigRootKey, "nginx") + "reload_monitoring_period" + NginxTreatWarningsAsErrorsKey = pre(DataPlaneConfigRootKey, "nginx") + "treat_warnings_as_error" ) func pre(prefixes ...string) string { diff --git a/internal/config/types.go b/internal/config/types.go index 06738d5f00..4443f3e0a7 100644 --- a/internal/config/types.go +++ b/internal/config/types.go @@ -105,7 +105,11 @@ type ( Batch *Batch `yaml:"-" mapstructure:"batch"` } - Batch struct{} + Batch struct { + SendBatchSize uint32 `yaml:"-" mapstructure:"send_batch_size"` + SendBatchMaxSize uint32 `yaml:"-" mapstructure:"send_batch_max_size"` + Timeout time.Duration `yaml:"-" mapstructure:"timeout"` + } // OTel Collector Receiver configuration. Receivers struct { diff --git a/test/config/collector/test-opentelemetry-collector-agent.yaml b/test/config/collector/test-opentelemetry-collector-agent.yaml index cc75d7ff46..d8c1e404ed 100644 --- a/test/config/collector/test-opentelemetry-collector-agent.yaml +++ b/test/config/collector/test-opentelemetry-collector-agent.yaml @@ -23,6 +23,9 @@ receivers: processors: batch: + send_batch_size: 8192 + timeout: 200ms + send_batch_max_size: 0 exporters: otlp/0: diff --git a/test/types/config.go b/test/types/config.go index a290c7f05d..88e09103e7 100644 --- a/test/types/config.go +++ b/test/types/config.go @@ -63,7 +63,11 @@ func AgentConfig() *config.Config { }, }, Processors: config.Processors{ - Batch: &config.Batch{}, + Batch: &config.Batch{ + SendBatchSize: config.DefCollectorBatchProcessorSendBatchSize, + SendBatchMaxSize: config.DefCollectorBatchProcessorSendBatchMaxSize, + Timeout: config.DefCollectorBatchProcessorTimeout, + }, }, Receivers: config.Receivers{ OtlpReceivers: OtlpReceivers(),