From dd8ac43c9776050d81735afb1ef06bf94dbae047 Mon Sep 17 00:00:00 2001 From: Daniel Savu <23065004+daniel-savu@users.noreply.github.com> Date: Wed, 6 Mar 2024 11:29:55 +0000 Subject: [PATCH] Validator task retries (#3361) ### Description - Makes validator tasks infallible by adding retries - This fixes a bug where certain tasks would return an error an shut down, affecting liveness. This was desired in the relayer since we don't want individual chain failures to affect the liveness of other chains. Now validator tasks either terminate or panic, and panicking will be propagated by `try_join_all`, causing the agent to shut down. - A thing to keep in mind in general is that agents will only terminate if a task panics. If it returns an `Err` but doesn't panic, the task won't be respawned. We should consider the implications of this in the scraper too. If this isn't desired, should consider using `select_all!` ### Drive-by changes - retry logic is moved from `rust/chains/hyperlane-cosmos/src/providers/rpc.rs` into `rust/hyperlane-core/src/rpc_clients/retry.rs`, so we're even closer to turning it into a retrying provider - changes several fn signatures within the validator to now return `ChainCommunicationError`s instead of `eyre::Report`s, for compatibility with the retry logic. Also makes `DbError` convertible to `ChainCommunicationError`, for the same reason. This achieves some progress on https://github.com/hyperlane-xyz/hyperlane-monorepo/issues/2878 - allows the backfill checkpoint submitter task to terminate, since `try_join_all` is tolerant of this (as described above) ### Related issues - Fixes https://github.com/hyperlane-xyz/hyperlane-monorepo/issues/3349 ### Backward compatibility ### Testing --- rust/Cargo.lock | 1 + rust/agents/validator/src/submit.rs | 68 +++++++++++-------- rust/agents/validator/src/validator.rs | 9 +-- rust/chains/hyperlane-cosmos/src/error.rs | 3 - .../hyperlane-cosmos/src/providers/rpc.rs | 49 +++---------- rust/hyperlane-base/src/agent.rs | 4 +- rust/hyperlane-base/src/db/rocks/mod.rs | 8 ++- rust/hyperlane-core/Cargo.toml | 3 +- rust/hyperlane-core/src/error.rs | 12 +++- rust/hyperlane-core/src/rpc_clients/mod.rs | 6 ++ rust/hyperlane-core/src/rpc_clients/retry.rs | 51 ++++++++++++++ 11 files changed, 133 insertions(+), 81 deletions(-) create mode 100644 rust/hyperlane-core/src/rpc_clients/retry.rs diff --git a/rust/Cargo.lock b/rust/Cargo.lock index c40386c2b0..43350301df 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -4180,6 +4180,7 @@ dependencies = [ "ethers-providers", "eyre", "fixed-hash 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)", + "futures", "getrandom 0.2.12", "hex 0.4.3", "itertools 0.12.0", diff --git a/rust/agents/validator/src/submit.rs b/rust/agents/validator/src/submit.rs index 3de0a798af..bc040ed56c 100644 --- a/rust/agents/validator/src/submit.rs +++ b/rust/agents/validator/src/submit.rs @@ -3,12 +3,11 @@ use std::sync::Arc; use std::time::{Duration, Instant}; use std::vec; -use eyre::{bail, Result}; -use hyperlane_core::MerkleTreeHook; +use hyperlane_core::rpc_clients::call_and_retry_indefinitely; +use hyperlane_core::{ChainCommunicationError, ChainResult, MerkleTreeHook}; use prometheus::IntGauge; use tokio::time::sleep; -use tracing::{debug, info}; -use tracing::{error, instrument}; +use tracing::{debug, error, info}; use hyperlane_base::{db::HyperlaneRocksDB, CheckpointSyncer, CoreMetrics}; use hyperlane_core::{ @@ -60,29 +59,28 @@ impl ValidatorSubmitter { /// Submits signed checkpoints from index 0 until the target checkpoint (inclusive). /// Runs idly forever once the target checkpoint is reached to avoid exiting the task. - #[instrument(err, skip(self), fields(domain=%self.merkle_tree_hook.domain()))] - pub(crate) async fn backfill_checkpoint_submitter( - self, - target_checkpoint: Checkpoint, - ) -> Result<()> { + pub(crate) async fn backfill_checkpoint_submitter(self, target_checkpoint: Checkpoint) { let mut tree = IncrementalMerkle::default(); - self.submit_checkpoints_until_correctness_checkpoint(&mut tree, &target_checkpoint) - .await?; + call_and_retry_indefinitely(|| { + let target_checkpoint = target_checkpoint; + let self_clone = self.clone(); + Box::pin(async move { + self_clone + .submit_checkpoints_until_correctness_checkpoint(&mut tree, &target_checkpoint) + .await?; + Ok(()) + }) + }) + .await; info!( ?target_checkpoint, "Backfill checkpoint submitter successfully reached target checkpoint" ); - - // TODO: remove this once validator is tolerant of tasks exiting. - loop { - sleep(Duration::from_secs(u64::MAX)).await; - } } /// Submits signed checkpoints indefinitely, starting from the `tree`. - #[instrument(err, skip(self, tree), fields(domain=%self.merkle_tree_hook.domain()))] - pub(crate) async fn checkpoint_submitter(self, mut tree: IncrementalMerkle) -> Result<()> { + pub(crate) async fn checkpoint_submitter(self, mut tree: IncrementalMerkle) { // How often to log checkpoint info - once every minute let checkpoint_info_log_period = Duration::from_secs(60); // The instant in which we last logged checkpoint info, if at all @@ -102,10 +100,12 @@ impl ValidatorSubmitter { loop { // Lag by reorg period because this is our correctness checkpoint. - let latest_checkpoint = self - .merkle_tree_hook - .latest_checkpoint(self.reorg_period) - .await?; + let latest_checkpoint = call_and_retry_indefinitely(|| { + let merkle_tree_hook = self.merkle_tree_hook.clone(); + Box::pin(async move { merkle_tree_hook.latest_checkpoint(self.reorg_period).await }) + }) + .await; + self.metrics .latest_checkpoint_observed .set(latest_checkpoint.index as i64); @@ -133,8 +133,20 @@ impl ValidatorSubmitter { continue; } - self.submit_checkpoints_until_correctness_checkpoint(&mut tree, &latest_checkpoint) - .await?; + tree = call_and_retry_indefinitely(|| { + let mut tree = tree; + let self_clone = self.clone(); + Box::pin(async move { + self_clone + .submit_checkpoints_until_correctness_checkpoint( + &mut tree, + &latest_checkpoint, + ) + .await?; + Ok(tree) + }) + }) + .await; self.metrics .latest_checkpoint_processed @@ -150,7 +162,7 @@ impl ValidatorSubmitter { &self, tree: &mut IncrementalMerkle, correctness_checkpoint: &Checkpoint, - ) -> Result<()> { + ) -> ChainResult<()> { // This should never be called with a tree that is ahead of the correctness checkpoint. assert!( !tree_exceeds_checkpoint(correctness_checkpoint, tree), @@ -213,7 +225,9 @@ impl ValidatorSubmitter { ?correctness_checkpoint, "Incorrect tree root, something went wrong" ); - bail!("Incorrect tree root, something went wrong"); + return Err(ChainCommunicationError::CustomError( + "Incorrect tree root, something went wrong".to_string(), + )); } if !checkpoint_queue.is_empty() { @@ -238,7 +252,7 @@ impl ValidatorSubmitter { async fn sign_and_submit_checkpoints( &self, checkpoints: Vec, - ) -> Result<()> { + ) -> ChainResult<()> { let last_checkpoint = checkpoints.as_slice()[checkpoints.len() - 1]; for queued_checkpoint in checkpoints { diff --git a/rust/agents/validator/src/validator.rs b/rust/agents/validator/src/validator.rs index 19c1ccf02c..1f9b86effa 100644 --- a/rust/agents/validator/src/validator.rs +++ b/rust/agents/validator/src/validator.rs @@ -141,7 +141,6 @@ impl BaseAgent for Validator { .expect("Failed to create server"); let server_task = tokio::spawn(async move { server.run(routes); - Ok(()) }) .instrument(info_span!("Validator server")); tasks.push(server_task); @@ -150,7 +149,6 @@ impl BaseAgent for Validator { tasks.push( tokio::spawn(async move { signer_instance.run().await; - Ok(()) }) .instrument(info_span!("SingletonSigner")), ); @@ -168,7 +166,6 @@ impl BaseAgent for Validator { tasks.push( tokio::spawn(async move { metrics_updater.spawn().await.unwrap(); - Ok(()) }) .instrument(info_span!("MetricsUpdater")), ); @@ -200,6 +197,7 @@ impl BaseAgent for Validator { } } + // Note that this only returns an error if one of the tasks panics if let Err(err) = try_join_all(tasks).await { error!(?err, "One of the validator tasks returned an error"); } @@ -207,7 +205,7 @@ impl BaseAgent for Validator { } impl Validator { - async fn run_merkle_tree_hook_sync(&self) -> Instrumented>> { + async fn run_merkle_tree_hook_sync(&self) -> Instrumented> { let index_settings = self.as_ref().settings.chains[self.origin_chain.name()].index_settings(); let contract_sync = self.merkle_tree_hook_sync.clone(); @@ -216,12 +214,11 @@ impl Validator { .await; tokio::spawn(async move { contract_sync.clone().sync("merkle_tree_hook", cursor).await; - Ok(()) }) .instrument(info_span!("MerkleTreeHookSyncer")) } - async fn run_checkpoint_submitters(&self) -> Vec>>> { + async fn run_checkpoint_submitters(&self) -> Vec>> { let submitter = ValidatorSubmitter::new( self.interval, self.reorg_period, diff --git a/rust/chains/hyperlane-cosmos/src/error.rs b/rust/chains/hyperlane-cosmos/src/error.rs index cb8ff28cd9..d266d317c4 100644 --- a/rust/chains/hyperlane-cosmos/src/error.rs +++ b/rust/chains/hyperlane-cosmos/src/error.rs @@ -47,9 +47,6 @@ pub enum HyperlaneCosmosError { /// Fallback providers failed #[error("Fallback providers failed. (Errors: {0:?})")] FallbackProvidersFailed(Vec), - /// Custom error - #[error("{0}")] - CustomError(String), } impl From for ChainCommunicationError { diff --git a/rust/chains/hyperlane-cosmos/src/providers/rpc.rs b/rust/chains/hyperlane-cosmos/src/providers/rpc.rs index df4b4f686d..9a1efd385c 100644 --- a/rust/chains/hyperlane-cosmos/src/providers/rpc.rs +++ b/rust/chains/hyperlane-cosmos/src/providers/rpc.rs @@ -1,25 +1,19 @@ use async_trait::async_trait; use cosmrs::rpc::client::Client; -use futures::Future; +use hyperlane_core::rpc_clients::call_with_retry; use hyperlane_core::{ChainCommunicationError, ChainResult, ContractLocator, LogMeta, H256, U256}; use sha256::digest; -use std::pin::Pin; -use std::time::Duration; use tendermint::abci::{Event, EventAttribute}; use tendermint::hash::Algorithm; use tendermint::Hash; use tendermint_rpc::endpoint::block::Response as BlockResponse; use tendermint_rpc::endpoint::block_results::Response as BlockResultsResponse; use tendermint_rpc::HttpClient; -use tokio::time::sleep; use tracing::{debug, instrument, trace}; use crate::address::CosmosAddress; use crate::{ConnectionConf, CosmosProvider, HyperlaneCosmosError}; -const MAX_RPC_RETRIES: usize = 10; -const RPC_RETRY_SLEEP_DURATION: Duration = Duration::from_secs(2); - #[async_trait] /// Trait for wasm indexer. Use rpc provider pub trait WasmIndexer: Send + Sync { @@ -118,28 +112,6 @@ impl CosmosWasmIndexer { .await .map_err(Into::::into)?) } - - // TODO: Refactor this function into a retrying provider. Once the watermark cursor is refactored, retrying should no longer - // be required here if the error is propagated. - #[instrument(err, skip(f))] - async fn call_with_retry( - mut f: impl FnMut() -> Pin> + Send>>, - ) -> ChainResult { - for retry_number in 1..MAX_RPC_RETRIES { - match f().await { - Ok(res) => return Ok(res), - Err(err) => { - debug!(retries=retry_number, error=?err, "Retrying call"); - sleep(RPC_RETRY_SLEEP_DURATION).await; - } - } - } - - // TODO: Return the last error, or a vec of all the error instead of this string error - Err(ChainCommunicationError::from_other( - HyperlaneCosmosError::CustomError("Retrying call failed".to_string()), - )) - } } impl CosmosWasmIndexer { @@ -241,10 +213,9 @@ impl CosmosWasmIndexer { impl WasmIndexer for CosmosWasmIndexer { #[instrument(err, skip(self))] async fn get_finalized_block_number(&self) -> ChainResult { - let latest_block = Self::call_with_retry(move || { - Box::pin(Self::get_latest_block(self.provider.rpc().clone())) - }) - .await?; + let latest_block = + call_with_retry(move || Box::pin(Self::get_latest_block(self.provider.rpc().clone()))) + .await?; let latest_height: u32 = latest_block .block .header @@ -266,15 +237,11 @@ impl WasmIndexer for CosmosWasmIndexer { { let client = self.provider.rpc().clone(); - let (block_res, block_results_res) = tokio::join!( - Self::call_with_retry(|| { Box::pin(Self::get_block(client.clone(), block_number)) }), - Self::call_with_retry(|| { - Box::pin(Self::get_block_results(client.clone(), block_number)) - }), + let (block, block_results) = tokio::join!( + call_with_retry(|| { Box::pin(Self::get_block(client.clone(), block_number)) }), + call_with_retry(|| { Box::pin(Self::get_block_results(client.clone(), block_number)) }), ); - let block = block_res.map_err(ChainCommunicationError::from_other)?; - let block_results = block_results_res.map_err(ChainCommunicationError::from_other)?; - Ok(self.handle_txs(block, block_results, parser)) + Ok(self.handle_txs(block?, block_results?, parser)) } } diff --git a/rust/hyperlane-base/src/agent.rs b/rust/hyperlane-base/src/agent.rs index b11a2a00f5..5f6b504e9a 100644 --- a/rust/hyperlane-base/src/agent.rs +++ b/rust/hyperlane-base/src/agent.rs @@ -3,6 +3,7 @@ use std::{env, fmt::Debug, sync::Arc}; use async_trait::async_trait; use eyre::Result; use hyperlane_core::config::*; +use tracing::info; use crate::{ create_chain_metrics, @@ -79,7 +80,8 @@ pub async fn agent_main() -> Result<()> { let chain_metrics = create_chain_metrics(&metrics)?; let agent = A::from_settings(settings, metrics.clone(), agent_metrics, chain_metrics).await?; - // This await will never end unless a panic occurs + // This await will only end if a panic happens. We won't crash, but instead gracefully shut down agent.run().await; + info!(agent = A::AGENT_NAME, "Shutting down agent..."); Ok(()) } diff --git a/rust/hyperlane-base/src/db/rocks/mod.rs b/rust/hyperlane-base/src/db/rocks/mod.rs index 1cec492dec..e9a626a157 100644 --- a/rust/hyperlane-base/src/db/rocks/mod.rs +++ b/rust/hyperlane-base/src/db/rocks/mod.rs @@ -1,7 +1,7 @@ use std::path::PathBuf; use std::{io, path::Path, sync::Arc}; -use hyperlane_core::HyperlaneProtocolError; +use hyperlane_core::{ChainCommunicationError, HyperlaneProtocolError}; use rocksdb::{Options, DB as Rocks}; use tracing::info; @@ -58,6 +58,12 @@ pub enum DbError { HyperlaneError(#[from] HyperlaneProtocolError), } +impl From for ChainCommunicationError { + fn from(value: DbError) -> Self { + ChainCommunicationError::from_other(value) + } +} + type Result = std::result::Result; impl DB { diff --git a/rust/hyperlane-core/Cargo.toml b/rust/hyperlane-core/Cargo.toml index d9bffe576e..5f34bc2091 100644 --- a/rust/hyperlane-core/Cargo.toml +++ b/rust/hyperlane-core/Cargo.toml @@ -26,6 +26,7 @@ ethers-core = { workspace = true, optional = true } ethers-providers = { workspace = true, optional = true } eyre.workspace = true fixed-hash.workspace = true +futures = { workspace = true, optional = true } getrandom.workspace = true hex.workspace = true itertools.workspace = true @@ -55,4 +56,4 @@ agent = ["ethers", "strum"] strum = ["dep:strum"] ethers = ["dep:ethers-core", "dep:ethers-contract", "dep:ethers-providers", "dep:primitive-types"] solana = ["dep:solana-sdk"] -async = ["tokio"] +async = ["tokio", "futures"] diff --git a/rust/hyperlane-core/src/error.rs b/rust/hyperlane-core/src/error.rs index 12d51a4653..3eb1ea5b58 100644 --- a/rust/hyperlane-core/src/error.rs +++ b/rust/hyperlane-core/src/error.rs @@ -10,7 +10,8 @@ use crate::config::StrOrIntParseError; use crate::rpc_clients::RpcClientError; use std::string::FromUtf8Error; -use crate::{Error as PrimitiveTypeError, HyperlaneProviderError, H256, U256}; +use crate::HyperlaneProviderError; +use crate::{Error as PrimitiveTypeError, HyperlaneSignerError, H256, U256}; /// The result of interacting with a chain. pub type ChainResult = Result; @@ -141,6 +142,15 @@ pub enum ChainCommunicationError { #[cfg(feature = "async")] #[error(transparent)] TokioJoinError(#[from] tokio::task::JoinError), + /// Custom error + #[error("{0}")] + CustomError(String), + /// Eyre error + #[error("{0}")] + EyreError(#[from] eyre::Report), + /// Hyperlane signer error + #[error("{0}")] + HyperlaneSignerError(#[from] HyperlaneSignerError), } impl ChainCommunicationError { diff --git a/rust/hyperlane-core/src/rpc_clients/mod.rs b/rust/hyperlane-core/src/rpc_clients/mod.rs index 846e508716..892718918f 100644 --- a/rust/hyperlane-core/src/rpc_clients/mod.rs +++ b/rust/hyperlane-core/src/rpc_clients/mod.rs @@ -3,6 +3,12 @@ pub use self::error::*; #[cfg(feature = "async")] pub use self::fallback::*; +#[cfg(feature = "async")] +pub use self::retry::*; + mod error; #[cfg(feature = "async")] mod fallback; + +#[cfg(feature = "async")] +mod retry; diff --git a/rust/hyperlane-core/src/rpc_clients/retry.rs b/rust/hyperlane-core/src/rpc_clients/retry.rs new file mode 100644 index 0000000000..b5c4e0bd72 --- /dev/null +++ b/rust/hyperlane-core/src/rpc_clients/retry.rs @@ -0,0 +1,51 @@ +use futures::Future; +use std::{pin::Pin, time::Duration}; +use tokio::time::sleep; +use tracing::{instrument, warn}; + +use crate::{ChainCommunicationError, ChainResult}; + +/// Max number of times to retry a call for +pub const DEFAULT_MAX_RPC_RETRIES: usize = 10; + +/// Duration to sleep between retries +pub const RPC_RETRY_SLEEP_DURATION: Duration = Duration::from_secs(2); + +// TODO: Refactor this function into a retrying provider +/// Retry calling a fallible async function a certain number of times, with a delay between each retry +#[instrument(err, skip(f))] +pub async fn call_and_retry_n_times( + mut f: impl FnMut() -> Pin> + Send>>, + n: usize, +) -> ChainResult { + for retry_number in 1..n { + match f().await { + Ok(res) => return Ok(res), + Err(err) => { + warn!(retries=retry_number, error=?err, "Retrying call"); + sleep(RPC_RETRY_SLEEP_DURATION).await; + } + } + } + + // TODO: Return the last error, or a vec of all the error instead of this string error + Err(ChainCommunicationError::CustomError( + "Retrying call failed".to_string(), + )) +} + +/// Retry calling a fallible async function a predefined number of times +#[instrument(err, skip(f))] +pub async fn call_with_retry( + f: impl FnMut() -> Pin> + Send>>, +) -> ChainResult { + call_and_retry_n_times(f, DEFAULT_MAX_RPC_RETRIES).await +} + +/// Retry calling a fallible async function indefinitely, until it succeeds +pub async fn call_and_retry_indefinitely( + f: impl FnMut() -> Pin> + Send>>, +) -> T { + // It's ok to unwrap, because `usize::MAX * RPC_RETRY_SLEEP_DURATION` means billions of years worth of retrying + call_and_retry_n_times(f, usize::MAX).await.unwrap() +}