Skip to content

Commit

Permalink
Add resource schema to be the output of metrics query
Browse files Browse the repository at this point in the history
  • Loading branch information
zacksiri committed Jan 10, 2025
1 parent 94ee75c commit e92cdb2
Show file tree
Hide file tree
Showing 3 changed files with 107 additions and 0 deletions.
11 changes: 11 additions & 0 deletions lib/uplink/availability.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ defmodule Uplink.Availability do
alias Uplink.Clients.Instellar

alias __MODULE__.Query
alias __MODULE__.Response
alias __MODULE__.Resource

def check! do
case get_monitor() do
Expand Down Expand Up @@ -36,6 +38,15 @@ defmodule Uplink.Availability do
query = Query.build(nodes, indices)

Metrics.query!(monitor, query)
|> case do
%{status: 200, body: %{"responses" => responses}} ->
nodes
|> Response.parse(responses)
|> Enum.map(&Resource.parse/1)

_ ->
{:error, :could_not_query_metrics}
end
end

defp get_monitor do
Expand Down
44 changes: 44 additions & 0 deletions lib/uplink/availability/resource.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
defmodule Uplink.Availability.Resource do
use Ecto.Schema
import Ecto.Changeset

@primary_key false
embedded_schema do
field :node, :string

embeds_one :available, Available, primary_key: false do
field :cpu_cores, :integer
field :memory_bytes, :decimal
field :storage_bytes, :decimal
end

embeds_one :usage, Usage, primary_key: false do
field :load_norm_5, :decimal
field :memory_bytes, :decimal
field :storage_bytes, :decimal
end
end

def changeset(resource, params) do
resource
|> cast(params, [:node])
|> cast_embed(:available, with: &available_changeset/2)
|> cast_embed(:usage, with: &usage_changeset/2)
end

def available_changeset(available, params) do
available
|> cast(params, [:cpu_cores, :memory_bytes, :storage_bytes])
end

def usage_changeset(usage, params) do
usage
|> cast(params, [:load_norm_5, :memory_bytes, :storage_bytes])
end

def parse(params) do
%__MODULE__{}
|> changeset(params)
|> apply_action!(:insert)
end
end
52 changes: 52 additions & 0 deletions lib/uplink/availability/response.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
defmodule Uplink.Availability.Response do
alias Uplink.Availability.Query

def parse(nodes, responses) when is_list(responses) do
retrieval_keys = Query.retrieval_keys()

params = %{
retrieval_keys: retrieval_keys,
responses: responses
}

Enum.map(nodes, &parse_node(&1, params))
end

defp parse_node(node, %{retrieval_keys: retrieval_keys, responses: responses}) do
usage_params =
Enum.filter(responses, fn response ->
%{"aggregations" => aggregations} = response

Map.has_key?(aggregations, node.name)
end)
|> Enum.reduce(%{}, fn response, acc ->
%{"aggregations" => aggregations} = response

relevant_key =
aggregations
|> Map.keys()
|> Enum.filter(&Enum.member?(retrieval_keys, &1))
|> List.first()

result =
Map.get(aggregations, relevant_key)
|> Map.fetch!("value")

Map.put(acc, relevant_key, result)
end)

%{
"node" => node.name,
"usage" => %{
"load_norm_5" => Map.get(usage_params, "load_norm_5", 0.0),
"memory_bytes" => Map.get(usage_params, "memory_used_bytes", 0),
"storage_bytes" => Map.get(usage_params, "filesystem_used_bytes", 0)
},
"available" => %{
"cpu_cores" => node.cpu_cores_count,
"memory_bytes" => node.total_memory,
"storage_bytes" => node.total_storage
}
}
end
end

0 comments on commit e92cdb2

Please sign in to comment.