Skip to content

Commit

Permalink
feat(local): PingBalancer print warning if server's fail_rate > 0.5
Browse files Browse the repository at this point in the history
  • Loading branch information
zonyitoo committed Jun 23, 2024
1 parent 8271ff8 commit ff3bf36
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 41 deletions.
46 changes: 29 additions & 17 deletions crates/shadowsocks-service/src/local/loadbalancing/ping_balancer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -818,24 +818,36 @@ struct PingChecker {
impl PingChecker {
/// Checks server's score and update into `ServerScore<E>`
async fn check_update_score(self) {
let server_score = match self.server_type {
ServerType::Tcp => self.server.tcp_score(),
ServerType::Udp => self.server.udp_score(),
};

let score = match self.check_delay().await {
Ok(d) => match self.server_type {
ServerType::Tcp => self.server.tcp_score().push_score(Score::Latency(d)).await,
ServerType::Udp => self.server.udp_score().push_score(Score::Latency(d)).await,
},
Ok(d) => server_score.push_score(Score::Latency(d)).await,
// Penalty
Err(..) => match self.server_type {
ServerType::Tcp => self.server.tcp_score().push_score(Score::Errored).await,
ServerType::Udp => self.server.udp_score().push_score(Score::Errored).await,
},
Err(..) => server_score.push_score(Score::Errored).await,
};

trace!(
"updated remote {} server {} (score: {})",
self.server_type,
self.server.server_config().addr(),
score
);
let stat_data = server_score.stat_data().await;

if stat_data.fail_rate > 0.5 {
warn!(
"balancer: checked & updated remote {} server {} (score: {}), {:?}",
self.server_type,
ServerConfigFormatter::new(self.server.server_config()),
score,
stat_data,
);
} else {
debug!(
"balancer: checked & updated remote {} server {} (score: {}), {:?}",
self.server_type,
ServerConfigFormatter::new(self.server.server_config()),
score,
stat_data,
);
}
}

/// Detect TCP connectivity with Chromium [Network Portal Detection](https://www.chromium.org/chromium-os/chromiumos-design-docs/network-portal-detection)
Expand Down Expand Up @@ -988,7 +1000,7 @@ impl PingChecker {
trace!(
"checked remote {} server {} latency with {} ms",
self.server_type,
self.server.server_config().addr(),
ServerConfigFormatter::new(self.server.server_config()),
elapsed
);
Ok(elapsed)
Expand All @@ -997,7 +1009,7 @@ impl PingChecker {
debug!(
"failed to check {} server {}, error: {}",
self.server_type,
self.server.server_config().addr(),
ServerConfigFormatter::new(self.server.server_config()),
err
);

Expand All @@ -1011,7 +1023,7 @@ impl PingChecker {
trace!(
"checked remote {} server {} latency timeout, elapsed {} ms",
self.server_type,
self.server.server_config().addr(),
ServerConfigFormatter::new(self.server.server_config()),
elapsed
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use tokio::sync::Mutex;

use crate::{config::ServerInstanceConfig, local::context::ServiceContext};

use super::server_stat::{Score, ServerStat};
use super::server_stat::{Score, ServerStat, ServerStatData};

/// Server's statistic score
pub struct ServerScore {
Expand Down Expand Up @@ -53,6 +53,11 @@ impl ServerScore {
pub async fn report_failure(&self) -> u32 {
self.push_score(Score::Errored).await
}

/// Get statistic data
pub async fn stat_data(&self) -> ServerStatData {
self.stat_data.lock().await.data().clone()

Check warning on line 59 in crates/shadowsocks-service/src/local/loadbalancing/server_data.rs

View workflow job for this annotation

GitHub Actions / clippy ubuntu-latest

using `clone` on type `ServerStatData` which implements the `Copy` trait

warning: using `clone` on type `ServerStatData` which implements the `Copy` trait --> crates/shadowsocks-service/src/local/loadbalancing/server_data.rs:59:9 | 59 | self.stat_data.lock().await.data().clone() | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ help: try dereferencing it: `*self.stat_data.lock().await.data()` | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#clone_on_copy = note: `#[warn(clippy::clone_on_copy)]` on by default

Check warning on line 59 in crates/shadowsocks-service/src/local/loadbalancing/server_data.rs

View workflow job for this annotation

GitHub Actions / clippy macos-latest

using `clone` on type `ServerStatData` which implements the `Copy` trait

warning: using `clone` on type `ServerStatData` which implements the `Copy` trait --> crates/shadowsocks-service/src/local/loadbalancing/server_data.rs:59:9 | 59 | self.stat_data.lock().await.data().clone() | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ help: try dereferencing it: `*self.stat_data.lock().await.data()` | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#clone_on_copy = note: `#[warn(clippy::clone_on_copy)]` on by default
}
}

impl Debug for ServerScore {
Expand Down
64 changes: 41 additions & 23 deletions crates/shadowsocks-service/src/local/loadbalancing/server_stat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,30 +19,37 @@ pub enum Score {
Errored,
}

/// Statistic of a remote server
#[derive(Debug)]
pub struct ServerStat {
/// Server statistic data
#[derive(Debug, Clone, Copy)]
pub struct ServerStatData {
/// Median of latency time (in millisec)
///
/// Use median instead of average time,
/// because probing result may have some really bad cases
rtt: u32,
pub latency_median: u32,
/// Total_Fail / Total_Probe
pub fail_rate: f64,
/// Score's standard deviation
pub latency_stdev: f64,
/// Score's average
pub latency_mean: f64,
}

/// Statistic of a remote server
#[derive(Debug)]
pub struct ServerStat {
/// MAX server's RTT, normally the check timeout milliseconds
max_server_rtt: u32,
/// Total_Fail / Total_Probe
fail_rate: f64,
/// Recently probe data
latency_queue: VecDeque<(Score, Instant)>,
/// Score's standard deviation
latency_stdev: f64,
/// Score's standard deviation MAX
max_latency_stdev: f64,
/// Score's average
latency_mean: f64,
/// User's customized weight
user_weight: f32,
/// Checking window size
check_window: Duration,
/// Statistic Data
data: ServerStatData,
}

fn max_latency_stdev(max_server_rtt: u32) -> f64 {
Expand All @@ -58,25 +65,28 @@ impl ServerStat {
pub fn new(user_weight: f32, max_server_rtt: u32, check_window: Duration) -> ServerStat {
assert!((0.0..=1.0).contains(&user_weight));

let max_latency_stdev = max_latency_stdev(max_server_rtt);
ServerStat {
rtt: max_server_rtt,
max_server_rtt,
fail_rate: 1.0,
latency_queue: VecDeque::new(),
latency_stdev: 0.0,
max_latency_stdev: max_latency_stdev(max_server_rtt),
latency_mean: 0.0,
max_latency_stdev,
user_weight,
check_window,
data: ServerStatData {
latency_median: max_server_rtt,
fail_rate: 1.0,
latency_stdev: max_latency_stdev,
latency_mean: max_server_rtt as f64,
},
}
}

fn score(&self) -> u32 {
// Normalize rtt
let nrtt = self.rtt as f64 / self.max_server_rtt as f64;
let nrtt = self.data.latency_median as f64 / self.max_server_rtt as f64;

// Normalize stdev
let nstdev = self.latency_stdev / self.max_latency_stdev;
let nstdev = self.data.latency_stdev / self.max_latency_stdev;

const SCORE_RTT_WEIGHT: f64 = 1.0;
const SCORE_FAIL_WEIGHT: f64 = 3.0;
Expand All @@ -92,7 +102,7 @@ impl ServerStat {
// 2. The lower errored count, the better
// 3. The lower latency's stdev, the better
// 4. The higher user's weight, the better
let score = (nrtt * SCORE_RTT_WEIGHT + self.fail_rate * SCORE_FAIL_WEIGHT + nstdev * SCORE_STDEV_WEIGHT)
let score = (nrtt * SCORE_RTT_WEIGHT + self.data.fail_rate * SCORE_FAIL_WEIGHT + nstdev * SCORE_STDEV_WEIGHT)
/ (SCORE_RTT_WEIGHT + SCORE_FAIL_WEIGHT + SCORE_STDEV_WEIGHT)
/ user_weight as f64;

Expand Down Expand Up @@ -132,15 +142,17 @@ impl ServerStat {
}

// Error rate
self.fail_rate = cerr as f64 / self.latency_queue.len() as f64;
self.data.fail_rate = cerr as f64 / self.latency_queue.len() as f64;

self.data.latency_stdev = self.max_latency_stdev;
self.data.latency_mean = self.max_server_rtt as f64;
if !vlat.is_empty() {
vlat.sort_unstable();

// Find median of latency
let mid = vlat.len() / 2;

self.rtt = if vlat.len() % 2 == 0 {
self.data.latency_median = if vlat.len() % 2 == 0 {
(vlat[mid] + vlat[mid - 1]) / 2
} else {
vlat[mid]
Expand All @@ -154,17 +166,23 @@ impl ServerStat {
for s in &vlat {
total_lat += *s;
}
self.latency_mean = total_lat as f64 / n;
self.data.latency_mean = total_lat as f64 / n;
let mut acc_diff = 0.0;
for s in &vlat {
let diff = *s as f64 - self.latency_mean;
let diff = *s as f64 - self.data.latency_mean;
acc_diff += diff * diff;
}
// Corrected Sample Standard Deviation
self.latency_stdev = ((1.0 / (n - 1.0)) * acc_diff).sqrt();
self.data.latency_stdev = ((1.0 / (n - 1.0)) * acc_diff).sqrt();
} else {
self.data.latency_mean = vlat[0] as f64;
}
}

self.score()
}

pub fn data(&self) -> &ServerStatData {
&self.data
}
}

0 comments on commit ff3bf36

Please sign in to comment.