Skip to content

Commit

Permalink
backup
Browse files Browse the repository at this point in the history
  • Loading branch information
IvanIvanoff committed Jan 14, 2025
1 parent b0e2e64 commit 02dcd29
Show file tree
Hide file tree
Showing 16 changed files with 346 additions and 96 deletions.
2 changes: 2 additions & 0 deletions config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,8 @@ config :ex_audit,
],
primitive_structs: [DateTime, NaiveDateTime, Date]

config :sanbase, Sanbase.Metric.Registry.Sync, sync_secret: "secret_only_on_prod"

# Import configs
import_config "ueberauth_config.exs"
import_config "scrapers_config.exs"
Expand Down
3 changes: 3 additions & 0 deletions config/runtime.exs
Original file line number Diff line number Diff line change
Expand Up @@ -142,4 +142,7 @@ if config_env() == :prod do
cron: [enabled: true]
]
]

config :sanbase, Sanbase.Metric.Registry.Sync,
sync_secret: System.get_env("METRIC_REGISTRY_SYNC_SECRET")
end
4 changes: 3 additions & 1 deletion config/test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@ config :sanbase, Sanbase, url: {:system, "SANBASE_URL", ""}

config :sanbase, SanbaseWeb.Endpoint,
http: [port: 4001],
server: true
server: true,
website_url: {:system, "WEBSITE_URL", "http://localhost:4001"},
backend_url: {:system, "BACKEND_URL", "http://localhost:4001"}

config :ex_aws,
access_key_id: "test_id",
Expand Down
2 changes: 1 addition & 1 deletion lib/sanbase/metric/registry/registry.ex
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ defmodule Sanbase.Metric.Registry do
end
end

def by_name(metric, data_type, fixed_parameters \\ %{}) do
def by_name(metric, data_type \\ "timeseries", fixed_parameters \\ %{}) do
query =
from(mr in __MODULE__,
where:
Expand Down
180 changes: 125 additions & 55 deletions lib/sanbase/metric/registry/sync.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,73 +5,139 @@ defmodule Sanbase.Metric.Registry.Sync do
"""

alias Sanbase.Metric.Registry
@process_name :metric_registry_sync

@spec initiate(list(non_neg_integer())) :: :ok
def initiate(metric_registry_ids) when is_list(metric_registry_ids) do
case ongoing_sync?() do
true ->
{:error, "Sync process is already running"}

false ->
pid = spawn_link(__MODULE__, :sync, [metric_registry_ids])
Process.register(pid, @process_name)
{:ok, pid}
alias Sanbase.Utils.Config

def by_uuid(uuid), do: Registry.SyncSchema.by_uuid(uuid)

@doc ~s"""
"""
@spec sync(list(non_neg_integer())) :: :ok
def sync(metric_registry_ids) when is_list(metric_registry_ids) do
with :ok <- no_running_syncs(),
:ok <- check_initiate_env(),
{:ok, content} <- get_sync_content(metric_registry_ids),
{:ok, sync} <- store_sync_in_db(content),
:ok <- start_sync(sync),
{:ok, sync} <- Registry.SyncSchema.update_status(sync, "executing") do
{:ok, sync}
end
|> dbg()
end

def initiate_sync(metric_registry_ids) do
with :ok <- check_initiate_env(),
{:ok, content} <- get_sync_content(metric_registry_ids),
:ok <- start_sync(content),
:ok <- record_sync(content) do
{:ok, content}
def mark_sync_as_finished(sync_uuid) do
with {:ok, sync} <- Registry.SyncSchema.by_uuid(sync_uuid),
{:ok, metric_registry_ids} <- extract_metric_registry_ids(sync),
:ok <- mark_metric_registries_as_synced(metric_registry_ids),
:ok <- Registry.SyncSchema.update_status(sync, "completed") do
{:ok, sync}
end
end

def apply_sync(json_content) do
def apply_sync(json) do
with :ok <- check_apply_env(),
{:ok, list} when is_list(list) <- Jason.decode(json_content) do
for %{} = params <- list do
params = Map.put(params, "sync_status", "synced")

%{"metric" => metric, "data_type" => data_type, "fixed_parameters" => fixed_parameters} =
params

with {:ok, metric} <- Registry.by_name(metric, data_type, fixed_parameters),
{:ok, metric} <- Registry.update(metric, params) do
{:ok, metric}
end
|> dbg()
end
{:ok, map} when is_map(map) <- Jason.decode(json),
list when is_list(list) <- map["content"],
:ok <- do_apply_sync_content(list),
:ok <- confirm_sync_completed(map["confirmation_endpoint"]) do
:ok
end
end

defp start_sync(content) do
url = get_sync_target_url()
# Private functions

defp confirm_sync_completed(url) do
Req.post(url)
end

defp do_apply_sync_content(list) do
list
|> Enum.reduce(Ecto.Multi.new(), fn params, multi ->
multi_metric_registry_update(multi, params)
end)
end

defp multi_metric_registry_update(multi, params) do
params = Map.put(params, "sync_status", "synced")

%{"metric" => metric, "data_type" => data_type, "fixed_parameters" => fixed_parameters} =
params

with {:ok, metric_registry} <- Registry.by_name(metric, data_type, fixed_parameters) do
changeset = Registry.changeset(metric, params)
Ecto.Multi.update(multi, metric_registry.id, changeset)
end
end

case Req.post(url, json: content) do
{:ok, _} -> :ok
{:error, _} -> {:error, "Failed to sync"}
defp no_running_syncs() do
case Registry.SyncSchema.all_with_status("executing") do
[] -> :ok
[_ | _] -> {:error, "Sync process is already running"}
end
end

defp start_sync(sync) do
url = get_sync_target_url() |> IO.inspect()

json = %{
"sync_uuid" => sync.uuid,
"content" => sync.content,
"generated_at" => DateTime.utc_now() |> DateTime.truncate(:second) |> DateTime.to_iso8601(),
"confirmation_endpoint" => get_confirmation_endpoint(sync)
}

case Req.post(url, json: json) do
{:ok, %Req.Response{status: 200}} ->
# The status is set to executing and it will be set to completed
# once the other side responds to the /mark_metric_registry_sync_as_finished
# endpoint
{:ok, _} = Registry.SyncSchema.update_status(sync, "executing")
:ok

{:ok, %Req.Response{status: status, body: body}} ->
error = "Failed to sync, received status code: #{status}. Body: #{body}"
{:ok, _} = Registry.SyncSchema.update_status(sync, "failed", error)
{:error, error}

{:error, reason} ->
error = inspect(reason)
{:ok, _} = Registry.SyncSchema.update_status(sync, "failed", error)
{:error, "Failed to sync, error: #{error}"}
end
end

defp mark_metric_registries_as_synced(metric_registry_ids) do
for id <- metric_registry_ids do
with {:ok, metric} <- Registry.by_id(id),
{:ok, _metric} <- Registry.update(metric, %{"sync_status" => "synced"}) do
:ok
end
end
end

defp store_sync_in_db(content) do
Registry.SyncSchema.create(content)
end

defp get_sync_target_url() do
secret = System.get_env("METRIC_REGISTRY_SYNC_SECRET")
secret = Config.module_get(Sanbase.Metric.Registry.Sync, :sync_secret)
deployment_env = Config.module_get(Sanbase, :deployment_env)
port = Config.module_get(SanbaseWeb.Endpoint, :port)

case Sanbase.Utils.Config.module_get(Sanbase, :deployment_env) do
"dev" -> "http://localhost:4000/sync_metric_registry/#{secret}"
"stage" -> "http://api.santiment.net/sync_metric_registry/#{secret}"
case deployment_env do
"dev" -> "http://localhost:#{port}/sync_metric_registry?secret=#{secret}"
"prod" -> raise("Cannot initiate sync from PROD")
"stage" -> "https://api.santiment.net/sync_metric_registry?secret=#{secret}"
end
end

defp get_sync_content([]), do: {:error, "Nothing to sync"}

defp get_sync_content(metric_registry_ids) do
structs = Sanbase.Metric.Registry.by_ids(metric_registry_ids)
content = generate_content(structs)
{:ok, content}
case Sanbase.Metric.Registry.by_ids(metric_registry_ids) do
[] -> {:error, "Nothing to sync"}
structs -> {:ok, Jason.encode!(structs)}
end
end

defp check_initiate_env() do
Expand All @@ -80,7 +146,7 @@ defmodule Sanbase.Metric.Registry.Sync do

# If local, the DATABASE_URL should not be set pointing to stage/prod.
# Only work if the local postgres is used
local? = deployment_env == "env" and is_nil(database_url)
local? = deployment_env in ["dev", "test"] and is_nil(database_url)
stage? = deployment_env == "stage"

if local? or stage? do
Expand All @@ -96,24 +162,28 @@ defmodule Sanbase.Metric.Registry.Sync do

# If local, the DATABASE_URL should not be set pointing to stage/prod.
# Only work if the local postgres is used
local? = deployment_env == "env" and is_nil(database_url)
local? = deployment_env in ["dev", "test"] and is_nil(database_url)
prod? = deployment_env == "prod"

if local? or prod? do
:ok
else
{:error, "Can only apply sync only on PROD"}
{:error, "Can apply sync only on PROD"}
end
end

defp generate_content(structs) do
api_url = SanbaseWeb.Endpoint.api_url()
from_host = URI.parse(api_url).host
defp extract_metric_registry_ids(sync) do
sync.content
|> Jason.decode!()
|> Enum.map(& &1["id"])
end

defp get_confirmation_endpoint(sync) do
secret = Sanbase.Utils.Config.module_get(Sanbase.Metric.Registry.Sync, :sync_secret)

%{
metric_registry_structs: Jason.encode!(structs),
generated_at: DateTime.utc_now() |> DateTime.to_iso8601(),
from_host: from_host
}
SanbaseWeb.Endpoint.backend_url()
|> URI.parse()
|> URI.append_path("/mark_metric_registry_sync_as_finished/#{sync.uuid}?secret=#{secret}")
|> URI.to_string()
end
end
47 changes: 47 additions & 0 deletions lib/sanbase/metric/registry/sync_schema.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
defmodule Sanbase.Metric.Registry.SyncSchema do
use Ecto.Schema

import Ecto.Query
import Ecto.Changeset

schema "metric_registry_syncs" do
field(:uuid, :string)
field(:status, :string)
field(:content, :string)
field(:errors, :string)

timestamps()
end

def changeset(%__MODULE__{} = sync, attrs) do
sync
|> cast(attrs, [:uuid, :content, :status, :errors])
|> validate_inclusion(:status, ["scheduled", "executing", "completed", "failed"])
end

def create(content) do
%__MODULE__{}
|> changeset(%{content: content, status: "scheduled", uuid: Ecto.UUID.generate()})
|> Sanbase.Repo.insert()
end

def update_status(%__MODULE__{} = struct, status, errors \\ nil) do
struct
|> changeset(%{status: status, errors: errors})
|> Sanbase.Repo.update()
end

def by_uuid(uuid) do
query = from(sync in __MODULE__, where: sync.uuid == ^uuid)

case Sanbase.Repo.one(query) do
nil -> {:error, "Sync with uuid #{uuid} not found"}
sync -> {:ok, sync}
end
end

def all_with_status(status) do
from(sync in __MODULE__, where: sync.status == ^status)
|> Sanbase.Repo.all()
end
end
11 changes: 0 additions & 11 deletions lib/sanbase/metric/registry/sync_server.ex

This file was deleted.

14 changes: 10 additions & 4 deletions lib/sanbase/utils/config.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,12 @@ defmodule Sanbase.Utils.Config do
Module for reading configuration values from the application environment.
"""

def module_get(module, key) do
def module_get(module, key_or_keys)
when is_atom(key_or_keys) or is_list(key_or_keys) do
keys = List.wrap(key_or_keys)

Application.fetch_env!(:sanbase, module)
|> Keyword.get(key)
|> get_in(keys)
|> parse_config_value()
end

Expand All @@ -15,9 +18,12 @@ defmodule Sanbase.Utils.Config do
|> parse_config_value()
end

def module_get(module, key, default) do
def module_get(module, key_or_keys, default)
when is_atom(key_or_keys) or is_list(key_or_keys) do
keys = List.wrap(key_or_keys)

case Application.fetch_env(:sanbase, module) do
{:ok, env} -> env |> Keyword.get(key, default)
{:ok, env} -> get_in(env, keys)
_ -> default
end
|> parse_config_value()
Expand Down
Loading

0 comments on commit 02dcd29

Please sign in to comment.