diff --git a/Gemfile b/Gemfile index e2f866f..ac6e3d5 100644 --- a/Gemfile +++ b/Gemfile @@ -9,6 +9,7 @@ gem 'mock_redis' gem 'rspec', '~> 3.0' gem 'connection_pool' +gem 'dry-monitor' gem 'hanami-controller', '~> 1.3' gem 'pry-byebug' gem 'rubocop', '~> 1.21' diff --git a/Gemfile.lock b/Gemfile.lock index d15265e..ce9837b 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -4,6 +4,7 @@ PATH idempotency (0.1.2) base64 dry-configurable + dry-monitor msgpack redis @@ -24,6 +25,13 @@ GEM concurrent-ruby (~> 1.0) logger zeitwerk (~> 2.6) + dry-events (1.0.1) + concurrent-ruby (~> 1.0) + dry-core (~> 1.0, < 2) + dry-monitor (1.0.1) + dry-configurable (~> 1.0, < 2) + dry-core (~> 1.0, < 2) + dry-events (~> 1.0, < 2) hanami-controller (1.3.3) hanami-utils (~> 1.3) rack (~> 2.0) @@ -90,6 +98,7 @@ PLATFORMS DEPENDENCIES connection_pool + dry-monitor hanami-controller (~> 1.3) idempotency! mock_redis diff --git a/idempotency.gemspec b/idempotency.gemspec index 38becc4..eb71b8a 100644 --- a/idempotency.gemspec +++ b/idempotency.gemspec @@ -36,6 +36,7 @@ Gem::Specification.new do |spec| spec.add_dependency 'base64' spec.add_dependency 'dry-configurable' + spec.add_dependency 'dry-monitor' spec.add_dependency 'msgpack' spec.add_dependency 'redis' end diff --git a/lib/idempotency.rb b/lib/idempotency.rb index 60db505..9fb3145 100644 --- a/lib/idempotency.rb +++ b/lib/idempotency.rb @@ -5,12 +5,22 @@ require 'base64' require_relative 'idempotency/cache' require_relative 'idempotency/constants' +require_relative 'idempotency/instrumentation/datadog_listener' +require 'dry-monitor' class Idempotency extend Dry::Configurable + def self.notifier + @notifier ||= Dry::Monitor::Notifications.new(:idempotency_gem).tap do |n| + Events::ALL_EVENTS.each { |event| n.register_event(event) } + end + end + setting :redis_pool setting :logger + setting :instrumentation_listeners, default: [] + setting :default_lock_expiry, default: 300 # 5 minutes setting :idempotent_methods, default: %w[POST PUT PATCH DELETE] setting :idempotent_statuses, default: (200..299).to_a + (400..499).to_a @@ -21,16 +31,24 @@ class Idempotency }.to_json end + def self.configure + super + + config.instrumentation_listeners.each(&:setup_subscriptions) + end + def initialize(config: Idempotency.config, cache: Cache.new(config:)) @config = config @cache = cache end - def self.use_cache(request, request_identifiers, lock_duration: nil, &blk) - new.use_cache(request, request_identifiers, lock_duration:, &blk) + def self.use_cache(request, request_identifiers, lock_duration: nil, action: nil, &blk) + new.use_cache(request, request_identifiers, lock_duration:, action:, &blk) end - def use_cache(request, request_identifiers, lock_duration:) # rubocop:disable Metrics/AbcSize + def use_cache(request, request_identifiers, lock_duration: nil, action: nil) # rubocop:disable Metrics/AbcSize + duration_start = Process.clock_gettime(::Process::CLOCK_MONOTONIC) + return yield unless cache_request?(request) request_headers = request.env @@ -42,6 +60,9 @@ def use_cache(request, request_identifiers, lock_duration:) # rubocop:disable Me if (cached_status, cached_headers, cached_body = cached_response) cached_headers.merge!(Constants::HEADER_KEY => idempotency_key) + Idempotency.notifier.instrument(Events::CACHE_HIT, request:, action:, + duration: calculate_duration(duration_start)) + return [cached_status, cached_headers, cached_body] end @@ -55,8 +76,11 @@ def use_cache(request, request_identifiers, lock_duration:) # rubocop:disable Me response_headers.merge!({ Constants::HEADER_KEY => idempotency_key }) end + Idempotency.notifier.instrument(Events::CACHE_MISS, request:, action:, duration: calculate_duration(duration_start)) [response_status, response_headers, response_body] rescue Idempotency::Cache::LockConflict + Idempotency.notifier.instrument(Events::LOCK_CONFLICT, request:, action:, + duration: calculate_duration(duration_start)) [409, {}, config.response_body.concurrent_error] end @@ -64,6 +88,10 @@ def use_cache(request, request_identifiers, lock_duration:) # rubocop:disable Me attr_reader :config, :cache + def calculate_duration(start_time) + Process.clock_gettime(::Process::CLOCK_MONOTONIC) - start_time + end + def calculate_fingerprint(request, idempotency_key, request_identifiers) d = Digest::SHA256.new d << idempotency_key diff --git a/lib/idempotency/constants.rb b/lib/idempotency/constants.rb index 740ccc7..f4ec441 100644 --- a/lib/idempotency/constants.rb +++ b/lib/idempotency/constants.rb @@ -5,4 +5,12 @@ class Constants RACK_HEADER_KEY = 'HTTP_IDEMPOTENCY_KEY' HEADER_KEY = 'Idempotency-Key' end + + module Events + CACHE_HIT = :cache_hit + CACHE_MISS = :cache_miss + LOCK_CONFLICT = :lock_conflict + + ALL_EVENTS = constants.map { |event_id| const_get(event_id) } + end end diff --git a/lib/idempotency/hanami.rb b/lib/idempotency/hanami.rb index 82ccc37..17a1fe9 100644 --- a/lib/idempotency/hanami.rb +++ b/lib/idempotency/hanami.rb @@ -4,9 +4,9 @@ class Idempotency module Hanami - def use_cache(request_identifiers = [], lock_duration: nil) + def use_cache(request_identifiers = [], lock_duration: nil, action: nil) response_status, response_headers, response_body = Idempotency.use_cache( - request, request_identifiers, lock_duration: + request, request_identifiers, lock_duration:, action: ) do yield diff --git a/lib/idempotency/instrumentation/datadog_listener.rb b/lib/idempotency/instrumentation/datadog_listener.rb new file mode 100644 index 0000000..1fabba8 --- /dev/null +++ b/lib/idempotency/instrumentation/datadog_listener.rb @@ -0,0 +1,43 @@ +# frozen_string_literal: true + +require_relative '../../idempotency' + +class Idempotency + module Instrumentation + class DatadogListener + EVENT_NAME_TO_METRIC_MAPPINGS = { + Events::CACHE_HIT => 'idempotency_cache_hit_count', + Events::CACHE_MISS => 'idempotency_cache_miss_count', + Events::LOCK_CONFLICT => 'idempotency_lock_conflict_count' + }.freeze + + def initialize(statsd_client, namespace = nil) + @statsd_client = statsd_client + @namespace = namespace + end + + def setup_subscriptions + EVENT_NAME_TO_METRIC_MAPPINGS.each do |event_name, metric| + Idempotency.notifier.subscribe(event_name) do |event| + send_metric(metric, event.payload) + end + end + end + + private + + attr_reader :namespace, :statsd_client + + def send_metric(metric_name, event_data) + action = event_data[:action] || "#{event_data[:request].request_method}:#{event_data[:request].path}" + tags = ["action:#{action}"] + tags << "namespace:#{@namespace}" if @namespace + + @statsd_client.increment(metric_name, tags:) + @statsd_client.histogram( + 'idempotency_cache_duration_seconds', event_data[:duration], tags: tags + ["metric:#{metric_name}"] + ) + end + end + end +end diff --git a/lib/idempotency/rails.rb b/lib/idempotency/rails.rb index ab0822f..d063e3e 100644 --- a/lib/idempotency/rails.rb +++ b/lib/idempotency/rails.rb @@ -4,9 +4,9 @@ class Idempotency module Rails - def use_cache(request_identifiers = [], lock_duration: nil) + def use_cache(request_identifiers = [], lock_duration: nil, action: nil) response_status, response_headers, response_body = Idempotency.use_cache( - request, request_identifiers, lock_duration: + request, request_identifiers, lock_duration:, action: ) do yield diff --git a/spec/idempotency/instrumentation/datadog_listener_spec.rb b/spec/idempotency/instrumentation/datadog_listener_spec.rb new file mode 100644 index 0000000..300e6a7 --- /dev/null +++ b/spec/idempotency/instrumentation/datadog_listener_spec.rb @@ -0,0 +1,139 @@ +# frozen_string_literal: true + +RSpec.describe Idempotency::Instrumentation::DatadogListener do + let(:statsd_client) { double('statsd_client') } + let(:namespace) { 'test_app' } + let(:listener) { described_class.new(statsd_client, namespace) } + let(:request) do + double( + 'Request', + request_method: 'POST', + path: '/orders/123' + ) + end + let(:event_payload) do + { + request: request, + action: 'POST:/orders/create', + duration: 0.1 + } + end + let(:notifier) do + Dry::Monitor::Notifications.new(:test).tap do |n| + Idempotency::Events::ALL_EVENTS.each { |event| n.register_event(event) } + end + end + + before do + allow(Idempotency).to receive(:notifier).and_return(notifier) + listener.setup_subscriptions + end + + context 'when cache hit event is triggered' do + it 'sends correct metrics' do + expect(statsd_client).to receive(:increment).with( + 'idempotency_cache_hit_count', + tags: ['action:POST:/orders/create', 'namespace:test_app'] + ) + expect(statsd_client).to receive(:histogram).with( + 'idempotency_cache_duration_seconds', + 0.1, + tags: [ + 'action:POST:/orders/create', + 'namespace:test_app', + 'metric:idempotency_cache_hit_count' + ] + ) + + Idempotency.notifier.instrument(Idempotency::Events::CACHE_HIT, event_payload) + end + end + + context 'when cache miss event is triggered' do + it 'sends correct metrics' do + expect(statsd_client).to receive(:increment).with( + 'idempotency_cache_miss_count', + tags: ['action:POST:/orders/create', 'namespace:test_app'] + ) + expect(statsd_client).to receive(:histogram).with( + 'idempotency_cache_duration_seconds', + 0.1, + tags: [ + 'action:POST:/orders/create', + 'namespace:test_app', + 'metric:idempotency_cache_miss_count' + ] + ) + + Idempotency.notifier.instrument(Idempotency::Events::CACHE_MISS, event_payload) + end + end + + context 'when lock conflict event is triggered' do + it 'sends correct metrics' do + expect(statsd_client).to receive(:increment).with( + 'idempotency_lock_conflict_count', + tags: ['action:POST:/orders/create', 'namespace:test_app'] + ) + expect(statsd_client).to receive(:histogram).with( + 'idempotency_cache_duration_seconds', + 0.1, + tags: [ + 'action:POST:/orders/create', + 'namespace:test_app', + 'metric:idempotency_lock_conflict_count' + ] + ) + + Idempotency.notifier.instrument(Idempotency::Events::LOCK_CONFLICT, event_payload) + end + end + + context 'when action is not provided' do + let(:event_payload) do + { + request: request, + duration: 0.1 + } + end + + it 'uses request method and path as action' do + expect(statsd_client).to receive(:increment).with( + 'idempotency_cache_hit_count', + tags: ['action:POST:/orders/123', 'namespace:test_app'] + ) + expect(statsd_client).to receive(:histogram).with( + 'idempotency_cache_duration_seconds', + 0.1, + tags: [ + 'action:POST:/orders/123', + 'namespace:test_app', + 'metric:idempotency_cache_hit_count' + ] + ) + + Idempotency.notifier.instrument(Idempotency::Events::CACHE_HIT, event_payload) + end + end + + context 'when namespace is not provided' do + let(:listener) { described_class.new(statsd_client) } + + it 'does not include namespace tag' do + expect(statsd_client).to receive(:increment).with( + 'idempotency_cache_hit_count', + tags: ['action:POST:/orders/create'] + ) + expect(statsd_client).to receive(:histogram).with( + 'idempotency_cache_duration_seconds', + 0.1, + tags: [ + 'action:POST:/orders/create', + 'metric:idempotency_cache_hit_count' + ] + ) + + Idempotency.notifier.instrument(Idempotency::Events::CACHE_HIT, event_payload) + end + end +end diff --git a/spec/idempotency_spec.rb b/spec/idempotency_spec.rb index 1423214..1d8d60e 100644 --- a/spec/idempotency_spec.rb +++ b/spec/idempotency_spec.rb @@ -6,17 +6,24 @@ it 'has a version number' do expect(Idempotency::VERSION).not_to be nil end + let(:notifier) { instance_double(Dry::Monitor::Notifications) } + + before do + allow(Idempotency).to receive(:notifier).and_return(notifier) + end after { Idempotency.reset_config } describe '#use_cache' do - subject { described_class.new(cache:).use_cache(request, request_ids, lock_duration:, &controller_action) } + subject { described_class.new(cache:).use_cache(request, request_ids, lock_duration:, action:, &controller_action) } + let(:statsd_client) { double('statsd_client') } let(:controller_action) do lambda do response end end + let(:action) { 'POST:/int/orders/order_id' } let(:request_ids) { ['tenant_id'] } let(:request) do double( @@ -59,7 +66,16 @@ cache.set(fingerprint, cached_status, cached_headers, cached_body) end - it { is_expected.to eq(expected_response) } + it 'returns cached response and logs cache hit event' do + expect(notifier).to receive(:instrument).with( + Idempotency::Events::CACHE_HIT, + request: request, + action: action, + duration: be_kind_of(Numeric) + ) + + is_expected.to eq(expected_response) + end end context 'when request is not cached' do @@ -73,6 +89,13 @@ end it 'return response and does not cache response' do + expect(notifier).to receive(:instrument).with( + Idempotency::Events::CACHE_MISS, + request: request, + action: action, + duration: be_kind_of(Numeric) + ) + expect { is_expected.to eq(response) }.not_to(change { cache.get(fingerprint) }) end end @@ -86,7 +109,14 @@ .with(fingerprint, be_a(String)) end - it 'return response and caches response' do + it 'returns response, caches it, and logs cache miss metric' do + expect(notifier).to receive(:instrument).with( + Idempotency::Events::CACHE_MISS, + request: request, + action: action, + duration: be_kind_of(Numeric) + ) + expect { is_expected.to eq(response) } .to change { cache.get(fingerprint) } .from(nil).to([400, {}, { error: 'some_error' }.to_json]) @@ -98,6 +128,13 @@ let(:expected_response) { [409, {}, Idempotency.config.response_body.concurrent_error] } it 'returns 409 error and does not cache request' do + expect(notifier).to receive(:instrument).with( + Idempotency::Events::LOCK_CONFLICT, + request: request, + action: action, + duration: be_kind_of(Numeric) + ) + expect { is_expected.to eq(expected_response) }.not_to(change { cache.get(fingerprint) }) end end