diff --git a/src/flow/src/adapter.rs b/src/flow/src/adapter.rs index 25bb3cb2bf2c..52209a172f63 100644 --- a/src/flow/src/adapter.rs +++ b/src/flow/src/adapter.rs @@ -35,12 +35,12 @@ use itertools::Itertools; use query::{QueryEngine, QueryEngineFactory}; use serde::{Deserialize, Serialize}; use session::context::QueryContext; -use snafu::{OptionExt, ResultExt}; +use snafu::{ensure, OptionExt, ResultExt}; use store_api::storage::{ConcreteDataType, RegionId}; use table::metadata::TableId; use tokio::sync::{oneshot, watch, Mutex, RwLock}; -use crate::adapter::error::{ExternalSnafu, TableNotFoundSnafu, UnexpectedSnafu}; +use crate::adapter::error::{ExternalSnafu, InternalSnafu, TableNotFoundSnafu, UnexpectedSnafu}; pub(crate) use crate::adapter::node_context::FlownodeContext; use crate::adapter::table_source::TableSource; use crate::adapter::util::column_schemas_to_proto; @@ -66,6 +66,11 @@ use error::Error; pub const PER_REQ_MAX_ROW_CNT: usize = 8192; +// TODO: replace this with `GREPTIME_TIMESTAMP` before v0.9 +pub const AUTO_CREATED_PLACEHOLDER_TS_COL: &str = "__ts_placeholder"; + +pub const UPDATE_AT_TS_COL: &str = "update_at"; + // TODO: refactor common types for flow to a separate module /// FlowId is a unique identifier for a flow task pub type FlowId = u64; @@ -279,10 +284,16 @@ impl FlownodeManager { .map(|i| meta.schema.column_schemas[i].name.clone()) .collect_vec(); let schema = meta.schema.column_schemas; - let is_auto_create = schema - .last() - .map(|s| s.name == "__ts_placeholder") - .unwrap_or(false); + // check if the last column is the auto created timestamp column, hence the table is auto created from + // flow's plan type + let is_auto_create = { + let correct_name = schema + .last() + .map(|s| s.name == AUTO_CREATED_PLACEHOLDER_TS_COL) + .unwrap_or(false); + let correct_time_index = meta.schema.timestamp_index == Some(schema.len() - 1); + correct_name && correct_time_index + }; (primary_keys, schema, is_auto_create) } else { // TODO(discord9): 
condiser remove buggy auto create by schema @@ -302,6 +313,7 @@ impl FlownodeManager { .clone(); // TODO(discord9): use default key from schema let primary_keys = schema + .typ() .keys .first() .map(|v| { @@ -312,24 +324,31 @@ impl FlownodeManager { }) .unwrap_or_default(); let update_at = ColumnSchema::new( - "update_at", + UPDATE_AT_TS_COL, ConcreteDataType::timestamp_millisecond_datatype(), true, ); // TODO(discord9): bugged so we can't infer time index from flow plan, so we have to manually set one let ts_col = ColumnSchema::new( - "__ts_placeholder", + AUTO_CREATED_PLACEHOLDER_TS_COL, ConcreteDataType::timestamp_millisecond_datatype(), true, ) .with_time_index(true); let wout_ts = schema + .typ() .column_types + .clone() .into_iter() .enumerate() .map(|(idx, typ)| { - ColumnSchema::new(format!("Col_{idx}"), typ.scalar_type, typ.nullable) + let name = schema + .names + .get(idx) + .cloned() + .unwrap_or(format!("Col_{}", idx)); + ColumnSchema::new(name, typ.scalar_type, typ.nullable) }) .collect_vec(); @@ -339,7 +358,7 @@ impl FlownodeManager { (primary_keys, with_ts, true) }; - + let schema_len = schema.len(); let proto_schema = column_schemas_to_proto(schema, &primary_keys)?; debug!( @@ -348,16 +367,7 @@ impl FlownodeManager { table_name.join("."), reqs ); - let now = SystemTime::now(); - let now = now - .duration_since(SystemTime::UNIX_EPOCH) - .map(|s| s.as_millis() as repr::Timestamp) - .unwrap_or_else(|_| { - -(SystemTime::UNIX_EPOCH - .duration_since(now) - .unwrap() - .as_millis() as repr::Timestamp) - }); + let now = self.tick_manager.tick(); for req in reqs { match req { DiffRequest::Insert(insert) => { @@ -370,13 +380,23 @@ impl FlownodeManager { ))]); // ts col, if auto create if is_auto_create { + ensure!( + row.len() == schema_len - 1, + InternalSnafu { + reason: format!( + "Row len mismatch, expect {} got {}", + schema_len - 1, + row.len() + ) + } + ); row.extend([Value::from( common_time::Timestamp::new_millisecond(0), )]); } - row.into() + 
Ok(row.into()) }) - .collect::>(); + .collect::, Error>>()?; let table_name = table_name.last().unwrap().clone(); let req = RowInsertRequest { table_name, @@ -490,9 +510,12 @@ impl FlownodeManager { debug!("Starting to run"); loop { // TODO(discord9): only run when new inputs arrive or scheduled to - self.run_available().await.unwrap(); + debug!("call run_available in run every second"); + self.run_available(true).await.unwrap(); + debug!("call send_writeback_requests in run every second"); // TODO(discord9): error handling self.send_writeback_requests().await.unwrap(); + debug!("call log_all_errors in run every second"); self.log_all_errors().await; tokio::time::sleep(std::time::Duration::from_secs(1)).await; } @@ -501,29 +524,44 @@ impl FlownodeManager { /// Run all available subgraph in the flow node /// This will try to run all dataflow in this node /// - /// However this is not blocking and can sometimes return while actual computation is still running in worker thread + /// set `blocking` to true to wait until lock is acquired + /// and false to return immediately if lock is not acquired /// TODO(discord9): add flag for subgraph that have input since last run - pub async fn run_available(&self) -> Result<(), Error> { - let now = self.tick_manager.tick(); - + pub async fn run_available(&self, blocking: bool) -> Result<(), Error> { loop { + let now = self.tick_manager.tick(); for worker in self.worker_handles.iter() { // TODO(discord9): consider how to handle error in individual worker - worker.lock().await.run_available(now).await.unwrap(); + if blocking { + worker.lock().await.run_available(now).await?; + } else if let Ok(worker) = worker.try_lock() { + worker.run_available(now).await?; + } else { + return Ok(()); + } } // first check how many inputs were sent - let send_cnt = match self.node_context.lock().await.flush_all_sender() { - Ok(cnt) => cnt, + let (flush_res, buf_len) = if blocking { + let mut ctx = self.node_context.lock().await; + 
(ctx.flush_all_sender(), ctx.get_send_buf_size()) + } else { + match self.node_context.try_lock() { + Ok(mut ctx) => (ctx.flush_all_sender(), ctx.get_send_buf_size()), + Err(_) => return Ok(()), + } + }; + match flush_res { + Ok(_) => (), Err(err) => { common_telemetry::error!("Flush send buf errors: {:?}", err); break; } }; - // if no inputs - if send_cnt == 0 { + // if no thing in send buf then break + if buf_len == 0 { break; } else { - debug!("FlownodeManager::run_available: send_cnt={}", send_cnt); + debug!("Send buf len = {}", buf_len); } } @@ -543,6 +581,8 @@ impl FlownodeManager { ); let table_id = region_id.table_id(); self.node_context.lock().await.send(table_id, rows)?; + // TODO(discord9): put it in a background task? + // self.run_available(false).await?; Ok(()) } } @@ -653,21 +693,22 @@ impl FlownodeManager { /// /// TODO(discord9): better way to do it, and not expose flow tick even to other flow to avoid /// TSO coord mess -#[derive(Clone)] +#[derive(Clone, Debug)] pub struct FlowTickManager { + /// The starting instant of the flow, used with `start_timestamp` to calculate the current timestamp start: Instant, -} - -impl std::fmt::Debug for FlowTickManager { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("FlowTickManager").finish() - } + /// The timestamp when the flow started + start_timestamp: repr::Timestamp, } impl FlowTickManager { pub fn new() -> Self { FlowTickManager { start: Instant::now(), + start_timestamp: SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap() + .as_millis() as repr::Timestamp, } } @@ -677,6 +718,6 @@ impl FlowTickManager { pub fn tick(&self) -> repr::Timestamp { let current = Instant::now(); let since_the_epoch = current - self.start; - since_the_epoch.as_millis() as repr::Timestamp + since_the_epoch.as_millis() as repr::Timestamp + self.start_timestamp } } diff --git a/src/flow/src/adapter/flownode_impl.rs b/src/flow/src/adapter/flownode_impl.rs index 
e770bb5e4cf1..152251975ab8 100644 --- a/src/flow/src/adapter/flownode_impl.rs +++ b/src/flow/src/adapter/flownode_impl.rs @@ -14,13 +14,17 @@ //! impl `FlowNode` trait for FlowNodeManager so standalone can call them +use std::collections::HashMap; + use api::v1::flow::{flow_request, CreateRequest, DropRequest, FlowRequest, FlowResponse}; use api::v1::region::InsertRequests; use common_error::ext::BoxedError; use common_meta::error::{ExternalSnafu, Result, UnexpectedSnafu}; use common_meta::node_manager::Flownode; +use common_telemetry::debug; use itertools::Itertools; -use snafu::ResultExt; +use snafu::{OptionExt, ResultExt}; +use store_api::storage::RegionId; use crate::adapter::FlownodeManager; use crate::repr::{self, DiffRow}; @@ -101,12 +105,57 @@ impl Flownode for FlownodeManager { async fn handle_inserts(&self, request: InsertRequests) -> Result { for write_request in request.requests { let region_id = write_request.region_id; - let rows_proto = write_request.rows.map(|r| r.rows).unwrap_or(vec![]); + let table_id = RegionId::from(region_id).table_id(); + + let (insert_schema, rows_proto) = write_request + .rows + .map(|r| (r.schema, r.rows)) + .unwrap_or_default(); + // TODO(discord9): reconsider time assignment mechanism let now = self.tick_manager.tick(); + + let fetch_order = { + let ctx = self.node_context.lock().await; + let table_col_names = ctx + .table_repr + .get_by_table_id(&table_id) + .map(|r| r.1) + .and_then(|id| ctx.schema.get(&id)) + .map(|desc| &desc.names) + .context(UnexpectedSnafu { + err_msg: format!("Table not found: {}", table_id), + })?; + let name_to_col = HashMap::<_, _>::from_iter( + insert_schema + .iter() + .enumerate() + .map(|(i, name)| (&name.column_name, i)), + ); + let fetch_order: Vec = table_col_names + .iter() + .map(|names| { + name_to_col.get(names).copied().context(UnexpectedSnafu { + err_msg: format!("Column not found: {}", names), + }) + }) + .try_collect()?; + if !fetch_order.iter().enumerate().all(|(i, &v)| i == v) 
{ + debug!("Reordering columns: {:?}", fetch_order) + } + fetch_order + }; + let rows: Vec = rows_proto .into_iter() - .map(repr::Row::from) + .map(|r| { + let r = repr::Row::from(r); + let reordered = fetch_order + .iter() + .map(|&i| r.inner[i].clone()) + .collect_vec(); + repr::Row::new(reordered) + }) .map(|r| (r, now, 1)) .collect_vec(); self.handle_write_request(region_id.into(), rows) diff --git a/src/flow/src/adapter/node_context.rs b/src/flow/src/adapter/node_context.rs index b1d01373fb8a..ffaa3cc70252 100644 --- a/src/flow/src/adapter/node_context.rs +++ b/src/flow/src/adapter/node_context.rs @@ -27,7 +27,7 @@ use crate::adapter::error::{Error, EvalSnafu, TableNotFoundSnafu}; use crate::adapter::{FlowId, TableName, TableSource}; use crate::expr::error::InternalSnafu; use crate::expr::GlobalId; -use crate::repr::{DiffRow, RelationType, BROADCAST_CAP}; +use crate::repr::{DiffRow, RelationDesc, RelationType, BROADCAST_CAP}; /// A context that holds the information of the dataflow #[derive(Default, Debug)] @@ -51,10 +51,8 @@ pub struct FlownodeContext { mpsc::UnboundedReceiver, ), >, - /// store source in buffer for each source table, in case broadcast channel is full - pub send_buffer: BTreeMap>, /// the schema of the table, query from metasrv or inferred from TypedPlan - pub schema: HashMap, + pub schema: HashMap, /// All the tables that have been registered in the worker pub table_repr: IdToNameMap, pub query_context: Option>, @@ -73,7 +71,8 @@ pub struct SourceSender { impl Default for SourceSender { fn default() -> Self { Self { - sender: broadcast::Sender::new(BROADCAST_CAP), + // TODO(discord9): found a better way then increase this to prevent lagging and hence missing input data + sender: broadcast::Sender::new(BROADCAST_CAP * 2), send_buf: Default::default(), } } @@ -109,6 +108,7 @@ impl SourceSender { } if row_cnt > 0 { debug!("Send {} rows", row_cnt); + debug!("Remaining Send buf.len() = {}", self.send_buf.len()); } Ok(row_cnt) @@ -140,12 +140,19 
@@ impl FlownodeContext { } /// flush all sender's buf + /// + /// return numbers being sent pub fn flush_all_sender(&mut self) -> Result { self.source_sender .iter_mut() .map(|(_table_id, src_sender)| src_sender.try_send_all()) .try_fold(0, |acc, x| x.map(|x| x + acc)) } + + /// Return the sum number of rows in all send buf + pub fn get_send_buf_size(&self) -> usize { + self.source_sender.values().map(|v| v.send_buf.len()).sum() + } } impl FlownodeContext { @@ -226,7 +233,7 @@ impl FlownodeContext { /// Retrieves a GlobalId and table schema representing a table previously registered by calling the [register_table] function. /// /// Returns an error if no table has been registered with the provided names - pub fn table(&self, name: &TableName) -> Result<(GlobalId, RelationType), Error> { + pub fn table(&self, name: &TableName) -> Result<(GlobalId, RelationDesc), Error> { let id = self .table_repr .get_by_name(name) @@ -297,7 +304,7 @@ impl FlownodeContext { .get_by_name(table_name) .map(|(_, gid)| gid) .unwrap(); - self.schema.insert(gid, schema); + self.schema.insert(gid, schema.into_unnamed()); Ok(()) } diff --git a/src/flow/src/adapter/table_source.rs b/src/flow/src/adapter/table_source.rs index cfa41f785ac8..53932cd692c2 100644 --- a/src/flow/src/adapter/table_source.rs +++ b/src/flow/src/adapter/table_source.rs @@ -17,7 +17,6 @@ use common_error::ext::BoxedError; use common_meta::key::table_info::{TableInfoManager, TableInfoValue}; use common_meta::key::table_name::{TableNameKey, TableNameManager}; -use itertools::Itertools; use snafu::{OptionExt, ResultExt}; use table::metadata::TableId; @@ -25,7 +24,7 @@ use crate::adapter::error::{ Error, ExternalSnafu, TableNotFoundMetaSnafu, TableNotFoundSnafu, UnexpectedSnafu, }; use crate::adapter::TableName; -use crate::repr::{self, ColumnType, RelationType}; +use crate::repr::{self, ColumnType, RelationDesc, RelationType}; /// mapping of table name <-> table id should be query from tableinfo manager pub struct 
TableSource { @@ -107,7 +106,7 @@ impl TableSource { pub async fn get_table_name_schema( &self, table_id: &TableId, - ) -> Result<(TableName, RelationType), Error> { + ) -> Result<(TableName, RelationDesc), Error> { let table_info_value = self .get_table_info_value(table_id) .await? @@ -123,14 +122,20 @@ impl TableSource { ]; let raw_schema = table_info_value.table_info.meta.schema; - let column_types = raw_schema + let (column_types, col_names): (Vec<_>, Vec<_>) = raw_schema .column_schemas + .clone() .into_iter() - .map(|col| ColumnType { - nullable: col.is_nullable(), - scalar_type: col.data_type, + .map(|col| { + ( + ColumnType { + nullable: col.is_nullable(), + scalar_type: col.data_type, + }, + col.name, + ) }) - .collect_vec(); + .unzip(); let key = table_info_value.table_info.meta.primary_key_indices; let keys = vec![repr::Key::from(key)]; @@ -138,10 +143,13 @@ impl TableSource { let time_index = raw_schema.timestamp_index; Ok(( table_name, - RelationType { - column_types, - keys, - time_index, + RelationDesc { + typ: RelationType { + column_types, + keys, + time_index, + }, + names: col_names, }, )) } diff --git a/src/flow/src/compute/render/map.rs b/src/flow/src/compute/render/map.rs index 2261f4de14f7..50bd48f5fb70 100644 --- a/src/flow/src/compute/render/map.rs +++ b/src/flow/src/compute/render/map.rs @@ -124,9 +124,13 @@ fn mfp_subgraph( // 1. Read all updates that were emitted between the last time this arrangement had updates and the current time. // 2. Output the updates. // 3. Truncate all updates within that range. 
- let from = arrange.read().last_compaction_time().map(|n| n + 1); + let from = arrange.read().last_compaction_time(); let from = from.unwrap_or(repr::Timestamp::MIN); - let output_kv = arrange.read().get_updates_in_range(from..=now); + let range = ( + std::ops::Bound::Excluded(from), + std::ops::Bound::Included(now), + ); + let output_kv = arrange.read().get_updates_in_range(range); // the output is expected to be key -> empty val let output = output_kv .into_iter() diff --git a/src/flow/src/compute/render/reduce.rs b/src/flow/src/compute/render/reduce.rs index e46f8c2bedc3..fa29a6324215 100644 --- a/src/flow/src/compute/render/reduce.rs +++ b/src/flow/src/compute/render/reduce.rs @@ -26,7 +26,7 @@ use crate::adapter::error::{Error, PlanSnafu}; use crate::compute::render::{Context, SubgraphArg}; use crate::compute::state::Scheduler; use crate::compute::types::{Arranged, Collection, CollectionBundle, ErrCollector, Toff}; -use crate::expr::error::{DataTypeSnafu, InternalSnafu}; +use crate::expr::error::{DataAlreadyExpiredSnafu, DataTypeSnafu, InternalSnafu}; use crate::expr::{AggregateExpr, EvalError, ScalarExpr}; use crate::plan::{AccumulablePlan, AggrWithIndex, KeyValPlan, Plan, ReducePlan, TypedPlan}; use crate::repr::{self, DiffRow, KeyValDiffRow, RelationType, Row}; @@ -301,9 +301,13 @@ fn update_reduce_distinct_arrange( // Deal with output: // 1. Read all updates that were emitted between the last time this arrangement had updates and the current time. - let from = arrange.read().last_compaction_time().map(|n| n + 1); + let from = arrange.read().last_compaction_time(); let from = from.unwrap_or(repr::Timestamp::MIN); - let output_kv = arrange.read().get_updates_in_range(from..=now); + let range = ( + std::ops::Bound::Excluded(from), + std::ops::Bound::Included(now), + ); + let output_kv = arrange.read().get_updates_in_range(range); // 2. Truncate all updates stored in arrangement within that range. 
let run_compaction = || { @@ -397,6 +401,29 @@ fn reduce_accum_subgraph( // TODO(discord9): consider key-based lock let mut arrange = arrange.write(); for (key, value_diffs) in key_to_vals { + if let Some(expire_man) = &arrange.get_expire_state() { + let mut is_expired = false; + err_collector.run(|| { + if let Some(expired) = expire_man.get_expire_duration(now, &key)? { + is_expired = true; + // expired data is ignored in computation, and a simple warning is logged + common_telemetry::warn!( + "Data already expired: {}", + DataAlreadyExpiredSnafu { + expired_by: expired, + } + .build() + ); + Ok(()) + } else { + Ok(()) + } + }); + if is_expired { + // errors already collected, we can just continue to next key + continue; + } + } let col_diffs = { let row_len = value_diffs[0].0.len(); let res = err_collector.run(|| get_col_diffs(value_diffs, row_len)); diff --git a/src/flow/src/compute/render/src_sink.rs b/src/flow/src/compute/render/src_sink.rs index 33ecb9670caa..96411b6d04b0 100644 --- a/src/flow/src/compute/render/src_sink.rs +++ b/src/flow/src/compute/render/src_sink.rs @@ -20,12 +20,14 @@ use common_telemetry::{debug, info}; use hydroflow::scheduled::graph_ext::GraphExt; use itertools::Itertools; use snafu::OptionExt; +use tokio::sync::broadcast::error::TryRecvError; use tokio::sync::{broadcast, mpsc}; use crate::adapter::error::{Error, PlanSnafu}; use crate::compute::render::Context; use crate::compute::types::{Arranged, Collection, CollectionBundle, Toff}; -use crate::expr::GlobalId; +use crate::expr::error::InternalSnafu; +use crate::expr::{EvalError, GlobalId}; use crate::repr::{DiffRow, Row, BROADCAST_CAP}; #[allow(clippy::mutable_key_type)] @@ -55,18 +57,43 @@ impl<'referred, 'df> Context<'referred, 'df> { .df .add_subgraph_source("source", send_port, move |_ctx, send| { let now = *now.borrow(); - let arr = arrange_handler_inner.write().get_updates_in_range(..=now); - err_collector.run(|| arrange_handler_inner.write().compact_to(now)); + // write lock 
to prevent unexpected mutation + let mut arranged = arrange_handler_inner.write(); + let arr = arranged.get_updates_in_range(..=now); + err_collector.run(|| arranged.compact_to(now)); + debug!("Call source"); let prev_avail = arr.into_iter().map(|((k, _), t, d)| (k, t, d)); let mut to_send = Vec::new(); let mut to_arrange = Vec::new(); // TODO(discord9): handling tokio broadcast error - while let Ok((r, t, d)) = src_recv.try_recv() { - if t <= now { - to_send.push((r, t, d)); - } else { - to_arrange.push(((r, Row::empty()), t, d)); + loop { + match src_recv.try_recv() { + Ok((r, t, d)) => { + if t <= now { + to_send.push((r, t, d)); + } else { + to_arrange.push(((r, Row::empty()), t, d)); + } + } + Err(TryRecvError::Empty) => { + break; + } + Err(TryRecvError::Lagged(lag_offset)) => { + common_telemetry::error!("Flow missing {} rows behind", lag_offset); + break; + } + Err(err) => { + err_collector.run(|| -> Result<(), EvalError> { + InternalSnafu { + reason: format!( + "Error receiving from broadcast channel: {}", + err + ), + } + .fail() + }); + } } } let all = prev_avail.chain(to_send).collect_vec(); @@ -77,10 +104,10 @@ impl<'referred, 'df> Context<'referred, 'df> { to_arrange.len() ); } - err_collector.run(|| arrange_handler_inner.write().apply_updates(now, to_arrange)); + err_collector.run(|| arranged.apply_updates(now, to_arrange)); send.give(all); - // always schedule source to run at next tick - inner_schd.schedule_at(now + 1); + // always schedule source to run at now so we can repeatedly run source if needed + inner_schd.schedule_at(now); }); schd.set_cur_subgraph(sub); let arranged = Arranged::new(arrange_handler); diff --git a/src/flow/src/expr/error.rs b/src/flow/src/expr/error.rs index 5a2823423974..09ad758056ba 100644 --- a/src/flow/src/expr/error.rs +++ b/src/flow/src/expr/error.rs @@ -100,4 +100,11 @@ pub enum EvalError { #[snafu(implicit)] location: Location, }, + + #[snafu(display("Incoming data already expired by {} ms", expired_by))] + 
DataAlreadyExpired { + expired_by: i64, + #[snafu(implicit)] + location: Location, + }, } diff --git a/src/flow/src/expr/func.rs b/src/flow/src/expr/func.rs index 7957f70cb6c4..31131a2758eb 100644 --- a/src/flow/src/expr/func.rs +++ b/src/flow/src/expr/func.rs @@ -76,6 +76,13 @@ impl UnmaterializableFunc { } } + pub fn is_valid_func_name(name: &str) -> bool { + matches!( + name.to_lowercase().as_str(), + "now" | "current_schema" | "tumble" + ) + } + /// Create a UnmaterializableFunc from a string of the function name pub fn from_str_args(name: &str, args: Vec) -> Result { match name.to_lowercase().as_str() { @@ -183,6 +190,13 @@ impl UnaryFunc { } } + pub fn is_valid_func_name(name: &str) -> bool { + matches!( + name.to_lowercase().as_str(), + "not" | "is_null" | "is_true" | "is_false" | "step_timestamp" | "cast" + ) + } + /// Create a UnaryFunc from a string of the function name and given argument type(optional) pub fn from_str_and_type( name: &str, @@ -278,9 +292,9 @@ impl UnaryFunc { start_time, } => { let ts = get_ts_as_millisecond(arg)?; - let start_time = start_time.map(|t| t.val()).unwrap_or(0); + let start_time = start_time.map(|t| t.val()); let window_size = (window_size.to_nanosecond() / 1_000_000) as repr::Duration; // nanosecond to millisecond - let window_start = start_time + (ts - start_time) / window_size * window_size; + let window_start = get_window_start(ts, window_size, start_time); let ret = Timestamp::new_millisecond(window_start); Ok(Value::from(ret)) @@ -290,9 +304,9 @@ impl UnaryFunc { start_time, } => { let ts = get_ts_as_millisecond(arg)?; - let start_time = start_time.map(|t| t.val()).unwrap_or(0); + let start_time = start_time.map(|t| t.val()); let window_size = (window_size.to_nanosecond() / 1_000_000) as repr::Duration; // nanosecond to millisecond - let window_start = start_time + (ts - start_time) / window_size * window_size; + let window_start = get_window_start(ts, window_size, start_time); let window_end = window_start + 
window_size; let ret = Timestamp::new_millisecond(window_end); @@ -302,6 +316,35 @@ impl UnaryFunc { } } +fn get_window_start( + ts: repr::Timestamp, + window_size: repr::Duration, + start_time: Option, +) -> repr::Timestamp { + let start_time = start_time.unwrap_or(0); + // left close right open + if ts >= start_time { + start_time + (ts - start_time) / window_size * window_size + } else { + start_time + (ts - start_time) / window_size * window_size + - if ((start_time - ts) % window_size) != 0 { + window_size + } else { + 0 + } + } +} + +#[test] +fn test_get_window_start() { + assert_eq!(get_window_start(1, 3, None), 0); + assert_eq!(get_window_start(3, 3, None), 3); + assert_eq!(get_window_start(0, 3, None), 0); + + assert_eq!(get_window_start(-1, 3, None), -3); + assert_eq!(get_window_start(-3, 3, None), -3); +} + fn get_ts_as_millisecond(arg: Value) -> Result { let ts = if let Some(ts) = arg.as_timestamp() { ts.convert_to(TimeUnit::Millisecond) @@ -550,6 +593,27 @@ impl BinaryFunc { Ok(ret) } + pub fn is_valid_func_name(name: &str) -> bool { + matches!( + name.to_lowercase().as_str(), + "eq" | "equal" + | "not_eq" + | "not_equal" + | "lt" + | "lte" + | "gt" + | "gte" + | "add" + | "sub" + | "subtract" + | "mul" + | "multiply" + | "div" + | "divide" + | "mod" + ) + } + /// choose the appropriate specialization based on the input types /// return a specialization of the binary function and it's actual input and output type(so no null type present) /// @@ -741,6 +805,10 @@ impl VariadicFunc { } } + pub fn is_valid_func_name(name: &str) -> bool { + matches!(name.to_lowercase().as_str(), "and" | "or") + } + /// Create a VariadicFunc from a string of the function name and given argument types(optional) pub fn from_str_and_types( name: &str, diff --git a/src/flow/src/expr/scalar.rs b/src/flow/src/expr/scalar.rs index dfd5fcd0f214..984d6f1a44a6 100644 --- a/src/flow/src/expr/scalar.rs +++ b/src/flow/src/expr/scalar.rs @@ -45,6 +45,8 @@ impl TypedExpr { impl TypedExpr 
{ /// expand multi-value expression to multiple expressions with new indices + /// + /// Currently it just mean expand `TumbleWindow` to `TumbleWindowFloor` and `TumbleWindowCeiling` pub fn expand_multi_value( input_typ: &RelationType, exprs: &[TypedExpr], diff --git a/src/flow/src/repr/relation.rs b/src/flow/src/repr/relation.rs index 59edb31616fa..09e0b88344b7 100644 --- a/src/flow/src/repr/relation.rs +++ b/src/flow/src/repr/relation.rs @@ -262,6 +262,19 @@ impl RelationType { true } + + /// Return relation describe with column names + pub fn into_named(self, names: Vec) -> RelationDesc { + RelationDesc { typ: self, names } + } + + /// Return relation describe without column names + pub fn into_unnamed(self) -> RelationDesc { + RelationDesc { + typ: self, + names: vec![], + } + } } /// The type of a `Value` @@ -325,8 +338,8 @@ fn return_true() -> bool { /// Individual column names are optional. #[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Hash)] pub struct RelationDesc { - typ: RelationType, - names: Vec, + pub typ: RelationType, + pub names: Vec, } impl RelationDesc { diff --git a/src/flow/src/transform.rs b/src/flow/src/transform.rs index 9fe0b73d3642..6f93e36e9682 100644 --- a/src/flow/src/transform.rs +++ b/src/flow/src/transform.rs @@ -211,7 +211,7 @@ mod test { let schema = RelationType::new(vec![ColumnType::new(CDT::uint32_datatype(), false)]); tri_map.insert(Some(name.clone()), Some(1024), gid); - schemas.insert(gid, schema); + schemas.insert(gid, schema.into_unnamed()); } { @@ -225,7 +225,7 @@ mod test { ColumnType::new(CDT::uint32_datatype(), false), ColumnType::new(CDT::datetime_datatype(), false), ]); - schemas.insert(gid, schema); + schemas.insert(gid, schema.into_unnamed()); tri_map.insert(Some(name.clone()), Some(1025), gid); } diff --git a/src/flow/src/transform/aggr.rs b/src/flow/src/transform/aggr.rs index d21df2cf6907..8b69146c153a 100644 --- a/src/flow/src/transform/aggr.rs +++ b/src/flow/src/transform/aggr.rs @@ -435,6 
+435,236 @@ mod test { use crate::repr::{self, ColumnType, RelationType}; use crate::transform::test::{create_test_ctx, create_test_query_engine, sql_to_substrait}; + /// TODO(discord9): add more illegal sql tests + #[tokio::test] + async fn test_tumble_composite() { + let engine = create_test_query_engine(); + let sql = + "SELECT number, avg(number) FROM numbers_with_ts GROUP BY tumble(ts, '1 hour'), number"; + let plan = sql_to_substrait(engine.clone(), sql).await; + + let mut ctx = create_test_ctx(); + let flow_plan = TypedPlan::from_substrait_plan(&mut ctx, &plan).unwrap(); + + let aggr_exprs = vec![ + AggregateExpr { + func: AggregateFunc::SumUInt32, + expr: ScalarExpr::Column(0), + distinct: false, + }, + AggregateExpr { + func: AggregateFunc::Count, + expr: ScalarExpr::Column(0), + distinct: false, + }, + ]; + let avg_expr = ScalarExpr::If { + cond: Box::new(ScalarExpr::Column(4).call_binary( + ScalarExpr::Literal(Value::from(0i64), CDT::int64_datatype()), + BinaryFunc::NotEq, + )), + then: Box::new(ScalarExpr::Column(3).call_binary( + ScalarExpr::Column(4).call_unary(UnaryFunc::Cast(CDT::uint64_datatype())), + BinaryFunc::DivUInt64, + )), + els: Box::new(ScalarExpr::Literal(Value::Null, CDT::uint64_datatype())), + }; + let expected = TypedPlan { + // TODO(discord9): mfp indirectly ref to key columns + /* + .with_key(vec![1]) + .with_time_index(Some(0)),*/ + plan: Plan::Mfp { + input: Box::new( + Plan::Reduce { + input: Box::new( + Plan::Get { + id: crate::expr::Id::Global(GlobalId::User(1)), + } + .with_types(RelationType::new(vec![ + ColumnType::new(ConcreteDataType::uint32_datatype(), false), + ColumnType::new(ConcreteDataType::datetime_datatype(), false), + ])), + ), + key_val_plan: KeyValPlan { + key_plan: MapFilterProject::new(2) + .map(vec![ + ScalarExpr::Column(1).call_unary( + UnaryFunc::TumbleWindowFloor { + window_size: Interval::from_month_day_nano( + 0, + 0, + 3_600_000_000_000, + ), + start_time: None, + }, + ), + 
ScalarExpr::Column(1).call_unary( + UnaryFunc::TumbleWindowCeiling { + window_size: Interval::from_month_day_nano( + 0, + 0, + 3_600_000_000_000, + ), + start_time: None, + }, + ), + ScalarExpr::Column(0), + ]) + .unwrap() + .project(vec![2, 3, 4]) + .unwrap() + .into_safe(), + val_plan: MapFilterProject::new(2) + .project(vec![0, 1]) + .unwrap() + .into_safe(), + }, + reduce_plan: ReducePlan::Accumulable(AccumulablePlan { + full_aggrs: aggr_exprs.clone(), + simple_aggrs: vec![ + AggrWithIndex::new(aggr_exprs[0].clone(), 0, 0), + AggrWithIndex::new(aggr_exprs[1].clone(), 0, 1), + ], + distinct_aggrs: vec![], + }), + } + .with_types( + RelationType::new(vec![ + // keys + ColumnType::new(CDT::datetime_datatype(), false), // window start(time index) + ColumnType::new(CDT::datetime_datatype(), false), // window end(pk) + ColumnType::new(CDT::uint32_datatype(), false), // number(pk) + // values + ColumnType::new(CDT::uint64_datatype(), true), // avg.sum(number) + ColumnType::new(CDT::int64_datatype(), true), // avg.count(number) + ]) + .with_key(vec![1, 2]) + .with_time_index(Some(0)), + ), + ), + mfp: MapFilterProject::new(5) + .map(vec![ + avg_expr, + ScalarExpr::Column(2), // number(pk) + ScalarExpr::Column(5), // avg.sum(number) + ScalarExpr::Column(0), // window start + ScalarExpr::Column(1), // window end + ]) + .unwrap() + .project(vec![6, 7, 8, 9]) + .unwrap(), + }, + typ: RelationType::new(vec![ + ColumnType::new(CDT::uint32_datatype(), false), // number + ColumnType::new(CDT::uint64_datatype(), true), // avg(number) + ColumnType::new(CDT::datetime_datatype(), false), // window start + ColumnType::new(CDT::datetime_datatype(), false), // window end + ]), + }; + assert_eq!(flow_plan, expected); + } + + #[tokio::test] + async fn test_tumble_parse_optional() { + let engine = create_test_query_engine(); + let sql = "SELECT sum(number) FROM numbers_with_ts GROUP BY tumble(ts, '1 hour')"; + let plan = sql_to_substrait(engine.clone(), sql).await; + + let mut ctx = 
create_test_ctx(); + let flow_plan = TypedPlan::from_substrait_plan(&mut ctx, &plan).unwrap(); + + let aggr_expr = AggregateExpr { + func: AggregateFunc::SumUInt32, + expr: ScalarExpr::Column(0), + distinct: false, + }; + let expected = TypedPlan { + typ: RelationType::new(vec![ + ColumnType::new(CDT::uint64_datatype(), true), // sum(number) + ColumnType::new(CDT::datetime_datatype(), false), // window start + ColumnType::new(CDT::datetime_datatype(), false), // window end + ]), + // TODO(discord9): mfp indirectly ref to key columns + /* + .with_key(vec![1]) + .with_time_index(Some(0)),*/ + plan: Plan::Mfp { + input: Box::new( + Plan::Reduce { + input: Box::new( + Plan::Get { + id: crate::expr::Id::Global(GlobalId::User(1)), + } + .with_types(RelationType::new(vec![ + ColumnType::new(ConcreteDataType::uint32_datatype(), false), + ColumnType::new(ConcreteDataType::datetime_datatype(), false), + ])), + ), + key_val_plan: KeyValPlan { + key_plan: MapFilterProject::new(2) + .map(vec![ + ScalarExpr::Column(1).call_unary( + UnaryFunc::TumbleWindowFloor { + window_size: Interval::from_month_day_nano( + 0, + 0, + 3_600_000_000_000, + ), + start_time: None, + }, + ), + ScalarExpr::Column(1).call_unary( + UnaryFunc::TumbleWindowCeiling { + window_size: Interval::from_month_day_nano( + 0, + 0, + 3_600_000_000_000, + ), + start_time: None, + }, + ), + ]) + .unwrap() + .project(vec![2, 3]) + .unwrap() + .into_safe(), + val_plan: MapFilterProject::new(2) + .project(vec![0, 1]) + .unwrap() + .into_safe(), + }, + reduce_plan: ReducePlan::Accumulable(AccumulablePlan { + full_aggrs: vec![aggr_expr.clone()], + simple_aggrs: vec![AggrWithIndex::new(aggr_expr.clone(), 0, 0)], + distinct_aggrs: vec![], + }), + } + .with_types( + RelationType::new(vec![ + ColumnType::new(CDT::datetime_datatype(), false), // window start + ColumnType::new(CDT::datetime_datatype(), false), // window end + ColumnType::new(CDT::uint64_datatype(), true), //sum(number) + ]) + .with_key(vec![1]) + 
.with_time_index(Some(0)), + ), + ), + mfp: MapFilterProject::new(3) + .map(vec![ + ScalarExpr::Column(2), + ScalarExpr::Column(3), + ScalarExpr::Column(0), + ScalarExpr::Column(1), + ]) + .unwrap() + .project(vec![4, 5, 6]) + .unwrap(), + }, + }; + assert_eq!(flow_plan, expected); + } + #[tokio::test] async fn test_tumble_parse() { let engine = create_test_query_engine(); diff --git a/src/flow/src/transform/expr.rs b/src/flow/src/transform/expr.rs index 7e0dc2df3b62..74fc7ef61753 100644 --- a/src/flow/src/transform/expr.rs +++ b/src/flow/src/transform/expr.rs @@ -101,8 +101,7 @@ impl TypedExpr { .unzip(); match arg_len { - // because variadic function can also have 1 arguments, we need to check if it's a variadic function first - 1 if VariadicFunc::from_str_and_types(fn_name, &arg_types).is_err() => { + 1 if UnaryFunc::is_valid_func_name(fn_name) => { let func = UnaryFunc::from_str_and_type(fn_name, None)?; let arg = arg_exprs[0].clone(); let ret_type = ColumnType::new_nullable(func.signature().output.clone()); @@ -124,8 +123,7 @@ impl TypedExpr { Ok(TypedExpr::new(arg.call_unary(func), ret_type)) } - // because variadic function can also have 2 arguments, we need to check if it's a variadic function first - 2 if VariadicFunc::from_str_and_types(fn_name, &arg_types).is_err() => { + 2 if BinaryFunc::is_valid_func_name(fn_name) => { let (func, signature) = BinaryFunc::from_str_expr_and_type(fn_name, &arg_exprs, &arg_types[0..2])?; @@ -167,7 +165,8 @@ impl TypedExpr { Ok(TypedExpr::new(ret_expr, ret_type)) } _var => { - if let Ok(func) = VariadicFunc::from_str_and_types(fn_name, &arg_types) { + if VariadicFunc::is_valid_func_name(fn_name) { + let func = VariadicFunc::from_str_and_types(fn_name, &arg_types)?; let ret_type = ColumnType::new_nullable(func.signature().output.clone()); let mut expr = ScalarExpr::CallVariadic { func, @@ -175,9 +174,8 @@ impl TypedExpr { }; expr.optimize(); Ok(TypedExpr::new(expr, ret_type)) - } else if let Ok(func) = - 
UnmaterializableFunc::from_str_args(fn_name, arg_typed_exprs) - { + } else if UnmaterializableFunc::is_valid_func_name(fn_name) { + let func = UnmaterializableFunc::from_str_args(fn_name, arg_typed_exprs)?; let ret_type = ColumnType::new_nullable(func.signature().output.clone()); Ok(TypedExpr::new( ScalarExpr::CallUnmaterializable(func), @@ -324,8 +322,12 @@ impl TypedExpr { #[cfg(test)] mod test { + use std::collections::HashMap; + + use common_time::{DateTime, Interval}; use datatypes::prelude::ConcreteDataType; use datatypes::value::Value; + use pretty_assertions::assert_eq; use super::*; use crate::expr::{GlobalId, MapFilterProject}; @@ -510,4 +512,162 @@ mod test { assert_eq!(flow_plan.unwrap(), expected); } + + #[test] + fn test_func_sig() { + fn lit(v: impl ToString) -> substrait_proto::proto::FunctionArgument { + use substrait_proto::proto::expression; + let expr = Expression { + rex_type: Some(expression::RexType::Literal(expression::Literal { + nullable: false, + type_variation_reference: 0, + literal_type: Some(expression::literal::LiteralType::String(v.to_string())), + })), + }; + substrait_proto::proto::FunctionArgument { + arg_type: Some(substrait_proto::proto::function_argument::ArgType::Value( + expr, + )), + } + } + fn col(i: usize) -> substrait_proto::proto::FunctionArgument { + use substrait_proto::proto::expression; + let expr = Expression { + rex_type: Some(expression::RexType::Selection(Box::new( + expression::FieldReference { + reference_type: Some( + expression::field_reference::ReferenceType::DirectReference( + expression::ReferenceSegment { + reference_type: Some( + expression::reference_segment::ReferenceType::StructField( + Box::new(expression::reference_segment::StructField { + field: i as i32, + child: None, + }), + ), + ), + }, + ), + ), + root_type: None, + }, + ))), + }; + substrait_proto::proto::FunctionArgument { + arg_type: Some(substrait_proto::proto::function_argument::ArgType::Value( + expr, + )), + } + } + + let f = 
substrait_proto::proto::expression::ScalarFunction { + function_reference: 0, + arguments: vec![col(0)], + options: vec![], + output_type: None, + ..Default::default() + }; + let input_schema = RelationType::new(vec![ColumnType::new(CDT::uint32_datatype(), false)]); + let extensions = FunctionExtensions { + anchor_to_name: HashMap::from([(0, "is_null".to_string())]), + }; + let res = TypedExpr::from_substrait_scalar_func(&f, &input_schema, &extensions).unwrap(); + + assert_eq!( + res, + TypedExpr { + expr: ScalarExpr::Column(0).call_unary(UnaryFunc::IsNull), + typ: ColumnType { + scalar_type: CDT::boolean_datatype(), + nullable: true, + }, + } + ); + + let f = substrait_proto::proto::expression::ScalarFunction { + function_reference: 0, + arguments: vec![col(0), col(1)], + options: vec![], + output_type: None, + ..Default::default() + }; + let input_schema = RelationType::new(vec![ + ColumnType::new(CDT::uint32_datatype(), false), + ColumnType::new(CDT::uint32_datatype(), false), + ]); + let extensions = FunctionExtensions { + anchor_to_name: HashMap::from([(0, "add".to_string())]), + }; + let res = TypedExpr::from_substrait_scalar_func(&f, &input_schema, &extensions).unwrap(); + + assert_eq!( + res, + TypedExpr { + expr: ScalarExpr::Column(0) + .call_binary(ScalarExpr::Column(1), BinaryFunc::AddUInt32,), + typ: ColumnType { + scalar_type: CDT::uint32_datatype(), + nullable: true, + }, + } + ); + + let f = substrait_proto::proto::expression::ScalarFunction { + function_reference: 0, + arguments: vec![col(0), lit("1 second"), lit("2021-07-01 00:00:00")], + options: vec![], + output_type: None, + ..Default::default() + }; + let input_schema = RelationType::new(vec![ + ColumnType::new(CDT::timestamp_nanosecond_datatype(), false), + ColumnType::new(CDT::string_datatype(), false), + ]); + let extensions = FunctionExtensions { + anchor_to_name: HashMap::from([(0, "tumble".to_string())]), + }; + let res = TypedExpr::from_substrait_scalar_func(&f, &input_schema, 
&extensions).unwrap(); + + assert_eq!( + res, + ScalarExpr::CallUnmaterializable(UnmaterializableFunc::TumbleWindow { + ts: Box::new( + ScalarExpr::Column(0) + .with_type(ColumnType::new(CDT::timestamp_nanosecond_datatype(), false)) + ), + window_size: Interval::from_month_day_nano(0, 0, 1_000_000_000), + start_time: Some(DateTime::new(1625097600000)) + }) + .with_type(ColumnType::new(CDT::timestamp_millisecond_datatype(), true)), + ); + + let f = substrait_proto::proto::expression::ScalarFunction { + function_reference: 0, + arguments: vec![col(0), lit("1 second")], + options: vec![], + output_type: None, + ..Default::default() + }; + let input_schema = RelationType::new(vec![ + ColumnType::new(CDT::timestamp_nanosecond_datatype(), false), + ColumnType::new(CDT::string_datatype(), false), + ]); + let extensions = FunctionExtensions { + anchor_to_name: HashMap::from([(0, "tumble".to_string())]), + }; + let res = TypedExpr::from_substrait_scalar_func(&f, &input_schema, &extensions).unwrap(); + + assert_eq!( + res, + ScalarExpr::CallUnmaterializable(UnmaterializableFunc::TumbleWindow { + ts: Box::new( + ScalarExpr::Column(0) + .with_type(ColumnType::new(CDT::timestamp_nanosecond_datatype(), false)) + ), + window_size: Interval::from_month_day_nano(0, 0, 1_000_000_000), + start_time: None + }) + .with_type(ColumnType::new(CDT::timestamp_millisecond_datatype(), true)), + ) + } } diff --git a/src/flow/src/transform/plan.rs b/src/flow/src/transform/plan.rs index 0dedc9e5356b..337eba7eef45 100644 --- a/src/flow/src/transform/plan.rs +++ b/src/flow/src/transform/plan.rs @@ -269,7 +269,7 @@ impl TypedPlan { id: crate::expr::Id::Global(table.0), }; let get_table = TypedPlan { - typ: table.1, + typ: table.1.typ().clone(), plan: get_table, }; diff --git a/src/flow/src/utils.rs b/src/flow/src/utils.rs index 93edf176e77a..30d48f0319d4 100644 --- a/src/flow/src/utils.rs +++ b/src/flow/src/utils.rs @@ -18,6 +18,7 @@ use std::collections::{BTreeMap, BTreeSet}; use std::ops::Bound; 
use std::sync::Arc; +use common_telemetry::debug; use itertools::Itertools; use serde::{Deserialize, Serialize}; use smallvec::{smallvec, SmallVec}; @@ -86,7 +87,7 @@ impl KeyExpiryManager { /// /// - If given key is expired by now (that is less than `now - expiry_duration`), return the amount of time it's expired. /// - If it's not expired, return None - pub fn update_event_ts( + pub fn get_expire_duration_and_update_event_ts( &mut self, now: Timestamp, row: &Row, @@ -95,6 +96,33 @@ impl KeyExpiryManager { return Ok(None); }; + self.event_ts_to_key + .entry(event_ts) + .or_default() + .insert(row.clone()); + + if let Some(expire_time) = self.compute_expiration_timestamp(now) { + if expire_time > event_ts { + // return how much time it's expired + return Ok(Some(expire_time - event_ts)); + } + } + + Ok(None) + } + + /// Get the expire duration of a key, if it's expired by now. + /// + /// Return None if the key is not expired + pub fn get_expire_duration( + &self, + now: Timestamp, + row: &Row, + ) -> Result<Option<Duration>, EvalError> { + let Some(event_ts) = self.extract_event_ts(row)? else { + return Ok(None); + }; + if let Some(expire_time) = self.compute_expiration_timestamp(now) { if expire_time > event_ts { // return how much time it's expired @@ -102,10 +130,6 @@ impl KeyExpiryManager { } } - self.event_ts_to_key - .entry(event_ts) - .or_default() - .insert(row.clone()); Ok(None) } @@ -189,6 +213,10 @@ impl Arrangement { } } + pub fn get_expire_state(&self) -> Option<&KeyExpiryManager> { + self.expire_state.as_ref() + } + pub fn set_expire_state(&mut self, expire_state: KeyExpiryManager) { self.expire_state = Some(expire_state); } @@ -208,8 +236,12 @@ impl Arrangement { for ((key, val), update_ts, diff) in updates { // check if the key is expired if let Some(s) = &mut self.expire_state { - if let Some(expired_by) = s.update_event_ts(now, &key)? { + if let Some(expired_by) = s.get_expire_duration_and_update_event_ts(now, &key)?
{ max_expired_by = max_expired_by.max(Some(expired_by)); + debug!( + "Expired key: {:?}, expired by: {:?} with time being now={}", + key, expired_by, now + ); continue; } } @@ -335,7 +367,9 @@ impl Arrangement { for (key, updates) in batch { // check if the key is expired if let Some(s) = &mut self.expire_state { - if let Some(expired_by) = s.update_event_ts(now, &key)? { + if let Some(expired_by) = + s.get_expire_duration_and_update_event_ts(now, &key)? + { max_expired_by = max_expired_by.max(Some(expired_by)); continue; } @@ -540,6 +574,10 @@ impl ArrangeHandler { pub fn set_full_arrangement(&self, full: bool) { self.write().full_arrangement = full; } + + pub fn is_full_arrangement(&self) -> bool { + self.read().full_arrangement + } } #[cfg(test)]