Mirror of https://github.com/GreptimeTeam/greptimedb.git (synced 2025-12-22 22:20:02 +00:00)
fix(flow): fix call df func bug&sqlness test (#4165)
* tests: flow sqlness tests
* tests: WIP df func test
* fix: use schema before expand for transform expr
* tests: some basic flow tests
* tests: unit test
* chore: dep use rev not patch
* fix: weird sqlness error?
* refactor: per review
* fix: temp sqlness bug
* fix: use fixed sqlness
* fix: impl drop as async shutdown
* refactor: per bot's review
* tests: drop worker handler both sync/async
* docs: add rationale for test
* refactor: per review
* chore: fmt
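The core of the fix, as a minimal self-contained sketch (toy types, not GreptimeDB's actual ScalarExpr/DfScalarFunction API): CallDf now carries its argument expressions, the flow engine evaluates them first, and the wrapped DataFusion function only ever sees the results as positional columns col(0)..col(n-1).

// Toy stand-ins illustrating the CallDf evaluation order.
#[derive(Clone, Debug)]
enum Expr {
    Column(usize),
    Neg(Box<Expr>), // a nested expression the flow engine must evaluate itself
}

fn eval(expr: &Expr, row: &[i64]) -> i64 {
    match expr {
        Expr::Column(i) => row[*i],
        Expr::Neg(e) => -eval(e, row),
    }
}

// `exprs` are evaluated first; `df_fn` then sees the results as col(0)..col(n-1).
fn call_df(df_fn: impl Fn(&[i64]) -> i64, exprs: &[Expr], row: &[i64]) -> i64 {
    let args: Vec<i64> = exprs.iter().map(|e| eval(e, row)).collect();
    df_fn(&args)
}

fn main() {
    let row = [-20i64];
    // abs(neg(col(0))): the nested neg is evaluated by the flow engine;
    // abs (standing in for a DataFusion function) only sees col(0).
    let v = call_df(
        |args| args[0].abs(),
        &[Expr::Neg(Box::new(Expr::Column(0)))],
        &row,
    );
    assert_eq!(v, 20);
}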
Cargo.lock (generated, 30 changes)
@@ -3584,6 +3584,20 @@ version = "0.11.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "1435fa1053d8b2fbbe9be7e97eca7f33d37b28409959813daefc1446a14247f1"
 
+[[package]]
+name = "duration-str"
+version = "0.11.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "709d653e7c92498eb29fb86a2a6f0f3502b97530f33aedb32ef848d4d28b31a3"
+dependencies = [
+ "chrono",
+ "rust_decimal",
+ "serde",
+ "thiserror",
+ "time",
+ "winnow 0.6.13",
+]
+
 [[package]]
 name = "dyn-clone"
 version = "1.0.17"
@@ -6216,6 +6230,15 @@ dependencies = [
  "unicase",
 ]
 
+[[package]]
+name = "minijinja"
+version = "1.0.21"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "55e877d961d4f96ce13615862322df7c0b6d169d40cab71a7ef3f9b9e594451e"
+dependencies = [
+ "serde",
+]
+
 [[package]]
 name = "minimal-lexical"
 version = "0.2.1"
@@ -10576,14 +10599,17 @@ dependencies = [
 
 [[package]]
 name = "sqlness"
-version = "0.5.0"
+version = "0.6.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "0860f149718809371602b42573693e1ed2b1d0aed35fe69e04e4e4e9918d81f7"
+checksum = "308a7338f2211813d6e9da117e9b9b7aee5d072872d11a934002fd2bd4ab5276"
 dependencies = [
  "async-trait",
  "derive_builder 0.11.2",
+ "duration-str",
+ "minijinja",
  "prettydiff",
  "regex",
+ "serde_json",
  "thiserror",
  "toml 0.5.11",
  "walkdir",

@@ -321,7 +321,7 @@ impl FlownodeManager {
                     schema
                         .get_name(*i)
                         .clone()
-                        .unwrap_or_else(|| format!("Col_{i}"))
+                        .unwrap_or_else(|| format!("col_{i}"))
                 })
                 .collect_vec()
         })
@@ -344,7 +344,7 @@ impl FlownodeManager {
                 .get(idx)
                 .cloned()
                 .flatten()
-                .unwrap_or(format!("Col_{}", idx));
+                .unwrap_or(format!("col_{}", idx));
             let ret = ColumnSchema::new(name, typ.scalar_type, typ.nullable);
             if schema.typ().time_index == Some(idx) {
                 ret.with_time_index(true)

@@ -148,6 +148,8 @@ impl TableSource {
                     column_types,
                     keys,
                     time_index,
+                    // by default, a table schema's columns are all non-auto
+                    auto_columns: vec![],
                 },
                 names: col_names,
             },

@@ -206,10 +206,15 @@ impl WorkerHandle {
 
 impl Drop for WorkerHandle {
     fn drop(&mut self) {
-        if let Err(err) = self.shutdown_blocking() {
-            common_telemetry::error!("Fail to shutdown worker: {:?}", err)
+        let ret = futures::executor::block_on(async { self.shutdown().await });
+        if let Err(ret) = ret {
+            common_telemetry::error!(
+                ret;
+                "While dropping Worker Handle, failed to shutdown worker, worker might be in inconsistent state."
+            );
+        } else {
+            info!("Flow Worker shutdown due to Worker Handle dropped.")
         }
-        info!("Flow Worker shutdown due to Worker Handle dropped.")
     }
 }
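The Drop-as-blocking-async-shutdown pattern above, as a standalone sketch (the Handle type, channel, and error handling here are illustrative, not GreptimeDB's Worker API; it assumes the `futures` crate for block_on):

use std::sync::mpsc;

struct Handle {
    shutdown_tx: Option<mpsc::Sender<()>>,
}

impl Handle {
    // Stand-in for the real async shutdown call to the worker.
    async fn shutdown(&mut self) -> Result<(), &'static str> {
        self.shutdown_tx
            .take()
            .ok_or("already shut down")?
            .send(())
            .map_err(|_| "worker already gone")
    }
}

impl Drop for Handle {
    fn drop(&mut self) {
        // Block on the async shutdown so dropping from sync code still works.
        if let Err(err) = futures::executor::block_on(self.shutdown()) {
            eprintln!("failed to shut down worker: {err}");
        }
    }
}

fn main() {
    let (tx, rx) = mpsc::channel();
    let worker = std::thread::spawn(move || {
        let _ = rx.recv(); // worker waits for the shutdown signal
    });
    drop(Handle { shutdown_tx: Some(tx) }); // triggers the blocking shutdown
    worker.join().unwrap();
}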
@@ -496,6 +501,19 @@ mod test {
     use crate::plan::Plan;
     use crate::repr::{RelationType, Row};
 
+    #[test]
+    fn drop_handle() {
+        let (tx, rx) = oneshot::channel();
+        let worker_thread_handle = std::thread::spawn(move || {
+            let (handle, mut worker) = create_worker();
+            tx.send(handle).unwrap();
+            worker.run();
+        });
+        let handle = rx.blocking_recv().unwrap();
+        drop(handle);
+        worker_thread_handle.join().unwrap();
+    }
+
     #[tokio::test]
     pub async fn test_simple_get_with_worker_and_handle() {
         let (tx, rx) = oneshot::channel();
@@ -532,7 +550,7 @@ mod test {
         tx.send((Row::empty(), 0, 0)).unwrap();
         handle.run_available(0).await.unwrap();
         assert_eq!(sink_rx.recv().await.unwrap().0, Row::empty());
-        handle.shutdown().await.unwrap();
+        drop(handle);
         worker_thread_handle.join().unwrap();
     }
 }

@@ -21,10 +21,10 @@ use bytes::BytesMut;
 use common_error::ext::BoxedError;
 use common_recordbatch::DfRecordBatch;
 use datafusion_physical_expr::PhysicalExpr;
-use datatypes::arrow_array;
 use datatypes::data_type::DataType;
 use datatypes::prelude::ConcreteDataType;
 use datatypes::value::Value;
+use datatypes::{arrow_array, value};
 use prost::Message;
 use serde::{Deserialize, Serialize};
 use snafu::{ensure, ResultExt};
@@ -155,8 +155,10 @@ pub enum ScalarExpr {
         exprs: Vec<ScalarExpr>,
     },
     CallDf {
         // TODO(discord9): support shuffle
+        /// invariant: the input args set inside this [`DfScalarFunction`] is
+        /// always col(0) to col(n-1) where n is the length of `exprs`
         df_scalar_fn: DfScalarFunction,
+        exprs: Vec<ScalarExpr>,
     },
     /// Conditionally evaluated expressions.
     ///
@@ -189,8 +191,27 @@ impl DfScalarFunction {
         })
     }
 
+    pub fn try_from_raw_fn(raw_fn: RawDfScalarFn) -> Result<Self, Error> {
+        Ok(Self {
+            fn_impl: raw_fn.get_fn_impl()?,
+            df_schema: Arc::new(raw_fn.input_schema.to_df_schema()?),
+            raw_fn,
+        })
+    }
+
+    /// eval a list of expressions using input values
+    fn eval_args(values: &[Value], exprs: &[ScalarExpr]) -> Result<Vec<Value>, EvalError> {
+        exprs
+            .iter()
+            .map(|expr| expr.eval(values))
+            .collect::<Result<_, _>>()
+    }
+
     // TODO(discord9): add RecordBatch support
-    pub fn eval(&self, values: &[Value]) -> Result<Value, EvalError> {
+    pub fn eval(&self, values: &[Value], exprs: &[ScalarExpr]) -> Result<Value, EvalError> {
+        // first eval exprs to construct values to feed to datafusion
+        let values: Vec<_> = Self::eval_args(values, exprs)?;
+
         if values.is_empty() {
             return InvalidArgumentSnafu {
                 reason: "values is empty".to_string(),
@@ -259,16 +280,18 @@ impl<'de> serde::de::Deserialize<'de> for DfScalarFunction {
         D: serde::de::Deserializer<'de>,
     {
         let raw_fn = RawDfScalarFn::deserialize(deserializer)?;
-        let fn_impl = raw_fn.get_fn_impl().map_err(serde::de::Error::custom)?;
-        DfScalarFunction::new(raw_fn, fn_impl).map_err(serde::de::Error::custom)
+        DfScalarFunction::try_from_raw_fn(raw_fn).map_err(serde::de::Error::custom)
     }
 }
 
 #[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq, PartialOrd, Ord, Hash)]
 pub struct RawDfScalarFn {
-    f: bytes::BytesMut,
-    input_schema: RelationDesc,
-    extensions: FunctionExtensions,
+    /// The raw bytes encoding a datafusion scalar function
+    pub(crate) f: bytes::BytesMut,
+    /// The input schema of the function
+    pub(crate) input_schema: RelationDesc,
+    /// Extensions contain the mapping from function reference to function name
+    pub(crate) extensions: FunctionExtensions,
 }
 
 impl RawDfScalarFn {
@@ -354,7 +377,7 @@ impl ScalarExpr {
                 Ok(ColumnType::new_nullable(func.signature().output))
             }
             ScalarExpr::If { then, .. } => then.typ(context),
-            ScalarExpr::CallDf { df_scalar_fn } => {
+            ScalarExpr::CallDf { df_scalar_fn, .. } => {
                 let arrow_typ = df_scalar_fn
                     .fn_impl
                     // TODO(discord9): get schema from args instead?
@@ -445,7 +468,10 @@ impl ScalarExpr {
                 }
                 .fail(),
             },
-            ScalarExpr::CallDf { df_scalar_fn } => df_scalar_fn.eval(values),
+            ScalarExpr::CallDf {
+                df_scalar_fn,
+                exprs,
+            } => df_scalar_fn.eval(values, exprs),
         }
     }

@@ -614,7 +640,15 @@ impl ScalarExpr {
                 f(then)?;
                 f(els)
             }
+            ScalarExpr::CallDf {
+                df_scalar_fn: _,
+                exprs,
+            } => {
+                for expr in exprs {
+                    f(expr)?;
+                }
+                Ok(())
+            }
             _ => Ok(()),
         }
     }

@@ -650,7 +684,15 @@ impl ScalarExpr {
                 f(then)?;
                 f(els)
             }
+            ScalarExpr::CallDf {
+                df_scalar_fn: _,
+                exprs,
+            } => {
+                for expr in exprs {
+                    f(expr)?;
+                }
+                Ok(())
+            }
             _ => Ok(()),
         }
     }
 }
@@ -852,11 +894,15 @@ mod test {
             .unwrap();
         let extensions = FunctionExtensions::from_iter(vec![(0, "abs")]);
         let raw_fn = RawDfScalarFn::from_proto(&raw_scalar_func, input_schema, extensions).unwrap();
-        let fn_impl = raw_fn.get_fn_impl().unwrap();
-        let df_func = DfScalarFunction::new(raw_fn, fn_impl).unwrap();
+        let df_func = DfScalarFunction::try_from_raw_fn(raw_fn).unwrap();
        let as_str = serde_json::to_string(&df_func).unwrap();
         let from_str: DfScalarFunction = serde_json::from_str(&as_str).unwrap();
         assert_eq!(df_func, from_str);
-        assert_eq!(df_func.eval(&[Value::Null]).unwrap(), Value::Int64(1));
+        assert_eq!(
+            df_func
+                .eval(&[Value::Null], &[ScalarExpr::Column(0)])
+                .unwrap(),
+            Value::Int64(1)
+        );
     }
 }

@@ -21,7 +21,9 @@ use itertools::Itertools;
 use serde::{Deserialize, Serialize};
 use snafu::{ensure, OptionExt, ResultExt};
 
-use crate::adapter::error::{DatafusionSnafu, InvalidQuerySnafu, Result, UnexpectedSnafu};
+use crate::adapter::error::{
+    DatafusionSnafu, InternalSnafu, InvalidQuerySnafu, Result, UnexpectedSnafu,
+};
 use crate::expr::{MapFilterProject, SafeMfpPlan, ScalarExpr};
 
 /// a set of column indices that are "keys" for the collection.
@@ -93,13 +95,19 @@ pub struct RelationType {
     ///
     /// A collection can contain multiple sets of keys, although it is common to
     /// have either zero or one sets of key indices.
     #[serde(default)]
     pub keys: Vec<Key>,
     /// optionally indicate the column that is TIME INDEX
     pub time_index: Option<usize>,
+    /// mark all the columns that are added automatically by flow, but are not present in the original sql
+    pub auto_columns: Vec<usize>,
 }
 
 impl RelationType {
+    pub fn with_autos(mut self, auto_cols: &[usize]) -> Self {
+        self.auto_columns = auto_cols.to_vec();
+        self
+    }
+
     /// Try to apply an mfp on the current types; returns a new RelationType
     /// with the new types, and also tries to preserve key & time index information
     /// if the old key & time index columns are preserved in the given mfp
@@ -155,10 +163,16 @@ impl RelationType {
         let time_index = self
             .time_index
             .and_then(|old| old_to_new_col.get(&old).cloned());
+        let auto_columns = self
+            .auto_columns
+            .iter()
+            .filter_map(|old| old_to_new_col.get(old).cloned())
+            .collect_vec();
         Ok(Self {
             column_types: mfp_out_types,
             keys,
             time_index,
+            auto_columns,
         })
     }
     /// Constructs a `RelationType` representing the relation with no columns and
@@ -175,6 +189,7 @@ impl RelationType {
             column_types,
             keys: Vec::new(),
             time_index: None,
+            auto_columns: vec![],
         }
     }

@@ -340,6 +355,16 @@ pub struct RelationDesc {
 }
 
 impl RelationDesc {
+    pub fn len(&self) -> Result<usize> {
+        ensure!(
+            self.typ.column_types.len() == self.names.len(),
+            InternalSnafu {
+                reason: "Expect typ and names field to be of same length"
+            }
+        );
+        Ok(self.names.len())
+    }
+
     pub fn to_df_schema(&self) -> Result<DFSchema> {
         let fields: Vec<_> = self
             .iter()

@@ -348,8 +348,10 @@ impl TypedPlan {
         let mut output_types = Vec::new();
         // give best effort to get column name
         let mut output_names = Vec::new();
+        // mark all auto-added cols
+        let mut auto_cols = vec![];
         // first append group_expr as key, then aggr_expr as value
-        for expr in &group_exprs {
+        for (idx, expr) in group_exprs.iter().enumerate() {
             output_types.push(expr.typ.clone());
             let col_name = match &expr.expr {
                 ScalarExpr::CallUnary {
@@ -359,7 +361,10 @@ impl TypedPlan {
                 ScalarExpr::CallUnary {
                     func: UnaryFunc::TumbleWindowCeiling { .. },
                     ..
-                } => Some("window_end".to_string()),
+                } => {
+                    auto_cols.push(idx);
+                    Some("window_end".to_string())
+                }
                 ScalarExpr::Column(col) => input.schema.get_name(*col).clone(),
                 _ => None,
             };
@@ -380,6 +385,7 @@ impl TypedPlan {
                 RelationType::new(output_types).with_key((0..group_exprs.len()).collect_vec())
             }
             .with_time_index(time_index)
+            .with_autos(&auto_cols)
             .into_named(output_names)
         };

@@ -455,17 +461,19 @@
 
 #[cfg(test)]
 mod test {
+    use bytes::BytesMut;
     use common_time::{DateTime, Interval};
     use datatypes::prelude::ConcreteDataType;
     use pretty_assertions::{assert_eq, assert_ne};
 
     use super::*;
+    use crate::expr::{DfScalarFunction, RawDfScalarFn};
     use crate::plan::{Plan, TypedPlan};
     use crate::repr::{self, ColumnType, RelationType};
     use crate::transform::test::{create_test_ctx, create_test_query_engine, sql_to_substrait};
     /// TODO(discord9): add more illegal sql tests
     #[tokio::test]
-    async fn tes_missing_key_check() {
+    async fn test_missing_key_check() {
         let engine = create_test_query_engine();
         let sql = "SELECT avg(number) FROM numbers_with_ts GROUP BY tumble(ts, '1 hour'), number";
         let plan = sql_to_substrait(engine.clone(), sql).await;
@@ -473,6 +481,282 @@ mod test {
         let mut ctx = create_test_ctx();
         assert!(TypedPlan::from_substrait_plan(&mut ctx, &plan).is_err());
     }
 
+    #[tokio::test]
+    async fn test_df_func_basic() {
+        let engine = create_test_query_engine();
+        let sql = "SELECT sum(abs(number)) FROM numbers_with_ts GROUP BY tumble(ts, '1 second', '2021-07-01 00:00:00');";
+        let plan = sql_to_substrait(engine.clone(), sql).await;
+
+        let mut ctx = create_test_ctx();
+        let flow_plan = TypedPlan::from_substrait_plan(&mut ctx, &plan).unwrap();
+
+        let aggr_expr = AggregateExpr {
+            func: AggregateFunc::SumUInt32,
+            expr: ScalarExpr::Column(0),
+            distinct: false,
+        };
+        let expected = TypedPlan {
+            schema: RelationType::new(vec![
+                ColumnType::new(CDT::uint64_datatype(), true),    // sum(number)
+                ColumnType::new(CDT::datetime_datatype(), false), // window start
+                ColumnType::new(CDT::datetime_datatype(), false), // window end
+            ])
+            .with_key(vec![2])
+            .with_time_index(Some(1))
+            .with_autos(&[2])
+            .into_named(vec![
+                None,
+                Some("window_start".to_string()),
+                Some("window_end".to_string()),
+            ]),
+            plan: Plan::Mfp {
+                input: Box::new(
+                    Plan::Reduce {
+                        input: Box::new(
+                            Plan::Get {
+                                id: crate::expr::Id::Global(GlobalId::User(1)),
+                            }
+                            .with_types(
+                                RelationType::new(vec![
+                                    ColumnType::new(ConcreteDataType::uint32_datatype(), false),
+                                    ColumnType::new(ConcreteDataType::datetime_datatype(), false),
+                                ])
+                                .into_named(vec![
+                                    Some("number".to_string()),
+                                    Some("ts".to_string()),
+                                ]),
+                            ),
+                        ),
+                        key_val_plan: KeyValPlan {
+                            key_plan: MapFilterProject::new(2)
+                                .map(vec![
+                                    ScalarExpr::Column(1).call_unary(
+                                        UnaryFunc::TumbleWindowFloor {
+                                            window_size: Interval::from_month_day_nano(
+                                                0,
+                                                0,
+                                                1_000_000_000,
+                                            ),
+                                            start_time: Some(DateTime::new(1625097600000)),
+                                        },
+                                    ),
+                                    ScalarExpr::Column(1).call_unary(
+                                        UnaryFunc::TumbleWindowCeiling {
+                                            window_size: Interval::from_month_day_nano(
+                                                0,
+                                                0,
+                                                1_000_000_000,
+                                            ),
+                                            start_time: Some(DateTime::new(1625097600000)),
+                                        },
+                                    ),
+                                ])
+                                .unwrap()
+                                .project(vec![2, 3])
+                                .unwrap()
+                                .into_safe(),
+                            val_plan: MapFilterProject::new(2)
+                                .map(vec![ScalarExpr::CallDf {
+                                    df_scalar_fn: DfScalarFunction::try_from_raw_fn(
+                                        RawDfScalarFn {
+                                            f: BytesMut::from(
+                                                b"\x08\x01\"\x08\x1a\x06\x12\x04\n\x02\x12\0"
+                                                    .as_ref(),
+                                            ),
+                                            input_schema: RelationType::new(vec![ColumnType::new(
+                                                ConcreteDataType::uint32_datatype(),
+                                                false,
+                                            )])
+                                            .into_unnamed(),
+                                            extensions: FunctionExtensions {
+                                                anchor_to_name: BTreeMap::from([
+                                                    (0, "tumble".to_string()),
+                                                    (1, "abs".to_string()),
+                                                    (2, "sum".to_string()),
+                                                ]),
+                                            },
+                                        },
+                                    )
+                                    .unwrap(),
+                                    exprs: vec![ScalarExpr::Column(0)],
+                                }])
+                                .unwrap()
+                                .project(vec![2])
+                                .unwrap()
+                                .into_safe(),
+                        },
+                        reduce_plan: ReducePlan::Accumulable(AccumulablePlan {
+                            full_aggrs: vec![aggr_expr.clone()],
+                            simple_aggrs: vec![AggrWithIndex::new(aggr_expr.clone(), 0, 0)],
+                            distinct_aggrs: vec![],
+                        }),
+                    }
+                    .with_types(
+                        RelationType::new(vec![
+                            ColumnType::new(CDT::datetime_datatype(), false), // window start
+                            ColumnType::new(CDT::datetime_datatype(), false), // window end
+                            ColumnType::new(CDT::uint64_datatype(), true),    // sum(number)
+                        ])
+                        .with_key(vec![1])
+                        .with_time_index(Some(0))
+                        .with_autos(&[1])
+                        .into_named(vec![
+                            Some("window_start".to_string()),
+                            Some("window_end".to_string()),
+                            None,
+                        ]),
+                    ),
+                ),
+                mfp: MapFilterProject::new(3)
+                    .map(vec![
+                        ScalarExpr::Column(2),
+                        ScalarExpr::Column(3),
+                        ScalarExpr::Column(0),
+                        ScalarExpr::Column(1),
+                    ])
+                    .unwrap()
+                    .project(vec![4, 5, 6])
+                    .unwrap(),
+            },
+        };
+        assert_eq!(expected, flow_plan);
+    }
+
+    #[tokio::test]
+    async fn test_df_func_expr_tree() {
+        let engine = create_test_query_engine();
+        let sql = "SELECT abs(sum(number)) FROM numbers_with_ts GROUP BY tumble(ts, '1 second', '2021-07-01 00:00:00');";
+        let plan = sql_to_substrait(engine.clone(), sql).await;
+
+        let mut ctx = create_test_ctx();
+        let flow_plan = TypedPlan::from_substrait_plan(&mut ctx, &plan).unwrap();
+
+        let aggr_expr = AggregateExpr {
+            func: AggregateFunc::SumUInt32,
+            expr: ScalarExpr::Column(0),
+            distinct: false,
+        };
+        let expected = TypedPlan {
+            schema: RelationType::new(vec![
+                ColumnType::new(CDT::uint64_datatype(), true),    // sum(number)
+                ColumnType::new(CDT::datetime_datatype(), false), // window start
+                ColumnType::new(CDT::datetime_datatype(), false), // window end
+            ])
+            .with_key(vec![2])
+            .with_time_index(Some(1))
+            .with_autos(&[2])
+            .into_named(vec![
+                None,
+                Some("window_start".to_string()),
+                Some("window_end".to_string()),
+            ]),
+            plan: Plan::Mfp {
+                input: Box::new(
+                    Plan::Reduce {
+                        input: Box::new(
+                            Plan::Get {
+                                id: crate::expr::Id::Global(GlobalId::User(1)),
+                            }
+                            .with_types(
+                                RelationType::new(vec![
+                                    ColumnType::new(ConcreteDataType::uint32_datatype(), false),
+                                    ColumnType::new(ConcreteDataType::datetime_datatype(), false),
+                                ])
+                                .into_named(vec![
+                                    Some("number".to_string()),
+                                    Some("ts".to_string()),
+                                ]),
+                            ),
+                        ),
+                        key_val_plan: KeyValPlan {
+                            key_plan: MapFilterProject::new(2)
+                                .map(vec![
+                                    ScalarExpr::Column(1).call_unary(
+                                        UnaryFunc::TumbleWindowFloor {
+                                            window_size: Interval::from_month_day_nano(
+                                                0,
+                                                0,
+                                                1_000_000_000,
+                                            ),
+                                            start_time: Some(DateTime::new(1625097600000)),
+                                        },
+                                    ),
+                                    ScalarExpr::Column(1).call_unary(
+                                        UnaryFunc::TumbleWindowCeiling {
+                                            window_size: Interval::from_month_day_nano(
+                                                0,
+                                                0,
+                                                1_000_000_000,
+                                            ),
+                                            start_time: Some(DateTime::new(1625097600000)),
+                                        },
+                                    ),
+                                ])
+                                .unwrap()
+                                .project(vec![2, 3])
+                                .unwrap()
+                                .into_safe(),
+                            val_plan: MapFilterProject::new(2)
+                                .project(vec![0, 1])
+                                .unwrap()
+                                .into_safe(),
+                        },
+                        reduce_plan: ReducePlan::Accumulable(AccumulablePlan {
+                            full_aggrs: vec![aggr_expr.clone()],
+                            simple_aggrs: vec![AggrWithIndex::new(aggr_expr.clone(), 0, 0)],
+                            distinct_aggrs: vec![],
+                        }),
+                    }
+                    .with_types(
+                        RelationType::new(vec![
+                            ColumnType::new(CDT::datetime_datatype(), false), // window start
+                            ColumnType::new(CDT::datetime_datatype(), false), // window end
+                            ColumnType::new(CDT::uint64_datatype(), true),    // sum(number)
+                        ])
+                        .with_key(vec![1])
+                        .with_time_index(Some(0))
+                        .with_autos(&[1])
+                        .into_named(vec![
+                            Some("window_start".to_string()),
+                            Some("window_end".to_string()),
+                            None,
+                        ]),
+                    ),
+                ),
+                mfp: MapFilterProject::new(3)
+                    .map(vec![
+                        ScalarExpr::Column(2),
+                        ScalarExpr::CallDf {
+                            df_scalar_fn: DfScalarFunction::try_from_raw_fn(RawDfScalarFn {
+                                f: BytesMut::from(b"\"\x08\x1a\x06\x12\x04\n\x02\x12\0".as_ref()),
+                                input_schema: RelationType::new(vec![ColumnType::new(
+                                    ConcreteDataType::uint64_datatype(),
+                                    true,
+                                )])
+                                .into_unnamed(),
+                                extensions: FunctionExtensions {
+                                    anchor_to_name: BTreeMap::from([
+                                        (0, "abs".to_string()),
+                                        (1, "tumble".to_string()),
+                                        (2, "sum".to_string()),
+                                    ]),
+                                },
+                            })
+                            .unwrap(),
+                            exprs: vec![ScalarExpr::Column(3)],
+                        },
+                        ScalarExpr::Column(0),
+                        ScalarExpr::Column(1),
+                    ])
+                    .unwrap()
+                    .project(vec![4, 5, 6])
+                    .unwrap(),
+            },
+        };
+        assert_eq!(expected, flow_plan);
+    }
+
     /// TODO(discord9): add more illegal sql tests
     #[tokio::test]
     async fn test_tumble_composite() {
@@ -581,6 +865,7 @@ mod test {
             ])
             .with_key(vec![1, 2])
             .with_time_index(Some(0))
+            .with_autos(&[1])
             .into_named(vec![
                 Some("window_start".to_string()),
                 Some("window_end".to_string()),
@@ -610,6 +895,7 @@ mod test {
             ])
             .with_key(vec![0, 3])
             .with_time_index(Some(2))
+            .with_autos(&[3])
             .into_named(vec![
                 Some("number".to_string()),
                 None,
@@ -642,15 +928,12 @@ mod test {
             ])
             .with_key(vec![2])
             .with_time_index(Some(1))
+            .with_autos(&[2])
             .into_named(vec![
                 None,
                 Some("window_start".to_string()),
                 Some("window_end".to_string()),
             ]),
-            // TODO(discord9): mfp indirectly ref to key columns
-            /*
-            .with_key(vec![1])
-            .with_time_index(Some(0)),*/
             plan: Plan::Mfp {
                 input: Box::new(
                     Plan::Reduce {
@@ -716,6 +999,7 @@ mod test {
             ])
             .with_key(vec![1])
             .with_time_index(Some(0))
+            .with_autos(&[1])
             .into_named(vec![
                 Some("window_start".to_string()),
                 Some("window_end".to_string()),
@@ -760,6 +1044,7 @@ mod test {
             ])
             .with_key(vec![2])
             .with_time_index(Some(1))
+            .with_autos(&[2])
             .into_named(vec![
                 None,
                 Some("window_start".to_string()),
@@ -830,6 +1115,7 @@ mod test {
             ])
             .with_key(vec![1])
             .with_time_index(Some(0))
+            .with_autos(&[1])
             .into_named(vec![

/// Create a function argument that references the i-th column of the input relation
pub(crate) fn proto_col(i: usize) -> substrait_proto::proto::FunctionArgument {
    use substrait_proto::proto::expression;
    let expr = Expression {
        rex_type: Some(expression::RexType::Selection(Box::new(
            expression::FieldReference {
                reference_type: Some(expression::field_reference::ReferenceType::DirectReference(
                    expression::ReferenceSegment {
                        reference_type: Some(
                            expression::reference_segment::ReferenceType::StructField(Box::new(
                                expression::reference_segment::StructField {
                                    field: i as i32,
                                    child: None,
                                },
                            )),
                        ),
                    },
                )),
                root_type: None,
            },
        ))),
    };
    substrait_proto::proto::FunctionArgument {
        arg_type: Some(substrait_proto::proto::function_argument::ArgType::Value(
            expr,
        )),
    }
}

/// Rewrite a ScalarFunction's arguments to Columns 0..n so nested exprs are still handled by us instead of datafusion
fn rewrite_scalar_function(f: &ScalarFunction) -> ScalarFunction {
    let mut f_rewrite = f.clone();
    for (idx, raw_expr) in f_rewrite.arguments.iter_mut().enumerate() {
        *raw_expr = proto_col(idx);
    }
    f_rewrite
}

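What the rewrite achieves, as a toy illustration (hypothetical Arg type, not the substrait proto): the function shipped to DataFusion only references positional columns, while the original argument expressions stay behind for the flow engine to evaluate.

#[derive(Clone, Debug, PartialEq)]
enum Arg {
    Col(usize),
    Nested(&'static str), // e.g. "sum(number)", evaluated by flow, not DataFusion
}

// Replace each argument with a positional column reference, returning the originals.
fn rewrite_args(args: &mut Vec<Arg>) -> Vec<Arg> {
    let originals = args.clone();
    for (idx, arg) in args.iter_mut().enumerate() {
        *arg = Arg::Col(idx);
    }
    originals
}

fn main() {
    // abs(sum(number)): DataFusion receives abs(col(0)); flow keeps sum(number).
    let mut args = vec![Arg::Nested("sum(number)")];
    let kept = rewrite_args(&mut args);
    assert_eq!(args, vec![Arg::Col(0)]);
    assert_eq!(kept, vec![Arg::Nested("sum(number)")]);
}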
 impl TypedExpr {
     pub fn from_substrait_to_datafusion_scalar_func(
         f: &ScalarFunction,
-        input_schema: &RelationDesc,
+        arg_exprs_typed: Vec<TypedExpr>,
         extensions: &FunctionExtensions,
     ) -> Result<TypedExpr, Error> {
-        let phy_expr = from_scalar_fn_to_df_fn_impl(f, input_schema, extensions)?;
-        let raw_fn = RawDfScalarFn::from_proto(f, input_schema.clone(), extensions.clone())?;
-        let expr = DfScalarFunction::new(raw_fn, phy_expr)?;
-        let expr = ScalarExpr::CallDf { df_scalar_fn: expr };
+        let (arg_exprs, arg_types): (Vec<_>, Vec<_>) =
+            arg_exprs_typed.into_iter().map(|e| (e.expr, e.typ)).unzip();
+
+        let f_rewrite = rewrite_scalar_function(f);
+
+        let input_schema = RelationType::new(arg_types).into_unnamed();
+        let raw_fn =
+            RawDfScalarFn::from_proto(&f_rewrite, input_schema.clone(), extensions.clone())?;
+
+        let df_func = DfScalarFunction::try_from_raw_fn(raw_fn)?;
+        let expr = ScalarExpr::CallDf {
+            df_scalar_fn: df_func,
+            exprs: arg_exprs,
+        };
+        // df already knows its own schema, so it is not provided here
         let ret_type = expr.typ(&[])?;
         Ok(TypedExpr::new(expr, ret_type))
     }

     /// Convert ScalarFunction into Flow's ScalarExpr
     pub fn from_substrait_scalar_func(
         f: &ScalarFunction,
@@ -242,7 +291,7 @@ impl TypedExpr {
             } else {
                 let try_as_df = Self::from_substrait_to_datafusion_scalar_func(
                     f,
-                    input_schema,
+                    arg_typed_exprs,
                     extensions,
                 )?;
                 Ok(try_as_df)
@@ -395,6 +444,7 @@ mod test {
     use crate::plan::{Plan, TypedPlan};
     use crate::repr::{self, ColumnType, RelationType};
     use crate::transform::test::{create_test_ctx, create_test_query_engine, sql_to_substrait};
+
     /// test if `WHERE` condition can be converted to Flow's ScalarExpr in mfp's filter
     #[tokio::test]
     async fn test_where_and() {
@@ -608,39 +658,10 @@ mod test {
             )),
         }
     }
-        fn col(i: usize) -> substrait_proto::proto::FunctionArgument {
-            use substrait_proto::proto::expression;
-            let expr = Expression {
-                rex_type: Some(expression::RexType::Selection(Box::new(
-                    expression::FieldReference {
-                        reference_type: Some(
-                            expression::field_reference::ReferenceType::DirectReference(
-                                expression::ReferenceSegment {
-                                    reference_type: Some(
-                                        expression::reference_segment::ReferenceType::StructField(
-                                            Box::new(expression::reference_segment::StructField {
-                                                field: i as i32,
-                                                child: None,
-                                            }),
-                                        ),
-                                    ),
-                                },
-                            ),
-                        ),
-                        root_type: None,
-                    },
-                ))),
-            };
-            substrait_proto::proto::FunctionArgument {
-                arg_type: Some(substrait_proto::proto::function_argument::ArgType::Value(
-                    expr,
-                )),
-            }
-        }
 
        let f = substrait_proto::proto::expression::ScalarFunction {
            function_reference: 0,
-            arguments: vec![col(0)],
+            arguments: vec![proto_col(0)],
            options: vec![],
            output_type: None,
            ..Default::default()
@@ -663,7 +684,7 @@ mod test {
 
        let f = substrait_proto::proto::expression::ScalarFunction {
            function_reference: 0,
-            arguments: vec![col(0), col(1)],
+            arguments: vec![proto_col(0), proto_col(1)],
            options: vec![],
            output_type: None,
            ..Default::default()
@@ -690,7 +711,7 @@ mod test {
 
        let f = substrait_proto::proto::expression::ScalarFunction {
            function_reference: 0,
-            arguments: vec![col(0), lit("1 second"), lit("2021-07-01 00:00:00")],
+            arguments: vec![proto_col(0), lit("1 second"), lit("2021-07-01 00:00:00")],
            options: vec![],
            output_type: None,
            ..Default::default()
@@ -718,7 +739,7 @@ mod test {
 
        let f = substrait_proto::proto::expression::ScalarFunction {
            function_reference: 0,
-            arguments: vec![col(0), lit("1 second")],
+            arguments: vec![proto_col(0), lit("1 second")],
            options: vec![],
            output_type: None,
            ..Default::default()

@@ -12,14 +12,15 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use std::collections::{BTreeMap, BTreeSet};
+use std::collections::{BTreeMap, BTreeSet, HashSet};
 
 use itertools::Itertools;
 use snafu::OptionExt;
+use substrait::substrait_proto_df::proto::{FilterRel, ReadRel};
 use substrait_proto::proto::expression::MaskExpression;
 use substrait_proto::proto::read_rel::ReadType;
 use substrait_proto::proto::rel::RelType;
-use substrait_proto::proto::{plan_rel, Plan as SubPlan, Rel};
+use substrait_proto::proto::{plan_rel, Plan as SubPlan, ProjectRel, Rel};
 
 use crate::adapter::error::{
     Error, InternalSnafu, InvalidQuerySnafu, NotImplementedSnafu, PlanSnafu, UnexpectedSnafu,
@@ -63,6 +64,152 @@ impl TypedPlan {
         }
     }
 
+    pub fn from_substrait_project(
+        ctx: &mut FlownodeContext,
+        p: &ProjectRel,
+        extensions: &FunctionExtensions,
+    ) -> Result<TypedPlan, Error> {
+        let input = if let Some(input) = p.input.as_ref() {
+            TypedPlan::from_substrait_rel(ctx, input, extensions)?
+        } else {
+            return not_impl_err!("Projection without an input is not supported");
+        };
+
+        // because this `input.schema` is incorrect for a pre-expand substrait plan, we have to use the schema
+        // before the multi-value function is expanded to correctly transform it, and later rewrite it
+        let schema_before_expand = {
+            let input_schema = input.schema.clone();
+            let auto_columns: HashSet<usize> =
+                HashSet::from_iter(input_schema.typ().auto_columns.clone());
+            let not_auto_added_columns = (0..input_schema.len()?)
+                .filter(|i| !auto_columns.contains(i))
+                .collect_vec();
+            let mfp = MapFilterProject::new(input_schema.len()?)
+                .project(not_auto_added_columns)?
+                .into_safe();
+
+            input_schema.apply_mfp(&mfp)?
+        };
+
+        let mut exprs: Vec<TypedExpr> = Vec::with_capacity(p.expressions.len());
+        for e in &p.expressions {
+            let expr = TypedExpr::from_substrait_rex(e, &schema_before_expand, extensions)?;
+            exprs.push(expr);
+        }
+        let is_literal = exprs.iter().all(|expr| expr.expr.is_literal());
+        if is_literal {
+            let (literals, lit_types): (Vec<_>, Vec<_>) = exprs
+                .into_iter()
+                .map(|TypedExpr { expr, typ }| (expr, typ))
+                .unzip();
+            let typ = RelationType::new(lit_types);
+            let row = literals
+                .into_iter()
+                .map(|lit| lit.as_literal().expect("A literal"))
+                .collect_vec();
+            let row = repr::Row::new(row);
+            let plan = Plan::Constant {
+                rows: vec![(row, repr::Timestamp::MIN, 1)],
+            };
+            Ok(TypedPlan {
+                schema: typ.into_unnamed(),
+                plan,
+            })
+        } else {
+            match input.plan.clone() {
+                Plan::Reduce { key_val_plan, .. } => {
+                    rewrite_projection_after_reduce(key_val_plan, &input.schema, &mut exprs)?;
+                }
+                Plan::Mfp { input, mfp: _ } => {
+                    if let Plan::Reduce { key_val_plan, .. } = input.plan {
+                        rewrite_projection_after_reduce(key_val_plan, &input.schema, &mut exprs)?;
+                    }
+                }
+                _ => (),
+            }
+            input.projection(exprs)
+        }
+    }
+
+    pub fn from_substrait_filter(
+        ctx: &mut FlownodeContext,
+        filter: &FilterRel,
+        extensions: &FunctionExtensions,
+    ) -> Result<TypedPlan, Error> {
+        let input = if let Some(input) = filter.input.as_ref() {
+            TypedPlan::from_substrait_rel(ctx, input, extensions)?
+        } else {
+            return not_impl_err!("Filter without an input is not supported");
+        };
+
+        let expr = if let Some(condition) = filter.condition.as_ref() {
+            TypedExpr::from_substrait_rex(condition, &input.schema, extensions)?
+        } else {
+            return not_impl_err!("Filter without an condition is not valid");
+        };
+        input.filter(expr)
+    }
+
+    pub fn from_substrait_read(
+        ctx: &mut FlownodeContext,
+        read: &ReadRel,
+        _extensions: &FunctionExtensions,
+    ) -> Result<TypedPlan, Error> {
+        if let Some(ReadType::NamedTable(nt)) = &read.read_type {
+            let query_ctx = ctx.query_context.clone().context(UnexpectedSnafu {
+                reason: "Query context not found",
+            })?;
+            let table_reference = match nt.names.len() {
+                1 => [
+                    query_ctx.current_catalog().to_string(),
+                    query_ctx.current_schema().to_string(),
+                    nt.names[0].clone(),
+                ],
+                2 => [
+                    query_ctx.current_catalog().to_string(),
+                    nt.names[0].clone(),
+                    nt.names[1].clone(),
+                ],
+                3 => [
+                    nt.names[0].clone(),
+                    nt.names[1].clone(),
+                    nt.names[2].clone(),
+                ],
+                _ => InvalidQuerySnafu {
+                    reason: "Expect table to have name",
+                }
+                .fail()?,
+            };
+            let table = ctx.table(&table_reference)?;
+            let get_table = Plan::Get {
+                id: crate::expr::Id::Global(table.0),
+            };
+            let get_table = TypedPlan {
+                schema: table.1,
+                plan: get_table,
+            };
+
+            if let Some(MaskExpression {
+                select: Some(projection),
+                ..
+            }) = &read.projection
+            {
+                let column_indices: Vec<usize> = projection
+                    .struct_items
+                    .iter()
+                    .map(|item| item.field as usize)
+                    .collect();
+                let input_arity = get_table.schema.typ().column_types.len();
+                let mfp = MapFilterProject::new(input_arity).project(column_indices.clone())?;
+                get_table.mfp(mfp.into_safe())
+            } else {
+                Ok(get_table)
+            }
+        } else {
+            not_impl_err!("Only NamedTable reads are supported")
+        }
+    }
+
     /// Convert Substrait Rel into Flow's TypedPlan
     /// TODO(discord9): SELECT DISTINCT (does it get compiled into something else?)
     pub fn from_substrait_rel(
@@ -71,133 +218,10 @@ impl TypedPlan {
         extensions: &FunctionExtensions,
     ) -> Result<TypedPlan, Error> {
         match &rel.rel_type {
-            Some(RelType::Project(p)) => {
-                let input = if let Some(input) = p.input.as_ref() {
-                    TypedPlan::from_substrait_rel(ctx, input, extensions)?
-                } else {
-                    return not_impl_err!("Projection without an input is not supported");
-                };
-
-                let mut exprs: Vec<TypedExpr> = vec![];
-                for e in &p.expressions {
-                    let expr = TypedExpr::from_substrait_rex(e, &input.schema, extensions)?;
-                    exprs.push(expr);
-                }
-                let is_literal = exprs.iter().all(|expr| expr.expr.is_literal());
-                if is_literal {
-                    let (literals, lit_types): (Vec<_>, Vec<_>) = exprs
-                        .into_iter()
-                        .map(|TypedExpr { expr, typ }| (expr, typ))
-                        .unzip();
-                    let typ = RelationType::new(lit_types);
-                    let row = literals
-                        .into_iter()
-                        .map(|lit| lit.as_literal().expect("A literal"))
-                        .collect_vec();
-                    let row = repr::Row::new(row);
-                    let plan = Plan::Constant {
-                        rows: vec![(row, repr::Timestamp::MIN, 1)],
-                    };
-                    Ok(TypedPlan {
-                        schema: typ.into_unnamed(),
-                        plan,
-                    })
-                } else {
-                    match input.plan.clone() {
-                        Plan::Reduce { key_val_plan, .. } => {
-                            rewrite_projection_after_reduce(
-                                key_val_plan,
-                                &input.schema,
-                                &mut exprs,
-                            )?;
-                        }
-                        Plan::Mfp { input, mfp: _ } => {
-                            if let Plan::Reduce { key_val_plan, .. } = input.plan {
-                                rewrite_projection_after_reduce(
-                                    key_val_plan,
-                                    &input.schema,
-                                    &mut exprs,
-                                )?;
-                            }
-                        }
-                        _ => (),
-                    }
-                    input.projection(exprs)
-                }
-            }
-            Some(RelType::Filter(filter)) => {
-                let input = if let Some(input) = filter.input.as_ref() {
-                    TypedPlan::from_substrait_rel(ctx, input, extensions)?
-                } else {
-                    return not_impl_err!("Filter without an input is not supported");
-                };
-
-                let expr = if let Some(condition) = filter.condition.as_ref() {
-                    TypedExpr::from_substrait_rex(condition, &input.schema, extensions)?
-                } else {
-                    return not_impl_err!("Filter without an condition is not valid");
-                };
-                input.filter(expr)
-            }
-            Some(RelType::Read(read)) => {
-                if let Some(ReadType::NamedTable(nt)) = &read.as_ref().read_type {
-                    let query_ctx = ctx.query_context.clone().context(UnexpectedSnafu {
-                        reason: "Query context not found",
-                    })?;
-                    let table_reference = match nt.names.len() {
-                        1 => [
-                            query_ctx.current_catalog().to_string(),
-                            query_ctx.current_schema().to_string(),
-                            nt.names[0].clone(),
-                        ],
-                        2 => [
-                            query_ctx.current_catalog().to_string(),
-                            nt.names[0].clone(),
-                            nt.names[1].clone(),
-                        ],
-                        3 => [
-                            nt.names[0].clone(),
-                            nt.names[1].clone(),
-                            nt.names[2].clone(),
-                        ],
-                        _ => InvalidQuerySnafu {
-                            reason: "Expect table to have name",
-                        }
-                        .fail()?,
-                    };
-                    let table = ctx.table(&table_reference)?;
-                    let get_table = Plan::Get {
-                        id: crate::expr::Id::Global(table.0),
-                    };
-                    let get_table = TypedPlan {
-                        schema: table.1,
-                        plan: get_table,
-                    };
-
-                    if let Some(MaskExpression {
-                        select: Some(projection),
-                        ..
-                    }) = &read.projection
-                    {
-                        let column_indices: Vec<usize> = projection
-                            .struct_items
-                            .iter()
-                            .map(|item| item.field as usize)
-                            .collect();
-                        let input_arity = get_table.schema.typ().column_types.len();
-                        let mfp =
-                            MapFilterProject::new(input_arity).project(column_indices.clone())?;
-                        get_table.mfp(mfp.into_safe())
-                    } else {
-                        Ok(get_table)
-                    }
-                } else {
-                    not_impl_err!("Only NamedTable reads are supported")
-                }
-            }
-            Some(RelType::Aggregate(agg)) => {
-                TypedPlan::from_substrait_agg_rel(ctx, agg, extensions)
-            }
+            Some(RelType::Project(p)) => Self::from_substrait_project(ctx, p.as_ref(), extensions),
+            Some(RelType::Filter(filter)) => Self::from_substrait_filter(ctx, filter, extensions),
+            Some(RelType::Read(read)) => Self::from_substrait_read(ctx, read, extensions),
+            Some(RelType::Aggregate(agg)) => Self::from_substrait_agg_rel(ctx, agg, extensions),
             _ => not_impl_err!("Unsupported relation type: {:?}", rel.rel_type),
         }
     }
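The `schema_before_expand` step in `from_substrait_project` can be pictured with a toy schema (plain strings here, not `RelationDesc`): columns auto-added by flow are projected away so that projection expressions resolve against the user-visible schema.

use std::collections::HashSet;

fn schema_before_expand(columns: &[&str], auto_columns: &[usize]) -> Vec<String> {
    let auto: HashSet<usize> = auto_columns.iter().copied().collect();
    columns
        .iter()
        .enumerate()
        .filter(|(i, _)| !auto.contains(i)) // keep only non-auto columns
        .map(|(_, name)| name.to_string())
        .collect()
}

fn main() {
    // window_end (index 2) was auto-added by flow and is absent from the original SQL.
    let cols = ["sum_number", "window_start", "window_end"];
    assert_eq!(
        schema_before_expand(&cols, &[2]),
        vec!["sum_number".to_string(), "window_start".to_string()]
    );
}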
tests/cases/standalone/flow/basic.result (new file, 61 lines)
@@ -0,0 +1,61 @@
CREATE TABLE numbers_input (
    number INT,
    ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    PRIMARY KEY(number),
    TIME INDEX(ts)
);

Affected Rows: 0

CREATE FLOW test_numbers
SINK TO out_num_cnt
AS
SELECT sum(number) FROM numbers_input GROUP BY tumble(ts, '1 second', '2021-07-01 00:00:00');

Affected Rows: 0

INSERT INTO numbers_input
VALUES
    (20, "2021-07-01 00:00:00.200"),
    (22, "2021-07-01 00:00:00.600");

Affected Rows: 2

-- SQLNESS SLEEP 3s
SELECT col_0, window_start, window_end FROM out_num_cnt;

+-------+---------------------+---------------------+
| col_0 | window_start        | window_end          |
+-------+---------------------+---------------------+
| 42    | 2021-07-01T00:00:00 | 2021-07-01T00:00:01 |
+-------+---------------------+---------------------+

INSERT INTO numbers_input
VALUES
    (23, "2021-07-01 00:00:01.000"),
    (24, "2021-07-01 00:00:01.500");

Affected Rows: 2

-- SQLNESS SLEEP 2s
SELECT col_0, window_start, window_end FROM out_num_cnt;

+-------+---------------------+---------------------+
| col_0 | window_start        | window_end          |
+-------+---------------------+---------------------+
| 42    | 2021-07-01T00:00:00 | 2021-07-01T00:00:01 |
| 47    | 2021-07-01T00:00:01 | 2021-07-01T00:00:02 |
+-------+---------------------+---------------------+

DROP FLOW test_numbers;

Affected Rows: 0

DROP TABLE numbers_input;

Affected Rows: 0

DROP TABLE out_num_cnt;

Affected Rows: 0
tests/cases/standalone/flow/basic.sql (new file, 31 lines)
@@ -0,0 +1,31 @@
CREATE TABLE numbers_input (
    number INT,
    ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    PRIMARY KEY(number),
    TIME INDEX(ts)
);

CREATE FLOW test_numbers
SINK TO out_num_cnt
AS
SELECT sum(number) FROM numbers_input GROUP BY tumble(ts, '1 second', '2021-07-01 00:00:00');

INSERT INTO numbers_input
VALUES
    (20, "2021-07-01 00:00:00.200"),
    (22, "2021-07-01 00:00:00.600");

-- SQLNESS SLEEP 3s
SELECT col_0, window_start, window_end FROM out_num_cnt;

INSERT INTO numbers_input
VALUES
    (23, "2021-07-01 00:00:01.000"),
    (24, "2021-07-01 00:00:01.500");

-- SQLNESS SLEEP 2s
SELECT col_0, window_start, window_end FROM out_num_cnt;

DROP FLOW test_numbers;
DROP TABLE numbers_input;
DROP TABLE out_num_cnt;
tests/cases/standalone/flow/df_func.result (new file, 126 lines)
@@ -0,0 +1,126 @@
CREATE TABLE numbers_input_df_func (
    number INT,
    ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    PRIMARY KEY(number),
    TIME INDEX(ts)
);

Affected Rows: 0

-- call `sum(abs(number))`, where `abs` is a DataFusion function and `sum` is a flow function
CREATE FLOW test_numbers_df_func
SINK TO out_num_cnt_df_func
AS
SELECT sum(abs(number)) FROM numbers_input_df_func GROUP BY tumble(ts, '1 second', '2021-07-01 00:00:00');

Affected Rows: 0

INSERT INTO numbers_input_df_func
VALUES
    (-20, "2021-07-01 00:00:00.200"),
    (22, "2021-07-01 00:00:00.600");

Affected Rows: 2

-- sleep a little bit longer to make sure that the table is created and the data is inserted
-- SQLNESS SLEEP 3s
SELECT col_0, window_start, window_end FROM out_num_cnt_df_func;

+-------+---------------------+---------------------+
| col_0 | window_start        | window_end          |
+-------+---------------------+---------------------+
| 42    | 2021-07-01T00:00:00 | 2021-07-01T00:00:01 |
+-------+---------------------+---------------------+

INSERT INTO numbers_input_df_func
VALUES
    (23, "2021-07-01 00:00:01.000"),
    (-24, "2021-07-01 00:00:01.500");

Affected Rows: 2

-- SQLNESS SLEEP 2s
SELECT col_0, window_start, window_end FROM out_num_cnt_df_func;

+-------+---------------------+---------------------+
| col_0 | window_start        | window_end          |
+-------+---------------------+---------------------+
| 42    | 2021-07-01T00:00:00 | 2021-07-01T00:00:01 |
| 47    | 2021-07-01T00:00:01 | 2021-07-01T00:00:02 |
+-------+---------------------+---------------------+

DROP FLOW test_numbers_df_func;

Affected Rows: 0

DROP TABLE numbers_input_df_func;

Affected Rows: 0

DROP TABLE out_num_cnt_df_func;

Affected Rows: 0

CREATE TABLE numbers_input_df_func (
    number INT,
    ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    PRIMARY KEY(number),
    TIME INDEX(ts)
);

Affected Rows: 0

-- call `abs(sum(number))` to make sure that calling `abs` (implemented by datafusion) on `sum` (implemented by flow) is working
CREATE FLOW test_numbers_df_func
SINK TO out_num_cnt_df_func
AS
SELECT abs(sum(number)) FROM numbers_input_df_func GROUP BY tumble(ts, '1 second', '2021-07-01 00:00:00');

Affected Rows: 0

INSERT INTO numbers_input_df_func
VALUES
    (-20, "2021-07-01 00:00:00.200"),
    (22, "2021-07-01 00:00:00.600");

Affected Rows: 2

-- sleep a little bit longer to make sure that the table is created and the data is inserted
-- SQLNESS SLEEP 3s
SELECT col_0, window_start, window_end FROM out_num_cnt_df_func;

+-------+---------------------+---------------------+
| col_0 | window_start        | window_end          |
+-------+---------------------+---------------------+
| 2     | 2021-07-01T00:00:00 | 2021-07-01T00:00:01 |
+-------+---------------------+---------------------+

INSERT INTO numbers_input_df_func
VALUES
    (23, "2021-07-01 00:00:01.000"),
    (-24, "2021-07-01 00:00:01.500");

Affected Rows: 2

-- SQLNESS SLEEP 2s
SELECT col_0, window_start, window_end FROM out_num_cnt_df_func;

+-------+---------------------+---------------------+
| col_0 | window_start        | window_end          |
+-------+---------------------+---------------------+
| 2     | 2021-07-01T00:00:00 | 2021-07-01T00:00:01 |
| 1     | 2021-07-01T00:00:01 | 2021-07-01T00:00:02 |
+-------+---------------------+---------------------+

DROP FLOW test_numbers_df_func;

Affected Rows: 0

DROP TABLE numbers_input_df_func;

Affected Rows: 0

DROP TABLE out_num_cnt_df_func;

Affected Rows: 0
tests/cases/standalone/flow/df_func.sql (new file, 67 lines)
@@ -0,0 +1,67 @@
CREATE TABLE numbers_input_df_func (
    number INT,
    ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    PRIMARY KEY(number),
    TIME INDEX(ts)
);

-- call `sum(abs(number))`, where `abs` is a DataFusion function and `sum` is a flow function
CREATE FLOW test_numbers_df_func
SINK TO out_num_cnt_df_func
AS
SELECT sum(abs(number)) FROM numbers_input_df_func GROUP BY tumble(ts, '1 second', '2021-07-01 00:00:00');

INSERT INTO numbers_input_df_func
VALUES
    (-20, "2021-07-01 00:00:00.200"),
    (22, "2021-07-01 00:00:00.600");

-- sleep a little bit longer to make sure that the table is created and the data is inserted
-- SQLNESS SLEEP 3s
SELECT col_0, window_start, window_end FROM out_num_cnt_df_func;

INSERT INTO numbers_input_df_func
VALUES
    (23, "2021-07-01 00:00:01.000"),
    (-24, "2021-07-01 00:00:01.500");

-- SQLNESS SLEEP 2s
SELECT col_0, window_start, window_end FROM out_num_cnt_df_func;

DROP FLOW test_numbers_df_func;
DROP TABLE numbers_input_df_func;
DROP TABLE out_num_cnt_df_func;

CREATE TABLE numbers_input_df_func (
    number INT,
    ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    PRIMARY KEY(number),
    TIME INDEX(ts)
);

-- call `abs(sum(number))` to make sure that calling `abs` (implemented by datafusion) on `sum` (implemented by flow) is working
CREATE FLOW test_numbers_df_func
SINK TO out_num_cnt_df_func
AS
SELECT abs(sum(number)) FROM numbers_input_df_func GROUP BY tumble(ts, '1 second', '2021-07-01 00:00:00');

INSERT INTO numbers_input_df_func
VALUES
    (-20, "2021-07-01 00:00:00.200"),
    (22, "2021-07-01 00:00:00.600");

-- sleep a little bit longer to make sure that the table is created and the data is inserted
-- SQLNESS SLEEP 3s
SELECT col_0, window_start, window_end FROM out_num_cnt_df_func;

INSERT INTO numbers_input_df_func
VALUES
    (23, "2021-07-01 00:00:01.000"),
    (-24, "2021-07-01 00:00:01.500");

-- SQLNESS SLEEP 2s
SELECT col_0, window_start, window_end FROM out_num_cnt_df_func;

DROP FLOW test_numbers_df_func;
DROP TABLE numbers_input_df_func;
DROP TABLE out_num_cnt_df_func;
@@ -17,7 +17,8 @@ common-recordbatch.workspace = true
 common-time.workspace = true
 serde.workspace = true
 serde_json.workspace = true
-sqlness = { version = "0.5" }
+# sqlness 0.6.0 has a bug causing `cargo sqlness` to fail (see https://github.com/CeresDB/sqlness/issues/68), which is fixed in 0.6.1
+sqlness = "0.6.1"
 tempfile.workspace = true
 tinytemplate = "1.2"
 tokio.workspace = true