Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use app context classifier in relayer submitter queues #3385

Merged
merged 11 commits into from
Mar 13, 2024
46 changes: 30 additions & 16 deletions rust/agents/relayer/src/msg/metadata/base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,36 +103,55 @@ impl DefaultIsmCache {
}
}

/// Classifies messages into an app context if they have one.
#[derive(Debug)]
pub struct AppContextClassifier {
pub struct IsmAwareAppContextClassifier {
default_ism: DefaultIsmCache,
app_matching_lists: Vec<(MatchingList, String)>,
app_context_classifier: AppContextClassifier,
}

impl AppContextClassifier {
impl IsmAwareAppContextClassifier {
pub fn new(
destination_mailbox: Arc<dyn Mailbox>,
app_matching_lists: Vec<(MatchingList, String)>,
) -> Self {
Self {
default_ism: DefaultIsmCache::new(destination_mailbox),
app_matching_lists,
app_context_classifier: AppContextClassifier::new(app_matching_lists),
}
}

pub async fn get_app_context(
&self,
message: &HyperlaneMessage,
root_ism: H256,
) -> Result<Option<String>> {
if let Some(app_context) = self.app_context_classifier.get_app_context(message).await? {
return Ok(Some(app_context));
}

if root_ism == self.default_ism.get().await? {
return Ok(Some("default_ism".to_string()));
}

Ok(None)
}
}

/// Classifies messages into an app context if they have one.
///
/// The `new` constructor is generated by the `new` derive and takes the
/// matching lists as its only argument.
#[derive(Debug, new)]
pub struct AppContextClassifier {
/// Pairs of (matching list, app context label) used for classification.
app_matching_lists: Vec<(MatchingList, String)>,
}

impl AppContextClassifier {
/// Classifies messages into an app context if they have one, or None
/// if they don't.
/// An app context is a string that identifies the app that sent the message
/// and exists just for metrics.
/// An app context is chosen based on:
/// - the first element in `app_matching_lists` that matches the message
///
/// The "default_ism" fallback (used when the message's ISM is the default ISM)
/// is applied by `IsmAwareAppContextClassifier`, not by this method.
pub async fn get_app_context(
&self,
message: &HyperlaneMessage,
root_ism: H256,
) -> Result<Option<String>> {
pub async fn get_app_context(&self, message: &HyperlaneMessage) -> Result<Option<String>> {
// Give priority to the matching list. If the app from the matching list happens
// to use the default ISM, it's preferable to use the app context from the matching
// list.
Expand All @@ -142,11 +161,6 @@ impl AppContextClassifier {
}
}

let default_ism = self.default_ism.get().await?;
if root_ism == default_ism {
return Ok(Some("default_ism".to_string()));
}

Ok(None)
}
}
Expand Down Expand Up @@ -264,7 +278,7 @@ pub struct BaseMetadataBuilder {
metrics: Arc<CoreMetrics>,
db: HyperlaneRocksDB,
max_depth: u32,
app_context_classifier: AppContextClassifier,
app_context_classifier: IsmAwareAppContextClassifier,
}

impl Debug for BaseMetadataBuilder {
Expand Down
4 changes: 3 additions & 1 deletion rust/agents/relayer/src/msg/metadata/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ mod routing;

use aggregation::AggregationIsmMetadataBuilder;
pub(crate) use base::MetadataBuilder;
pub(crate) use base::{AppContextClassifier, BaseMetadataBuilder, MessageMetadataBuilder};
pub(crate) use base::{
AppContextClassifier, BaseMetadataBuilder, IsmAwareAppContextClassifier, MessageMetadataBuilder,
};
use ccip_read::CcipReadIsmMetadataBuilder;
use null_metadata::NullMetadataBuilder;
use routing::RoutingIsmMetadataBuilder;
15 changes: 12 additions & 3 deletions rust/agents/relayer/src/msg/pending_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ pub struct MessageContext {
pub struct PendingMessage {
pub message: HyperlaneMessage,
ctx: Arc<MessageContext>,
app_context: Option<String>,
#[new(default)]
submitted: bool,
#[new(default)]
Expand Down Expand Up @@ -104,6 +105,10 @@ impl PendingOperation for PendingMessage {
self.ctx.destination_mailbox.domain()
}

/// Returns a clone of the app context label stored on this pending message,
/// or `None` if no label was set.
fn app_context(&self) -> Option<String> {
self.app_context.clone()
}

#[instrument]
async fn prepare(&mut self) -> PendingOperationResult {
make_op_try!(|| self.on_reprepare());
Expand Down Expand Up @@ -302,7 +307,7 @@ impl PendingOperation for PendingMessage {
}
}

fn _next_attempt_after(&self) -> Option<Instant> {
fn next_attempt_after(&self) -> Option<Instant> {
self.next_attempt_after
}

Expand All @@ -315,8 +320,12 @@ impl PendingOperation for PendingMessage {
impl PendingMessage {
/// Constructor that tries reading the retry count from the HyperlaneDB in order to recompute the `next_attempt_after`.
/// In case of failure, behaves like `Self::new(...)`.
pub fn from_persisted_retries(message: HyperlaneMessage, ctx: Arc<MessageContext>) -> Self {
let mut pm = Self::new(message, ctx);
pub fn from_persisted_retries(
message: HyperlaneMessage,
ctx: Arc<MessageContext>,
app_context: Option<String>,
) -> Self {
let mut pm = Self::new(message, ctx, app_context);
match pm
.ctx
.origin_db
Expand Down
14 changes: 12 additions & 2 deletions rust/agents/relayer/src/msg/pending_operation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,16 @@ pub trait PendingOperation {
/// The domain this operation will take place on.
fn domain(&self) -> &HyperlaneDomain;

/// Label to use for metrics granularity.
fn app_context(&self) -> Option<String>;
daniel-savu marked this conversation as resolved.
Show resolved Hide resolved

/// Get tuple of labels for metrics.
///
/// Returns `(destination, app_context)`: the destination is the string form
/// of `self.domain()`, and the app context is the value of
/// `self.app_context()`, falling back to `"Unknown"` when it is `None`.
fn get_operation_labels(&self) -> (String, String) {
    // Build the fallback lazily so nothing is allocated when a context exists.
    let app_context = self
        .app_context()
        .unwrap_or_else(|| "Unknown".to_string());
    let destination = self.domain().to_string();
    (destination, app_context)
}

/// Prepare to submit this operation. This will be called before every
/// submission and will usually have a very short gap between it and the
/// submit call.
Expand All @@ -50,7 +60,7 @@ pub trait PendingOperation {
///
/// This is only used for sorting, the functions are responsible for
/// returning `NotReady` if it is too early and matters.
fn _next_attempt_after(&self) -> Option<Instant>;
fn next_attempt_after(&self) -> Option<Instant>;

#[cfg(test)]
/// Set the number of times this operation has been retried.
Expand Down Expand Up @@ -80,7 +90,7 @@ impl Ord for DynPendingOperation {
fn cmp(&self, other: &Self) -> Ordering {
use DynPendingOperation::*;
use Ordering::*;
match (self._next_attempt_after(), other._next_attempt_after()) {
match (self.next_attempt_after(), other.next_attempt_after()) {
(Some(a), Some(b)) => a.cmp(&b),
// No time means it should come before
(None, Some(_)) => Less,
Expand Down
16 changes: 12 additions & 4 deletions rust/agents/relayer/src/msg/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,13 @@ use prometheus::IntGauge;
use tokio::sync::mpsc::UnboundedSender;
use tracing::{debug, trace};

use super::pending_message::*;
use super::{metadata::AppContextClassifier, pending_message::*};
use crate::msg::pending_operation::DynPendingOperation;
use crate::{processor::ProcessorExt, settings::matching_list::MatchingList};

/// Finds unprocessed messages from an origin and submits them through a channel
/// to the appropriate destination.
#[allow(clippy::too_many_arguments)]
#[derive(new)]
pub struct MessageProcessor {
db: HyperlaneRocksDB,
Expand All @@ -31,6 +32,7 @@ pub struct MessageProcessor {
send_channels: HashMap<u32, UnboundedSender<Box<DynPendingOperation>>>,
/// Needed context to send a message for each destination chain
destination_ctxs: HashMap<u32, Arc<MessageContext>>,
metric_app_contexts: Vec<(MatchingList, String)>,
#[new(default)]
message_nonce: u32,
}
Expand Down Expand Up @@ -94,10 +96,15 @@ impl ProcessorExt for MessageProcessor {

debug!(%msg, "Sending message to submitter");

let app_context_classifier =
AppContextClassifier::new(self.metric_app_contexts.clone());

let app_context = app_context_classifier.get_app_context(&msg).await?;
// Finally, build the submit arg and dispatch it to the submitter.
let pending_msg = PendingMessage::from_persisted_retries(
msg,
self.destination_ctxs[&destination].clone(),
app_context,
);
self.send_channels[&destination].send(Box::new(pending_msg.into()))?;
self.message_nonce += 1;
Expand Down Expand Up @@ -184,7 +191,7 @@ mod test {
merkle_tree::builder::MerkleTreeBuilder,
msg::{
gas_payment::GasPaymentEnforcer,
metadata::{AppContextClassifier, BaseMetadataBuilder},
metadata::{BaseMetadataBuilder, IsmAwareAppContextClassifier},
pending_operation::PendingOperation,
},
processor::Processor,
Expand Down Expand Up @@ -265,7 +272,7 @@ mod test {
Arc::new(core_metrics),
db.clone(),
5,
AppContextClassifier::new(Arc::new(MockMailboxContract::default()), vec![]),
IsmAwareAppContextClassifier::new(Arc::new(MockMailboxContract::default()), vec![]),
)
}

Expand Down Expand Up @@ -296,6 +303,7 @@ mod test {
dummy_processor_metrics(origin_domain.id()),
HashMap::from([(destination_domain.id(), send_channel)]),
HashMap::from([(destination_domain.id(), message_context)]),
vec![],
),
receive_channel,
)
Expand Down Expand Up @@ -425,7 +433,7 @@ mod test {
// Round up the actual backoff because it was calculated with an `Instant::now()` that was a fraction of a second ago
let expected_backoff = PendingMessage::calculate_msg_backoff(*expected_retries)
.map(|b| b.as_secs_f32().round());
let actual_backoff = pm._next_attempt_after().map(|instant| {
let actual_backoff = pm.next_attempt_after().map(|instant| {
instant.duration_since(Instant::now()).as_secs_f32().round()
});
assert_eq!(expected_backoff, actual_backoff);
Expand Down
Loading
Loading