Skip to content

Commit

Permalink
Start working on metric_registry_export
Browse files Browse the repository at this point in the history
  • Loading branch information
IvanIvanoff committed Jan 8, 2025
1 parent 93009bb commit b0e2e64
Show file tree
Hide file tree
Showing 12 changed files with 417 additions and 115 deletions.
3 changes: 3 additions & 0 deletions config/dev.exs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ config :sanbase, Sanbase.KafkaExporter, producer: Sanbase.InMemoryKafka.Producer
# with. When running the app locally these values are overridden by the values
# in the .env.dev or dev.secret.exs files, which are ignored by git and not
# published in the repository. Please do not report these as security issues.
# To create the user for your local env:
# In psql: CREATE ROLE postgres WITH LOGIN SUPERUSER PASSWORD 'postgres';
# In the terminal: mix ecto.setup
config :sanbase, Sanbase.Repo,
username: "postgres",
password: "postgres",
Expand Down
99 changes: 74 additions & 25 deletions lib/sanbase/metric/registry/populate.ex
Original file line number Diff line number Diff line change
Expand Up @@ -161,17 +161,8 @@ defmodule Sanbase.Metric.Registry.Populate do
if inserts > 0 or updates > 0 do
map = %{inserts_count: inserts, updates_count: updates}

{inserted_metrics, updated_metrics} =
Enum.reduce(list, {[], []}, fn {type, record}, {insert_acc, update_acc} ->
case type do
:insert -> {[record.metric | insert_acc], update_acc}
:update -> {insert_acc, [record.metric | update_acc]}
_ -> {insert_acc, update_acc}
end
end)

# Emit locally event with more data. The distributed events are used only to refresh the
# persistent term, the local node event will also trigger notifications
{inserted_metrics, updated_metrics} = extract_inserted_updated_metrics(list)
# Emit locally event with more data
local_event_map =
Map.merge(map, %{inserted_metrics: inserted_metrics, updated_metrics: updated_metrics})

Expand All @@ -181,22 +172,80 @@ defmodule Sanbase.Metric.Registry.Populate do
%{}
)

# Emit distributed event only to the MetricRegistrySubscriber so it refreshed the stored data
# in persistent term
Node.list()
|> Enum.each(fn node ->
IO.puts("Emitting event :bulk_metric_registry_change to #{node}")

Node.spawn(node, fn ->
Sanbase.Metric.Registry.EventEmitter.emit_event(
{:ok, map},
:bulk_metric_registry_change,
%{__only_process_by__: [Sanbase.EventBus.MetricRegistrySubscriber]}
)
end)
end)
# Emit distributed event
emit_distributed_event(map)
else
:ok
end
end

defp extract_inserted_updated_metrics(list) do
{_inserted_metrics, _updated_metrics} =
Enum.reduce(list, {[], []}, fn {type, record}, {insert_acc, update_acc} ->
case type do
:insert -> {[record.metric | insert_acc], update_acc}
:update -> {insert_acc, [record.metric | update_acc]}
_ -> {insert_acc, update_acc}
end
end)
end

defp emit_distributed_event(map) do
Node.list()
|> Enum.each(fn node ->
IO.puts("Emitting event :bulk_metric_registry_change to #{node}")

Node.spawn(node, fn ->
Sanbase.Metric.Registry.EventEmitter.emit_event(
{:ok, map},
:bulk_metric_registry_change,
%{__only_process_by__: [Sanbase.EventBus.MetricRegistrySubscriber]}
)
end)
end)
end
end

defmodule Stack do
use GenServer

def start_link(initial_stack \\ []) do
GenServer.start_link(__MODULE__, initial_stack, name: __MODULE__)
end

def push(element) do
GenServer.call(__MODULE__, {:push, element})
end

def pop do
GenServer.call(__MODULE__, :pop)
end

def peek do
GenServer.call(__MODULE__, :peek)
end

def init(initial_stack) do
{:ok, initial_stack}
end

def handle_call({:push, element}, _from, stack) do
{:reply, :ok, [element | stack]}
end

def handle_call(:pop, _from, [h | t]) do
{:reply, h, t}
end

def handle_call(:pop, _from, []) do
{:reply, nil, []}
end

def handle_call(:peek, _from, [h | _] = stack) do
{:reply, h, stack}
end

def handle_call(:peek, _from, []) do
{:reply, nil, []}
end
end
11 changes: 7 additions & 4 deletions lib/sanbase/metric/registry/registry.ex
Original file line number Diff line number Diff line change
Expand Up @@ -263,10 +263,13 @@ defmodule Sanbase.Metric.Registry do
"""
@spec all() :: [t()]
def all() do
query =
from(m in __MODULE__,
order_by: [asc: m.id]
)
query = from(m in __MODULE__, order_by: [asc: m.id])

Sanbase.Repo.all(query)
end

def by_ids(ids) do
query = from(m in __MODULE__, where: m.id in ^ids)

Sanbase.Repo.all(query)
end
Expand Down
119 changes: 119 additions & 0 deletions lib/sanbase/metric/registry/sync.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
defmodule Sanbase.Metric.Registry.Sync do
@moduledoc """
This module is responsible for syncing the metric registry with the external
source of truth.
"""

alias Sanbase.Metric.Registry
@process_name :metric_registry_sync

@spec initiate(list(non_neg_integer())) :: :ok
def initiate(metric_registry_ids) when is_list(metric_registry_ids) do
case ongoing_sync?() do
true ->
{:error, "Sync process is already running"}

false ->
pid = spawn_link(__MODULE__, :sync, [metric_registry_ids])
Process.register(pid, @process_name)
{:ok, pid}
end
end

def initiate_sync(metric_registry_ids) do
with :ok <- check_initiate_env(),
{:ok, content} <- get_sync_content(metric_registry_ids),
:ok <- start_sync(content),
:ok <- record_sync(content) do
{:ok, content}
end
end

def apply_sync(json_content) do
with :ok <- check_apply_env(),
{:ok, list} when is_list(list) <- Jason.decode(json_content) do
for %{} = params <- list do
params = Map.put(params, "sync_status", "synced")

%{"metric" => metric, "data_type" => data_type, "fixed_parameters" => fixed_parameters} =
params

with {:ok, metric} <- Registry.by_name(metric, data_type, fixed_parameters),
{:ok, metric} <- Registry.update(metric, params) do
{:ok, metric}
end
|> dbg()
end
end
end

defp start_sync(content) do
url = get_sync_target_url()

case Req.post(url, json: content) do
{:ok, _} -> :ok
{:error, _} -> {:error, "Failed to sync"}
end
end

defp get_sync_target_url() do
secret = System.get_env("METRIC_REGISTRY_SYNC_SECRET")

case Sanbase.Utils.Config.module_get(Sanbase, :deployment_env) do
"dev" -> "http://localhost:4000/sync_metric_registry/#{secret}"
"stage" -> "http://api.santiment.net/sync_metric_registry/#{secret}"
"prod" -> raise("Cannot initiate sync from PROD")
end
end

defp get_sync_content([]), do: {:error, "Nothing to sync"}

defp get_sync_content(metric_registry_ids) do
structs = Sanbase.Metric.Registry.by_ids(metric_registry_ids)
content = generate_content(structs)
{:ok, content}
end

defp check_initiate_env() do
deployment_env = Sanbase.Utils.Config.module_get(Sanbase, :deployment_env)
database_url = System.get_env("DATABASE_URL")

# If local, the DATABASE_URL should not be set pointing to stage/prod.
# Only work if the local postgres is used
local? = deployment_env == "env" and is_nil(database_url)
stage? = deployment_env == "stage"

if local? or stage? do
:ok
else
{:error, "Can only deploy sync from STAGE to PROD"}
end
end

defp check_apply_env() do
deployment_env = Sanbase.Utils.Config.module_get(Sanbase, :deployment_env)
database_url = System.get_env("DATABASE_URL")

# If local, the DATABASE_URL should not be set pointing to stage/prod.
# Only work if the local postgres is used
local? = deployment_env == "env" and is_nil(database_url)
prod? = deployment_env == "prod"

if local? or prod? do
:ok
else
{:error, "Can only apply sync only on PROD"}
end
end

defp generate_content(structs) do
api_url = SanbaseWeb.Endpoint.api_url()
from_host = URI.parse(api_url).host

%{
metric_registry_structs: Jason.encode!(structs),
generated_at: DateTime.utc_now() |> DateTime.to_iso8601(),
from_host: from_host
}
end
end
11 changes: 11 additions & 0 deletions lib/sanbase/metric/registry/sync_server.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
defmodule Sanbase.Metric.Registry.SyncServer do
use GenServer
@name :metric_registry_sync_server
def start_link(_opts) do
GenServer.start_link(__MODULE__, name: @name)
end

def init(_) do
{:ok, %{}}
end
end
46 changes: 45 additions & 1 deletion lib/sanbase_web/controllers/metric_registry_controller.ex
Original file line number Diff line number Diff line change
@@ -1,13 +1,57 @@
defmodule SanbaseWeb.MetricRegistryController do
use SanbaseWeb, :controller

def sync(conn, %{"secret" => secret} = params) do
case secret == get_sync_secret() do
true ->
# TODO: Remove
IO.inspect(params)

conn
|> resp(200, "OK")
|> send_resp()

false ->
conn
|> resp(403, "Unauthorized")
|> send_resp()
end
end

def export_json(conn, _params) do
conn
|> resp(200, "ok")
|> resp(200, get_metric_registry_json())
|> send_resp()
end

defp get_metric_registry_json() do
Sanbase.Metric.Registry.all()
|> Enum.take(1)
|> Enum.map(&transform/1)

# |> Enum.map(&Jason.encode!/1)
# |> Enum.intersperse("\n")
end

defp transform(struct) when is_struct(struct) do
struct
|> Map.from_struct()
|> Map.drop([:__meta__, :inserted_at, :updated_at, :change_suggestions])
|> Map.new(fn
{k, v} when is_list(v) ->
{k, Enum.map(v, &transform/1)}

{k, v} when is_map(v) ->
{k, transform(v)}

{k, v} ->
{k, v}
end)
end

defp transform(data), do: data

defp get_sync_secret() do
Sanbase.Utils.Config.module_get(__MODULE__, :sync_secret, "no_secret")
end
end
Loading

0 comments on commit b0e2e64

Please sign in to comment.