From 8b6596faa04a358e884ded0b306745df68b2c0aa Mon Sep 17 00:00:00 2001 From: Jeremyhi Date: Thu, 30 May 2024 15:29:13 +0800 Subject: [PATCH] feat: avoid some cloning when mirror requests to flownode (#4068) * feat: some refactor mirror requests to flownode * feat: use spawn_bg to avoid impact front-ground write * feat: add mirror row count metric --- src/operator/src/insert.rs | 116 ++++++++++++++++++++---------------- src/operator/src/metrics.rs | 5 ++ 2 files changed, 70 insertions(+), 51 deletions(-) diff --git a/src/operator/src/insert.rs b/src/operator/src/insert.rs index a68ed9b6be18..b54efc06dae9 100644 --- a/src/operator/src/insert.rs +++ b/src/operator/src/insert.rs @@ -15,7 +15,6 @@ use std::collections::HashMap; use std::sync::Arc; -use api::region::RegionResponse; use api::v1::alter_expr::Kind; use api::v1::region::{InsertRequests as RegionInsertRequests, RegionRequestHeader}; use api::v1::{ @@ -191,41 +190,6 @@ impl Inserter { } impl Inserter { - fn post_request(&self, requests: RegionInsertRequests) { - let node_manager = self.node_manager.clone(); - let table_flownode_set_cache = self.table_flownode_set_cache.clone(); - // Spawn all tasks that do job for mirror insert requests for flownode - common_runtime::spawn_bg(async move { - match Self::mirror_flow_node_requests(table_flownode_set_cache, requests).await { - Ok(flow_tasks) => { - let flow_tasks = flow_tasks.into_iter().map(|(peer, inserts)| { - let node_manager = node_manager.clone(); - common_runtime::spawn_write(async move { - node_manager - .flownode(&peer) - .await - .handle_inserts(inserts) - .await - .map(|flow_response| RegionResponse { - affected_rows: flow_response.affected_rows as AffectedRows, - extension: flow_response.extension, - }) - .context(RequestInsertsSnafu) - }) - }); - - if let Err(err) = future::try_join_all(flow_tasks) - .await - .context(JoinTaskSnafu) - { - warn!(err; "Failed to insert data into flownode"); - } - } - Err(err) => warn!(err; "Failed to mirror request to 
flownode"),
-            }
-        });
-    }
-
     async fn do_request(
         &self,
         requests: RegionInsertRequests,
@@ -238,8 +202,44 @@ impl Inserter {
             ..Default::default()
         });
 
-        let tasks = self
-            .group_requests_by_peer(requests.clone())
+        // Mirror requests for source table to flownode
+        match self.mirror_flow_node_requests(&requests).await {
+            Ok(flow_requests) => {
+                let node_manager = self.node_manager.clone();
+                let flow_tasks = flow_requests.into_iter().map(|(peer, inserts)| {
+                    let node_manager = node_manager.clone();
+                    common_runtime::spawn_bg(async move {
+                        node_manager
+                            .flownode(&peer)
+                            .await
+                            .handle_inserts(inserts)
+                            .await
+                            .context(RequestInsertsSnafu)
+                    })
+                });
+
+                match future::try_join_all(flow_tasks)
+                    .await
+                    .context(JoinTaskSnafu)
+                {
+                    Ok(ret) => {
+                        let affected_rows = ret
+                            .into_iter()
+                            .map(|resp| resp.map(|r| r.affected_rows))
+                            .sum::<Result<u64>>()
+                            .unwrap_or(0);
+                        crate::metrics::DIST_MIRROR_ROW_COUNT.inc_by(affected_rows);
+                    }
+                    Err(err) => {
+                        warn!(err; "Failed to insert data into flownode");
+                    }
+                }
+            }
+            Err(err) => warn!(err; "Failed to mirror request to flownode"),
+        }
+
+        let write_tasks = self
+            .group_requests_by_peer(requests)
             .await?
.into_iter()
             .map(|(peer, inserts)| {
@@ -254,8 +254,9 @@ impl Inserter {
                         .context(RequestInsertsSnafu)
                 })
             });
-        let results = future::try_join_all(tasks).await.context(JoinTaskSnafu)?;
-        self.post_request(requests);
+        let results = future::try_join_all(write_tasks)
+            .await
+            .context(JoinTaskSnafu)?;
         let affected_rows = results
             .into_iter()
             .map(|resp| resp.map(|r| r.affected_rows))
@@ -269,21 +270,22 @@ impl Inserter {
 
     /// Mirror requests for source table to flownode
     async fn mirror_flow_node_requests(
-        table_flownode_set_cache: TableFlownodeSetCacheRef,
-        requests: RegionInsertRequests,
+        &self,
+        requests: &RegionInsertRequests,
     ) -> Result<HashMap<Peer, RegionInsertRequests>> {
         // store partial source table requests used by flow node(only store what's used)
         let mut src_table_reqs: HashMap<TableId, Option<(Vec<Peer>, RegionInsertRequests)>> =
             HashMap::new();
-        for req in requests.requests {
-            match src_table_reqs.get_mut(&RegionId::from_u64(req.region_id).table_id()) {
-                Some(Some((_peers, reqs))) => reqs.requests.push(req),
+        for req in &requests.requests {
+            let table_id = RegionId::from_u64(req.region_id).table_id();
+            match src_table_reqs.get_mut(&table_id) {
+                Some(Some((_peers, reqs))) => reqs.requests.push(req.clone()),
                 // already know this is not source table
                 Some(None) => continue,
                 _ => {
-                    let table_id = RegionId::from_u64(req.region_id).table_id();
                     // TODO(discord9): determine where to store the flow node address in distributed mode
-                    let peers = table_flownode_set_cache
+                    let peers = self
+                        .table_flownode_set_cache
                         .get(table_id)
                         .await
                         .context(RequestInsertsSnafu)?
@@ -294,7 +296,7 @@ impl Inserter { if !peers.is_empty() { let mut reqs = RegionInsertRequests::default(); - reqs.requests.push(req); + reqs.requests.push(req.clone()); src_table_reqs.insert(table_id, Some((peers, reqs))); } else { // insert a empty entry to avoid repeat query @@ -310,14 +312,26 @@ impl Inserter { .into_iter() .filter_map(|(k, v)| v.map(|v| (k, v))) { - for flownode in peers { + if peers.len() == 1 { + // fast path, zero copy inserts - .entry(flownode.clone()) + .entry(peers[0].clone()) .or_default() .requests - .extend(reqs.requests.clone()); + .extend(reqs.requests); + continue; + } else { + // TODO(discord9): need to split requests to multiple flownodes + for flownode in peers { + inserts + .entry(flownode.clone()) + .or_default() + .requests + .extend(reqs.requests.clone()); + } } } + Ok(inserts) } diff --git a/src/operator/src/metrics.rs b/src/operator/src/metrics.rs index 932aca168003..97c5e0015a55 100644 --- a/src/operator/src/metrics.rs +++ b/src/operator/src/metrics.rs @@ -36,6 +36,11 @@ lazy_static! { "table operator ingest rows" ) .unwrap(); + pub static ref DIST_MIRROR_ROW_COUNT: IntCounter = register_int_counter!( + "greptime_table_operator_mirror_rows", + "table operator mirror rows" + ) + .unwrap(); pub static ref DIST_DELETE_ROW_COUNT: IntCounter = register_int_counter!( "greptime_table_operator_delete_rows", "table operator delete rows"