Skip to content

Commit

Permalink
Add __only_process_by__: list() to the EventBus
Browse files Browse the repository at this point in the history
  • Loading branch information
IvanIvanoff committed Dec 18, 2024
1 parent 43be578 commit c20bc6b
Show file tree
Hide file tree
Showing 8 changed files with 86 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ defmodule Sanbase.EventBus.BillingEventSubscriber do
end
end)

:ok = EventBus.mark_as_completed({__MODULE__, event_shadow})
EventBus.mark_as_completed({__MODULE__, event_shadow})
end

defp do_handle(:update_api_call_limit_table, event_type, event)
Expand Down
20 changes: 20 additions & 0 deletions lib/sanbase/event_bus/event_bus.ex
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,26 @@ defmodule Sanbase.EventBus do
end
end

def handle_event(module, event, event_shadow, state, handle_fun)
when is_function(handle_fun, 0) do
case event do
%{data: %{__only_process_by__: list}} ->
if module in list do
handle_fun.()
else
# If __only_process_by__ is set and the module is not part of it,
# do not process this event, but direclty mark it as processed and
# return the state unchanged
EventBus.mark_as_completed({module, event_shadow})

state
end

_ ->
handle_fun.()
end
end

case Application.compile_env(:sanbase, :env) do
:prod ->
defp handle_invalid_event(params) do
Expand Down
41 changes: 28 additions & 13 deletions lib/sanbase/event_bus/kafka_exporter_subscriber.ex
Original file line number Diff line number Diff line change
Expand Up @@ -20,24 +20,38 @@ defmodule Sanbase.EventBus.KafkaExporterSubscriber do
end

def handle_call(event_shadow, state) do
maybe_send_to_kafka(event_shadow, :persist_sync)

{:reply, :ok, state}
event = EventBus.fetch_event(event_shadow)

new_state =
Sanbase.EventBus.handle_event(
__MODULE__,
event,
event_shadow,
state,
fn -> handle_event(event, event_shadow, :persist_sync, state) end
)

{:reply, :ok, new_state}
end

def handle_cast(event_shadow, state) do
maybe_send_to_kafka(event_shadow, :persist_async)

{:noreply, state}
event = EventBus.fetch_event(event_shadow)

new_state =
Sanbase.EventBus.handle_event(
__MODULE__,
event,
event_shadow,
state,
fn -> handle_event(event, event_shadow, :persist_async, state) end
)

{:noreply, new_state}
end

defp maybe_send_to_kafka({topic, id} = event_shadow, function)
defp handle_event(event, {_topic, _id} = event_shadow, function, state)
when function in [:persist_sync, :persist_async] do
case EventBus.fetch_event(event_shadow) do
%{data: %{__send_to_kafka__: false}} ->
# This event should not be sent to kafka
:ok

case event do
%{} = event ->
event = restructure_event(event)
kv_tuple = {event.id, Jason.encode!(event)}
Expand All @@ -53,7 +67,8 @@ defmodule Sanbase.EventBus.KafkaExporterSubscriber do
:ok
end

:ok = EventBus.mark_as_completed({__MODULE__, topic, id})
EventBus.mark_as_completed({__MODULE__, event_shadow})
state
end

defp restructure_event(event) do
Expand Down
15 changes: 12 additions & 3 deletions lib/sanbase/event_bus/metric_registry_subscriber.ex
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,17 @@ defmodule Sanbase.EventBus.MetricRegistrySubscriber do

def handle_cast({_topic, _id} = event_shadow, state) do
event = EventBus.fetch_event(event_shadow)
state = handle_event(event, event_shadow, state)
{:noreply, state}

new_state =
Sanbase.EventBus.handle_event(
__MODULE__,
event,
event_shadow,
state,
fn -> handle_event(event, event_shadow, state) end
)

{:noreply, new_state}
end

# Needed to handle the async tasks
Expand Down Expand Up @@ -96,7 +105,7 @@ defmodule Sanbase.EventBus.MetricRegistrySubscriber do
end

defp handle_event(_event, event_shadow, state) do
:ok = EventBus.mark_as_completed({__MODULE__, event_shadow})
EventBus.mark_as_completed({__MODULE__, event_shadow})
state
end
end
15 changes: 12 additions & 3 deletions lib/sanbase/event_bus/notification_subscriber.ex
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,21 @@ defmodule Sanbase.EventBus.NotificationSubscriber do

def handle_cast({_topic, _id} = event_shadow, state) do
event = EventBus.fetch_event(event_shadow)
state = handle_event(event, event_shadow, state)
{:noreply, state}

new_state =
Sanbase.EventBus.handle_event(
__MODULE__,
event,
event_shadow,
state,
fn -> handle_event(event, event_shadow, state) end
)

{:noreply, new_state}
end

defp handle_event(_event, event_shadow, state) do
:ok = EventBus.mark_as_completed({__MODULE__, event_shadow})
EventBus.mark_as_completed({__MODULE__, event_shadow})
state
end
end
10 changes: 9 additions & 1 deletion lib/sanbase/event_bus/user_events_subscriber.ex
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,15 @@ defmodule Sanbase.EventBus.UserEventsSubscriber do

def handle_cast({_topic, _id} = event_shadow, state) do
event = EventBus.fetch_event(event_shadow)
new_state = handle_event(event, event_shadow, state)

new_state =
Sanbase.EventBus.handle_event(
__MODULE__,
event,
event_shadow,
state,
fn -> handle_event(event, event_shadow, state) end
)

{:noreply, new_state}
end
Expand Down
6 changes: 3 additions & 3 deletions lib/sanbase/metric/registry/change_suggestion.ex
Original file line number Diff line number Diff line change
Expand Up @@ -110,10 +110,10 @@ defmodule Sanbase.Metric.Registry.ChangeSuggestion do
Node.spawn(node, fn ->
# The caller is sanbase-admin pod. Emit the event to every of the sanbase-web pods
# in the cluster.
# Do not record these events to Kafka, as we'll have the same event duplicated
# 4 times.
# Process the event only by the metric registry subscriber, otherwise the event
# will be recorded multiple in kafka and trigger multiple notifications
Registry.EventEmitter.emit_event({:ok, maybe_struct}, :update_metric_registry, %{
__send_to_kafka__: false
__only_process_by__: [Sanbase.EventBus.MetricRegistrySubscriber]
})
end)
end)
Expand Down
2 changes: 1 addition & 1 deletion test/sanbase/event_bus/event_bus_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ defmodule Sanbase.EventBusTest do
event = EventBus.fetch_event(event_shadow)

Process.send(@receiver_name, event.data.message, [])
EventBus.mark_as_completed(event)
EventBus.mark_as_completed({__MODULE__, event_shadow})

:ok
end
Expand Down

0 comments on commit c20bc6b

Please sign in to comment.