Skip to content

Commit

Permalink
add highest_seen_tree_index
Browse files Browse the repository at this point in the history
  • Loading branch information
aroralanuk committed Jan 11, 2025
1 parent 2d4963c commit 920c7da
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 4 deletions.
48 changes: 44 additions & 4 deletions rust/main/agents/relayer/src/relayer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::{
collections::{HashMap, HashSet},
fmt::{Debug, Formatter},
sync::Arc,
time::Duration,
};

use async_trait::async_trait;
Expand All @@ -10,7 +11,7 @@ use eyre::Result;
use futures_util::future::try_join_all;
use hyperlane_base::{
broadcast::BroadcastMpscSender,
db::{HyperlaneRocksDB, DB},
db::{HyperlaneDb, HyperlaneRocksDB, DB},
metrics::{AgentMetrics, MetricsUpdater},
settings::{ChainConf, IndexSettings},
AgentMetadata, BaseAgent, ChainMetrics, ContractSyncMetrics, ContractSyncer, CoreMetrics,
Expand Down Expand Up @@ -526,16 +527,38 @@ impl Relayer {
return tokio::spawn(async {}).instrument(info_span!("MerkleTreeHookSync"));
}
};
tokio::spawn(TaskMonitor::instrument(&task_monitor, async move {

let origin_clone = origin.clone();
let origin_db = Arc::new(self.dbs.get(origin).unwrap().clone());
let core_metrics = self.core_metrics.clone();

let hook_sync_handle = tokio::spawn(TaskMonitor::instrument(&task_monitor, async move {
contract_sync
.clone()
.sync(
"merkle_tree_hook",
SyncOptions::new(Some(cursor), tx_id_receiver),
)
.await
.await;
}))
.instrument(info_span!("MerkleTreeHookSync"))
.instrument(info_span!("MerkleTreeHookSync"));

let index_updater_handle =
tokio::spawn(TaskMonitor::instrument(&task_monitor, async move {
loop {
Self::update_highest_seen_tree_index(&core_metrics, &origin_db, &origin_clone)
.await;

// sleep for 5 seconds, ie, 5 seconds between updates
tokio::time::sleep(Duration::from_secs(5)).await;
}
}))
.instrument(info_span!("MerkleTreeHighestSeenIndexUpdater"));

tokio::spawn(async move {
let _ = tokio::join!(hook_sync_handle, index_updater_handle);
})
.instrument(info_span!("MerkleTreeHookSync_with_IndexUpdater"))
}

fn run_message_processor(
Expand Down Expand Up @@ -620,6 +643,23 @@ impl Relayer {
}))
.instrument(span)
}

async fn update_highest_seen_tree_index(
core_metrics: &CoreMetrics,
origin_db: &Arc<HyperlaneRocksDB>,
origin: &HyperlaneDomain,
) {
let highest_seen_tree_index = origin_db
.retrieve_highest_seen_tree_index()
.unwrap_or_default();

if let Some(highest_seen_tree_index) = highest_seen_tree_index {
core_metrics
.highest_seen_tree_index()
.with_label_values(&[origin.name()])
.set(highest_seen_tree_index as i64);
}
}
}

#[cfg(test)]
Expand Down
5 changes: 5 additions & 0 deletions rust/main/hyperlane-base/src/db/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,11 @@ pub trait HyperlaneDb: Send + Sync {
leaf_index: &u32,
) -> DbResult<Option<MerkleTreeInsertion>>;

fn store_highest_seen_tree_index(&self, index: &u32) -> DbResult<()>;

/// Retrieve the highest seen tree index by the syncer
fn retrieve_highest_seen_tree_index(&self) -> DbResult<Option<u32>>;

fn store_merkle_leaf_index_by_message_id(
&self,
message_id: &H256,
Expand Down
15 changes: 15 additions & 0 deletions rust/main/hyperlane-base/src/db/rocks/hyperlane_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ const NONCE_PROCESSED: &str = "nonce_processed_";
const GAS_PAYMENT_BY_SEQUENCE: &str = "gas_payment_by_sequence_";
const GAS_PAYMENT_BLOCK_BY_SEQUENCE: &str = "gas_payment_block_by_sequence_";
const HIGHEST_SEEN_MESSAGE_NONCE: &str = "highest_seen_message_nonce_";
const HIGHEST_SEEN_TREE_INDEX: &str = "highest_seen_tree_index_";
const GAS_PAYMENT_FOR_MESSAGE_ID: &str = "gas_payment_sequence_for_message_id_v2_";
const GAS_PAYMENT_META_PROCESSED: &str = "gas_payment_meta_processed_v3_";
const GAS_EXPENDITURE_FOR_MESSAGE_ID: &str = "gas_expenditure_for_message_id_v2_";
Expand Down Expand Up @@ -211,6 +212,12 @@ impl HyperlaneRocksDB {
&insertion.index(),
&insertion_block_number,
)?;

let current_highest = self.retrieve_highest_seen_tree_index()?.unwrap_or_default();
if insertion.index() > current_highest {
self.store_highest_seen_tree_index(&insertion.index())?;
}

// Return true to indicate the tree insertion was processed
Ok(true)
}
Expand Down Expand Up @@ -614,6 +621,14 @@ impl HyperlaneDb for HyperlaneRocksDB {
self.retrieve_value_by_key(MERKLE_TREE_INSERTION, leaf_index)
}

fn retrieve_highest_seen_tree_index(&self) -> DbResult<Option<u32>> {
self.retrieve_value_by_key(HIGHEST_SEEN_TREE_INDEX, &bool::default())
}

fn store_highest_seen_tree_index(&self, index: &u32) -> DbResult<()> {
self.store_value_by_key(HIGHEST_SEEN_TREE_INDEX, &bool::default(), index)
}

fn store_merkle_leaf_index_by_message_id(
&self,
message_id: &H256,
Expand Down
19 changes: 19 additions & 0 deletions rust/main/hyperlane-base/src/metrics/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ pub struct CoreMetrics {
span_events: IntCounterVec,
last_known_message_nonce: IntGaugeVec,
latest_leaf_index: IntGaugeVec,
highest_seen_tree_index: IntGaugeVec,
submitter_queue_length: IntGaugeVec,

operations_processed_count: IntCounterVec,
Expand Down Expand Up @@ -122,6 +123,15 @@ impl CoreMetrics {
&["origin"],
registry
)?;
let highest_seen_tree_index = register_int_gauge_vec_with_registry!(
opts!(
namespaced!("highest_seen_tree_index"),
"Highest tree index seen by the relayer",
const_labels_ref
),
&["origin"],
registry
)?;

let observed_validator_latest_index = register_int_gauge_vec_with_registry!(
opts!(
Expand Down Expand Up @@ -189,6 +199,7 @@ impl CoreMetrics {
span_events,
last_known_message_nonce,
latest_leaf_index,
highest_seen_tree_index,

submitter_queue_length,

Expand Down Expand Up @@ -329,6 +340,14 @@ impl CoreMetrics {
self.latest_leaf_index.clone()
}

/// Reports the current highest tree index seen by the relayer.
///
/// Labels:
/// - `origin`: Origin chain the tree index is being tracked at.
pub fn highest_seen_tree_index(&self) -> IntGaugeVec {
self.highest_seen_tree_index.clone()
}

/// Latest message nonce in the validator.
///
/// Phase:
Expand Down

0 comments on commit 920c7da

Please sign in to comment.