fix(flow): infer table schema correctly (#4113)

* refactor: make individual col name optional

* chore: rename TypedPlan's `typ` to `schema`

* feat: add optional col name to typed plan

* feat: pass col name all along

* feat: correct infer output table schema

* chore: unused import

* fix: error when key is not projected

* refactor: per review

* chore: fmt
This commit is contained in:
discord9
2024-06-11 16:57:47 +08:00
committed by GitHub
parent 5533bd9293
commit 1b00526de5
17 changed files with 584 additions and 313 deletions

View File

@@ -265,7 +265,7 @@ impl FlownodeManager {
let ctx = Arc::new(QueryContext::with(&catalog, &schema));
// TODO(discord9): instead of auto build table from request schema, actually build table
// before `create flow` to be able to assign pk and ts etc.
let (primary_keys, schema, is_auto_create) = if let Some(table_id) = self
let (primary_keys, schema, is_ts_placeholder) = if let Some(table_id) = self
.table_info_source
.get_table_id_from_name(&table_name)
.await?
@@ -317,7 +317,12 @@ impl FlownodeManager {
.map(|v| {
v.column_indices
.iter()
.map(|i| format!("Col_{i}"))
.map(|i| {
schema
.get_name(*i)
.clone()
.unwrap_or_else(|| format!("Col_{i}"))
})
.collect_vec()
})
.unwrap_or_default();
@@ -326,15 +331,8 @@ impl FlownodeManager {
ConcreteDataType::timestamp_millisecond_datatype(),
true,
);
// TODO(discord9): bugged so we can't infer time index from flow plan, so we have to manually set one
let ts_col = ColumnSchema::new(
AUTO_CREATED_PLACEHOLDER_TS_COL,
ConcreteDataType::timestamp_millisecond_datatype(),
true,
)
.with_time_index(true);
let wout_ts = schema
let original_schema = schema
.typ()
.column_types
.clone()
@@ -345,16 +343,33 @@ impl FlownodeManager {
.names
.get(idx)
.cloned()
.flatten()
.unwrap_or(format!("Col_{}", idx));
ColumnSchema::new(name, typ.scalar_type, typ.nullable)
let ret = ColumnSchema::new(name, typ.scalar_type, typ.nullable);
if schema.typ().time_index == Some(idx) {
ret.with_time_index(true)
} else {
ret
}
})
.collect_vec();
let mut with_ts = wout_ts.clone();
with_ts.push(update_at);
with_ts.push(ts_col);
let mut with_auto_added_col = original_schema.clone();
with_auto_added_col.push(update_at);
(primary_keys, with_ts, true)
// if no time index, add one as placeholder
let no_time_index = schema.typ().time_index.is_none();
if no_time_index {
let ts_col = ColumnSchema::new(
AUTO_CREATED_PLACEHOLDER_TS_COL,
ConcreteDataType::timestamp_millisecond_datatype(),
true,
)
.with_time_index(true);
with_auto_added_col.push(ts_col);
}
(primary_keys, with_auto_added_col, no_time_index)
};
let schema_len = schema.len();
let proto_schema = column_schemas_to_proto(schema, &primary_keys)?;
@@ -377,7 +392,7 @@ impl FlownodeManager {
now,
))]);
// ts col, if auto create
if is_auto_create {
if is_ts_placeholder {
ensure!(
row.len() == schema_len - 1,
InternalSnafu {
@@ -508,12 +523,13 @@ impl FlownodeManager {
debug!("Starting to run");
loop {
// TODO(discord9): only run when new inputs arrive or scheduled to
debug!("call run_available in run every second");
self.run_available(true).await.unwrap();
debug!("call send_writeback_requests in run every second");
if let Err(err) = self.run_available(true).await {
common_telemetry::error!(err;"Run available errors");
}
// TODO(discord9): error handling
self.send_writeback_requests().await.unwrap();
debug!("call log_all_errors in run every second");
if let Err(err) = self.send_writeback_requests().await {
common_telemetry::error!(err;"Send writeback request errors");
};
self.log_all_errors().await;
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
}
@@ -596,6 +612,7 @@ impl FlownodeManager {
break;
}
}
self.node_context.write().await.remove_flow(flow_id);
Ok(())
}
@@ -642,8 +659,9 @@ impl FlownodeManager {
node_ctx.query_context = query_ctx.map(Arc::new);
// construct a active dataflow state with it
let flow_plan = sql_to_flow_plan(&mut node_ctx, &self.query_engine, &sql).await?;
debug!("Flow {:?}'s Plan is {:?}", flow_id, flow_plan);
node_ctx.assign_table_schema(&sink_table_name, flow_plan.typ.clone())?;
node_ctx.assign_table_schema(&sink_table_name, flow_plan.schema.clone())?;
let _ = comment;
let _ = flow_options;

View File

@@ -26,6 +26,7 @@ use itertools::Itertools;
use snafu::{OptionExt, ResultExt};
use store_api::storage::RegionId;
use crate::adapter::error::InternalSnafu;
use crate::adapter::FlownodeManager;
use crate::repr::{self, DiffRow};
@@ -126,6 +127,16 @@ impl Flownode for FlownodeManager {
.context(UnexpectedSnafu {
err_msg: format!("Table not found: {}", table_id),
})?;
let table_col_names = table_col_names
.iter().enumerate()
.map(|(idx,name)| match name {
Some(name) => Ok(name.clone()),
None => InternalSnafu {
reason: format!("Expect column {idx} of table id={table_id} to have name in table schema, found None"),
}
.fail().map_err(BoxedError::new).context(ExternalSnafu),
})
.collect::<Result<Vec<_>>>()?;
let name_to_col = HashMap::<_, _>::from_iter(
insert_schema
.iter()

View File

@@ -27,7 +27,7 @@ use crate::adapter::error::{Error, EvalSnafu, TableNotFoundSnafu};
use crate::adapter::{FlowId, TableName, TableSource};
use crate::expr::error::InternalSnafu;
use crate::expr::GlobalId;
use crate::repr::{DiffRow, RelationDesc, RelationType, BROADCAST_CAP};
use crate::repr::{DiffRow, RelationDesc, BROADCAST_CAP};
/// A context that holds the information of the dataflow
#[derive(Default, Debug)]
@@ -36,6 +36,7 @@ pub struct FlownodeContext {
pub source_to_tasks: BTreeMap<TableId, BTreeSet<FlowId>>,
/// mapping from task to sink table, useful for sending data back to the client when a task is done running
pub flow_to_sink: BTreeMap<FlowId, TableName>,
pub sink_to_flow: BTreeMap<TableName, FlowId>,
/// broadcast sender for source table, any incoming write request will be sent to the source table's corresponding sender
///
/// Note that we are getting insert requests with table id, so we should use table id as the key
@@ -184,7 +185,21 @@ impl FlownodeContext {
}
self.add_sink_receiver(sink_table_name.clone());
self.flow_to_sink.insert(task_id, sink_table_name);
self.flow_to_sink.insert(task_id, sink_table_name.clone());
self.sink_to_flow.insert(sink_table_name, task_id);
}
/// remove flow from worker context
pub fn remove_flow(&mut self, task_id: FlowId) {
if let Some(sink_table_name) = self.flow_to_sink.remove(&task_id) {
self.sink_to_flow.remove(&sink_table_name);
}
for (source_table_id, tasks) in self.source_to_tasks.iter_mut() {
tasks.remove(&task_id);
if tasks.is_empty() {
self.source_sender.remove(source_table_id);
}
}
}
/// try add source sender, if already exist, do nothing
@@ -307,14 +322,14 @@ impl FlownodeContext {
pub fn assign_table_schema(
&mut self,
table_name: &TableName,
schema: RelationType,
schema: RelationDesc,
) -> Result<(), Error> {
let gid = self
.table_repr
.get_by_name(table_name)
.map(|(_, gid)| gid)
.unwrap();
self.schema.insert(gid, schema.into_unnamed());
self.schema.insert(gid, schema);
Ok(())
}

View File

@@ -132,7 +132,7 @@ impl TableSource {
nullable: col.is_nullable(),
scalar_type: col.data_type,
},
col.name,
Some(col.name),
)
})
.unzip();

View File

@@ -514,7 +514,7 @@ mod test {
plan: Plan::Get {
id: Id::Global(GlobalId::User(1)),
},
typ: RelationType::new(vec![]),
schema: RelationType::new(vec![]).into_unnamed(),
},
);
let create_reqs = Request::Create {

View File

@@ -111,7 +111,7 @@ impl<'referred, 'df> Context<'referred, 'df> {
input,
key_val_plan,
reduce_plan,
} => self.render_reduce(input, key_val_plan, reduce_plan, plan.typ),
} => self.render_reduce(input, key_val_plan, reduce_plan, plan.schema.typ),
Plan::Join { .. } => NotImplementedSnafu {
reason: "Join is still WIP",
}

View File

@@ -252,7 +252,7 @@ mod test {
.unwrap();
let bundle = ctx
.render_mfp(Box::new(input_plan.with_types(typ)), mfp)
.render_mfp(Box::new(input_plan.with_types(typ.into_unnamed())), mfp)
.unwrap();
let output = get_output_handle(&mut ctx, bundle);
// drop ctx here to simulate actual process of compile first, run later scenario
@@ -312,7 +312,7 @@ mod test {
)])
.unwrap();
let bundle = ctx
.render_mfp(Box::new(input_plan.with_types(typ)), mfp)
.render_mfp(Box::new(input_plan.with_types(typ.into_unnamed())), mfp)
.unwrap();
let output = get_output_handle(&mut ctx, bundle);
@@ -348,7 +348,7 @@ mod test {
)])
.unwrap();
let bundle = ctx
.render_mfp(Box::new(input_plan.with_types(typ)), mfp)
.render_mfp(Box::new(input_plan.with_types(typ.into_unnamed())), mfp)
.unwrap();
let output = get_output_handle(&mut ctx, bundle);

View File

@@ -813,11 +813,12 @@ mod test {
distinct: false,
};
let expected = TypedPlan {
typ: RelationType::new(vec![
schema: RelationType::new(vec![
ColumnType::new(CDT::uint64_datatype(), true), // sum(number)
ColumnType::new(CDT::datetime_datatype(), false), // window start
ColumnType::new(CDT::datetime_datatype(), false), // window end
]),
])
.into_unnamed(),
// TODO(discord9): mfp indirectly ref to key columns
/*
.with_key(vec![1])
@@ -829,10 +830,13 @@ mod test {
Plan::Get {
id: crate::expr::Id::Global(GlobalId::User(1)),
}
.with_types(RelationType::new(vec![
ColumnType::new(ConcreteDataType::uint32_datatype(), false),
ColumnType::new(ConcreteDataType::datetime_datatype(), false),
])),
.with_types(
RelationType::new(vec![
ColumnType::new(ConcreteDataType::uint32_datatype(), false),
ColumnType::new(ConcreteDataType::datetime_datatype(), false),
])
.into_unnamed(),
),
),
key_val_plan: KeyValPlan {
key_plan: MapFilterProject::new(2)
@@ -880,7 +884,8 @@ mod test {
ColumnType::new(CDT::uint64_datatype(), true), //sum(number)
])
.with_key(vec![1])
.with_time_index(Some(0)),
.with_time_index(Some(0))
.into_unnamed(),
),
),
mfp: MapFilterProject::new(3)
@@ -977,7 +982,8 @@ mod test {
els: Box::new(ScalarExpr::Literal(Value::Null, CDT::uint64_datatype())),
};
let expected = TypedPlan {
typ: RelationType::new(vec![ColumnType::new(CDT::uint64_datatype(), true)]),
schema: RelationType::new(vec![ColumnType::new(CDT::uint64_datatype(), true)])
.into_unnamed(),
plan: Plan::Mfp {
input: Box::new(
Plan::Reduce {
@@ -985,9 +991,13 @@ mod test {
Plan::Get {
id: crate::expr::Id::Global(GlobalId::User(1)),
}
.with_types(RelationType::new(vec![
ColumnType::new(ConcreteDataType::int64_datatype(), false),
])),
.with_types(
RelationType::new(vec![ColumnType::new(
ConcreteDataType::int64_datatype(),
false,
)])
.into_unnamed(),
),
),
key_val_plan: KeyValPlan {
key_plan: MapFilterProject::new(1)
@@ -1008,10 +1018,13 @@ mod test {
distinct_aggrs: vec![],
}),
}
.with_types(RelationType::new(vec![
ColumnType::new(ConcreteDataType::uint32_datatype(), true),
ColumnType::new(ConcreteDataType::int64_datatype(), true),
])),
.with_types(
RelationType::new(vec![
ColumnType::new(ConcreteDataType::uint32_datatype(), true),
ColumnType::new(ConcreteDataType::int64_datatype(), true),
])
.into_unnamed(),
),
),
mfp: MapFilterProject::new(2)
.map(vec![
@@ -1068,7 +1081,7 @@ mod test {
let reduce_plan = ReducePlan::Distinct;
let bundle = ctx
.render_reduce(
Box::new(input_plan.with_types(typ)),
Box::new(input_plan.with_types(typ.into_unnamed())),
key_val_plan,
reduce_plan,
RelationType::empty(),
@@ -1143,7 +1156,7 @@ mod test {
let reduce_plan = ReducePlan::Accumulable(accum_plan);
let bundle = ctx
.render_reduce(
Box::new(input_plan.with_types(typ)),
Box::new(input_plan.with_types(typ.into_unnamed())),
key_val_plan,
reduce_plan,
RelationType::empty(),
@@ -1224,7 +1237,7 @@ mod test {
let reduce_plan = ReducePlan::Accumulable(accum_plan);
let bundle = ctx
.render_reduce(
Box::new(input_plan.with_types(typ)),
Box::new(input_plan.with_types(typ.into_unnamed())),
key_val_plan,
reduce_plan,
RelationType::empty(),
@@ -1301,7 +1314,7 @@ mod test {
let reduce_plan = ReducePlan::Accumulable(accum_plan);
let bundle = ctx
.render_reduce(
Box::new(input_plan.with_types(typ)),
Box::new(input_plan.with_types(typ.into_unnamed())),
key_val_plan,
reduce_plan,
RelationType::empty(),
@@ -1393,7 +1406,7 @@ mod test {
let reduce_plan = ReducePlan::Accumulable(accum_plan);
let bundle = ctx
.render_reduce(
Box::new(input_plan.with_types(typ)),
Box::new(input_plan.with_types(typ.into_unnamed())),
key_val_plan,
reduce_plan,
RelationType::empty(),

View File

@@ -62,7 +62,6 @@ impl<'referred, 'df> Context<'referred, 'df> {
let arr = arranged.get_updates_in_range(..=now);
err_collector.run(|| arranged.compact_to(now));
debug!("Call source");
let prev_avail = arr.into_iter().map(|((k, _), t, d)| (k, t, d));
let mut to_send = Vec::new();
let mut to_arrange = Vec::new();

View File

@@ -321,6 +321,38 @@ impl MapFilterProject {
pub fn optimize(&mut self) {
// TODO(discord9): optimize
}
/// get the mapping of old columns to new columns after the mfp
pub fn get_old_to_new_mapping(&self) -> BTreeMap<usize, usize> {
BTreeMap::from_iter(
self.projection
.clone()
.into_iter()
.enumerate()
.map(|(new, old)| {
// `projection` give the new -> old mapping
let mut old = old;
// trace back to the original column
// since there maybe indirect ref to old columns like
// col 2 <- expr=col(2) at pos col 4 <- expr=col(4) at pos col 6
// ideally such indirect ref should be optimize away
// TODO(discord9): refactor this after impl `optimize()`
while let Some(ScalarExpr::Column(prev)) = if old >= self.input_arity {
// get the correspond expr if not a original column
self.expressions.get(old - self.input_arity)
} else {
// we don't care about non column ref case since only need old to new column mapping
// in which case, the old->new mapping remain the same
None
} {
old = *prev;
if old < self.input_arity {
break;
}
}
(old, new)
}),
)
}
/// Convert the `MapFilterProject` into a staged evaluation plan.
///

View File

@@ -30,21 +30,22 @@ use crate::expr::{
};
use crate::plan::join::JoinPlan;
pub(crate) use crate::plan::reduce::{AccumulablePlan, AggrWithIndex, KeyValPlan, ReducePlan};
use crate::repr::{ColumnType, DiffRow, RelationType};
use crate::repr::{ColumnType, DiffRow, RelationDesc, RelationType};
/// A plan for a dataflow component. But with type to indicate the output type of the relation.
#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd, Deserialize, Serialize)]
pub struct TypedPlan {
/// output type of the relation
pub typ: RelationType,
pub schema: RelationDesc,
/// The untyped plan.
pub plan: Plan,
}
impl TypedPlan {
/// directly apply a mfp to the plan
pub fn mfp(self, mfp: MapFilterProject) -> Result<Self, Error> {
let new_type = self.typ.apply_mfp(&mfp)?;
pub fn mfp(self, mfp: SafeMfpPlan) -> Result<Self, Error> {
let new_type = self.schema.apply_mfp(&mfp)?;
let mfp = mfp.mfp;
let plan = match self.plan {
Plan::Mfp {
input,
@@ -59,14 +60,14 @@ impl TypedPlan {
},
};
Ok(TypedPlan {
typ: new_type,
schema: new_type,
plan,
})
}
/// project the plan to the given expressions
pub fn projection(self, exprs: Vec<TypedExpr>) -> Result<Self, Error> {
let input_arity = self.typ.column_types.len();
let input_arity = self.schema.typ.column_types.len();
let output_arity = exprs.len();
let (exprs, _expr_typs): (Vec<_>, Vec<_>) = exprs
.into_iter()
@@ -74,8 +75,10 @@ impl TypedPlan {
.unzip();
let mfp = MapFilterProject::new(input_arity)
.map(exprs)?
.project(input_arity..input_arity + output_arity)?;
let out_typ = self.typ.apply_mfp(&mfp)?;
.project(input_arity..input_arity + output_arity)?
.into_safe();
let out_typ = self.schema.apply_mfp(&mfp)?;
let mfp = mfp.mfp;
// special case for mfp to compose when the plan is already mfp
let plan = match self.plan {
Plan::Mfp {
@@ -90,12 +93,15 @@ impl TypedPlan {
mfp,
},
};
Ok(TypedPlan { typ: out_typ, plan })
Ok(TypedPlan {
schema: out_typ,
plan,
})
}
/// Add a new filter to the plan, will filter out the records that do not satisfy the filter
pub fn filter(self, filter: TypedExpr) -> Result<Self, Error> {
let typ = self.typ.clone();
let typ = self.schema.clone();
let plan = match self.plan {
Plan::Mfp {
input,
@@ -106,10 +112,10 @@ impl TypedPlan {
},
_ => Plan::Mfp {
input: Box::new(self),
mfp: MapFilterProject::new(typ.column_types.len()).filter(vec![filter.expr])?,
mfp: MapFilterProject::new(typ.typ.column_types.len()).filter(vec![filter.expr])?,
},
};
Ok(TypedPlan { typ, plan })
Ok(TypedPlan { schema: typ, plan })
}
}
@@ -227,7 +233,7 @@ impl Plan {
}
impl Plan {
pub fn with_types(self, typ: RelationType) -> TypedPlan {
TypedPlan { typ, plan: self }
pub fn with_types(self, schema: RelationDesc) -> TypedPlan {
TypedPlan { schema, plan: self }
}
}

View File

@@ -20,7 +20,7 @@ use serde::{Deserialize, Serialize};
use snafu::{ensure, OptionExt};
use crate::adapter::error::{InvalidQuerySnafu, Result, UnexpectedSnafu};
use crate::expr::MapFilterProject;
use crate::expr::{MapFilterProject, SafeMfpPlan, ScalarExpr};
/// a set of column indices that are "keys" for the collection.
#[derive(Default, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize, Hash)]
@@ -111,7 +111,8 @@ impl RelationType {
/// then new key=`[1]`, new time index=`[0]`
///
/// note that this function will remove empty keys like key=`[]` will be removed
pub fn apply_mfp(&self, mfp: &MapFilterProject) -> Result<Self> {
pub fn apply_mfp(&self, mfp: &SafeMfpPlan) -> Result<Self> {
let mfp = &mfp.mfp;
let mut all_types = self.column_types.clone();
for expr in &mfp.expressions {
let expr_typ = expr.typ(&self.column_types)?;
@@ -132,13 +133,7 @@ impl RelationType {
})
.try_collect()?;
let old_to_new_col = BTreeMap::from_iter(
mfp.projection
.clone()
.into_iter()
.enumerate()
.map(|(new, old)| (old, new)),
);
let old_to_new_col = mfp.get_old_to_new_mapping();
// since it's just a mfp, we also try to preserve keys&time index information, if they survive mfp transform
let keys = self
@@ -264,15 +259,15 @@ impl RelationType {
}
/// Return relation describe with column names
pub fn into_named(self, names: Vec<ColumnName>) -> RelationDesc {
pub fn into_named(self, names: Vec<Option<ColumnName>>) -> RelationDesc {
RelationDesc { typ: self, names }
}
/// Return relation describe without column names
pub fn into_unnamed(self) -> RelationDesc {
RelationDesc {
names: vec![None; self.column_types.len()],
typ: self,
names: vec![],
}
}
}
@@ -336,10 +331,36 @@ fn return_true() -> bool {
///
/// It bundles a [`RelationType`] with the name of each column in the relation.
/// Individual column names are optional.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Hash)]
#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize, Hash)]
pub struct RelationDesc {
pub typ: RelationType,
pub names: Vec<ColumnName>,
pub names: Vec<Option<ColumnName>>,
}
impl RelationDesc {
/// apply mfp, and also project col names for the projected columns
pub fn apply_mfp(&self, mfp: &SafeMfpPlan) -> Result<Self> {
// TODO: find a way to deduce name at best effect
let names = {
let mfp = &mfp.mfp;
let mut names = self.names.clone();
for expr in &mfp.expressions {
if let ScalarExpr::Column(i) = expr {
names.push(self.names.get(*i).cloned().flatten());
} else {
names.push(None);
}
}
mfp.projection
.iter()
.map(|i| names.get(*i).cloned().flatten())
.collect_vec()
};
Ok(Self {
typ: self.typ.apply_mfp(mfp)?,
names,
})
}
}
impl RelationDesc {
@@ -358,7 +379,7 @@ impl RelationDesc {
pub fn try_new<I, N>(typ: RelationType, names: I) -> Result<Self>
where
I: IntoIterator<Item = N>,
N: Into<ColumnName>,
N: Into<Option<ColumnName>>,
{
let names: Vec<_> = names.into_iter().map(|name| name.into()).collect();
ensure!(
@@ -383,7 +404,7 @@ impl RelationDesc {
pub fn new_unchecked<I, N>(typ: RelationType, names: I) -> Self
where
I: IntoIterator<Item = N>,
N: Into<ColumnName>,
N: Into<Option<ColumnName>>,
{
let names: Vec<_> = names.into_iter().map(|name| name.into()).collect();
assert_eq!(typ.arity(), names.len());
@@ -394,7 +415,7 @@ impl RelationDesc {
where
I: IntoIterator<Item = (N, T)>,
T: Into<ColumnType>,
N: Into<ColumnName>,
N: Into<Option<ColumnName>>,
{
let (names, types): (Vec<_>, Vec<_>) = iter.into_iter().unzip();
let types = types.into_iter().map(Into::into).collect();
@@ -420,7 +441,7 @@ impl RelationDesc {
/// Appends a column with the specified name and type.
pub fn with_column<N>(mut self, name: N, column_type: ColumnType) -> Self
where
N: Into<ColumnName>,
N: Into<Option<ColumnName>>,
{
self.typ.column_types.push(column_type);
self.names.push(name.into());
@@ -445,7 +466,7 @@ impl RelationDesc {
pub fn try_with_names<I, N>(self, names: I) -> Result<Self>
where
I: IntoIterator<Item = N>,
N: Into<ColumnName>,
N: Into<Option<ColumnName>>,
{
Self::try_new(self.typ, names)
}
@@ -461,7 +482,7 @@ impl RelationDesc {
}
/// Returns an iterator over the columns in this relation.
pub fn iter(&self) -> impl Iterator<Item = (&ColumnName, &ColumnType)> {
pub fn iter(&self) -> impl Iterator<Item = (&Option<ColumnName>, &ColumnType)> {
self.iter_names().zip(self.iter_types())
}
@@ -471,7 +492,7 @@ impl RelationDesc {
}
/// Returns an iterator over the names of the columns in this relation.
pub fn iter_names(&self) -> impl Iterator<Item = &ColumnName> {
pub fn iter_names(&self) -> impl Iterator<Item = &Option<ColumnName>> {
self.names.iter()
}
@@ -482,7 +503,7 @@ impl RelationDesc {
/// specified name, the leftmost column is returned.
pub fn get_by_name(&self, name: &ColumnName) -> Option<(usize, &ColumnType)> {
self.iter_names()
.position(|n| n == name)
.position(|n| n.as_ref() == Some(name))
.map(|i| (i, &self.typ.column_types[i]))
}
@@ -491,7 +512,7 @@ impl RelationDesc {
/// # Panics
///
/// Panics if `i` is not a valid column index.
pub fn get_name(&self, i: usize) -> &ColumnName {
pub fn get_name(&self, i: usize) -> &Option<ColumnName> {
&self.names[i]
}
@@ -500,7 +521,7 @@ impl RelationDesc {
/// # Panics
///
/// Panics if `i` is not a valid column index.
pub fn get_name_mut(&mut self, i: usize) -> &mut ColumnName {
pub fn get_name_mut(&mut self, i: usize) -> &mut Option<ColumnName> {
&mut self.names[i]
}
@@ -515,7 +536,7 @@ impl RelationDesc {
pub fn get_unambiguous_name(&self, i: usize) -> Option<&ColumnName> {
let name = &self.names[i];
if self.iter_names().filter(|n| *n == name).count() == 1 {
Some(name)
name.as_ref()
} else {
None
}

View File

@@ -212,7 +212,7 @@ mod test {
let schema = RelationType::new(vec![ColumnType::new(CDT::uint32_datatype(), false)]);
tri_map.insert(Some(name.clone()), Some(1024), gid);
schemas.insert(gid, schema.into_unnamed());
schemas.insert(gid, schema.into_named(vec![Some("number".to_string())]));
}
{
@@ -226,7 +226,10 @@ mod test {
ColumnType::new(CDT::uint32_datatype(), false),
ColumnType::new(CDT::datetime_datatype(), false),
]);
schemas.insert(gid, schema.into_unnamed());
schemas.insert(
gid,
schema.into_named(vec![Some("number".to_string()), Some("ts".to_string())]),
);
tri_map.insert(Some(name.clone()), Some(1025), gid);
}

View File

@@ -285,6 +285,20 @@ impl KeyValPlan {
}
}
/// find out the column that should be time index in group exprs(which is all columns that should be keys)
/// TODO(discord9): better ways to assign time index
fn find_time_index_in_group_exprs(group_exprs: &[TypedExpr]) -> Option<usize> {
group_exprs.iter().position(|expr| {
matches!(
&expr.expr,
ScalarExpr::CallUnary {
func: UnaryFunc::TumbleWindowFloor { .. },
expr: _
}
)
})
}
impl TypedPlan {
/// Convert AggregateRel into Flow's TypedPlan
///
@@ -306,44 +320,57 @@ impl TypedPlan {
let group_exprs = TypedExpr::from_substrait_agg_grouping(
ctx,
&agg.groupings,
&input.typ,
&input.schema.typ,
extensions,
)?;
TypedExpr::expand_multi_value(&input.typ, &group_exprs)?
TypedExpr::expand_multi_value(&input.schema.typ, &group_exprs)?
};
let time_index = group_exprs.iter().position(|expr| {
matches!(
&expr.expr,
ScalarExpr::CallUnary {
func: UnaryFunc::TumbleWindowFloor { .. },
expr: _
}
)
});
let time_index = find_time_index_in_group_exprs(&group_exprs);
let (mut aggr_exprs, post_mfp) =
AggregateExpr::from_substrait_agg_measures(ctx, &agg.measures, &input.typ, extensions)?;
let (mut aggr_exprs, post_mfp) = AggregateExpr::from_substrait_agg_measures(
ctx,
&agg.measures,
&input.schema.typ,
extensions,
)?;
let key_val_plan = KeyValPlan::from_substrait_gen_key_val_plan(
&mut aggr_exprs,
&group_exprs,
input.typ.column_types.len(),
input.schema.typ.column_types.len(),
)?;
// output type is group_exprs + aggr_exprs
let output_type = {
let mut output_types = Vec::new();
// give best effort to get column name
let mut output_names = Vec::new();
// first append group_expr as key, then aggr_expr as value
for expr in &group_exprs {
output_types.push(expr.typ.clone());
let col_name = match &expr.expr {
ScalarExpr::CallUnary {
func: UnaryFunc::TumbleWindowFloor { .. },
..
} => Some("window_start".to_string()),
ScalarExpr::CallUnary {
func: UnaryFunc::TumbleWindowCeiling { .. },
..
} => Some("window_end".to_string()),
ScalarExpr::Column(col) => input.schema.get_name(*col).clone(),
_ => None,
};
output_names.push(col_name)
}
for aggr in &aggr_exprs {
output_types.push(ColumnType::new_nullable(
aggr.func.signature().output.clone(),
));
// TODO(discord9): find a clever way to name them?
output_names.push(None);
}
// TODO(discord9): try best to get time
if group_exprs.is_empty() {
@@ -351,8 +378,9 @@ impl TypedPlan {
} else {
RelationType::new(output_types).with_key((0..group_exprs.len()).collect_vec())
}
}
.with_time_index(time_index);
.with_time_index(time_index)
.into_named(output_names)
};
// copy aggr_exprs to full_aggrs, and split them into simple_aggrs and distinct_aggrs
// also set them input/output column
@@ -390,13 +418,13 @@ impl TypedPlan {
// FIX(discord9): deal with key first
if post_mfp.is_identity() {
Ok(TypedPlan {
typ: output_type,
schema: output_type,
plan,
})
} else {
// make post_mfp map identical mapping of keys
let input = TypedPlan {
typ: output_type.clone(),
schema: output_type.clone(),
plan,
};
let key_arity = group_exprs.len();
@@ -414,7 +442,7 @@ impl TypedPlan {
.filter(f)?
.project(p)?;
Ok(TypedPlan {
typ: output_type.apply_mfp(&post_mfp)?,
schema: output_type.apply_mfp(&post_mfp.clone().into_safe())?,
plan: Plan::Mfp {
input: Box::new(input),
mfp: post_mfp,
@@ -434,7 +462,16 @@ mod test {
use crate::plan::{Plan, TypedPlan};
use crate::repr::{self, ColumnType, RelationType};
use crate::transform::test::{create_test_ctx, create_test_query_engine, sql_to_substrait};
/// TODO(discord9): add more illegal sql tests
#[tokio::test]
async fn tes_missing_key_check() {
let engine = create_test_query_engine();
let sql = "SELECT avg(number) FROM numbers_with_ts GROUP BY tumble(ts, '1 hour'), number";
let plan = sql_to_substrait(engine.clone(), sql).await;
let mut ctx = create_test_ctx();
assert!(TypedPlan::from_substrait_plan(&mut ctx, &plan).is_err());
}
/// TODO(discord9): add more illegal sql tests
#[tokio::test]
async fn test_tumble_composite() {
@@ -470,10 +507,6 @@ mod test {
els: Box::new(ScalarExpr::Literal(Value::Null, CDT::uint64_datatype())),
};
let expected = TypedPlan {
// TODO(discord9): mfp indirectly ref to key columns
/*
.with_key(vec![1])
.with_time_index(Some(0)),*/
plan: Plan::Mfp {
input: Box::new(
Plan::Reduce {
@@ -481,10 +514,16 @@ mod test {
Plan::Get {
id: crate::expr::Id::Global(GlobalId::User(1)),
}
.with_types(RelationType::new(vec![
ColumnType::new(ConcreteDataType::uint32_datatype(), false),
ColumnType::new(ConcreteDataType::datetime_datatype(), false),
])),
.with_types(
RelationType::new(vec![
ColumnType::new(ConcreteDataType::uint32_datatype(), false),
ColumnType::new(ConcreteDataType::datetime_datatype(), false),
])
.into_named(vec![
Some("number".to_string()),
Some("ts".to_string()),
]),
),
),
key_val_plan: KeyValPlan {
key_plan: MapFilterProject::new(2)
@@ -540,7 +579,14 @@ mod test {
ColumnType::new(CDT::int64_datatype(), true), // avg.count(number)
])
.with_key(vec![1, 2])
.with_time_index(Some(0)),
.with_time_index(Some(0))
.into_named(vec![
Some("window_start".to_string()),
Some("window_end".to_string()),
Some("number".to_string()),
None,
None,
]),
),
),
mfp: MapFilterProject::new(5)
@@ -555,11 +601,19 @@ mod test {
.project(vec![6, 7, 8, 9])
.unwrap(),
},
typ: RelationType::new(vec![
schema: RelationType::new(vec![
ColumnType::new(CDT::uint32_datatype(), false), // number
ColumnType::new(CDT::uint64_datatype(), true), // avg(number)
ColumnType::new(CDT::datetime_datatype(), false), // window start
ColumnType::new(CDT::datetime_datatype(), false), // window end
])
.with_key(vec![0, 3])
.with_time_index(Some(2))
.into_named(vec![
Some("number".to_string()),
None,
Some("window_start".to_string()),
Some("window_end".to_string()),
]),
};
assert_eq!(flow_plan, expected);
@@ -580,10 +634,17 @@ mod test {
distinct: false,
};
let expected = TypedPlan {
typ: RelationType::new(vec![
schema: RelationType::new(vec![
ColumnType::new(CDT::uint64_datatype(), true), // sum(number)
ColumnType::new(CDT::datetime_datatype(), false), // window start
ColumnType::new(CDT::datetime_datatype(), false), // window end
])
.with_key(vec![2])
.with_time_index(Some(1))
.into_named(vec![
None,
Some("window_start".to_string()),
Some("window_end".to_string()),
]),
// TODO(discord9): mfp indirectly ref to key columns
/*
@@ -596,10 +657,16 @@ mod test {
Plan::Get {
id: crate::expr::Id::Global(GlobalId::User(1)),
}
.with_types(RelationType::new(vec![
ColumnType::new(ConcreteDataType::uint32_datatype(), false),
ColumnType::new(ConcreteDataType::datetime_datatype(), false),
])),
.with_types(
RelationType::new(vec![
ColumnType::new(ConcreteDataType::uint32_datatype(), false),
ColumnType::new(ConcreteDataType::datetime_datatype(), false),
])
.into_named(vec![
Some("number".to_string()),
Some("ts".to_string()),
]),
),
),
key_val_plan: KeyValPlan {
key_plan: MapFilterProject::new(2)
@@ -647,7 +714,12 @@ mod test {
ColumnType::new(CDT::uint64_datatype(), true), //sum(number)
])
.with_key(vec![1])
.with_time_index(Some(0)),
.with_time_index(Some(0))
.into_named(vec![
Some("window_start".to_string()),
Some("window_end".to_string()),
None,
]),
),
),
mfp: MapFilterProject::new(3)
@@ -680,15 +752,18 @@ mod test {
distinct: false,
};
let expected = TypedPlan {
typ: RelationType::new(vec![
schema: RelationType::new(vec![
ColumnType::new(CDT::uint64_datatype(), true), // sum(number)
ColumnType::new(CDT::datetime_datatype(), false), // window start
ColumnType::new(CDT::datetime_datatype(), false), // window end
])
.with_key(vec![2])
.with_time_index(Some(1))
.into_named(vec![
None,
Some("window_start".to_string()),
Some("window_end".to_string()),
]),
// TODO(discord9): mfp indirectly ref to key columns
/*
.with_key(vec![1])
.with_time_index(Some(0)),*/
plan: Plan::Mfp {
input: Box::new(
Plan::Reduce {
@@ -696,10 +771,16 @@ mod test {
Plan::Get {
id: crate::expr::Id::Global(GlobalId::User(1)),
}
.with_types(RelationType::new(vec![
ColumnType::new(ConcreteDataType::uint32_datatype(), false),
ColumnType::new(ConcreteDataType::datetime_datatype(), false),
])),
.with_types(
RelationType::new(vec![
ColumnType::new(ConcreteDataType::uint32_datatype(), false),
ColumnType::new(ConcreteDataType::datetime_datatype(), false),
])
.into_named(vec![
Some("number".to_string()),
Some("ts".to_string()),
]),
),
),
key_val_plan: KeyValPlan {
key_plan: MapFilterProject::new(2)
@@ -747,7 +828,12 @@ mod test {
ColumnType::new(CDT::uint64_datatype(), true), //sum(number)
])
.with_key(vec![1])
.with_time_index(Some(0)),
.with_time_index(Some(0))
.into_named(vec![
Some("window_start".to_string()),
Some("window_end".to_string()),
None,
]),
),
),
mfp: MapFilterProject::new(3)
@@ -798,10 +884,12 @@ mod test {
els: Box::new(ScalarExpr::Literal(Value::Null, CDT::uint64_datatype())),
};
let expected = TypedPlan {
typ: RelationType::new(vec![
schema: RelationType::new(vec![
ColumnType::new(CDT::uint64_datatype(), true), // sum(number) -> u64
ColumnType::new(CDT::uint32_datatype(), false), // number
]),
])
.with_key(vec![1])
.into_named(vec![None, Some("number".to_string())]),
plan: Plan::Mfp {
input: Box::new(
Plan::Reduce {
@@ -809,9 +897,13 @@ mod test {
Plan::Get {
id: crate::expr::Id::Global(GlobalId::User(0)),
}
.with_types(RelationType::new(vec![
ColumnType::new(ConcreteDataType::uint32_datatype(), false),
])),
.with_types(
RelationType::new(vec![ColumnType::new(
ConcreteDataType::uint32_datatype(),
false,
)])
.into_named(vec![Some("number".to_string())]),
),
),
key_val_plan: KeyValPlan {
key_plan: MapFilterProject::new(1)
@@ -840,7 +932,12 @@ mod test {
ColumnType::new(ConcreteDataType::uint64_datatype(), true), // sum
ColumnType::new(ConcreteDataType::int64_datatype(), true), // count
])
.with_key(vec![0]),
.with_key(vec![0])
.into_named(vec![
Some("number".to_string()),
None,
None,
]),
),
),
mfp: MapFilterProject::new(3)
@@ -892,7 +989,8 @@ mod test {
els: Box::new(ScalarExpr::Literal(Value::Null, CDT::uint64_datatype())),
};
let expected = TypedPlan {
typ: RelationType::new(vec![ColumnType::new(CDT::uint64_datatype(), true)]),
schema: RelationType::new(vec![ColumnType::new(CDT::uint64_datatype(), true)])
.into_named(vec![None]),
plan: Plan::Mfp {
input: Box::new(
Plan::Reduce {
@@ -900,9 +998,13 @@ mod test {
Plan::Get {
id: crate::expr::Id::Global(GlobalId::User(0)),
}
.with_types(RelationType::new(vec![
ColumnType::new(ConcreteDataType::uint32_datatype(), false),
])),
.with_types(
RelationType::new(vec![ColumnType::new(
ConcreteDataType::uint32_datatype(),
false,
)])
.into_named(vec![Some("number".to_string())]),
),
),
key_val_plan: KeyValPlan {
key_plan: MapFilterProject::new(1)
@@ -923,10 +1025,13 @@ mod test {
distinct_aggrs: vec![],
}),
}
.with_types(RelationType::new(vec![
ColumnType::new(ConcreteDataType::uint64_datatype(), true),
ColumnType::new(ConcreteDataType::int64_datatype(), true),
])),
.with_types(
RelationType::new(vec![
ColumnType::new(ConcreteDataType::uint64_datatype(), true), // sum
ColumnType::new(ConcreteDataType::int64_datatype(), true), // count
])
.into_named(vec![None, None]),
),
),
mfp: MapFilterProject::new(2)
.map(vec![
@@ -960,7 +1065,8 @@ mod test {
distinct: false,
};
let expected = TypedPlan {
typ: RelationType::new(vec![ColumnType::new(CDT::uint64_datatype(), true)]),
schema: RelationType::new(vec![ColumnType::new(CDT::uint64_datatype(), true)])
.into_unnamed(),
plan: Plan::Mfp {
input: Box::new(
Plan::Reduce {
@@ -968,9 +1074,13 @@ mod test {
Plan::Get {
id: crate::expr::Id::Global(GlobalId::User(0)),
}
.with_types(RelationType::new(vec![
ColumnType::new(ConcreteDataType::uint32_datatype(), false),
])),
.with_types(
RelationType::new(vec![ColumnType::new(
ConcreteDataType::uint32_datatype(),
false,
)])
.into_named(vec![Some("number".to_string())]),
),
),
key_val_plan: KeyValPlan {
key_plan: MapFilterProject::new(1)
@@ -988,7 +1098,7 @@ mod test {
distinct_aggrs: vec![],
}),
}
.with_types(typ),
.with_types(typ.into_unnamed()),
),
mfp: MapFilterProject::new(1)
.map(vec![ScalarExpr::Column(0), ScalarExpr::Column(1)])
@@ -1015,10 +1125,12 @@ mod test {
distinct: false,
};
let expected = TypedPlan {
typ: RelationType::new(vec![
schema: RelationType::new(vec![
ColumnType::new(CDT::uint64_datatype(), true), // col sum(number)
ColumnType::new(CDT::uint32_datatype(), false), // col number
]),
])
.with_key(vec![1])
.into_named(vec![None, Some("number".to_string())]),
plan: Plan::Mfp {
input: Box::new(
Plan::Reduce {
@@ -1026,9 +1138,13 @@ mod test {
Plan::Get {
id: crate::expr::Id::Global(GlobalId::User(0)),
}
.with_types(RelationType::new(vec![
ColumnType::new(ConcreteDataType::uint32_datatype(), false),
])),
.with_types(
RelationType::new(vec![ColumnType::new(
ConcreteDataType::uint32_datatype(),
false,
)])
.into_named(vec![Some("number".to_string())]),
),
),
key_val_plan: KeyValPlan {
key_plan: MapFilterProject::new(1)
@@ -1053,7 +1169,8 @@ mod test {
ColumnType::new(CDT::uint32_datatype(), false), // col number
ColumnType::new(CDT::uint64_datatype(), true), // col sum(number)
])
.with_key(vec![0]),
.with_key(vec![0])
.into_named(vec![Some("number".to_string()), None]),
),
),
mfp: MapFilterProject::new(2)
@@ -1086,7 +1203,8 @@ mod test {
distinct: false,
};
let expected = TypedPlan {
typ: RelationType::new(vec![ColumnType::new(CDT::uint64_datatype(), true)]),
schema: RelationType::new(vec![ColumnType::new(CDT::uint64_datatype(), true)])
.into_unnamed(),
plan: Plan::Mfp {
input: Box::new(
Plan::Reduce {
@@ -1094,9 +1212,13 @@ mod test {
Plan::Get {
id: crate::expr::Id::Global(GlobalId::User(0)),
}
.with_types(RelationType::new(vec![
ColumnType::new(ConcreteDataType::uint32_datatype(), false),
])),
.with_types(
RelationType::new(vec![ColumnType::new(
ConcreteDataType::uint32_datatype(),
false,
)])
.into_named(vec![Some("number".to_string())]),
),
),
key_val_plan: KeyValPlan {
key_plan: MapFilterProject::new(1)
@@ -1117,10 +1239,10 @@ mod test {
distinct_aggrs: vec![],
}),
}
.with_types(RelationType::new(vec![ColumnType::new(
CDT::uint64_datatype(),
true,
)])),
.with_types(
RelationType::new(vec![ColumnType::new(CDT::uint64_datatype(), true)])
.into_unnamed(),
),
),
mfp: MapFilterProject::new(1)
.map(vec![ScalarExpr::Column(0), ScalarExpr::Column(1)])

View File

@@ -363,16 +363,20 @@ mod test {
],
};
let expected = TypedPlan {
typ: RelationType::new(vec![ColumnType::new(CDT::uint32_datatype(), false)]),
schema: RelationType::new(vec![ColumnType::new(CDT::uint32_datatype(), false)])
.into_named(vec![Some("number".to_string())]),
plan: Plan::Mfp {
input: Box::new(
Plan::Get {
id: crate::expr::Id::Global(GlobalId::User(0)),
}
.with_types(RelationType::new(vec![ColumnType::new(
ConcreteDataType::uint32_datatype(),
false,
)])),
.with_types(
RelationType::new(vec![ColumnType::new(
ConcreteDataType::uint32_datatype(),
false,
)])
.into_named(vec![Some("number".to_string())]),
),
),
mfp: MapFilterProject::new(1)
.map(vec![ScalarExpr::Column(0)])
@@ -397,7 +401,8 @@ mod test {
let flow_plan = TypedPlan::from_substrait_plan(&mut ctx, &plan);
let expected = TypedPlan {
typ: RelationType::new(vec![ColumnType::new(CDT::boolean_datatype(), true)]),
schema: RelationType::new(vec![ColumnType::new(CDT::boolean_datatype(), true)])
.into_unnamed(),
plan: Plan::Constant {
rows: vec![(
repr::Row::new(vec![Value::from(true)]),
@@ -421,16 +426,20 @@ mod test {
let flow_plan = TypedPlan::from_substrait_plan(&mut ctx, &plan);
let expected = TypedPlan {
typ: RelationType::new(vec![ColumnType::new(CDT::uint32_datatype(), true)]),
schema: RelationType::new(vec![ColumnType::new(CDT::uint32_datatype(), true)])
.into_unnamed(),
plan: Plan::Mfp {
input: Box::new(
Plan::Get {
id: crate::expr::Id::Global(GlobalId::User(0)),
}
.with_types(RelationType::new(vec![ColumnType::new(
ConcreteDataType::uint32_datatype(),
false,
)])),
.with_types(
RelationType::new(vec![ColumnType::new(
ConcreteDataType::uint32_datatype(),
false,
)])
.into_named(vec![Some("number".to_string())]),
),
),
mfp: MapFilterProject::new(1)
.map(vec![ScalarExpr::Column(0).call_binary(
@@ -455,16 +464,20 @@ mod test {
let flow_plan = TypedPlan::from_substrait_plan(&mut ctx, &plan);
let expected = TypedPlan {
typ: RelationType::new(vec![ColumnType::new(CDT::int16_datatype(), true)]),
schema: RelationType::new(vec![ColumnType::new(CDT::int16_datatype(), true)])
.into_unnamed(),
plan: Plan::Mfp {
input: Box::new(
Plan::Get {
id: crate::expr::Id::Global(GlobalId::User(0)),
}
.with_types(RelationType::new(vec![ColumnType::new(
ConcreteDataType::uint32_datatype(),
false,
)])),
.with_types(
RelationType::new(vec![ColumnType::new(
ConcreteDataType::uint32_datatype(),
false,
)])
.into_named(vec![Some("number".to_string())]),
),
),
mfp: MapFilterProject::new(1)
.map(vec![ScalarExpr::Literal(
@@ -490,16 +503,20 @@ mod test {
let flow_plan = TypedPlan::from_substrait_plan(&mut ctx, &plan);
let expected = TypedPlan {
typ: RelationType::new(vec![ColumnType::new(CDT::uint32_datatype(), true)]),
schema: RelationType::new(vec![ColumnType::new(CDT::uint32_datatype(), true)])
.into_unnamed(),
plan: Plan::Mfp {
input: Box::new(
Plan::Get {
id: crate::expr::Id::Global(GlobalId::User(0)),
}
.with_types(RelationType::new(vec![ColumnType::new(
ConcreteDataType::uint32_datatype(),
false,
)])),
.with_types(
RelationType::new(vec![ColumnType::new(
ConcreteDataType::uint32_datatype(),
false,
)])
.into_named(vec![Some("number".to_string())]),
),
),
mfp: MapFilterProject::new(1)
.map(vec![ScalarExpr::Column(0)

View File

@@ -175,7 +175,8 @@ mod test {
let flow_plan = TypedPlan::from_substrait_plan(&mut ctx, &plan);
let expected = TypedPlan {
typ: RelationType::new(vec![ColumnType::new(CDT::int64_datatype(), true)]),
schema: RelationType::new(vec![ColumnType::new(CDT::int64_datatype(), true)])
.into_unnamed(),
plan: Plan::Constant {
rows: vec![(
repr::Row::new(vec![Value::Int64(1)]),

View File

@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::BTreeMap;
use std::collections::{BTreeMap, BTreeSet};
use itertools::Itertools;
use snafu::OptionExt;
@@ -22,11 +22,11 @@ use substrait_proto::proto::rel::RelType;
use substrait_proto::proto::{plan_rel, Plan as SubPlan, Rel};
use crate::adapter::error::{
Error, InvalidQuerySnafu, NotImplementedSnafu, PlanSnafu, UnexpectedSnafu,
Error, InternalSnafu, InvalidQuerySnafu, NotImplementedSnafu, PlanSnafu, UnexpectedSnafu,
};
use crate::expr::{MapFilterProject, ScalarExpr, TypedExpr, UnaryFunc};
use crate::plan::{KeyValPlan, Plan, ReducePlan, TypedPlan};
use crate::repr::{self, RelationType};
use crate::repr::{self, RelationDesc, RelationType};
use crate::transform::{substrait_proto, FlownodeContext, FunctionExtensions};
impl TypedPlan {
@@ -80,7 +80,7 @@ impl TypedPlan {
let mut exprs: Vec<TypedExpr> = vec![];
for e in &p.expressions {
let expr = TypedExpr::from_substrait_rex(e, &input.typ, extensions)?;
let expr = TypedExpr::from_substrait_rex(e, &input.schema.typ, extensions)?;
exprs.push(expr);
}
let is_literal = exprs.iter().all(|expr| expr.expr.is_literal());
@@ -98,123 +98,24 @@ impl TypedPlan {
let plan = Plan::Constant {
rows: vec![(row, repr::Timestamp::MIN, 1)],
};
Ok(TypedPlan { typ, plan })
Ok(TypedPlan {
schema: typ.into_unnamed(),
plan,
})
} else {
/// if reduce_plan contains the special function like tumble floor/ceiling, add them to the proj_exprs
fn rewrite_projection_after_reduce(
key_val_plan: KeyValPlan,
_reduce_plan: ReducePlan,
reduce_output_type: &RelationType,
proj_exprs: &mut Vec<TypedExpr>,
) -> Result<(), Error> {
// TODO: get keys correctly
let key_exprs = key_val_plan
.key_plan
.projection
.clone()
.into_iter()
.map(|i| {
if i < key_val_plan.key_plan.input_arity {
ScalarExpr::Column(i)
} else {
key_val_plan.key_plan.expressions
[i - key_val_plan.key_plan.input_arity]
.clone()
}
})
.collect_vec();
let mut shift_offset = 0;
let special_keys = key_exprs
.into_iter()
.enumerate()
.filter(|(_idx, p)| {
if matches!(
p,
ScalarExpr::CallUnary {
func: UnaryFunc::TumbleWindowFloor { .. },
..
} | ScalarExpr::CallUnary {
func: UnaryFunc::TumbleWindowCeiling { .. },
..
}
) {
if matches!(
p,
ScalarExpr::CallUnary {
func: UnaryFunc::TumbleWindowFloor { .. },
..
}
) {
shift_offset += 1;
}
true
} else {
false
}
})
.collect_vec();
let spec_key_arity = special_keys.len();
if spec_key_arity == 0 {
return Ok(());
}
{
// shift proj_exprs to the right by spec_key_arity
let max_used_col_in_proj = proj_exprs
.iter()
.map(|expr| {
expr.expr
.get_all_ref_columns()
.into_iter()
.max()
.unwrap_or_default()
})
.max()
.unwrap_or_default();
let shuffle = (0..=max_used_col_in_proj)
.map(|col| (col, col + shift_offset))
.collect::<BTreeMap<_, _>>();
for proj_expr in proj_exprs.iter_mut() {
proj_expr.expr.permute_map(&shuffle)?;
} // add key to the end
for (key_idx, _key_expr) in special_keys {
// here we assume the output type of reduce operator is just first keys columns, then append value columns
proj_exprs.push(
ScalarExpr::Column(key_idx).with_type(
reduce_output_type.column_types[key_idx].clone(),
),
);
}
}
Ok(())
}
match input.plan.clone() {
Plan::Reduce {
key_val_plan,
reduce_plan,
..
} => {
Plan::Reduce { key_val_plan, .. } => {
rewrite_projection_after_reduce(
key_val_plan,
reduce_plan,
&input.typ,
&input.schema,
&mut exprs,
)?;
}
Plan::Mfp { input, mfp: _ } => {
if let Plan::Reduce {
key_val_plan,
reduce_plan,
..
} = input.plan
{
if let Plan::Reduce { key_val_plan, .. } = input.plan {
rewrite_projection_after_reduce(
key_val_plan,
reduce_plan,
&input.typ,
&input.schema,
&mut exprs,
)?;
}
@@ -232,7 +133,7 @@ impl TypedPlan {
};
let expr = if let Some(condition) = filter.condition.as_ref() {
TypedExpr::from_substrait_rex(condition, &input.typ, extensions)?
TypedExpr::from_substrait_rex(condition, &input.schema.typ, extensions)?
} else {
return not_impl_err!("Filter without an condition is not valid");
};
@@ -269,7 +170,7 @@ impl TypedPlan {
id: crate::expr::Id::Global(table.0),
};
let get_table = TypedPlan {
typ: table.1.typ().clone(),
schema: table.1,
plan: get_table,
};
@@ -283,10 +184,10 @@ impl TypedPlan {
.iter()
.map(|item| item.field as usize)
.collect();
let input_arity = get_table.typ.column_types.len();
let input_arity = get_table.schema.typ().column_types.len();
let mfp =
MapFilterProject::new(input_arity).project(column_indices.clone())?;
get_table.mfp(mfp)
get_table.mfp(mfp.into_safe())
} else {
Ok(get_table)
}
@@ -302,9 +203,117 @@ impl TypedPlan {
}
}
/// if reduce_plan contains the special function like tumble floor/ceiling, add them to the proj_exprs
/// so the effect is the window_start, window_end column are auto added to output rows
///
/// This is to fix a problem that we have certain functions that return two values, but since substrait doesn't know that, it will assume it return one value
/// this function fix that and rewrite `proj_exprs` to correct form
fn rewrite_projection_after_reduce(
key_val_plan: KeyValPlan,
reduce_output_type: &RelationDesc,
proj_exprs: &mut Vec<TypedExpr>,
) -> Result<(), Error> {
// TODO: get keys correctly
let key_exprs = key_val_plan
.key_plan
.projection
.clone()
.into_iter()
.map(|i| {
if i < key_val_plan.key_plan.input_arity {
ScalarExpr::Column(i)
} else {
key_val_plan.key_plan.expressions[i - key_val_plan.key_plan.input_arity].clone()
}
})
.collect_vec();
let mut shift_offset = 0;
let mut shuffle: BTreeMap<usize, usize> = BTreeMap::new();
let special_keys = key_exprs
.clone()
.into_iter()
.enumerate()
.filter(|(idx, p)| {
shuffle.insert(*idx, *idx + shift_offset);
if matches!(
p,
ScalarExpr::CallUnary {
func: UnaryFunc::TumbleWindowFloor { .. },
..
} | ScalarExpr::CallUnary {
func: UnaryFunc::TumbleWindowCeiling { .. },
..
}
) {
if matches!(
p,
ScalarExpr::CallUnary {
func: UnaryFunc::TumbleWindowFloor { .. },
..
}
) {
shift_offset += 1;
}
true
} else {
false
}
})
.collect_vec();
let spec_key_arity = special_keys.len();
if spec_key_arity == 0 {
return Ok(());
}
// shuffle proj_exprs
// because substrait use offset while assume `tumble` only return one value
for proj_expr in proj_exprs.iter_mut() {
proj_expr.expr.permute_map(&shuffle)?;
} // add key to the end
for (key_idx, _key_expr) in special_keys {
// here we assume the output type of reduce operator(`reduce_output_type`) is just first keys columns, then append value columns
// so we can use `key_idx` to index `reduce_output_type` and get the keys we need to append to `proj_exprs`
proj_exprs.push(
ScalarExpr::Column(key_idx)
.with_type(reduce_output_type.typ().column_types[key_idx].clone()),
);
}
// check if normal expr in group exprs are all in proj_exprs
let all_cols_ref_in_proj: BTreeSet<usize> = proj_exprs
.iter()
.filter_map(|e| {
if let ScalarExpr::Column(i) = &e.expr {
Some(*i)
} else {
None
}
})
.collect();
for (key_idx, key_expr) in key_exprs.iter().enumerate() {
if let ScalarExpr::Column(_) = key_expr {
if !all_cols_ref_in_proj.contains(&key_idx) {
let err_msg = format!(
"Expect normal column in group by also appear in projection, but column {}(name is {}) is missing",
key_idx,
reduce_output_type
.get_name(key_idx)
.clone()
.map(|s|format!("'{}'",s))
.unwrap_or("unknown".to_string())
);
return InvalidQuerySnafu { reason: err_msg }.fail();
}
}
}
Ok(())
}
#[cfg(test)]
mod test {
use datatypes::prelude::ConcreteDataType;
use pretty_assertions::assert_eq;
use super::*;
use crate::expr::{GlobalId, ScalarExpr};
@@ -323,16 +332,20 @@ mod test {
let flow_plan = TypedPlan::from_substrait_plan(&mut ctx, &plan);
let expected = TypedPlan {
typ: RelationType::new(vec![ColumnType::new(CDT::uint32_datatype(), false)]),
schema: RelationType::new(vec![ColumnType::new(CDT::uint32_datatype(), false)])
.into_named(vec![Some("number".to_string())]),
plan: Plan::Mfp {
input: Box::new(
Plan::Get {
id: crate::expr::Id::Global(GlobalId::User(0)),
}
.with_types(RelationType::new(vec![ColumnType::new(
ConcreteDataType::uint32_datatype(),
false,
)])),
.with_types(
RelationType::new(vec![ColumnType::new(
ConcreteDataType::uint32_datatype(),
false,
)])
.into_named(vec![Some("number".to_string())]),
),
),
mfp: MapFilterProject::new(1)
.map(vec![ScalarExpr::Column(0)])