Skip to content

Commit

Permalink
Validator task retries (#3361)
Browse files Browse the repository at this point in the history
### Description

- Makes validator tasks infallible by adding retries 
- This fixes a bug where certain tasks would return an error and shut
down, affecting liveness. This was desired in the relayer since we don't
want individual chain failures to affect the liveness of other chains.
Now validator tasks either terminate or panic, and panicking will be
propagated by `try_join_all`, causing the agent to shut down.
- A thing to keep in mind in general is that agents will only terminate
if a task panics. If it returns an `Err` but doesn't panic, the task
won't be respawned. We should consider the implications of this in the
scraper too. If this isn't desired, we should consider using `select_all`

### Drive-by changes

- retry logic is moved from
`rust/chains/hyperlane-cosmos/src/providers/rpc.rs` into
`rust/hyperlane-core/src/rpc_clients/retry.rs`, so we're even closer to
turning it into a retrying provider
- changes several fn signatures within the validator to now return
`ChainCommunicationError`s instead of `eyre::Report`s, for compatibility
with the retry logic. Also makes `DbError` convertible to
`ChainCommunicationError`, for the same reason. This achieves some
progress on
#2878
- allows the backfill checkpoint submitter task to terminate, since
`try_join_all` is tolerant of this (as described above)

### Related issues

- Fixes #3349


### Backward compatibility

<!--
Are these changes backward compatible? Are there any infrastructure
implications, e.g. changes that would prohibit deploying older commits
using this infra tooling?

Yes/No
-->

### Testing

<!--
What kind of testing have these changes undergone?

None/Manual/Unit Tests
-->
  • Loading branch information
daniel-savu authored Mar 6, 2024
1 parent 16c0fb1 commit dd8ac43
Show file tree
Hide file tree
Showing 11 changed files with 133 additions and 81 deletions.
1 change: 1 addition & 0 deletions rust/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

68 changes: 41 additions & 27 deletions rust/agents/validator/src/submit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,11 @@ use std::sync::Arc;
use std::time::{Duration, Instant};
use std::vec;

use eyre::{bail, Result};
use hyperlane_core::MerkleTreeHook;
use hyperlane_core::rpc_clients::call_and_retry_indefinitely;
use hyperlane_core::{ChainCommunicationError, ChainResult, MerkleTreeHook};
use prometheus::IntGauge;
use tokio::time::sleep;
use tracing::{debug, info};
use tracing::{error, instrument};
use tracing::{debug, error, info};

use hyperlane_base::{db::HyperlaneRocksDB, CheckpointSyncer, CoreMetrics};
use hyperlane_core::{
Expand Down Expand Up @@ -60,29 +59,28 @@ impl ValidatorSubmitter {

/// Submits signed checkpoints from index 0 until the target checkpoint (inclusive).
/// Returns once the target checkpoint is reached, allowing the task to terminate.
#[instrument(err, skip(self), fields(domain=%self.merkle_tree_hook.domain()))]
pub(crate) async fn backfill_checkpoint_submitter(
self,
target_checkpoint: Checkpoint,
) -> Result<()> {
pub(crate) async fn backfill_checkpoint_submitter(self, target_checkpoint: Checkpoint) {
let mut tree = IncrementalMerkle::default();
self.submit_checkpoints_until_correctness_checkpoint(&mut tree, &target_checkpoint)
.await?;
call_and_retry_indefinitely(|| {
let target_checkpoint = target_checkpoint;
let self_clone = self.clone();
Box::pin(async move {
self_clone
.submit_checkpoints_until_correctness_checkpoint(&mut tree, &target_checkpoint)
.await?;
Ok(())
})
})
.await;

info!(
?target_checkpoint,
"Backfill checkpoint submitter successfully reached target checkpoint"
);

// TODO: remove this once validator is tolerant of tasks exiting.
loop {
sleep(Duration::from_secs(u64::MAX)).await;
}
}

/// Submits signed checkpoints indefinitely, starting from the `tree`.
#[instrument(err, skip(self, tree), fields(domain=%self.merkle_tree_hook.domain()))]
pub(crate) async fn checkpoint_submitter(self, mut tree: IncrementalMerkle) -> Result<()> {
pub(crate) async fn checkpoint_submitter(self, mut tree: IncrementalMerkle) {
// How often to log checkpoint info - once every minute
let checkpoint_info_log_period = Duration::from_secs(60);
// The instant in which we last logged checkpoint info, if at all
Expand All @@ -102,10 +100,12 @@ impl ValidatorSubmitter {

loop {
// Lag by reorg period because this is our correctness checkpoint.
let latest_checkpoint = self
.merkle_tree_hook
.latest_checkpoint(self.reorg_period)
.await?;
let latest_checkpoint = call_and_retry_indefinitely(|| {
let merkle_tree_hook = self.merkle_tree_hook.clone();
Box::pin(async move { merkle_tree_hook.latest_checkpoint(self.reorg_period).await })
})
.await;

self.metrics
.latest_checkpoint_observed
.set(latest_checkpoint.index as i64);
Expand Down Expand Up @@ -133,8 +133,20 @@ impl ValidatorSubmitter {
continue;
}

self.submit_checkpoints_until_correctness_checkpoint(&mut tree, &latest_checkpoint)
.await?;
tree = call_and_retry_indefinitely(|| {
let mut tree = tree;
let self_clone = self.clone();
Box::pin(async move {
self_clone
.submit_checkpoints_until_correctness_checkpoint(
&mut tree,
&latest_checkpoint,
)
.await?;
Ok(tree)
})
})
.await;

self.metrics
.latest_checkpoint_processed
Expand All @@ -150,7 +162,7 @@ impl ValidatorSubmitter {
&self,
tree: &mut IncrementalMerkle,
correctness_checkpoint: &Checkpoint,
) -> Result<()> {
) -> ChainResult<()> {
// This should never be called with a tree that is ahead of the correctness checkpoint.
assert!(
!tree_exceeds_checkpoint(correctness_checkpoint, tree),
Expand Down Expand Up @@ -213,7 +225,9 @@ impl ValidatorSubmitter {
?correctness_checkpoint,
"Incorrect tree root, something went wrong"
);
bail!("Incorrect tree root, something went wrong");
return Err(ChainCommunicationError::CustomError(
"Incorrect tree root, something went wrong".to_string(),
));
}

if !checkpoint_queue.is_empty() {
Expand All @@ -238,7 +252,7 @@ impl ValidatorSubmitter {
async fn sign_and_submit_checkpoints(
&self,
checkpoints: Vec<CheckpointWithMessageId>,
) -> Result<()> {
) -> ChainResult<()> {
let last_checkpoint = checkpoints.as_slice()[checkpoints.len() - 1];

for queued_checkpoint in checkpoints {
Expand Down
9 changes: 3 additions & 6 deletions rust/agents/validator/src/validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,6 @@ impl BaseAgent for Validator {
.expect("Failed to create server");
let server_task = tokio::spawn(async move {
server.run(routes);
Ok(())
})
.instrument(info_span!("Validator server"));
tasks.push(server_task);
Expand All @@ -150,7 +149,6 @@ impl BaseAgent for Validator {
tasks.push(
tokio::spawn(async move {
signer_instance.run().await;
Ok(())
})
.instrument(info_span!("SingletonSigner")),
);
Expand All @@ -168,7 +166,6 @@ impl BaseAgent for Validator {
tasks.push(
tokio::spawn(async move {
metrics_updater.spawn().await.unwrap();
Ok(())
})
.instrument(info_span!("MetricsUpdater")),
);
Expand Down Expand Up @@ -200,14 +197,15 @@ impl BaseAgent for Validator {
}
}

// Note that this only returns an error if one of the tasks panics
if let Err(err) = try_join_all(tasks).await {
error!(?err, "One of the validator tasks returned an error");
}
}
}

impl Validator {
async fn run_merkle_tree_hook_sync(&self) -> Instrumented<JoinHandle<Result<()>>> {
async fn run_merkle_tree_hook_sync(&self) -> Instrumented<JoinHandle<()>> {
let index_settings =
self.as_ref().settings.chains[self.origin_chain.name()].index_settings();
let contract_sync = self.merkle_tree_hook_sync.clone();
Expand All @@ -216,12 +214,11 @@ impl Validator {
.await;
tokio::spawn(async move {
contract_sync.clone().sync("merkle_tree_hook", cursor).await;
Ok(())
})
.instrument(info_span!("MerkleTreeHookSyncer"))
}

async fn run_checkpoint_submitters(&self) -> Vec<Instrumented<JoinHandle<Result<()>>>> {
async fn run_checkpoint_submitters(&self) -> Vec<Instrumented<JoinHandle<()>>> {
let submitter = ValidatorSubmitter::new(
self.interval,
self.reorg_period,
Expand Down
3 changes: 0 additions & 3 deletions rust/chains/hyperlane-cosmos/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,6 @@ pub enum HyperlaneCosmosError {
/// Fallback providers failed
#[error("Fallback providers failed. (Errors: {0:?})")]
FallbackProvidersFailed(Vec<HyperlaneCosmosError>),
/// Custom error
#[error("{0}")]
CustomError(String),
}

impl From<HyperlaneCosmosError> for ChainCommunicationError {
Expand Down
49 changes: 8 additions & 41 deletions rust/chains/hyperlane-cosmos/src/providers/rpc.rs
Original file line number Diff line number Diff line change
@@ -1,25 +1,19 @@
use async_trait::async_trait;
use cosmrs::rpc::client::Client;
use futures::Future;
use hyperlane_core::rpc_clients::call_with_retry;
use hyperlane_core::{ChainCommunicationError, ChainResult, ContractLocator, LogMeta, H256, U256};
use sha256::digest;
use std::pin::Pin;
use std::time::Duration;
use tendermint::abci::{Event, EventAttribute};
use tendermint::hash::Algorithm;
use tendermint::Hash;
use tendermint_rpc::endpoint::block::Response as BlockResponse;
use tendermint_rpc::endpoint::block_results::Response as BlockResultsResponse;
use tendermint_rpc::HttpClient;
use tokio::time::sleep;
use tracing::{debug, instrument, trace};

use crate::address::CosmosAddress;
use crate::{ConnectionConf, CosmosProvider, HyperlaneCosmosError};

const MAX_RPC_RETRIES: usize = 10;
const RPC_RETRY_SLEEP_DURATION: Duration = Duration::from_secs(2);

#[async_trait]
/// Trait for wasm indexer. Use rpc provider
pub trait WasmIndexer: Send + Sync {
Expand Down Expand Up @@ -118,28 +112,6 @@ impl CosmosWasmIndexer {
.await
.map_err(Into::<HyperlaneCosmosError>::into)?)
}

// TODO: Refactor this function into a retrying provider. Once the watermark cursor is refactored, retrying should no longer
// be required here if the error is propagated.
#[instrument(err, skip(f))]
async fn call_with_retry<T>(
mut f: impl FnMut() -> Pin<Box<dyn Future<Output = ChainResult<T>> + Send>>,
) -> ChainResult<T> {
for retry_number in 1..MAX_RPC_RETRIES {
match f().await {
Ok(res) => return Ok(res),
Err(err) => {
debug!(retries=retry_number, error=?err, "Retrying call");
sleep(RPC_RETRY_SLEEP_DURATION).await;
}
}
}

// TODO: Return the last error, or a vec of all the error instead of this string error
Err(ChainCommunicationError::from_other(
HyperlaneCosmosError::CustomError("Retrying call failed".to_string()),
))
}
}

impl CosmosWasmIndexer {
Expand Down Expand Up @@ -241,10 +213,9 @@ impl CosmosWasmIndexer {
impl WasmIndexer for CosmosWasmIndexer {
#[instrument(err, skip(self))]
async fn get_finalized_block_number(&self) -> ChainResult<u32> {
let latest_block = Self::call_with_retry(move || {
Box::pin(Self::get_latest_block(self.provider.rpc().clone()))
})
.await?;
let latest_block =
call_with_retry(move || Box::pin(Self::get_latest_block(self.provider.rpc().clone())))
.await?;
let latest_height: u32 = latest_block
.block
.header
Expand All @@ -266,15 +237,11 @@ impl WasmIndexer for CosmosWasmIndexer {
{
let client = self.provider.rpc().clone();

let (block_res, block_results_res) = tokio::join!(
Self::call_with_retry(|| { Box::pin(Self::get_block(client.clone(), block_number)) }),
Self::call_with_retry(|| {
Box::pin(Self::get_block_results(client.clone(), block_number))
}),
let (block, block_results) = tokio::join!(
call_with_retry(|| { Box::pin(Self::get_block(client.clone(), block_number)) }),
call_with_retry(|| { Box::pin(Self::get_block_results(client.clone(), block_number)) }),
);
let block = block_res.map_err(ChainCommunicationError::from_other)?;
let block_results = block_results_res.map_err(ChainCommunicationError::from_other)?;

Ok(self.handle_txs(block, block_results, parser))
Ok(self.handle_txs(block?, block_results?, parser))
}
}
4 changes: 3 additions & 1 deletion rust/hyperlane-base/src/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::{env, fmt::Debug, sync::Arc};
use async_trait::async_trait;
use eyre::Result;
use hyperlane_core::config::*;
use tracing::info;

use crate::{
create_chain_metrics,
Expand Down Expand Up @@ -79,7 +80,8 @@ pub async fn agent_main<A: BaseAgent>() -> Result<()> {
let chain_metrics = create_chain_metrics(&metrics)?;
let agent = A::from_settings(settings, metrics.clone(), agent_metrics, chain_metrics).await?;

// This await will never end unless a panic occurs
// This await will only end if a panic happens. We won't crash, but instead gracefully shut down
agent.run().await;
info!(agent = A::AGENT_NAME, "Shutting down agent...");
Ok(())
}
8 changes: 7 additions & 1 deletion rust/hyperlane-base/src/db/rocks/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::path::PathBuf;
use std::{io, path::Path, sync::Arc};

use hyperlane_core::HyperlaneProtocolError;
use hyperlane_core::{ChainCommunicationError, HyperlaneProtocolError};
use rocksdb::{Options, DB as Rocks};
use tracing::info;

Expand Down Expand Up @@ -58,6 +58,12 @@ pub enum DbError {
HyperlaneError(#[from] HyperlaneProtocolError),
}

impl From<DbError> for ChainCommunicationError {
fn from(value: DbError) -> Self {
ChainCommunicationError::from_other(value)
}
}

type Result<T> = std::result::Result<T, DbError>;

impl DB {
Expand Down
3 changes: 2 additions & 1 deletion rust/hyperlane-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ ethers-core = { workspace = true, optional = true }
ethers-providers = { workspace = true, optional = true }
eyre.workspace = true
fixed-hash.workspace = true
futures = { workspace = true, optional = true }
getrandom.workspace = true
hex.workspace = true
itertools.workspace = true
Expand Down Expand Up @@ -55,4 +56,4 @@ agent = ["ethers", "strum"]
strum = ["dep:strum"]
ethers = ["dep:ethers-core", "dep:ethers-contract", "dep:ethers-providers", "dep:primitive-types"]
solana = ["dep:solana-sdk"]
async = ["tokio"]
async = ["tokio", "futures"]
12 changes: 11 additions & 1 deletion rust/hyperlane-core/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ use crate::config::StrOrIntParseError;
use crate::rpc_clients::RpcClientError;
use std::string::FromUtf8Error;

use crate::{Error as PrimitiveTypeError, HyperlaneProviderError, H256, U256};
use crate::HyperlaneProviderError;
use crate::{Error as PrimitiveTypeError, HyperlaneSignerError, H256, U256};

/// The result of interacting with a chain.
pub type ChainResult<T> = Result<T, ChainCommunicationError>;
Expand Down Expand Up @@ -141,6 +142,15 @@ pub enum ChainCommunicationError {
#[cfg(feature = "async")]
#[error(transparent)]
TokioJoinError(#[from] tokio::task::JoinError),
/// Custom error
#[error("{0}")]
CustomError(String),
/// Eyre error
#[error("{0}")]
EyreError(#[from] eyre::Report),
/// Hyperlane signer error
#[error("{0}")]
HyperlaneSignerError(#[from] HyperlaneSignerError),
}

impl ChainCommunicationError {
Expand Down
6 changes: 6 additions & 0 deletions rust/hyperlane-core/src/rpc_clients/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,12 @@ pub use self::error::*;
#[cfg(feature = "async")]
pub use self::fallback::*;

#[cfg(feature = "async")]
pub use self::retry::*;

mod error;
#[cfg(feature = "async")]
mod fallback;

#[cfg(feature = "async")]
mod retry;
Loading

0 comments on commit dd8ac43

Please sign in to comment.