diff --git a/config/config.exs b/config/config.exs index 4ae689bdf..490b0bc17 100644 --- a/config/config.exs +++ b/config/config.exs @@ -308,6 +308,8 @@ config :ex_audit, ], primitive_structs: [DateTime, NaiveDateTime, Date] +config :sanbase, Sanbase.Metric.Registry.Sync, sync_secret: "secret_only_on_prod" + # Import configs import_config "ueberauth_config.exs" import_config "scrapers_config.exs" diff --git a/config/runtime.exs b/config/runtime.exs index 9b2e68b6a..f70b00392 100644 --- a/config/runtime.exs +++ b/config/runtime.exs @@ -142,4 +142,7 @@ if config_env() == :prod do cron: [enabled: true] ] ] + + config :sanbase, Sanbase.Metric.Registry.Sync, + sync_secret: System.get_env("METRIC_REGISTRY_SYNC_SECRET") end diff --git a/config/test.exs b/config/test.exs index e7cd39227..51ea917c4 100644 --- a/config/test.exs +++ b/config/test.exs @@ -12,7 +12,9 @@ config :sanbase, Sanbase, url: {:system, "SANBASE_URL", ""} config :sanbase, SanbaseWeb.Endpoint, http: [port: 4001], - server: true + server: true, + website_url: {:system, "WEBSITE_URL", "http://localhost:4001"}, + backend_url: {:system, "BACKEND_URL", "http://localhost:4001"} config :ex_aws, access_key_id: "test_id", diff --git a/lib/sanbase/metric/registry/registry.ex b/lib/sanbase/metric/registry/registry.ex index 28b5b1959..db5570734 100644 --- a/lib/sanbase/metric/registry/registry.ex +++ b/lib/sanbase/metric/registry/registry.ex @@ -219,7 +219,7 @@ defmodule Sanbase.Metric.Registry do end end - def by_name(metric, data_type, fixed_parameters \\ %{}) do + def by_name(metric, data_type \\ "timeseries", fixed_parameters \\ %{}) do query = from(mr in __MODULE__, where: diff --git a/lib/sanbase/metric/registry/sync.ex b/lib/sanbase/metric/registry/sync.ex index 2d2e0ecd7..431f1047d 100644 --- a/lib/sanbase/metric/registry/sync.ex +++ b/lib/sanbase/metric/registry/sync.ex @@ -5,73 +5,139 @@ defmodule Sanbase.Metric.Registry.Sync do """ alias Sanbase.Metric.Registry - @process_name :metric_registry_sync - - @spec initiate(list(non_neg_integer())) :: :ok - def initiate(metric_registry_ids) when is_list(metric_registry_ids) do - case ongoing_sync?() do - true -> - {:error, "Sync process is already running"} - - false -> - pid = spawn_link(__MODULE__, :sync, [metric_registry_ids]) - Process.register(pid, @process_name) - {:ok, pid} + alias Sanbase.Utils.Config + + def by_uuid(uuid), do: Registry.SyncSchema.by_uuid(uuid) + + @doc ~s""" + + """ + @spec sync(list(non_neg_integer())) :: :ok + def sync(metric_registry_ids) when is_list(metric_registry_ids) do + with :ok <- no_running_syncs(), + :ok <- check_initiate_env(), + {:ok, content} <- get_sync_content(metric_registry_ids), + {:ok, sync} <- store_sync_in_db(content), + :ok <- start_sync(sync), + {:ok, sync} <- Registry.SyncSchema.update_status(sync, "executing") do + {:ok, sync} end + |> dbg() end - def initiate_sync(metric_registry_ids) do - with :ok <- check_initiate_env(), - {:ok, content} <- get_sync_content(metric_registry_ids), - :ok <- start_sync(content), - :ok <- record_sync(content) do - {:ok, content} + def mark_sync_as_finished(sync_uuid) do + with {:ok, sync} <- Registry.SyncSchema.by_uuid(sync_uuid), + {:ok, metric_registry_ids} <- extract_metric_registry_ids(sync), + :ok <- mark_metric_registries_as_synced(metric_registry_ids), + :ok <- Registry.SyncSchema.update_status(sync, "completed") do + {:ok, sync} end end - def apply_sync(json_content) do + def apply_sync(json) do with :ok <- check_apply_env(), - {:ok, list} when is_list(list) <- Jason.decode(json_content) do - for %{} = params <- list do - params = Map.put(params, "sync_status", "synced") - - %{"metric" => metric, "data_type" => data_type, "fixed_parameters" => fixed_parameters} = - params - - with {:ok, metric} <- Registry.by_name(metric, data_type, fixed_parameters), - {:ok, metric} <- Registry.update(metric, params) do - {:ok, metric} - end - |> dbg() - end + {:ok, map} when is_map(map) <- Jason.decode(json), + list when is_list(list) <- map["content"], + :ok <- do_apply_sync_content(list), + :ok <- confirm_sync_completed(map["confirmation_endpoint"]) do + :ok end end - defp start_sync(content) do - url = get_sync_target_url() + # Private functions + + defp confirm_sync_completed(url) do + Req.post(url) + end + + defp do_apply_sync_content(list) do + list + |> Enum.reduce(Ecto.Multi.new(), fn params, multi -> + multi_metric_registry_update(multi, params) + end) + end + + defp multi_metric_registry_update(multi, params) do + params = Map.put(params, "sync_status", "synced") + + %{"metric" => metric, "data_type" => data_type, "fixed_parameters" => fixed_parameters} = + params + + with {:ok, metric_registry} <- Registry.by_name(metric, data_type, fixed_parameters) do + changeset = Registry.changeset(metric, params) + Ecto.Multi.update(multi, metric_registry.id, changeset) + end + end - case Req.post(url, json: content) do - {:ok, _} -> :ok - {:error, _} -> {:error, "Failed to sync"} + defp no_running_syncs() do + case Registry.SyncSchema.all_with_status("executing") do + [] -> :ok + [_ | _] -> {:error, "Sync process is already running"} end end + defp start_sync(sync) do + url = get_sync_target_url() |> IO.inspect() + + json = %{ + "sync_uuid" => sync.uuid, + "content" => sync.content, + "generated_at" => DateTime.utc_now() |> DateTime.truncate(:second) |> DateTime.to_iso8601(), + "confirmation_endpoint" => get_confirmation_endpoint(sync) + } + + case Req.post(url, json: json) do + {:ok, %Req.Response{status: 200}} -> + # The status is set to executing and it will be set to completed + # once the other side responds to the /mark_metric_registry_sync_as_finished + # endpoint + {:ok, _} = Registry.SyncSchema.update_status(sync, "executing") + :ok + + {:ok, %Req.Response{status: status, body: body}} -> + error = "Failed to sync, received status code: #{status}. Body: #{body}" + {:ok, _} = Registry.SyncSchema.update_status(sync, "failed", error) + {:error, error} + + {:error, reason} -> + error = inspect(reason) + {:ok, _} = Registry.SyncSchema.update_status(sync, "failed", error) + {:error, "Failed to sync, error: #{error}"} + end + end + + defp mark_metric_registries_as_synced(metric_registry_ids) do + for id <- metric_registry_ids do + with {:ok, metric} <- Registry.by_id(id), + {:ok, _metric} <- Registry.update(metric, %{"sync_status" => "synced"}) do + :ok + end + end + end + + defp store_sync_in_db(content) do + Registry.SyncSchema.create(content) + end + defp get_sync_target_url() do - secret = System.get_env("METRIC_REGISTRY_SYNC_SECRET") + secret = Config.module_get(Sanbase.Metric.Registry.Sync, :sync_secret) + deployment_env = Config.module_get(Sanbase, :deployment_env) + port = Config.module_get(SanbaseWeb.Endpoint, :port) - case Sanbase.Utils.Config.module_get(Sanbase, :deployment_env) do - "dev" -> "http://localhost:4000/sync_metric_registry/#{secret}" - "stage" -> "http://api.santiment.net/sync_metric_registry/#{secret}" + case deployment_env do + "dev" -> "http://localhost:#{port}/sync_metric_registry?secret=#{secret}" "prod" -> raise("Cannot initiate sync from PROD") + "stage" -> "https://api.santiment.net/sync_metric_registry?secret=#{secret}" end end defp get_sync_content([]), do: {:error, "Nothing to sync"} defp get_sync_content(metric_registry_ids) do - structs = Sanbase.Metric.Registry.by_ids(metric_registry_ids) - content = generate_content(structs) - {:ok, content} + case Sanbase.Metric.Registry.by_ids(metric_registry_ids) do + [] -> {:error, "Nothing to sync"} + structs -> {:ok, Jason.encode!(structs)} + end end defp check_initiate_env() do @@ -80,7 +146,7 @@ defmodule Sanbase.Metric.Registry.Sync do # If local, the DATABASE_URL should not be set pointing to stage/prod. # Only work if the local postgres is used - local? = deployment_env == "env" and is_nil(database_url) + local? = deployment_env in ["dev", "test"] and is_nil(database_url) stage? = deployment_env == "stage" if local? or stage? do @@ -96,24 +162,28 @@ defmodule Sanbase.Metric.Registry.Sync do # If local, the DATABASE_URL should not be set pointing to stage/prod. # Only work if the local postgres is used - local? = deployment_env == "env" and is_nil(database_url) + local? = deployment_env in ["dev", "test"] and is_nil(database_url) prod? = deployment_env == "prod" if local? or prod? do :ok else - {:error, "Can only apply sync only on PROD"} + {:error, "Can apply sync only on PROD"} end end - defp generate_content(structs) do - api_url = SanbaseWeb.Endpoint.api_url() - from_host = URI.parse(api_url).host + defp extract_metric_registry_ids(sync) do + sync.content + |> Jason.decode!() + |> Enum.map(& &1["id"]) + end + + defp get_confirmation_endpoint(sync) do + secret = Sanbase.Utils.Config.module_get(Sanbase.Metric.Registry.Sync, :sync_secret) - %{ - metric_registry_structs: Jason.encode!(structs), - generated_at: DateTime.utc_now() |> DateTime.to_iso8601(), - from_host: from_host - } + SanbaseWeb.Endpoint.backend_url() + |> URI.parse() + |> URI.append_path("/mark_metric_registry_sync_as_finished/#{sync.uuid}?secret=#{secret}") + |> URI.to_string() end end diff --git a/lib/sanbase/metric/registry/sync_schema.ex b/lib/sanbase/metric/registry/sync_schema.ex new file mode 100644 index 000000000..534e61e9b --- /dev/null +++ b/lib/sanbase/metric/registry/sync_schema.ex @@ -0,0 +1,47 @@ +defmodule Sanbase.Metric.Registry.SyncSchema do + use Ecto.Schema + + import Ecto.Query + import Ecto.Changeset + + schema "metric_registry_syncs" do + field(:uuid, :string) + field(:status, :string) + field(:content, :string) + field(:errors, :string) + + timestamps() + end + + def changeset(%__MODULE__{} = sync, attrs) do + sync + |> cast(attrs, [:uuid, :content, :status, :errors]) + |> validate_inclusion(:status, ["scheduled", "executing", "completed", "failed"]) + end + + def create(content) do + %__MODULE__{} + |> changeset(%{content: content, status: "scheduled", uuid: Ecto.UUID.generate()}) + |> Sanbase.Repo.insert() + end + + def update_status(%__MODULE__{} = struct, status, errors \\ nil) do + struct + |> changeset(%{status: status, errors: errors}) + |> Sanbase.Repo.update() + end + + def by_uuid(uuid) do + query = from(sync in __MODULE__, where: sync.uuid == ^uuid) + + case Sanbase.Repo.one(query) do + nil -> {:error, "Sync with uuid #{uuid} not found"} + sync -> {:ok, sync} + end + end + + def all_with_status(status) do + from(sync in __MODULE__, where: sync.status == ^status) + |> Sanbase.Repo.all() + end +end diff --git a/lib/sanbase/metric/registry/sync_server.ex b/lib/sanbase/metric/registry/sync_server.ex deleted file mode 100644 index b741e5d46..000000000 --- a/lib/sanbase/metric/registry/sync_server.ex +++ /dev/null @@ -1,11 +0,0 @@ -defmodule Sanbase.Metric.Registry.SyncServer do - use GenServer - @name :metric_registry_sync_server - def start_link(_opts) do - GenServer.start_link(__MODULE__, name: @name) - end - - def init(_) do - {:ok, %{}} - end -end diff --git a/lib/sanbase/utils/config.ex b/lib/sanbase/utils/config.ex index 82cf66bbc..6e78cb014 100644 --- a/lib/sanbase/utils/config.ex +++ b/lib/sanbase/utils/config.ex @@ -3,9 +3,12 @@ defmodule Sanbase.Utils.Config do Module for reading configuration values from the application environment. """ - def module_get(module, key) do + def module_get(module, key_or_keys) + when is_atom(key_or_keys) or is_list(key_or_keys) do + keys = List.wrap(key_or_keys) + Application.fetch_env!(:sanbase, module) - |> Keyword.get(key) + |> get_in(keys) |> parse_config_value() end @@ -15,9 +18,12 @@ defmodule Sanbase.Utils.Config do |> parse_config_value() end - def module_get(module, key, default) do + def module_get(module, key_or_keys, default) + when is_atom(key_or_keys) or is_list(key_or_keys) do + keys = List.wrap(key_or_keys) + case Application.fetch_env(:sanbase, module) do - {:ok, env} -> env |> Keyword.get(key, default) + {:ok, env} -> get_in(env, keys) _ -> default end |> parse_config_value() diff --git a/lib/sanbase_web/controllers/metric_registry_controller.ex b/lib/sanbase_web/controllers/metric_registry_controller.ex index 57b7d2fa8..1712d67db 100644 --- a/lib/sanbase_web/controllers/metric_registry_controller.ex +++ b/lib/sanbase_web/controllers/metric_registry_controller.ex @@ -5,7 +5,11 @@ defmodule SanbaseWeb.MetricRegistryController do case secret == get_sync_secret() do true -> # TODO: Remove - IO.inspect(params) + + {:ok, data, _conn_details} = Plug.Conn.read_body(conn) + + IO.inspect({params}) + IO.inspect({data}) conn |> resp(200, "OK") @@ -18,6 +22,28 @@ defmodule SanbaseWeb.MetricRegistryController do end end + def mark_sync_as_finished(conn, %{"sync_uuid" => sync_uuid, "secret" => secret}) do + case secret == get_sync_secret() do + true -> + case Sanbase.Metric.Registry.Sync.mark_sync_as_finished(sync_uuid) do + {:ok, _} -> + conn + |> resp(200, "OK") + |> send_resp() + + {:error, reason} -> + conn + |> resp(500, "Error marking sync as finished. Reason: #{reason}") + |> send_resp() + end + + false -> + conn + |> resp(403, "Unauthorized") + |> send_resp() + end + end + def export_json(conn, _params) do conn |> resp(200, get_metric_registry_json()) @@ -52,6 +78,6 @@ defmodule SanbaseWeb.MetricRegistryController do defp transform(data), do: data defp get_sync_secret() do - Sanbase.Utils.Config.module_get(__MODULE__, :sync_secret, "no_secret") + Sanbase.Utils.Config.module_get(Sanbase.Metric.Registry.Sync, :sync_secret) end end diff --git a/lib/sanbase_web/live/metric_registry/metric_registry_sync_live.ex b/lib/sanbase_web/live/metric_registry/metric_registry_sync_live.ex index e8f2345d7..6969468d3 100644 --- a/lib/sanbase_web/live/metric_registry/metric_registry_sync_live.ex +++ b/lib/sanbase_web/live/metric_registry/metric_registry_sync_live.ex @@ -108,7 +108,7 @@ defmodule SanbaseWeb.MetricRegistrySyncLive do def handle_event("sync", _params, socket) do ids = socket.assigns.metric_ids_to_sync |> Enum.to_list() - case Sanbase.Metric.Registry.Sync.initiate_sync(ids) do + case Sanbase.Metric.Registry.Sync.sync(ids) do {:ok, data} -> {:ok, data} {:error, error} -> {:error, error} end diff --git a/lib/sanbase_web/router.ex b/lib/sanbase_web/router.ex index c85ecb2ba..9d08dbf64 100644 --- a/lib/sanbase_web/router.ex +++ b/lib/sanbase_web/router.ex @@ -182,7 +182,13 @@ defmodule SanbaseWeb.Router do scope "/", SanbaseWeb do get("/metric_registry_export", MetricRegistryController, :export_json) - post("/sync_metric_registry/:secret", MetricRegistryController, :sync) + post("/sync_metric_registry", MetricRegistryController, :sync) + + post( + "/mark_metric_registry_sync_as_finished/:sync_uuid", + MetricRegistryController, + :mark_sync_as_finished + ) get("/api_metric_name_mapping", MetricNameController, :api_metric_name_mapping) get("/projects_data", DataController, :projects_data) diff --git a/priv/repo/migrations/20241218134556_add_metric_registry_syncs.exs b/priv/repo/migrations/20241218134556_add_metric_registry_syncs.exs deleted file mode 100644 index 3e181ba74..000000000 --- a/priv/repo/migrations/20241218134556_add_metric_registry_syncs.exs +++ /dev/null @@ -1,11 +0,0 @@ -defmodule Sanbase.Repo.Migrations.AddMetricRegistrySyncs do - use Ecto.Migration - - def change do - create table(:metric_registry_syncs) do - add(:state, :string) - add(:content, :map) - timestamps() - end - end -end diff --git a/priv/repo/migrations/20250110083203_create_metric_registry_syncs_table.exs b/priv/repo/migrations/20250110083203_create_metric_registry_syncs_table.exs new file mode 100644 index 000000000..845413079 --- /dev/null +++ b/priv/repo/migrations/20250110083203_create_metric_registry_syncs_table.exs @@ -0,0 +1,16 @@ +defmodule Sanbase.Repo.Migrations.CreateMetricRegistrySyncsTable do + use Ecto.Migration + + def change do + create table(:metric_registry_syncs) do + add(:uuid, :string) + add(:status, :string) + add(:content, :text) + add(:errors, :text) + + timestamps() + end + + create(unique_index(:metric_registry_syncs, [:uuid])) + end +end diff --git a/priv/repo/structure.sql b/priv/repo/structure.sql index 0c74a57eb..58e3d951a 100644 --- a/priv/repo/structure.sql +++ b/priv/repo/structure.sql @@ -2374,8 +2374,10 @@ ALTER SEQUENCE public.metric_registry_id_seq OWNED BY public.metric_registry.id; CREATE TABLE public.metric_registry_syncs ( id bigint NOT NULL, - state character varying(255), - content jsonb, + uuid character varying(255), + status character varying(255), + content text, + errors text, inserted_at timestamp without time zone NOT NULL, updated_at timestamp without time zone NOT NULL ); @@ -7313,6 +7315,13 @@ CREATE INDEX menus_user_id_index ON public.menus USING btree (user_id); CREATE UNIQUE INDEX metric_registry_composite_unique_index ON public.metric_registry USING btree (metric, data_type, fixed_parameters); +-- +-- Name: metric_registry_syncs_uuid_index; Type: INDEX; Schema: public; Owner: - +-- + +CREATE UNIQUE INDEX metric_registry_syncs_uuid_index ON public.metric_registry_syncs USING btree (uuid); + + -- -- Name: metrics_name_index; Type: INDEX; Schema: public; Owner: - -- @@ -9782,4 +9791,4 @@ INSERT INTO public."schema_migrations" (version) VALUES (20241128113958); INSERT INTO public."schema_migrations" (version) VALUES (20241128161315); INSERT INTO public."schema_migrations" (version) VALUES (20241202104812); INSERT INTO public."schema_migrations" (version) VALUES (20241212054904); -INSERT INTO public."schema_migrations" (version) VALUES (20241218134556); +INSERT INTO public."schema_migrations" (version) VALUES (20250110083203); diff --git a/test/sanbase/metric_registry/metric_registry_sync_test.exs b/test/sanbase/metric_registry/metric_registry_sync_test.exs index 943e74d5c..6f7097fc9 100644 --- a/test/sanbase/metric_registry/metric_registry_sync_test.exs +++ b/test/sanbase/metric_registry/metric_registry_sync_test.exs @@ -1,12 +1,97 @@ defmodule Sanbase.MetricRegistrySyncTest do use SanbaseWeb.ConnCase + alias Sanbase.Metric.Registry test "syncing", context do - context.conn - |> post("/sync_metric_registry/no_secret", %{ - metrics: [%Sanbase.Metric.Registry{}, "a1", "b2", "c3"] - }) + Req.post("http://localhost:4000/sync_metric_registry?secret=secret_only_on_prod") - assert 1 == 2 + [m1_id, m2_id] = create_sync_requirements() + + {:ok, m1} = Registry.by_id(m1_id) + {:ok, m2} = Registry.by_id(m2_id) + + assert m1.sync_status == "not_synced" + assert m2.sync_status == "not_synced" + + assert {:ok, %{status: "executing", uuid: uuid}} = Registry.Sync.sync([m1_id, m2_id]) + + Process.sleep(1000) + + assert {:ok, %{status: "completed", uuid: ^uuid}} = Registry.Sync.by_uuid(uuid) + {:ok, m1} = Registry.by_id(m1_id) + {:ok, m2} = Registry.by_id(m2_id) + assert m1.sync_status == "synced" + assert m2.sync_status == "synced" + end + + defp create_sync_requirements() do + # Get 3 records + {:ok, m1} = Registry.by_name("price_usd_5m") + {:ok, m2} = Registry.by_name("mvrv_usd") + {:ok, m3} = Registry.by_name("social_volume_total") + + # Create change suggestions + {:ok, ch1} = + Registry.ChangeSuggestion.create_change_suggestion( + m1, + %{"min_interval" => "5m"}, + "for tests", + "ivan@santiment.net" + ) + + {:ok, _unused} = + Registry.ChangeSuggestion.create_change_suggestion( + m1, + %{"min_interval" => "10m"}, + "for tests", + "ivan@santiment.net" + ) + + {:ok, ch2} = + Registry.ChangeSuggestion.create_change_suggestion( + m2, + %{"min_interval" => "5m"}, + "for tests", + "ivan@santiment.net" + ) + + {:ok, ch3} = + Registry.ChangeSuggestion.create_change_suggestion( + m2, + %{"aliases" => [%{name: "mvrv_usd_new_alias"}]}, + "for tests", + "ivan@santiment.net" + ) + + {:ok, ch4} = + Registry.ChangeSuggestion.create_change_suggestion( + m1, + %{"aliases" => [%{name: "price_usd_gap_filled"}]}, + "for tests", + "ivan@santiment.net" + ) + + {:ok, ch5} = + Registry.ChangeSuggestion.create_change_suggestion( + m3, + %{"min_interval" => "2h"}, + "for tests", + "ivan@santiment.net" + ) + + # Approve all 4 suggestions + {:ok, _} = Registry.ChangeSuggestion.update_status(ch1.id, "approved") + {:ok, _} = Registry.ChangeSuggestion.update_status(ch2.id, "approved") + {:ok, _} = Registry.ChangeSuggestion.update_status(ch3.id, "approved") + {:ok, _} = Registry.ChangeSuggestion.update_status(ch4.id, "approved") + {:ok, _} = Registry.ChangeSuggestion.update_status(ch5.id, "approved") + + # Verify only price_usd_5m and mvrv_usd metrics to properly test + # that sync only affects verified metrics + {:ok, _} = Sanbase.Metric.Registry.update(m1, %{is_verified: true}, emit_event: false) + + {:ok, _} = Sanbase.Metric.Registry.update(m2, %{is_verified: true}, emit_event: false) + + [m1.id, m2.id] end end diff --git a/test/sanbase/metric_registry/metric_registry_test.exs b/test/sanbase/metric_registry/metric_registry_test.exs index 80d432201..e31c25d3b 100644 --- a/test/sanbase/metric_registry/metric_registry_test.exs +++ b/test/sanbase/metric_registry/metric_registry_test.exs @@ -1,4 +1,4 @@ -defmodule Sanbase.MetricRegistyTest do +dsefmodule Sanbase.MetricRegistyTest do use Sanbase.DataCase import ExUnit.CaptureLog