From 5255925e7288d99409e70f6f0247c72843635770 Mon Sep 17 00:00:00 2001 From: kamiyaa Date: Wed, 15 Jan 2025 14:17:07 +0800 Subject: [PATCH] feat: return # of messages prioritized in the queue when calling message retry endpoint --- rust/main/Cargo.lock | 33 +- rust/main/Cargo.toml | 1 + rust/main/agents/relayer/Cargo.toml | 3 + rust/main/agents/relayer/src/lib.rs | 3 +- rust/main/agents/relayer/src/msg/op_queue.rs | 209 +++++++++++-- .../agents/relayer/src/msg/op_submitter.rs | 4 +- rust/main/agents/relayer/src/relayer.rs | 6 +- .../relayer/src/server/message_retry.rs | 291 ++++++++++++++---- rust/main/agents/relayer/src/server/mod.rs | 13 +- rust/main/utils/run-locally/Cargo.toml | 1 + rust/main/utils/run-locally/src/main.rs | 4 + rust/main/utils/run-locally/src/server.rs | 55 ++++ 12 files changed, 519 insertions(+), 104 deletions(-) create mode 100644 rust/main/utils/run-locally/src/server.rs diff --git a/rust/main/Cargo.lock b/rust/main/Cargo.lock index aea92da917..d290968dd1 100644 --- a/rust/main/Cargo.lock +++ b/rust/main/Cargo.lock @@ -518,6 +518,7 @@ checksum = "3b829e4e32b91e643de6eafe82b1d90675f5874230191a4ffbc1b336dec4d6bf" dependencies = [ "async-trait", "axum-core 0.3.4", + "axum-macros", "bitflags 1.3.2", "bytes", "futures-util", @@ -606,6 +607,18 @@ dependencies = [ "tower-service", ] +[[package]] +name = "axum-macros" +version = "0.3.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cdca6a10ecad987bda04e95606ef85a5417dcaac1a78455242d72e031e2b6b62" +dependencies = [ + "heck 0.4.1", + "proc-macro2 1.0.86", + "quote 1.0.37", + "syn 2.0.77", +] + [[package]] name = "backtrace" version = "0.3.71" @@ -7126,7 +7139,9 @@ dependencies = [ "tokio-test", "tracing", "tracing-futures", + "tracing-test", "typetag", + "uuid 1.12.0", ] [[package]] @@ -7262,7 +7277,7 @@ dependencies = [ "rkyv_derive", "seahash", "tinyvec", - "uuid 1.10.0", + "uuid 1.12.0", ] [[package]] @@ -7362,6 +7377,7 @@ dependencies = [ "once_cell", "regex", "relayer", + "reqwest", "ripemd", "serde", "serde_json", @@ -7879,7 +7895,7 @@ dependencies = [ "time", "tracing", "url", - "uuid 1.10.0", + "uuid 1.12.0", ] [[package]] @@ -7940,7 +7956,7 @@ dependencies = [ "sea-query-derive", "serde_json", "time", - "uuid 1.10.0", + "uuid 1.12.0", ] [[package]] @@ -7956,7 +7972,7 @@ dependencies = [ "serde_json", "sqlx", "time", - "uuid 1.10.0", + "uuid 1.12.0", ] [[package]] @@ -9278,7 +9294,7 @@ dependencies = [ "time", "tokio-stream", "url", - "uuid 1.10.0", + "uuid 1.12.0", "whoami", ] @@ -9681,7 +9697,7 @@ dependencies = [ "tokio", "tracing", "url", - "uuid 1.10.0", + "uuid 1.12.0", "walkdir", ] @@ -10534,10 +10550,11 @@ dependencies = [ [[package]] name = "uuid" -version = "1.10.0" +version = "1.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "81dfa00651efa65069b0b6b651f4aaa31ba9e3c3ce0137aaad053604ee7e0314" +checksum = "744018581f9a3454a9e15beb8a33b017183f1e7c0cd170232a2d1453b23a51c4" dependencies = [ + "getrandom 0.2.15", "serde", ] diff --git a/rust/main/Cargo.toml b/rust/main/Cargo.toml index 832bee0c42..54fd5cca12 100644 --- a/rust/main/Cargo.toml +++ b/rust/main/Cargo.toml @@ -153,6 +153,7 @@ typetag = "0.2" uint = "0.9.5" ureq = { version = "2.4", default-features = false } url = "2.3" +uuid = { version = "1.12.0", features = ["v4"] } walkdir = "2" warp = "0.3" which = "4.3" diff --git a/rust/main/agents/relayer/Cargo.toml b/rust/main/agents/relayer/Cargo.toml index 5a891d912c..0f41fee7cc 100644 --- a/rust/main/agents/relayer/Cargo.toml +++ b/rust/main/agents/relayer/Cargo.toml @@ -44,6 +44,7 @@ tokio-metrics.workspace = true tracing-futures.workspace = true tracing.workspace = true typetag.workspace = true +uuid.workspace = true hyperlane-core = { path = "../../hyperlane-core", features = [ "agent", @@ -53,8 +54,10 @@ hyperlane-base = { path = "../../hyperlane-base", features = ["test-utils"] } hyperlane-ethereum = { path = "../../chains/hyperlane-ethereum" } [dev-dependencies] +axum = { workspace = true, features = ["macros"] } once_cell.workspace = true mockall.workspace = true +tracing-test.workspace = true tokio-test.workspace = true hyperlane-test = { path = "../../hyperlane-test" } hyperlane-base = { path = "../../hyperlane-base", features = ["test-utils"] } diff --git a/rust/main/agents/relayer/src/lib.rs b/rust/main/agents/relayer/src/lib.rs index 62b896d628..9a6e1e4147 100644 --- a/rust/main/agents/relayer/src/lib.rs +++ b/rust/main/agents/relayer/src/lib.rs @@ -3,8 +3,9 @@ mod msg; mod processor; mod prover; mod relayer; -mod server; mod settings; +pub mod server; + pub use msg::GAS_EXPENDITURE_LOG_MESSAGE; pub use relayer::*; diff --git a/rust/main/agents/relayer/src/msg/op_queue.rs b/rust/main/agents/relayer/src/msg/op_queue.rs index 99c9dde39f..10b0fad27e 100644 --- a/rust/main/agents/relayer/src/msg/op_queue.rs +++ b/rust/main/agents/relayer/src/msg/op_queue.rs @@ -4,9 +4,9 @@ use derive_new::new; use hyperlane_core::{PendingOperation, PendingOperationStatus, QueueOperation}; use prometheus::{IntGauge, IntGaugeVec}; use tokio::sync::{broadcast::Receiver, Mutex}; -use tracing::{debug, info, instrument}; +use tracing::{debug, instrument}; -use crate::settings::matching_list::MatchingList; +use crate::server::{MessageRetryQueueResponse, MessageRetryRequest}; pub type OperationPriorityQueue = Arc>>>; @@ -16,7 +16,7 @@ pub type OperationPriorityQueue = Arc>> pub struct OpQueue { metrics: IntGaugeVec, queue_metrics_label: String, - retry_rx: Arc>>, + retry_receiver: Arc>>>, #[new(default)] pub queue: OperationPriorityQueue, } @@ -72,28 +72,57 @@ impl OpQueue { // The other consideration is whether to put the channel receiver in the OpQueue or in a dedicated task // that also holds an Arc to the Mutex. For simplicity, we'll put it in the OpQueue for now. let mut message_retry_requests = vec![]; - while let Ok(message_id) = self.retry_rx.lock().await.try_recv() { - message_retry_requests.push(message_id); + // we only need to lock self.retry_receiver once + { + let mut retry_receiver = self.retry_receiver.lock().await; + while let Ok(retry_request) = retry_receiver.try_recv() { + message_retry_requests.push(( + retry_request, + MessageRetryQueueResponse { + evaluated: 0, + matched: 0, + }, + )); + } } if message_retry_requests.is_empty() { return; } - let mut queue = self.queue.lock().await; - let mut reprioritized_queue: BinaryHeap<_> = queue - .drain() - .map(|Reverse(mut op)| { - if message_retry_requests.iter().any(|r| r.op_matches(&op)) { - info!( - operation = %op, - queue_label = %self.queue_metrics_label, - "Retrying OpQueue operation" - ); - op.reset_attempts() - } - Reverse(op) - }) - .collect(); - queue.append(&mut reprioritized_queue); + + let queue_length: usize; + { + let mut queue = self.queue.lock().await; + queue_length = queue.len(); + + let mut reprioritized_queue: BinaryHeap<_> = queue + .drain() + .map(|Reverse(mut op)| { + let mut matched = false; + message_retry_requests + .iter_mut() + .for_each(|(retry_req, retry_response)| { + if !retry_req.pattern.op_matches(&op) { + return; + } + // update retry metrics + retry_response.matched += 1; + matched = true; + }); + if matched { + op.reset_attempts(); + } + Reverse(op) + }) + .collect(); + queue.append(&mut reprioritized_queue); + } + + for (retry_req, mut retry_response) in message_retry_requests { + retry_response.evaluated = queue_length; + if let Err(err) = retry_req.transmitter.send(retry_response).await { + tracing::error!(?err, "Failed to send retry response"); + } + } } /// Get the metric associated with this operation @@ -115,6 +144,8 @@ impl OpQueue { #[cfg(test)] pub mod test { + use crate::{server::ENDPOINT_MESSAGES_QUEUE_SIZE, settings::matching_list::MatchingList}; + use super::*; use hyperlane_core::{ HyperlaneDomain, HyperlaneDomainProtocol, HyperlaneDomainTechnicalStack, @@ -127,7 +158,7 @@ pub mod test { str::FromStr, time::{Duration, Instant}, }; - use tokio::sync; + use tokio::sync::{self, mpsc}; #[derive(Debug, Clone, Serialize)] pub struct MockPendingOperation { @@ -364,12 +395,22 @@ pub mod test { .await; } + let (transmitter, _receiver) = mpsc::channel(ENDPOINT_MESSAGES_QUEUE_SIZE); + // Retry by message ids broadcaster - .send(MatchingList::with_message_id(op_ids[1])) + .send(Arc::new(MessageRetryRequest { + uuid: "59400966-e7fa-4fb9-9372-9a671d4392c3".to_string(), + pattern: MatchingList::with_message_id(op_ids[1]), + transmitter: transmitter.clone(), + })) .unwrap(); broadcaster - .send(MatchingList::with_message_id(op_ids[2])) + .send(Arc::new(MessageRetryRequest { + uuid: "59400966-e7fa-4fb9-9372-9a671d4392c3".to_string(), + pattern: MatchingList::with_message_id(op_ids[2]), + transmitter, + })) .unwrap(); // Pop elements from queue 1 @@ -425,11 +466,15 @@ pub mod test { .await; } + let (transmitter, _receiver) = mpsc::channel(ENDPOINT_MESSAGES_QUEUE_SIZE); + // Retry by domain broadcaster - .send(MatchingList::with_destination_domain( - destination_domain_2.id(), - )) + .send(Arc::new(MessageRetryRequest { + uuid: "a5b39473-7cc5-48a1-8bed-565454ba1037".to_string(), + pattern: MatchingList::with_destination_domain(destination_domain_2.id()), + transmitter, + })) .unwrap(); // Pop elements from queue @@ -447,4 +492,114 @@ pub mod test { assert_eq!(popped[3], op_ids[0]); assert_eq!(popped[4], op_ids[1]); } + + #[tracing_test::traced_test] + #[tokio::test] + async fn test_process_retry_requests_by_id() { + let (metrics, queue_metrics_label) = dummy_metrics_and_label(); + let broadcaster = sync::broadcast::Sender::new(100); + let mut op_queue_1 = OpQueue::new( + metrics.clone(), + queue_metrics_label.clone(), + Arc::new(Mutex::new(broadcaster.subscribe())), + ); + + // Add some operations to the queue with increasing `next_attempt_after` values + let destination_domain: HyperlaneDomain = KnownHyperlaneDomain::Injective.into(); + let messages_to_send = 5; + let ops: VecDeque<_> = (1..=messages_to_send) + .map(|seconds_to_next_attempt| { + Box::new(MockPendingOperation::new( + seconds_to_next_attempt, + destination_domain.clone(), + )) as QueueOperation + }) + .collect(); + + let op_ids: Vec<_> = ops.iter().map(|op| op.id()).collect(); + + // push to queue 1 + for op in ops { + op_queue_1 + .push(op, Some(PendingOperationStatus::FirstPrepareAttempt)) + .await; + } + + let (transmitter, mut receiver) = mpsc::channel(ENDPOINT_MESSAGES_QUEUE_SIZE); + + // Retry by message ids + broadcaster + .send(Arc::new(MessageRetryRequest { + uuid: "0e92ace7-ba5d-4a1f-8501-51b6d9d500cf".to_string(), + pattern: MatchingList::with_message_id(op_ids[1]), + transmitter: transmitter.clone(), + })) + .unwrap(); + + op_queue_1.process_retry_requests().await; + + let retry_response = receiver.recv().await.unwrap(); + + assert_eq!(retry_response.evaluated, 5); + assert_eq!(retry_response.matched, 1); + } + + #[tracing_test::traced_test] + #[tokio::test] + async fn test_process_retry_requests_by_domain() { + let (metrics, queue_metrics_label) = dummy_metrics_and_label(); + let broadcaster = sync::broadcast::Sender::new(100); + let mut op_queue_1 = OpQueue::new( + metrics.clone(), + queue_metrics_label.clone(), + Arc::new(Mutex::new(broadcaster.subscribe())), + ); + + // Add some operations to the queue with increasing `next_attempt_after` values + let destination_domain: HyperlaneDomain = KnownHyperlaneDomain::Injective.into(); + let messages_to_send = 5; + let mut ops: VecDeque<_> = (1..=messages_to_send) + .map(|seconds_to_next_attempt| { + Box::new(MockPendingOperation::new( + seconds_to_next_attempt, + destination_domain.clone(), + )) as QueueOperation + }) + .collect(); + ops.push_back(Box::new(MockPendingOperation::new( + 10, + KnownHyperlaneDomain::Arbitrum.into(), + )) as QueueOperation); + ops.push_back(Box::new(MockPendingOperation::new( + 10, + KnownHyperlaneDomain::Arbitrum.into(), + )) as QueueOperation); + + // push to queue 1 + for op in ops { + op_queue_1 + .push(op, Some(PendingOperationStatus::FirstPrepareAttempt)) + .await; + } + + let (transmitter, mut receiver) = mpsc::channel(ENDPOINT_MESSAGES_QUEUE_SIZE); + + // Retry by message ids + broadcaster + .send(Arc::new(MessageRetryRequest { + uuid: "0e92ace7-ba5d-4a1f-8501-51b6d9d500cf".to_string(), + pattern: MatchingList::with_destination_domain( + KnownHyperlaneDomain::Arbitrum as u32, + ), + transmitter: transmitter.clone(), + })) + .unwrap(); + + op_queue_1.process_retry_requests().await; + + let retry_response = receiver.recv().await.unwrap(); + + assert_eq!(retry_response.evaluated, 7); + assert_eq!(retry_response.matched, 2); + } } diff --git a/rust/main/agents/relayer/src/msg/op_submitter.rs b/rust/main/agents/relayer/src/msg/op_submitter.rs index 1f0a54234c..84e26c6f56 100644 --- a/rust/main/agents/relayer/src/msg/op_submitter.rs +++ b/rust/main/agents/relayer/src/msg/op_submitter.rs @@ -32,7 +32,7 @@ use hyperlane_core::{ }; use crate::msg::pending_message::CONFIRM_DELAY; -use crate::settings::matching_list::MatchingList; +use crate::server::MessageRetryRequest; use super::op_queue::OpQueue; use super::op_queue::OperationPriorityQueue; @@ -105,7 +105,7 @@ impl SerialSubmitter { pub fn new( domain: HyperlaneDomain, rx: mpsc::UnboundedReceiver, - retry_op_transmitter: Sender, + retry_op_transmitter: &Sender>, metrics: SerialSubmitterMetrics, max_batch_size: u32, task_monitor: TaskMonitor, diff --git a/rust/main/agents/relayer/src/relayer.rs b/rust/main/agents/relayer/src/relayer.rs index b1f013b6ae..0962678df6 100644 --- a/rust/main/agents/relayer/src/relayer.rs +++ b/rust/main/agents/relayer/src/relayer.rs @@ -318,7 +318,7 @@ impl BaseAgent for Relayer { })); tasks.push(console_server.instrument(info_span!("Tokio console server"))); } - let sender = BroadcastSender::::new(ENDPOINT_MESSAGES_QUEUE_SIZE); + let sender = BroadcastSender::new(ENDPOINT_MESSAGES_QUEUE_SIZE); // send channels by destination chain let mut send_channels = HashMap::with_capacity(self.destination_chains.len()); let mut prep_queues = HashMap::with_capacity(self.destination_chains.len()); @@ -328,7 +328,7 @@ impl BaseAgent for Relayer { let serial_submitter = SerialSubmitter::new( dest_domain.clone(), receive_channel, - sender.clone(), + &sender, SerialSubmitterMetrics::new(&self.core.metrics, dest_domain), // Default to submitting one message at a time if there is no batch config self.core.settings.chains[dest_domain.name()] @@ -385,7 +385,7 @@ impl BaseAgent for Relayer { ); } // run server - let custom_routes = relayer_server::Server::new() + let custom_routes = relayer_server::Server::new(self.destination_chains.len()) .with_op_retry(sender.clone()) .with_message_queue(prep_queues) .routes(); diff --git a/rust/main/agents/relayer/src/server/message_retry.rs b/rust/main/agents/relayer/src/server/message_retry.rs index 6d160355a5..d6a26cf0c8 100644 --- a/rust/main/agents/relayer/src/server/message_retry.rs +++ b/rust/main/agents/relayer/src/server/message_retry.rs @@ -1,32 +1,97 @@ +use std::sync::Arc; + use crate::settings::matching_list::MatchingList; use axum::{extract::State, routing, Json, Router}; use derive_new::new; -use tokio::sync::broadcast::Sender; +use serde::{Deserialize, Serialize}; +use tokio::sync::{broadcast::Sender, mpsc}; const MESSAGE_RETRY_API_BASE: &str = "/message_retry"; -#[derive(new, Clone)] +#[derive(Clone, Debug, new)] pub struct MessageRetryApi { - tx: Sender, + retry_request_transmitter: Sender>, + destination_chains: usize, +} + +#[derive(Clone, Debug)] +pub struct MessageRetryRequest { + pub uuid: String, + pub pattern: MatchingList, + pub transmitter: mpsc::Sender, +} + +#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)] +pub struct MessageRetryQueueResponse { + /// how many pending operations were evaluated + pub evaluated: usize, + /// how many of the pending operations matched the retry request pattern + pub matched: u64, +} + +#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)] +pub struct MessageRetryResponse { + /// ID of the retry request + pub uuid: String, + /// how many pending operations were evaluated + pub evaluated: usize, + /// how many of the pending operations matched the retry request pattern + pub matched: u64, } async fn retry_message( - State(tx): State>, + State(state): State, Json(retry_req_payload): Json, -) -> String { - match tx.send(retry_req_payload) { - Ok(_) => "Moved message(s) to the front of the queue".to_string(), - // Technically it's bad practice to print the error message to the user, but - // this endpoint is for debugging purposes only. - Err(err) => format!("Failed to send retry request to the queue: {}", err), +) -> Result, String> { + let uuid = uuid::Uuid::new_v4(); + let uuid_string = uuid.to_string(); + + tracing::debug!(?retry_req_payload); + tracing::debug!(uuid = uuid_string, "Sending message retry request"); + + // Create a channel that can hold each chain's SerialSubmitter + // message retry responses. + // 3 queues for each chain (prepare, submit, confirm) + let (transmitter, mut receiver) = mpsc::channel(3 * state.destination_chains); + state + .retry_request_transmitter + .send(Arc::new(MessageRetryRequest { + uuid: uuid_string.clone(), + pattern: retry_req_payload, + transmitter, + })) + .map_err(|err| { + // Technically it's bad practice to print the error message to the user, but + // this endpoint is for debugging purposes only. + format!("Failed to send retry request to the queue: {}", err) + })?; + + let mut resp = MessageRetryResponse { + uuid: uuid_string, + evaluated: 0, + matched: 0, + }; + + // Wait for responses from relayer + tracing::debug!(uuid = resp.uuid, "Waiting for response from relayer"); + while let Some(relayer_resp) = receiver.recv().await { + tracing::debug!( + evaluated = relayer_resp.evaluated, + matched = relayer_resp.matched, + "Received relayer response" + ); + resp.evaluated += relayer_resp.evaluated; + resp.matched += relayer_resp.matched; } + + Ok(Json(resp)) } impl MessageRetryApi { pub fn router(&self) -> Router { Router::new() .route("/", routing::post(retry_message)) - .with_state(self.tx.clone()) + .with_state(self.clone()) } pub fn get_route(&self) -> (&'static str, Router) { @@ -41,13 +106,21 @@ mod tests { use super::*; use axum::http::StatusCode; use hyperlane_core::{HyperlaneMessage, QueueOperation}; + use serde::de::DeserializeOwned; use serde_json::json; use std::net::SocketAddr; use tokio::sync::broadcast::{Receiver, Sender}; - fn setup_test_server() -> (SocketAddr, Receiver) { - let broadcast_tx = Sender::::new(ENDPOINT_MESSAGES_QUEUE_SIZE); - let message_retry_api = MessageRetryApi::new(broadcast_tx.clone()); + #[derive(Debug)] + struct TestServerSetup { + pub socket_address: SocketAddr, + pub retry_req_rx: Receiver>, + } + + fn setup_test_server() -> TestServerSetup { + let broadcast_tx = Sender::new(ENDPOINT_MESSAGES_QUEUE_SIZE); + + let message_retry_api = MessageRetryApi::new(broadcast_tx.clone(), 10); let (path, retry_router) = message_retry_api.get_route(); let app = Router::new().nest(path, retry_router); @@ -58,12 +131,47 @@ mod tests { let addr = server.local_addr(); tokio::spawn(server); - (addr, broadcast_tx.subscribe()) + let retry_req_rx = broadcast_tx.subscribe(); + + TestServerSetup { + socket_address: addr, + retry_req_rx, + } + } + + async fn send_retry_responses_future( + mut retry_request_receiver: Receiver>, + pending_operations: Vec, + metrics: Vec<(usize, u64)>, + ) { + if let Ok(req) = retry_request_receiver.recv().await { + for (op, (evaluated, matched)) in pending_operations.iter().zip(metrics) { + // Check that the list received by the server matches the pending operation + assert!(req.pattern.op_matches(&op)); + let resp = MessageRetryQueueResponse { evaluated, matched }; + req.transmitter.send(resp).await.unwrap(); + } + } } + async fn parse_response_to_json(response: reqwest::Response) -> T { + let resp_body = response + .text() + .await + .expect("Failed to parse response body"); + let resp_json: T = + serde_json::from_str(&resp_body).expect("Failed to deserialize response body"); + resp_json + } + + #[tracing_test::traced_test] #[tokio::test] async fn test_message_id_retry() { - let (addr, mut rx) = setup_test_server(); + let TestServerSetup { + socket_address: addr, + retry_req_rx, + .. + } = setup_test_server(); let client = reqwest::Client::new(); // Create a random message with a random message ID @@ -75,25 +183,37 @@ mod tests { } ]); + // spawn a task to respond to message retry request + let respond_task = send_retry_responses_future( + retry_req_rx, + vec![Box::new(pending_operation.clone()) as QueueOperation], + vec![(1, 1)], + ); + // Send a POST request to the server let response = client .post(format!("http://{}{}", addr, MESSAGE_RETRY_API_BASE)) .json(&matching_list_body) // Set the request body - .send() - .await - .unwrap(); + .send(); + + let (_t1, response_res) = tokio::join!(respond_task, response); + let response = response_res.unwrap(); // Check that the response status code is OK assert_eq!(response.status(), StatusCode::OK); - let list = rx.try_recv().unwrap(); - // Check that the list received by the server matches the pending operation - assert!(list.op_matches(&(Box::new(pending_operation) as QueueOperation))); + let resp_json: MessageRetryResponse = parse_response_to_json(response).await; + assert_eq!(resp_json.evaluated, 1); + assert_eq!(resp_json.matched, 1); } #[tokio::test] async fn test_destination_domain_retry() { - let (addr, mut rx) = setup_test_server(); + let TestServerSetup { + socket_address: addr, + retry_req_rx, + .. + } = setup_test_server(); let client = reqwest::Client::new(); let message = HyperlaneMessage { @@ -108,25 +228,37 @@ mod tests { } ]); + // spawn a task to respond to message retry request + let respond_task = send_retry_responses_future( + retry_req_rx, + vec![Box::new(pending_operation.clone()) as QueueOperation], + vec![(1, 1)], + ); + // Send a POST request to the server let response = client .post(format!("http://{}{}", addr, MESSAGE_RETRY_API_BASE)) .json(&matching_list_body) // Set the request body - .send() - .await - .unwrap(); + .send(); + + let (_t1, response_res) = tokio::join!(respond_task, response); + let response = response_res.unwrap(); // Check that the response status code is OK assert_eq!(response.status(), StatusCode::OK); - let list = rx.try_recv().unwrap(); - // Check that the list received by the server matches the pending operation - assert!(list.op_matches(&(Box::new(pending_operation) as QueueOperation))); + let resp_json: MessageRetryResponse = parse_response_to_json(response).await; + assert_eq!(resp_json.evaluated, 1); + assert_eq!(resp_json.matched, 1); } #[tokio::test] async fn test_origin_domain_retry() { - let (addr, mut rx) = setup_test_server(); + let TestServerSetup { + socket_address: addr, + retry_req_rx, + .. + } = setup_test_server(); let client = reqwest::Client::new(); let message = HyperlaneMessage { @@ -141,25 +273,38 @@ mod tests { } ]); + // spawn a task to respond to message retry request + let respond_task = send_retry_responses_future( + retry_req_rx, + vec![Box::new(pending_operation.clone()) as QueueOperation], + vec![(1, 1)], + ); + // Send a POST request to the server let response = client .post(format!("http://{}{}", addr, MESSAGE_RETRY_API_BASE)) .json(&matching_list_body) // Set the request body - .send() - .await - .unwrap(); + .send(); + + let (_t1, response_res) = tokio::join!(respond_task, response); + + let response = response_res.unwrap(); // Check that the response status code is OK assert_eq!(response.status(), StatusCode::OK); - let list = rx.try_recv().unwrap(); - // Check that the list received by the server matches the pending operation - assert!(list.op_matches(&(Box::new(pending_operation) as QueueOperation))); + let resp_json: MessageRetryResponse = parse_response_to_json(response).await; + assert_eq!(resp_json.evaluated, 1); + assert_eq!(resp_json.matched, 1); } #[tokio::test] async fn test_sender_address_retry() { - let (addr, mut rx) = setup_test_server(); + let TestServerSetup { + socket_address: addr, + retry_req_rx, + .. + } = setup_test_server(); let client = reqwest::Client::new(); let message = HyperlaneMessage::default(); @@ -170,25 +315,37 @@ mod tests { } ]); + // spawn a task to respond to message retry request + let respond_task = send_retry_responses_future( + retry_req_rx, + vec![Box::new(pending_operation.clone()) as QueueOperation], + vec![(1, 1)], + ); + // Send a POST request to the server let response = client .post(format!("http://{}{}", addr, MESSAGE_RETRY_API_BASE)) .json(&matching_list_body) // Set the request body - .send() - .await - .unwrap(); + .send(); + + let (_t1, response_res) = tokio::join!(respond_task, response); + let response = response_res.unwrap(); // Check that the response status code is OK assert_eq!(response.status(), StatusCode::OK); - let list = rx.try_recv().unwrap(); - // Check that the list received by the server matches the pending operation - assert!(list.op_matches(&(Box::new(pending_operation) as QueueOperation))); + let resp_json: MessageRetryResponse = parse_response_to_json(response).await; + assert_eq!(resp_json.evaluated, 1); + assert_eq!(resp_json.matched, 1); } #[tokio::test] async fn test_recipient_address_retry() { - let (addr, mut rx) = setup_test_server(); + let TestServerSetup { + socket_address: addr, + retry_req_rx, + .. + } = setup_test_server(); let client = reqwest::Client::new(); let message = HyperlaneMessage::default(); @@ -199,25 +356,37 @@ mod tests { } ]); + // spawn a task to respond to message retry request + let respond_task = send_retry_responses_future( + retry_req_rx, + vec![Box::new(pending_operation.clone()) as QueueOperation], + vec![(1, 1)], + ); + // Send a POST request to the server let response = client .post(format!("http://{}{}", addr, MESSAGE_RETRY_API_BASE)) .json(&matching_list_body) // Set the request body - .send() - .await - .unwrap(); + .send(); + let (_t1, response_res) = tokio::join!(respond_task, response); + + let response = response_res.unwrap(); // Check that the response status code is OK assert_eq!(response.status(), StatusCode::OK); - let list = rx.try_recv().unwrap(); - // Check that the list received by the server matches the pending operation - assert!(list.op_matches(&(Box::new(pending_operation) as QueueOperation))); + let resp_json: MessageRetryResponse = parse_response_to_json(response).await; + assert_eq!(resp_json.evaluated, 1); + assert_eq!(resp_json.matched, 1); } #[tokio::test] async fn test_multiple_retry() { - let (addr, mut rx) = setup_test_server(); + let TestServerSetup { + socket_address: addr, + retry_req_rx, + .. + } = setup_test_server(); let client = reqwest::Client::new(); let message = HyperlaneMessage { @@ -238,19 +407,27 @@ mod tests { } ]); + // spawn a task to respond to message retry request + let respond_task = send_retry_responses_future( + retry_req_rx, + vec![Box::new(pending_operation.clone()) as QueueOperation], + vec![(1, 1)], + ); + // Send a POST request to the server let response = client .post(format!("http://{}{}", addr, MESSAGE_RETRY_API_BASE)) .json(&matching_list_body) // Set the request body - .send() - .await - .unwrap(); + .send(); + + let (_t1, response_res) = tokio::join!(respond_task, response); + let response = response_res.unwrap(); // Check that the response status code is OK assert_eq!(response.status(), StatusCode::OK); - let list = rx.try_recv().unwrap(); - // Check that the list received by the server matches the pending operation - assert!(list.op_matches(&(Box::new(pending_operation) as QueueOperation))); + let resp_json: MessageRetryResponse = parse_response_to_json(response).await; + assert_eq!(resp_json.evaluated, 1); + assert_eq!(resp_json.matched, 1); } } diff --git a/rust/main/agents/relayer/src/server/mod.rs b/rust/main/agents/relayer/src/server/mod.rs index 083f8d94d2..2e2c4d39aa 100644 --- a/rust/main/agents/relayer/src/server/mod.rs +++ b/rust/main/agents/relayer/src/server/mod.rs @@ -1,9 +1,9 @@ use axum::Router; use derive_new::new; -use std::collections::HashMap; +use std::{collections::HashMap, sync::Arc}; use tokio::sync::broadcast::Sender; -use crate::{msg::op_queue::OperationPriorityQueue, settings::matching_list::MatchingList}; +use crate::msg::op_queue::OperationPriorityQueue; pub const ENDPOINT_MESSAGES_QUEUE_SIZE: usize = 100; @@ -15,14 +15,15 @@ mod message_retry; #[derive(new)] pub struct Server { + destination_chains: usize, #[new(default)] - retry_transmitter: Option>, + retry_transmitter: Option>>, #[new(default)] op_queues: Option>, } impl Server { - pub fn with_op_retry(mut self, transmitter: Sender) -> Self { + pub fn with_op_retry(mut self, transmitter: Sender>) -> Self { self.retry_transmitter = Some(transmitter); self } @@ -36,8 +37,8 @@ impl Server { /// Can be extended with additional routes and feature flags to enable/disable individually. pub fn routes(self) -> Vec<(&'static str, Router)> { let mut routes = vec![]; - if let Some(retry_transmitter) = self.retry_transmitter { - routes.push(MessageRetryApi::new(retry_transmitter).get_route()); + if let Some(tx) = self.retry_transmitter { + routes.push(MessageRetryApi::new(tx, self.destination_chains).get_route()); } if let Some(op_queues) = self.op_queues { routes.push(ListOperationsApi::new(op_queues).get_route()); diff --git a/rust/main/utils/run-locally/Cargo.toml b/rust/main/utils/run-locally/Cargo.toml index 9dedae9cea..a994324687 100644 --- a/rust/main/utils/run-locally/Cargo.toml +++ b/rust/main/utils/run-locally/Cargo.toml @@ -14,6 +14,7 @@ hyperlane-cosmos = { path = "../../chains/hyperlane-cosmos" } toml_edit.workspace = true k256.workspace = true jobserver.workspace = true +reqwest.workspace = true ripemd.workspace = true sha2.workspace = true serde.workspace = true diff --git a/rust/main/utils/run-locally/src/main.rs b/rust/main/utils/run-locally/src/main.rs index 7aeb3ae101..cdfbc0c7b2 100644 --- a/rust/main/utils/run-locally/src/main.rs +++ b/rust/main/utils/run-locally/src/main.rs @@ -51,6 +51,7 @@ mod invariants; mod logging; mod metrics; mod program; +mod server; mod solana; mod utils; @@ -539,6 +540,9 @@ fn main() -> ExitCode { sleep(Duration::from_secs(5)); } + // test retry request + let resp = server::run_retry_request().expect("Failed to process retry request"); + assert!(resp.matched > 0); report_test_result(failure_occurred) } diff --git a/rust/main/utils/run-locally/src/server.rs b/rust/main/utils/run-locally/src/server.rs new file mode 100644 index 0000000000..f0037ed267 --- /dev/null +++ b/rust/main/utils/run-locally/src/server.rs @@ -0,0 +1,55 @@ +use std::io; + +use reqwest::Url; + +use relayer::server::MessageRetryResponse; + +use crate::RELAYER_METRICS_PORT; + +/// create tokio runtime to send a retry request to +/// relayer to retry all existing messages in the queues +pub fn run_retry_request() -> io::Result { + let runtime = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build(); + runtime + .unwrap() + .block_on(async { call_retry_request().await }) +} + +/// sends a request to relayer to retry all existing messages +/// in the queues +async fn call_retry_request() -> io::Result { + let client = reqwest::Client::new(); + + let url = Url::parse(&format!( + "http://0.0.0.0:{RELAYER_METRICS_PORT}/message_retry" + )) + .map_err(|err| { + println!("Failed to parse url: {err}"); + io::Error::new(io::ErrorKind::InvalidInput, err.to_string()) + })?; + + let body = vec![serde_json::json!({ + "message_id": "*" + })]; + let retry_response = client.post(url).json(&body).send().await.map_err(|err| { + println!("Failed to send request: {err}"); + io::Error::new(io::ErrorKind::InvalidData, err.to_string()) + })?; + + let response_text = retry_response.text().await.map_err(|err| { + println!("Failed to parse response body: {err}"); + io::Error::new(io::ErrorKind::InvalidData, err.to_string()) + })?; + + println!("Retry Request Response: {:?}", response_text); + + let response_json: MessageRetryResponse = + serde_json::from_str(&response_text).map_err(|err| { + println!("Failed to parse response body to json: {err}"); + io::Error::new(io::ErrorKind::InvalidData, err.to_string()) + })?; + + Ok(response_json) +}