From ea49f8a5c43a138fe84207bcf2a0aadd7cfe4da7 Mon Sep 17 00:00:00 2001 From: discord9 <55937128+discord9@users.noreply.github.com> Date: Fri, 31 May 2024 14:50:22 +0800 Subject: [PATCH] feat(flow): make write path faster with shared lock (#4073) * feat(WIP): make write faster * feat: read lock on fast path * chore: per review --- src/flow/src/adapter.rs | 20 +++++------ src/flow/src/adapter/flownode_impl.rs | 2 +- src/flow/src/adapter/node_context.rs | 48 ++++++++++++++++----------- 3 files changed, 40 insertions(+), 30 deletions(-) diff --git a/src/flow/src/adapter.rs b/src/flow/src/adapter.rs index 52209a172f63..2e66f3850be8 100644 --- a/src/flow/src/adapter.rs +++ b/src/flow/src/adapter.rs @@ -159,7 +159,7 @@ pub struct FlownodeManager { table_info_source: TableSource, frontend_invoker: RwLock>>, /// contains mapping from table name to global id, and table schema - node_context: Mutex, + node_context: RwLock, flow_err_collectors: RwLock>, src_send_buf_lens: RwLock>>, tick_manager: FlowTickManager, @@ -194,7 +194,7 @@ impl FlownodeManager { query_engine, table_info_source: srv_map, frontend_invoker: RwLock::new(None), - node_context: Mutex::new(node_context), + node_context: RwLock::new(node_context), flow_err_collectors: Default::default(), src_send_buf_lens: Default::default(), tick_manager, @@ -298,7 +298,7 @@ impl FlownodeManager { } else { // TODO(discord9): condiser remove buggy auto create by schema - let node_ctx = self.node_context.lock().await; + let node_ctx = self.node_context.read().await; let gid: GlobalId = node_ctx .table_repr .get_by_name(&table_name) @@ -462,7 +462,7 @@ impl FlownodeManager { let mut output = BTreeMap::new(); for (name, sink_recv) in self .node_context - .lock() + .write() .await .sink_receiver .iter_mut() @@ -542,11 +542,11 @@ impl FlownodeManager { } // first check how many inputs were sent let (flush_res, buf_len) = if blocking { - let mut ctx = self.node_context.lock().await; - (ctx.flush_all_sender(), ctx.get_send_buf_size()) + let ctx = self.node_context.read().await; + (ctx.flush_all_sender().await, ctx.get_send_buf_size().await) } else { - match self.node_context.try_lock() { - Ok(mut ctx) => (ctx.flush_all_sender(), ctx.get_send_buf_size()), + match self.node_context.try_read() { + Ok(ctx) => (ctx.flush_all_sender().await, ctx.get_send_buf_size().await), Err(_) => return Ok(()), } }; @@ -580,7 +580,7 @@ impl FlownodeManager { rows.len() ); let table_id = region_id.table_id(); - self.node_context.lock().await.send(table_id, rows)?; + self.node_context.read().await.send(table_id, rows).await?; // TODO(discord9): put it in a background task? // self.run_available(false).await?; Ok(()) @@ -628,7 +628,7 @@ impl FlownodeManager { } } - let mut node_ctx = self.node_context.lock().await; + let mut node_ctx = self.node_context.write().await; // assign global id to source and sink table for source in source_table_ids { node_ctx diff --git a/src/flow/src/adapter/flownode_impl.rs b/src/flow/src/adapter/flownode_impl.rs index 152251975ab8..11b2f6d04fc1 100644 --- a/src/flow/src/adapter/flownode_impl.rs +++ b/src/flow/src/adapter/flownode_impl.rs @@ -116,7 +116,7 @@ impl Flownode for FlownodeManager { let now = self.tick_manager.tick(); let fetch_order = { - let ctx = self.node_context.lock().await; + let ctx = self.node_context.read().await; let table_col_names = ctx .table_repr .get_by_table_id(&table_id) diff --git a/src/flow/src/adapter/node_context.rs b/src/flow/src/adapter/node_context.rs index ffaa3cc70252..fdcc150697eb 100644 --- a/src/flow/src/adapter/node_context.rs +++ b/src/flow/src/adapter/node_context.rs @@ -21,7 +21,7 @@ use common_telemetry::debug; use session::context::QueryContext; use snafu::{OptionExt, ResultExt}; use table::metadata::TableId; -use tokio::sync::{broadcast, mpsc}; +use tokio::sync::{broadcast, mpsc, RwLock}; use crate::adapter::error::{Error, EvalSnafu, TableNotFoundSnafu}; use crate::adapter::{FlowId, TableName, TableSource}; @@ -65,7 +65,7 @@ pub struct FlownodeContext { #[derive(Debug)] pub struct SourceSender { sender: broadcast::Sender, - send_buf: VecDeque, + send_buf: RwLock>, } impl Default for SourceSender { @@ -78,6 +78,7 @@ impl Default for SourceSender { } } +// TODO: make all send operation immut impl SourceSender { pub fn get_receiver(&self) -> broadcast::Receiver { self.sender.subscribe() @@ -85,15 +86,16 @@ impl SourceSender { /// send as many as possible rows from send buf /// until send buf is empty or broadchannel is full - pub fn try_send_all(&mut self) -> Result { + pub async fn try_send_all(&self) -> Result { let mut row_cnt = 0; loop { + let mut send_buf = self.send_buf.write().await; // if inner sender channel is empty or send buf is empty, there // is nothing to do for now, just break - if self.sender.len() >= BROADCAST_CAP || self.send_buf.is_empty() { + if self.sender.len() >= BROADCAST_CAP || send_buf.is_empty() { break; } - if let Some(row) = self.send_buf.pop_front() { + if let Some(row) = send_buf.pop_front() { self.sender .send(row) .map_err(|err| { @@ -108,17 +110,20 @@ impl SourceSender { } if row_cnt > 0 { debug!("Send {} rows", row_cnt); - debug!("Remaining Send buf.len() = {}", self.send_buf.len()); + debug!( + "Remaining Send buf.len() = {}", + self.send_buf.read().await.len() + ); } Ok(row_cnt) } /// return number of rows it actual send(including what's in the buffer) - pub fn send_rows(&mut self, rows: Vec) -> Result { - self.send_buf.extend(rows); + pub async fn send_rows(&self, rows: Vec) -> Result { + self.send_buf.write().await.extend(rows); - let row_cnt = self.try_send_all()?; + let row_cnt = self.try_send_all().await?; Ok(row_cnt) } @@ -128,30 +133,35 @@ impl FlownodeContext { /// return number of rows it actual send(including what's in the buffer) /// /// TODO(discord9): make this concurrent - pub fn send(&mut self, table_id: TableId, rows: Vec) -> Result { + pub async fn send(&self, table_id: TableId, rows: Vec) -> Result { let sender = self .source_sender - .get_mut(&table_id) + .get(&table_id) .with_context(|| TableNotFoundSnafu { name: table_id.to_string(), })?; // debug!("FlownodeContext::send: trying to send {} rows", rows.len()); - sender.send_rows(rows) + sender.send_rows(rows).await } /// flush all sender's buf /// /// return numbers being sent - pub fn flush_all_sender(&mut self) -> Result { - self.source_sender - .iter_mut() - .map(|(_table_id, src_sender)| src_sender.try_send_all()) - .try_fold(0, |acc, x| x.map(|x| x + acc)) + pub async fn flush_all_sender(&self) -> Result { + let mut sum = 0; + for sender in self.source_sender.values() { + sender.try_send_all().await.inspect(|x| sum += x)?; + } + Ok(sum) } /// Return the sum number of rows in all send buf - pub fn get_send_buf_size(&self) -> usize { - self.source_sender.values().map(|v| v.send_buf.len()).sum() + pub async fn get_send_buf_size(&self) -> usize { + let mut sum = 0; + for sender in self.source_sender.values() { + sum += sender.send_buf.read().await.len(); + } + sum } }