Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add OTel tcplog receiver for NAP #932

Merged
merged 17 commits into from
Dec 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ require (
github.com/open-telemetry/opentelemetry-collector-contrib/processor/resourceprocessor v0.114.0
github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor v0.114.0
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/hostmetricsreceiver v0.114.0
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/tcplogreceiver v0.114.0
github.com/open-telemetry/opentelemetry-collector-contrib/testbed v0.114.0
github.com/shirou/gopsutil/v4 v4.24.10
github.com/spf13/pflag v1.0.5
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -474,6 +474,8 @@ github.com/open-telemetry/opentelemetry-collector-contrib/receiver/prometheusrec
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/prometheusreceiver v0.114.0/go.mod h1:T1p6ShTr8farkE4qUB2TyGUIvRSN3s17D0qY7rMqCRM=
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/syslogreceiver v0.114.0 h1:chiIs7XGNSoptd0VgVR912NFogB8srpuVzgeVM9xW0w=
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/syslogreceiver v0.114.0/go.mod h1:cHIBJ01AgmkTUN2FgXz9NU8LbnLGUR2uxssCdiqwV4w=
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/tcplogreceiver v0.114.0 h1:C1487YIMVBuJ8ixBLChyIl8VHlF8Ir6l2hfeYNNPPLM=
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/tcplogreceiver v0.114.0/go.mod h1:+SvBS7Xu+pI67e5outljqWDMt+bIu9B6XON0nM4PIhY=
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/zipkinreceiver v0.114.0 h1:E686MeQcQ+a3Q47A/xAc3Nk6Qdz8wHcBLMJ3Y8bNKi0=
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/zipkinreceiver v0.114.0/go.mod h1:zkQAapuNRobj7GY8kKRal+2EYkAMWmZ1KMysUrQI48A=
github.com/open-telemetry/opentelemetry-collector-contrib/testbed v0.114.0 h1:/4hKXCQaD78y6cFA4xXMfZhh86/RNwrjyr6xnhFs5AA=
Expand Down
2 changes: 2 additions & 0 deletions internal/collector/factories.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/resourceprocessor"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/hostmetricsreceiver"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/tcplogreceiver"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/connector"
"go.opentelemetry.io/collector/exporter"
Expand Down Expand Up @@ -98,6 +99,7 @@ func createReceiverFactories() (map[component.Type]receiver.Factory, error) {
hostmetricsreceiver.NewFactory(),
nginxreceiver.NewFactory(),
nginxplusreceiver.NewFactory(),
tcplogreceiver.NewFactory(),
}

return receiver.MakeFactoryMap(receiverList...)
Expand Down
2 changes: 1 addition & 1 deletion internal/collector/factories_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func TestOTelComponentFactoriesDefault(t *testing.T) {
require.NoError(t, err, "OTelComponentFactories should not return an error")
assert.NotNil(t, factories, "factories should not be nil")

assert.Len(t, factories.Receivers, 4)
assert.Len(t, factories.Receivers, 5)
assert.Len(t, factories.Processors, 8)
assert.Len(t, factories.Exporters, 4)
assert.Len(t, factories.Extensions, 3)
Expand Down
80 changes: 78 additions & 2 deletions internal/collector/otel_collector_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,16 @@ import (
const (
maxTimeToWaitForShutdown = 30 * time.Second
filePermission = 0o600
// To conform to the rfc3164 spec the timestamp in the logs need to be formatted correctly.
// Here are some examples of what the timestamp conversions look like.
// Notice how if the day begins with a zero that the zero is replaced with an empty space.

// 2024-11-06T17:19:24+00:00 ---> Nov 6 17:19:24
// 2024-11-16T17:19:24+00:00 ---> Nov 16 17:19:24
timestampConversionExpression = `'EXPR(let timestamp = split(split(body, ">")[1], " ")[0]; ` +
`let newTimestamp = timestamp matches "(\\d{4})-(\\d{2})-(0\\d{1})T(\\d{2}):(\\d{2}):(\\d{2}).*" ` +
`? date(timestamp).Format("Jan 2 15:04:05") : date(timestamp).Format("Jan 02 15:04:05"); ` +
`split(body, ">")[0] + ">" + newTimestamp + " " + split(body, " ", 2)[1])'`
)

type (
Expand Down Expand Up @@ -243,7 +253,7 @@ func (oc *Collector) handleNginxConfigUpdate(ctx context.Context, msg *bus.Messa
return
}

reloadCollector := oc.checkForNewNginxReceivers(nginxConfigContext)
reloadCollector := oc.checkForNewReceivers(nginxConfigContext)

if reloadCollector {
slog.InfoContext(ctx, "Reloading OTel collector config")
Expand Down Expand Up @@ -368,7 +378,7 @@ func (oc *Collector) restartCollector(ctx context.Context) {
}
}

func (oc *Collector) checkForNewNginxReceivers(nginxConfigContext *model.NginxConfigContext) bool {
func (oc *Collector) checkForNewReceivers(nginxConfigContext *model.NginxConfigContext) bool {
nginxReceiverFound, reloadCollector := oc.updateExistingNginxPlusReceiver(nginxConfigContext)

if !nginxReceiverFound && nginxConfigContext.PlusAPI.URL != "" {
Expand Down Expand Up @@ -406,6 +416,11 @@ func (oc *Collector) checkForNewNginxReceivers(nginxConfigContext *model.NginxCo
}
}

tcplogReceiversFound := oc.updateTcplogReceivers(nginxConfigContext)
if tcplogReceiversFound {
reloadCollector = true
}

return reloadCollector
}

Expand Down Expand Up @@ -476,6 +491,67 @@ func (oc *Collector) updateExistingNginxOSSReceiver(
return nginxReceiverFound, reloadCollector
}

func (oc *Collector) updateTcplogReceivers(nginxConfigContext *model.NginxConfigContext) bool {
newTcplogReceiverAdded := false
if nginxConfigContext.NAPSysLogServers != nil {
napLoop:
for _, napSysLogServer := range nginxConfigContext.NAPSysLogServers {
if oc.doesTcplogReceiverAlreadyExist(napSysLogServer) {
continue napLoop
}

oc.config.Collector.Receivers.TcplogReceivers = append(
oc.config.Collector.Receivers.TcplogReceivers,
config.TcplogReceiver{
ListenAddress: napSysLogServer,
Operators: []config.Operator{
{
Type: "add",
Fields: map[string]string{
"field": "body",
"value": timestampConversionExpression,
},
},
{
Type: "syslog_parser",
Fields: map[string]string{
"protocol": "rfc3164",
},
},
{
Type: "remove",
Fields: map[string]string{
"field": "attributes.message",
},
},
{
Type: "add",
Fields: map[string]string{
"field": "resource[\"instance.id\"]",
"value": nginxConfigContext.InstanceID,
},
},
},
},
)

newTcplogReceiverAdded = true
}
}

return newTcplogReceiverAdded
}

func (oc *Collector) doesTcplogReceiverAlreadyExist(listenAddress string) bool {
for _, tcplogReceiver := range oc.config.Collector.Receivers.TcplogReceivers {
if listenAddress == tcplogReceiver.ListenAddress {
return true
}
}

return false
}

// nolint: revive
func (oc *Collector) updateResourceAttributes(
attributesToAdd []config.ResourceAttribute,
Expand Down
47 changes: 45 additions & 2 deletions internal/collector/otel_collector_plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"context"
"errors"
"fmt"
"path/filepath"
"testing"

"github.com/nginx/agent/v3/test/protos"
Expand All @@ -26,6 +27,8 @@ import (
)

func TestCollector_New(t *testing.T) {
tmpDir := t.TempDir()

tests := []struct {
config *config.Config
expectedError error
Expand Down Expand Up @@ -56,7 +59,7 @@ func TestCollector_New(t *testing.T) {
name: "Successful initialization",
config: &config.Config{
Collector: &config.Collector{
Log: &config.Log{Path: "/tmp/test.log"},
Log: &config.Log{Path: filepath.Join(tmpDir, "test.log")},
},
},
expectedError: nil,
Expand All @@ -79,6 +82,8 @@ func TestCollector_New(t *testing.T) {
}

func TestCollector_Init(t *testing.T) {
tmpDir := t.TempDir()

tests := []struct {
name string
expectedLog string
Expand All @@ -104,7 +109,7 @@ func TestCollector_Init(t *testing.T) {
logBuf := &bytes.Buffer{}
stub.StubLoggerWith(logBuf)

conf.Collector.Log = &config.Log{Path: "/tmp/test.log"}
conf.Collector.Log = &config.Log{Path: filepath.Join(tmpDir, "test.log")}

if tt.expectedError {
conf.Collector.Receivers = config.Receivers{}
Expand Down Expand Up @@ -714,6 +719,44 @@ func TestCollector_updateResourceAttributes(t *testing.T) {
}
}

func TestCollector_updateTcplogReceivers(t *testing.T) {
conf := types.OTelConfig(t)
conf.Collector.Log.Path = ""
conf.Collector.Processors.Batch = nil
conf.Collector.Processors.Attribute = nil
conf.Collector.Processors.Resource = nil

collector, err := New(conf)
require.NoError(t, err)

nginxConfigContext := &model.NginxConfigContext{
NAPSysLogServers: []string{
"localhost:151",
},
}

assert.Empty(t, conf.Collector.Receivers.TcplogReceivers)

t.Run("Test 1: New TcplogReceiver added", func(tt *testing.T) {
tcplogReceiverAdded := collector.updateTcplogReceivers(nginxConfigContext)

assert.True(tt, tcplogReceiverAdded)
assert.Len(tt, conf.Collector.Receivers.TcplogReceivers, 1)
assert.Equal(tt, "localhost:151", conf.Collector.Receivers.TcplogReceivers[0].ListenAddress)
assert.Len(tt, conf.Collector.Receivers.TcplogReceivers[0].Operators, 4)
})

// Calling updateTcplogReceivers shouldn't update the TcplogReceivers slice
// since there is already a receiver with the same ListenAddress
t.Run("Test 2: TcplogReceiver already exists", func(tt *testing.T) {
tcplogReceiverAdded := collector.updateTcplogReceivers(nginxConfigContext)
assert.False(t, tcplogReceiverAdded)
assert.Len(t, conf.Collector.Receivers.TcplogReceivers, 1)
assert.Equal(t, "localhost:151", conf.Collector.Receivers.TcplogReceivers[0].ListenAddress)
assert.Len(t, conf.Collector.Receivers.TcplogReceivers[0].Operators, 4)
})
}

func createFakeCollector() *typesfakes.FakeCollectorInterface {
fakeCollector := &typesfakes.FakeCollectorInterface{}
fakeCollector.RunStub = func(ctx context.Context) error { return nil }
Expand Down
34 changes: 34 additions & 0 deletions internal/collector/otelcol.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,17 @@ receivers:
location: "{{- .PlusAPI.Location -}}"
collection_interval: 10s
{{- end }}
{{- range $index, $tcplogReceiver := .Receivers.TcplogReceivers }}
tcplog/{{$index}}:
listen_address: "{{- .ListenAddress -}}"
operators:
{{- range $index, $operator := .Operators }}
- type: {{.Type}}
{{- range $key, $value := .Fields }}
{{$key}}: {{$value}}
{{- end }}
{{- end }}
{{- end }}

processors:
{{- if ne .Processors.Resource nil }}
Expand Down Expand Up @@ -241,3 +252,26 @@ service:
{{- if ne .Exporters.Debug nil }}
- debug
{{- end }}
{{- if ne .Receivers.TcplogReceivers nil }}
logs:
receivers:
{{- range $index, $tcplogReceiver := .Receivers.TcplogReceivers }}
- tcplog/{{$index}}
{{- end }}
processors:
{{- if ne .Processors.Resource nil }}
{{- if .Processors.Resource.Attributes }}
- resource
{{- end }}
{{- end }}
{{- if ne .Processors.Batch nil }}
- batch
{{- end }}
exporters:
{{- range $index, $otlpExporter := .Exporters.OtlpExporters }}
- otlp/{{$index}}
{{- end }}
{{- if ne .Exporters.Debug nil }}
- debug
{{- end }}
{{- end }}
34 changes: 33 additions & 1 deletion internal/collector/settings_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,20 @@ func TestConfigProviderSettings(t *testing.T) {
}

func TestTemplateWrite(t *testing.T) {
tmpDir := t.TempDir()

cfg := types.AgentConfig()
actualConfPath := filepath.Join("/tmp/", "nginx-agent-otelcol-test.yaml")
actualConfPath := filepath.Join(tmpDir, "nginx-agent-otelcol-test.yaml")
cfg.Collector.ConfigPath = actualConfPath
cfg.Collector.Processors.Resource = &config.Resource{
Attributes: []config.ResourceAttribute{
{
Key: "resource.id",
Action: "add",
Value: "12345",
},
},
}

cfg.Collector.Exporters.PrometheusExporter = &config.PrometheusExporter{
Server: &config.ServerConfig{
Expand Down Expand Up @@ -104,6 +115,27 @@ func TestTemplateWrite(t *testing.T) {
},
})

cfg.Collector.Receivers.TcplogReceivers = []config.TcplogReceiver{
{
ListenAddress: "localhost:151",
Operators: []config.Operator{
{
Type: "add",
Fields: map[string]string{
"field": "body",
"value": `EXPR(split(body, ",")[0])`,
},
},
{
Type: "remove",
Fields: map[string]string{
"field": "attributes.message",
},
},
},
},
}

cfg.Collector.Extensions.HeadersSetter = &config.HeadersSetter{
Headers: []config.Header{
{
Expand Down
8 changes: 0 additions & 8 deletions internal/config/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,14 +63,6 @@ const (
DefCollectorExtensionsHealthTLSCAPath = ""
DefCollectorExtensionsHealthTLSSkipVerify = false
DefCollectorExtensionsHealthTLServerNameKey = ""

DefCollectorPrometheusExporterServerHost = ""
DefCollectorPrometheusExporterServerPort = 0
DefCollectorPrometheusExporterTLSCertPath = ""
DefCollectorPrometheusExporterTLSKeyPath = ""
DefCollectorPrometheusExporterTLSCAPath = ""
DefCollectorPrometheusExporterTLSSkipVerify = false
DefCollectorPrometheusExporterTLServerNameKey = ""
)

func DefaultFeatures() []string {
Expand Down
14 changes: 14 additions & 0 deletions internal/config/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ type (
OtlpReceivers []OtlpReceiver `yaml:"-" mapstructure:"otlp_receivers"`
NginxReceivers []NginxReceiver `yaml:"-" mapstructure:"nginx_receivers"`
NginxPlusReceivers []NginxPlusReceiver `yaml:"-" mapstructure:"nginx_plus_receivers"`
TcplogReceivers []TcplogReceiver `yaml:"-" mapstructure:"tcplog_receivers"`
}

OtlpReceiver struct {
Expand All @@ -172,6 +173,19 @@ type (
OtlpTLSConfig *OtlpTLSConfig `yaml:"-" mapstructure:"tls"`
}

TcplogReceiver struct {
ListenAddress string `yaml:"-" mapstructure:"listen_address"`
Operators []Operator `yaml:"-" mapstructure:"operators"`
}

// There are many types of operators with different field names so we use a generic map to store the fields.
// See here for more info:
// https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/pkg/stanza/docs/operators/README.md
Operator struct {
Fields map[string]string `yaml:"-" mapstructure:"fields"`
Type string `yaml:"-" mapstructure:"type"`
}

NginxReceiver struct {
InstanceID string `yaml:"-" mapstructure:"instance_id"`
StubStatus APIDetails `yaml:"-" mapstructure:"api_details"`
Expand Down
Loading