From 2b912d93fbe5a83cbc6ebb78ad52a8dd4b5cb78b Mon Sep 17 00:00:00 2001
From: discord9 <55937128+discord9@users.noreply.github.com>
Date: Mon, 15 Jul 2024 17:20:04 +0800
Subject: [PATCH] feat: flow perf&fix df func call (#4347)

* feat: flow perf&fix df func call

feat: make source sender `send` non-blocking

feat: better control of flow worker freq

feat: support transform interval

fix: const folding of df func args & tests

tests: update cast const fold

chore: adjust flow worker's freq

refactor: batch split

feat: adaptive run freq for flow worker & check for errors

chore: better debug log

* refactor: per review

* chore: per zc's review

* chore: per bot review

* chore: remove some completed `TODO`s

* docs: add comments for a test
---
 src/flow/src/adapter.rs                       |  67 ++++---
 src/flow/src/adapter/node_context.rs          |  61 +++---
 src/flow/src/compute/render/map.rs            |  10 +
 src/flow/src/compute/render/reduce.rs         |  41 +++-
 src/flow/src/compute/render/src_sink.rs       |   8 +-
 src/flow/src/expr/linear.rs                   |   4 +
 src/flow/src/repr.rs                          |   2 +
 src/flow/src/transform/expr.rs                | 135 +++++++++--
 src/flow/src/transform/literal.rs             | 187 +++++++++++++++++-
 src/operator/src/insert.rs                    |   1 -
 .../cases/standalone/common/flow/basic.result |  61 ++----
 tests/cases/standalone/common/flow/basic.sql  |  30 +--
 .../standalone/common/flow/df_func.result     | 124 ++++++++++++
 .../cases/standalone/common/flow/df_func.sql  |  67 +++++++
 14 files changed, 632 insertions(+), 166 deletions(-)

diff --git a/src/flow/src/adapter.rs b/src/flow/src/adapter.rs
index e22894c19d..80491a6861 100644
--- a/src/flow/src/adapter.rs
+++ b/src/flow/src/adapter.rs
@@ -18,7 +18,7 @@
 
 use std::collections::{BTreeMap, HashMap};
 use std::sync::Arc;
-use std::time::{Instant, SystemTime};
+use std::time::{Duration, Instant, SystemTime};
 
 use api::v1::{RowDeleteRequest, RowDeleteRequests, RowInsertRequest, RowInsertRequests};
 use common_config::Configurable;
@@ -51,7 +51,7 @@ use crate::adapter::worker::{create_worker, Worker, WorkerHandle};
 use crate::compute::ErrCollector;
 use crate::error::{ExternalSnafu, InternalSnafu, TableNotFoundSnafu, UnexpectedSnafu};
 use crate::expr::GlobalId;
-use crate::repr::{self, DiffRow, Row};
+use crate::repr::{self, DiffRow, Row, BATCH_SIZE};
 use crate::transform::sql_to_flow_plan;
 
 mod flownode_impl;
@@ -67,7 +67,7 @@ mod table_source;
 use crate::error::Error;
 use crate::FrontendInvoker;
 
-// TODO(discord9): replace this with `GREPTIME_TIMESTAMP` before v0.9
+// `GREPTIME_TIMESTAMP` is not used here, to distinguish tables that are created automatically by flow
 pub const AUTO_CREATED_PLACEHOLDER_TS_COL: &str = "__ts_placeholder";
 
 pub const UPDATE_AT_TS_COL: &str = "update_at";
@@ -212,8 +212,6 @@ pub fn diff_row_to_request(rows: Vec<DiffRow>) -> Vec<DiffRequest> {
 
 /// This impl block contains methods to send writeback requests to frontend
 impl FlowWorkerManager {
-    /// TODO(discord9): merge all same type of diff row into one requests
-    ///
    /// Return the number of requests it made
     pub async fn send_writeback_requests(&self) -> Result<usize, Error> {
         let all_reqs = self.generate_writeback_request().await;
@@ -464,7 +462,6 @@ impl FlowWorkerManager {
         shutdown: Option<broadcast::Receiver<()>>,
     ) -> JoinHandle<()> {
         info!("Starting flownode manager's background task");
-        // TODO(discord9): add heartbeat tasks here
         common_runtime::spawn_bg(async move {
             self.run(shutdown).await;
         })
@@ -484,21 +481,31 @@ impl FlowWorkerManager {
         }
     }
 
+    async fn get_buf_size(&self) -> usize {
+        self.node_context.read().await.get_send_buf_size().await
+    }
+
     /// Trigger dataflow running, and then send writeback request to the source sender
     ///
     /// note that this method doesn't handle input mirror requests, as those should be handled by the grpc server
     pub async fn run(&self, mut shutdown: Option<broadcast::Receiver<()>>) {
         debug!("Starting to run");
+        let default_interval = Duration::from_secs(1);
+        let mut avg_spd = 0; // rows/sec
+        let mut since_last_run = tokio::time::Instant::now();
         loop {
             // TODO(discord9): only run when new inputs arrive or scheduled to
-            if let Err(err) = self.run_available(true).await {
+            let row_cnt = self.run_available(true).await.unwrap_or_else(|err| {
                 common_telemetry::error!(err;"Run available errors");
-            }
-            // TODO(discord9): error handling
+                0
+            });
+
             if let Err(err) = self.send_writeback_requests().await {
                 common_telemetry::error!(err;"Send writeback request errors");
             };
             self.log_all_errors().await;
+
+            // determine whether we need to shut down
             match &shutdown.as_mut().map(|s| s.try_recv()) {
                 Some(Ok(())) => {
                     info!("Shutdown flow's main loop");
                     break;
                 }
@@ -515,7 +522,25 @@
                 }
                 None => (),
             }
-            tokio::time::sleep(std::time::Duration::from_secs(1)).await;
+
+            // for now we want to batch rows until there are around `BATCH_SIZE` rows in the send buf
+            // before triggering a run of the flow's worker
+            // (the `max(1)` calls below prevent division by zero)
+            let wait_for = since_last_run.elapsed();
+
+            let cur_spd = row_cnt * 1000 / wait_for.as_millis().max(1) as usize;
+            // rapid increase, slow decay
+            avg_spd = if cur_spd > avg_spd {
+                cur_spd
+            } else {
+                (9 * avg_spd + cur_spd) / 10
+            };
+            debug!("avg_spd={} r/s, cur_spd={} r/s", avg_spd, cur_spd);
+            let new_wait = BATCH_SIZE * 1000 / avg_spd.max(1); // in ms
+            let new_wait = Duration::from_millis(new_wait as u64).min(default_interval);
+            debug!("Wait for {} ms, row_cnt={}", new_wait.as_millis(), row_cnt);
+            since_last_run = tokio::time::Instant::now();
+            tokio::time::sleep(new_wait).await;
         }
         // flow is now shut down; drop frontend_invoker early so a ref cycle (in standalone mode) can be prevented:
         // FlowWorkerManager.frontend_invoker -> FrontendInvoker.inserter
@@ -528,8 +553,10 @@ impl FlowWorkerManager {
     ///
     /// set `blocking` to true to wait until the lock is acquired,
     /// and false to return immediately if the lock is not acquired
+    /// return the number of rows sent to the worker
     /// TODO(discord9): add flag for subgraph that have input since last run
-    pub async fn run_available(&self, blocking: bool) -> Result<(), Error> {
+    pub async fn run_available(&self, blocking: bool) -> Result<usize, Error> {
+        let mut row_cnt = 0;
         loop {
             let now = self.tick_manager.tick();
             for worker in self.worker_handles.iter() {
@@ -539,35 +566,33 @@
                 } else if let Ok(worker) = worker.try_lock() {
                     worker.run_available(now).await?;
                 } else {
-                    return Ok(());
+                    return Ok(row_cnt);
                 }
             }
-            // first check how many inputs were sent
+            // check the rows sent and the rows remaining in the send buf
             let (flush_res, buf_len) = if blocking {
                 let ctx = self.node_context.read().await;
                 (ctx.flush_all_sender().await, ctx.get_send_buf_size().await)
             } else {
                 match self.node_context.try_read() {
                     Ok(ctx) => (ctx.flush_all_sender().await, ctx.get_send_buf_size().await),
-                    Err(_) => return Ok(()),
+                    Err(_) => return Ok(row_cnt),
                 }
             };
             match flush_res {
-                Ok(_) => (),
+                Ok(r) => row_cnt += r,
                 Err(err) => {
                     common_telemetry::error!("Flush send buf errors: {:?}", err);
                     break;
                 }
             };
-            // if no thing in send buf then break
-            if buf_len == 0 {
+            // if there are not enough rows yet, break
+            if buf_len < BATCH_SIZE {
                 break;
-            } else {
-                debug!("Send buf len = {}", buf_len);
             }
         }
 
-        Ok(())
+        Ok(row_cnt)
     }
 
     /// send write request to related source sender
@@ -583,8 +608,6 @@ impl FlowWorkerManager {
         );
         let table_id = region_id.table_id();
         self.node_context.read().await.send(table_id, rows).await?;
-        // TODO(discord9): put it in a background task?
-        // self.run_available(false).await?;
         Ok(())
     }
 }
diff --git a/src/flow/src/adapter/node_context.rs b/src/flow/src/adapter/node_context.rs
index e8defc7652..812faa41d9 100644
--- a/src/flow/src/adapter/node_context.rs
+++ b/src/flow/src/adapter/node_context.rs
@@ -14,7 +14,7 @@
 
 //! Node context, prone to change with every incoming request
 
-use std::collections::{BTreeMap, BTreeSet, HashMap, VecDeque};
+use std::collections::{BTreeMap, BTreeSet, HashMap};
 use std::sync::Arc;
 
 use common_telemetry::debug;
@@ -65,54 +65,64 @@ pub struct FlownodeContext {
 /// backpressure and adjust dataflow running duration to avoid blocking
 #[derive(Debug)]
 pub struct SourceSender {
+    // TODO(discord9): make it all Vec<DiffRow>?
     sender: broadcast::Sender<DiffRow>,
-    send_buf: RwLock<VecDeque<DiffRow>>,
+    send_buf_tx: mpsc::UnboundedSender<Vec<DiffRow>>,
+    send_buf_rx: RwLock<mpsc::UnboundedReceiver<Vec<DiffRow>>>,
 }
 
 impl Default for SourceSender {
     fn default() -> Self {
+        let (send_buf_tx, send_buf_rx) = mpsc::unbounded_channel();
         Self {
             // TODO(discord9): find a better way than increasing this to prevent lagging and hence missing input data
             sender: broadcast::Sender::new(BROADCAST_CAP * 2),
-            send_buf: Default::default(),
+            send_buf_tx,
+            send_buf_rx: RwLock::new(send_buf_rx),
         }
     }
 }
 
 impl SourceSender {
+    /// max number of iterations to try to flush the send buf
+    const MAX_ITERATIONS: usize = 16;
     pub fn get_receiver(&self) -> broadcast::Receiver<DiffRow> {
         self.sender.subscribe()
     }
 
     /// send as many rows as possible from the send buf
     /// until the send buf is empty or the broadcast channel is full
-    pub async fn try_send_all(&self) -> Result<usize, Error> {
+    pub async fn try_flush(&self) -> Result<usize, Error> {
         let mut row_cnt = 0;
-        loop {
-            let mut send_buf = self.send_buf.write().await;
+        let mut iterations = 0;
+        while iterations < Self::MAX_ITERATIONS {
+            let mut send_buf = self.send_buf_rx.write().await;
             // if the inner sender channel is full or the send buf is empty, there
             // is nothing to do for now, just break
             if self.sender.len() >= BROADCAST_CAP || send_buf.is_empty() {
                 break;
             }
-            if let Some(row) = send_buf.pop_front() {
-                self.sender
-                    .send(row)
-                    .map_err(|err| {
-                        InternalSnafu {
-                            reason: format!("Failed to send row, error = {:?}", err),
-                        }
-                        .build()
-                    })
-                    .with_context(|_| EvalSnafu)?;
-                row_cnt += 1;
+            if let Some(rows) = send_buf.recv().await {
+                for row in rows {
+                    self.sender
+                        .send(row)
+                        .map_err(|err| {
+                            InternalSnafu {
+                                reason: format!("Failed to send row, error = {:?}", err),
+                            }
+                            .build()
+                        })
+                        .with_context(|_| EvalSnafu)?;
+                    row_cnt += 1;
+                }
             }
+            iterations += 1;
         }
         if row_cnt > 0 {
             debug!("Send {} rows", row_cnt);
             debug!(
                 "Remaining Send buf.len() = {}",
-                self.send_buf.read().await.len()
+                self.send_buf_rx.read().await.len()
             );
         }
 
@@ -121,11 +131,14 @@ impl SourceSender {
 
     /// return the number of rows it actually sent (including what's in the buffer)
     pub async fn send_rows(&self, rows: Vec<DiffRow>) -> Result<usize, Error> {
-        self.send_buf.write().await.extend(rows);
+        self.send_buf_tx.send(rows).map_err(|e| {
+            crate::error::InternalSnafu {
+                reason: format!("Failed to send row, error = {:?}", e),
+            }
+            .build()
+        })?;
 
-        let row_cnt = self.try_send_all().await?;
-
-        Ok(row_cnt)
+        Ok(0)
     }
 }
 
@@ -150,7 +163,7 @@ impl FlownodeContext {
     pub async fn flush_all_sender(&self) -> Result<usize, Error> {
         let mut sum = 0;
         for sender in self.source_sender.values() {
-            sender.try_send_all().await.inspect(|x| sum += x)?;
+            sender.try_flush().await.inspect(|x| sum += x)?;
         }
         Ok(sum)
     }
@@ -159,7 +172,7 @@ impl FlownodeContext {
     pub async fn get_send_buf_size(&self) -> usize {
         let mut sum = 0;
         for sender in self.source_sender.values() {
-            sum += sender.send_buf.read().await.len();
+            sum += sender.send_buf_rx.read().await.len();
         }
         sum
     }
diff --git a/src/flow/src/compute/render/map.rs b/src/flow/src/compute/render/map.rs
index d2278dc3b3..272be4acc6 100644
--- a/src/flow/src/compute/render/map.rs
+++ b/src/flow/src/compute/render/map.rs
@@ -146,6 +146,16 @@ fn mfp_subgraph(
     // find all updates that need to be sent from the arrangement
     let output_kv = arrange.read().get_updates_in_range(range);
 
+    err_collector.run(|| {
+        snafu::ensure!(
+            mfp_plan.is_temporal() || output_kv.is_empty(),
+            crate::expr::error::InternalSnafu {
+                reason: "Output from the future should be empty since no temporal filter is applied"
+            }
+        );
+        Ok(())
+    });
+
     // the output is expected to be key -> empty val
     let output = output_kv
         .into_iter()
diff --git a/src/flow/src/compute/render/reduce.rs b/src/flow/src/compute/render/reduce.rs
index d44c290d94..5d5761656c 100644
--- a/src/flow/src/compute/render/reduce.rs
+++ b/src/flow/src/compute/render/reduce.rs
@@ -187,6 +187,39 @@ fn split_row_to_key_val(
     }
 }
 
+/// split rows into key and val by evaluating the key and val plans
+fn batch_split_rows_to_key_val(
+    rows: impl IntoIterator<Item = DiffRow>,
+    key_val_plan: KeyValPlan,
+    err_collector: ErrCollector,
+) -> impl IntoIterator<Item = KeyValDiffRow> {
+    let mut row_buf = Row::new(vec![]);
+    rows.into_iter().filter_map(
+        move |(mut row, sys_time, diff): DiffRow| -> Option<KeyValDiffRow> {
+            err_collector.run(|| {
+                let len = row.len();
+                if let Some(key) = key_val_plan
+                    .key_plan
+                    .evaluate_into(&mut row.inner, &mut row_buf)?
+                {
+                    // reuse the row as a buffer
+                    row.inner.resize(len, Value::Null);
+                    // val_plan is not allowed to carry any filter predicate
+                    let val = key_val_plan
+                        .val_plan
+                        .evaluate_into(&mut row.inner, &mut row_buf)?
+                        .context(InternalSnafu {
+                            reason: "val_plan should not contain any filter predicate",
+                        })?;
+                    Ok(Some(((key, val), sys_time, diff)))
+                } else {
+                    Ok(None)
+                }
+            })?
+        },
+    )
+}
+
 /// reduce subgraph, reduce the input data into a single row
 /// output is concat from key and val
 fn reduce_subgraph(
@@ -204,13 +237,7 @@ fn reduce_subgraph(
         send,
     }: SubgraphArg,
 ) {
-    let mut row_buf = Row::empty();
-    let key_val = data.into_iter().filter_map(|(row, sys_time, diff)| {
-        // error is collected and then the row is skipped
-        err_collector
-            .run(|| split_row_to_key_val(row, sys_time, diff, key_val_plan, &mut row_buf))
-            .flatten()
-    });
+    let key_val = batch_split_rows_to_key_val(data, key_val_plan.clone(), err_collector.clone());
     // from here for distinct reduce and accum reduce, things are drastically different
     // for distinct reduce the arrange store the output,
     // but for accum reduce the arrange store the accum state, and output is
diff --git a/src/flow/src/compute/render/src_sink.rs b/src/flow/src/compute/render/src_sink.rs
index fd757852ca..8ee6efb1ee 100644
--- a/src/flow/src/compute/render/src_sink.rs
+++ b/src/flow/src/compute/render/src_sink.rs
@@ -96,12 +96,8 @@ impl<'referred, 'df> Context<'referred, 'df> {
             }
         }
         let all = prev_avail.chain(to_send).collect_vec();
-        if !all.is_empty() || !to_arrange.is_empty() {
-            debug!(
-                "Rendered Source All send: {} rows, not yet send: {} rows",
-                all.len(),
-                to_arrange.len()
-            );
+        if !to_arrange.is_empty() {
+            debug!("Source Operator buffered {} rows", to_arrange.len());
         }
         err_collector.run(|| arranged.apply_updates(now, to_arrange));
         send.give(all);
diff --git a/src/flow/src/expr/linear.rs b/src/flow/src/expr/linear.rs
index 5eaf3ebd35..0a2ea7a141 100644
--- a/src/flow/src/expr/linear.rs
+++ b/src/flow/src/expr/linear.rs
@@ -587,6 +587,10 @@ pub struct MfpPlan {
 }
 
 impl MfpPlan {
+    /// Indicates whether the `MfpPlan` contains temporal predicates, i.e. whether it may have outputs that occur in the future.
+    pub fn is_temporal(&self) -> bool {
+        !self.lower_bounds.is_empty() || !self.upper_bounds.is_empty()
+    }
     /// find `now` in `predicates` and put them into lower/upper temporal bounds for temporal filter to use
     pub fn create_from(mut mfp: MapFilterProject) -> Result<Self, Error> {
         let mut lower_bounds = Vec::new();
diff --git a/src/flow/src/repr.rs b/src/flow/src/repr.rs
index e28689be40..06571146f6 100644
--- a/src/flow/src/repr.rs
+++ b/src/flow/src/repr.rs
@@ -56,6 +56,8 @@ pub type KeyValDiffRow = ((Row, Row), Timestamp, Diff);
 /// TODO(discord9): add config for this, so cpu&mem usage can be balanced and configured by this
 pub const BROADCAST_CAP: usize = 65535;
 
+pub const BATCH_SIZE: usize = BROADCAST_CAP / 2;
+
 /// Convert a value that is or can be converted to Datetime to internal timestamp
 ///
 /// support types are: `Date`, `DateTime`, `TimeStamp`, `i64`
diff --git a/src/flow/src/transform/expr.rs b/src/flow/src/transform/expr.rs
index b2784b08bc..0eb1460c49 100644
--- a/src/flow/src/transform/expr.rs
+++ b/src/flow/src/transform/expr.rs
@@ -16,6 +16,7 @@
 
 use std::sync::Arc;
 
+use common_error::ext::BoxedError;
 use common_telemetry::debug;
 use datafusion_physical_expr::PhysicalExpr;
 use datatypes::data_type::ConcreteDataType as CDT;
@@ -27,20 +28,23 @@ use substrait_proto::proto::function_argument::ArgType;
 use substrait_proto::proto::Expression;
 
 use crate::error::{
-    DatafusionSnafu, DatatypesSnafu, Error, EvalSnafu, InvalidQuerySnafu, NotImplementedSnafu,
-    PlanSnafu,
+    DatafusionSnafu, DatatypesSnafu, Error, EvalSnafu, ExternalSnafu, InvalidQuerySnafu,
+    NotImplementedSnafu, PlanSnafu, UnexpectedSnafu,
 };
 use crate::expr::{
     BinaryFunc, DfScalarFunction, RawDfScalarFn, ScalarExpr, TypedExpr, UnaryFunc,
     UnmaterializableFunc, VariadicFunc,
 };
 use crate::repr::{ColumnType, RelationDesc, RelationType};
-use crate::transform::literal::{from_substrait_literal, from_substrait_type};
+use crate::transform::literal::{
+    from_substrait_literal, from_substrait_type, to_substrait_literal,
+};
 use crate::transform::{substrait_proto, FunctionExtensions};
 
-// TODO(discord9): found proper place for this
+
+// TODO(discord9): refactor the plan-to-substrait conversion of the `arrow_cast` function, then remove this function
 /// ref to `arrow_schema::datatype` for type name
-fn typename_to_cdt(name: &str) -> CDT {
-    match name {
+fn typename_to_cdt(name: &str) -> Result<CDT, Error> {
+    let ret = match name {
         "Int8" => CDT::int8_datatype(),
         "Int16" => CDT::int16_datatype(),
         "Int32" => CDT::int32_datatype(),
@@ -53,10 +57,22 @@ fn typename_to_cdt(name: &str) -> CDT {
         "Float64" => CDT::float64_datatype(),
         "Boolean" => CDT::boolean_datatype(),
         "String" => CDT::string_datatype(),
-        "Date" => CDT::date_datatype(),
+        "Date" | "Date32" | "Date64" => CDT::date_datatype(),
         "Timestamp" => CDT::timestamp_second_datatype(),
-        _ => CDT::null_datatype(),
-    }
+        "Timestamp(Second, None)" => CDT::timestamp_second_datatype(),
+        "Timestamp(Millisecond, None)" => CDT::timestamp_millisecond_datatype(),
+        "Timestamp(Microsecond, None)" => CDT::timestamp_microsecond_datatype(),
+        "Timestamp(Nanosecond, None)" => CDT::timestamp_nanosecond_datatype(),
+        "Time32(Second)" | "Time64(Second)" => CDT::time_second_datatype(),
+        "Time32(Millisecond)" | "Time64(Millisecond)" => CDT::time_millisecond_datatype(),
+        "Time32(Microsecond)" | "Time64(Microsecond)" => CDT::time_microsecond_datatype(),
+        "Time32(Nanosecond)" | "Time64(Nanosecond)" => CDT::time_nanosecond_datatype(),
+        _ => NotImplementedSnafu {
+            reason: format!("Unrecognized typename: {}", name),
+        }
+        .fail()?,
+    };
+    Ok(ret)
 }
 
 /// Convert [`ScalarFunction`] to corresponding Datafusion's [`PhysicalExpr`]
@@ -138,29 +154,72 @@ fn is_proto_literal(arg: &substrait_proto::proto::FunctionArgument) -> bool {
     )
 }
 
+fn build_proto_lit(
+    lit: substrait_proto::proto::expression::Literal,
+) -> substrait_proto::proto::FunctionArgument {
+    use substrait_proto::proto;
+    proto::FunctionArgument {
+        arg_type: Some(ArgType::Value(Expression {
+            rex_type: Some(proto::expression::RexType::Literal(lit)),
+        })),
+    }
+}
+
 /// rewrite ScalarFunction's arguments to Columns 0..n so nested exprs are still handled by us instead of datafusion
 ///
 /// specifically, if an argument is a literal, the replacement will not happen
-fn rewrite_scalar_function(f: &ScalarFunction) -> ScalarFunction {
+fn rewrite_scalar_function(
+    f: &ScalarFunction,
+    arg_typed_exprs: &[TypedExpr],
+) -> Result<ScalarFunction, Error> {
     let mut f_rewrite = f.clone();
     for (idx, raw_expr) in f_rewrite.arguments.iter_mut().enumerate() {
-        if !is_proto_literal(raw_expr) {
-            *raw_expr = proto_col(idx)
+        // only replace an argument with col(idx) if it is not a literal;
+        // we try our best to determine whether it is a literal, e.g. for functions like `cast()`
+        // we check both representations to see whether the result is a literal
+        match (
+            is_proto_literal(raw_expr),
+            arg_typed_exprs[idx].expr.is_literal(),
+        ) {
+            (false, false) => *raw_expr = proto_col(idx),
+            (true, _) => (),
+            (false, true) => {
+                if let ScalarExpr::Literal(val, ty) = &arg_typed_exprs[idx].expr {
+                    let df_val = val
+                        .try_to_scalar_value(ty)
+                        .map_err(BoxedError::new)
+                        .context(ExternalSnafu)?;
+                    let lit_sub = to_substrait_literal(&df_val)?;
+                    // put the const-folded literal back into datafusion to simplify the expression
+                    *raw_expr = build_proto_lit(lit_sub);
+                } else {
+                    UnexpectedSnafu {
+                        reason: format!(
+                            "Expect value to be literal, but found {:?}",
+                            arg_typed_exprs[idx].expr
+                        ),
+                    }
+                    .fail()?
+                }
+            }
+        }
     }
-    f_rewrite
+    Ok(f_rewrite)
 }
 
 impl TypedExpr {
     pub async fn from_substrait_to_datafusion_scalar_func(
         f: &ScalarFunction,
-        arg_exprs_typed: Vec<TypedExpr>,
+        arg_typed_exprs: Vec<TypedExpr>,
         extensions: &FunctionExtensions,
     ) -> Result<TypedExpr, Error> {
-        let (arg_exprs, arg_types): (Vec<_>, Vec<_>) =
-            arg_exprs_typed.into_iter().map(|e| (e.expr, e.typ)).unzip();
+        let (arg_exprs, arg_types): (Vec<_>, Vec<_>) = arg_typed_exprs
+            .clone()
+            .into_iter()
+            .map(|e| (e.expr, e.typ))
+            .unzip();
         debug!("Before rewrite: {:?}", f);
-        let f_rewrite = rewrite_scalar_function(f);
+        let f_rewrite = rewrite_scalar_function(f, &arg_typed_exprs)?;
         debug!("After rewrite: {:?}", f_rewrite);
         let input_schema = RelationType::new(arg_types).into_unnamed();
         let raw_fn =
@@ -240,12 +299,21 @@ impl TypedExpr {
                     .with_context(|| InvalidQuerySnafu {
                         reason: "array_cast's second argument must be a literal string",
                     })?;
-                let cast_to = typename_to_cdt(&cast_to);
-                let func = UnaryFunc::Cast(cast_to);
+                let cast_to = typename_to_cdt(&cast_to)?;
+                let func = UnaryFunc::Cast(cast_to.clone());
                 let arg = arg_exprs[0].clone();
-                let ret_type = ColumnType::new_nullable(func.signature().output.clone());
 
-                Ok(TypedExpr::new(arg.call_unary(func), ret_type))
+                // constant folding here since some datafusion functions require a constant arg (e.g. `DATE_BIN`)
+                if arg.is_literal() {
+                    let res = func.eval(&[], &arg).context(EvalSnafu)?;
+                    Ok(TypedExpr::new(
+                        ScalarExpr::Literal(res, cast_to.clone()),
+                        ColumnType::new_nullable(cast_to),
+                    ))
+                } else {
+                    let ret_type = ColumnType::new_nullable(func.signature().output.clone());
+
+                    Ok(TypedExpr::new(arg.call_unary(func), ret_type))
+                }
             }
             2 if BinaryFunc::is_valid_func_name(fn_name) => {
                 let (func, signature) =
@@ -602,28 +670,9 @@ mod test {
         let expected = TypedPlan {
             schema: RelationType::new(vec![ColumnType::new(CDT::int16_datatype(), true)])
                 .into_unnamed(),
-            plan: Plan::Mfp {
-                input: Box::new(
-                    Plan::Get {
-                        id: crate::expr::Id::Global(GlobalId::User(0)),
-                    }
-                    .with_types(
-                        RelationType::new(vec![ColumnType::new(
-                            ConcreteDataType::uint32_datatype(),
-                            false,
-                        )])
-                        .into_named(vec![Some("number".to_string())]),
-                    ),
-                ),
-                mfp: MapFilterProject::new(1)
-                    .map(vec![ScalarExpr::Literal(
-                        Value::Int64(1),
-                        CDT::int64_datatype(),
-                    )
-                    .call_unary(UnaryFunc::Cast(CDT::int16_datatype()))])
-                    .unwrap()
-                    .project(vec![1])
-                    .unwrap(),
+            plan: Plan::Constant {
+                // the cast of a literal is constant folded
+                rows: vec![(repr::Row::new(vec![Value::from(1i16)]), i64::MIN, 1)],
             },
         };
         assert_eq!(flow_plan.unwrap(), expected);
diff --git a/src/flow/src/transform/literal.rs b/src/flow/src/transform/literal.rs
index bd0f041dd8..255ceadb54 100644
--- a/src/flow/src/transform/literal.rs
+++ b/src/flow/src/transform/literal.rs
@@ -12,23 +12,93 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::array::TryFromSliceError;
+
+use bytes::Bytes;
 use common_decimal::Decimal128;
 use common_time::{Date, Timestamp};
+use datafusion_common::ScalarValue;
 use datatypes::data_type::ConcreteDataType as CDT;
 use datatypes::value::Value;
+use num_traits::FromBytes;
+use snafu::ensure;
+use substrait::substrait_proto_df::proto::expression::literal::user_defined::Val;
+use substrait::substrait_proto_df::proto::expression::literal::UserDefined;
 use substrait::variation_const::{
     DATE_32_TYPE_VARIATION_REF, DATE_64_TYPE_VARIATION_REF, DEFAULT_TYPE_VARIATION_REF,
+    INTERVAL_DAY_TIME_TYPE_REF, INTERVAL_DAY_TIME_TYPE_URL, INTERVAL_MONTH_DAY_NANO_TYPE_REF,
+    INTERVAL_MONTH_DAY_NANO_TYPE_URL, INTERVAL_YEAR_MONTH_TYPE_REF, INTERVAL_YEAR_MONTH_TYPE_URL,
     TIMESTAMP_MICRO_TYPE_VARIATION_REF, TIMESTAMP_MILLI_TYPE_VARIATION_REF,
     TIMESTAMP_NANO_TYPE_VARIATION_REF, TIMESTAMP_SECOND_TYPE_VARIATION_REF,
     UNSIGNED_INTEGER_TYPE_VARIATION_REF,
 };
 use substrait_proto::proto::expression::literal::LiteralType;
 use substrait_proto::proto::expression::Literal;
-use substrait_proto::proto::r#type::Kind;
+use substrait_proto::proto::r#type::{self, parameter, Kind, Parameter};
+use substrait_proto::proto::Type;
 
-use crate::error::{Error, NotImplementedSnafu, PlanSnafu};
+use crate::error::{Error, NotImplementedSnafu, PlanSnafu, UnexpectedSnafu};
 use crate::transform::substrait_proto;
 
+/// TODO(discord9): this is copied from datafusion-substrait since the original function is not public; replace it once it is exported
+pub(crate) fn to_substrait_literal(value: &ScalarValue) -> Result<Literal, Error> {
+    if value.is_null() {
+        return not_impl_err!("Unsupported literal: {value:?}");
+    }
+    let (literal_type, type_variation_reference) = match value {
+        ScalarValue::Boolean(Some(b)) => (LiteralType::Boolean(*b), DEFAULT_TYPE_VARIATION_REF),
+        ScalarValue::Int8(Some(n)) => (LiteralType::I8(*n as i32), DEFAULT_TYPE_VARIATION_REF),
+        ScalarValue::UInt8(Some(n)) => (
+            LiteralType::I8(*n as i32),
+            UNSIGNED_INTEGER_TYPE_VARIATION_REF,
+        ),
+        ScalarValue::Int16(Some(n)) => (LiteralType::I16(*n as i32), DEFAULT_TYPE_VARIATION_REF),
+        ScalarValue::UInt16(Some(n)) => (
+            LiteralType::I16(*n as i32),
+            UNSIGNED_INTEGER_TYPE_VARIATION_REF,
+        ),
+        ScalarValue::Int32(Some(n)) => (LiteralType::I32(*n), DEFAULT_TYPE_VARIATION_REF),
+        ScalarValue::UInt32(Some(n)) => (
+            LiteralType::I32(*n as i32),
+            UNSIGNED_INTEGER_TYPE_VARIATION_REF,
+        ),
+        ScalarValue::Int64(Some(n)) => (LiteralType::I64(*n), DEFAULT_TYPE_VARIATION_REF),
+        ScalarValue::UInt64(Some(n)) => (
+            LiteralType::I64(*n as i64),
+            UNSIGNED_INTEGER_TYPE_VARIATION_REF,
+        ),
+        ScalarValue::Float32(Some(f)) => (LiteralType::Fp32(*f), DEFAULT_TYPE_VARIATION_REF),
+        ScalarValue::Float64(Some(f)) => (LiteralType::Fp64(*f), DEFAULT_TYPE_VARIATION_REF),
+        ScalarValue::TimestampSecond(Some(t), _) => (
+            LiteralType::Timestamp(*t),
+            TIMESTAMP_SECOND_TYPE_VARIATION_REF,
+        ),
+        ScalarValue::TimestampMillisecond(Some(t), _) => (
+            LiteralType::Timestamp(*t),
+            TIMESTAMP_MILLI_TYPE_VARIATION_REF,
+        ),
+        ScalarValue::TimestampMicrosecond(Some(t), _) => (
+            LiteralType::Timestamp(*t),
+            TIMESTAMP_MICRO_TYPE_VARIATION_REF,
+        ),
+        ScalarValue::TimestampNanosecond(Some(t), _) => (
+            LiteralType::Timestamp(*t),
+            TIMESTAMP_NANO_TYPE_VARIATION_REF,
+        ),
+        ScalarValue::Date32(Some(d)) => (LiteralType::Date(*d), DATE_32_TYPE_VARIATION_REF),
+        _ => (
+            not_impl_err!("Unsupported literal: {value:?}")?,
+            DEFAULT_TYPE_VARIATION_REF,
+        ),
+    };
+
+    Ok(Literal {
+        nullable: false,
+        type_variation_reference,
+        literal_type: Some(literal_type),
+    })
+}
+
 /// Convert a Substrait literal into a Value and its ConcreteDataType (So that we can know type even if the value is null)
 pub(crate) fn from_substrait_literal(lit: &Literal) -> Result<(Value, CDT), Error> {
     let scalar_value = match &lit.literal_type {
@@ -105,11 +175,122 @@ pub(crate) fn from_substrait_literal(lit: &Literal) -> Result<(Value, CDT), Erro
             )
         }
         Some(LiteralType::Null(ntype)) => (Value::Null, from_substrait_type(ntype)?),
-        _ => not_impl_err!("unsupported literal_type")?,
+        Some(LiteralType::IntervalDayToSecond(interval)) => {
+            let (days, seconds, microseconds) =
+                (interval.days, interval.seconds, interval.microseconds);
+            let millis = microseconds / 1000 + seconds * 1000;
+            let value_interval = common_time::Interval::from_day_time(days, millis);
+            (
+                Value::Interval(value_interval),
+                CDT::interval_day_time_datatype(),
+            )
+        }
+        Some(LiteralType::IntervalYearToMonth(interval)) => (
+            Value::Interval(common_time::Interval::from_year_month(
+                interval.years * 12 + interval.months,
+            )),
+            CDT::interval_year_month_datatype(),
+        ),
+        Some(LiteralType::UserDefined(user_defined)) => {
+            from_substrait_user_defined_type(user_defined)?
+        }
+        _ => not_impl_err!("unsupported literal_type: {:?}", &lit.literal_type)?,
     };
     Ok(scalar_value)
 }
 
+fn from_bytes<T: FromBytes>(i: &Bytes) -> Result<T, Error>
+where
+    for<'a> &'a <T as FromBytes>::Bytes:
+        std::convert::TryFrom<&'a [u8], Error = TryFromSliceError>,
+{
+    let (int_bytes, _rest) = i.split_at(std::mem::size_of::<T>());
+    let i = T::from_le_bytes(int_bytes.try_into().map_err(|e| {
+        UnexpectedSnafu {
+            reason: format!(
+                "Expect slice to be {} bytes, found {} bytes, error={:?}",
+                std::mem::size_of::<T>(),
+                int_bytes.len(),
+                e
+            ),
+        }
+        .build()
+    })?);
+    Ok(i)
+}
+
+fn from_substrait_user_defined_type(user_defined: &UserDefined) -> Result<(Value, CDT), Error> {
+    if let UserDefined {
+        type_reference,
+        type_parameters: _,
+        val: Some(Val::Value(val)),
+    } = user_defined
+    {
+        // see https://github.com/apache/datafusion/blob/146b679aa19c7749cc73d0c27440419d6498142b/datafusion/substrait/src/logical_plan/producer.rs#L1957
+        // for the interval types' transformation to substrait
+        let ret = match *type_reference {
+            INTERVAL_YEAR_MONTH_TYPE_REF => {
+                ensure!(
+                    val.type_url == INTERVAL_YEAR_MONTH_TYPE_URL,
+                    UnexpectedSnafu {
+                        reason: format!(
+                            "Expect {}, found {} in type_url",
+                            INTERVAL_YEAR_MONTH_TYPE_URL, val.type_url
+                        )
+                    }
+                );
+                let i: i32 = from_bytes(&val.value)?;
+                let value_interval = common_time::Interval::from_year_month(i);
+                (
+                    Value::Interval(value_interval),
+                    CDT::interval_year_month_datatype(),
+                )
+            }
+            INTERVAL_MONTH_DAY_NANO_TYPE_REF => {
+                ensure!(
+                    val.type_url == INTERVAL_MONTH_DAY_NANO_TYPE_URL,
+                    UnexpectedSnafu {
+                        reason: format!(
+                            "Expect {}, found {} in type_url",
+                            INTERVAL_MONTH_DAY_NANO_TYPE_URL, val.type_url
+                        )
+                    }
+                );
+                let i: i128 = from_bytes(&val.value)?;
+                let (months, days, nsecs) = ((i >> 96) as i32, (i >> 64) as i32, i as i64);
+                let value_interval =
+                    common_time::Interval::from_month_day_nano(months, days, nsecs);
+                (
+                    Value::Interval(value_interval),
+                    CDT::interval_month_day_nano_datatype(),
+                )
+            }
+            INTERVAL_DAY_TIME_TYPE_REF => {
+                ensure!(
+                    val.type_url == INTERVAL_DAY_TIME_TYPE_URL,
+                    UnexpectedSnafu {
+                        reason: format!(
+                            "Expect {}, found {} in type_url",
+                            INTERVAL_DAY_TIME_TYPE_URL, val.type_url
+                        )
+                    }
+                );
+                let i: i64 = from_bytes(&val.value)?;
+                let (days, millis) = ((i >> 32) as i32, i as i32);
+                let value_interval = common_time::Interval::from_day_time(days, millis);
+                (
+                    Value::Interval(value_interval),
+                    CDT::interval_day_time_datatype(),
+                )
+            }
+            _ => return not_impl_err!("unsupported user defined type: {:?}", user_defined)?,
+        };
+        Ok(ret)
+    } else {
+        not_impl_err!("Expect val to be Some(...)")
+    }
+}
+
 /// convert a Substrait type into a ConcreteDataType
 pub fn from_substrait_type(null_type: &substrait_proto::proto::Type) -> Result<CDT, Error> {
     if let Some(kind) = &null_type.kind {
diff --git a/src/operator/src/insert.rs b/src/operator/src/insert.rs
index 1ea03f2f1e..38e79de6c9 100644
--- a/src/operator/src/insert.rs
+++ b/src/operator/src/insert.rs
@@ -358,7 +358,6 @@ impl Inserter {
             // already know this is not source table
             Some(None) => continue,
             _ => {
-                // TODO(discord9): query metasrv for actual peer address
                 let peers = self
                     .table_flownode_set_cache
                     .get(table_id)
diff --git a/tests/cases/standalone/common/flow/basic.result b/tests/cases/standalone/common/flow/basic.result
index 1d480e2f22..9b8273655e 100644
--- a/tests/cases/standalone/common/flow/basic.result
+++ b/tests/cases/standalone/common/flow/basic.result
@@ -59,6 +59,7 @@ DROP TABLE out_num_cnt;
 
 Affected Rows: 0
 
+-- test interpret interval
 CREATE TABLE numbers_input (
     number INT,
     ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
@@ -68,55 +69,35 @@ CREATE TABLE numbers_input (
 
 Affected Rows: 0
 
-CREATE FLOW test_numbers
-SINK TO out_num_cnt
-AS
-SELECT date_trunc('second', ts), sum(number) FROM numbers_input GROUP BY date_trunc('second', ts);
+create table out_num_cnt (
+    number INT,
+    ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP TIME INDEX);
 
 Affected Rows: 0
 
-INSERT INTO numbers_input
-VALUES
-    (20, "2021-07-01 00:00:00.200"),
-    (22, "2021-07-01 00:00:00.600");
-
-Affected Rows: 2
-
--- SQLNESS SLEEP 3s
-SELECT col_0, col_1 FROM out_num_cnt;
-
-+---------------------+-------+
-| col_0               | col_1 |
-+---------------------+-------+
-| 2021-07-01T00:00:00 | 42    |
-+---------------------+-------+
-
-INSERT INTO numbers_input
-VALUES
-    (23,"2021-07-01 00:00:01.000"),
-    (24,"2021-07-01 00:00:01.500");
-
-Affected Rows: 2
-
--- SQLNESS SLEEP 2s
-SELECT col_0, col_1 FROM out_num_cnt;
-
-+---------------------+-------+
-| col_0               | col_1 |
-+---------------------+-------+
-| 2021-07-01T00:00:00 | 42    |
-| 2021-07-01T00:00:01 | 47    |
-+---------------------+-------+
-
-DROP FLOW test_numbers;
+CREATE FLOW filter_numbers SINK TO out_num_cnt AS SELECT INTERVAL '1 day 1 second', INTERVAL '1 month 1 day 1 second', INTERVAL '1 year 1 month' FROM numbers_input where number > 10;
 
 Affected Rows: 0
 
-DROP TABLE numbers_input;
+SHOW CREATE FLOW filter_numbers;
+
++----------------+----------------------------------------------------------------------------------------------------------------------------------------+
+| Flow           | Create Flow                                                                                                                            |
++----------------+----------------------------------------------------------------------------------------------------------------------------------------+
+| filter_numbers | CREATE OR REPLACE FLOW IF NOT EXISTS filter_numbers                                                                                    |
+|                | SINK TO out_num_cnt                                                                                                                    |
+|                | AS SELECT INTERVAL '1 day 1 second', INTERVAL '1 month 1 day 1 second', INTERVAL '1 year 1 month' FROM numbers_input WHERE number > 10 |
++----------------+----------------------------------------------------------------------------------------------------------------------------------------+
+
+drop flow filter_numbers;
 
 Affected Rows: 0
 
-DROP TABLE out_num_cnt;
+drop table out_num_cnt;
+
+Affected Rows: 0
+
+drop table numbers_input;
 
 Affected Rows: 0
 
diff --git a/tests/cases/standalone/common/flow/basic.sql b/tests/cases/standalone/common/flow/basic.sql
index 8c0c5d038e..d7beba69ce 100644
--- a/tests/cases/standalone/common/flow/basic.sql
+++ b/tests/cases/standalone/common/flow/basic.sql
@@ -30,34 +30,24 @@ DROP FLOW test_numbers;
 DROP TABLE numbers_input;
 DROP TABLE out_num_cnt;
 
+-- test interpret interval
+
 CREATE TABLE numbers_input (
     number INT,
     ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
     PRIMARY KEY(number),
     TIME INDEX(ts)
 );
+create table out_num_cnt (
+    number INT,
+    ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP TIME INDEX);
 
-CREATE FLOW test_numbers
-SINK TO out_num_cnt
-AS
-SELECT date_trunc('second', ts), sum(number) FROM numbers_input GROUP BY date_trunc('second', ts);
+CREATE FLOW filter_numbers SINK TO out_num_cnt AS SELECT INTERVAL '1 day 1 second', INTERVAL '1 month 1 day 1 second', INTERVAL '1 year 1 month' FROM numbers_input where number > 10;
 
-INSERT INTO numbers_input
-VALUES
-    (20, "2021-07-01 00:00:00.200"),
-    (22, "2021-07-01 00:00:00.600");
+SHOW CREATE FLOW filter_numbers;
 
--- SQLNESS SLEEP 3s
-SELECT col_0, col_1 FROM out_num_cnt;
+drop flow filter_numbers;
 
-INSERT INTO numbers_input
-VALUES
-    (23,"2021-07-01 00:00:01.000"),
-    (24,"2021-07-01 00:00:01.500");
+drop table out_num_cnt;
 
--- SQLNESS SLEEP 2s
-SELECT col_0, col_1 FROM out_num_cnt;
-
-DROP FLOW test_numbers;
-DROP TABLE numbers_input;
-DROP TABLE out_num_cnt;
+drop table numbers_input;
diff --git a/tests/cases/standalone/common/flow/df_func.result b/tests/cases/standalone/common/flow/df_func.result
index 7ab393eeb1..6c08f7854e 100644
--- a/tests/cases/standalone/common/flow/df_func.result
+++ b/tests/cases/standalone/common/flow/df_func.result
@@ -124,3 +124,127 @@ DROP TABLE out_num_cnt_df_func;
 
 Affected Rows: 0
 
+-- test date_bin
+CREATE TABLE numbers_input (
+    number INT,
+    ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
+    PRIMARY KEY(number),
+    TIME INDEX(ts)
+);
+
+Affected Rows: 0
+
+CREATE FLOW test_numbers
+SINK TO out_num_cnt
+AS
+SELECT max(number) - min(number), date_bin(INTERVAL '1 second', ts, '2021-07-01 00:00:00'::TimestampNanosecond) FROM numbers_input GROUP BY date_bin(INTERVAL '1 second', ts, '2021-07-01 00:00:00'::TimestampNanosecond);
+
+Affected Rows: 0
+
+INSERT INTO numbers_input
+VALUES
+    (20, "2021-07-01 00:00:00.200"),
+    (22, "2021-07-01 00:00:00.600");
+
+Affected Rows: 2
+
+-- SQLNESS SLEEP 3s
+SELECT col_0, col_1 FROM out_num_cnt;
+
++-------+---------------------+
+| col_0 | col_1               |
++-------+---------------------+
+| 2     | 2021-07-01T00:00:00 |
++-------+---------------------+
+
+INSERT INTO numbers_input
+VALUES
+    (23,"2021-07-01 00:00:01.000"),
+    (24,"2021-07-01 00:00:01.500");
+
+Affected Rows: 2
+
+-- SQLNESS SLEEP 2s
+SELECT col_0, col_1 FROM out_num_cnt;
+
++-------+---------------------+
+| col_0 | col_1               |
++-------+---------------------+
+| 2     | 2021-07-01T00:00:00 |
+| 1     | 2021-07-01T00:00:01 |
++-------+---------------------+
+
+DROP FLOW test_numbers;
+
+Affected Rows: 0
+
+DROP TABLE numbers_input;
+
+Affected Rows: 0
+
+DROP TABLE out_num_cnt;
+
+Affected Rows: 0
+
+-- test date_trunc
+CREATE TABLE numbers_input (
+    number INT,
+    ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
+    PRIMARY KEY(number),
+    TIME INDEX(ts)
+);
+
+Affected Rows: 0
+
+CREATE FLOW test_numbers
+SINK TO out_num_cnt
+AS
+SELECT date_trunc('second', ts), sum(number) FROM numbers_input GROUP BY date_trunc('second', ts);
+
+Affected Rows: 0
+
+INSERT INTO numbers_input
+VALUES
+    (20, "2021-07-01 00:00:00.200"),
+    (22, "2021-07-01 00:00:00.600");
+
+Affected Rows: 2
+
+-- SQLNESS SLEEP 3s
+SELECT col_0, col_1 FROM out_num_cnt;
+
++---------------------+-------+
+| col_0               | col_1 |
++---------------------+-------+
+| 2021-07-01T00:00:00 | 42    |
++---------------------+-------+
+
+INSERT INTO numbers_input
+VALUES
+    (23,"2021-07-01 00:00:01.000"),
+    (24,"2021-07-01 00:00:01.500");
+
+Affected Rows: 2
+
+-- SQLNESS SLEEP 2s
+SELECT col_0, col_1 FROM out_num_cnt;
+
++---------------------+-------+
+| col_0               | col_1 |
++---------------------+-------+
+| 2021-07-01T00:00:00 | 42    |
+| 2021-07-01T00:00:01 | 47    |
++---------------------+-------+
+
+DROP FLOW test_numbers;
+
+Affected Rows: 0
+
+DROP TABLE numbers_input;
+
+Affected Rows: 0
+
+DROP TABLE out_num_cnt;
+
+Affected Rows: 0
+
diff --git a/tests/cases/standalone/common/flow/df_func.sql b/tests/cases/standalone/common/flow/df_func.sql
index b9a22cb9da..e04d95e477 100644
--- a/tests/cases/standalone/common/flow/df_func.sql
+++ b/tests/cases/standalone/common/flow/df_func.sql
@@ -65,3 +65,70 @@ SELECT col_0, window_start, window_end FROM out_num_cnt_df_func;
 DROP FLOW test_numbers_df_func;
 DROP TABLE numbers_input_df_func;
 DROP TABLE out_num_cnt_df_func;
+
+-- test date_bin
+CREATE TABLE numbers_input (
+    number INT,
+    ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
+    PRIMARY KEY(number),
+    TIME INDEX(ts)
+);
+
+CREATE FLOW test_numbers
+SINK TO out_num_cnt
+AS
+SELECT max(number) - min(number), date_bin(INTERVAL '1 second', ts, '2021-07-01 00:00:00'::TimestampNanosecond) FROM numbers_input GROUP BY date_bin(INTERVAL '1 second', ts, '2021-07-01 00:00:00'::TimestampNanosecond);
+
+INSERT INTO numbers_input
+VALUES
+    (20, "2021-07-01 00:00:00.200"),
+    (22, "2021-07-01 00:00:00.600");
+
+-- SQLNESS SLEEP 3s
+SELECT col_0, col_1 FROM out_num_cnt;
+
+INSERT INTO numbers_input
+VALUES
+    (23,"2021-07-01 00:00:01.000"),
+    (24,"2021-07-01 00:00:01.500");
+
+-- SQLNESS SLEEP 2s
+SELECT col_0, col_1 FROM out_num_cnt;
+
+DROP FLOW test_numbers;
+DROP TABLE numbers_input;
+DROP TABLE out_num_cnt;
+
+
+-- test date_trunc
+CREATE TABLE numbers_input (
+    number INT,
+    ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
+    PRIMARY KEY(number),
+    TIME INDEX(ts)
+);
+
+CREATE FLOW test_numbers
+SINK TO out_num_cnt
+AS
+SELECT date_trunc('second', ts), sum(number) FROM numbers_input GROUP BY date_trunc('second', ts);
+
+INSERT INTO numbers_input
+VALUES
+    (20, "2021-07-01 00:00:00.200"),
+    (22, "2021-07-01 00:00:00.600");
+
+-- SQLNESS SLEEP 3s
+SELECT col_0, col_1 FROM out_num_cnt;
+
+INSERT INTO numbers_input
+VALUES
+    (23,"2021-07-01 00:00:01.000"),
+    (24,"2021-07-01 00:00:01.500");
+
+-- SQLNESS SLEEP 2s
+SELECT col_0, col_1 FROM out_num_cnt;
+
+DROP FLOW test_numbers;
+DROP TABLE numbers_input;
+DROP TABLE out_num_cnt;
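
The adaptive run frequency added to `run` in src/flow/src/adapter.rs is the core of the perf change: the loop measures ingestion speed since the last run, folds it into a running average with a "rapid increase, slow decay" rule, and then sleeps roughly long enough for `BATCH_SIZE` rows to accumulate, capped at one second. Below is a minimal, self-contained sketch of that computation, std-only and outside the patch; `next_wait` and the sample numbers in `main` are illustrative, and only `BATCH_SIZE` mirrors the constant added in src/flow/src/repr.rs.

use std::time::Duration;

const BATCH_SIZE: usize = 65535 / 2; // mirrors BROADCAST_CAP / 2 from the patch

/// Update the average speed (rows/sec) from the rows processed since the
/// last run, then compute how long to sleep before the next run.
fn next_wait(row_cnt: usize, elapsed: Duration, avg_spd: &mut usize) -> Duration {
    let default_interval = Duration::from_secs(1);
    // current speed in rows/sec; max(1) avoids division by zero
    let cur_spd = row_cnt * 1000 / elapsed.as_millis().max(1) as usize;
    // rapid increase, slow decay: react to bursts at once, forget them slowly
    *avg_spd = if cur_spd > *avg_spd {
        cur_spd
    } else {
        (9 * *avg_spd + cur_spd) / 10
    };
    // wait roughly long enough for BATCH_SIZE rows to build up,
    // but never longer than the default interval
    let wait_ms = BATCH_SIZE * 1000 / (*avg_spd).max(1);
    Duration::from_millis(wait_ms as u64).min(default_interval)
}

fn main() {
    let mut avg_spd = 0;
    // e.g. 10_000 rows over 250 ms => 40_000 rows/sec, so wait ~819 ms
    // for about BATCH_SIZE rows to accumulate (capped at 1 s)
    let wait = next_wait(10_000, Duration::from_millis(250), &mut avg_spd);
    println!("avg_spd={} r/s, wait={} ms", avg_spd, wait.as_millis());
}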
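
The SourceSender change in src/flow/src/adapter/node_context.rs swaps a locked VecDeque for an unbounded mpsc channel, so `send_rows` never blocks and each flush is bounded. Here is a minimal sketch of the same pattern, assuming a recent tokio (the patch itself relies on `broadcast::Sender::new`, `Sender::len`, and `UnboundedReceiver::is_empty`); the `u64` payload, capacities, and names stand in for the crate's real `DiffRow`, `BROADCAST_CAP`, and types.

use tokio::sync::{broadcast, mpsc};

const BROADCAST_CAP: usize = 1024;
const MAX_ITERATIONS: usize = 16;

struct SourceSender {
    sender: broadcast::Sender<u64>,
    send_buf_tx: mpsc::UnboundedSender<Vec<u64>>,
    send_buf_rx: mpsc::UnboundedReceiver<Vec<u64>>,
}

impl SourceSender {
    fn new() -> Self {
        let (send_buf_tx, send_buf_rx) = mpsc::unbounded_channel();
        Self {
            sender: broadcast::Sender::new(BROADCAST_CAP),
            send_buf_tx,
            send_buf_rx,
        }
    }

    /// non-blocking: just enqueue the whole batch
    fn send_rows(&self, rows: Vec<u64>) {
        self.send_buf_tx.send(rows).expect("receiver alive");
    }

    /// drain up to MAX_ITERATIONS batches, stopping when the broadcast
    /// channel is full or the buffer is empty; returns rows flushed
    async fn try_flush(&mut self) -> usize {
        let mut row_cnt = 0;
        for _ in 0..MAX_ITERATIONS {
            if self.sender.len() >= BROADCAST_CAP || self.send_buf_rx.is_empty() {
                break;
            }
            if let Some(rows) = self.send_buf_rx.recv().await {
                for row in rows {
                    let _ = self.sender.send(row);
                    row_cnt += 1;
                }
            }
        }
        row_cnt
    }
}

#[tokio::main]
async fn main() {
    let mut s = SourceSender::new();
    // subscribe before flushing, or the broadcast send has no receivers
    let mut rx = s.sender.subscribe();
    s.send_rows(vec![1, 2, 3]);
    assert_eq!(s.try_flush().await, 3);
    assert_eq!(rx.recv().await.unwrap(), 1);
}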
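
Finally, `from_substrait_user_defined_type` in src/flow/src/transform/literal.rs recovers intervals from the little-endian byte payloads that datafusion's substrait producer emits, e.g. a month-day-nano interval packed into one i128. A standalone sketch of that bit layout follows; the packing in `main` is illustrative and only mirrors the decoding the patch performs.

/// Unpack (months, days, nanoseconds) from the little-endian i128 payload:
/// months live in bits 96..128, days in bits 64..96, nanoseconds in bits 0..64.
fn unpack_month_day_nano(le_bytes: [u8; 16]) -> (i32, i32, i64) {
    let i = i128::from_le_bytes(le_bytes);
    ((i >> 96) as i32, (i >> 64) as i32, i as i64)
}

fn main() {
    // pack 1 month, 2 days, 3 ns; the day field is masked to 32 bits so
    // negative day counts do not bleed into the month field
    let packed: i128 = (1i128 << 96) | ((2i128 & 0xFFFF_FFFF) << 64) | 3;
    let (months, days, nanos) = unpack_month_day_nano(packed.to_le_bytes());
    assert_eq!((months, days, nanos), (1, 2, 3));
    println!("{} months, {} days, {} ns", months, days, nanos);
}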