Skip to content

Commit

Permalink
Otel fix missing attributes (#917)
Browse files Browse the repository at this point in the history
* setting instance attributes on every scrape

* fix linter: fieldalignment

* fieldalignment

* print state during collector tests

* remove otlp exporter from agent config, add prometheus
  • Loading branch information
sean-breen authored Nov 11, 2024
1 parent faba7b1 commit 496e750
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,16 +38,17 @@ const Percentage = 100

type (
NginxLogScraper struct {
outChan <-chan []*entry.Entry
cfg *config.Config
logger *zap.Logger
mb *metadata.MetricsBuilder
rb *metadata.ResourceBuilder
pipe *pipeline.DirectedPipeline
wg *sync.WaitGroup
cancel context.CancelFunc
entries []*entry.Entry
mut sync.Mutex
outChan <-chan []*entry.Entry
cfg *config.Config
settings receiver.Settings
logger *zap.Logger
mb *metadata.MetricsBuilder
rb *metadata.ResourceBuilder
pipe *pipeline.DirectedPipeline
wg *sync.WaitGroup
cancel context.CancelFunc
entries []*entry.Entry
mut sync.Mutex
}

NginxMetrics struct {
Expand All @@ -74,9 +75,6 @@ func NewScraper(

mb := metadata.NewMetricsBuilder(cfg.MetricsBuilderConfig, settings)
rb := mb.NewResourceBuilder()
rb.SetInstanceID(settings.ID.Name())
rb.SetInstanceType("nginx")
logger.Debug("NGINX OSS resource info", zap.Any("resource", rb))

operators := make([]operator.Config, 0)

Expand All @@ -96,14 +94,15 @@ func NewScraper(
}

return &NginxLogScraper{
cfg: cfg,
logger: logger,
mb: mb,
rb: rb,
mut: sync.Mutex{},
outChan: outChan,
pipe: stanzaPipeline,
wg: &sync.WaitGroup{},
cfg: cfg,
logger: logger,
settings: settings,
mb: mb,
rb: rb,
mut: sync.Mutex{},
outChan: outChan,
pipe: stanzaPipeline,
wg: &sync.WaitGroup{},
}, nil
}

Expand Down Expand Up @@ -165,6 +164,10 @@ func (nls *NginxLogScraper) Scrape(_ context.Context) (pmetric.Metrics, error) {
nls.entries = make([]*entry.Entry, 0)
timeNow := pcommon.NewTimestampFromTime(time.Now())

nls.rb.SetInstanceID(nls.settings.ID.Name())
nls.rb.SetInstanceType("nginx")
nls.logger.Debug("NGINX OSS access log resource info", zap.Any("resource", nls.rb))

nls.mb.RecordNginxHTTPResponseStatusDataPoint(
timeNow,
nginxMetrics.responseStatuses.oneHundredStatusRange,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,10 @@ import (
type NginxStubStatusScraper struct {
httpClient *http.Client
client *client.NginxClient

settings component.TelemetrySettings
cfg *config.Config
mb *metadata.MetricsBuilder
rb *metadata.ResourceBuilder
cfg *config.Config
mb *metadata.MetricsBuilder
rb *metadata.ResourceBuilder
settings receiver.Settings
}

var _ scraperhelper.Scraper = (*NginxStubStatusScraper)(nil)
Expand All @@ -43,12 +42,9 @@ func NewScraper(

mb := metadata.NewMetricsBuilder(cfg.MetricsBuilderConfig, settings)
rb := mb.NewResourceBuilder()
rb.SetInstanceID(settings.ID.Name())
rb.SetInstanceType("nginx")
logger.Debug("NGINX OSS resource info", zap.Any("resource", rb))

return &NginxStubStatusScraper{
settings: settings.TelemetrySettings,
settings: settings,
cfg: cfg,
mb: mb,
rb: rb,
Expand All @@ -60,7 +56,7 @@ func (s *NginxStubStatusScraper) ID() component.ID {
}

func (s *NginxStubStatusScraper) Start(ctx context.Context, host component.Host) error {
httpClient, err := s.cfg.ToClient(ctx, host, s.settings)
httpClient, err := s.cfg.ToClient(ctx, host, s.settings.TelemetrySettings)
if err != nil {
return err
}
Expand All @@ -85,6 +81,10 @@ func (s *NginxStubStatusScraper) Scrape(context.Context) (pmetric.Metrics, error
return pmetric.Metrics{}, err
}

s.rb.SetInstanceID(s.settings.ID.Name())
s.rb.SetInstanceType("nginx")
s.settings.Logger.Debug("NGINX OSS stub status resource info", zap.Any("resource", s.rb))

now := pcommon.NewTimestampFromTime(time.Now())

s.mb.RecordNginxHTTPRequestsDataPoint(now, stats.Requests)
Expand Down
16 changes: 8 additions & 8 deletions internal/collector/nginxplusreceiver/scraper.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (

"go.uber.org/zap"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/receiver"
Expand All @@ -33,11 +32,11 @@ const (

type nginxPlusScraper struct {
plusClient *plusapi.NginxClient
settings component.TelemetrySettings
cfg *Config
mb *metadata.MetricsBuilder
rb *metadata.ResourceBuilder
logger *zap.Logger
settings receiver.Settings
}

func newNginxPlusScraper(
Expand All @@ -48,21 +47,18 @@ func newNginxPlusScraper(
logger.Info("Creating NGINX Plus scraper")

mb := metadata.NewMetricsBuilder(cfg.MetricsBuilderConfig, settings)
rb := mb.NewResourceBuilder()

plusClient, err := plusapi.NewNginxClient(cfg.Endpoint,
plusapi.WithMaxAPIVersion(),
)
if err != nil {
return nil, err
}

rb := mb.NewResourceBuilder()
rb.SetInstanceID(settings.ID.Name())
rb.SetInstanceType("nginxplus")
logger.Debug("NGINX Plus resource info", zap.Any("resource", rb))

return &nginxPlusScraper{
plusClient: plusClient,
settings: settings.TelemetrySettings,
settings: settings,
cfg: cfg,
mb: mb,
rb: rb,
Expand All @@ -76,6 +72,10 @@ func (nps *nginxPlusScraper) scrape(ctx context.Context) (pmetric.Metrics, error
return pmetric.Metrics{}, fmt.Errorf("GET stats: %w", err)
}

nps.rb.SetInstanceID(nps.settings.ID.Name())
nps.rb.SetInstanceType("nginxplus")
nps.logger.Debug("NGINX Plus resource info", zap.Any("resource", nps.rb))

nps.logger.Debug("NGINX Plus stats", zap.Any("stats", stats))
nps.recordMetrics(stats)

Expand Down
59 changes: 50 additions & 9 deletions internal/collector/otel_collector_plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (

"github.com/nginx/agent/v3/test/protos"
"github.com/nginx/agent/v3/test/stub"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/otelcol"
Expand Down Expand Up @@ -232,6 +231,21 @@ func TestCollector_ProcessResourceUpdateTopic(t *testing.T) {
conf.Collector.Processors.Batch = nil
conf.Collector.Processors.Attribute = nil
conf.Collector.Processors.Resource = nil
conf.Collector.Exporters.OtlpExporters = nil
conf.Collector.Exporters.PrometheusExporter = &config.PrometheusExporter{
Server: &config.ServerConfig{
Host: "",
Port: 0,
Type: 0,
},
TLS: &config.TLSConfig{
Cert: "",
Key: "",
Ca: "",
ServerName: "",
SkipVerify: false,
},
}

tests := []struct {
message *bus.Message
Expand Down Expand Up @@ -286,17 +300,23 @@ func TestCollector_ProcessResourceUpdateTopic(t *testing.T) {

assert.Eventually(
tt,
func() bool { return collector.service.GetState() == otelcol.StateRunning },
6*time.Second,
func() bool {
tt.Logf("Collector state is %+v", collector.service.GetState())
return collector.service.GetState() == otelcol.StateRunning
},
5*time.Second,
100*time.Millisecond,
)

collector.Process(ctx, test.message)

assert.Eventually(
tt,
func() bool { return collector.service.GetState() == otelcol.StateRunning },
6*time.Second,
func() bool {
tt.Logf("Collector state is %+v", collector.service.GetState())
return collector.service.GetState() == otelcol.StateRunning
},
5*time.Second,
100*time.Millisecond,
)

Expand All @@ -312,6 +332,21 @@ func TestCollector_ProcessResourceUpdateTopicFails(t *testing.T) {
conf.Collector.Processors.Batch = nil
conf.Collector.Processors.Attribute = nil
conf.Collector.Processors.Resource = nil
conf.Collector.Exporters.OtlpExporters = nil
conf.Collector.Exporters.PrometheusExporter = &config.PrometheusExporter{
Server: &config.ServerConfig{
Host: "",
Port: 0,
Type: 0,
},
TLS: &config.TLSConfig{
Cert: "",
Key: "",
Ca: "",
ServerName: "",
SkipVerify: false,
},
}

tests := []struct {
message *bus.Message
Expand Down Expand Up @@ -342,17 +377,23 @@ func TestCollector_ProcessResourceUpdateTopicFails(t *testing.T) {

assert.Eventually(
tt,
func() bool { return collector.service.GetState() == otelcol.StateRunning },
6*time.Second,
func() bool {
tt.Logf("Collector state is %+v", collector.service.GetState())
return collector.service.GetState() == otelcol.StateRunning
},
5*time.Second,
100*time.Millisecond,
)

collector.Process(ctx, test.message)

assert.Eventually(
tt,
func() bool { return collector.service.GetState() == otelcol.StateRunning },
6*time.Second,
func() bool {
tt.Logf("Collector state is %+v", collector.service.GetState())
return collector.service.GetState() == otelcol.StateRunning
},
5*time.Second,
100*time.Millisecond,
)

Expand Down

0 comments on commit 496e750

Please sign in to comment.