Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(bigtable): Add application_blocking_latencies client side metric #11349

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion bigtable/bigtable.go
Original file line number Diff line number Diff line change
Expand Up @@ -473,7 +473,12 @@ func (t *Table) readRows(ctx context.Context, arg RowSet, f func(Row) bool, mt *
continue
}
prevRowKey = row.Key()
if !f(row) {

appBlockingLatencyStart := time.Now()
stopStream := f(row)
mt.currOp.incrementAppBlockingLatency(convertToMs(time.Since(appBlockingLatencyStart)))

if !stopStream {
// Cancel and drain stream.
cancel()
for {
Expand Down Expand Up @@ -1593,6 +1598,10 @@ func recordOperationCompletion(mt *builtinMetricsTracer) {
// graph will be less confusing
mt.instrumentRetryCount.Add(mt.ctx, mt.currOp.attemptCount-1, metric.WithAttributes(retryCntAttrs...))
}

// Record application_latencies
appBlockingLatAttrs, _ := mt.toOtelMetricAttrs(metricNameAppBlockingLatencies)
mt.instrumentAppBlockingLatencies.Record(mt.ctx, mt.currOp.appBlockingLatency, metric.WithAttributes(appBlockingLatAttrs...))
}

// gaxInvokeWithRecorder:
Expand Down
62 changes: 42 additions & 20 deletions bigtable/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,12 @@ const (
metricLabelKeyClientUID = "client_uid"

// Metric names
metricNameOperationLatencies = "operation_latencies"
metricNameAttemptLatencies = "attempt_latencies"
metricNameServerLatencies = "server_latencies"
metricNameRetryCount = "retry_count"
metricNameDebugTags = "debug_tags"
metricNameOperationLatencies = "operation_latencies"
metricNameAttemptLatencies = "attempt_latencies"
metricNameServerLatencies = "server_latencies"
metricNameAppBlockingLatencies = "application_latencies"
metricNameRetryCount = "retry_count"
metricNameDebugTags = "debug_tags"

// Metric units
metricUnitMS = "ms"
Expand Down Expand Up @@ -105,6 +106,7 @@ var (
},
recordedPerAttempt: true,
},
metricNameAppBlockingLatencies: {},
metricNameRetryCount: {
additionalAttrs: []string{
metricLabelKeyStatus,
Expand Down Expand Up @@ -146,11 +148,12 @@ type builtinMetricsTracerFactory struct {
// do not change across different function calls on client
clientAttributes []attribute.KeyValue

operationLatencies metric.Float64Histogram
serverLatencies metric.Float64Histogram
attemptLatencies metric.Float64Histogram
retryCount metric.Int64Counter
debugTags metric.Int64Counter
operationLatencies metric.Float64Histogram
serverLatencies metric.Float64Histogram
attemptLatencies metric.Float64Histogram
appBlockingLatencies metric.Float64Histogram
retryCount metric.Int64Counter
debugTags metric.Int64Counter
}

func newBuiltinMetricsTracerFactory(ctx context.Context, project, instance, appProfile string, metricsProvider MetricsProvider, opts ...option.ClientOption) (*builtinMetricsTracerFactory, error) {
Expand Down Expand Up @@ -250,6 +253,17 @@ func (tf *builtinMetricsTracerFactory) createInstruments(meter metric.Meter) err
return err
}

// Create application_latencies
tf.appBlockingLatencies, err = meter.Float64Histogram(
metricNameAppBlockingLatencies,
metric.WithDescription("The latency of the client application consuming available response data."),
metric.WithUnit(metricUnitMS),
metric.WithExplicitBucketBoundaries(bucketBounds...),
)
if err != nil {
return err
}

// Create retry_count
tf.retryCount, err = meter.Int64Counter(
metricNameRetryCount,
Expand Down Expand Up @@ -280,11 +294,12 @@ type builtinMetricsTracer struct {
// do not change across different operations on client
clientAttributes []attribute.KeyValue

instrumentOperationLatencies metric.Float64Histogram
instrumentServerLatencies metric.Float64Histogram
instrumentAttemptLatencies metric.Float64Histogram
instrumentRetryCount metric.Int64Counter
instrumentDebugTags metric.Int64Counter
instrumentOperationLatencies metric.Float64Histogram
instrumentServerLatencies metric.Float64Histogram
instrumentAttemptLatencies metric.Float64Histogram
instrumentAppBlockingLatencies metric.Float64Histogram
instrumentRetryCount metric.Int64Counter
instrumentDebugTags metric.Int64Counter

tableName string
method string
Expand All @@ -305,6 +320,8 @@ type opTracer struct {
status string

currAttempt attemptTracer

appBlockingLatency float64
}

func (o *opTracer) setStartTime(t time.Time) {
Expand All @@ -319,6 +336,10 @@ func (o *opTracer) incrementAttemptCount() {
o.attemptCount++
}

func (o *opTracer) incrementAppBlockingLatency(latency float64) {
o.appBlockingLatency += latency
}

// attemptTracer is used to record metrics for each individual attempt of the operation.
// Attempt corresponds to an attempt of an RPC.
type attemptTracer struct {
Expand Down Expand Up @@ -373,11 +394,12 @@ func (tf *builtinMetricsTracerFactory) createBuiltinMetricsTracer(ctx context.Co
currOp: currOpTracer,
clientAttributes: tf.clientAttributes,

instrumentOperationLatencies: tf.operationLatencies,
instrumentServerLatencies: tf.serverLatencies,
instrumentAttemptLatencies: tf.attemptLatencies,
instrumentRetryCount: tf.retryCount,
instrumentDebugTags: tf.debugTags,
instrumentOperationLatencies: tf.operationLatencies,
instrumentServerLatencies: tf.serverLatencies,
instrumentAttemptLatencies: tf.attemptLatencies,
instrumentAppBlockingLatencies: tf.appBlockingLatencies,
instrumentRetryCount: tf.retryCount,
instrumentDebugTags: tf.debugTags,

tableName: tableName,
isStreaming: isStreaming,
Expand Down
4 changes: 3 additions & 1 deletion bigtable/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,11 +91,12 @@ func TestNewBuiltinMetricsTracerFactory(t *testing.T) {
attribute.String(metricLabelKeyClientUID, clientUID),
attribute.String(metricLabelKeyClientName, clientName),
}
wantMetricNamesStdout := []string{metricNameAttemptLatencies, metricNameAttemptLatencies, metricNameOperationLatencies, metricNameRetryCount, metricNameServerLatencies}
wantMetricNamesStdout := []string{metricNameAttemptLatencies, metricNameAttemptLatencies, metricNameOperationLatencies, metricNameRetryCount, metricNameServerLatencies, metricNameAppBlockingLatencies}
wantMetricTypesGCM := []string{}
for _, wantMetricName := range wantMetricNamesStdout {
wantMetricTypesGCM = append(wantMetricTypesGCM, builtInMetricsMeterName+wantMetricName)
}
sort.Strings(wantMetricTypesGCM)

// Reduce sampling period to reduce test run time
origSamplePeriod := defaultSamplePeriod
Expand Down Expand Up @@ -210,6 +211,7 @@ func TestNewBuiltinMetricsTracerFactory(t *testing.T) {
gotNonNilInstruments := gotClient.metricsTracerFactory.operationLatencies != nil &&
gotClient.metricsTracerFactory.serverLatencies != nil &&
gotClient.metricsTracerFactory.attemptLatencies != nil &&
gotClient.metricsTracerFactory.appBlockingLatencies != nil &&
gotClient.metricsTracerFactory.retryCount != nil
if test.wantBuiltinEnabled != gotNonNilInstruments {
t.Errorf("NonNilInstruments: got: %v, want: %v", gotNonNilInstruments, test.wantBuiltinEnabled)
Expand Down
Loading