Skip to content

Commit

Permalink
Use app context classifier in relayer submitter queues (#3385)
Browse files Browse the repository at this point in the history
### Description

Includes the `app_context` classification in `PendingMessage`, and adds
trait methods on `PendingOperation` to require always having such a
label on `OpQueue` operations. This is done by reusing the matching list
logic from the validator checkpoint labels
(#3057).

The nice thing is that this enables later support for retrying a group
of `OpQueue` operations just by specifying the `app_context` label,
without adding any new logic, since these labels are essentially
matching list results. One downside to using `app_context` for retries
is that the endpoint caller is constrained to only the matching lists
defined by the relayer operator - however imo only the relayer operator
that should be able to trigger retries.

### Drive-by changes

The `OpQueue` type alias is converted to an actual struct, that stores
the queue label (for metrics purposes), and also the `IntGaugeVec`
metric: the generic group of metrics associated with that queue
(basically only `submitter_queue_length` currently).

### Related issues

- Fixes #3240


### Backward compatibility

Yes

### Testing

Manual, by spinning up a relayer for injective and inevm. Sample
metrics, from `--metricAppContexts '[{"name": "injectivelabel",
"matchingList": [{"destination_domain": 6909546 }] }, {"name":
"inevmlabel", "matchingList": [{"destination_domain": 2525 }] }]'`
```
hyperlane_submitter_queue_length{agent="relayer",app_context="inevmlabel",hyperlane_baselib_version="0.1.0",queue_name="confirm_queue",remote="inevm"} 11
hyperlane_submitter_queue_length{agent="relayer",app_context="inevmlabel",hyperlane_baselib_version="0.1.0",queue_name="prepare_queue",remote="inevm"} 0
hyperlane_submitter_queue_length{agent="relayer",app_context="injectivelabel",hyperlane_baselib_version="0.1.0",queue_name="confirm_queue",remote="injective"} 63
hyperlane_submitter_queue_length{agent="relayer",app_context="injectivelabel",hyperlane_baselib_version="0.1.0",queue_name="prepare_queue",remote="injective"} 13281
```
  • Loading branch information
daniel-savu authored and yorhodes committed Mar 22, 2024
1 parent b6aaa1e commit 403bcca
Show file tree
Hide file tree
Showing 8 changed files with 135 additions and 60 deletions.
46 changes: 30 additions & 16 deletions rust/agents/relayer/src/msg/metadata/base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,36 +103,55 @@ impl DefaultIsmCache {
}
}

/// Classifies messages into an app context if they have one.
#[derive(Debug)]
pub struct AppContextClassifier {
pub struct IsmAwareAppContextClassifier {
default_ism: DefaultIsmCache,
app_matching_lists: Vec<(MatchingList, String)>,
app_context_classifier: AppContextClassifier,
}

impl AppContextClassifier {
impl IsmAwareAppContextClassifier {
pub fn new(
destination_mailbox: Arc<dyn Mailbox>,
app_matching_lists: Vec<(MatchingList, String)>,
) -> Self {
Self {
default_ism: DefaultIsmCache::new(destination_mailbox),
app_matching_lists,
app_context_classifier: AppContextClassifier::new(app_matching_lists),
}
}

pub async fn get_app_context(
&self,
message: &HyperlaneMessage,
root_ism: H256,
) -> Result<Option<String>> {
if let Some(app_context) = self.app_context_classifier.get_app_context(message).await? {
return Ok(Some(app_context));
}

if root_ism == self.default_ism.get().await? {
return Ok(Some("default_ism".to_string()));
}

Ok(None)
}
}

/// Classifies messages into an app context if they have one.
#[derive(Debug, new)]
pub struct AppContextClassifier {
app_matching_lists: Vec<(MatchingList, String)>,
}

impl AppContextClassifier {
/// Classifies messages into an app context if they have one, or None
/// if they don't.
/// An app context is a string that identifies the app that sent the message
/// and exists just for metrics.
/// An app context is chosen based on:
/// - the first element in `app_matching_lists` that matches the message
/// - if the message's ISM is the default ISM, the app context is "default_ism"
pub async fn get_app_context(
&self,
message: &HyperlaneMessage,
root_ism: H256,
) -> Result<Option<String>> {
pub async fn get_app_context(&self, message: &HyperlaneMessage) -> Result<Option<String>> {
// Give priority to the matching list. If the app from the matching list happens
// to use the default ISM, it's preferable to use the app context from the matching
// list.
Expand All @@ -142,11 +161,6 @@ impl AppContextClassifier {
}
}

let default_ism = self.default_ism.get().await?;
if root_ism == default_ism {
return Ok(Some("default_ism".to_string()));
}

Ok(None)
}
}
Expand Down Expand Up @@ -264,7 +278,7 @@ pub struct BaseMetadataBuilder {
metrics: Arc<CoreMetrics>,
db: HyperlaneRocksDB,
max_depth: u32,
app_context_classifier: AppContextClassifier,
app_context_classifier: IsmAwareAppContextClassifier,
}

impl Debug for BaseMetadataBuilder {
Expand Down
4 changes: 3 additions & 1 deletion rust/agents/relayer/src/msg/metadata/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ mod routing;

use aggregation::AggregationIsmMetadataBuilder;
pub(crate) use base::MetadataBuilder;
pub(crate) use base::{AppContextClassifier, BaseMetadataBuilder, MessageMetadataBuilder};
pub(crate) use base::{
AppContextClassifier, BaseMetadataBuilder, IsmAwareAppContextClassifier, MessageMetadataBuilder,
};
use ccip_read::CcipReadIsmMetadataBuilder;
use null_metadata::NullMetadataBuilder;
use routing::RoutingIsmMetadataBuilder;
15 changes: 12 additions & 3 deletions rust/agents/relayer/src/msg/pending_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ pub struct MessageContext {
pub struct PendingMessage {
pub message: HyperlaneMessage,
ctx: Arc<MessageContext>,
app_context: Option<String>,
#[new(default)]
submitted: bool,
#[new(default)]
Expand Down Expand Up @@ -104,6 +105,10 @@ impl PendingOperation for PendingMessage {
self.ctx.destination_mailbox.domain()
}

fn app_context(&self) -> Option<String> {
self.app_context.clone()
}

#[instrument]
async fn prepare(&mut self) -> PendingOperationResult {
make_op_try!(|| self.on_reprepare());
Expand Down Expand Up @@ -302,7 +307,7 @@ impl PendingOperation for PendingMessage {
}
}

fn _next_attempt_after(&self) -> Option<Instant> {
fn next_attempt_after(&self) -> Option<Instant> {
self.next_attempt_after
}

Expand All @@ -315,8 +320,12 @@ impl PendingOperation for PendingMessage {
impl PendingMessage {
/// Constructor that tries reading the retry count from the HyperlaneDB in order to recompute the `next_attempt_after`.
/// In case of failure, behaves like `Self::new(...)`.
pub fn from_persisted_retries(message: HyperlaneMessage, ctx: Arc<MessageContext>) -> Self {
let mut pm = Self::new(message, ctx);
pub fn from_persisted_retries(
message: HyperlaneMessage,
ctx: Arc<MessageContext>,
app_context: Option<String>,
) -> Self {
let mut pm = Self::new(message, ctx, app_context);
match pm
.ctx
.origin_db
Expand Down
14 changes: 12 additions & 2 deletions rust/agents/relayer/src/msg/pending_operation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,16 @@ pub trait PendingOperation {
/// The domain this operation will take place on.
fn domain(&self) -> &HyperlaneDomain;

/// Label to use for metrics granularity.
fn app_context(&self) -> Option<String>;

/// Get tuple of labels for metrics.
fn get_operation_labels(&self) -> (String, String) {
let app_context = self.app_context().unwrap_or("Unknown".to_string());
let destination = self.domain().to_string();
(destination, app_context)
}

/// Prepare to submit this operation. This will be called before every
/// submission and will usually have a very short gap between it and the
/// submit call.
Expand All @@ -50,7 +60,7 @@ pub trait PendingOperation {
///
/// This is only used for sorting, the functions are responsible for
/// returning `NotReady` if it is too early and matters.
fn _next_attempt_after(&self) -> Option<Instant>;
fn next_attempt_after(&self) -> Option<Instant>;

#[cfg(test)]
/// Set the number of times this operation has been retried.
Expand Down Expand Up @@ -80,7 +90,7 @@ impl Ord for DynPendingOperation {
fn cmp(&self, other: &Self) -> Ordering {
use DynPendingOperation::*;
use Ordering::*;
match (self._next_attempt_after(), other._next_attempt_after()) {
match (self.next_attempt_after(), other.next_attempt_after()) {
(Some(a), Some(b)) => a.cmp(&b),
// No time means it should come before
(None, Some(_)) => Less,
Expand Down
16 changes: 12 additions & 4 deletions rust/agents/relayer/src/msg/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,13 @@ use prometheus::IntGauge;
use tokio::sync::mpsc::UnboundedSender;
use tracing::{debug, trace};

use super::pending_message::*;
use super::{metadata::AppContextClassifier, pending_message::*};
use crate::msg::pending_operation::DynPendingOperation;
use crate::{processor::ProcessorExt, settings::matching_list::MatchingList};

/// Finds unprocessed messages from an origin and submits then through a channel
/// for to the appropriate destination.
#[allow(clippy::too_many_arguments)]
#[derive(new)]
pub struct MessageProcessor {
db: HyperlaneRocksDB,
Expand All @@ -31,6 +32,7 @@ pub struct MessageProcessor {
send_channels: HashMap<u32, UnboundedSender<Box<DynPendingOperation>>>,
/// Needed context to send a message for each destination chain
destination_ctxs: HashMap<u32, Arc<MessageContext>>,
metric_app_contexts: Vec<(MatchingList, String)>,
#[new(default)]
message_nonce: u32,
}
Expand Down Expand Up @@ -94,10 +96,15 @@ impl ProcessorExt for MessageProcessor {

debug!(%msg, "Sending message to submitter");

let app_context_classifier =
AppContextClassifier::new(self.metric_app_contexts.clone());

let app_context = app_context_classifier.get_app_context(&msg).await?;
// Finally, build the submit arg and dispatch it to the submitter.
let pending_msg = PendingMessage::from_persisted_retries(
msg,
self.destination_ctxs[&destination].clone(),
app_context,
);
self.send_channels[&destination].send(Box::new(pending_msg.into()))?;
self.message_nonce += 1;
Expand Down Expand Up @@ -184,7 +191,7 @@ mod test {
merkle_tree::builder::MerkleTreeBuilder,
msg::{
gas_payment::GasPaymentEnforcer,
metadata::{AppContextClassifier, BaseMetadataBuilder},
metadata::{BaseMetadataBuilder, IsmAwareAppContextClassifier},
pending_operation::PendingOperation,
},
processor::Processor,
Expand Down Expand Up @@ -265,7 +272,7 @@ mod test {
Arc::new(core_metrics),
db.clone(),
5,
AppContextClassifier::new(Arc::new(MockMailboxContract::default()), vec![]),
IsmAwareAppContextClassifier::new(Arc::new(MockMailboxContract::default()), vec![]),
)
}

Expand Down Expand Up @@ -296,6 +303,7 @@ mod test {
dummy_processor_metrics(origin_domain.id()),
HashMap::from([(destination_domain.id(), send_channel)]),
HashMap::from([(destination_domain.id(), message_context)]),
vec![],
),
receive_channel,
)
Expand Down Expand Up @@ -425,7 +433,7 @@ mod test {
// Round up the actual backoff because it was calculated with an `Instant::now()` that was a fraction of a second ago
let expected_backoff = PendingMessage::calculate_msg_backoff(*expected_retries)
.map(|b| b.as_secs_f32().round());
let actual_backoff = pm._next_attempt_after().map(|instant| {
let actual_backoff = pm.next_attempt_after().map(|instant| {
instant.duration_since(Instant::now()).as_secs_f32().round()
});
assert_eq!(expected_backoff, actual_backoff);
Expand Down
Loading

0 comments on commit 403bcca

Please sign in to comment.