Skip to content

Commit

Permalink
Merge branch 'release/0.19.1'
Browse files Browse the repository at this point in the history
  • Loading branch information
zacksiri committed Jan 7, 2025
2 parents 2b427cd + f309056 commit 20c9328
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 22 deletions.
19 changes: 3 additions & 16 deletions lib/uplink/monitors.ex
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ defmodule Uplink.Monitors do
end

def run(_options \\ []) do
Cache.put_new({:monitors, :metrics}, [])
Pipelines.reset_monitors(:metrics)

Instellar.list_monitors()
|> case do
Expand All @@ -39,27 +39,14 @@ defmodule Uplink.Monitors do
defp start_pipeline(monitors, context) do
Logger.info("[Uplink.Monitors] Starting pipeline...")

started_metrics_monitor_ids =
Pipelines.get_monitors(context)
|> Enum.map(fn monitor ->
monitor["attributes"]["id"]
end)

not_started_monitors =
Enum.filter(monitors, fn monitor ->
monitor["attributes"]["id"] not in started_metrics_monitor_ids
end)

grouped_monitors =
Enum.group_by(not_started_monitors, fn monitor ->
Enum.group_by(monitors, fn monitor ->
monitor["attributes"]["type"]
end)

context_monitors = Map.get(grouped_monitors, "#{context}") || []

if Enum.count(context_monitors) > 0 do
Pipelines.append_monitors(context, context_monitors)
end
Pipelines.update_monitors(context, context_monitors)

module = Map.fetch!(@pipeline_modules, context)

Expand Down
14 changes: 9 additions & 5 deletions lib/uplink/pipelines.ex
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
defmodule Uplink.Pipelines do
alias Uplink.Cache

def get_monitors(context) do
@valid_contexts [:metrics]

def get_monitors(context) when context in @valid_contexts do
Cache.get({:monitors, context}) || []
end

def append_monitors(context, monitors) do
Cache.get_and_update({:monitors, context}, fn existing_monitors ->
{existing_monitors, existing_monitors ++ monitors}
end)
def reset_monitors(context) when context in @valid_contexts do
Cache.put({:monitors, context}, [])
end

def update_monitors(context, monitors) when context in @valid_contexts do
Cache.put({:monitors, context}, monitors)
end

def start(module) do
Expand Down
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ defmodule Uplink.MixProject do
def project do
[
app: :uplink,
version: "0.19.0",
version: "0.19.1",
elixir: "~> 1.13",
elixirc_paths: elixirc_paths(Mix.env()),
start_permanent: Mix.env() == :prod,
Expand Down
26 changes: 26 additions & 0 deletions test/uplink/monitors/router_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,24 @@ defmodule Uplink.Monitors.RouterTest do
endpoint: "http://localhost:#{bypass.port}/uplink"
)

Uplink.Cache.put({:monitors, :metrics}, [
%{
"attributes" => %{
"current_state" => "active",
"endpoint" => "https://elastic:9200",
"expires_at" => "2024-11-21T03:14:17Z",
"id" => 1,
"token" => "some-token",
"type" => "metrics",
"uid" => "some-other-uid"
},
"id" => "1",
"links" => %{"self" => "http://localhost:4000/uplink/self/monitors/1"},
"relationships" => %{},
"type" => "monitors"
}
])

{:ok, bypass: bypass}
end

Expand All @@ -59,6 +77,10 @@ defmodule Uplink.Monitors.RouterTest do
end

test "can refresh monitors list", %{signature: signature, bypass: bypass} do
[monitor] = Uplink.Pipelines.get_monitors(:metrics)

assert monitor["attributes"]["uid"] == "some-other-uid"

Bypass.expect_once(bypass, "GET", "/uplink/self/monitors", fn conn ->
conn
|> Plug.Conn.put_resp_header("content-type", "application/json")
Expand All @@ -71,6 +93,10 @@ defmodule Uplink.Monitors.RouterTest do
|> put_req_header("content-type", "application/json")
|> Router.call(@opts)

[monitor] = Uplink.Pipelines.get_monitors(:metrics)

assert monitor["attributes"]["uid"] == "some-uid"

assert conn.status == 200
end
end
Expand Down

0 comments on commit 20c9328

Please sign in to comment.