Skip to content

Commit

Permalink
Support multiple health check requests for the failover outbound
Browse files Browse the repository at this point in the history
  • Loading branch information
eycorsican committed Oct 7, 2024
1 parent 061bc75 commit a6070a7
Show file tree
Hide file tree
Showing 8 changed files with 141 additions and 15 deletions.
4 changes: 4 additions & 0 deletions leaf/src/app/outbound/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -465,6 +465,8 @@ impl OutboundManager {
settings.health_check_prefers.clone(),
settings.health_check_on_start,
settings.health_check_wait,
settings.health_check_attempts,
settings.health_check_success_percentage,
dns_client.clone(),
);
let (datagram, mut datagram_abort_handles) = failover::DatagramHandler::new(
Expand All @@ -480,6 +482,8 @@ impl OutboundManager {
settings.health_check_prefers,
settings.health_check_on_start,
settings.health_check_wait,
settings.health_check_attempts,
settings.health_check_success_percentage,
dns_client.clone(),
);
let handler = HandlerBuilder::default()
Expand Down
17 changes: 17 additions & 0 deletions leaf/src/config/conf/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,8 @@ pub struct ProxyGroup {
pub health_check_prefers: Option<Vec<String>>,
pub health_check_on_start: Option<bool>,
pub health_check_wait: Option<bool>,
pub health_check_attempts: Option<u32>,
pub health_check_success_percentage: Option<u32>,

// tryall
pub delay_base: Option<u32>,
Expand Down Expand Up @@ -163,6 +165,8 @@ impl Default for ProxyGroup {
health_check_prefers: None,
health_check_on_start: None,
health_check_wait: None,
health_check_attempts: None,
health_check_success_percentage: None,
delay_base: None,
method: None,
}
Expand Down Expand Up @@ -626,6 +630,14 @@ pub fn from_lines(lines: Vec<io::Result<String>>) -> Result<Config> {
group.health_check_wait =
if v == "true" { Some(true) } else { Some(false) };
}
"health-check-attempts" => {
let i = if let Ok(i) = v.parse() { Some(i) } else { None };
group.health_check_attempts = i;
}
"health-check-success-percentage" => {
let i = if let Ok(i) = v.parse() { Some(i) } else { None };
group.health_check_success_percentage = i;
}
"delay-base" => {
let i = if let Ok(i) = v.parse() { Some(i) } else { None };
group.delay_base = i;
Expand Down Expand Up @@ -1317,6 +1329,11 @@ pub fn to_internal(conf: &mut Config) -> Result<internal::Config> {
settings.health_check_on_start =
ext_proxy_group.health_check_on_start.unwrap_or(false);
settings.health_check_wait = ext_proxy_group.health_check_wait.unwrap_or(false);
settings.health_check_attempts =
ext_proxy_group.health_check_attempts.unwrap_or(1);
settings.health_check_success_percentage = ext_proxy_group
.health_check_success_percentage
.unwrap_or(50);
let settings = settings.write_to_bytes().unwrap();
outbound.settings = settings;
outbounds.push(outbound);
Expand Down
10 changes: 10 additions & 0 deletions leaf/src/config/internal/config.proto
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ message ChainOutboundSettings {
}

message FailOverOutboundSettings {
// A list of outbound tags.
repeated string actors = 1;
// The connect timeout value for the outbounds, in seconds. Some outbounds don't
// need to "connect", those outbounds have zero connect time. Default 4.
Expand All @@ -186,6 +187,7 @@ message FailOverOutboundSettings {
bool fallback_cache = 6;
uint32 cache_size = 7;
uint32 cache_timeout = 8;
// The outbound to be used when all actors are unavailable.
optional string last_resort = 9;
// Timeout for health check request, in seconds. Default is 6.
uint32 health_check_timeout = 10;
Expand All @@ -204,6 +206,14 @@ message FailOverOutboundSettings {
// Hold incomming connections, wait for the initial health check to be done.
// Default false.
bool health_check_wait = 15;
// Total number of health check requests per outbound, the RTT to be used to sort
// the outbounds is the average of all successful attempts. Default 1.
uint32 health_check_attempts = 16;
// The percentage of successful health check requests which to consider an outbound
// is available, must be a value between [0, 100], if the result is lower than this
// percentage, the outbound's RTT would be set to a timeout value, thus marks the
// outbound as unavailable. Default 50.
uint32 health_check_success_percentage = 17;
}

message SelectOutboundSettings {
Expand Down
26 changes: 26 additions & 0 deletions leaf/src/config/internal/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3222,6 +3222,10 @@ pub struct FailOverOutboundSettings {
pub health_check_on_start: bool,
// @@protoc_insertion_point(field:FailOverOutboundSettings.health_check_wait)
pub health_check_wait: bool,
// @@protoc_insertion_point(field:FailOverOutboundSettings.health_check_attempts)
pub health_check_attempts: u32,
// @@protoc_insertion_point(field:FailOverOutboundSettings.health_check_success_percentage)
pub health_check_success_percentage: u32,
// special fields
// @@protoc_insertion_point(special_field:FailOverOutboundSettings.special_fields)
pub special_fields: ::protobuf::SpecialFields,
Expand Down Expand Up @@ -3294,6 +3298,12 @@ impl ::protobuf::Message for FailOverOutboundSettings {
120 => {
self.health_check_wait = is.read_bool()?;
},
128 => {
self.health_check_attempts = is.read_uint32()?;
},
136 => {
self.health_check_success_percentage = is.read_uint32()?;
},
tag => {
::protobuf::rt::read_unknown_or_skip_group(tag, is, self.special_fields.mut_unknown_fields())?;
},
Expand Down Expand Up @@ -3351,6 +3361,12 @@ impl ::protobuf::Message for FailOverOutboundSettings {
if self.health_check_wait != false {
my_size += 1 + 1;
}
if self.health_check_attempts != 0 {
my_size += ::protobuf::rt::uint32_size(16, self.health_check_attempts);
}
if self.health_check_success_percentage != 0 {
my_size += ::protobuf::rt::uint32_size(17, self.health_check_success_percentage);
}
my_size += ::protobuf::rt::unknown_fields_size(self.special_fields.unknown_fields());
self.special_fields.cached_size().set(my_size as u32);
my_size
Expand Down Expand Up @@ -3402,6 +3418,12 @@ impl ::protobuf::Message for FailOverOutboundSettings {
if self.health_check_wait != false {
os.write_bool(15, self.health_check_wait)?;
}
if self.health_check_attempts != 0 {
os.write_uint32(16, self.health_check_attempts)?;
}
if self.health_check_success_percentage != 0 {
os.write_uint32(17, self.health_check_success_percentage)?;
}
os.write_unknown_fields(self.special_fields.unknown_fields())?;
::std::result::Result::Ok(())
}
Expand Down Expand Up @@ -3434,6 +3456,8 @@ impl ::protobuf::Message for FailOverOutboundSettings {
self.health_check_prefers.clear();
self.health_check_on_start = false;
self.health_check_wait = false;
self.health_check_attempts = 0;
self.health_check_success_percentage = 0;
self.special_fields.clear();
}

Expand All @@ -3454,6 +3478,8 @@ impl ::protobuf::Message for FailOverOutboundSettings {
health_check_prefers: ::std::vec::Vec::new(),
health_check_on_start: false,
health_check_wait: false,
health_check_attempts: 0,
health_check_success_percentage: 0,
special_fields: ::protobuf::SpecialFields::new(),
};
&instance
Expand Down
8 changes: 8 additions & 0 deletions leaf/src/config/json/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,10 @@ pub struct FailOverOutboundSettings {
pub health_check_on_start: Option<bool>,
#[serde(rename = "healthCheckWait")]
pub health_check_wait: Option<bool>,
#[serde(rename = "healthCheckAttempts")]
pub health_check_attempts: Option<u32>,
#[serde(rename = "healthCheckSuccessPercentage")]
pub health_check_success_percentage: Option<u32>,
pub failover: Option<bool>,
#[serde(rename = "fallbackCache")]
pub fallback_cache: Option<bool>,
Expand Down Expand Up @@ -810,6 +814,10 @@ pub fn to_internal(json: &mut Config) -> Result<internal::Config> {
settings.health_check_on_start =
ext_settings.health_check_on_start.unwrap_or(false);
settings.health_check_wait = ext_settings.health_check_wait.unwrap_or(false);
settings.health_check_attempts =
ext_settings.health_check_attempts.unwrap_or(1);
settings.health_check_success_percentage =
ext_settings.health_check_success_percentage.unwrap_or(50);
settings.check_interval = ext_settings.check_interval.unwrap_or(300); // 300 secs
settings.failover = ext_settings.failover.unwrap_or(true);
settings.fallback_cache = ext_settings.fallback_cache.unwrap_or(false);
Expand Down
4 changes: 4 additions & 0 deletions leaf/src/proxy/failover/datagram.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ impl Handler {
health_check_prefers: Vec<String>,
health_check_on_start: bool,
health_check_wait: bool,
health_check_attempts: u32,
health_check_success_percentage: u32,
dns_client: SyncDnsClient,
) -> (Self, Vec<AbortHandle>) {
let mut abort_handles = Vec::new();
Expand Down Expand Up @@ -66,6 +68,8 @@ impl Handler {
last_active.clone(),
is_first_health_check_done.clone(),
notify.as_ref().cloned(),
health_check_attempts,
health_check_success_percentage,
));
abort_handles.push(abort_handle);
let task: BoxFuture<'static, ()> = Box::pin(abortable.map(|_| ()));
Expand Down
83 changes: 68 additions & 15 deletions leaf/src/proxy/failover/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,12 @@ async fn single_health_check(
tag: String,
h: AnyOutboundHandler,
dns_client: SyncDnsClient,
delay: Duration,
delay: u32,
) -> Measure {
tokio::time::sleep(delay).await;
tokio::time::sleep(Duration::from_millis(
StdRng::from_entropy().gen_range(0..=delay) as u64,
))
.await;

let dest = match network {
Network::Tcp => SocksAddr::Domain("www.google.com".to_string(), 443),
Expand Down Expand Up @@ -91,12 +94,14 @@ async fn single_health_check(
let mut buf = BytesMut::with_capacity(2 * 1024);
match stream.read_buf(&mut buf).await {
Ok(n) => {
let elapsed = Instant::now().duration_since(start);
debug!(
"received {} bytes tcp health check response: {}",
"received {} bytes tcp health check response from {} in {} ms: {}",
n,
&tag,
elapsed.as_millis(),
String::from_utf8_lossy(&buf[..n.min(12)]),
);
let elapsed = Instant::now().duration_since(start);
m = Measure::new(idx, elapsed.as_millis(), tag);
}
Err(_) => {
Expand Down Expand Up @@ -158,8 +163,13 @@ async fn single_health_check(
let mut buf = vec![0u8; 1500];
match recv.recv_from(&mut buf).await {
Ok((n, _)) => {
debug!("received {} bytes udp health check response", n);
let elapsed = tokio::time::Instant::now().duration_since(start);
debug!(
"received {} bytes udp health check response from {} in {} ms",
n,
&tag,
elapsed.as_millis()
);
Measure::new(idx, elapsed.as_millis(), tag)
}
Err(_) => Measure::new(idx, u128::MAX - 3, tag),
Expand All @@ -177,17 +187,58 @@ async fn health_check(
tag: String,
h: AnyOutboundHandler,
dns_client: SyncDnsClient,
delay: Duration,
delay: u32,
health_check_timeout: u64,
health_check_attempts: u32,
health_check_success_percentage: u32,
) -> Measure {
debug!("health checking [{}] ({}) index ({})", &tag, &network, idx);
let health_check_timeout = Duration::from_secs(health_check_timeout);
let health_check_timeout_ms = health_check_timeout.as_millis();
let mut attempts = Vec::new();
for _ in 0..health_check_attempts {
attempts.push(timeout(
health_check_timeout,
single_health_check(
network,
idx,
tag.clone(),
h.clone(),
dns_client.clone(),
delay,
),
));
}
let measures = futures::future::join_all(attempts).await;
let measures = measures
.into_iter()
.map(|x| x.unwrap_or(Measure::new(idx, u128::MAX - 1, tag.clone())))
.collect::<Vec<_>>();

let n_success = measures
.iter()
.filter(|x| x.rtt < health_check_timeout_ms as u128)
.count();

debug!(
"{} out of {} successful checks for {} [{}]",
n_success, health_check_attempts, &network, &tag
);

use std::ops::Div;
let success_percentage = ((n_success as f32).div(health_check_attempts as f32) * 100.) as u32;
if success_percentage < health_check_success_percentage {
return Measure::new(idx, u128::MAX, tag);
}

let mean_rtt = measures
.iter()
.filter(|x| x.rtt < health_check_timeout_ms as u128)
.map(|x| x.rtt)
.sum::<u128>()
.div_euclid(n_success as u128);

timeout(
Duration::from_secs(health_check_timeout),
single_health_check(network, idx, tag.clone(), h, dns_client, delay),
)
.await
.unwrap_or(Measure::new(idx, u128::MAX - 1, tag))
Measure::new(idx, mean_rtt, tag)
}

pub(self) async fn health_check_task(
Expand All @@ -205,6 +256,8 @@ pub(self) async fn health_check_task(
last_active: Arc<Mutex<Instant>>,
is_first_health_check_done: Arc<AtomicBool>,
wait_for_health_check: Option<Arc<Notify>>,
health_check_attempts: u32,
health_check_success_percentage: u32,
) {
loop {
let last_active = Instant::now()
Expand All @@ -213,18 +266,18 @@ pub(self) async fn health_check_task(

if last_active < health_check_active.into() {
let mut checks = Vec::new();
let mut rng = StdRng::from_entropy();
for (i, a) in (&actors).iter().enumerate() {
let dns_client_cloned = dns_client.clone();
let delay = Duration::from_millis(rng.gen_range(0..=health_check_delay) as u64);
checks.push(Box::pin(health_check(
network,
i,
a.tag().to_owned(),
a.clone(),
dns_client_cloned,
delay,
health_check_delay,
health_check_timeout as u64,
health_check_attempts,
health_check_success_percentage,
)));
}
let mut measures = futures::future::join_all(checks).await;
Expand Down
4 changes: 4 additions & 0 deletions leaf/src/proxy/failover/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ impl Handler {
health_check_prefers: Vec<String>,
health_check_on_start: bool,
health_check_wait: bool,
health_check_attempts: u32,
health_check_success_percentage: u32,
dns_client: SyncDnsClient,
) -> (Self, Vec<AbortHandle>) {
let mut abort_handles = Vec::new();
Expand Down Expand Up @@ -72,6 +74,8 @@ impl Handler {
last_active.clone(),
is_first_health_check_done.clone(),
notify.as_ref().cloned(),
health_check_attempts,
health_check_success_percentage,
));
abort_handles.push(abort_handle);
let task: BoxFuture<'static, ()> = Box::pin(abortable.map(|_| ()));
Expand Down

0 comments on commit a6070a7

Please sign in to comment.