Skip to content

Commit

Permalink
Update batch processor config
Browse files Browse the repository at this point in the history
  • Loading branch information
dhurley committed Sep 24, 2024
1 parent 1ebc153 commit 534d575
Show file tree
Hide file tree
Showing 8 changed files with 149 additions and 98 deletions.
3 changes: 3 additions & 0 deletions internal/collector/otelcol.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ receivers:
processors:
{{- if ne .Processors.Batch nil }}
batch:
send_batch_size: {{ .Processors.Batch.SendBatchSize }}
timeout: {{ .Processors.Batch.Timeout }}
send_batch_max_size: {{ .Processors.Batch.SendBatchMaxSize }}
{{- end }}

exporters:
Expand Down
81 changes: 57 additions & 24 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,27 +209,6 @@ func registerFlags() {
"How often the NGINX Agent will check for file changes.",
)

fs.String(
CollectorConfigPathKey,
DefCollectorConfigPath,
"The path to the Opentelemetry Collector configuration file.",
)

fs.String(
CollectorLogLevelKey,
DefCollectorLogLevel,
`The desired verbosity level for logging messages from nginx-agent OTel collector.
Available options, in order of severity from highest to lowest, are:
ERROR, WARN, INFO and DEBUG.`,
)

fs.String(
CollectorLogPathKey,
DefCollectorLogPath,
`The path to output OTel collector log messages to.
If the default path doesn't exist, log messages are output to stdout/stderr.`,
)

fs.Int(
ClientMaxMessageSizeKey,
DefMaxMessageSize,
Expand All @@ -248,6 +227,8 @@ func registerFlags() {
"Updates the client grpc setting MaxSendMsgSize with the specific value in MB.",
)

registerCollectorFlags(fs)

fs.SetNormalizeFunc(normalizeFunc)

fs.VisitAll(func(flag *flag.Flag) {
Expand All @@ -261,6 +242,47 @@ func registerFlags() {
})
}

func registerCollectorFlags(fs *flag.FlagSet) {
fs.String(
CollectorConfigPathKey,
DefCollectorConfigPath,
"The path to the Opentelemetry Collector configuration file.",
)

fs.String(
CollectorLogLevelKey,
DefCollectorLogLevel,
`The desired verbosity level for logging messages from nginx-agent OTel collector.
Available options, in order of severity from highest to lowest, are:
ERROR, WARN, INFO and DEBUG.`,
)

fs.String(
CollectorLogPathKey,
DefCollectorLogPath,
`The path to output OTel collector log messages to.
If the default path doesn't exist, log messages are output to stdout/stderr.`,
)

fs.Uint32(
CollectorBatchProcessorSendBatchSizeKey,
DefCollectorBatchProcessorSendBatchSize,
`Number of metric data points after which a batch will be sent regardless of the timeout.`,
)

fs.Uint32(
CollectorBatchProcessorSendBatchMaxSizeKey,
DefCollectorBatchProcessorSendBatchMaxSize,
`The upper limit of the batch size.`,
)

fs.Duration(
CollectorBatchProcessorTimeoutKey,
DefCollectorBatchProcessorTimeout,
`Time duration after which a batch will be sent regardless of size.`,
)
}

func seekFileInPaths(fileName string, directories ...string) (string, error) {
for _, directory := range directories {
f := filepath.Join(directory, fileName)
Expand Down Expand Up @@ -346,7 +368,6 @@ func resolveCollector(allowedDirs []string) (*Collector, error) {
var (
err error
exporters Exporters
processors Processors
receivers Receivers
healthCheck ServerConfig
log Log
Expand All @@ -355,7 +376,6 @@ func resolveCollector(allowedDirs []string) (*Collector, error) {
err = errors.Join(
err,
resolveMapStructure(CollectorExportersKey, &exporters),
resolveMapStructure(CollectorProcessorsKey, &processors),
resolveMapStructure(CollectorReceiversKey, &receivers),
resolveMapStructure(CollectorHealthKey, &healthCheck),
resolveMapStructure(CollectorLogKey, &log),
Expand All @@ -375,7 +395,7 @@ func resolveCollector(allowedDirs []string) (*Collector, error) {
col := &Collector{
ConfigPath: viperInstance.GetString(CollectorConfigPathKey),
Exporters: exporters,
Processors: processors,
Processors: resolveProcessors(),
Receivers: receivers,
Health: &healthCheck,
Log: &log,
Expand All @@ -389,6 +409,19 @@ func resolveCollector(allowedDirs []string) (*Collector, error) {
return col, nil
}

func resolveProcessors() Processors {
processors := Processors{}

if viperInstance.IsSet(CollectorBatchProcessorKey) {
processors.Batch = &Batch{}
processors.Batch.SendBatchSize = viperInstance.GetUint32(CollectorBatchProcessorSendBatchSizeKey)
processors.Batch.SendBatchMaxSize = viperInstance.GetUint32(CollectorBatchProcessorSendBatchMaxSizeKey)
processors.Batch.Timeout = viperInstance.GetDuration(CollectorBatchProcessorTimeoutKey)
}

return processors
}

func resolveCommand() *Command {
if !viperInstance.IsSet(CommandRootKey) {
return nil
Expand Down
78 changes: 38 additions & 40 deletions internal/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,46 +171,40 @@ func TestResolveClient(t *testing.T) {

func TestResolveCollector(t *testing.T) {
testDefault := getAgentConfig()
tests := []struct {
expected *Collector
name string
errMsg string
shouldErr bool
}{
{
name: "Test 1: Happy path",
expected: testDefault.Collector,
},
{
name: "Test 2: Non allowed path",
expected: &Collector{
ConfigPath: "/path/to/secret",
},
shouldErr: true,
errMsg: "collector path /path/to/secret not allowed",
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
viperInstance = viper.NewWithOptions(viper.KeyDelimiter(KeyDelimiter))
viperInstance.Set(CollectorConfigPathKey, test.expected.ConfigPath)
viperInstance.Set(CollectorReceiversKey, test.expected.Receivers)
viperInstance.Set(CollectorProcessorsKey, test.expected.Processors)
viperInstance.Set(CollectorExportersKey, test.expected.Exporters)
viperInstance.Set(CollectorHealthKey, test.expected.Health)
viperInstance.Set(CollectorLogKey, test.expected.Log)

actual, err := resolveCollector(testDefault.AllowedDirectories)
if test.shouldErr {
require.Error(t, err)
assert.Contains(t, err.Error(), test.errMsg)
} else {
require.NoError(t, err)
assert.Equal(t, test.expected, actual)
}
})
}
t.Run("Test 1: Happy path", func(t *testing.T) {
expected := testDefault.Collector

viperInstance = viper.NewWithOptions(viper.KeyDelimiter(KeyDelimiter))
viperInstance.Set(CollectorConfigPathKey, expected.ConfigPath)
viperInstance.Set(CollectorReceiversKey, expected.Receivers)
viperInstance.Set(CollectorBatchProcessorKey, expected.Processors.Batch)
viperInstance.Set(CollectorBatchProcessorSendBatchSizeKey, expected.Processors.Batch.SendBatchSize)
viperInstance.Set(CollectorBatchProcessorSendBatchMaxSizeKey, expected.Processors.Batch.SendBatchMaxSize)
viperInstance.Set(CollectorBatchProcessorTimeoutKey, expected.Processors.Batch.Timeout)
viperInstance.Set(CollectorExportersKey, expected.Exporters)
viperInstance.Set(CollectorHealthKey, expected.Health)
viperInstance.Set(CollectorLogKey, expected.Log)

actual, err := resolveCollector(testDefault.AllowedDirectories)
require.NoError(t, err)
assert.Equal(t, expected, actual)
})

t.Run("Test 2: Non allowed path", func(t *testing.T) {
expected := &Collector{
ConfigPath: "/path/to/secret",
}
errMsg := "collector path /path/to/secret not allowed"

viperInstance = viper.NewWithOptions(viper.KeyDelimiter(KeyDelimiter))
viperInstance.Set(CollectorConfigPathKey, expected.ConfigPath)

_, err := resolveCollector(testDefault.AllowedDirectories)

require.Error(t, err)
assert.Contains(t, err.Error(), errMsg)
})
}

func TestCommand(t *testing.T) {
Expand Down Expand Up @@ -330,7 +324,11 @@ func getAgentConfig() *Config {
},
},
Processors: Processors{
Batch: &Batch{},
Batch: &Batch{
SendBatchMaxSize: DefCollectorBatchProcessorSendBatchMaxSize,
SendBatchSize: DefCollectorBatchProcessorSendBatchSize,
Timeout: DefCollectorBatchProcessorTimeout,
},
},
Receivers: Receivers{
OtlpReceivers: []OtlpReceiver{
Expand Down
4 changes: 4 additions & 0 deletions internal/config/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,4 +45,8 @@ const (
DefMaxMessageRecieveSize = 4194304
// math.MaxInt32
DefMaxMessageSendSize = math.MaxInt32

DefCollectorBatchProcessorSendBatchSize = 8192
DefCollectorBatchProcessorSendBatchMaxSize = 0
DefCollectorBatchProcessorTimeout = 200 * time.Millisecond
)
66 changes: 34 additions & 32 deletions internal/config/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,38 +25,40 @@ const (

var (
// child flags saved as vars to enable easier prefixing.
ClientPermitWithoutStreamKey = pre(ClientRootKey) + "permit_without_stream"
ClientTimeKey = pre(ClientRootKey) + "time"
ClientTimeoutKey = pre(ClientRootKey) + "timeout"
ClientMaxMessageSendSizeKey = pre(ClientRootKey) + "max_message_send_size"
ClientMaxMessageRecieveSizeKey = pre(ClientRootKey) + "max_message_receive_size"
ClientMaxMessageSizeKey = pre(ClientRootKey) + "max_message_size"
CollectorConfigPathKey = pre(CollectorRootKey) + "config_path"
CollectorExportersKey = pre(CollectorRootKey) + "exporters"
CollectorProcessorsKey = pre(CollectorRootKey) + "processors"
CollectorHealthKey = pre(CollectorRootKey) + "health"
CollectorReceiversKey = pre(CollectorRootKey) + "receivers"
CollectorLogKey = pre(CollectorRootKey) + "log"
CollectorLogLevelKey = pre(CollectorLogKey) + "level"
CollectorLogPathKey = pre(CollectorLogKey) + "path"
CommandAuthKey = pre(CommandRootKey) + "auth"
CommandAuthTokenKey = pre(CommandAuthKey) + "token"
CommandServerHostKey = pre(CommandServerKey) + "host"
CommandServerKey = pre(CommandRootKey) + "server"
CommandServerPortKey = pre(CommandServerKey) + "port"
CommandServerTypeKey = pre(CommandServerKey) + "type"
CommandTLSCaKey = pre(CommandTLSKey) + "ca"
CommandTLSCertKey = pre(CommandTLSKey) + "cert"
CommandTLSKey = pre(CommandRootKey) + "tls"
CommandTLSKeyKey = pre(CommandTLSKey) + "key"
CommandTLSServerNameKey = pre(CommandRootKey) + "server_name"
CommandTLSSkipVerifyKey = pre(CommandTLSKey) + "skip_verify"
LogLevelKey = pre(LogLevelRootKey) + "level"
LogPathKey = pre(LogLevelRootKey) + "path"
NginxReloadMonitoringPeriodKey = pre(DataPlaneConfigRootKey, "nginx") + "reload_monitoring_period"
NginxTreatWarningsAsErrorsKey = pre(DataPlaneConfigRootKey, "nginx") + "treat_warnings_as_error"
OTLPExportURLKey = pre(CollectorRootKey) + "otlp_export_url"
OTLPReceiverURLKey = pre(CollectorRootKey) + "otlp_receiver_url"
ClientPermitWithoutStreamKey = pre(ClientRootKey) + "permit_without_stream"
ClientTimeKey = pre(ClientRootKey) + "time"
ClientTimeoutKey = pre(ClientRootKey) + "timeout"
ClientMaxMessageSendSizeKey = pre(ClientRootKey) + "max_message_send_size"
ClientMaxMessageRecieveSizeKey = pre(ClientRootKey) + "max_message_receive_size"
ClientMaxMessageSizeKey = pre(ClientRootKey) + "max_message_size"
CollectorConfigPathKey = pre(CollectorRootKey) + "config_path"
CollectorExportersKey = pre(CollectorRootKey) + "exporters"
CollectorProcessorsKey = pre(CollectorRootKey) + "processors"
CollectorBatchProcessorKey = pre(CollectorProcessorsKey) + "batch"
CollectorBatchProcessorSendBatchSizeKey = pre(CollectorBatchProcessorKey) + "send_batch_size"
CollectorBatchProcessorSendBatchMaxSizeKey = pre(CollectorBatchProcessorKey) + "send_batch_max_size"
CollectorBatchProcessorTimeoutKey = pre(CollectorBatchProcessorKey) + "timeout"
CollectorHealthKey = pre(CollectorRootKey) + "health"
CollectorReceiversKey = pre(CollectorRootKey) + "receivers"
CollectorLogKey = pre(CollectorRootKey) + "log"
CollectorLogLevelKey = pre(CollectorLogKey) + "level"
CollectorLogPathKey = pre(CollectorLogKey) + "path"
CommandAuthKey = pre(CommandRootKey) + "auth"
CommandAuthTokenKey = pre(CommandAuthKey) + "token"
CommandServerHostKey = pre(CommandServerKey) + "host"
CommandServerKey = pre(CommandRootKey) + "server"
CommandServerPortKey = pre(CommandServerKey) + "port"
CommandServerTypeKey = pre(CommandServerKey) + "type"
CommandTLSCaKey = pre(CommandTLSKey) + "ca"
CommandTLSCertKey = pre(CommandTLSKey) + "cert"
CommandTLSKey = pre(CommandRootKey) + "tls"
CommandTLSKeyKey = pre(CommandTLSKey) + "key"
CommandTLSServerNameKey = pre(CommandRootKey) + "server_name"
CommandTLSSkipVerifyKey = pre(CommandTLSKey) + "skip_verify"
LogLevelKey = pre(LogLevelRootKey) + "level"
LogPathKey = pre(LogLevelRootKey) + "path"
NginxReloadMonitoringPeriodKey = pre(DataPlaneConfigRootKey, "nginx") + "reload_monitoring_period"
NginxTreatWarningsAsErrorsKey = pre(DataPlaneConfigRootKey, "nginx") + "treat_warnings_as_error"
)

func pre(prefixes ...string) string {
Expand Down
6 changes: 5 additions & 1 deletion internal/config/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,11 @@ type (
Batch *Batch `yaml:"-" mapstructure:"batch"`
}

Batch struct{}
Batch struct {
SendBatchSize uint32 `yaml:"-" mapstructure:"send_batch_size"`
SendBatchMaxSize uint32 `yaml:"-" mapstructure:"send_batch_max_size"`
Timeout time.Duration `yaml:"-" mapstructure:"timeout"`
}

// OTel Collector Receiver configuration.
Receivers struct {
Expand Down
3 changes: 3 additions & 0 deletions test/config/collector/test-opentelemetry-collector-agent.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ receivers:

processors:
batch:
send_batch_size: 8192
timeout: 200ms
send_batch_max_size: 0

exporters:
otlp/0:
Expand Down
6 changes: 5 additions & 1 deletion test/types/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,11 @@ func AgentConfig() *config.Config {
},
},
Processors: config.Processors{
Batch: &config.Batch{},
Batch: &config.Batch{
SendBatchSize: config.DefCollectorBatchProcessorSendBatchSize,
SendBatchMaxSize: config.DefCollectorBatchProcessorSendBatchMaxSize,
Timeout: config.DefCollectorBatchProcessorTimeout,
},
},
Receivers: config.Receivers{
OtlpReceivers: OtlpReceivers(),
Expand Down

0 comments on commit 534d575

Please sign in to comment.