From 19677fafada327738227ad7fba4c9a08171bad26 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Ole=C5=9B?= Date: Wed, 29 Nov 2023 02:13:50 +0100 Subject: [PATCH] Fix registering callback metric --- observability_test.go | 32 ++++++++++++++++++++++---------- worker.go | 12 ++++++++---- worker_test.go | 31 ++++++++++++++++++++++++++++++- 3 files changed, 60 insertions(+), 15 deletions(-) diff --git a/observability_test.go b/observability_test.go index ded4129..896b4dc 100644 --- a/observability_test.go +++ b/observability_test.go @@ -10,17 +10,22 @@ import ( ) type obsTest struct { - log Logger - sr *tracetest.SpanRecorder + log Logger + sr *tracetest.SpanRecorder + reader *metric.ManualReader } -func (o obsTest) SpanRecorder() *tracetest.SpanRecorder { +func (o *obsTest) SpanRecorder() *tracetest.SpanRecorder { return o.sr } -func (o obsTest) Observability() Observability { - reader := metric.NewManualReader() - meterProvider := metric.NewMeterProvider(metric.WithReader(reader)) +func (o *obsTest) MetricReader() *metric.ManualReader { + return o.reader +} + +func (o *obsTest) Observability() Observability { + o.reader = metric.NewManualReader() + meterProvider := metric.NewMeterProvider(metric.WithReader(o.reader)) return Observability{ Logger: o.log, Meter: meterProvider.Meter("test"), @@ -49,16 +54,23 @@ func (l testLog) With(kv ...interface{}) Logger { } func (l testLog) Info(msg string, kv ...interface{}) { - l.t.Log("INFO", msg) + l.t.Helper() + l.t.Log("INFO", msg, kv) } + func (l testLog) Debug(msg string, kv ...interface{}) { - l.t.Log("DEBUG", msg) + l.t.Helper() + l.t.Log("DEBUG", msg, kv) } + func (l testLog) Error(msg string, kv ...interface{}) { - l.t.Log("ERROR", msg) + l.t.Helper() + l.t.Log("ERROR", msg, kv) } + func (l testLog) Warn(msg string, kv ...interface{}) { - l.t.Log("WARN", msg) + l.t.Helper() + l.t.Log("WARN", msg, kv) } func obsForTest(t *testing.T) obsTest { diff --git a/worker.go b/worker.go index 66c39d3..c45d98a 100644 --- a/worker.go +++ b/worker.go @@ -3,6 +3,7 @@ package kreconciler import ( "context" "errors" + "fmt" "runtime/debug" "sync" "time" @@ -14,7 +15,7 @@ import ( ) type metrics struct { - queueSizeObserver metric.Int64ObservableUpDownCounter + queueSizeObserver metric.Int64ObservableGauge dequeue metric.Int64Counter handleResult metric.Int64Counter delay metric.Int64Histogram @@ -72,7 +73,7 @@ func attrWorkerId(id int) attribute.KeyValue { } func decorateMeter(w *worker, meter metric.Meter) error { - queueSizeObserver, err := meter.Int64ObservableUpDownCounter("kreconciler_worker_queue_size", + queueSizeObserver, err := meter.Int64ObservableGauge("kreconciler_worker_queue_size", metric.WithUnit("{call}"), metric.WithDescription("The number of outstanding items to reconcile"), ) @@ -80,11 +81,14 @@ func decorateMeter(w *worker, meter metric.Meter) error { return err } w.metrics.queueSizeObserver = queueSizeObserver - meter.RegisterCallback( + _, err = meter.RegisterCallback( func(_ context.Context, o metric.Observer) error { o.ObserveInt64(queueSizeObserver, int64(w.objectLocks.Size()), metric.WithAttributes(attrWorkerId(w.id))) return nil - }) + }, queueSizeObserver) + if err != nil { + return err + } enqueue, err := meter.Int64Counter("kreconciler_enqueue", metric.WithUnit("{call}"), diff --git a/worker_test.go b/worker_test.go index 99b49e4..e4f2dca 100644 --- a/worker_test.go +++ b/worker_test.go @@ -11,6 +11,7 @@ import ( "github.com/stretchr/testify/mock" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/sdk/metric/metricdata" ) type action struct { @@ -160,7 +161,8 @@ func TestTraceWorker(t *testing.T) { mockHandler.On("Apply", mock.Anything, "b").Return(Result{}) mockHandler.On("Apply", mock.Anything, "c").Return(Result{RequeueDelay: 250 * time.Millisecond}) - worker, err := newWorker(obs.Observability(), 0, 10, 2, 10, time.Millisecond*100, 0, mockHandler) + ob := obs.Observability() + worker, err := newWorker(ob, 0, 10, 2, 10, time.Millisecond*100, 0, mockHandler) assert.NoError(t, err) wg := sync.WaitGroup{} @@ -223,6 +225,33 @@ func TestTraceWorker(t *testing.T) { assert.Equal(t, codes.Error, sr[7].Status().Code) } +func TestMetricWorker(t *testing.T) { + obs := obsForTest(t) + + mockHandler := new(handlerMock) + + ob := obs.Observability() + _, err := newWorker(ob, 0, 0, 0, 0, time.Millisecond*100, 0, mockHandler) + assert.NoError(t, err) + + reader := obs.MetricReader() + var data metricdata.ResourceMetrics + err = reader.Collect(context.Background(), &data) + assert.NoError(t, err) + assert.NotEmpty(t, data.ScopeMetrics) + + callbackMetric := "kreconciler_worker_queue_size" + found := false + for _, scopeMetric := range data.ScopeMetrics { + for _, metric := range scopeMetric.Metrics { + if metric.Name == callbackMetric { + found = true + } + } + } + assert.True(t, found, "metric %s not found", callbackMetric) +} + type handlerMock struct { mock.Mock }