From c20bc6ba2a8c4fca0de0f08f6fb3cd303d7d0f6a Mon Sep 17 00:00:00 2001 From: Ivan Ivanov Date: Wed, 18 Dec 2024 13:58:29 +0200 Subject: [PATCH] Add __only_process_by__: list() to the EventBus --- .../billing_event_subscriber.ex | 2 +- lib/sanbase/event_bus/event_bus.ex | 20 +++++++++ .../event_bus/kafka_exporter_subscriber.ex | 41 +++++++++++++------ .../event_bus/metric_registry_subscriber.ex | 15 +++++-- .../event_bus/notification_subscriber.ex | 15 +++++-- .../event_bus/user_events_subscriber.ex | 10 ++++- .../metric/registry/change_suggestion.ex | 6 +-- test/sanbase/event_bus/event_bus_test.exs | 2 +- 8 files changed, 86 insertions(+), 25 deletions(-) diff --git a/lib/sanbase/billing/subscription/event_handling/billing_event_subscriber.ex b/lib/sanbase/billing/subscription/event_handling/billing_event_subscriber.ex index f284d0bdc1..3d2ad4fd41 100644 --- a/lib/sanbase/billing/subscription/event_handling/billing_event_subscriber.ex +++ b/lib/sanbase/billing/subscription/event_handling/billing_event_subscriber.ex @@ -55,7 +55,7 @@ defmodule Sanbase.EventBus.BillingEventSubscriber do end end) - :ok = EventBus.mark_as_completed({__MODULE__, event_shadow}) + EventBus.mark_as_completed({__MODULE__, event_shadow}) end defp do_handle(:update_api_call_limit_table, event_type, event) diff --git a/lib/sanbase/event_bus/event_bus.ex b/lib/sanbase/event_bus/event_bus.ex index 6133d43346..150f8eeab8 100644 --- a/lib/sanbase/event_bus/event_bus.ex +++ b/lib/sanbase/event_bus/event_bus.ex @@ -88,6 +88,26 @@ defmodule Sanbase.EventBus do end end + def handle_event(module, event, event_shadow, state, handle_fun) + when is_function(handle_fun, 0) do + case event do + %{data: %{__only_process_by__: list}} -> + if module in list do + handle_fun.() + else + # If __only_process_by__ is set and the module is not part of it, + # do not process this event, but direclty mark it as processed and + # return the state unchanged + EventBus.mark_as_completed({module, event_shadow}) + + state + end + + _ -> + handle_fun.() + end + end + case Application.compile_env(:sanbase, :env) do :prod -> defp handle_invalid_event(params) do diff --git a/lib/sanbase/event_bus/kafka_exporter_subscriber.ex b/lib/sanbase/event_bus/kafka_exporter_subscriber.ex index b6b249e2c0..9ef6368a32 100644 --- a/lib/sanbase/event_bus/kafka_exporter_subscriber.ex +++ b/lib/sanbase/event_bus/kafka_exporter_subscriber.ex @@ -20,24 +20,38 @@ defmodule Sanbase.EventBus.KafkaExporterSubscriber do end def handle_call(event_shadow, state) do - maybe_send_to_kafka(event_shadow, :persist_sync) - - {:reply, :ok, state} + event = EventBus.fetch_event(event_shadow) + + new_state = + Sanbase.EventBus.handle_event( + __MODULE__, + event, + event_shadow, + state, + fn -> handle_event(event, event_shadow, :persist_sync, state) end + ) + + {:reply, :ok, new_state} end def handle_cast(event_shadow, state) do - maybe_send_to_kafka(event_shadow, :persist_async) - - {:noreply, state} + event = EventBus.fetch_event(event_shadow) + + new_state = + Sanbase.EventBus.handle_event( + __MODULE__, + event, + event_shadow, + state, + fn -> handle_event(event, event_shadow, :persist_async, state) end + ) + + {:noreply, new_state} end - defp maybe_send_to_kafka({topic, id} = event_shadow, function) + defp handle_event(event, {_topic, _id} = event_shadow, function, state) when function in [:persist_sync, :persist_async] do - case EventBus.fetch_event(event_shadow) do - %{data: %{__send_to_kafka__: false}} -> - # This event should not be sent to kafka - :ok - + case event do %{} = event -> event = restructure_event(event) kv_tuple = {event.id, Jason.encode!(event)} @@ -53,7 +67,8 @@ defmodule Sanbase.EventBus.KafkaExporterSubscriber do :ok end - :ok = EventBus.mark_as_completed({__MODULE__, topic, id}) + EventBus.mark_as_completed({__MODULE__, event_shadow}) + state end defp restructure_event(event) do diff --git a/lib/sanbase/event_bus/metric_registry_subscriber.ex b/lib/sanbase/event_bus/metric_registry_subscriber.ex index a0992d94b6..195b584912 100644 --- a/lib/sanbase/event_bus/metric_registry_subscriber.ex +++ b/lib/sanbase/event_bus/metric_registry_subscriber.ex @@ -25,8 +25,17 @@ defmodule Sanbase.EventBus.MetricRegistrySubscriber do def handle_cast({_topic, _id} = event_shadow, state) do event = EventBus.fetch_event(event_shadow) - state = handle_event(event, event_shadow, state) - {:noreply, state} + + new_state = + Sanbase.EventBus.handle_event( + __MODULE__, + event, + event_shadow, + state, + fn -> handle_event(event, event_shadow, state) end + ) + + {:noreply, new_state} end # Needed to handle the async tasks @@ -96,7 +105,7 @@ defmodule Sanbase.EventBus.MetricRegistrySubscriber do end defp handle_event(_event, event_shadow, state) do - :ok = EventBus.mark_as_completed({__MODULE__, event_shadow}) + EventBus.mark_as_completed({__MODULE__, event_shadow}) state end end diff --git a/lib/sanbase/event_bus/notification_subscriber.ex b/lib/sanbase/event_bus/notification_subscriber.ex index c5006341b1..deb3263ecd 100644 --- a/lib/sanbase/event_bus/notification_subscriber.ex +++ b/lib/sanbase/event_bus/notification_subscriber.ex @@ -19,12 +19,21 @@ defmodule Sanbase.EventBus.NotificationSubscriber do def handle_cast({_topic, _id} = event_shadow, state) do event = EventBus.fetch_event(event_shadow) - state = handle_event(event, event_shadow, state) - {:noreply, state} + + new_state = + Sanbase.EventBus.handle_event( + __MODULE__, + event, + event_shadow, + state, + fn -> handle_event(event, event_shadow, state) end + ) + + {:noreply, new_state} end defp handle_event(_event, event_shadow, state) do - :ok = EventBus.mark_as_completed({__MODULE__, event_shadow}) + EventBus.mark_as_completed({__MODULE__, event_shadow}) state end end diff --git a/lib/sanbase/event_bus/user_events_subscriber.ex b/lib/sanbase/event_bus/user_events_subscriber.ex index 169f0a51f3..6db2b0b8d3 100644 --- a/lib/sanbase/event_bus/user_events_subscriber.ex +++ b/lib/sanbase/event_bus/user_events_subscriber.ex @@ -21,7 +21,15 @@ defmodule Sanbase.EventBus.UserEventsSubscriber do def handle_cast({_topic, _id} = event_shadow, state) do event = EventBus.fetch_event(event_shadow) - new_state = handle_event(event, event_shadow, state) + + new_state = + Sanbase.EventBus.handle_event( + __MODULE__, + event, + event_shadow, + state, + fn -> handle_event(event, event_shadow, state) end + ) {:noreply, new_state} end diff --git a/lib/sanbase/metric/registry/change_suggestion.ex b/lib/sanbase/metric/registry/change_suggestion.ex index 6c3b5dee6d..1116033484 100644 --- a/lib/sanbase/metric/registry/change_suggestion.ex +++ b/lib/sanbase/metric/registry/change_suggestion.ex @@ -110,10 +110,10 @@ defmodule Sanbase.Metric.Registry.ChangeSuggestion do Node.spawn(node, fn -> # The caller is sanbase-admin pod. Emit the event to every of the sanbase-web pods # in the cluster. - # Do not record these events to Kafka, as we'll have the same event duplicated - # 4 times. + # Process the event only by the metric registry subscriber, otherwise the event + # will be recorded multiple in kafka and trigger multiple notifications Registry.EventEmitter.emit_event({:ok, maybe_struct}, :update_metric_registry, %{ - __send_to_kafka__: false + __only_process_by__: [Sanbase.EventBus.MetricRegistrySubscriber] }) end) end) diff --git a/test/sanbase/event_bus/event_bus_test.exs b/test/sanbase/event_bus/event_bus_test.exs index 78a46d4f3a..d3e6b798bc 100644 --- a/test/sanbase/event_bus/event_bus_test.exs +++ b/test/sanbase/event_bus/event_bus_test.exs @@ -9,7 +9,7 @@ defmodule Sanbase.EventBusTest do event = EventBus.fetch_event(event_shadow) Process.send(@receiver_name, event.data.message, []) - EventBus.mark_as_completed(event) + EventBus.mark_as_completed({__MODULE__, event_shadow}) :ok end