Skip to content

Commit

Permalink
Merge branch 'v3' into add-nginx-plus-api-actions
Browse files Browse the repository at this point in the history
  • Loading branch information
aphralG committed Jan 15, 2025
2 parents aadb015 + 9b1bc5b commit 32c6f74
Show file tree
Hide file tree
Showing 32 changed files with 995 additions and 205 deletions.
2 changes: 1 addition & 1 deletion Makefile.packaging
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ BINARY_PATH := $(BUILD_DIR)/$(BINARY_NAME)
GPG_PUBLIC_KEY := .key
PACKAGE_BUILD ?= 1
PACKAGE_VERSION := $(shell echo ${VERSION} | tr -d 'v')-$(PACKAGE_BUILD)
APK_PACKAGE_VERSION := $(shell echo ${VERSION} | tr -d 'v')_$(PACKAGE_BUILD)
APK_PACKAGE_VERSION := $(shell echo ${VERSION} | tr -d 'v').$(PACKAGE_BUILD)
TARBALL_NAME := $(PACKAGE_PREFIX)v3.tar.gz

DEB_DISTROS ?= ubuntu-noble-24.04 ubuntu-jammy-22.04 ubuntu-focal-20.04 debian-bookworm-12 debian-bullseye-11
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ require (
github.com/open-telemetry/opentelemetry-collector-contrib/processor/resourceprocessor v0.114.0
github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor v0.114.0
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/hostmetricsreceiver v0.114.0
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/tcplogreceiver v0.114.0
github.com/open-telemetry/opentelemetry-collector-contrib/testbed v0.114.0
github.com/shirou/gopsutil/v4 v4.24.10
github.com/spf13/pflag v1.0.5
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -474,6 +474,8 @@ github.com/open-telemetry/opentelemetry-collector-contrib/receiver/prometheusrec
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/prometheusreceiver v0.114.0/go.mod h1:T1p6ShTr8farkE4qUB2TyGUIvRSN3s17D0qY7rMqCRM=
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/syslogreceiver v0.114.0 h1:chiIs7XGNSoptd0VgVR912NFogB8srpuVzgeVM9xW0w=
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/syslogreceiver v0.114.0/go.mod h1:cHIBJ01AgmkTUN2FgXz9NU8LbnLGUR2uxssCdiqwV4w=
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/tcplogreceiver v0.114.0 h1:C1487YIMVBuJ8ixBLChyIl8VHlF8Ir6l2hfeYNNPPLM=
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/tcplogreceiver v0.114.0/go.mod h1:+SvBS7Xu+pI67e5outljqWDMt+bIu9B6XON0nM4PIhY=
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/zipkinreceiver v0.114.0 h1:E686MeQcQ+a3Q47A/xAc3Nk6Qdz8wHcBLMJ3Y8bNKi0=
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/zipkinreceiver v0.114.0/go.mod h1:zkQAapuNRobj7GY8kKRal+2EYkAMWmZ1KMysUrQI48A=
github.com/open-telemetry/opentelemetry-collector-contrib/testbed v0.114.0 h1:/4hKXCQaD78y6cFA4xXMfZhh86/RNwrjyr6xnhFs5AA=
Expand Down
6 changes: 3 additions & 3 deletions internal/backoff/backoff.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ import (
// Information from https://pkg.go.dev/github.com/cenkalti/backoff/v4#section-readme
func WaitUntil(
ctx context.Context,
backoffSettings *config.CommonSettings,
backoffSettings *config.BackOff,
operation backoff.Operation,
) error {
eb := backoff.NewExponentialBackOff()
Expand All @@ -68,7 +68,7 @@ func WaitUntil(
// nolint: ireturn
func WaitUntilWithData[T any](
ctx context.Context,
backoffSettings *config.CommonSettings,
backoffSettings *config.BackOff,
operation backoff.OperationWithData[T],
) (T, error) {
backoffWithContext := Context(ctx, backoffSettings)
Expand All @@ -77,7 +77,7 @@ func WaitUntilWithData[T any](
}

// nolint: ireturn
func Context(ctx context.Context, backoffSettings *config.CommonSettings) backoff.BackOffContext {
func Context(ctx context.Context, backoffSettings *config.BackOff) backoff.BackOffContext {
eb := backoff.NewExponentialBackOff()
eb.InitialInterval = backoffSettings.InitialInterval
eb.MaxInterval = backoffSettings.MaxInterval
Expand Down
20 changes: 11 additions & 9 deletions internal/backoff/backoff_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,14 +82,16 @@ func TestWaitUntil(t *testing.T) {

for _, test := range tests {
invocations = 0
settings := &config.CommonSettings{
InitialInterval: test.initialInterval,
MaxInterval: test.maxInterval,
MaxElapsedTime: test.maxElapsedTime,
RandomizationFactor: config.DefBackoffRandomizationFactor,
Multiplier: config.DefBackoffMultiplier,
settings := &config.Client{
Backoff: &config.BackOff{
InitialInterval: test.initialInterval,
MaxInterval: test.maxInterval,
MaxElapsedTime: test.maxElapsedTime,
RandomizationFactor: config.DefBackoffRandomizationFactor,
Multiplier: config.DefBackoffMultiplier,
},
}
result := WaitUntil(test.context, settings, test.operation)
result := WaitUntil(test.context, settings.Backoff, test.operation)

if test.expectedError {
assert.Errorf(t, result, test.name)
Expand Down Expand Up @@ -164,7 +166,7 @@ func TestWaitUntilWithData(t *testing.T) {
}

for _, test := range tests {
settings := &config.CommonSettings{
settings := &config.BackOff{
InitialInterval: test.initialInterval,
MaxInterval: test.maxInterval,
MaxElapsedTime: test.maxElapsedTime,
Expand All @@ -185,7 +187,7 @@ func TestWaitUntilWithData(t *testing.T) {
}

func TestContext(t *testing.T) {
settings := &config.CommonSettings{
settings := &config.BackOff{
InitialInterval: 10 * time.Millisecond,
MaxInterval: 10 * time.Millisecond,
MaxElapsedTime: 10 * time.Millisecond,
Expand Down
2 changes: 2 additions & 0 deletions internal/collector/factories.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/resourceprocessor"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/hostmetricsreceiver"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/tcplogreceiver"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/connector"
"go.opentelemetry.io/collector/exporter"
Expand Down Expand Up @@ -98,6 +99,7 @@ func createReceiverFactories() (map[component.Type]receiver.Factory, error) {
hostmetricsreceiver.NewFactory(),
nginxreceiver.NewFactory(),
nginxplusreceiver.NewFactory(),
tcplogreceiver.NewFactory(),
}

return receiver.MakeFactoryMap(receiverList...)
Expand Down
2 changes: 1 addition & 1 deletion internal/collector/factories_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func TestOTelComponentFactoriesDefault(t *testing.T) {
require.NoError(t, err, "OTelComponentFactories should not return an error")
assert.NotNil(t, factories, "factories should not be nil")

assert.Len(t, factories.Receivers, 4)
assert.Len(t, factories.Receivers, 5)
assert.Len(t, factories.Processors, 8)
assert.Len(t, factories.Exporters, 4)
assert.Len(t, factories.Extensions, 3)
Expand Down
127 changes: 123 additions & 4 deletions internal/collector/otel_collector_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,16 @@ import (
const (
maxTimeToWaitForShutdown = 30 * time.Second
filePermission = 0o600
// To conform to the rfc3164 spec the timestamp in the logs need to be formatted correctly.
// Here are some examples of what the timestamp conversions look like.
// Notice how if the day begins with a zero that the zero is replaced with an empty space.

// 2024-11-06T17:19:24+00:00 ---> Nov 6 17:19:24
// 2024-11-16T17:19:24+00:00 ---> Nov 16 17:19:24
timestampConversionExpression = `'EXPR(let timestamp = split(split(body, ">")[1], " ")[0]; ` +
`let newTimestamp = timestamp matches "(\\d{4})-(\\d{2})-(0\\d{1})T(\\d{2}):(\\d{2}):(\\d{2}).*" ` +
`? date(timestamp).Format("Jan 2 15:04:05") : date(timestamp).Format("Jan 02 15:04:05"); ` +
`split(body, ">")[0] + ">" + newTimestamp + " " + split(body, " ", 2)[1])'`
)

type (
Expand Down Expand Up @@ -193,9 +203,9 @@ func (oc *Collector) Close(ctx context.Context) error {
oc.service.Shutdown()
oc.cancel()

settings := oc.config.Common
settings := oc.config.Client.Backoff
settings.MaxElapsedTime = maxTimeToWaitForShutdown
err := backoff.WaitUntil(ctx, oc.config.Common, func() error {
err := backoff.WaitUntil(ctx, settings, func() error {
if oc.service.GetState() == otelcol.StateClosed {
return nil
}
Expand Down Expand Up @@ -243,7 +253,7 @@ func (oc *Collector) handleNginxConfigUpdate(ctx context.Context, msg *bus.Messa
return
}

reloadCollector := oc.checkForNewNginxReceivers(nginxConfigContext)
reloadCollector := oc.checkForNewReceivers(nginxConfigContext)

if reloadCollector {
slog.InfoContext(ctx, "Reloading OTel collector config")
Expand Down Expand Up @@ -368,7 +378,7 @@ func (oc *Collector) restartCollector(ctx context.Context) {
}
}

func (oc *Collector) checkForNewNginxReceivers(nginxConfigContext *model.NginxConfigContext) bool {
func (oc *Collector) checkForNewReceivers(nginxConfigContext *model.NginxConfigContext) bool {
nginxReceiverFound, reloadCollector := oc.updateExistingNginxPlusReceiver(nginxConfigContext)

if !nginxReceiverFound && nginxConfigContext.PlusAPI.URL != "" {
Expand Down Expand Up @@ -406,6 +416,11 @@ func (oc *Collector) checkForNewNginxReceivers(nginxConfigContext *model.NginxCo
}
}

tcplogReceiversFound := oc.updateTcplogReceivers(nginxConfigContext)
if tcplogReceiversFound {
reloadCollector = true
}

return reloadCollector
}

Expand Down Expand Up @@ -476,6 +491,110 @@ func (oc *Collector) updateExistingNginxOSSReceiver(
return nginxReceiverFound, reloadCollector
}

func (oc *Collector) updateTcplogReceivers(nginxConfigContext *model.NginxConfigContext) bool {
newTcplogReceiverAdded := false
if nginxConfigContext.NAPSysLogServers != nil {
napLoop:
for _, napSysLogServer := range nginxConfigContext.NAPSysLogServers {
if oc.doesTcplogReceiverAlreadyExist(napSysLogServer) {
continue napLoop
}

oc.config.Collector.Receivers.TcplogReceivers = append(
oc.config.Collector.Receivers.TcplogReceivers,
config.TcplogReceiver{
ListenAddress: napSysLogServer,
Operators: []config.Operator{
{
Type: "add",
Fields: map[string]string{
"field": "body",
"value": timestampConversionExpression,
},
},
{
Type: "syslog_parser",
Fields: map[string]string{
"protocol": "rfc3164",
},
},
{
Type: "remove",
Fields: map[string]string{
"field": "attributes.message",
},
},
{
Type: "add",
Fields: map[string]string{
"field": "resource[\"instance.id\"]",
"value": nginxConfigContext.InstanceID,
},
},
},
},
)

newTcplogReceiverAdded = true
}
}

tcplogReceiverDeleted := oc.areNapReceiversDeleted(nginxConfigContext)

return newTcplogReceiverAdded || tcplogReceiverDeleted
}

func (oc *Collector) areNapReceiversDeleted(nginxConfigContext *model.NginxConfigContext) bool {
listenAddressesToBeDeleted := oc.getConfigDeletedNapReceivers(nginxConfigContext)
if len(listenAddressesToBeDeleted) != 0 {
oc.deleteNapReceivers(listenAddressesToBeDeleted)
return true
}

return false
}

func (oc *Collector) deleteNapReceivers(listenAddressesToBeDeleted map[string]bool) {
filteredReceivers := (oc.config.Collector.Receivers.TcplogReceivers)[:0]
for _, receiver := range oc.config.Collector.Receivers.TcplogReceivers {
if !listenAddressesToBeDeleted[receiver.ListenAddress] {
filteredReceivers = append(filteredReceivers, receiver)
}
}
oc.config.Collector.Receivers.TcplogReceivers = filteredReceivers
}

func (oc *Collector) getConfigDeletedNapReceivers(nginxConfigContext *model.NginxConfigContext) map[string]bool {
elements := make(map[string]bool)

for _, tcplogReceiver := range oc.config.Collector.Receivers.TcplogReceivers {
elements[tcplogReceiver.ListenAddress] = true
}

if nginxConfigContext.NAPSysLogServers != nil {
addressesToDelete := make(map[string]bool)
for _, napAddress := range nginxConfigContext.NAPSysLogServers {
if !elements[napAddress] {
addressesToDelete[napAddress] = true
}
}

return addressesToDelete
}

return elements
}

func (oc *Collector) doesTcplogReceiverAlreadyExist(listenAddress string) bool {
for _, tcplogReceiver := range oc.config.Collector.Receivers.TcplogReceivers {
if listenAddress == tcplogReceiver.ListenAddress {
return true
}
}

return false
}

// nolint: revive
func (oc *Collector) updateResourceAttributes(
attributesToAdd []config.ResourceAttribute,
Expand Down
63 changes: 61 additions & 2 deletions internal/collector/otel_collector_plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"context"
"errors"
"fmt"
"path/filepath"
"testing"

"github.com/nginx/agent/v3/test/protos"
Expand All @@ -26,6 +27,8 @@ import (
)

func TestCollector_New(t *testing.T) {
tmpDir := t.TempDir()

tests := []struct {
config *config.Config
expectedError error
Expand Down Expand Up @@ -56,7 +59,7 @@ func TestCollector_New(t *testing.T) {
name: "Successful initialization",
config: &config.Config{
Collector: &config.Collector{
Log: &config.Log{Path: "/tmp/test.log"},
Log: &config.Log{Path: filepath.Join(tmpDir, "test.log")},
},
},
expectedError: nil,
Expand All @@ -79,6 +82,8 @@ func TestCollector_New(t *testing.T) {
}

func TestCollector_Init(t *testing.T) {
tmpDir := t.TempDir()

tests := []struct {
name string
expectedLog string
Expand All @@ -104,7 +109,7 @@ func TestCollector_Init(t *testing.T) {
logBuf := &bytes.Buffer{}
stub.StubLoggerWith(logBuf)

conf.Collector.Log = &config.Log{Path: "/tmp/test.log"}
conf.Collector.Log = &config.Log{Path: filepath.Join(tmpDir, "test.log")}

if tt.expectedError {
conf.Collector.Receivers = config.Receivers{}
Expand Down Expand Up @@ -714,6 +719,60 @@ func TestCollector_updateResourceAttributes(t *testing.T) {
}
}

func TestCollector_updateTcplogReceivers(t *testing.T) {
conf := types.OTelConfig(t)
conf.Collector.Log.Path = ""
conf.Collector.Processors.Batch = nil
conf.Collector.Processors.Attribute = nil
conf.Collector.Processors.Resource = nil

collector, err := New(conf)
require.NoError(t, err)

nginxConfigContext := &model.NginxConfigContext{
NAPSysLogServers: []string{
"localhost:151",
},
}

assert.Empty(t, conf.Collector.Receivers.TcplogReceivers)

t.Run("Test 1: New TcplogReceiver added", func(tt *testing.T) {
tcplogReceiverAdded := collector.updateTcplogReceivers(nginxConfigContext)

assert.True(tt, tcplogReceiverAdded)
assert.Len(tt, conf.Collector.Receivers.TcplogReceivers, 1)
assert.Equal(tt, "localhost:151", conf.Collector.Receivers.TcplogReceivers[0].ListenAddress)
assert.Len(tt, conf.Collector.Receivers.TcplogReceivers[0].Operators, 4)
})

// Calling updateTcplogReceivers shouldn't update the TcplogReceivers slice
// since there is already a receiver with the same ListenAddress
t.Run("Test 2: TcplogReceiver already exists", func(tt *testing.T) {
tcplogReceiverAdded := collector.updateTcplogReceivers(nginxConfigContext)
assert.False(t, tcplogReceiverAdded)
assert.Len(t, conf.Collector.Receivers.TcplogReceivers, 1)
assert.Equal(t, "localhost:151", conf.Collector.Receivers.TcplogReceivers[0].ListenAddress)
assert.Len(t, conf.Collector.Receivers.TcplogReceivers[0].Operators, 4)
})

t.Run("Test 3: TcplogReceiver deleted", func(tt *testing.T) {
tcplogReceiverDeleted := collector.updateTcplogReceivers(&model.NginxConfigContext{})
assert.True(t, tcplogReceiverDeleted)
assert.Empty(t, conf.Collector.Receivers.TcplogReceivers)
})

t.Run("Test 4: New tcplogReceiver added and deleted another", func(tt *testing.T) {
tcplogReceiverDeleted := collector.updateTcplogReceivers(&model.NginxConfigContext{NAPSysLogServers: []string{
"localhost:152",
}})
assert.True(t, tcplogReceiverDeleted)
assert.Len(t, conf.Collector.Receivers.TcplogReceivers, 1)
assert.Equal(t, "localhost:152", conf.Collector.Receivers.TcplogReceivers[0].ListenAddress)
assert.Len(t, conf.Collector.Receivers.TcplogReceivers[0].Operators, 4)
})
}

func createFakeCollector() *typesfakes.FakeCollectorInterface {
fakeCollector := &typesfakes.FakeCollectorInterface{}
fakeCollector.RunStub = func(ctx context.Context) error { return nil }
Expand Down
Loading

0 comments on commit 32c6f74

Please sign in to comment.