Skip to content

Commit

Permalink
Fix test for pipeline
Browse files Browse the repository at this point in the history
  • Loading branch information
zacksiri committed Oct 29, 2024
1 parent 31b5cc9 commit 213508c
Show file tree
Hide file tree
Showing 3 changed files with 100 additions and 136 deletions.
210 changes: 79 additions & 131 deletions lib/uplink/metrics/instance/document.ex
Original file line number Diff line number Diff line change
Expand Up @@ -63,111 +63,96 @@ defimpl Uplink.Metrics.Document, for: Uplink.Metrics.Instance do
release = Map.get(config, "image.release")
serial = Map.get(config, "image.serial")

initial_networking_data = %{
"bytes_received" => 0,
"bytes_sent" => 0,
"errors_received" => 0,
"errors_sent" => 0,
"packets_dropped_inbound" => 0,
"packets_dropped_outbound" => 0,
"packets_received" => 0,
"packets_sent" => 0
}

current_counters =
Enum.reduce(
network,
initial_networking_data,
&accumulate_networking_metrics/2
)

previous_counters =
Enum.reduce(
previous_network_metric_data,
initial_networking_data,
&accumulate_networking_metrics/2
)

%{
"bytes_received" => previous_bytes_received,
"bytes_sent" => previous_bytes_sent,
"packets_received" => previous_packets_received,
"packets_sent" => previous_packets_sent
} = previous_counters

%{
"bytes_received" => bytes_received,
"bytes_sent" => bytes_sent,
"errors_received" => errors_received,
"errors_sent" => errors_sent,
"packets_dropped_inbound" => packets_dropped_inbound,
"packets_dropped_outbound" => packets_dropped_outbound,
"packets_received" => packets_received,
"packets_sent" => packets_sent
} = current_counters

diff_bytes_received = bytes_received - previous_bytes_received
diff_packets_received = packets_received - previous_packets_received
diff_bytes_sent = bytes_sent - previous_bytes_sent
diff_packets_sent = packets_sent - previous_packets_sent

time_diff_milliseconds =
DateTime.to_unix(timestamp, :millisecond) -
previous_network_metric_timestamp

network_params = %{
"metricset" => %{
"period" => time_diff_milliseconds
},
"host" => %{
"name" => name,
"created" => data.created_at,
"accessed" => data.last_used_at,
"containerized" => data.type == "container",
"os" => %{
"codename" => os,
"build" => "#{release}-#{serial}"
Enum.map(network, fn {interface, network_data} ->
{_, previous_network_data} =
Enum.find(previous_network_metric_data, fn {i, _} -> i == interface end)

%{"counters" => current_counters} = network_data

%{"counters" => previous_counters} = previous_network_data

%{
"bytes_received" => previous_bytes_received,
"bytes_sent" => previous_bytes_sent,
"packets_received" => previous_packets_received,
"packets_sent" => previous_packets_sent
} = previous_counters

%{
"bytes_received" => bytes_received,
"bytes_sent" => bytes_sent,
"errors_received" => errors_received,
"errors_sent" => errors_sent,
"packets_dropped_inbound" => packets_dropped_inbound,
"packets_dropped_outbound" => packets_dropped_outbound,
"packets_received" => packets_received,
"packets_sent" => packets_sent
} = current_counters

diff_bytes_received = bytes_received - previous_bytes_received
diff_packets_received = packets_received - previous_packets_received
diff_bytes_sent = bytes_sent - previous_bytes_sent
diff_packets_sent = packets_sent - previous_packets_sent

network_params = %{
"metricset" => %{
"period" => time_diff_milliseconds
},
"network" => %{
"in" => %{
"bytes" => diff_bytes_received,
"packets" => diff_packets_received
},
"ingress" => %{
"bytes" => diff_bytes_received,
"packets" => diff_packets_received
},
"out" => %{
"bytes" => diff_bytes_sent,
"packets" => diff_packets_sent
"host" => %{
"name" => name,
"created" => data.created_at,
"accessed" => data.last_used_at,
"containerized" => data.type == "container",
"os" => %{
"codename" => os,
"build" => "#{release}-#{serial}"
},
"egress" => %{
"bytes" => diff_bytes_sent,
"packets" => diff_packets_sent
"network" => %{
"in" => %{
"bytes" => diff_bytes_received,
"packets" => diff_packets_received
},
"ingress" => %{
"bytes" => diff_bytes_received,
"packets" => diff_packets_received
},
"out" => %{
"bytes" => diff_bytes_sent,
"packets" => diff_packets_sent
},
"egress" => %{
"bytes" => diff_bytes_sent,
"packets" => diff_packets_sent
}
}
}
},
"system" => %{
"network" => %{
"in" => %{
"bytes" => diff_bytes_received,
"dropped" => packets_dropped_inbound,
"errors" => errors_received,
"packets" => diff_packets_received
},
"out" => %{
"bytes" => diff_bytes_received,
"dropped" => packets_dropped_outbound,
"errors" => errors_sent,
"packets" => diff_packets_sent
},
"system" => %{
"network" => %{
"in" => %{
"bytes" => diff_bytes_received,
"dropped" => packets_dropped_inbound,
"errors" => errors_received,
"packets" => diff_packets_received
},
"name" => interface,
"out" => %{
"bytes" => diff_bytes_received,
"dropped" => packets_dropped_outbound,
"errors" => errors_sent,
"packets" => diff_packets_sent
}
}
}
}
}

instance
|> build_base()
|> Map.merge(network_params)
instance
|> build_base()
|> Map.merge(network_params)
end)
end

def diskio(%Instance{metrics: metrics} = instance) do
Expand Down Expand Up @@ -376,43 +361,6 @@ defimpl Uplink.Metrics.Document, for: Uplink.Metrics.Instance do
|> Map.merge(filesystem_params)
end

defp accumulate_networking_metrics({_interface, data}, acc) do
bytes_received = acc["bytes_received"] + data["counters"]["bytes_received"]

bytes_sent = acc["bytes_sent"] + data["counters"]["bytes_sent"]

errors_received =
acc["errors_received"] + data["counters"]["errors_received"]

errors_sent = acc["errors_sent"] + data["counters"]["errors_sent"]

packets_dropped_inbound =
acc["packets_dropped_inbound"] +
data["counters"]["packets_dropped_inbound"]

packets_dropped_outbound =
acc["packets_dropped_outbound"] +
data["counters"]["packets_dropped_outbound"]

packets_received =
acc["packets_received"] +
data["counters"]["packets_received"]

packets_sent =
acc["packets_sent"] +
data["counters"]["packets_sent"]

acc
|> Map.put("bytes_received", bytes_received)
|> Map.put("bytes_sent", bytes_sent)
|> Map.put("errors_received", errors_received)
|> Map.put("errors_sent", errors_sent)
|> Map.put("packets_dropped_inbound", packets_dropped_inbound)
|> Map.put("packets_dropped_outbound", packets_dropped_outbound)
|> Map.put("packets_received", packets_received)
|> Map.put("packets_sent", packets_sent)
end

defp cpu_percentage(cores, time_diff_seconds, earlier_usage, later_usage) do
available_compute = cores * time_diff_seconds * :math.pow(10, 9)

Expand Down
20 changes: 16 additions & 4 deletions lib/uplink/metrics/pipeline.ex
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ defmodule Uplink.Metrics.Pipeline do

monitors
|> Enum.map(fn monitor ->
Metrics.push!(monitor, documents) |> IO.inspect()
Metrics.push!(monitor, documents)
end)

messages
Expand All @@ -120,11 +120,23 @@ defmodule Uplink.Metrics.Pipeline do
end)

dataset
|> Enum.flat_map(fn {type, data} ->
index = Metrics.index(type)
|> Enum.flat_map(&build_request/1)
end

defp build_request({type, data}) when is_list(data) do
index = Metrics.index(type)

Enum.reduce(data, [], fn entry, acc ->
metadata = %{"create" => %{"_index" => index}}

[metadata, data]
[metadata, entry | acc]
end)
end

defp build_request({type, data}) when is_map(data) do
index = Metrics.index(type)
metadata = %{"create" => %{"_index" => index}}

[metadata, data]
end
end
6 changes: 5 additions & 1 deletion test/scenarios/pipeline.ex
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,11 @@ defmodule Uplink.Scenarios.Pipeline do
],
account: %{id: "upmaru-stage"}
},
previous_cpu_metric: nil
previous_cpu_metric: nil,
previous_network_metric: nil,
cpu_60_metric: nil,
cpu_300_metric: nil,
cpu_900_metric: nil
}

{:ok,
Expand Down

0 comments on commit 213508c

Please sign in to comment.