diff --git a/rust/main/agents/relayer/src/relayer.rs b/rust/main/agents/relayer/src/relayer.rs index b1f013b6ae..4b037f7868 100644 --- a/rust/main/agents/relayer/src/relayer.rs +++ b/rust/main/agents/relayer/src/relayer.rs @@ -2,6 +2,7 @@ use std::{ collections::{HashMap, HashSet}, fmt::{Debug, Formatter}, sync::Arc, + time::Duration, }; use async_trait::async_trait; @@ -10,7 +11,7 @@ use eyre::Result; use futures_util::future::try_join_all; use hyperlane_base::{ broadcast::BroadcastMpscSender, - db::{HyperlaneRocksDB, DB}, + db::{HyperlaneDb, HyperlaneRocksDB, DB}, metrics::{AgentMetrics, MetricsUpdater}, settings::{ChainConf, IndexSettings}, AgentMetadata, BaseAgent, ChainMetrics, ContractSyncMetrics, ContractSyncer, CoreMetrics, @@ -526,16 +527,38 @@ impl Relayer { return tokio::spawn(async {}).instrument(info_span!("MerkleTreeHookSync")); } }; - tokio::spawn(TaskMonitor::instrument(&task_monitor, async move { + + let origin_clone = origin.clone(); + let origin_db = Arc::new(self.dbs.get(origin).unwrap().clone()); + let core_metrics = self.core_metrics.clone(); + + let hook_sync_handle = tokio::spawn(TaskMonitor::instrument(&task_monitor, async move { contract_sync .clone() .sync( "merkle_tree_hook", SyncOptions::new(Some(cursor), tx_id_receiver), ) - .await + .await; })) - .instrument(info_span!("MerkleTreeHookSync")) + .instrument(info_span!("MerkleTreeHookSync")); + + let index_updater_handle = + tokio::spawn(TaskMonitor::instrument(&task_monitor, async move { + loop { + Self::update_highest_seen_tree_index(&core_metrics, &origin_db, &origin_clone) + .await; + + // sleep for 5 seconds, ie, 5 seconds between updates + tokio::time::sleep(Duration::from_secs(5)).await; + } + })) + .instrument(info_span!("MerkleTreeHighestSeenIndexUpdater")); + + tokio::spawn(async move { + let _ = tokio::join!(hook_sync_handle, index_updater_handle); + }) + .instrument(info_span!("MerkleTreeHookSync_with_IndexUpdater")) } fn run_message_processor( @@ -620,6 +643,23 @@ impl Relayer { })) .instrument(span) } + + async fn update_highest_seen_tree_index( + core_metrics: &CoreMetrics, + origin_db: &Arc, + origin: &HyperlaneDomain, + ) { + let highest_seen_tree_index = origin_db + .retrieve_highest_seen_tree_index() + .unwrap_or_default(); + + if let Some(highest_seen_tree_index) = highest_seen_tree_index { + core_metrics + .highest_seen_tree_index() + .with_label_values(&[origin.name()]) + .set(highest_seen_tree_index as i64); + } + } } #[cfg(test)] diff --git a/rust/main/hyperlane-base/src/db/mod.rs b/rust/main/hyperlane-base/src/db/mod.rs index 04a3e59cc6..26df5cd0ca 100644 --- a/rust/main/hyperlane-base/src/db/mod.rs +++ b/rust/main/hyperlane-base/src/db/mod.rs @@ -135,6 +135,11 @@ pub trait HyperlaneDb: Send + Sync { leaf_index: &u32, ) -> DbResult>; + fn store_highest_seen_tree_index(&self, index: &u32) -> DbResult<()>; + + /// Retrieve the highest seen tree index by the syncer + fn retrieve_highest_seen_tree_index(&self) -> DbResult>; + fn store_merkle_leaf_index_by_message_id( &self, message_id: &H256, diff --git a/rust/main/hyperlane-base/src/db/rocks/hyperlane_db.rs b/rust/main/hyperlane-base/src/db/rocks/hyperlane_db.rs index 76a51bfe3c..b7f2ab3dc0 100644 --- a/rust/main/hyperlane-base/src/db/rocks/hyperlane_db.rs +++ b/rust/main/hyperlane-base/src/db/rocks/hyperlane_db.rs @@ -25,6 +25,7 @@ const NONCE_PROCESSED: &str = "nonce_processed_"; const GAS_PAYMENT_BY_SEQUENCE: &str = "gas_payment_by_sequence_"; const GAS_PAYMENT_BLOCK_BY_SEQUENCE: &str = "gas_payment_block_by_sequence_"; const HIGHEST_SEEN_MESSAGE_NONCE: &str = "highest_seen_message_nonce_"; +const HIGHEST_SEEN_TREE_INDEX: &str = "highest_seen_tree_index_"; const GAS_PAYMENT_FOR_MESSAGE_ID: &str = "gas_payment_sequence_for_message_id_v2_"; const GAS_PAYMENT_META_PROCESSED: &str = "gas_payment_meta_processed_v3_"; const GAS_EXPENDITURE_FOR_MESSAGE_ID: &str = "gas_expenditure_for_message_id_v2_"; @@ -211,6 +212,12 @@ impl HyperlaneRocksDB { &insertion.index(), &insertion_block_number, )?; + + let current_highest = self.retrieve_highest_seen_tree_index()?.unwrap_or_default(); + if insertion.index() > current_highest { + self.store_highest_seen_tree_index(&insertion.index())?; + } + // Return true to indicate the tree insertion was processed Ok(true) } @@ -614,6 +621,14 @@ impl HyperlaneDb for HyperlaneRocksDB { self.retrieve_value_by_key(MERKLE_TREE_INSERTION, leaf_index) } + fn retrieve_highest_seen_tree_index(&self) -> DbResult> { + self.retrieve_value_by_key(HIGHEST_SEEN_TREE_INDEX, &bool::default()) + } + + fn store_highest_seen_tree_index(&self, index: &u32) -> DbResult<()> { + self.store_value_by_key(HIGHEST_SEEN_TREE_INDEX, &bool::default(), index) + } + fn store_merkle_leaf_index_by_message_id( &self, message_id: &H256, diff --git a/rust/main/hyperlane-base/src/metrics/core.rs b/rust/main/hyperlane-base/src/metrics/core.rs index d3cbaf3db4..5bed85b60d 100644 --- a/rust/main/hyperlane-base/src/metrics/core.rs +++ b/rust/main/hyperlane-base/src/metrics/core.rs @@ -39,6 +39,7 @@ pub struct CoreMetrics { span_events: IntCounterVec, last_known_message_nonce: IntGaugeVec, latest_leaf_index: IntGaugeVec, + highest_seen_tree_index: IntGaugeVec, submitter_queue_length: IntGaugeVec, operations_processed_count: IntCounterVec, @@ -122,6 +123,15 @@ impl CoreMetrics { &["origin"], registry )?; + let highest_seen_tree_index = register_int_gauge_vec_with_registry!( + opts!( + namespaced!("highest_seen_tree_index"), + "Highest tree index seen by the relayer", + const_labels_ref + ), + &["origin"], + registry + )?; let observed_validator_latest_index = register_int_gauge_vec_with_registry!( opts!( @@ -189,6 +199,7 @@ impl CoreMetrics { span_events, last_known_message_nonce, latest_leaf_index, + highest_seen_tree_index, submitter_queue_length, @@ -329,6 +340,14 @@ impl CoreMetrics { self.latest_leaf_index.clone() } + /// Reports the current highest tree index seen by the relayer. + /// + /// Labels: + /// - `origin`: Origin chain the tree index is being tracked at. + pub fn highest_seen_tree_index(&self) -> IntGaugeVec { + self.highest_seen_tree_index.clone() + } + /// Latest message nonce in the validator. /// /// Phase: