From a6070a7ef845de927d742b719565d4500f37de2e Mon Sep 17 00:00:00 2001 From: eric Date: Mon, 7 Oct 2024 19:10:39 +0800 Subject: [PATCH] Support multiple health check requests for the failover outbound --- leaf/src/app/outbound/manager.rs | 4 ++ leaf/src/config/conf/config.rs | 17 ++++++ leaf/src/config/internal/config.proto | 10 ++++ leaf/src/config/internal/config.rs | 26 +++++++++ leaf/src/config/json/config.rs | 8 +++ leaf/src/proxy/failover/datagram.rs | 4 ++ leaf/src/proxy/failover/mod.rs | 83 ++++++++++++++++++++++----- leaf/src/proxy/failover/stream.rs | 4 ++ 8 files changed, 141 insertions(+), 15 deletions(-) diff --git a/leaf/src/app/outbound/manager.rs b/leaf/src/app/outbound/manager.rs index 14fe6f574..555c6ce93 100644 --- a/leaf/src/app/outbound/manager.rs +++ b/leaf/src/app/outbound/manager.rs @@ -465,6 +465,8 @@ impl OutboundManager { settings.health_check_prefers.clone(), settings.health_check_on_start, settings.health_check_wait, + settings.health_check_attempts, + settings.health_check_success_percentage, dns_client.clone(), ); let (datagram, mut datagram_abort_handles) = failover::DatagramHandler::new( @@ -480,6 +482,8 @@ impl OutboundManager { settings.health_check_prefers, settings.health_check_on_start, settings.health_check_wait, + settings.health_check_attempts, + settings.health_check_success_percentage, dns_client.clone(), ); let handler = HandlerBuilder::default() diff --git a/leaf/src/config/conf/config.rs b/leaf/src/config/conf/config.rs index fc09fb608..39bc5c0a0 100644 --- a/leaf/src/config/conf/config.rs +++ b/leaf/src/config/conf/config.rs @@ -135,6 +135,8 @@ pub struct ProxyGroup { pub health_check_prefers: Option>, pub health_check_on_start: Option, pub health_check_wait: Option, + pub health_check_attempts: Option, + pub health_check_success_percentage: Option, // tryall pub delay_base: Option, @@ -163,6 +165,8 @@ impl Default for ProxyGroup { health_check_prefers: None, health_check_on_start: None, health_check_wait: None, + 
health_check_attempts: None, + health_check_success_percentage: None, delay_base: None, method: None, } @@ -626,6 +630,14 @@ pub fn from_lines(lines: Vec>) -> Result { group.health_check_wait = if v == "true" { Some(true) } else { Some(false) }; } + "health-check-attempts" => { + let i = if let Ok(i) = v.parse() { Some(i) } else { None }; + group.health_check_attempts = i; + } + "health-check-success-percentage" => { + let i = if let Ok(i) = v.parse() { Some(i) } else { None }; + group.health_check_success_percentage = i; + } "delay-base" => { let i = if let Ok(i) = v.parse() { Some(i) } else { None }; group.delay_base = i; @@ -1317,6 +1329,11 @@ pub fn to_internal(conf: &mut Config) -> Result { settings.health_check_on_start = ext_proxy_group.health_check_on_start.unwrap_or(false); settings.health_check_wait = ext_proxy_group.health_check_wait.unwrap_or(false); + settings.health_check_attempts = + ext_proxy_group.health_check_attempts.unwrap_or(1); + settings.health_check_success_percentage = ext_proxy_group + .health_check_success_percentage + .unwrap_or(50); let settings = settings.write_to_bytes().unwrap(); outbound.settings = settings; outbounds.push(outbound); diff --git a/leaf/src/config/internal/config.proto b/leaf/src/config/internal/config.proto index 6abb20f44..b61e9af1f 100644 --- a/leaf/src/config/internal/config.proto +++ b/leaf/src/config/internal/config.proto @@ -172,6 +172,7 @@ message ChainOutboundSettings { } message FailOverOutboundSettings { + // A list of outbound tags. repeated string actors = 1; // The connect timeout value for the outbounds, in seconds. Some outbounds don't // need to "connect", those outbounds have zero connect time. Default 4. @@ -186,6 +187,7 @@ message FailOverOutboundSettings { bool fallback_cache = 6; uint32 cache_size = 7; uint32 cache_timeout = 8; + // The outbound to be used when all actors are unavailable. optional string last_resort = 9; // Timeout for health check request, in seconds. Default is 6. 
uint32 health_check_timeout = 10; @@ -204,6 +206,14 @@ message FailOverOutboundSettings { // Hold incomming connections, wait for the initial health check to be done. // Default false. bool health_check_wait = 15; + // Total number of health check requests per outbound. The RTT used to sort + // the outbounds is the average of all successful attempts. Default 1. + uint32 health_check_attempts = 16; + // The minimum percentage of successful health check requests required to consider + // an outbound available; must be a value in [0, 100]. If the result is lower than this + // percentage, the outbound's RTT is set to a timeout value, which marks the + // outbound as unavailable. Default 50. + uint32 health_check_success_percentage = 17; } message SelectOutboundSettings { diff --git a/leaf/src/config/internal/config.rs b/leaf/src/config/internal/config.rs index 10059e382..d15e41a25 100644 --- a/leaf/src/config/internal/config.rs +++ b/leaf/src/config/internal/config.rs @@ -3222,6 +3222,10 @@ pub struct FailOverOutboundSettings { pub health_check_on_start: bool, // @@protoc_insertion_point(field:FailOverOutboundSettings.health_check_wait) pub health_check_wait: bool, + // @@protoc_insertion_point(field:FailOverOutboundSettings.health_check_attempts) + pub health_check_attempts: u32, + // @@protoc_insertion_point(field:FailOverOutboundSettings.health_check_success_percentage) + pub health_check_success_percentage: u32, // special fields // @@protoc_insertion_point(special_field:FailOverOutboundSettings.special_fields) pub special_fields: ::protobuf::SpecialFields, @@ -3294,6 +3298,12 @@ impl ::protobuf::Message for FailOverOutboundSettings { 120 => { self.health_check_wait = is.read_bool()?; }, + 128 => { + self.health_check_attempts = is.read_uint32()?; + }, + 136 => { + self.health_check_success_percentage = is.read_uint32()?; + }, tag => { ::protobuf::rt::read_unknown_or_skip_group(tag, is, self.special_fields.mut_unknown_fields())?; }, @@ -3351,6 +3361,12 @@ 
impl ::protobuf::Message for FailOverOutboundSettings { if self.health_check_wait != false { my_size += 1 + 1; } + if self.health_check_attempts != 0 { + my_size += ::protobuf::rt::uint32_size(16, self.health_check_attempts); + } + if self.health_check_success_percentage != 0 { + my_size += ::protobuf::rt::uint32_size(17, self.health_check_success_percentage); + } my_size += ::protobuf::rt::unknown_fields_size(self.special_fields.unknown_fields()); self.special_fields.cached_size().set(my_size as u32); my_size @@ -3402,6 +3418,12 @@ impl ::protobuf::Message for FailOverOutboundSettings { if self.health_check_wait != false { os.write_bool(15, self.health_check_wait)?; } + if self.health_check_attempts != 0 { + os.write_uint32(16, self.health_check_attempts)?; + } + if self.health_check_success_percentage != 0 { + os.write_uint32(17, self.health_check_success_percentage)?; + } os.write_unknown_fields(self.special_fields.unknown_fields())?; ::std::result::Result::Ok(()) } @@ -3434,6 +3456,8 @@ impl ::protobuf::Message for FailOverOutboundSettings { self.health_check_prefers.clear(); self.health_check_on_start = false; self.health_check_wait = false; + self.health_check_attempts = 0; + self.health_check_success_percentage = 0; self.special_fields.clear(); } @@ -3454,6 +3478,8 @@ impl ::protobuf::Message for FailOverOutboundSettings { health_check_prefers: ::std::vec::Vec::new(), health_check_on_start: false, health_check_wait: false, + health_check_attempts: 0, + health_check_success_percentage: 0, special_fields: ::protobuf::SpecialFields::new(), }; &instance diff --git a/leaf/src/config/json/config.rs b/leaf/src/config/json/config.rs index 6f7a035ad..dd581ddd2 100644 --- a/leaf/src/config/json/config.rs +++ b/leaf/src/config/json/config.rs @@ -218,6 +218,10 @@ pub struct FailOverOutboundSettings { pub health_check_on_start: Option, #[serde(rename = "healthCheckWait")] pub health_check_wait: Option, + #[serde(rename = "healthCheckAttempts")] + pub 
health_check_attempts: Option, + #[serde(rename = "healthCheckSuccessPercentage")] + pub health_check_success_percentage: Option, pub failover: Option, #[serde(rename = "fallbackCache")] pub fallback_cache: Option, @@ -810,6 +814,10 @@ pub fn to_internal(json: &mut Config) -> Result { settings.health_check_on_start = ext_settings.health_check_on_start.unwrap_or(false); settings.health_check_wait = ext_settings.health_check_wait.unwrap_or(false); + settings.health_check_attempts = + ext_settings.health_check_attempts.unwrap_or(1); + settings.health_check_success_percentage = + ext_settings.health_check_success_percentage.unwrap_or(50); settings.check_interval = ext_settings.check_interval.unwrap_or(300); // 300 secs settings.failover = ext_settings.failover.unwrap_or(true); settings.fallback_cache = ext_settings.fallback_cache.unwrap_or(false); diff --git a/leaf/src/proxy/failover/datagram.rs b/leaf/src/proxy/failover/datagram.rs index 6bb2ce846..65290656d 100644 --- a/leaf/src/proxy/failover/datagram.rs +++ b/leaf/src/proxy/failover/datagram.rs @@ -38,6 +38,8 @@ impl Handler { health_check_prefers: Vec, health_check_on_start: bool, health_check_wait: bool, + health_check_attempts: u32, + health_check_success_percentage: u32, dns_client: SyncDnsClient, ) -> (Self, Vec) { let mut abort_handles = Vec::new(); @@ -66,6 +68,8 @@ impl Handler { last_active.clone(), is_first_health_check_done.clone(), notify.as_ref().cloned(), + health_check_attempts, + health_check_success_percentage, )); abort_handles.push(abort_handle); let task: BoxFuture<'static, ()> = Box::pin(abortable.map(|_| ())); diff --git a/leaf/src/proxy/failover/mod.rs b/leaf/src/proxy/failover/mod.rs index 126268315..e939e4545 100644 --- a/leaf/src/proxy/failover/mod.rs +++ b/leaf/src/proxy/failover/mod.rs @@ -40,9 +40,12 @@ async fn single_health_check( tag: String, h: AnyOutboundHandler, dns_client: SyncDnsClient, - delay: Duration, + delay: u32, ) -> Measure { - tokio::time::sleep(delay).await; + 
tokio::time::sleep(Duration::from_millis( + StdRng::from_entropy().gen_range(0..=delay) as u64, + )) + .await; let dest = match network { Network::Tcp => SocksAddr::Domain("www.google.com".to_string(), 443), @@ -91,12 +94,14 @@ async fn single_health_check( let mut buf = BytesMut::with_capacity(2 * 1024); match stream.read_buf(&mut buf).await { Ok(n) => { + let elapsed = Instant::now().duration_since(start); debug!( - "received {} bytes tcp health check response: {}", + "received {} bytes tcp health check response from {} in {} ms: {}", n, + &tag, + elapsed.as_millis(), String::from_utf8_lossy(&buf[..n.min(12)]), ); - let elapsed = Instant::now().duration_since(start); m = Measure::new(idx, elapsed.as_millis(), tag); } Err(_) => { @@ -158,8 +163,13 @@ async fn single_health_check( let mut buf = vec![0u8; 1500]; match recv.recv_from(&mut buf).await { Ok((n, _)) => { - debug!("received {} bytes udp health check response", n); let elapsed = tokio::time::Instant::now().duration_since(start); + debug!( + "received {} bytes udp health check response from {} in {} ms", + n, + &tag, + elapsed.as_millis() + ); Measure::new(idx, elapsed.as_millis(), tag) } Err(_) => Measure::new(idx, u128::MAX - 3, tag), @@ -177,17 +187,58 @@ async fn health_check( tag: String, h: AnyOutboundHandler, dns_client: SyncDnsClient, - delay: Duration, + delay: u32, health_check_timeout: u64, + health_check_attempts: u32, + health_check_success_percentage: u32, ) -> Measure { debug!("health checking [{}] ({}) index ({})", &tag, &network, idx); + let health_check_timeout = Duration::from_secs(health_check_timeout); + let health_check_timeout_ms = health_check_timeout.as_millis(); + let mut attempts = Vec::new(); + for _ in 0..health_check_attempts { + attempts.push(timeout( + health_check_timeout, + single_health_check( + network, + idx, + tag.clone(), + h.clone(), + dns_client.clone(), + delay, + ), + )); + } + let measures = futures::future::join_all(attempts).await; + let measures = measures + 
.into_iter() + .map(|x| x.unwrap_or(Measure::new(idx, u128::MAX - 1, tag.clone()))) + .collect::<Vec<_>>(); + + let n_success = measures + .iter() + .filter(|x| x.rtt < health_check_timeout_ms as u128) + .count(); + + debug!( + "{} out of {} successful checks for {} [{}]", + n_success, health_check_attempts, &network, &tag + ); + + // Zero successes must be reported as unavailable: the mean below divides by n_success. + let success_percentage = (n_success as f32 / health_check_attempts as f32 * 100.) as u32; + if n_success == 0 || success_percentage < health_check_success_percentage { + return Measure::new(idx, u128::MAX, tag); + } + + let mean_rtt = measures + .iter() + .filter(|x| x.rtt < health_check_timeout_ms as u128) + .map(|x| x.rtt) + .sum::<u128>() + .div_euclid(n_success as u128); - timeout( - Duration::from_secs(health_check_timeout), - single_health_check(network, idx, tag.clone(), h, dns_client, delay), - ) - .await - .unwrap_or(Measure::new(idx, u128::MAX - 1, tag)) + Measure::new(idx, mean_rtt, tag) } pub(self) async fn health_check_task( @@ -205,6 +256,8 @@ pub(self) async fn health_check_task( last_active: Arc>, is_first_health_check_done: Arc, wait_for_health_check: Option>, + health_check_attempts: u32, + health_check_success_percentage: u32, ) { loop { let last_active = Instant::now() @@ -213,18 +266,18 @@ pub(self) async fn health_check_task( if last_active < health_check_active.into() { let mut checks = Vec::new(); - let mut rng = StdRng::from_entropy(); for (i, a) in (&actors).iter().enumerate() { let dns_client_cloned = dns_client.clone(); - let delay = Duration::from_millis(rng.gen_range(0..=health_check_delay) as u64); checks.push(Box::pin(health_check( network, i, a.tag().to_owned(), a.clone(), dns_client_cloned, - delay, + health_check_delay, health_check_timeout as u64, + health_check_attempts, + health_check_success_percentage, ))); } let mut measures = futures::future::join_all(checks).await; diff --git a/leaf/src/proxy/failover/stream.rs b/leaf/src/proxy/failover/stream.rs index 8671b2891..6dc095628 100644 --- 
a/leaf/src/proxy/failover/stream.rs +++ b/leaf/src/proxy/failover/stream.rs @@ -44,6 +44,8 @@ impl Handler { health_check_prefers: Vec, health_check_on_start: bool, health_check_wait: bool, + health_check_attempts: u32, + health_check_success_percentage: u32, dns_client: SyncDnsClient, ) -> (Self, Vec) { let mut abort_handles = Vec::new(); @@ -72,6 +74,8 @@ impl Handler { last_active.clone(), is_first_health_check_done.clone(), notify.as_ref().cloned(), + health_check_attempts, + health_check_success_percentage, )); abort_handles.push(abort_handle); let task: BoxFuture<'static, ()> = Box::pin(abortable.map(|_| ()));