Skip to content

Commit

Permalink
Emit metrics via datadog
Browse files Browse the repository at this point in the history
  • Loading branch information
vu-hoang-kaligo committed Jan 10, 2025
1 parent afbd363 commit 0ee5eed
Show file tree
Hide file tree
Showing 10 changed files with 276 additions and 10 deletions.
1 change: 1 addition & 0 deletions Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ gem 'mock_redis'
gem 'rspec', '~> 3.0'

gem 'connection_pool'
gem 'dry-monitor'
gem 'hanami-controller', '~> 1.3'
gem 'pry-byebug'
gem 'rubocop', '~> 1.21'
9 changes: 9 additions & 0 deletions Gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ PATH
idempotency (0.1.2)
base64
dry-configurable
dry-monitor
msgpack
redis

Expand All @@ -24,6 +25,13 @@ GEM
concurrent-ruby (~> 1.0)
logger
zeitwerk (~> 2.6)
dry-events (1.0.1)
concurrent-ruby (~> 1.0)
dry-core (~> 1.0, < 2)
dry-monitor (1.0.1)
dry-configurable (~> 1.0, < 2)
dry-core (~> 1.0, < 2)
dry-events (~> 1.0, < 2)
hanami-controller (1.3.3)
hanami-utils (~> 1.3)
rack (~> 2.0)
Expand Down Expand Up @@ -90,6 +98,7 @@ PLATFORMS

DEPENDENCIES
connection_pool
dry-monitor
hanami-controller (~> 1.3)
idempotency!
mock_redis
Expand Down
1 change: 1 addition & 0 deletions idempotency.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ Gem::Specification.new do |spec|

spec.add_dependency 'base64'
spec.add_dependency 'dry-configurable'
spec.add_dependency 'dry-monitor'
spec.add_dependency 'msgpack'
spec.add_dependency 'redis'
end
34 changes: 31 additions & 3 deletions lib/idempotency.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,22 @@
require 'base64'
require_relative 'idempotency/cache'
require_relative 'idempotency/constants'
require_relative 'idempotency/instrumentation/datadog_listener'
require 'dry-monitor'

class Idempotency
extend Dry::Configurable

def self.notifier
@notifier ||= Dry::Monitor::Notifications.new(:idempotency_gem).tap do |n|
Events::ALL_EVENTS.each { |event| n.register_event(event) }
end
end

setting :redis_pool
setting :logger
setting :instrumentation_listeners, default: []

setting :default_lock_expiry, default: 300 # 5 minutes
setting :idempotent_methods, default: %w[POST PUT PATCH DELETE]
setting :idempotent_statuses, default: (200..299).to_a + (400..499).to_a
Expand All @@ -21,16 +31,24 @@ class Idempotency
}.to_json
end

def self.configure
super

config.instrumentation_listeners.each(&:setup_subscriptions)
end

def initialize(config: Idempotency.config, cache: Cache.new(config:))
@config = config
@cache = cache
end

def self.use_cache(request, request_identifiers, lock_duration: nil, &blk)
new.use_cache(request, request_identifiers, lock_duration:, &blk)
def self.use_cache(request, request_identifiers, lock_duration: nil, action: nil, &blk)
new.use_cache(request, request_identifiers, lock_duration:, action:, &blk)
end

def use_cache(request, request_identifiers, lock_duration:) # rubocop:disable Metrics/AbcSize
def use_cache(request, request_identifiers, lock_duration: nil, action: nil) # rubocop:disable Metrics/AbcSize
duration_start = Process.clock_gettime(::Process::CLOCK_MONOTONIC)

return yield unless cache_request?(request)

request_headers = request.env
Expand All @@ -42,6 +60,9 @@ def use_cache(request, request_identifiers, lock_duration:) # rubocop:disable Me

if (cached_status, cached_headers, cached_body = cached_response)
cached_headers.merge!(Constants::HEADER_KEY => idempotency_key)
Idempotency.notifier.instrument(Events::CACHE_HIT, request:, action:,
duration: calculate_duration(duration_start))

return [cached_status, cached_headers, cached_body]
end

Expand All @@ -55,15 +76,22 @@ def use_cache(request, request_identifiers, lock_duration:) # rubocop:disable Me
response_headers.merge!({ Constants::HEADER_KEY => idempotency_key })
end

Idempotency.notifier.instrument(Events::CACHE_MISS, request:, action:, duration: calculate_duration(duration_start))
[response_status, response_headers, response_body]
rescue Idempotency::Cache::LockConflict
Idempotency.notifier.instrument(Events::LOCK_CONFLICT, request:, action:,
duration: calculate_duration(duration_start))
[409, {}, config.response_body.concurrent_error]
end

private

attr_reader :config, :cache

def calculate_duration(start_time)
Process.clock_gettime(::Process::CLOCK_MONOTONIC) - start_time
end

def calculate_fingerprint(request, idempotency_key, request_identifiers)
d = Digest::SHA256.new
d << idempotency_key
Expand Down
8 changes: 8 additions & 0 deletions lib/idempotency/constants.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,12 @@ class Constants
RACK_HEADER_KEY = 'HTTP_IDEMPOTENCY_KEY'
HEADER_KEY = 'Idempotency-Key'
end

module Events
CACHE_HIT = :cache_hit
CACHE_MISS = :cache_miss
LOCK_CONFLICT = :lock_conflict

ALL_EVENTS = constants.map { |event_id| const_get(event_id) }
end
end
4 changes: 2 additions & 2 deletions lib/idempotency/hanami.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@

class Idempotency
module Hanami
def use_cache(request_identifiers = [], lock_duration: nil)
def use_cache(request_identifiers = [], lock_duration: nil, action: nil)
response_status, response_headers, response_body = Idempotency.use_cache(
request, request_identifiers, lock_duration:
request, request_identifiers, lock_duration:, action:
) do
yield

Expand Down
43 changes: 43 additions & 0 deletions lib/idempotency/instrumentation/datadog_listener.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
# frozen_string_literal: true

require_relative '../../idempotency'

class Idempotency
module Instrumentation
class DatadogListener
EVENT_NAME_TO_METRIC_MAPPINGS = {
Events::CACHE_HIT => 'idempotency_cache_hit_count',
Events::CACHE_MISS => 'idempotency_cache_miss_count',
Events::LOCK_CONFLICT => 'idempotency_lock_conflict_count'
}.freeze

def initialize(statsd_client, namespace = nil)
@statsd_client = statsd_client
@namespace = namespace
end

def setup_subscriptions
EVENT_NAME_TO_METRIC_MAPPINGS.each do |event_name, metric|
Idempotency.notifier.subscribe(event_name) do |event|
send_metric(metric, event.payload)
end
end
end

private

attr_reader :namespace, :statsd_client

def send_metric(metric_name, event_data)
action = event_data[:action] || "#{event_data[:request].request_method}:#{event_data[:request].path}"
tags = ["action:#{action}"]
tags << "namespace:#{@namespace}" if @namespace

@statsd_client.increment(metric_name, tags:)
@statsd_client.histogram(
'idempotency_cache_duration_seconds', event_data[:duration], tags: tags + ["metric:#{metric_name}"]
)
end
end
end
end
4 changes: 2 additions & 2 deletions lib/idempotency/rails.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@

class Idempotency
module Rails
def use_cache(request_identifiers = [], lock_duration: nil)
def use_cache(request_identifiers = [], lock_duration: nil, action: nil)
response_status, response_headers, response_body = Idempotency.use_cache(
request, request_identifiers, lock_duration:
request, request_identifiers, lock_duration:, action:
) do
yield

Expand Down
139 changes: 139 additions & 0 deletions spec/idempotency/instrumentation/datadog_listener_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
# frozen_string_literal: true

RSpec.describe Idempotency::Instrumentation::DatadogListener do
let(:statsd_client) { double('statsd_client') }
let(:namespace) { 'test_app' }
let(:listener) { described_class.new(statsd_client, namespace) }
let(:request) do
double(
'Request',
request_method: 'POST',
path: '/orders/123'
)
end
let(:event_payload) do
{
request: request,
action: 'POST:/orders/create',
duration: 0.1
}
end
let(:notifier) do
Dry::Monitor::Notifications.new(:test).tap do |n|
Idempotency::Events::ALL_EVENTS.each { |event| n.register_event(event) }
end
end

before do
allow(Idempotency).to receive(:notifier).and_return(notifier)
listener.setup_subscriptions
end

context 'when cache hit event is triggered' do
it 'sends correct metrics' do
expect(statsd_client).to receive(:increment).with(
'idempotency_cache_hit_count',
tags: ['action:POST:/orders/create', 'namespace:test_app']
)
expect(statsd_client).to receive(:histogram).with(
'idempotency_cache_duration_seconds',
0.1,
tags: [
'action:POST:/orders/create',
'namespace:test_app',
'metric:idempotency_cache_hit_count'
]
)

Idempotency.notifier.instrument(Idempotency::Events::CACHE_HIT, event_payload)
end
end

context 'when cache miss event is triggered' do
it 'sends correct metrics' do
expect(statsd_client).to receive(:increment).with(
'idempotency_cache_miss_count',
tags: ['action:POST:/orders/create', 'namespace:test_app']
)
expect(statsd_client).to receive(:histogram).with(
'idempotency_cache_duration_seconds',
0.1,
tags: [
'action:POST:/orders/create',
'namespace:test_app',
'metric:idempotency_cache_miss_count'
]
)

Idempotency.notifier.instrument(Idempotency::Events::CACHE_MISS, event_payload)
end
end

context 'when lock conflict event is triggered' do
it 'sends correct metrics' do
expect(statsd_client).to receive(:increment).with(
'idempotency_lock_conflict_count',
tags: ['action:POST:/orders/create', 'namespace:test_app']
)
expect(statsd_client).to receive(:histogram).with(
'idempotency_cache_duration_seconds',
0.1,
tags: [
'action:POST:/orders/create',
'namespace:test_app',
'metric:idempotency_lock_conflict_count'
]
)

Idempotency.notifier.instrument(Idempotency::Events::LOCK_CONFLICT, event_payload)
end
end

context 'when action is not provided' do
let(:event_payload) do
{
request: request,
duration: 0.1
}
end

it 'uses request method and path as action' do
expect(statsd_client).to receive(:increment).with(
'idempotency_cache_hit_count',
tags: ['action:POST:/orders/123', 'namespace:test_app']
)
expect(statsd_client).to receive(:histogram).with(
'idempotency_cache_duration_seconds',
0.1,
tags: [
'action:POST:/orders/123',
'namespace:test_app',
'metric:idempotency_cache_hit_count'
]
)

Idempotency.notifier.instrument(Idempotency::Events::CACHE_HIT, event_payload)
end
end

context 'when namespace is not provided' do
let(:listener) { described_class.new(statsd_client) }

it 'does not include namespace tag' do
expect(statsd_client).to receive(:increment).with(
'idempotency_cache_hit_count',
tags: ['action:POST:/orders/create']
)
expect(statsd_client).to receive(:histogram).with(
'idempotency_cache_duration_seconds',
0.1,
tags: [
'action:POST:/orders/create',
'metric:idempotency_cache_hit_count'
]
)

Idempotency.notifier.instrument(Idempotency::Events::CACHE_HIT, event_payload)
end
end
end
Loading

0 comments on commit 0ee5eed

Please sign in to comment.