Skip to content

Commit

Permalink
Fix registering callback metric
Browse files Browse the repository at this point in the history
  • Loading branch information
lukaszo committed Nov 29, 2023
1 parent 5d840d8 commit 19677fa
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 15 deletions.
32 changes: 22 additions & 10 deletions observability_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,22 @@ import (
)

type obsTest struct {
log Logger
sr *tracetest.SpanRecorder
log Logger
sr *tracetest.SpanRecorder
reader *metric.ManualReader
}

func (o obsTest) SpanRecorder() *tracetest.SpanRecorder {
func (o *obsTest) SpanRecorder() *tracetest.SpanRecorder {
return o.sr
}

func (o obsTest) Observability() Observability {
reader := metric.NewManualReader()
meterProvider := metric.NewMeterProvider(metric.WithReader(reader))
func (o *obsTest) MetricReader() *metric.ManualReader {
return o.reader
}

func (o *obsTest) Observability() Observability {
o.reader = metric.NewManualReader()
meterProvider := metric.NewMeterProvider(metric.WithReader(o.reader))
return Observability{
Logger: o.log,
Meter: meterProvider.Meter("test"),
Expand Down Expand Up @@ -49,16 +54,23 @@ func (l testLog) With(kv ...interface{}) Logger {
}

func (l testLog) Info(msg string, kv ...interface{}) {
l.t.Log("INFO", msg)
l.t.Helper()
l.t.Log("INFO", msg, kv)
}

func (l testLog) Debug(msg string, kv ...interface{}) {
l.t.Log("DEBUG", msg)
l.t.Helper()
l.t.Log("DEBUG", msg, kv)
}

func (l testLog) Error(msg string, kv ...interface{}) {
l.t.Log("ERROR", msg)
l.t.Helper()
l.t.Log("ERROR", msg, kv)
}

func (l testLog) Warn(msg string, kv ...interface{}) {
l.t.Log("WARN", msg)
l.t.Helper()
l.t.Log("WARN", msg, kv)
}

func obsForTest(t *testing.T) obsTest {
Expand Down
12 changes: 8 additions & 4 deletions worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package kreconciler
import (
"context"
"errors"
"fmt"

Check failure on line 6 in worker.go

View workflow job for this annotation

GitHub Actions / check

"fmt" imported and not used
"runtime/debug"
"sync"
"time"
Expand All @@ -14,7 +15,7 @@ import (
)

type metrics struct {
queueSizeObserver metric.Int64ObservableUpDownCounter
queueSizeObserver metric.Int64ObservableGauge
dequeue metric.Int64Counter
handleResult metric.Int64Counter
delay metric.Int64Histogram
Expand Down Expand Up @@ -72,19 +73,22 @@ func attrWorkerId(id int) attribute.KeyValue {
}

func decorateMeter(w *worker, meter metric.Meter) error {
queueSizeObserver, err := meter.Int64ObservableUpDownCounter("kreconciler_worker_queue_size",
queueSizeObserver, err := meter.Int64ObservableGauge("kreconciler_worker_queue_size",
metric.WithUnit("{call}"),
metric.WithDescription("The number of outstanding items to reconcile"),
)
if err != nil {
return err
}
w.metrics.queueSizeObserver = queueSizeObserver
meter.RegisterCallback(
_, err = meter.RegisterCallback(
func(_ context.Context, o metric.Observer) error {
o.ObserveInt64(queueSizeObserver, int64(w.objectLocks.Size()), metric.WithAttributes(attrWorkerId(w.id)))
return nil
})
}, queueSizeObserver)
if err != nil {
return err
}

enqueue, err := meter.Int64Counter("kreconciler_enqueue",
metric.WithUnit("{call}"),
Expand Down
31 changes: 30 additions & 1 deletion worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/stretchr/testify/mock"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
)

type action struct {
Expand Down Expand Up @@ -160,7 +161,8 @@ func TestTraceWorker(t *testing.T) {
mockHandler.On("Apply", mock.Anything, "b").Return(Result{})
mockHandler.On("Apply", mock.Anything, "c").Return(Result{RequeueDelay: 250 * time.Millisecond})

worker, err := newWorker(obs.Observability(), 0, 10, 2, 10, time.Millisecond*100, 0, mockHandler)
ob := obs.Observability()
worker, err := newWorker(ob, 0, 10, 2, 10, time.Millisecond*100, 0, mockHandler)
assert.NoError(t, err)
wg := sync.WaitGroup{}

Expand Down Expand Up @@ -223,6 +225,33 @@ func TestTraceWorker(t *testing.T) {
assert.Equal(t, codes.Error, sr[7].Status().Code)
}

func TestMetricWorker(t *testing.T) {
obs := obsForTest(t)

mockHandler := new(handlerMock)

ob := obs.Observability()
_, err := newWorker(ob, 0, 0, 0, 0, time.Millisecond*100, 0, mockHandler)
assert.NoError(t, err)

reader := obs.MetricReader()
var data metricdata.ResourceMetrics
err = reader.Collect(context.Background(), &data)
assert.NoError(t, err)
assert.NotEmpty(t, data.ScopeMetrics)

callbackMetric := "kreconciler_worker_queue_size"
found := false
for _, scopeMetric := range data.ScopeMetrics {
for _, metric := range scopeMetric.Metrics {
if metric.Name == callbackMetric {
found = true
}
}
}
assert.True(t, found, "metric %s not found", callbackMetric)
}

type handlerMock struct {
mock.Mock
}
Expand Down

0 comments on commit 19677fa

Please sign in to comment.