Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use app context classifier in relayer submitter queues #3385

Merged
merged 11 commits into from
Mar 13, 2024
8 changes: 6 additions & 2 deletions rust/agents/relayer/src/msg/metadata/base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ impl AppContextClassifier {
pub async fn get_app_context(
&self,
message: &HyperlaneMessage,
root_ism: H256,
root_ism: Option<H256>,
daniel-savu marked this conversation as resolved.
Show resolved Hide resolved
) -> Result<Option<String>> {
// Give priority to the matching list. If the app from the matching list happens
// to use the default ISM, it's preferable to use the app context from the matching
Expand All @@ -142,6 +142,10 @@ impl AppContextClassifier {
}
}

let Some(root_ism) = root_ism else {
return Ok(None);
};

let default_ism = self.default_ism.get().await?;
if root_ism == default_ism {
return Ok(Some("default_ism".to_string()));
Expand Down Expand Up @@ -191,7 +195,7 @@ impl MessageMetadataBuilder {
) -> Result<Self> {
let app_context = base
.app_context_classifier
.get_app_context(message, ism_address)
.get_app_context(message, Some(ism_address))
.await?;
Ok(Self {
base,
Expand Down
21 changes: 18 additions & 3 deletions rust/agents/relayer/src/msg/pending_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ pub struct MessageContext {
pub struct PendingMessage {
pub message: HyperlaneMessage,
ctx: Arc<MessageContext>,
app_context: Option<String>,
#[new(default)]
submitted: bool,
#[new(default)]
Expand Down Expand Up @@ -104,6 +105,16 @@ impl PendingOperation for PendingMessage {
self.ctx.destination_mailbox.domain()
}

/// The label to use for metrics granularity.
///
/// Returns a clone of the app context this message was classified into when
/// it was constructed, or `None` if no app context was assigned.
fn app_context(&self) -> Option<String> {
    self.app_context.clone()
}

/// The destination contract address this operation is meant for, rendered
/// as a string for use as a metric label.
fn destination(&self) -> String {
    // TODO: check whether this is the correct way of converting the address to string
    // NOTE(review): this stringifies the mailbox address via `to_string()`;
    // confirm the resulting format matches what dashboards/alerts expect.
    self.ctx.destination_mailbox.address().to_string()
}

#[instrument]
async fn prepare(&mut self) -> PendingOperationResult {
make_op_try!(|| self.on_reprepare());
Expand Down Expand Up @@ -302,7 +313,7 @@ impl PendingOperation for PendingMessage {
}
}

fn _next_attempt_after(&self) -> Option<Instant> {
fn next_attempt_after(&self) -> Option<Instant> {
self.next_attempt_after
}

Expand All @@ -315,8 +326,12 @@ impl PendingOperation for PendingMessage {
impl PendingMessage {
/// Constructor that tries reading the retry count from the HyperlaneDB in order to recompute the `next_attempt_after`.
/// In case of failure, behaves like `Self::new(...)`.
pub fn from_persisted_retries(message: HyperlaneMessage, ctx: Arc<MessageContext>) -> Self {
let mut pm = Self::new(message, ctx);
pub fn from_persisted_retries(
message: HyperlaneMessage,
ctx: Arc<MessageContext>,
app_context: Option<String>,
) -> Self {
let mut pm = Self::new(message, ctx, app_context);
match pm
.ctx
.origin_db
Expand Down
17 changes: 15 additions & 2 deletions rust/agents/relayer/src/msg/pending_operation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,19 @@ pub trait PendingOperation {
/// The domain this operation will take place on.
fn domain(&self) -> &HyperlaneDomain;

/// Label to use for metrics granularity.
fn app_context(&self) -> Option<String>;
daniel-savu marked this conversation as resolved.
Show resolved Hide resolved

/// The destination contract address this operation is meant for.
fn destination(&self) -> String;

/// Get tuple of labels for metrics.
///
/// Returns `(destination, app_context)` — destination first, then app
/// context. Callers must destructure in that order or the metric labels
/// end up swapped.
fn get_operation_labels(&self) -> (String, String) {
    // `unwrap_or_else` avoids allocating the fallback string when an app
    // context is present (clippy: or_fun_call).
    let app_context = self
        .app_context()
        .unwrap_or_else(|| "Unknown".to_string());
    let destination = self.destination();
    (destination, app_context)
}

/// Prepare to submit this operation. This will be called before every
/// submission and will usually have a very short gap between it and the
/// submit call.
Expand All @@ -50,7 +63,7 @@ pub trait PendingOperation {
///
/// This is only used for sorting, the functions are responsible for
/// returning `NotReady` if it is too early and matters.
fn _next_attempt_after(&self) -> Option<Instant>;
fn next_attempt_after(&self) -> Option<Instant>;

#[cfg(test)]
/// Set the number of times this operation has been retried.
Expand Down Expand Up @@ -80,7 +93,7 @@ impl Ord for DynPendingOperation {
fn cmp(&self, other: &Self) -> Ordering {
use DynPendingOperation::*;
use Ordering::*;
match (self._next_attempt_after(), other._next_attempt_after()) {
match (self.next_attempt_after(), other.next_attempt_after()) {
(Some(a), Some(b)) => a.cmp(&b),
// No time means it should come before
(None, Some(_)) => Less,
Expand Down
19 changes: 16 additions & 3 deletions rust/agents/relayer/src/msg/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,18 @@ use async_trait::async_trait;
use derive_new::new;
use eyre::Result;
use hyperlane_base::{db::HyperlaneRocksDB, CoreMetrics};
use hyperlane_core::{HyperlaneDomain, HyperlaneMessage};
use hyperlane_core::{HyperlaneDomain, HyperlaneMessage, KnownHyperlaneDomain, Mailbox};
use prometheus::IntGauge;
use tokio::sync::mpsc::UnboundedSender;
use tracing::{debug, trace};

use super::pending_message::*;
use super::{metadata::AppContextClassifier, pending_message::*};
use crate::msg::pending_operation::DynPendingOperation;
use crate::{processor::ProcessorExt, settings::matching_list::MatchingList};

/// Finds unprocessed messages from an origin and submits then through a channel
/// for to the appropriate destination.
#[allow(clippy::too_many_arguments)]
#[derive(new)]
pub struct MessageProcessor {
db: HyperlaneRocksDB,
Expand All @@ -31,6 +32,8 @@ pub struct MessageProcessor {
send_channels: HashMap<u32, UnboundedSender<Box<DynPendingOperation>>>,
/// Needed context to send a message for each destination chain
destination_ctxs: HashMap<u32, Arc<MessageContext>>,
mailboxes: HashMap<HyperlaneDomain, Arc<dyn Mailbox>>,
metric_app_contexts: Vec<(MatchingList, String)>,
#[new(default)]
message_nonce: u32,
}
Expand Down Expand Up @@ -94,10 +97,18 @@ impl ProcessorExt for MessageProcessor {

debug!(%msg, "Sending message to submitter");

let domain: HyperlaneDomain = KnownHyperlaneDomain::try_from(msg.destination)?.into();
let app_context_classifier = AppContextClassifier::new(
self.mailboxes[&domain].clone(),
self.metric_app_contexts.clone(),
);

let app_context = app_context_classifier.get_app_context(&msg, None).await?;
// Finally, build the submit arg and dispatch it to the submitter.
let pending_msg = PendingMessage::from_persisted_retries(
msg,
self.destination_ctxs[&destination].clone(),
app_context,
);
self.send_channels[&destination].send(Box::new(pending_msg.into()))?;
self.message_nonce += 1;
Expand Down Expand Up @@ -296,6 +307,8 @@ mod test {
dummy_processor_metrics(origin_domain.id()),
HashMap::from([(destination_domain.id(), send_channel)]),
HashMap::from([(destination_domain.id(), message_context)]),
HashMap::new(),
vec![],
),
receive_channel,
)
Expand Down Expand Up @@ -425,7 +438,7 @@ mod test {
// Round up the actual backoff because it was calculated with an `Instant::now()` that was a fraction of a second ago
let expected_backoff = PendingMessage::calculate_msg_backoff(*expected_retries)
.map(|b| b.as_secs_f32().round());
let actual_backoff = pm._next_attempt_after().map(|instant| {
let actual_backoff = pm.next_attempt_after().map(|instant| {
instant.duration_since(Instant::now()).as_secs_f32().round()
});
assert_eq!(expected_backoff, actual_backoff);
Expand Down
96 changes: 65 additions & 31 deletions rust/agents/relayer/src/msg/serial_submitter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::time::Duration;

use derive_new::new;
use futures_util::future::try_join_all;
use prometheus::{IntCounter, IntGauge};
use prometheus::{IntCounter, IntGauge, IntGaugeVec};
use tokio::spawn;
use tokio::sync::{
mpsc::{self},
Expand All @@ -20,7 +20,28 @@ use hyperlane_core::HyperlaneDomain;

use super::pending_operation::*;

type OpQueue = Arc<Mutex<BinaryHeap<Reverse<Box<DynPendingOperation>>>>>;
/// A shared, mutex-guarded priority queue of pending operations, ordered by
/// `next_attempt_after` via `Reverse` (soonest-ready operation pops first).
/// Keeps the submitter queue-length gauge in sync on push/pop.
#[derive(Debug, Clone, new)]
struct OpQueue {
    // Metrics handle used to adjust the queue-length gauge on push/pop.
    metrics: SerialSubmitterMetrics,
    #[new(default)]
    queue: Arc<Mutex<BinaryHeap<Reverse<Box<DynPendingOperation>>>>>,
}

impl OpQueue {
    /// Push an operation onto the queue, incrementing the queue-length gauge.
    async fn push(&self, op: Box<DynPendingOperation>) {
        // increment the metric before pushing onto the queue, because we lose ownership afterwards
        self.metrics.prepare_queue_gauge(&op).inc();
        self.queue.lock().await.push(Reverse(op));
    }

    /// Pop the soonest-ready operation, decrementing the queue-length gauge.
    ///
    /// NOTE(review): both `push` and `pop` always use `prepare_queue_gauge`,
    /// but this type is also instantiated as the confirm queue — the confirm
    /// queue's length would then be reported under the "prepare_queue" label.
    /// Confirm this is intended.
    ///
    /// NOTE(review): some call sites additionally inc()/dec() the gauge around
    /// push/pop (e.g. in prepare_task/confirm_task), double-counting on top of
    /// the adjustment done here — verify the gauge stays consistent.
    async fn pop(&self) -> Option<Reverse<Box<DynPendingOperation>>> {
        let op = self.queue.lock().await.pop();
        op.map(|op| {
            self.metrics.prepare_queue_gauge(&op.0).dec();
            op
        })
    }
}

/// SerialSubmitter accepts operations over a channel. It is responsible for
/// executing the right strategy to deliver those messages to the destination
Expand Down Expand Up @@ -91,8 +112,8 @@ impl SerialSubmitter {
metrics,
rx: rx_prepare,
} = self;
let prepare_queue: OpQueue = Default::default();
let confirm_queue: OpQueue = Default::default();
let prepare_queue = OpQueue::new(metrics.clone());
let confirm_queue = OpQueue::new(metrics.clone());

// This is a channel because we want to only have a small number of messages
// sitting ready to go at a time and this acts as a synchronization tool
Expand Down Expand Up @@ -149,7 +170,7 @@ async fn receive_task(
// make sure things are getting wired up correctly; if this works in testing it
// should also be valid in production.
debug_assert_eq!(*op.domain(), domain);
prepare_queue.lock().await.push(Reverse(op));
prepare_queue.push(op).await;
}
}

Expand All @@ -162,16 +183,17 @@ async fn prepare_task(
) {
loop {
// Pick the next message to try preparing.
let next = {
let mut queue = prepare_queue.lock().await;
metrics.prepare_queue_length.set(queue.len() as i64);
queue.pop()
};
let next = prepare_queue.pop().await;

let Some(Reverse(mut op)) = next else {
// queue is empty so give some time before checking again to prevent burning CPU
sleep(Duration::from_millis(200)).await;
continue;
};

// decrement the metric gauge for this operation, since it was just popped off the queue
metrics.prepare_queue_gauge(&op).dec();

trace!(?op, "Preparing operation");
debug_assert_eq!(*op.domain(), domain);

Expand All @@ -186,12 +208,14 @@ async fn prepare_task(
}
PendingOperationResult::NotReady => {
// none of the operations are ready yet, so wait for a little bit
prepare_queue.lock().await.push(Reverse(op));
metrics.prepare_queue_gauge(&op).inc();
prepare_queue.push(op).await;
sleep(Duration::from_millis(200)).await;
}
PendingOperationResult::Reprepare => {
metrics.ops_failed.inc();
prepare_queue.lock().await.push(Reverse(op));
metrics.prepare_queue_gauge(&op).inc();
prepare_queue.push(op).await;
}
PendingOperationResult::Drop => {
metrics.ops_dropped.inc();
Expand All @@ -216,14 +240,14 @@ async fn submit_task(
PendingOperationResult::Success => {
debug!(?op, "Operation submitted");
metrics.ops_submitted.inc();
confirm_queue.lock().await.push(Reverse(op));
confirm_queue.push(op).await;
}
PendingOperationResult::NotReady => {
panic!("Pending operation was prepared and therefore must be ready")
}
PendingOperationResult::Reprepare => {
metrics.ops_failed.inc();
prepare_queue.lock().await.push(Reverse(op));
prepare_queue.push(op).await;
}
PendingOperationResult::Drop => {
metrics.ops_dropped.inc();
Expand All @@ -241,15 +265,14 @@ async fn confirm_task(
) {
loop {
// Pick the next message to try confirming.
let next = {
let mut queue = confirm_queue.lock().await;
metrics.confirm_queue_length.set(queue.len() as i64);
queue.pop()
};
let Some(Reverse(mut op)) = next else {
let Some(Reverse(mut op)) = confirm_queue.pop().await else {
sleep(Duration::from_secs(5)).await;
continue;
};

// decrement the metric gauge for this operation, since it was just popped off the queue
metrics.confirm_queue_gauge(&op).dec();

trace!(?op, "Confirming operation");
debug_assert_eq!(*op.domain(), domain);

Expand All @@ -260,12 +283,12 @@ async fn confirm_task(
}
PendingOperationResult::NotReady => {
// none of the operations are ready yet, so wait for a little bit
confirm_queue.lock().await.push(Reverse(op));
confirm_queue.push(op).await;
sleep(Duration::from_secs(5)).await;
}
PendingOperationResult::Reprepare => {
metrics.ops_reorged.inc();
prepare_queue.lock().await.push(Reverse(op));
prepare_queue.push(op).await;
}
PendingOperationResult::Drop => {
metrics.ops_dropped.inc();
Expand All @@ -276,9 +299,7 @@ async fn confirm_task(

#[derive(Debug, Clone)]
pub struct SerialSubmitterMetrics {
prepare_queue_length: IntGauge,
confirm_queue_length: IntGauge,

submitter_queue_length: IntGaugeVec,
ops_prepared: IntCounter,
ops_submitted: IntCounter,
ops_confirmed: IntCounter,
Expand All @@ -291,12 +312,7 @@ impl SerialSubmitterMetrics {
pub fn new(metrics: &CoreMetrics, destination: &HyperlaneDomain) -> Self {
let destination = destination.name();
Self {
prepare_queue_length: metrics
.submitter_queue_length()
.with_label_values(&[destination, "prepare_queue"]),
confirm_queue_length: metrics
.submitter_queue_length()
.with_label_values(&[destination, "confirm_queue"]),
submitter_queue_length: metrics.submitter_queue_length(),
ops_prepared: metrics
.operations_processed_count()
.with_label_values(&["prepared", destination]),
Expand All @@ -317,4 +333,22 @@ impl SerialSubmitterMetrics {
.with_label_values(&["dropped", destination]),
}
}

/// Gauge tracking the number of operations in the prepare queue, labeled by
/// destination, queue name, and app context.
pub fn prepare_queue_gauge(&self, operation: &DynPendingOperation) -> IntGauge {
    // `get_operation_labels` returns (destination, app_context) — bind in
    // that order. Previously the bindings were swapped, so each value was
    // attached to the wrong metric label.
    let (destination, app_context) = operation.get_operation_labels();
    self.submitter_queue_length
        .with_label_values(&[&destination, "prepare_queue", &app_context])
}

/// Gauge tracking the number of operations in the confirm queue, labeled by
/// destination, queue name, and app context.
pub fn confirm_queue_gauge(&self, operation: &DynPendingOperation) -> IntGauge {
    // `get_operation_labels` returns (destination, app_context) — bind in
    // that order. Previously the bindings were swapped, so each value was
    // attached to the wrong metric label.
    let (destination, app_context) = operation.get_operation_labels();
    self.submitter_queue_length
        .with_label_values(&[&destination, "confirm_queue", &app_context])
}
}
Loading
Loading