Skip to content

Commit

Permalink
feat: avoid some cloning when mirror requests to flownode (GreptimeTe…
Browse files Browse the repository at this point in the history
…am#4068)

* feat: some refactoring of mirroring requests to flownode

* feat: use spawn_bg to avoid impacting foreground writes

* feat: add mirror row count metric
  • Loading branch information
fengjiachun authored May 30, 2024
1 parent eab309f commit 8b6596f
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 51 deletions.
116 changes: 65 additions & 51 deletions src/operator/src/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
use std::collections::HashMap;
use std::sync::Arc;

use api::region::RegionResponse;
use api::v1::alter_expr::Kind;
use api::v1::region::{InsertRequests as RegionInsertRequests, RegionRequestHeader};
use api::v1::{
Expand Down Expand Up @@ -191,41 +190,6 @@ impl Inserter {
}

impl Inserter {
/// Mirrors `requests` to flownodes as a fire-and-forget side task.
///
/// The whole job is handed to `common_runtime::spawn_bg` and the value that
/// call returns is discarded, so this method does not wait for the mirroring
/// to finish and returns nothing to the caller. Failures are only reported
/// through `warn!` logs.
fn post_request(&self, requests: RegionInsertRequests) {
// Clone the handles so the spawned `async move` block owns its own copies
// instead of borrowing from `self`.
let node_manager = self.node_manager.clone();
let table_flownode_set_cache = self.table_flownode_set_cache.clone();
// Spawn a single background task that computes the per-flownode requests and
// then fans them out, one write task per flownode peer.
common_runtime::spawn_bg(async move {
match Self::mirror_flow_node_requests(table_flownode_set_cache, requests).await {
Ok(flow_tasks) => {
// One `spawn_write` task per (peer, inserts) pair; the iterator is lazy, so
// the tasks are created when `try_join_all` consumes it below.
let flow_tasks = flow_tasks.into_iter().map(|(peer, inserts)| {
let node_manager = node_manager.clone();
common_runtime::spawn_write(async move {
node_manager
.flownode(&peer)
.await
.handle_inserts(inserts)
.await
// Convert the flownode response into a `RegionResponse`.
.map(|flow_response| RegionResponse {
affected_rows: flow_response.affected_rows as AffectedRows,
extension: flow_response.extension,
})
.context(RequestInsertsSnafu)
})
});

// NOTE(review): only the outer error (wrapped with `JoinTaskSnafu`) is logged
// here; the per-task results collected on success are dropped without being
// inspected — confirm whether individual request failures should be logged too.
if let Err(err) = future::try_join_all(flow_tasks)
.await
.context(JoinTaskSnafu)
{
warn!(err; "Failed to insert data into flownode");
}
}
// Building the mirrored requests failed; log it and give up on mirroring.
Err(err) => warn!(err; "Failed to mirror request to flownode"),
}
});
}

async fn do_request(
&self,
requests: RegionInsertRequests,
Expand All @@ -238,8 +202,44 @@ impl Inserter {
..Default::default()
});

let tasks = self
.group_requests_by_peer(requests.clone())
// Mirror requests for source table to flownode
match self.mirror_flow_node_requests(&requests).await {
Ok(flow_requests) => {
let node_manager = self.node_manager.clone();
let flow_tasks = flow_requests.into_iter().map(|(peer, inserts)| {
let node_manager = node_manager.clone();
common_runtime::spawn_bg(async move {
node_manager
.flownode(&peer)
.await
.handle_inserts(inserts)
.await
.context(RequestInsertsSnafu)
})
});

match future::try_join_all(flow_tasks)
.await
.context(JoinTaskSnafu)
{
Ok(ret) => {
let affected_rows = ret
.into_iter()
.map(|resp| resp.map(|r| r.affected_rows))
.sum::<Result<u64>>()
.unwrap_or(0);
crate::metrics::DIST_MIRROR_ROW_COUNT.inc_by(affected_rows);
}
Err(err) => {
warn!(err; "Failed to insert data into flownode");
}
}
}
Err(err) => warn!(err; "Failed to mirror request to flownode"),
}

let write_tasks = self
.group_requests_by_peer(requests)
.await?
.into_iter()
.map(|(peer, inserts)| {
Expand All @@ -254,8 +254,9 @@ impl Inserter {
.context(RequestInsertsSnafu)
})
});
let results = future::try_join_all(tasks).await.context(JoinTaskSnafu)?;
self.post_request(requests);
let results = future::try_join_all(write_tasks)
.await
.context(JoinTaskSnafu)?;
let affected_rows = results
.into_iter()
.map(|resp| resp.map(|r| r.affected_rows))
Expand All @@ -269,21 +270,22 @@ impl Inserter {

/// Mirror requests for source table to flownode
async fn mirror_flow_node_requests(
table_flownode_set_cache: TableFlownodeSetCacheRef,
requests: RegionInsertRequests,
&self,
requests: &RegionInsertRequests,
) -> Result<HashMap<Peer, RegionInsertRequests>> {
// store partial source table requests used by flow node(only store what's used)
let mut src_table_reqs: HashMap<TableId, Option<(Vec<Peer>, RegionInsertRequests)>> =
HashMap::new();
for req in requests.requests {
match src_table_reqs.get_mut(&RegionId::from_u64(req.region_id).table_id()) {
Some(Some((_peers, reqs))) => reqs.requests.push(req),
for req in &requests.requests {
let table_id = RegionId::from_u64(req.region_id).table_id();
match src_table_reqs.get_mut(&table_id) {
Some(Some((_peers, reqs))) => reqs.requests.push(req.clone()),
// already know this is not source table
Some(None) => continue,
_ => {
let table_id = RegionId::from_u64(req.region_id).table_id();
// TODO(discord9): determine where to store the flow node address in distributed mode
let peers = table_flownode_set_cache
let peers = self
.table_flownode_set_cache
.get(table_id)
.await
.context(RequestInsertsSnafu)?
Expand All @@ -294,7 +296,7 @@ impl Inserter {

if !peers.is_empty() {
let mut reqs = RegionInsertRequests::default();
reqs.requests.push(req);
reqs.requests.push(req.clone());
src_table_reqs.insert(table_id, Some((peers, reqs)));
} else {
// insert an empty entry to avoid repeated queries
Expand All @@ -310,14 +312,26 @@ impl Inserter {
.into_iter()
.filter_map(|(k, v)| v.map(|v| (k, v)))
{
for flownode in peers {
if peers.len() == 1 {
// fast path, zero copy
inserts
.entry(flownode.clone())
.entry(peers[0].clone())
.or_default()
.requests
.extend(reqs.requests.clone());
.extend(reqs.requests);
continue;
} else {
// TODO(discord9): need to split requests to multiple flownodes
for flownode in peers {
inserts
.entry(flownode.clone())
.or_default()
.requests
.extend(reqs.requests.clone());
}
}
}

Ok(inserts)
}

Expand Down
5 changes: 5 additions & 0 deletions src/operator/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@ lazy_static! {
"table operator ingest rows"
)
.unwrap();
pub static ref DIST_MIRROR_ROW_COUNT: IntCounter = register_int_counter!(
"greptime_table_operator_mirror_rows",
"table operator mirror rows"
)
.unwrap();
pub static ref DIST_DELETE_ROW_COUNT: IntCounter = register_int_counter!(
"greptime_table_operator_delete_rows",
"table operator delete rows"
Expand Down

0 comments on commit 8b6596f

Please sign in to comment.