Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sync fork #26

Closed
wants to merge 17 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ on:
push:
branches: [ "alephzero" ]
paths-ignore:
- '**/README.md'
- "**/README.md"
pull_request:
branches: [ "alephzero" ]

Expand Down
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ Subway is build with middleware pattern.
- Inject optional `defaultBlock` parameter to requests to ensure downstream middleware such as cache can work properly.
- Subscription
- Forward requests to upstream servers.
- TODO: Merge duplicated subscriptions.
- TODO: Rate Limit
- Merge duplicated subscriptions.
- Rate Limit
- Rate limit requests from downstream middleware.
- TODO: Parameter filter
- Deny requests with invalid parameters.
Expand Down
34 changes: 33 additions & 1 deletion src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,10 +117,13 @@ pub struct MiddlewaresConfig {
pub subscriptions: Vec<String>,
}

#[derive(Debug)]
#[derive(Debug, Validate)]
#[garde(allow_unvalidated)]
pub struct Config {
#[garde(dive)]
pub extensions: ExtensionsConfig,
pub middlewares: MiddlewaresConfig,
#[garde(dive)]
pub rpcs: RpcDefinitions,
}

Expand Down Expand Up @@ -227,9 +230,38 @@ fn validate_config(config: &Config) -> Result<(), anyhow::Error> {
} else if has_optional {
bail!("Method {} has required param after optional param", method.method);
}
(None, None) => env::var(&caps[1]),
_ => Err(env::VarError::NotPresent),
}
};

// replace every matches with early return
// when encountering error
for caps in re.captures_iter(templated_config_str) {
let m = caps
.get(0)
.expect("i==0 means implicit unnamed group that includes the entire match, which is infalliable");
config_str.push_str(&templated_config_str[last_match..m.start()]);
config_str.push_str(
&replacement(&caps).with_context(|| format!("Unable to replace environment variable {}", &caps[1]))?,
);
last_match = m.end();
}
config_str.push_str(&templated_config_str[last_match..]);
Ok(config_str)
}

pub async fn validate(config: &Config) -> Result<(), anyhow::Error> {
// validate use garde::Validate
config.validate(&())?;
// since endpoints connection test is async
// we can't intergrate it into garde::Validate
// and it's not a static validation like format, length, .etc
if let Some(client_config) = &config.extensions.client {
if !client_config.all_endpoints_can_be_connected().await {
anyhow::bail!("Unable to connect to all endpoints");
}
}
Ok(())
}

Expand Down
120 changes: 118 additions & 2 deletions src/config/rpc.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use garde::Validate;
use jsonrpsee::core::JsonValue;
use serde::Deserialize;

Expand All @@ -20,13 +21,15 @@ pub struct MethodParam {
pub inject: bool,
}

#[derive(Deserialize, Debug)]
#[derive(Deserialize, Validate, Debug)]
#[garde(allow_unvalidated)]
pub struct RpcMethod {
pub method: String,

#[serde(default)]
pub cache: Option<CacheParams>,

#[garde(custom(validate_params_with_name(&self.method)))]
#[serde(default)]
pub params: Vec<MethodParam>,

Expand All @@ -48,6 +51,31 @@ pub struct RpcMethod {
pub rate_limit_weight: u32,
}

fn validate_params_with_name(method_name: &str) -> impl FnOnce(&[MethodParam], &()) -> garde::Result + '_ {
move |params, _| {
// ensure each method has only one param with inject=true
if params.iter().filter(|x| x.inject).count() > 1 {
return Err(garde::Error::new(format!(
"method {} has more than one inject param",
method_name
)));
}
// ensure there is no required param after optional param
let mut has_optional = false;
for param in params {
if param.optional {
has_optional = true;
} else if has_optional {
return Err(garde::Error::new(format!(
"method {} has required param after optional param",
method_name
)));
}
}
Ok(())
}
}

fn default_rate_limit_weight() -> u32 {
1
}
Expand All @@ -71,11 +99,99 @@ pub struct RpcSubscription {
pub merge_strategy: Option<MergeStrategy>,
}

#[derive(Deserialize, Debug)]
#[derive(Deserialize, Validate, Debug)]
#[garde(allow_unvalidated)]
pub struct RpcDefinitions {
#[garde(dive)]
pub methods: Vec<RpcMethod>,
#[serde(default)]
pub subscriptions: Vec<RpcSubscription>,
#[serde(default)]
pub aliases: Vec<(String, String)>,
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn validate_params_succeeds_for_valid_params() {
let valid_params = vec![
MethodParam {
name: "param1".to_string(),
ty: "u64".to_string(),
optional: false,
inject: false,
},
MethodParam {
name: "param2".to_string(),
ty: "u64".to_string(),
optional: true,
inject: false,
},
MethodParam {
name: "param3".to_string(),
ty: "u64".to_string(),
optional: true,
inject: false,
},
];
let method_name = "test";
let test_fn = validate_params_with_name(method_name);
assert!(test_fn(&valid_params, &()).is_ok());
}

#[test]
fn validate_params_fails_for_more_than_one_param_has_inject_equals_true() {
let another_invalid_params = vec![
MethodParam {
name: "param1".to_string(),
ty: "u64".to_string(),
optional: false,
inject: true,
},
MethodParam {
name: "param2".to_string(),
ty: "u64".to_string(),
optional: false,
inject: true,
},
MethodParam {
name: "param3".to_string(),
ty: "u64".to_string(),
optional: false,
inject: true,
},
];
let method_name = "test";
let test_fn = validate_params_with_name(method_name);
assert!(test_fn(&another_invalid_params, &()).is_err());
}

#[test]
fn validate_params_fails_for_optional_params_are_not_the_last() {
let method_name = "test";
let invalid_params = vec![
MethodParam {
name: "param1".to_string(),
ty: "u64".to_string(),
optional: false,
inject: false,
},
MethodParam {
name: "param2".to_string(),
ty: "u64".to_string(),
optional: true,
inject: false,
},
MethodParam {
name: "param3".to_string(),
ty: "u64".to_string(),
optional: false,
inject: true,
},
];
let test_fn = validate_params_with_name(method_name);
assert!(test_fn(&invalid_params, &()).is_err());
}
}
4 changes: 3 additions & 1 deletion src/extensions/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,10 @@ impl Drop for Client {
}
}

#[derive(Deserialize, Debug)]
#[derive(Deserialize, Validate, Debug)]
#[garde(allow_unvalidated)]
pub struct ClientConfig {
#[garde(inner(custom(validate_endpoint)))]
pub endpoints: Vec<String>,
#[serde(default = "bool_true")]
pub shuffle_endpoints: bool,
Expand Down
9 changes: 6 additions & 3 deletions src/extensions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,13 +73,15 @@ impl ExtensionRegistry {
macro_rules! define_all_extensions {
(
$(
$ext_name:ident: $ext_type:ty
$(#[$attr:meta])* $ext_name:ident: $ext_type:ty
),* $(,)?
) => {
#[derive(Deserialize, Debug, Default)]
use garde::Validate;
#[derive(Deserialize, Debug, Validate, Default)]
#[garde(allow_unvalidated)]
pub struct ExtensionsConfig {
$(
#[serde(default)]
$(#[$attr])*
pub $ext_name: Option<<$ext_type as Extension>::Config>,
)*
}
Expand Down Expand Up @@ -133,6 +135,7 @@ macro_rules! define_all_extensions {
define_all_extensions! {
telemetry: telemetry::Telemetry,
cache: cache::Cache,
#[garde(dive)]
client: client::Client,
merge_subscription: merge_subscription::MergeSubscription,
substrate_api: api::SubstrateApi,
Expand Down
7 changes: 7 additions & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,14 @@ async fn main() -> anyhow::Result<()> {
let config = subway::config::read_config(&cli.config)?;

subway::logger::enable_logger();
let cli = subway::cli::parse_args();
let config = subway::config::read_config(&cli.config)?;
tracing::trace!("{:#?}", config);
subway::config::validate(&config).await?;
// early return if we're just validating the config
if cli.is_validate() {
return Ok(());
}

let subway_server = subway::server::build(config).await?;
tracing::info!("Server running at {}", subway_server.addr);
Expand Down
57 changes: 57 additions & 0 deletions tests/configs/broken_endpoints.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
extensions:
client:
endpoints:
- wss://acala-rpc.dwellir.com
- wss://acala-rpc-0.aca-api.network
- wss://example.com
health_check:
interval_sec: 10 # check interval, default is 10s
healthy_response_time_ms: 500 # max response time to be considered healthy, default is 500ms
health_method: system_health
response: # response contains { isSyncing: false }
!contains
- - isSyncing
- !eq false
event_bus:
substrate_api:
stale_timeout_seconds: 180 # rotate endpoint if no new blocks for 3 minutes
telemetry:
provider: none
cache:
default_ttl_seconds: 60
default_size: 500
merge_subscription:
keep_alive_seconds: 60
server:
port: 9944
listen_address: '0.0.0.0'
max_connections: 2000
http_methods:
- path: /health
method: system_health
- path: /liveness
method: chain_getBlockHash
cors: all
rate_limit: # these are for demo purpose only, please adjust to your needs
connection: # 20 RPC requests per second per connection
burst: 20
period_secs: 1
ip: # 500 RPC requests per 10 seconds per ip
burst: 500
period_secs: 10
# use X-Forwarded-For header to get real ip, if available (e.g. behind a load balancer).
# WARNING: Use with caution, as this xff header can be forged.
use_xff: true # default is false

middlewares:
methods:
- delay
- response
- inject_params
- cache
- upstream
subscriptions:
- merge_subscription
- upstream

rpcs: substrate
56 changes: 56 additions & 0 deletions tests/configs/config_with_env.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
extensions:
client:
endpoints:
- wss://acala-rpc.dwellir.com
- wss://acala-rpc-0.aca-api.network
health_check:
interval_sec: 10 # check interval, default is 10s
healthy_response_time_ms: 500 # max response time to be considered healthy, default is 500ms
health_method: system_health
response: # response contains { isSyncing: false }
!contains
- - isSyncing
- !eq false
event_bus:
substrate_api:
stale_timeout_seconds: 180 # rotate endpoint if no new blocks for 3 minutes
telemetry:
provider: none
cache:
default_ttl_seconds: 60
default_size: 500
merge_subscription:
keep_alive_seconds: 60
server:
port: ${SUBWAY_PORT:-9944}
listen_address: '0.0.0.0'
max_connections: ${SUBWAY_MAX_CONNECTIONS:-2000}
http_methods:
- path: /health
method: system_health
- path: /liveness
method: chain_getBlockHash
cors: all
rate_limit: # these are for demo purpose only, please adjust to your needs
connection: # 20 RPC requests per second per connection
burst: 20
period_secs: 1
ip: # 500 RPC requests per 10 seconds per ip
burst: 500
period_secs: 10
# use X-Forwarded-For header to get real ip, if available (e.g. behind a load balancer).
# WARNING: Use with caution, as this xff header can be forged.
use_xff: true # default is false

middlewares:
methods:
- delay
- response
- inject_params
- cache
- upstream
subscriptions:
- merge_subscription
- upstream

rpcs: substrate
Loading