mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-03 20:02:54 +00:00
feat(flow): add types for every plan enum variant (#3938)
* feat: Plan with types * chore: per review advices
This commit is contained in:
17
Cargo.lock
generated
17
Cargo.lock
generated
@@ -3824,6 +3824,7 @@ dependencies = [
|
||||
"minstant",
|
||||
"nom",
|
||||
"num-traits",
|
||||
"pretty_assertions",
|
||||
"prost 0.12.4",
|
||||
"query",
|
||||
"serde",
|
||||
@@ -7364,6 +7365,16 @@ dependencies = [
|
||||
"termtree",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "pretty_assertions"
|
||||
version = "1.4.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "af7cee1a6c8a5b9208b3cb1061f10c0cb689087b3d8ce85fb9d2dd7a29b6ba66"
|
||||
dependencies = [
|
||||
"diff",
|
||||
"yansi",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "prettydiff"
|
||||
version = "0.6.4"
|
||||
@@ -12603,6 +12614,12 @@ dependencies = [
|
||||
"linked-hash-map",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "yansi"
|
||||
version = "0.5.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "09041cd90cf85f7f8b2df60c646f853b7f535ce68f85244eb6731cf89fa498ec"
|
||||
|
||||
[[package]]
|
||||
name = "zerocopy"
|
||||
version = "0.7.34"
|
||||
|
||||
@@ -50,6 +50,7 @@ tonic.workspace = true
|
||||
[dev-dependencies]
|
||||
catalog.workspace = true
|
||||
common-catalog.workspace = true
|
||||
pretty_assertions = "1.4.0"
|
||||
prost.workspace = true
|
||||
query.workspace = true
|
||||
serde_json = "1.0"
|
||||
|
||||
@@ -39,7 +39,7 @@ use crate::expr::error::{DataTypeSnafu, InternalSnafu};
|
||||
use crate::expr::{
|
||||
self, EvalError, GlobalId, LocalId, MapFilterProject, MfpPlan, SafeMfpPlan, ScalarExpr,
|
||||
};
|
||||
use crate::plan::{AccumulablePlan, KeyValPlan, Plan, ReducePlan};
|
||||
use crate::plan::{AccumulablePlan, KeyValPlan, Plan, ReducePlan, TypedPlan};
|
||||
use crate::repr::{self, DiffRow, KeyValDiffRow, Row};
|
||||
use crate::utils::{ArrangeHandler, ArrangeReader, ArrangeWriter, Arrangement};
|
||||
|
||||
@@ -101,8 +101,8 @@ impl<'referred, 'df> Context<'referred, 'df> {
|
||||
/// Interpret and execute plan
|
||||
///
|
||||
/// return the output of this plan
|
||||
pub fn render_plan(&mut self, plan: Plan) -> Result<CollectionBundle, Error> {
|
||||
match plan {
|
||||
pub fn render_plan(&mut self, plan: TypedPlan) -> Result<CollectionBundle, Error> {
|
||||
match plan.plan {
|
||||
Plan::Constant { rows } => Ok(self.render_constant(rows)),
|
||||
Plan::Get { id } => self.get_by_id(id),
|
||||
Plan::Let { id, value, body } => self.eval_let(id, value, body),
|
||||
@@ -193,8 +193,8 @@ impl<'referred, 'df> Context<'referred, 'df> {
|
||||
pub fn eval_let(
|
||||
&mut self,
|
||||
id: LocalId,
|
||||
value: Box<Plan>,
|
||||
body: Box<Plan>,
|
||||
value: Box<TypedPlan>,
|
||||
body: Box<TypedPlan>,
|
||||
) -> Result<CollectionBundle, Error> {
|
||||
let value = self.render_plan(*value)?;
|
||||
|
||||
|
||||
@@ -24,7 +24,7 @@ use crate::compute::render::Context;
|
||||
use crate::compute::state::Scheduler;
|
||||
use crate::compute::types::{Arranged, Collection, CollectionBundle, ErrCollector, Toff};
|
||||
use crate::expr::{EvalError, MapFilterProject, MfpPlan, ScalarExpr};
|
||||
use crate::plan::Plan;
|
||||
use crate::plan::{Plan, TypedPlan};
|
||||
use crate::repr::{self, DiffRow, KeyValDiffRow, Row};
|
||||
use crate::utils::ArrangeHandler;
|
||||
|
||||
@@ -38,7 +38,7 @@ impl<'referred, 'df> Context<'referred, 'df> {
|
||||
#[allow(clippy::mutable_key_type)]
|
||||
pub fn render_mfp(
|
||||
&mut self,
|
||||
input: Box<Plan>,
|
||||
input: Box<TypedPlan>,
|
||||
mfp: MapFilterProject,
|
||||
) -> Result<CollectionBundle, Error> {
|
||||
let input = self.render_plan(*input)?;
|
||||
@@ -184,6 +184,7 @@ mod test {
|
||||
use crate::compute::render::test::{get_output_handle, harness_test_ctx, run_and_check};
|
||||
use crate::compute::state::DataflowState;
|
||||
use crate::expr::{self, BinaryFunc, GlobalId};
|
||||
use crate::repr::{ColumnType, RelationType};
|
||||
|
||||
/// test if temporal filter works properly
|
||||
/// namely: if mfp operator can schedule a delete at the correct time
|
||||
@@ -203,6 +204,9 @@ mod test {
|
||||
let input_plan = Plan::Get {
|
||||
id: expr::Id::Global(GlobalId::User(1)),
|
||||
};
|
||||
let typ = RelationType::new(vec![ColumnType::new_nullable(
|
||||
ConcreteDataType::int64_datatype(),
|
||||
)]);
|
||||
// temporal filter: now <= col(0) < now + 4
|
||||
let mfp = MapFilterProject::new(1)
|
||||
.filter(vec![
|
||||
@@ -225,7 +229,9 @@ mod test {
|
||||
])
|
||||
.unwrap();
|
||||
|
||||
let bundle = ctx.render_mfp(Box::new(input_plan), mfp).unwrap();
|
||||
let bundle = ctx
|
||||
.render_mfp(Box::new(input_plan.with_types(typ)), mfp)
|
||||
.unwrap();
|
||||
let output = get_output_handle(&mut ctx, bundle);
|
||||
// drop ctx here to simulate actual process of compile first, run later scenario
|
||||
drop(ctx);
|
||||
@@ -273,6 +279,9 @@ mod test {
|
||||
let input_plan = Plan::Get {
|
||||
id: expr::Id::Global(GlobalId::User(1)),
|
||||
};
|
||||
let typ = RelationType::new(vec![ColumnType::new_nullable(
|
||||
ConcreteDataType::int64_datatype(),
|
||||
)]);
|
||||
// filter: col(0)>1
|
||||
let mfp = MapFilterProject::new(1)
|
||||
.filter(vec![ScalarExpr::Column(0).call_binary(
|
||||
@@ -280,7 +289,9 @@ mod test {
|
||||
BinaryFunc::Gt,
|
||||
)])
|
||||
.unwrap();
|
||||
let bundle = ctx.render_mfp(Box::new(input_plan), mfp).unwrap();
|
||||
let bundle = ctx
|
||||
.render_mfp(Box::new(input_plan.with_types(typ)), mfp)
|
||||
.unwrap();
|
||||
|
||||
let output = get_output_handle(&mut ctx, bundle);
|
||||
drop(ctx);
|
||||
|
||||
@@ -28,7 +28,7 @@ use crate::compute::state::Scheduler;
|
||||
use crate::compute::types::{Arranged, Collection, CollectionBundle, ErrCollector, Toff};
|
||||
use crate::expr::error::{DataTypeSnafu, InternalSnafu};
|
||||
use crate::expr::{AggregateExpr, EvalError, ScalarExpr};
|
||||
use crate::plan::{AccumulablePlan, AggrWithIndex, KeyValPlan, Plan, ReducePlan};
|
||||
use crate::plan::{AccumulablePlan, AggrWithIndex, KeyValPlan, Plan, ReducePlan, TypedPlan};
|
||||
use crate::repr::{self, DiffRow, KeyValDiffRow, Row};
|
||||
use crate::utils::{ArrangeHandler, ArrangeReader, ArrangeWriter};
|
||||
|
||||
@@ -39,7 +39,7 @@ impl<'referred, 'df> Context<'referred, 'df> {
|
||||
#[allow(clippy::mutable_key_type)]
|
||||
pub fn render_reduce(
|
||||
&mut self,
|
||||
input: Box<Plan>,
|
||||
input: Box<TypedPlan>,
|
||||
key_val_plan: KeyValPlan,
|
||||
reduce_plan: ReducePlan,
|
||||
) -> Result<CollectionBundle, Error> {
|
||||
@@ -736,6 +736,7 @@ mod test {
|
||||
use crate::compute::render::test::{get_output_handle, harness_test_ctx, run_and_check};
|
||||
use crate::compute::state::DataflowState;
|
||||
use crate::expr::{self, AggregateFunc, BinaryFunc, GlobalId, MapFilterProject};
|
||||
use crate::repr::{ColumnType, RelationType};
|
||||
|
||||
/// SELECT DISTINCT col FROM table
|
||||
///
|
||||
@@ -762,13 +763,20 @@ mod test {
|
||||
let input_plan = Plan::Get {
|
||||
id: expr::Id::Global(GlobalId::User(1)),
|
||||
};
|
||||
let typ = RelationType::new(vec![ColumnType::new_nullable(
|
||||
ConcreteDataType::int64_datatype(),
|
||||
)]);
|
||||
let key_val_plan = KeyValPlan {
|
||||
key_plan: MapFilterProject::new(1).project([0]).unwrap().into_safe(),
|
||||
val_plan: MapFilterProject::new(1).project([]).unwrap().into_safe(),
|
||||
};
|
||||
let reduce_plan = ReducePlan::Distinct;
|
||||
let bundle = ctx
|
||||
.render_reduce(Box::new(input_plan), key_val_plan, reduce_plan)
|
||||
.render_reduce(
|
||||
Box::new(input_plan.with_types(typ)),
|
||||
key_val_plan,
|
||||
reduce_plan,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let output = get_output_handle(&mut ctx, bundle);
|
||||
@@ -809,6 +817,9 @@ mod test {
|
||||
let input_plan = Plan::Get {
|
||||
id: expr::Id::Global(GlobalId::User(1)),
|
||||
};
|
||||
let typ = RelationType::new(vec![ColumnType::new_nullable(
|
||||
ConcreteDataType::int64_datatype(),
|
||||
)]);
|
||||
let key_val_plan = KeyValPlan {
|
||||
key_plan: MapFilterProject::new(1).project([]).unwrap().into_safe(),
|
||||
val_plan: MapFilterProject::new(1).project([0]).unwrap().into_safe(),
|
||||
@@ -835,7 +846,11 @@ mod test {
|
||||
|
||||
let reduce_plan = ReducePlan::Accumulable(accum_plan);
|
||||
let bundle = ctx
|
||||
.render_reduce(Box::new(input_plan), key_val_plan, reduce_plan)
|
||||
.render_reduce(
|
||||
Box::new(input_plan.with_types(typ)),
|
||||
key_val_plan,
|
||||
reduce_plan,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let output = get_output_handle(&mut ctx, bundle);
|
||||
@@ -882,6 +897,9 @@ mod test {
|
||||
let input_plan = Plan::Get {
|
||||
id: expr::Id::Global(GlobalId::User(1)),
|
||||
};
|
||||
let typ = RelationType::new(vec![ColumnType::new_nullable(
|
||||
ConcreteDataType::int64_datatype(),
|
||||
)]);
|
||||
let key_val_plan = KeyValPlan {
|
||||
key_plan: MapFilterProject::new(1).project([]).unwrap().into_safe(),
|
||||
val_plan: MapFilterProject::new(1).project([0]).unwrap().into_safe(),
|
||||
@@ -908,7 +926,11 @@ mod test {
|
||||
|
||||
let reduce_plan = ReducePlan::Accumulable(accum_plan);
|
||||
let bundle = ctx
|
||||
.render_reduce(Box::new(input_plan), key_val_plan, reduce_plan)
|
||||
.render_reduce(
|
||||
Box::new(input_plan.with_types(typ)),
|
||||
key_val_plan,
|
||||
reduce_plan,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let output = get_output_handle(&mut ctx, bundle);
|
||||
@@ -951,6 +973,9 @@ mod test {
|
||||
let input_plan = Plan::Get {
|
||||
id: expr::Id::Global(GlobalId::User(1)),
|
||||
};
|
||||
let typ = RelationType::new(vec![ColumnType::new_nullable(
|
||||
ConcreteDataType::int64_datatype(),
|
||||
)]);
|
||||
let key_val_plan = KeyValPlan {
|
||||
key_plan: MapFilterProject::new(1).project([]).unwrap().into_safe(),
|
||||
val_plan: MapFilterProject::new(1).project([0]).unwrap().into_safe(),
|
||||
@@ -977,7 +1002,11 @@ mod test {
|
||||
|
||||
let reduce_plan = ReducePlan::Accumulable(accum_plan);
|
||||
let bundle = ctx
|
||||
.render_reduce(Box::new(input_plan), key_val_plan, reduce_plan)
|
||||
.render_reduce(
|
||||
Box::new(input_plan.with_types(typ)),
|
||||
key_val_plan,
|
||||
reduce_plan,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let output = get_output_handle(&mut ctx, bundle);
|
||||
@@ -1020,6 +1049,9 @@ mod test {
|
||||
let input_plan = Plan::Get {
|
||||
id: expr::Id::Global(GlobalId::User(1)),
|
||||
};
|
||||
let typ = RelationType::new(vec![ColumnType::new_nullable(
|
||||
ConcreteDataType::int64_datatype(),
|
||||
)]);
|
||||
let key_val_plan = KeyValPlan {
|
||||
key_plan: MapFilterProject::new(1).project([]).unwrap().into_safe(),
|
||||
val_plan: MapFilterProject::new(1).project([0]).unwrap().into_safe(),
|
||||
@@ -1061,7 +1093,11 @@ mod test {
|
||||
|
||||
let reduce_plan = ReducePlan::Accumulable(accum_plan);
|
||||
let bundle = ctx
|
||||
.render_reduce(Box::new(input_plan), key_val_plan, reduce_plan)
|
||||
.render_reduce(
|
||||
Box::new(input_plan.with_types(typ)),
|
||||
key_val_plan,
|
||||
reduce_plan,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let output = get_output_handle(&mut ctx, bundle);
|
||||
|
||||
@@ -21,7 +21,9 @@ use datatypes::value::Value;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use snafu::ensure;
|
||||
|
||||
use crate::adapter::error::{Error, InvalidQuerySnafu, UnsupportedTemporalFilterSnafu};
|
||||
use crate::adapter::error::{
|
||||
Error, InvalidQuerySnafu, UnexpectedSnafu, UnsupportedTemporalFilterSnafu,
|
||||
};
|
||||
use crate::expr::error::{EvalError, InvalidArgumentSnafu, OptimizeSnafu};
|
||||
use crate::expr::func::{BinaryFunc, UnaryFunc, UnmaterializableFunc, VariadicFunc};
|
||||
use crate::repr::ColumnType;
|
||||
@@ -80,6 +82,34 @@ pub enum ScalarExpr {
|
||||
},
|
||||
}
|
||||
|
||||
impl ScalarExpr {
|
||||
/// try to determine the type of the expression
|
||||
pub fn typ(&self, context: &[ColumnType]) -> Result<ColumnType, Error> {
|
||||
match self {
|
||||
ScalarExpr::Column(i) => context.get(*i).cloned().ok_or_else(|| {
|
||||
UnexpectedSnafu {
|
||||
reason: format!("column index {} out of range of len={}", i, context.len()),
|
||||
}
|
||||
.build()
|
||||
}),
|
||||
ScalarExpr::Literal(_, typ) => Ok(ColumnType::new_nullable(typ.clone())),
|
||||
ScalarExpr::CallUnmaterializable(func) => {
|
||||
Ok(ColumnType::new_nullable(func.signature().output))
|
||||
}
|
||||
ScalarExpr::CallUnary { func, .. } => {
|
||||
Ok(ColumnType::new_nullable(func.signature().output))
|
||||
}
|
||||
ScalarExpr::CallBinary { func, .. } => {
|
||||
Ok(ColumnType::new_nullable(func.signature().output))
|
||||
}
|
||||
ScalarExpr::CallVariadic { func, .. } => {
|
||||
Ok(ColumnType::new_nullable(func.signature().output))
|
||||
}
|
||||
ScalarExpr::If { then, .. } => then.typ(context),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl ScalarExpr {
|
||||
/// apply optimization to the expression, like flatten variadic function
|
||||
pub fn optimize(&mut self) {
|
||||
|
||||
@@ -44,6 +44,7 @@ pub struct TypedPlan {
|
||||
impl TypedPlan {
|
||||
/// directly apply a mfp to the plan
|
||||
pub fn mfp(self, mfp: MapFilterProject) -> Result<Self, Error> {
|
||||
let new_type = self.typ.apply_mfp(&mfp, &[])?;
|
||||
let plan = match self.plan {
|
||||
Plan::Mfp {
|
||||
input,
|
||||
@@ -53,12 +54,12 @@ impl TypedPlan {
|
||||
mfp: MapFilterProject::compose(old_mfp, mfp)?,
|
||||
},
|
||||
_ => Plan::Mfp {
|
||||
input: Box::new(self.plan),
|
||||
input: Box::new(self),
|
||||
mfp,
|
||||
},
|
||||
};
|
||||
Ok(TypedPlan {
|
||||
typ: self.typ,
|
||||
typ: new_type,
|
||||
plan,
|
||||
})
|
||||
}
|
||||
@@ -85,7 +86,7 @@ impl TypedPlan {
|
||||
mfp: MapFilterProject::compose(old_mfp, mfp)?,
|
||||
},
|
||||
_ => Plan::Mfp {
|
||||
input: Box::new(self.plan),
|
||||
input: Box::new(self),
|
||||
mfp,
|
||||
},
|
||||
};
|
||||
@@ -94,6 +95,7 @@ impl TypedPlan {
|
||||
|
||||
/// Add a new filter to the plan, will filter out the records that do not satisfy the filter
|
||||
pub fn filter(self, filter: TypedExpr) -> Result<Self, Error> {
|
||||
let typ = self.typ.clone();
|
||||
let plan = match self.plan {
|
||||
Plan::Mfp {
|
||||
input,
|
||||
@@ -103,15 +105,11 @@ impl TypedPlan {
|
||||
mfp: old_mfp.filter(vec![filter.expr])?,
|
||||
},
|
||||
_ => Plan::Mfp {
|
||||
input: Box::new(self.plan),
|
||||
mfp: MapFilterProject::new(self.typ.column_types.len())
|
||||
.filter(vec![filter.expr])?,
|
||||
input: Box::new(self),
|
||||
mfp: MapFilterProject::new(typ.column_types.len()).filter(vec![filter.expr])?,
|
||||
},
|
||||
};
|
||||
Ok(TypedPlan {
|
||||
typ: self.typ,
|
||||
plan,
|
||||
})
|
||||
Ok(TypedPlan { typ, plan })
|
||||
}
|
||||
}
|
||||
|
||||
@@ -135,20 +133,20 @@ pub enum Plan {
|
||||
/// }
|
||||
Let {
|
||||
id: LocalId,
|
||||
value: Box<Plan>,
|
||||
body: Box<Plan>,
|
||||
value: Box<TypedPlan>,
|
||||
body: Box<TypedPlan>,
|
||||
},
|
||||
/// Map, Filter, and Project operators. Chained together.
|
||||
Mfp {
|
||||
/// The input collection.
|
||||
input: Box<Plan>,
|
||||
input: Box<TypedPlan>,
|
||||
/// Linear operator to apply to each record.
|
||||
mfp: MapFilterProject,
|
||||
},
|
||||
/// Reduce operator, aggregation by key assembled from KeyValPlan
|
||||
Reduce {
|
||||
/// The input collection.
|
||||
input: Box<Plan>,
|
||||
input: Box<TypedPlan>,
|
||||
/// A plan for changing input records into key, value pairs.
|
||||
key_val_plan: KeyValPlan,
|
||||
/// A plan for performing the reduce.
|
||||
@@ -164,7 +162,7 @@ pub enum Plan {
|
||||
/// strategy we will use, and any pushed down per-record work.
|
||||
Join {
|
||||
/// An ordered list of inputs that will be joined.
|
||||
inputs: Vec<Plan>,
|
||||
inputs: Vec<TypedPlan>,
|
||||
/// Detailed information about the implementation of the join.
|
||||
///
|
||||
/// This includes information about the implementation strategy, but also
|
||||
@@ -180,7 +178,7 @@ pub enum Plan {
|
||||
/// implementing the "distinct" operator.
|
||||
Union {
|
||||
/// The input collections
|
||||
inputs: Vec<Plan>,
|
||||
inputs: Vec<TypedPlan>,
|
||||
/// Whether to consolidate the output, e.g., cancel negated records.
|
||||
consolidate_output: bool,
|
||||
},
|
||||
@@ -200,23 +198,23 @@ impl Plan {
|
||||
};
|
||||
}
|
||||
Plan::Let { value, body, .. } => {
|
||||
recur_find_use(value, used);
|
||||
recur_find_use(body, used);
|
||||
recur_find_use(&value.plan, used);
|
||||
recur_find_use(&body.plan, used);
|
||||
}
|
||||
Plan::Mfp { input, .. } => {
|
||||
recur_find_use(input, used);
|
||||
recur_find_use(&input.plan, used);
|
||||
}
|
||||
Plan::Reduce { input, .. } => {
|
||||
recur_find_use(input, used);
|
||||
recur_find_use(&input.plan, used);
|
||||
}
|
||||
Plan::Join { inputs, .. } => {
|
||||
for input in inputs {
|
||||
recur_find_use(input, used);
|
||||
recur_find_use(&input.plan, used);
|
||||
}
|
||||
}
|
||||
Plan::Union { inputs, .. } => {
|
||||
for input in inputs {
|
||||
recur_find_use(input, used);
|
||||
recur_find_use(&input.plan, used);
|
||||
}
|
||||
}
|
||||
_ => {}
|
||||
@@ -227,3 +225,9 @@ impl Plan {
|
||||
ret
|
||||
}
|
||||
}
|
||||
|
||||
impl Plan {
|
||||
pub fn with_types(self, typ: RelationType) -> TypedPlan {
|
||||
TypedPlan { typ, plan: self }
|
||||
}
|
||||
}
|
||||
|
||||
@@ -181,7 +181,12 @@ impl RelationType {
|
||||
}
|
||||
|
||||
/// Adds a new key for the relation. Also sorts the key indices.
|
||||
///
|
||||
/// will ignore empty key
|
||||
pub fn with_key(mut self, mut indices: Vec<usize>) -> Self {
|
||||
if indices.is_empty() {
|
||||
return self;
|
||||
}
|
||||
indices.sort_unstable();
|
||||
let key = Key::from(indices);
|
||||
if !self.keys.contains(&key) {
|
||||
@@ -191,6 +196,8 @@ impl RelationType {
|
||||
}
|
||||
|
||||
/// Adds new keys for the relation. Also sorts the key indices.
|
||||
///
|
||||
/// will ignore empty keys
|
||||
pub fn with_keys(mut self, keys: Vec<Vec<usize>>) -> Self {
|
||||
for key in keys {
|
||||
self = self.with_key(key)
|
||||
|
||||
@@ -285,7 +285,7 @@ impl TypedPlan {
|
||||
distinct_aggrs,
|
||||
};
|
||||
let plan = Plan::Reduce {
|
||||
input: Box::new(input.plan),
|
||||
input: Box::new(input),
|
||||
key_val_plan,
|
||||
reduce_plan: ReducePlan::Accumulable(accum_plan),
|
||||
};
|
||||
@@ -298,6 +298,9 @@ impl TypedPlan {
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use datatypes::prelude::ConcreteDataType;
|
||||
use pretty_assertions::{assert_eq, assert_ne};
|
||||
|
||||
use super::*;
|
||||
use crate::plan::{Plan, TypedPlan};
|
||||
use crate::repr::{self, ColumnType, RelationType};
|
||||
@@ -311,7 +314,10 @@ mod test {
|
||||
|
||||
let mut ctx = create_test_ctx();
|
||||
let flow_plan = TypedPlan::from_substrait_plan(&mut ctx, &plan);
|
||||
|
||||
let typ = RelationType::new(vec![ColumnType::new(
|
||||
ConcreteDataType::uint32_datatype(),
|
||||
true,
|
||||
)]);
|
||||
let aggr_expr = AggregateExpr {
|
||||
func: AggregateFunc::SumUInt32,
|
||||
expr: ScalarExpr::Column(0),
|
||||
@@ -320,26 +326,34 @@ mod test {
|
||||
let expected = TypedPlan {
|
||||
typ: RelationType::new(vec![ColumnType::new(CDT::uint32_datatype(), true)]),
|
||||
plan: Plan::Mfp {
|
||||
input: Box::new(Plan::Reduce {
|
||||
input: Box::new(Plan::Get {
|
||||
id: crate::expr::Id::Global(GlobalId::User(0)),
|
||||
}),
|
||||
key_val_plan: KeyValPlan {
|
||||
key_plan: MapFilterProject::new(1)
|
||||
.project(vec![])
|
||||
.unwrap()
|
||||
.into_safe(),
|
||||
val_plan: MapFilterProject::new(1)
|
||||
.project(vec![0])
|
||||
.unwrap()
|
||||
.into_safe(),
|
||||
},
|
||||
reduce_plan: ReducePlan::Accumulable(AccumulablePlan {
|
||||
full_aggrs: vec![aggr_expr.clone()],
|
||||
simple_aggrs: vec![AggrWithIndex::new(aggr_expr.clone(), 0, 0)],
|
||||
distinct_aggrs: vec![],
|
||||
}),
|
||||
}),
|
||||
input: Box::new(
|
||||
Plan::Reduce {
|
||||
input: Box::new(
|
||||
Plan::Get {
|
||||
id: crate::expr::Id::Global(GlobalId::User(0)),
|
||||
}
|
||||
.with_types(RelationType::new(vec![
|
||||
ColumnType::new(ConcreteDataType::uint32_datatype(), false),
|
||||
])),
|
||||
),
|
||||
key_val_plan: KeyValPlan {
|
||||
key_plan: MapFilterProject::new(1)
|
||||
.project(vec![])
|
||||
.unwrap()
|
||||
.into_safe(),
|
||||
val_plan: MapFilterProject::new(1)
|
||||
.project(vec![0])
|
||||
.unwrap()
|
||||
.into_safe(),
|
||||
},
|
||||
reduce_plan: ReducePlan::Accumulable(AccumulablePlan {
|
||||
full_aggrs: vec![aggr_expr.clone()],
|
||||
simple_aggrs: vec![AggrWithIndex::new(aggr_expr.clone(), 0, 0)],
|
||||
distinct_aggrs: vec![],
|
||||
}),
|
||||
}
|
||||
.with_types(typ),
|
||||
),
|
||||
mfp: MapFilterProject::new(1)
|
||||
.map(vec![ScalarExpr::Column(0)])
|
||||
.unwrap()
|
||||
@@ -370,28 +384,42 @@ mod test {
|
||||
ColumnType::new(CDT::uint32_datatype(), false), // col number
|
||||
]),
|
||||
plan: Plan::Mfp {
|
||||
input: Box::new(Plan::Reduce {
|
||||
input: Box::new(Plan::Get {
|
||||
id: crate::expr::Id::Global(GlobalId::User(0)),
|
||||
}),
|
||||
key_val_plan: KeyValPlan {
|
||||
key_plan: MapFilterProject::new(1)
|
||||
.map(vec![ScalarExpr::Column(0)])
|
||||
.unwrap()
|
||||
.project(vec![1])
|
||||
.unwrap()
|
||||
.into_safe(),
|
||||
val_plan: MapFilterProject::new(1)
|
||||
.project(vec![0])
|
||||
.unwrap()
|
||||
.into_safe(),
|
||||
},
|
||||
reduce_plan: ReducePlan::Accumulable(AccumulablePlan {
|
||||
full_aggrs: vec![aggr_expr.clone()],
|
||||
simple_aggrs: vec![AggrWithIndex::new(aggr_expr.clone(), 0, 0)],
|
||||
distinct_aggrs: vec![],
|
||||
}),
|
||||
}),
|
||||
input: Box::new(
|
||||
Plan::Reduce {
|
||||
input: Box::new(
|
||||
Plan::Get {
|
||||
id: crate::expr::Id::Global(GlobalId::User(0)),
|
||||
}
|
||||
.with_types(RelationType::new(vec![
|
||||
ColumnType::new(ConcreteDataType::uint32_datatype(), false),
|
||||
])),
|
||||
),
|
||||
key_val_plan: KeyValPlan {
|
||||
key_plan: MapFilterProject::new(1)
|
||||
.map(vec![ScalarExpr::Column(0)])
|
||||
.unwrap()
|
||||
.project(vec![1])
|
||||
.unwrap()
|
||||
.into_safe(),
|
||||
val_plan: MapFilterProject::new(1)
|
||||
.project(vec![0])
|
||||
.unwrap()
|
||||
.into_safe(),
|
||||
},
|
||||
reduce_plan: ReducePlan::Accumulable(AccumulablePlan {
|
||||
full_aggrs: vec![aggr_expr.clone()],
|
||||
simple_aggrs: vec![AggrWithIndex::new(aggr_expr.clone(), 0, 0)],
|
||||
distinct_aggrs: vec![],
|
||||
}),
|
||||
}
|
||||
.with_types(
|
||||
RelationType::new(vec![
|
||||
ColumnType::new(CDT::uint32_datatype(), false), // col number
|
||||
ColumnType::new(CDT::uint32_datatype(), true), // col sum(number)
|
||||
])
|
||||
.with_key(vec![0]),
|
||||
),
|
||||
),
|
||||
mfp: MapFilterProject::new(2)
|
||||
.map(vec![ScalarExpr::Column(1), ScalarExpr::Column(0)])
|
||||
.unwrap()
|
||||
@@ -420,29 +448,40 @@ mod test {
|
||||
let expected = TypedPlan {
|
||||
typ: RelationType::new(vec![ColumnType::new(CDT::uint32_datatype(), true)]),
|
||||
plan: Plan::Mfp {
|
||||
input: Box::new(Plan::Reduce {
|
||||
input: Box::new(Plan::Get {
|
||||
id: crate::expr::Id::Global(GlobalId::User(0)),
|
||||
}),
|
||||
key_val_plan: KeyValPlan {
|
||||
key_plan: MapFilterProject::new(1)
|
||||
.project(vec![])
|
||||
.unwrap()
|
||||
.into_safe(),
|
||||
val_plan: MapFilterProject::new(1)
|
||||
.map(vec![ScalarExpr::Column(0)
|
||||
.call_binary(ScalarExpr::Column(0), BinaryFunc::AddUInt32)])
|
||||
.unwrap()
|
||||
.project(vec![1])
|
||||
.unwrap()
|
||||
.into_safe(),
|
||||
},
|
||||
reduce_plan: ReducePlan::Accumulable(AccumulablePlan {
|
||||
full_aggrs: vec![aggr_expr.clone()],
|
||||
simple_aggrs: vec![AggrWithIndex::new(aggr_expr.clone(), 0, 0)],
|
||||
distinct_aggrs: vec![],
|
||||
}),
|
||||
}),
|
||||
input: Box::new(
|
||||
Plan::Reduce {
|
||||
input: Box::new(
|
||||
Plan::Get {
|
||||
id: crate::expr::Id::Global(GlobalId::User(0)),
|
||||
}
|
||||
.with_types(RelationType::new(vec![
|
||||
ColumnType::new(ConcreteDataType::uint32_datatype(), false),
|
||||
])),
|
||||
),
|
||||
key_val_plan: KeyValPlan {
|
||||
key_plan: MapFilterProject::new(1)
|
||||
.project(vec![])
|
||||
.unwrap()
|
||||
.into_safe(),
|
||||
val_plan: MapFilterProject::new(1)
|
||||
.map(vec![ScalarExpr::Column(0)
|
||||
.call_binary(ScalarExpr::Column(0), BinaryFunc::AddUInt32)])
|
||||
.unwrap()
|
||||
.project(vec![1])
|
||||
.unwrap()
|
||||
.into_safe(),
|
||||
},
|
||||
reduce_plan: ReducePlan::Accumulable(AccumulablePlan {
|
||||
full_aggrs: vec![aggr_expr.clone()],
|
||||
simple_aggrs: vec![AggrWithIndex::new(aggr_expr.clone(), 0, 0)],
|
||||
distinct_aggrs: vec![],
|
||||
}),
|
||||
}
|
||||
.with_types(RelationType::new(vec![ColumnType::new(
|
||||
CDT::uint32_datatype(),
|
||||
true,
|
||||
)])),
|
||||
),
|
||||
mfp: MapFilterProject::new(1)
|
||||
.map(vec![ScalarExpr::Column(0)])
|
||||
.unwrap()
|
||||
|
||||
@@ -321,6 +321,7 @@ impl TypedExpr {
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use datatypes::prelude::ConcreteDataType;
|
||||
use datatypes::value::Value;
|
||||
|
||||
use super::*;
|
||||
@@ -359,9 +360,15 @@ mod test {
|
||||
let expected = TypedPlan {
|
||||
typ: RelationType::new(vec![ColumnType::new(CDT::uint32_datatype(), false)]),
|
||||
plan: Plan::Mfp {
|
||||
input: Box::new(Plan::Get {
|
||||
id: crate::expr::Id::Global(GlobalId::User(0)),
|
||||
}),
|
||||
input: Box::new(
|
||||
Plan::Get {
|
||||
id: crate::expr::Id::Global(GlobalId::User(0)),
|
||||
}
|
||||
.with_types(RelationType::new(vec![ColumnType::new(
|
||||
ConcreteDataType::uint32_datatype(),
|
||||
false,
|
||||
)])),
|
||||
),
|
||||
mfp: MapFilterProject::new(1)
|
||||
.map(vec![ScalarExpr::Column(0)])
|
||||
.unwrap()
|
||||
@@ -411,9 +418,15 @@ mod test {
|
||||
let expected = TypedPlan {
|
||||
typ: RelationType::new(vec![ColumnType::new(CDT::uint32_datatype(), true)]),
|
||||
plan: Plan::Mfp {
|
||||
input: Box::new(Plan::Get {
|
||||
id: crate::expr::Id::Global(GlobalId::User(0)),
|
||||
}),
|
||||
input: Box::new(
|
||||
Plan::Get {
|
||||
id: crate::expr::Id::Global(GlobalId::User(0)),
|
||||
}
|
||||
.with_types(RelationType::new(vec![ColumnType::new(
|
||||
ConcreteDataType::uint32_datatype(),
|
||||
false,
|
||||
)])),
|
||||
),
|
||||
mfp: MapFilterProject::new(1)
|
||||
.map(vec![ScalarExpr::Column(0).call_binary(
|
||||
ScalarExpr::Literal(Value::from(1u32), CDT::uint32_datatype()),
|
||||
@@ -439,9 +452,15 @@ mod test {
|
||||
let expected = TypedPlan {
|
||||
typ: RelationType::new(vec![ColumnType::new(CDT::int16_datatype(), true)]),
|
||||
plan: Plan::Mfp {
|
||||
input: Box::new(Plan::Get {
|
||||
id: crate::expr::Id::Global(GlobalId::User(0)),
|
||||
}),
|
||||
input: Box::new(
|
||||
Plan::Get {
|
||||
id: crate::expr::Id::Global(GlobalId::User(0)),
|
||||
}
|
||||
.with_types(RelationType::new(vec![ColumnType::new(
|
||||
ConcreteDataType::uint32_datatype(),
|
||||
false,
|
||||
)])),
|
||||
),
|
||||
mfp: MapFilterProject::new(1)
|
||||
.map(vec![ScalarExpr::Literal(
|
||||
Value::Int64(1),
|
||||
@@ -468,9 +487,15 @@ mod test {
|
||||
let expected = TypedPlan {
|
||||
typ: RelationType::new(vec![ColumnType::new(CDT::uint32_datatype(), true)]),
|
||||
plan: Plan::Mfp {
|
||||
input: Box::new(Plan::Get {
|
||||
id: crate::expr::Id::Global(GlobalId::User(0)),
|
||||
}),
|
||||
input: Box::new(
|
||||
Plan::Get {
|
||||
id: crate::expr::Id::Global(GlobalId::User(0)),
|
||||
}
|
||||
.with_types(RelationType::new(vec![ColumnType::new(
|
||||
ConcreteDataType::uint32_datatype(),
|
||||
false,
|
||||
)])),
|
||||
),
|
||||
mfp: MapFilterProject::new(1)
|
||||
.map(vec![ScalarExpr::Column(0)
|
||||
.call_binary(ScalarExpr::Column(0), BinaryFunc::AddUInt32)])
|
||||
|
||||
@@ -180,6 +180,8 @@ impl TypedPlan {
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use datatypes::prelude::ConcreteDataType;
|
||||
|
||||
use super::*;
|
||||
use crate::expr::{GlobalId, ScalarExpr};
|
||||
use crate::plan::{Plan, TypedPlan};
|
||||
@@ -199,9 +201,15 @@ mod test {
|
||||
let expected = TypedPlan {
|
||||
typ: RelationType::new(vec![ColumnType::new(CDT::uint32_datatype(), false)]),
|
||||
plan: Plan::Mfp {
|
||||
input: Box::new(Plan::Get {
|
||||
id: crate::expr::Id::Global(GlobalId::User(0)),
|
||||
}),
|
||||
input: Box::new(
|
||||
Plan::Get {
|
||||
id: crate::expr::Id::Global(GlobalId::User(0)),
|
||||
}
|
||||
.with_types(RelationType::new(vec![ColumnType::new(
|
||||
ConcreteDataType::uint32_datatype(),
|
||||
false,
|
||||
)])),
|
||||
),
|
||||
mfp: MapFilterProject::new(1)
|
||||
.map(vec![ScalarExpr::Column(0)])
|
||||
.unwrap()
|
||||
|
||||
Reference in New Issue
Block a user