diff --git a/src/flow/src/adapter.rs b/src/flow/src/adapter.rs index 31f4fadf03..80c3a9ff78 100644 --- a/src/flow/src/adapter.rs +++ b/src/flow/src/adapter.rs @@ -265,7 +265,7 @@ impl FlownodeManager { let ctx = Arc::new(QueryContext::with(&catalog, &schema)); // TODO(discord9): instead of auto build table from request schema, actually build table // before `create flow` to be able to assign pk and ts etc. - let (primary_keys, schema, is_auto_create) = if let Some(table_id) = self + let (primary_keys, schema, is_ts_placeholder) = if let Some(table_id) = self .table_info_source .get_table_id_from_name(&table_name) .await? @@ -317,7 +317,12 @@ impl FlownodeManager { .map(|v| { v.column_indices .iter() - .map(|i| format!("Col_{i}")) + .map(|i| { + schema + .get_name(*i) + .clone() + .unwrap_or_else(|| format!("Col_{i}")) + }) .collect_vec() }) .unwrap_or_default(); @@ -326,15 +331,8 @@ impl FlownodeManager { ConcreteDataType::timestamp_millisecond_datatype(), true, ); - // TODO(discord9): bugged so we can't infer time index from flow plan, so we have to manually set one - let ts_col = ColumnSchema::new( - AUTO_CREATED_PLACEHOLDER_TS_COL, - ConcreteDataType::timestamp_millisecond_datatype(), - true, - ) - .with_time_index(true); - let wout_ts = schema + let original_schema = schema .typ() .column_types .clone() @@ -345,16 +343,33 @@ impl FlownodeManager { .names .get(idx) .cloned() + .flatten() .unwrap_or(format!("Col_{}", idx)); - ColumnSchema::new(name, typ.scalar_type, typ.nullable) + let ret = ColumnSchema::new(name, typ.scalar_type, typ.nullable); + if schema.typ().time_index == Some(idx) { + ret.with_time_index(true) + } else { + ret + } }) .collect_vec(); - let mut with_ts = wout_ts.clone(); - with_ts.push(update_at); - with_ts.push(ts_col); + let mut with_auto_added_col = original_schema.clone(); + with_auto_added_col.push(update_at); - (primary_keys, with_ts, true) + // if no time index, add one as placeholder + let no_time_index = schema.typ().time_index.is_none(); + if no_time_index { + let ts_col = ColumnSchema::new( + AUTO_CREATED_PLACEHOLDER_TS_COL, + ConcreteDataType::timestamp_millisecond_datatype(), + true, + ) + .with_time_index(true); + with_auto_added_col.push(ts_col); + } + + (primary_keys, with_auto_added_col, no_time_index) }; let schema_len = schema.len(); let proto_schema = column_schemas_to_proto(schema, &primary_keys)?; @@ -377,7 +392,7 @@ impl FlownodeManager { now, ))]); // ts col, if auto create - if is_auto_create { + if is_ts_placeholder { ensure!( row.len() == schema_len - 1, InternalSnafu { @@ -508,12 +523,13 @@ impl FlownodeManager { debug!("Starting to run"); loop { // TODO(discord9): only run when new inputs arrive or scheduled to - debug!("call run_available in run every second"); - self.run_available(true).await.unwrap(); - debug!("call send_writeback_requests in run every second"); + if let Err(err) = self.run_available(true).await { + common_telemetry::error!(err;"Run available errors"); + } // TODO(discord9): error handling - self.send_writeback_requests().await.unwrap(); - debug!("call log_all_errors in run every second"); + if let Err(err) = self.send_writeback_requests().await { + common_telemetry::error!(err;"Send writeback request errors"); + }; self.log_all_errors().await; tokio::time::sleep(std::time::Duration::from_secs(1)).await; } @@ -596,6 +612,7 @@ impl FlownodeManager { break; } } + self.node_context.write().await.remove_flow(flow_id); Ok(()) } @@ -642,8 +659,9 @@ impl FlownodeManager { node_ctx.query_context = query_ctx.map(Arc::new); // construct a active dataflow state with it let flow_plan = sql_to_flow_plan(&mut node_ctx, &self.query_engine, &sql).await?; + debug!("Flow {:?}'s Plan is {:?}", flow_id, flow_plan); - node_ctx.assign_table_schema(&sink_table_name, flow_plan.typ.clone())?; + node_ctx.assign_table_schema(&sink_table_name, flow_plan.schema.clone())?; let _ = comment; let _ = flow_options; diff --git a/src/flow/src/adapter/flownode_impl.rs b/src/flow/src/adapter/flownode_impl.rs index 11b2f6d04f..e337e96a0a 100644 --- a/src/flow/src/adapter/flownode_impl.rs +++ b/src/flow/src/adapter/flownode_impl.rs @@ -26,6 +26,7 @@ use itertools::Itertools; use snafu::{OptionExt, ResultExt}; use store_api::storage::RegionId; +use crate::adapter::error::InternalSnafu; use crate::adapter::FlownodeManager; use crate::repr::{self, DiffRow}; @@ -126,6 +127,16 @@ impl Flownode for FlownodeManager { .context(UnexpectedSnafu { err_msg: format!("Table not found: {}", table_id), })?; + let table_col_names = table_col_names + .iter().enumerate() + .map(|(idx,name)| match name { + Some(name) => Ok(name.clone()), + None => InternalSnafu { + reason: format!("Expect column {idx} of table id={table_id} to have name in table schema, found None"), + } + .fail().map_err(BoxedError::new).context(ExternalSnafu), + }) + .collect::>>()?; let name_to_col = HashMap::<_, _>::from_iter( insert_schema .iter() diff --git a/src/flow/src/adapter/node_context.rs b/src/flow/src/adapter/node_context.rs index fdcc150697..dcbfb65719 100644 --- a/src/flow/src/adapter/node_context.rs +++ b/src/flow/src/adapter/node_context.rs @@ -27,7 +27,7 @@ use crate::adapter::error::{Error, EvalSnafu, TableNotFoundSnafu}; use crate::adapter::{FlowId, TableName, TableSource}; use crate::expr::error::InternalSnafu; use crate::expr::GlobalId; -use crate::repr::{DiffRow, RelationDesc, RelationType, BROADCAST_CAP}; +use crate::repr::{DiffRow, RelationDesc, BROADCAST_CAP}; /// A context that holds the information of the dataflow #[derive(Default, Debug)] @@ -36,6 +36,7 @@ pub struct FlownodeContext { pub source_to_tasks: BTreeMap>, /// mapping from task to sink table, useful for sending data back to the client when a task is done running pub flow_to_sink: BTreeMap, + pub sink_to_flow: BTreeMap, /// broadcast sender for source table, any incoming write request will be sent to the source table's corresponding sender /// /// Note that we are getting insert requests with table id, so we should use table id as the key @@ -184,7 +185,21 @@ impl FlownodeContext { } self.add_sink_receiver(sink_table_name.clone()); - self.flow_to_sink.insert(task_id, sink_table_name); + self.flow_to_sink.insert(task_id, sink_table_name.clone()); + self.sink_to_flow.insert(sink_table_name, task_id); + } + + /// remove flow from worker context + pub fn remove_flow(&mut self, task_id: FlowId) { + if let Some(sink_table_name) = self.flow_to_sink.remove(&task_id) { + self.sink_to_flow.remove(&sink_table_name); + } + for (source_table_id, tasks) in self.source_to_tasks.iter_mut() { + tasks.remove(&task_id); + if tasks.is_empty() { + self.source_sender.remove(source_table_id); + } + } } /// try add source sender, if already exist, do nothing @@ -307,14 +322,14 @@ impl FlownodeContext { pub fn assign_table_schema( &mut self, table_name: &TableName, - schema: RelationType, + schema: RelationDesc, ) -> Result<(), Error> { let gid = self .table_repr .get_by_name(table_name) .map(|(_, gid)| gid) .unwrap(); - self.schema.insert(gid, schema.into_unnamed()); + self.schema.insert(gid, schema); Ok(()) } diff --git a/src/flow/src/adapter/table_source.rs b/src/flow/src/adapter/table_source.rs index 53932cd692..821f960198 100644 --- a/src/flow/src/adapter/table_source.rs +++ b/src/flow/src/adapter/table_source.rs @@ -132,7 +132,7 @@ impl TableSource { nullable: col.is_nullable(), scalar_type: col.data_type, }, - col.name, + Some(col.name), ) }) .unzip(); diff --git a/src/flow/src/adapter/worker.rs b/src/flow/src/adapter/worker.rs index 0f948c48b5..076e681bed 100644 --- a/src/flow/src/adapter/worker.rs +++ b/src/flow/src/adapter/worker.rs @@ -514,7 +514,7 @@ mod test { plan: Plan::Get { id: Id::Global(GlobalId::User(1)), }, - typ: RelationType::new(vec![]), + schema: RelationType::new(vec![]).into_unnamed(), }, ); let create_reqs = Request::Create { diff --git a/src/flow/src/compute/render.rs b/src/flow/src/compute/render.rs index 44b0253598..4a4704d28e 100644 --- a/src/flow/src/compute/render.rs +++ b/src/flow/src/compute/render.rs @@ -111,7 +111,7 @@ impl<'referred, 'df> Context<'referred, 'df> { input, key_val_plan, reduce_plan, - } => self.render_reduce(input, key_val_plan, reduce_plan, plan.typ), + } => self.render_reduce(input, key_val_plan, reduce_plan, plan.schema.typ), Plan::Join { .. } => NotImplementedSnafu { reason: "Join is still WIP", } diff --git a/src/flow/src/compute/render/map.rs b/src/flow/src/compute/render/map.rs index 91cb37c6cf..2a6b49cb7c 100644 --- a/src/flow/src/compute/render/map.rs +++ b/src/flow/src/compute/render/map.rs @@ -252,7 +252,7 @@ mod test { .unwrap(); let bundle = ctx - .render_mfp(Box::new(input_plan.with_types(typ)), mfp) + .render_mfp(Box::new(input_plan.with_types(typ.into_unnamed())), mfp) .unwrap(); let output = get_output_handle(&mut ctx, bundle); // drop ctx here to simulate actual process of compile first, run later scenario @@ -312,7 +312,7 @@ mod test { )]) .unwrap(); let bundle = ctx - .render_mfp(Box::new(input_plan.with_types(typ)), mfp) + .render_mfp(Box::new(input_plan.with_types(typ.into_unnamed())), mfp) .unwrap(); let output = get_output_handle(&mut ctx, bundle); @@ -348,7 +348,7 @@ mod test { )]) .unwrap(); let bundle = ctx - .render_mfp(Box::new(input_plan.with_types(typ)), mfp) + .render_mfp(Box::new(input_plan.with_types(typ.into_unnamed())), mfp) .unwrap(); let output = get_output_handle(&mut ctx, bundle); diff --git a/src/flow/src/compute/render/reduce.rs b/src/flow/src/compute/render/reduce.rs index fa29a63242..b55ed58f90 100644 --- a/src/flow/src/compute/render/reduce.rs +++ b/src/flow/src/compute/render/reduce.rs @@ -813,11 +813,12 @@ mod test { distinct: false, }; let expected = TypedPlan { - typ: RelationType::new(vec![ + schema: RelationType::new(vec![ ColumnType::new(CDT::uint64_datatype(), true), // sum(number) ColumnType::new(CDT::datetime_datatype(), false), // window start ColumnType::new(CDT::datetime_datatype(), false), // window end - ]), + ]) + .into_unnamed(), // TODO(discord9): mfp indirectly ref to key columns /* .with_key(vec![1]) @@ -829,10 +830,13 @@ mod test { Plan::Get { id: crate::expr::Id::Global(GlobalId::User(1)), } - .with_types(RelationType::new(vec![ - ColumnType::new(ConcreteDataType::uint32_datatype(), false), - ColumnType::new(ConcreteDataType::datetime_datatype(), false), - ])), + .with_types( + RelationType::new(vec![ + ColumnType::new(ConcreteDataType::uint32_datatype(), false), + ColumnType::new(ConcreteDataType::datetime_datatype(), false), + ]) + .into_unnamed(), + ), ), key_val_plan: KeyValPlan { key_plan: MapFilterProject::new(2) @@ -880,7 +884,8 @@ mod test { ColumnType::new(CDT::uint64_datatype(), true), //sum(number) ]) .with_key(vec![1]) - .with_time_index(Some(0)), + .with_time_index(Some(0)) + .into_unnamed(), ), ), mfp: MapFilterProject::new(3) @@ -977,7 +982,8 @@ mod test { els: Box::new(ScalarExpr::Literal(Value::Null, CDT::uint64_datatype())), }; let expected = TypedPlan { - typ: RelationType::new(vec![ColumnType::new(CDT::uint64_datatype(), true)]), + schema: RelationType::new(vec![ColumnType::new(CDT::uint64_datatype(), true)]) + .into_unnamed(), plan: Plan::Mfp { input: Box::new( Plan::Reduce { @@ -985,9 +991,13 @@ mod test { Plan::Get { id: crate::expr::Id::Global(GlobalId::User(1)), } - .with_types(RelationType::new(vec![ - ColumnType::new(ConcreteDataType::int64_datatype(), false), - ])), + .with_types( + RelationType::new(vec![ColumnType::new( + ConcreteDataType::int64_datatype(), + false, + )]) + .into_unnamed(), + ), ), key_val_plan: KeyValPlan { key_plan: MapFilterProject::new(1) @@ -1008,10 +1018,13 @@ mod test { distinct_aggrs: vec![], }), } - .with_types(RelationType::new(vec![ - ColumnType::new(ConcreteDataType::uint32_datatype(), true), - ColumnType::new(ConcreteDataType::int64_datatype(), true), - ])), + .with_types( + RelationType::new(vec![ + ColumnType::new(ConcreteDataType::uint32_datatype(), true), + ColumnType::new(ConcreteDataType::int64_datatype(), true), + ]) + .into_unnamed(), + ), ), mfp: MapFilterProject::new(2) .map(vec![ @@ -1068,7 +1081,7 @@ mod test { let reduce_plan = ReducePlan::Distinct; let bundle = ctx .render_reduce( - Box::new(input_plan.with_types(typ)), + Box::new(input_plan.with_types(typ.into_unnamed())), key_val_plan, reduce_plan, RelationType::empty(), @@ -1143,7 +1156,7 @@ mod test { let reduce_plan = ReducePlan::Accumulable(accum_plan); let bundle = ctx .render_reduce( - Box::new(input_plan.with_types(typ)), + Box::new(input_plan.with_types(typ.into_unnamed())), key_val_plan, reduce_plan, RelationType::empty(), @@ -1224,7 +1237,7 @@ mod test { let reduce_plan = ReducePlan::Accumulable(accum_plan); let bundle = ctx .render_reduce( - Box::new(input_plan.with_types(typ)), + Box::new(input_plan.with_types(typ.into_unnamed())), key_val_plan, reduce_plan, RelationType::empty(), @@ -1301,7 +1314,7 @@ mod test { let reduce_plan = ReducePlan::Accumulable(accum_plan); let bundle = ctx .render_reduce( - Box::new(input_plan.with_types(typ)), + Box::new(input_plan.with_types(typ.into_unnamed())), key_val_plan, reduce_plan, RelationType::empty(), @@ -1393,7 +1406,7 @@ mod test { let reduce_plan = ReducePlan::Accumulable(accum_plan); let bundle = ctx .render_reduce( - Box::new(input_plan.with_types(typ)), + Box::new(input_plan.with_types(typ.into_unnamed())), key_val_plan, reduce_plan, RelationType::empty(), diff --git a/src/flow/src/compute/render/src_sink.rs b/src/flow/src/compute/render/src_sink.rs index 96411b6d04..2b9fd5e601 100644 --- a/src/flow/src/compute/render/src_sink.rs +++ b/src/flow/src/compute/render/src_sink.rs @@ -62,7 +62,6 @@ impl<'referred, 'df> Context<'referred, 'df> { let arr = arranged.get_updates_in_range(..=now); err_collector.run(|| arranged.compact_to(now)); - debug!("Call source"); let prev_avail = arr.into_iter().map(|((k, _), t, d)| (k, t, d)); let mut to_send = Vec::new(); let mut to_arrange = Vec::new(); diff --git a/src/flow/src/expr/linear.rs b/src/flow/src/expr/linear.rs index 2c49e4efbf..dcfed4eb0d 100644 --- a/src/flow/src/expr/linear.rs +++ b/src/flow/src/expr/linear.rs @@ -321,6 +321,38 @@ impl MapFilterProject { pub fn optimize(&mut self) { // TODO(discord9): optimize } + /// get the mapping of old columns to new columns after the mfp + pub fn get_old_to_new_mapping(&self) -> BTreeMap { + BTreeMap::from_iter( + self.projection + .clone() + .into_iter() + .enumerate() + .map(|(new, old)| { + // `projection` give the new -> old mapping + let mut old = old; + // trace back to the original column + // since there maybe indirect ref to old columns like + // col 2 <- expr=col(2) at pos col 4 <- expr=col(4) at pos col 6 + // ideally such indirect ref should be optimize away + // TODO(discord9): refactor this after impl `optimize()` + while let Some(ScalarExpr::Column(prev)) = if old >= self.input_arity { + // get the correspond expr if not a original column + self.expressions.get(old - self.input_arity) + } else { + // we don't care about non column ref case since only need old to new column mapping + // in which case, the old->new mapping remain the same + None + } { + old = *prev; + if old < self.input_arity { + break; + } + } + (old, new) + }), + ) + } /// Convert the `MapFilterProject` into a staged evaluation plan. /// diff --git a/src/flow/src/plan.rs b/src/flow/src/plan.rs index 1e83d13043..6e4b136733 100644 --- a/src/flow/src/plan.rs +++ b/src/flow/src/plan.rs @@ -30,21 +30,22 @@ use crate::expr::{ }; use crate::plan::join::JoinPlan; pub(crate) use crate::plan::reduce::{AccumulablePlan, AggrWithIndex, KeyValPlan, ReducePlan}; -use crate::repr::{ColumnType, DiffRow, RelationType}; +use crate::repr::{ColumnType, DiffRow, RelationDesc, RelationType}; /// A plan for a dataflow component. But with type to indicate the output type of the relation. #[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd, Deserialize, Serialize)] pub struct TypedPlan { /// output type of the relation - pub typ: RelationType, + pub schema: RelationDesc, /// The untyped plan. pub plan: Plan, } impl TypedPlan { /// directly apply a mfp to the plan - pub fn mfp(self, mfp: MapFilterProject) -> Result { - let new_type = self.typ.apply_mfp(&mfp)?; + pub fn mfp(self, mfp: SafeMfpPlan) -> Result { + let new_type = self.schema.apply_mfp(&mfp)?; + let mfp = mfp.mfp; let plan = match self.plan { Plan::Mfp { input, @@ -59,14 +60,14 @@ impl TypedPlan { }, }; Ok(TypedPlan { - typ: new_type, + schema: new_type, plan, }) } /// project the plan to the given expressions pub fn projection(self, exprs: Vec) -> Result { - let input_arity = self.typ.column_types.len(); + let input_arity = self.schema.typ.column_types.len(); let output_arity = exprs.len(); let (exprs, _expr_typs): (Vec<_>, Vec<_>) = exprs .into_iter() @@ -74,8 +75,10 @@ impl TypedPlan { .unzip(); let mfp = MapFilterProject::new(input_arity) .map(exprs)? - .project(input_arity..input_arity + output_arity)?; - let out_typ = self.typ.apply_mfp(&mfp)?; + .project(input_arity..input_arity + output_arity)? + .into_safe(); + let out_typ = self.schema.apply_mfp(&mfp)?; + let mfp = mfp.mfp; // special case for mfp to compose when the plan is already mfp let plan = match self.plan { Plan::Mfp { @@ -90,12 +93,15 @@ impl TypedPlan { mfp, }, }; - Ok(TypedPlan { typ: out_typ, plan }) + Ok(TypedPlan { + schema: out_typ, + plan, + }) } /// Add a new filter to the plan, will filter out the records that do not satisfy the filter pub fn filter(self, filter: TypedExpr) -> Result { - let typ = self.typ.clone(); + let typ = self.schema.clone(); let plan = match self.plan { Plan::Mfp { input, @@ -106,10 +112,10 @@ impl TypedPlan { }, _ => Plan::Mfp { input: Box::new(self), - mfp: MapFilterProject::new(typ.column_types.len()).filter(vec![filter.expr])?, + mfp: MapFilterProject::new(typ.typ.column_types.len()).filter(vec![filter.expr])?, }, }; - Ok(TypedPlan { typ, plan }) + Ok(TypedPlan { schema: typ, plan }) } } @@ -227,7 +233,7 @@ impl Plan { } impl Plan { - pub fn with_types(self, typ: RelationType) -> TypedPlan { - TypedPlan { typ, plan: self } + pub fn with_types(self, schema: RelationDesc) -> TypedPlan { + TypedPlan { schema, plan: self } } } diff --git a/src/flow/src/repr/relation.rs b/src/flow/src/repr/relation.rs index 09e0b88344..8d7fdd9a33 100644 --- a/src/flow/src/repr/relation.rs +++ b/src/flow/src/repr/relation.rs @@ -20,7 +20,7 @@ use serde::{Deserialize, Serialize}; use snafu::{ensure, OptionExt}; use crate::adapter::error::{InvalidQuerySnafu, Result, UnexpectedSnafu}; -use crate::expr::MapFilterProject; +use crate::expr::{MapFilterProject, SafeMfpPlan, ScalarExpr}; /// a set of column indices that are "keys" for the collection. #[derive(Default, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize, Hash)] @@ -111,7 +111,8 @@ impl RelationType { /// then new key=`[1]`, new time index=`[0]` /// /// note that this function will remove empty keys like key=`[]` will be removed - pub fn apply_mfp(&self, mfp: &MapFilterProject) -> Result { + pub fn apply_mfp(&self, mfp: &SafeMfpPlan) -> Result { + let mfp = &mfp.mfp; let mut all_types = self.column_types.clone(); for expr in &mfp.expressions { let expr_typ = expr.typ(&self.column_types)?; @@ -132,13 +133,7 @@ impl RelationType { }) .try_collect()?; - let old_to_new_col = BTreeMap::from_iter( - mfp.projection - .clone() - .into_iter() - .enumerate() - .map(|(new, old)| (old, new)), - ); + let old_to_new_col = mfp.get_old_to_new_mapping(); // since it's just a mfp, we also try to preserve keys&time index information, if they survive mfp transform let keys = self @@ -264,15 +259,15 @@ impl RelationType { } /// Return relation describe with column names - pub fn into_named(self, names: Vec) -> RelationDesc { + pub fn into_named(self, names: Vec>) -> RelationDesc { RelationDesc { typ: self, names } } /// Return relation describe without column names pub fn into_unnamed(self) -> RelationDesc { RelationDesc { + names: vec![None; self.column_types.len()], typ: self, - names: vec![], } } } @@ -336,10 +331,36 @@ fn return_true() -> bool { /// /// It bundles a [`RelationType`] with the name of each column in the relation. /// Individual column names are optional. -#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Hash)] +#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize, Hash)] pub struct RelationDesc { pub typ: RelationType, - pub names: Vec, + pub names: Vec>, +} + +impl RelationDesc { + /// apply mfp, and also project col names for the projected columns + pub fn apply_mfp(&self, mfp: &SafeMfpPlan) -> Result { + // TODO: find a way to deduce name at best effect + let names = { + let mfp = &mfp.mfp; + let mut names = self.names.clone(); + for expr in &mfp.expressions { + if let ScalarExpr::Column(i) = expr { + names.push(self.names.get(*i).cloned().flatten()); + } else { + names.push(None); + } + } + mfp.projection + .iter() + .map(|i| names.get(*i).cloned().flatten()) + .collect_vec() + }; + Ok(Self { + typ: self.typ.apply_mfp(mfp)?, + names, + }) + } } impl RelationDesc { @@ -358,7 +379,7 @@ impl RelationDesc { pub fn try_new(typ: RelationType, names: I) -> Result where I: IntoIterator, - N: Into, + N: Into>, { let names: Vec<_> = names.into_iter().map(|name| name.into()).collect(); ensure!( @@ -383,7 +404,7 @@ impl RelationDesc { pub fn new_unchecked(typ: RelationType, names: I) -> Self where I: IntoIterator, - N: Into, + N: Into>, { let names: Vec<_> = names.into_iter().map(|name| name.into()).collect(); assert_eq!(typ.arity(), names.len()); @@ -394,7 +415,7 @@ impl RelationDesc { where I: IntoIterator, T: Into, - N: Into, + N: Into>, { let (names, types): (Vec<_>, Vec<_>) = iter.into_iter().unzip(); let types = types.into_iter().map(Into::into).collect(); @@ -420,7 +441,7 @@ impl RelationDesc { /// Appends a column with the specified name and type. pub fn with_column(mut self, name: N, column_type: ColumnType) -> Self where - N: Into, + N: Into>, { self.typ.column_types.push(column_type); self.names.push(name.into()); @@ -445,7 +466,7 @@ impl RelationDesc { pub fn try_with_names(self, names: I) -> Result where I: IntoIterator, - N: Into, + N: Into>, { Self::try_new(self.typ, names) } @@ -461,7 +482,7 @@ impl RelationDesc { } /// Returns an iterator over the columns in this relation. - pub fn iter(&self) -> impl Iterator { + pub fn iter(&self) -> impl Iterator, &ColumnType)> { self.iter_names().zip(self.iter_types()) } @@ -471,7 +492,7 @@ impl RelationDesc { } /// Returns an iterator over the names of the columns in this relation. - pub fn iter_names(&self) -> impl Iterator { + pub fn iter_names(&self) -> impl Iterator> { self.names.iter() } @@ -482,7 +503,7 @@ impl RelationDesc { /// specified name, the leftmost column is returned. pub fn get_by_name(&self, name: &ColumnName) -> Option<(usize, &ColumnType)> { self.iter_names() - .position(|n| n == name) + .position(|n| n.as_ref() == Some(name)) .map(|i| (i, &self.typ.column_types[i])) } @@ -491,7 +512,7 @@ impl RelationDesc { /// # Panics /// /// Panics if `i` is not a valid column index. - pub fn get_name(&self, i: usize) -> &ColumnName { + pub fn get_name(&self, i: usize) -> &Option { &self.names[i] } @@ -500,7 +521,7 @@ impl RelationDesc { /// # Panics /// /// Panics if `i` is not a valid column index. - pub fn get_name_mut(&mut self, i: usize) -> &mut ColumnName { + pub fn get_name_mut(&mut self, i: usize) -> &mut Option { &mut self.names[i] } @@ -515,7 +536,7 @@ impl RelationDesc { pub fn get_unambiguous_name(&self, i: usize) -> Option<&ColumnName> { let name = &self.names[i]; if self.iter_names().filter(|n| *n == name).count() == 1 { - Some(name) + name.as_ref() } else { None } diff --git a/src/flow/src/transform.rs b/src/flow/src/transform.rs index bb28c8630b..39166cce13 100644 --- a/src/flow/src/transform.rs +++ b/src/flow/src/transform.rs @@ -212,7 +212,7 @@ mod test { let schema = RelationType::new(vec![ColumnType::new(CDT::uint32_datatype(), false)]); tri_map.insert(Some(name.clone()), Some(1024), gid); - schemas.insert(gid, schema.into_unnamed()); + schemas.insert(gid, schema.into_named(vec![Some("number".to_string())])); } { @@ -226,7 +226,10 @@ mod test { ColumnType::new(CDT::uint32_datatype(), false), ColumnType::new(CDT::datetime_datatype(), false), ]); - schemas.insert(gid, schema.into_unnamed()); + schemas.insert( + gid, + schema.into_named(vec![Some("number".to_string()), Some("ts".to_string())]), + ); tri_map.insert(Some(name.clone()), Some(1025), gid); } diff --git a/src/flow/src/transform/aggr.rs b/src/flow/src/transform/aggr.rs index 8b69146c15..688f616ebf 100644 --- a/src/flow/src/transform/aggr.rs +++ b/src/flow/src/transform/aggr.rs @@ -285,6 +285,20 @@ impl KeyValPlan { } } +/// find out the column that should be time index in group exprs(which is all columns that should be keys) +/// TODO(discord9): better ways to assign time index +fn find_time_index_in_group_exprs(group_exprs: &[TypedExpr]) -> Option { + group_exprs.iter().position(|expr| { + matches!( + &expr.expr, + ScalarExpr::CallUnary { + func: UnaryFunc::TumbleWindowFloor { .. }, + expr: _ + } + ) + }) +} + impl TypedPlan { /// Convert AggregateRel into Flow's TypedPlan /// @@ -306,44 +320,57 @@ impl TypedPlan { let group_exprs = TypedExpr::from_substrait_agg_grouping( ctx, &agg.groupings, - &input.typ, + &input.schema.typ, extensions, )?; - TypedExpr::expand_multi_value(&input.typ, &group_exprs)? + TypedExpr::expand_multi_value(&input.schema.typ, &group_exprs)? }; - let time_index = group_exprs.iter().position(|expr| { - matches!( - &expr.expr, - ScalarExpr::CallUnary { - func: UnaryFunc::TumbleWindowFloor { .. }, - expr: _ - } - ) - }); + let time_index = find_time_index_in_group_exprs(&group_exprs); - let (mut aggr_exprs, post_mfp) = - AggregateExpr::from_substrait_agg_measures(ctx, &agg.measures, &input.typ, extensions)?; + let (mut aggr_exprs, post_mfp) = AggregateExpr::from_substrait_agg_measures( + ctx, + &agg.measures, + &input.schema.typ, + extensions, + )?; let key_val_plan = KeyValPlan::from_substrait_gen_key_val_plan( &mut aggr_exprs, &group_exprs, - input.typ.column_types.len(), + input.schema.typ.column_types.len(), )?; // output type is group_exprs + aggr_exprs let output_type = { let mut output_types = Vec::new(); + // give best effort to get column name + let mut output_names = Vec::new(); // first append group_expr as key, then aggr_expr as value for expr in &group_exprs { output_types.push(expr.typ.clone()); + let col_name = match &expr.expr { + ScalarExpr::CallUnary { + func: UnaryFunc::TumbleWindowFloor { .. }, + .. + } => Some("window_start".to_string()), + ScalarExpr::CallUnary { + func: UnaryFunc::TumbleWindowCeiling { .. }, + .. + } => Some("window_end".to_string()), + ScalarExpr::Column(col) => input.schema.get_name(*col).clone(), + _ => None, + }; + output_names.push(col_name) } for aggr in &aggr_exprs { output_types.push(ColumnType::new_nullable( aggr.func.signature().output.clone(), )); + // TODO(discord9): find a clever way to name them? + output_names.push(None); } // TODO(discord9): try best to get time if group_exprs.is_empty() { @@ -351,8 +378,9 @@ impl TypedPlan { } else { RelationType::new(output_types).with_key((0..group_exprs.len()).collect_vec()) } - } - .with_time_index(time_index); + .with_time_index(time_index) + .into_named(output_names) + }; // copy aggr_exprs to full_aggrs, and split them into simple_aggrs and distinct_aggrs // also set them input/output column @@ -390,13 +418,13 @@ impl TypedPlan { // FIX(discord9): deal with key first if post_mfp.is_identity() { Ok(TypedPlan { - typ: output_type, + schema: output_type, plan, }) } else { // make post_mfp map identical mapping of keys let input = TypedPlan { - typ: output_type.clone(), + schema: output_type.clone(), plan, }; let key_arity = group_exprs.len(); @@ -414,7 +442,7 @@ impl TypedPlan { .filter(f)? .project(p)?; Ok(TypedPlan { - typ: output_type.apply_mfp(&post_mfp)?, + schema: output_type.apply_mfp(&post_mfp.clone().into_safe())?, plan: Plan::Mfp { input: Box::new(input), mfp: post_mfp, @@ -434,7 +462,16 @@ mod test { use crate::plan::{Plan, TypedPlan}; use crate::repr::{self, ColumnType, RelationType}; use crate::transform::test::{create_test_ctx, create_test_query_engine, sql_to_substrait}; + /// TODO(discord9): add more illegal sql tests + #[tokio::test] + async fn tes_missing_key_check() { + let engine = create_test_query_engine(); + let sql = "SELECT avg(number) FROM numbers_with_ts GROUP BY tumble(ts, '1 hour'), number"; + let plan = sql_to_substrait(engine.clone(), sql).await; + let mut ctx = create_test_ctx(); + assert!(TypedPlan::from_substrait_plan(&mut ctx, &plan).is_err()); + } /// TODO(discord9): add more illegal sql tests #[tokio::test] async fn test_tumble_composite() { @@ -470,10 +507,6 @@ mod test { els: Box::new(ScalarExpr::Literal(Value::Null, CDT::uint64_datatype())), }; let expected = TypedPlan { - // TODO(discord9): mfp indirectly ref to key columns - /* - .with_key(vec![1]) - .with_time_index(Some(0)),*/ plan: Plan::Mfp { input: Box::new( Plan::Reduce { @@ -481,10 +514,16 @@ mod test { Plan::Get { id: crate::expr::Id::Global(GlobalId::User(1)), } - .with_types(RelationType::new(vec![ - ColumnType::new(ConcreteDataType::uint32_datatype(), false), - ColumnType::new(ConcreteDataType::datetime_datatype(), false), - ])), + .with_types( + RelationType::new(vec![ + ColumnType::new(ConcreteDataType::uint32_datatype(), false), + ColumnType::new(ConcreteDataType::datetime_datatype(), false), + ]) + .into_named(vec![ + Some("number".to_string()), + Some("ts".to_string()), + ]), + ), ), key_val_plan: KeyValPlan { key_plan: MapFilterProject::new(2) @@ -540,7 +579,14 @@ mod test { ColumnType::new(CDT::int64_datatype(), true), // avg.count(number) ]) .with_key(vec![1, 2]) - .with_time_index(Some(0)), + .with_time_index(Some(0)) + .into_named(vec![ + Some("window_start".to_string()), + Some("window_end".to_string()), + Some("number".to_string()), + None, + None, + ]), ), ), mfp: MapFilterProject::new(5) @@ -555,11 +601,19 @@ mod test { .project(vec![6, 7, 8, 9]) .unwrap(), }, - typ: RelationType::new(vec![ + schema: RelationType::new(vec![ ColumnType::new(CDT::uint32_datatype(), false), // number ColumnType::new(CDT::uint64_datatype(), true), // avg(number) ColumnType::new(CDT::datetime_datatype(), false), // window start ColumnType::new(CDT::datetime_datatype(), false), // window end + ]) + .with_key(vec![0, 3]) + .with_time_index(Some(2)) + .into_named(vec![ + Some("number".to_string()), + None, + Some("window_start".to_string()), + Some("window_end".to_string()), ]), }; assert_eq!(flow_plan, expected); @@ -580,10 +634,17 @@ mod test { distinct: false, }; let expected = TypedPlan { - typ: RelationType::new(vec![ + schema: RelationType::new(vec![ ColumnType::new(CDT::uint64_datatype(), true), // sum(number) ColumnType::new(CDT::datetime_datatype(), false), // window start ColumnType::new(CDT::datetime_datatype(), false), // window end + ]) + .with_key(vec![2]) + .with_time_index(Some(1)) + .into_named(vec![ + None, + Some("window_start".to_string()), + Some("window_end".to_string()), ]), // TODO(discord9): mfp indirectly ref to key columns /* @@ -596,10 +657,16 @@ mod test { Plan::Get { id: crate::expr::Id::Global(GlobalId::User(1)), } - .with_types(RelationType::new(vec![ - ColumnType::new(ConcreteDataType::uint32_datatype(), false), - ColumnType::new(ConcreteDataType::datetime_datatype(), false), - ])), + .with_types( + RelationType::new(vec![ + ColumnType::new(ConcreteDataType::uint32_datatype(), false), + ColumnType::new(ConcreteDataType::datetime_datatype(), false), + ]) + .into_named(vec![ + Some("number".to_string()), + Some("ts".to_string()), + ]), + ), ), key_val_plan: KeyValPlan { key_plan: MapFilterProject::new(2) @@ -647,7 +714,12 @@ mod test { ColumnType::new(CDT::uint64_datatype(), true), //sum(number) ]) .with_key(vec![1]) - .with_time_index(Some(0)), + .with_time_index(Some(0)) + .into_named(vec![ + Some("window_start".to_string()), + Some("window_end".to_string()), + None, + ]), ), ), mfp: MapFilterProject::new(3) @@ -680,15 +752,18 @@ mod test { distinct: false, }; let expected = TypedPlan { - typ: RelationType::new(vec![ + schema: RelationType::new(vec![ ColumnType::new(CDT::uint64_datatype(), true), // sum(number) ColumnType::new(CDT::datetime_datatype(), false), // window start ColumnType::new(CDT::datetime_datatype(), false), // window end + ]) + .with_key(vec![2]) + .with_time_index(Some(1)) + .into_named(vec![ + None, + Some("window_start".to_string()), + Some("window_end".to_string()), ]), - // TODO(discord9): mfp indirectly ref to key columns - /* - .with_key(vec![1]) - .with_time_index(Some(0)),*/ plan: Plan::Mfp { input: Box::new( Plan::Reduce { @@ -696,10 +771,16 @@ mod test { Plan::Get { id: crate::expr::Id::Global(GlobalId::User(1)), } - .with_types(RelationType::new(vec![ - ColumnType::new(ConcreteDataType::uint32_datatype(), false), - ColumnType::new(ConcreteDataType::datetime_datatype(), false), - ])), + .with_types( + RelationType::new(vec![ + ColumnType::new(ConcreteDataType::uint32_datatype(), false), + ColumnType::new(ConcreteDataType::datetime_datatype(), false), + ]) + .into_named(vec![ + Some("number".to_string()), + Some("ts".to_string()), + ]), + ), ), key_val_plan: KeyValPlan { key_plan: MapFilterProject::new(2) @@ -747,7 +828,12 @@ mod test { ColumnType::new(CDT::uint64_datatype(), true), //sum(number) ]) .with_key(vec![1]) - .with_time_index(Some(0)), + .with_time_index(Some(0)) + .into_named(vec![ + Some("window_start".to_string()), + Some("window_end".to_string()), + None, + ]), ), ), mfp: MapFilterProject::new(3) @@ -798,10 +884,12 @@ mod test { els: Box::new(ScalarExpr::Literal(Value::Null, CDT::uint64_datatype())), }; let expected = TypedPlan { - typ: RelationType::new(vec![ + schema: RelationType::new(vec![ ColumnType::new(CDT::uint64_datatype(), true), // sum(number) -> u64 ColumnType::new(CDT::uint32_datatype(), false), // number - ]), + ]) + .with_key(vec![1]) + .into_named(vec![None, Some("number".to_string())]), plan: Plan::Mfp { input: Box::new( Plan::Reduce { @@ -809,9 +897,13 @@ mod test { Plan::Get { id: crate::expr::Id::Global(GlobalId::User(0)), } - .with_types(RelationType::new(vec![ - ColumnType::new(ConcreteDataType::uint32_datatype(), false), - ])), + .with_types( + RelationType::new(vec![ColumnType::new( + ConcreteDataType::uint32_datatype(), + false, + )]) + .into_named(vec![Some("number".to_string())]), + ), ), key_val_plan: KeyValPlan { key_plan: MapFilterProject::new(1) @@ -840,7 +932,12 @@ mod test { ColumnType::new(ConcreteDataType::uint64_datatype(), true), // sum ColumnType::new(ConcreteDataType::int64_datatype(), true), // count ]) - .with_key(vec![0]), + .with_key(vec![0]) + .into_named(vec![ + Some("number".to_string()), + None, + None, + ]), ), ), mfp: MapFilterProject::new(3) @@ -892,7 +989,8 @@ mod test { els: Box::new(ScalarExpr::Literal(Value::Null, CDT::uint64_datatype())), }; let expected = TypedPlan { - typ: RelationType::new(vec![ColumnType::new(CDT::uint64_datatype(), true)]), + schema: RelationType::new(vec![ColumnType::new(CDT::uint64_datatype(), true)]) + .into_named(vec![None]), plan: Plan::Mfp { input: Box::new( Plan::Reduce { @@ -900,9 +998,13 @@ mod test { Plan::Get { id: crate::expr::Id::Global(GlobalId::User(0)), } - .with_types(RelationType::new(vec![ - ColumnType::new(ConcreteDataType::uint32_datatype(), false), - ])), + .with_types( + RelationType::new(vec![ColumnType::new( + ConcreteDataType::uint32_datatype(), + false, + )]) + .into_named(vec![Some("number".to_string())]), + ), ), key_val_plan: KeyValPlan { key_plan: MapFilterProject::new(1) @@ -923,10 +1025,13 @@ mod test { distinct_aggrs: vec![], }), } - .with_types(RelationType::new(vec![ - ColumnType::new(ConcreteDataType::uint64_datatype(), true), - ColumnType::new(ConcreteDataType::int64_datatype(), true), - ])), + .with_types( + RelationType::new(vec![ + ColumnType::new(ConcreteDataType::uint64_datatype(), true), // sum + ColumnType::new(ConcreteDataType::int64_datatype(), true), // count + ]) + .into_named(vec![None, None]), + ), ), mfp: MapFilterProject::new(2) .map(vec![ @@ -960,7 +1065,8 @@ mod test { distinct: false, }; let expected = TypedPlan { - typ: RelationType::new(vec![ColumnType::new(CDT::uint64_datatype(), true)]), + schema: RelationType::new(vec![ColumnType::new(CDT::uint64_datatype(), true)]) + .into_unnamed(), plan: Plan::Mfp { input: Box::new( Plan::Reduce { @@ -968,9 +1074,13 @@ mod test { Plan::Get { id: crate::expr::Id::Global(GlobalId::User(0)), } - .with_types(RelationType::new(vec![ - ColumnType::new(ConcreteDataType::uint32_datatype(), false), - ])), + .with_types( + RelationType::new(vec![ColumnType::new( + ConcreteDataType::uint32_datatype(), + false, + )]) + .into_named(vec![Some("number".to_string())]), + ), ), key_val_plan: KeyValPlan { key_plan: MapFilterProject::new(1) @@ -988,7 +1098,7 @@ mod test { distinct_aggrs: vec![], }), } - .with_types(typ), + .with_types(typ.into_unnamed()), ), mfp: MapFilterProject::new(1) .map(vec![ScalarExpr::Column(0), ScalarExpr::Column(1)]) @@ -1015,10 +1125,12 @@ mod test { distinct: false, }; let expected = TypedPlan { - typ: RelationType::new(vec![ + schema: RelationType::new(vec![ ColumnType::new(CDT::uint64_datatype(), true), // col sum(number) ColumnType::new(CDT::uint32_datatype(), false), // col number - ]), + ]) + .with_key(vec![1]) + .into_named(vec![None, Some("number".to_string())]), plan: Plan::Mfp { input: Box::new( Plan::Reduce { @@ -1026,9 +1138,13 @@ mod test { Plan::Get { id: crate::expr::Id::Global(GlobalId::User(0)), } - .with_types(RelationType::new(vec![ - ColumnType::new(ConcreteDataType::uint32_datatype(), false), - ])), + .with_types( + RelationType::new(vec![ColumnType::new( + ConcreteDataType::uint32_datatype(), + false, + )]) + .into_named(vec![Some("number".to_string())]), + ), ), key_val_plan: KeyValPlan { key_plan: MapFilterProject::new(1) @@ -1053,7 +1169,8 @@ mod test { ColumnType::new(CDT::uint32_datatype(), false), // col number ColumnType::new(CDT::uint64_datatype(), true), // col sum(number) ]) - .with_key(vec![0]), + .with_key(vec![0]) + .into_named(vec![Some("number".to_string()), None]), ), ), mfp: MapFilterProject::new(2) @@ -1086,7 +1203,8 @@ mod test { distinct: false, }; let expected = TypedPlan { - typ: RelationType::new(vec![ColumnType::new(CDT::uint64_datatype(), true)]), + schema: RelationType::new(vec![ColumnType::new(CDT::uint64_datatype(), true)]) + .into_unnamed(), plan: Plan::Mfp { input: Box::new( Plan::Reduce { @@ -1094,9 +1212,13 @@ mod test { Plan::Get { id: crate::expr::Id::Global(GlobalId::User(0)), } - .with_types(RelationType::new(vec![ - ColumnType::new(ConcreteDataType::uint32_datatype(), false), - ])), + .with_types( + RelationType::new(vec![ColumnType::new( + ConcreteDataType::uint32_datatype(), + false, + )]) + .into_named(vec![Some("number".to_string())]), + ), ), key_val_plan: KeyValPlan { key_plan: MapFilterProject::new(1) @@ -1117,10 +1239,10 @@ mod test { distinct_aggrs: vec![], }), } - .with_types(RelationType::new(vec![ColumnType::new( - CDT::uint64_datatype(), - true, - )])), + .with_types( + RelationType::new(vec![ColumnType::new(CDT::uint64_datatype(), true)]) + .into_unnamed(), + ), ), mfp: MapFilterProject::new(1) .map(vec![ScalarExpr::Column(0), ScalarExpr::Column(1)]) diff --git a/src/flow/src/transform/expr.rs b/src/flow/src/transform/expr.rs index 74fc7ef617..eb3f9bafc3 100644 --- a/src/flow/src/transform/expr.rs +++ b/src/flow/src/transform/expr.rs @@ -363,16 +363,20 @@ mod test { ], }; let expected = TypedPlan { - typ: RelationType::new(vec![ColumnType::new(CDT::uint32_datatype(), false)]), + schema: RelationType::new(vec![ColumnType::new(CDT::uint32_datatype(), false)]) + .into_named(vec![Some("number".to_string())]), plan: Plan::Mfp { input: Box::new( Plan::Get { id: crate::expr::Id::Global(GlobalId::User(0)), } - .with_types(RelationType::new(vec![ColumnType::new( - ConcreteDataType::uint32_datatype(), - false, - )])), + .with_types( + RelationType::new(vec![ColumnType::new( + ConcreteDataType::uint32_datatype(), + false, + )]) + .into_named(vec![Some("number".to_string())]), + ), ), mfp: MapFilterProject::new(1) .map(vec![ScalarExpr::Column(0)]) @@ -397,7 +401,8 @@ mod test { let flow_plan = TypedPlan::from_substrait_plan(&mut ctx, &plan); let expected = TypedPlan { - typ: RelationType::new(vec![ColumnType::new(CDT::boolean_datatype(), true)]), + schema: RelationType::new(vec![ColumnType::new(CDT::boolean_datatype(), true)]) + .into_unnamed(), plan: Plan::Constant { rows: vec![( repr::Row::new(vec![Value::from(true)]), @@ -421,16 +426,20 @@ mod test { let flow_plan = TypedPlan::from_substrait_plan(&mut ctx, &plan); let expected = TypedPlan { - typ: RelationType::new(vec![ColumnType::new(CDT::uint32_datatype(), true)]), + schema: RelationType::new(vec![ColumnType::new(CDT::uint32_datatype(), true)]) + .into_unnamed(), plan: Plan::Mfp { input: Box::new( Plan::Get { id: crate::expr::Id::Global(GlobalId::User(0)), } - .with_types(RelationType::new(vec![ColumnType::new( - ConcreteDataType::uint32_datatype(), - false, - )])), + .with_types( + RelationType::new(vec![ColumnType::new( + ConcreteDataType::uint32_datatype(), + false, + )]) + .into_named(vec![Some("number".to_string())]), + ), ), mfp: MapFilterProject::new(1) .map(vec![ScalarExpr::Column(0).call_binary( @@ -455,16 +464,20 @@ mod test { let flow_plan = TypedPlan::from_substrait_plan(&mut ctx, &plan); let expected = TypedPlan { - typ: RelationType::new(vec![ColumnType::new(CDT::int16_datatype(), true)]), + schema: RelationType::new(vec![ColumnType::new(CDT::int16_datatype(), true)]) + .into_unnamed(), plan: Plan::Mfp { input: Box::new( Plan::Get { id: crate::expr::Id::Global(GlobalId::User(0)), } - .with_types(RelationType::new(vec![ColumnType::new( - ConcreteDataType::uint32_datatype(), - false, - )])), + .with_types( + RelationType::new(vec![ColumnType::new( + ConcreteDataType::uint32_datatype(), + false, + )]) + .into_named(vec![Some("number".to_string())]), + ), ), mfp: MapFilterProject::new(1) .map(vec![ScalarExpr::Literal( @@ -490,16 +503,20 @@ mod test { let flow_plan = TypedPlan::from_substrait_plan(&mut ctx, &plan); let expected = TypedPlan { - typ: RelationType::new(vec![ColumnType::new(CDT::uint32_datatype(), true)]), + schema: RelationType::new(vec![ColumnType::new(CDT::uint32_datatype(), true)]) + .into_unnamed(), plan: Plan::Mfp { input: Box::new( Plan::Get { id: crate::expr::Id::Global(GlobalId::User(0)), } - .with_types(RelationType::new(vec![ColumnType::new( - ConcreteDataType::uint32_datatype(), - false, - )])), + .with_types( + RelationType::new(vec![ColumnType::new( + ConcreteDataType::uint32_datatype(), + false, + )]) + .into_named(vec![Some("number".to_string())]), + ), ), mfp: MapFilterProject::new(1) .map(vec![ScalarExpr::Column(0) diff --git a/src/flow/src/transform/literal.rs b/src/flow/src/transform/literal.rs index 41008fd992..d13d74393b 100644 --- a/src/flow/src/transform/literal.rs +++ b/src/flow/src/transform/literal.rs @@ -175,7 +175,8 @@ mod test { let flow_plan = TypedPlan::from_substrait_plan(&mut ctx, &plan); let expected = TypedPlan { - typ: RelationType::new(vec![ColumnType::new(CDT::int64_datatype(), true)]), + schema: RelationType::new(vec![ColumnType::new(CDT::int64_datatype(), true)]) + .into_unnamed(), plan: Plan::Constant { rows: vec![( repr::Row::new(vec![Value::Int64(1)]), diff --git a/src/flow/src/transform/plan.rs b/src/flow/src/transform/plan.rs index 337eba7eef..813266ee4d 100644 --- a/src/flow/src/transform/plan.rs +++ b/src/flow/src/transform/plan.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::BTreeMap; +use std::collections::{BTreeMap, BTreeSet}; use itertools::Itertools; use snafu::OptionExt; @@ -22,11 +22,11 @@ use substrait_proto::proto::rel::RelType; use substrait_proto::proto::{plan_rel, Plan as SubPlan, Rel}; use crate::adapter::error::{ - Error, InvalidQuerySnafu, NotImplementedSnafu, PlanSnafu, UnexpectedSnafu, + Error, InternalSnafu, InvalidQuerySnafu, NotImplementedSnafu, PlanSnafu, UnexpectedSnafu, }; use crate::expr::{MapFilterProject, ScalarExpr, TypedExpr, UnaryFunc}; use crate::plan::{KeyValPlan, Plan, ReducePlan, TypedPlan}; -use crate::repr::{self, RelationType}; +use crate::repr::{self, RelationDesc, RelationType}; use crate::transform::{substrait_proto, FlownodeContext, FunctionExtensions}; impl TypedPlan { @@ -80,7 +80,7 @@ impl TypedPlan { let mut exprs: Vec = vec![]; for e in &p.expressions { - let expr = TypedExpr::from_substrait_rex(e, &input.typ, extensions)?; + let expr = TypedExpr::from_substrait_rex(e, &input.schema.typ, extensions)?; exprs.push(expr); } let is_literal = exprs.iter().all(|expr| expr.expr.is_literal()); @@ -98,123 +98,24 @@ impl TypedPlan { let plan = Plan::Constant { rows: vec![(row, repr::Timestamp::MIN, 1)], }; - Ok(TypedPlan { typ, plan }) + Ok(TypedPlan { + schema: typ.into_unnamed(), + plan, + }) } else { - /// if reduce_plan contains the special function like tumble floor/ceiling, add them to the proj_exprs - fn rewrite_projection_after_reduce( - key_val_plan: KeyValPlan, - _reduce_plan: ReducePlan, - reduce_output_type: &RelationType, - proj_exprs: &mut Vec, - ) -> Result<(), Error> { - // TODO: get keys correctly - let key_exprs = key_val_plan - .key_plan - .projection - .clone() - .into_iter() - .map(|i| { - if i < key_val_plan.key_plan.input_arity { - ScalarExpr::Column(i) - } else { - key_val_plan.key_plan.expressions - [i - key_val_plan.key_plan.input_arity] - .clone() - } - }) - .collect_vec(); - let mut shift_offset = 0; - let special_keys = key_exprs - .into_iter() - .enumerate() - .filter(|(_idx, p)| { - if matches!( - p, - ScalarExpr::CallUnary { - func: UnaryFunc::TumbleWindowFloor { .. }, - .. - } | ScalarExpr::CallUnary { - func: UnaryFunc::TumbleWindowCeiling { .. }, - .. - } - ) { - if matches!( - p, - ScalarExpr::CallUnary { - func: UnaryFunc::TumbleWindowFloor { .. }, - .. - } - ) { - shift_offset += 1; - } - true - } else { - false - } - }) - .collect_vec(); - let spec_key_arity = special_keys.len(); - if spec_key_arity == 0 { - return Ok(()); - } - - { - // shift proj_exprs to the right by spec_key_arity - let max_used_col_in_proj = proj_exprs - .iter() - .map(|expr| { - expr.expr - .get_all_ref_columns() - .into_iter() - .max() - .unwrap_or_default() - }) - .max() - .unwrap_or_default(); - - let shuffle = (0..=max_used_col_in_proj) - .map(|col| (col, col + shift_offset)) - .collect::>(); - for proj_expr in proj_exprs.iter_mut() { - proj_expr.expr.permute_map(&shuffle)?; - } // add key to the end - for (key_idx, _key_expr) in special_keys { - // here we assume the output type of reduce operator is just first keys columns, then append value columns - proj_exprs.push( - ScalarExpr::Column(key_idx).with_type( - reduce_output_type.column_types[key_idx].clone(), - ), - ); - } - } - - Ok(()) - } - match input.plan.clone() { - Plan::Reduce { - key_val_plan, - reduce_plan, - .. - } => { + Plan::Reduce { key_val_plan, .. } => { rewrite_projection_after_reduce( key_val_plan, - reduce_plan, - &input.typ, + &input.schema, &mut exprs, )?; } Plan::Mfp { input, mfp: _ } => { - if let Plan::Reduce { - key_val_plan, - reduce_plan, - .. - } = input.plan - { + if let Plan::Reduce { key_val_plan, .. } = input.plan { rewrite_projection_after_reduce( key_val_plan, - reduce_plan, - &input.typ, + &input.schema, &mut exprs, )?; } @@ -232,7 +133,7 @@ impl TypedPlan { }; let expr = if let Some(condition) = filter.condition.as_ref() { - TypedExpr::from_substrait_rex(condition, &input.typ, extensions)? + TypedExpr::from_substrait_rex(condition, &input.schema.typ, extensions)? } else { return not_impl_err!("Filter without an condition is not valid"); }; @@ -269,7 +170,7 @@ impl TypedPlan { id: crate::expr::Id::Global(table.0), }; let get_table = TypedPlan { - typ: table.1.typ().clone(), + schema: table.1, plan: get_table, }; @@ -283,10 +184,10 @@ impl TypedPlan { .iter() .map(|item| item.field as usize) .collect(); - let input_arity = get_table.typ.column_types.len(); + let input_arity = get_table.schema.typ().column_types.len(); let mfp = MapFilterProject::new(input_arity).project(column_indices.clone())?; - get_table.mfp(mfp) + get_table.mfp(mfp.into_safe()) } else { Ok(get_table) } @@ -302,9 +203,117 @@ impl TypedPlan { } } +/// if reduce_plan contains the special function like tumble floor/ceiling, add them to the proj_exprs +/// so the effect is the window_start, window_end column are auto added to output rows +/// +/// This is to fix a problem that we have certain functions that return two values, but since substrait doesn't know that, it will assume it return one value +/// this function fix that and rewrite `proj_exprs` to correct form +fn rewrite_projection_after_reduce( + key_val_plan: KeyValPlan, + reduce_output_type: &RelationDesc, + proj_exprs: &mut Vec, +) -> Result<(), Error> { + // TODO: get keys correctly + let key_exprs = key_val_plan + .key_plan + .projection + .clone() + .into_iter() + .map(|i| { + if i < key_val_plan.key_plan.input_arity { + ScalarExpr::Column(i) + } else { + key_val_plan.key_plan.expressions[i - key_val_plan.key_plan.input_arity].clone() + } + }) + .collect_vec(); + let mut shift_offset = 0; + let mut shuffle: BTreeMap = BTreeMap::new(); + let special_keys = key_exprs + .clone() + .into_iter() + .enumerate() + .filter(|(idx, p)| { + shuffle.insert(*idx, *idx + shift_offset); + if matches!( + p, + ScalarExpr::CallUnary { + func: UnaryFunc::TumbleWindowFloor { .. }, + .. + } | ScalarExpr::CallUnary { + func: UnaryFunc::TumbleWindowCeiling { .. }, + .. + } + ) { + if matches!( + p, + ScalarExpr::CallUnary { + func: UnaryFunc::TumbleWindowFloor { .. }, + .. + } + ) { + shift_offset += 1; + } + true + } else { + false + } + }) + .collect_vec(); + let spec_key_arity = special_keys.len(); + if spec_key_arity == 0 { + return Ok(()); + } + + // shuffle proj_exprs + // because substrait use offset while assume `tumble` only return one value + for proj_expr in proj_exprs.iter_mut() { + proj_expr.expr.permute_map(&shuffle)?; + } // add key to the end + for (key_idx, _key_expr) in special_keys { + // here we assume the output type of reduce operator(`reduce_output_type`) is just first keys columns, then append value columns + // so we can use `key_idx` to index `reduce_output_type` and get the keys we need to append to `proj_exprs` + proj_exprs.push( + ScalarExpr::Column(key_idx) + .with_type(reduce_output_type.typ().column_types[key_idx].clone()), + ); + } + + // check if normal expr in group exprs are all in proj_exprs + let all_cols_ref_in_proj: BTreeSet = proj_exprs + .iter() + .filter_map(|e| { + if let ScalarExpr::Column(i) = &e.expr { + Some(*i) + } else { + None + } + }) + .collect(); + for (key_idx, key_expr) in key_exprs.iter().enumerate() { + if let ScalarExpr::Column(_) = key_expr { + if !all_cols_ref_in_proj.contains(&key_idx) { + let err_msg = format!( + "Expect normal column in group by also appear in projection, but column {}(name is {}) is missing", + key_idx, + reduce_output_type + .get_name(key_idx) + .clone() + .map(|s|format!("'{}'",s)) + .unwrap_or("unknown".to_string()) + ); + return InvalidQuerySnafu { reason: err_msg }.fail(); + } + } + } + + Ok(()) +} + #[cfg(test)] mod test { use datatypes::prelude::ConcreteDataType; + use pretty_assertions::assert_eq; use super::*; use crate::expr::{GlobalId, ScalarExpr}; @@ -323,16 +332,20 @@ mod test { let flow_plan = TypedPlan::from_substrait_plan(&mut ctx, &plan); let expected = TypedPlan { - typ: RelationType::new(vec![ColumnType::new(CDT::uint32_datatype(), false)]), + schema: RelationType::new(vec![ColumnType::new(CDT::uint32_datatype(), false)]) + .into_named(vec![Some("number".to_string())]), plan: Plan::Mfp { input: Box::new( Plan::Get { id: crate::expr::Id::Global(GlobalId::User(0)), } - .with_types(RelationType::new(vec![ColumnType::new( - ConcreteDataType::uint32_datatype(), - false, - )])), + .with_types( + RelationType::new(vec![ColumnType::new( + ConcreteDataType::uint32_datatype(), + false, + )]) + .into_named(vec![Some("number".to_string())]), + ), ), mfp: MapFilterProject::new(1) .map(vec![ScalarExpr::Column(0)])