feat: optimizer rule to pass expected output ordering hint (#1675)

* move type convertsion rule into optimizer dir

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* implement order_hint rule

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* add test

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix clippy

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* it works!

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix clippy

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* use column name instead

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix clippy

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* accomplish test case

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* update lock file

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ruihang Xia
2023-06-02 11:43:51 +08:00
committed by GitHub
parent ff6d11ddc7
commit e5b6f8654a
18 changed files with 670 additions and 420 deletions

4
Cargo.lock generated
View File

@@ -2275,9 +2275,9 @@ dependencies = [
[[package]]
name = "csv"
version = "1.2.1"
version = "1.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0b015497079b9a9d69c02ad25de6c0a6edef051ea6360a327d0bd05802ef64ad"
checksum = "626ae34994d3d8d668f4269922248239db4ae42d538b14c398b74a52208e8086"
dependencies = [
"csv-core",
"itoa",

View File

@@ -163,6 +163,7 @@ impl Table for InformationTable {
let stream = RecordBatchStreamAdaptor {
schema: projected_schema,
stream: Box::pin(stream),
output_ordering: None,
};
Ok(Box::pin(stream))
}

View File

@@ -43,9 +43,9 @@ pub trait RecordBatchStream: Stream<Item = Result<RecordBatch>> {
pub type SendableRecordBatchStream = Pin<Box<dyn RecordBatchStream + Send>>;
#[derive(Debug, Clone, Copy)]
#[derive(Debug, Clone)]
pub struct OrderOption {
pub index: usize,
pub name: String,
pub options: SortOptions,
}
@@ -196,12 +196,17 @@ impl Stream for SimpleRecordBatchStream {
pub struct RecordBatchStreamAdaptor {
pub schema: SchemaRef,
pub stream: Pin<Box<dyn Stream<Item = Result<RecordBatch>> + Send>>,
pub output_ordering: Option<Vec<OrderOption>>,
}
impl RecordBatchStream for RecordBatchStreamAdaptor {
fn schema(&self) -> SchemaRef {
self.schema.clone()
}
fn output_ordering(&self) -> Option<&[OrderOption]> {
self.output_ordering.as_deref()
}
}
impl Stream for RecordBatchStreamAdaptor {

View File

@@ -222,7 +222,11 @@ impl Table for DistTable {
}
},
);
let record_batch_stream = RecordBatchStreamAdaptor { schema, stream };
let record_batch_stream = RecordBatchStreamAdaptor {
schema,
stream,
output_ordering: None,
};
Ok(Box::pin(record_batch_stream))
}

View File

@@ -208,7 +208,11 @@ impl<R: Region> Table for MitoTable<R> {
}
});
let stream = Box::pin(RecordBatchStreamAdaptor { schema, stream });
let stream = Box::pin(RecordBatchStreamAdaptor {
schema,
stream,
output_ordering: None,
});
Ok(Arc::new(StreamScanAdapter::new(stream)))
}
@@ -275,6 +279,7 @@ impl<R: Region> Table for MitoTable<R> {
})?;
let schema = stream_schema.clone();
let output_ordering = readers.get(0).and_then(|reader| reader.output_ordering());
let stream = Box::pin(async_stream::try_stream! {
for mut reader in readers {
@@ -285,7 +290,11 @@ impl<R: Region> Table for MitoTable<R> {
}
});
Ok(Box::pin(RecordBatchStreamAdaptor { schema, stream }))
Ok(Box::pin(RecordBatchStreamAdaptor {
schema,
stream,
output_ordering,
}))
}
fn supports_filters_pushdown(&self, filters: &[&Expr]) -> TableResult<Vec<FilterPushDownType>> {

View File

@@ -56,4 +56,5 @@ statrs = "0.16"
stats-cli = "3.0"
store-api = { path = "../store-api" }
streaming-stats = "0.2"
table = { path = "../table", features = ["testing"] }
tokio-stream = "0.1"

View File

@@ -12,399 +12,5 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::str::FromStr;
use common_time::timestamp::{TimeUnit, Timestamp};
use datafusion::config::ConfigOptions;
use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRewriter};
use datafusion_common::{DFSchemaRef, DataFusionError, Result, ScalarValue};
use datafusion_expr::expr::InList;
use datafusion_expr::{
Between, BinaryExpr, Expr, ExprSchemable, Filter, LogicalPlan, Operator, TableScan,
};
use datafusion_optimizer::analyzer::AnalyzerRule;
use datatypes::arrow::compute;
use datatypes::arrow::datatypes::DataType;
/// TypeConversionRule converts some literal values in logical plan to other types according
/// to data type of corresponding columns.
/// Specifically:
/// - string literal of timestamp is converted to `Expr::Literal(ScalarValue::TimestampMillis)`
/// - string literal of boolean is converted to `Expr::Literal(ScalarValue::Boolean)`
pub struct TypeConversionRule;
impl AnalyzerRule for TypeConversionRule {
// TODO(ruihang): fix this warning
#[allow(deprecated)]
fn analyze(&self, plan: LogicalPlan, _config: &ConfigOptions) -> Result<LogicalPlan> {
let schemas = plan.all_schemas().into_iter().cloned().collect::<Vec<_>>();
plan.transform(&|plan| match plan {
LogicalPlan::Filter(filter) => {
let mut converter = TypeConverter {
schemas: schemas.clone(),
};
let rewritten = filter.predicate.clone().rewrite(&mut converter)?;
Ok(Transformed::Yes(LogicalPlan::Filter(Filter::try_new(
rewritten,
filter.input,
)?)))
}
LogicalPlan::TableScan(TableScan {
table_name,
source,
projection,
projected_schema,
filters,
fetch,
}) => {
let mut converter = TypeConverter {
schemas: schemas.clone(),
};
let rewrite_filters = filters
.into_iter()
.map(|e| e.rewrite(&mut converter))
.collect::<Result<Vec<_>>>()?;
Ok(Transformed::Yes(LogicalPlan::TableScan(TableScan {
table_name: table_name.clone(),
source: source.clone(),
projection,
projected_schema,
filters: rewrite_filters,
fetch,
})))
}
LogicalPlan::Projection { .. }
| LogicalPlan::Window { .. }
| LogicalPlan::Aggregate { .. }
| LogicalPlan::Repartition { .. }
| LogicalPlan::Extension { .. }
| LogicalPlan::Sort { .. }
| LogicalPlan::Explain { .. }
| LogicalPlan::Limit { .. }
| LogicalPlan::Union { .. }
| LogicalPlan::Join { .. }
| LogicalPlan::CrossJoin { .. }
| LogicalPlan::Distinct { .. }
| LogicalPlan::Values { .. }
| LogicalPlan::Analyze { .. } => {
let mut converter = TypeConverter {
schemas: plan.all_schemas().into_iter().cloned().collect(),
};
let inputs = plan.inputs().into_iter().cloned().collect::<Vec<_>>();
let expr = plan
.expressions()
.into_iter()
.map(|e| e.rewrite(&mut converter))
.collect::<Result<Vec<_>>>()?;
datafusion_expr::utils::from_plan(&plan, &expr, &inputs).map(Transformed::Yes)
}
LogicalPlan::Subquery { .. }
| LogicalPlan::SubqueryAlias { .. }
| LogicalPlan::EmptyRelation(_)
| LogicalPlan::Prepare(_)
| LogicalPlan::Dml(_)
| LogicalPlan::DescribeTable(_)
| LogicalPlan::Unnest(_)
| LogicalPlan::Statement(_)
| LogicalPlan::Ddl(_) => Ok(Transformed::No(plan)),
})
}
fn name(&self) -> &str {
"TypeConversionRule"
}
}
struct TypeConverter {
schemas: Vec<DFSchemaRef>,
}
impl TypeConverter {
fn column_type(&self, expr: &Expr) -> Option<DataType> {
if let Expr::Column(_) = expr {
for schema in &self.schemas {
if let Ok(v) = expr.get_type(schema) {
return Some(v);
}
}
}
None
}
fn cast_scalar_value(value: &ScalarValue, target_type: &DataType) -> Result<ScalarValue> {
match (target_type, value) {
(DataType::Timestamp(_, _), ScalarValue::Utf8(Some(v))) => string_to_timestamp_ms(v),
(DataType::Boolean, ScalarValue::Utf8(Some(v))) => match v.to_lowercase().as_str() {
"true" => Ok(ScalarValue::Boolean(Some(true))),
"false" => Ok(ScalarValue::Boolean(Some(false))),
_ => Ok(ScalarValue::Boolean(None)),
},
(target_type, value) => {
let value_arr = value.to_array();
let arr =
compute::cast(&value_arr, target_type).map_err(DataFusionError::ArrowError)?;
ScalarValue::try_from_array(
&arr,
0, // index: Converts a value in `array` at `index` into a ScalarValue
)
}
}
}
fn convert_type<'b>(&self, mut left: &'b Expr, mut right: &'b Expr) -> Result<(Expr, Expr)> {
let left_type = self.column_type(left);
let right_type = self.column_type(right);
let mut reverse = false;
let left_type = match (&left_type, &right_type) {
(Some(v), None) => v,
(None, Some(v)) => {
reverse = true;
std::mem::swap(&mut left, &mut right);
v
}
_ => return Ok((left.clone(), right.clone())),
};
match (left, right) {
(Expr::Column(col), Expr::Literal(value)) => {
let casted_right = Self::cast_scalar_value(value, left_type)?;
if casted_right.is_null() {
return Err(DataFusionError::Plan(format!(
"column:{col:?} value:{value:?} is invalid",
)));
}
if reverse {
Ok((Expr::Literal(casted_right), left.clone()))
} else {
Ok((left.clone(), Expr::Literal(casted_right)))
}
}
_ => Ok((left.clone(), right.clone())),
}
}
}
impl TreeNodeRewriter for TypeConverter {
type N = Expr;
fn mutate(&mut self, expr: Expr) -> Result<Expr> {
let new_expr = match expr {
Expr::BinaryExpr(BinaryExpr { left, op, right }) => match op {
Operator::Eq
| Operator::NotEq
| Operator::Lt
| Operator::LtEq
| Operator::Gt
| Operator::GtEq => {
let (left, right) = self.convert_type(&left, &right)?;
Expr::BinaryExpr(BinaryExpr {
left: Box::new(left),
op,
right: Box::new(right),
})
}
_ => Expr::BinaryExpr(BinaryExpr { left, op, right }),
},
Expr::Between(Between {
expr,
negated,
low,
high,
}) => {
let (expr, low) = self.convert_type(&expr, &low)?;
let (expr, high) = self.convert_type(&expr, &high)?;
Expr::Between(Between {
expr: Box::new(expr),
negated,
low: Box::new(low),
high: Box::new(high),
})
}
Expr::InList(InList {
expr,
list,
negated,
}) => {
let mut list_expr = Vec::with_capacity(list.len());
for e in list {
let (_, expr_conversion) = self.convert_type(&expr, &e)?;
list_expr.push(expr_conversion);
}
Expr::InList(InList {
expr,
list: list_expr,
negated,
})
}
Expr::Literal(value) => match value {
ScalarValue::TimestampSecond(Some(i), _) => {
timestamp_to_timestamp_ms_expr(i, TimeUnit::Second)
}
ScalarValue::TimestampMillisecond(Some(i), _) => {
timestamp_to_timestamp_ms_expr(i, TimeUnit::Millisecond)
}
ScalarValue::TimestampMicrosecond(Some(i), _) => {
timestamp_to_timestamp_ms_expr(i, TimeUnit::Microsecond)
}
ScalarValue::TimestampNanosecond(Some(i), _) => {
timestamp_to_timestamp_ms_expr(i, TimeUnit::Nanosecond)
}
_ => Expr::Literal(value),
},
expr => expr,
};
Ok(new_expr)
}
}
fn timestamp_to_timestamp_ms_expr(val: i64, unit: TimeUnit) -> Expr {
let timestamp = match unit {
TimeUnit::Second => val * 1_000,
TimeUnit::Millisecond => val,
TimeUnit::Microsecond => val / 1_000,
TimeUnit::Nanosecond => val / 1_000 / 1_000,
};
Expr::Literal(ScalarValue::TimestampMillisecond(Some(timestamp), None))
}
fn string_to_timestamp_ms(string: &str) -> Result<ScalarValue> {
Ok(ScalarValue::TimestampMillisecond(
Some(
Timestamp::from_str(string)
.map(|t| t.value() / 1_000_000)
.map_err(|e| DataFusionError::External(Box::new(e)))?,
),
None,
))
}
#[cfg(test)]
mod tests {
use std::collections::HashMap;
use std::sync::Arc;
use datafusion_common::{Column, DFField, DFSchema};
use datafusion_sql::TableReference;
use super::*;
#[test]
fn test_string_to_timestamp_ms() {
assert!(matches!(
string_to_timestamp_ms("2022-02-02 19:00:00+08:00").unwrap(),
ScalarValue::TimestampMillisecond(Some(1643799600000), None)
));
assert!(matches!(
string_to_timestamp_ms("2009-02-13 23:31:30Z").unwrap(),
ScalarValue::TimestampMillisecond(Some(1234567890000), None)
));
}
#[test]
fn test_timestamp_to_timestamp_ms_expr() {
assert!(matches!(
timestamp_to_timestamp_ms_expr(123, TimeUnit::Second),
Expr::Literal(ScalarValue::TimestampMillisecond(Some(123000), None))
));
assert!(matches!(
timestamp_to_timestamp_ms_expr(123, TimeUnit::Millisecond),
Expr::Literal(ScalarValue::TimestampMillisecond(Some(123), None))
));
assert!(matches!(
timestamp_to_timestamp_ms_expr(123, TimeUnit::Microsecond),
Expr::Literal(ScalarValue::TimestampMillisecond(Some(0), None))
));
assert!(matches!(
timestamp_to_timestamp_ms_expr(1230, TimeUnit::Microsecond),
Expr::Literal(ScalarValue::TimestampMillisecond(Some(1), None))
));
assert!(matches!(
timestamp_to_timestamp_ms_expr(123000, TimeUnit::Microsecond),
Expr::Literal(ScalarValue::TimestampMillisecond(Some(123), None))
));
assert!(matches!(
timestamp_to_timestamp_ms_expr(1230, TimeUnit::Nanosecond),
Expr::Literal(ScalarValue::TimestampMillisecond(Some(0), None))
));
assert!(matches!(
timestamp_to_timestamp_ms_expr(123_000_000, TimeUnit::Nanosecond),
Expr::Literal(ScalarValue::TimestampMillisecond(Some(123), None))
));
}
#[test]
fn test_convert_timestamp_str() {
use datatypes::arrow::datatypes::TimeUnit as ArrowTimeUnit;
let schema_ref = Arc::new(
DFSchema::new_with_metadata(
vec![DFField::new(
None::<TableReference>,
"ts",
DataType::Timestamp(ArrowTimeUnit::Millisecond, None),
true,
)],
HashMap::new(),
)
.unwrap(),
);
let mut converter = TypeConverter {
schemas: vec![schema_ref],
};
assert_eq!(
Expr::Column(Column::from_name("ts")).gt(Expr::Literal(
ScalarValue::TimestampMillisecond(Some(1599514949000), None)
)),
converter
.mutate(
Expr::Column(Column::from_name("ts")).gt(Expr::Literal(ScalarValue::Utf8(
Some("2020-09-08T05:42:29+08:00".to_string()),
)))
)
.unwrap()
);
}
#[test]
fn test_convert_bool() {
let col_name = "is_valid";
let schema_ref = Arc::new(
DFSchema::new_with_metadata(
vec![DFField::new(
None::<TableReference>,
col_name,
DataType::Boolean,
false,
)],
HashMap::new(),
)
.unwrap(),
);
let mut converter = TypeConverter {
schemas: vec![schema_ref],
};
assert_eq!(
Expr::Column(Column::from_name(col_name))
.eq(Expr::Literal(ScalarValue::Boolean(Some(true)))),
converter
.mutate(
Expr::Column(Column::from_name(col_name))
.eq(Expr::Literal(ScalarValue::Utf8(Some("true".to_string()))))
)
.unwrap()
);
}
}
pub mod order_hint;
pub mod type_conversion;

View File

@@ -0,0 +1,168 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use arrow_schema::SortOptions;
use common_recordbatch::OrderOption;
use datafusion::datasource::DefaultTableSource;
use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeVisitor, VisitRecursion};
use datafusion_common::Result as DataFusionResult;
use datafusion_expr::expr::Sort;
use datafusion_expr::{Expr, LogicalPlan};
use datafusion_optimizer::{OptimizerConfig, OptimizerRule};
use table::table::adapter::DfTableProviderAdapter;
/// This rule will pass the nearest order requirement to the leaf table
/// scan node as ordering hint.
pub struct OrderHintRule;
impl OptimizerRule for OrderHintRule {
fn try_optimize(
&self,
plan: &LogicalPlan,
_config: &dyn OptimizerConfig,
) -> DataFusionResult<Option<LogicalPlan>> {
Self::optimize(plan).map(Some)
}
fn name(&self) -> &str {
"OrderHintRule"
}
}
impl OrderHintRule {
fn optimize(plan: &LogicalPlan) -> DataFusionResult<LogicalPlan> {
let mut visitor = OrderHintVisitor::default();
plan.visit(&mut visitor)?;
if let Some(order_expr) = visitor.order_expr.take() {
plan.clone()
.transform_down(&|plan| Self::set_ordering_hint(plan, &order_expr))
} else {
Ok(plan.clone())
}
}
fn set_ordering_hint(
plan: LogicalPlan,
order_expr: &[Sort],
) -> DataFusionResult<Transformed<LogicalPlan>> {
match &plan {
LogicalPlan::TableScan(table_scan) => {
let mut transformed = false;
if let Some(source) = table_scan
.source
.as_any()
.downcast_ref::<DefaultTableSource>()
{
if let Some(adapter) = source
.table_provider
.as_any()
.downcast_ref::<DfTableProviderAdapter>()
{
let mut opts = Vec::with_capacity(order_expr.len());
for sort in order_expr {
let name = match sort.expr.try_into_col() {
Ok(col) => col.name,
Err(_) => return Ok(Transformed::No(plan)),
};
opts.push(OrderOption {
name,
options: SortOptions {
descending: !sort.asc,
nulls_first: sort.nulls_first,
},
})
}
adapter.with_ordering_hint(&opts);
transformed = true;
}
}
if transformed {
Ok(Transformed::Yes(plan))
} else {
Ok(Transformed::No(plan))
}
}
_ => Ok(Transformed::No(plan)),
}
}
}
/// Find the most closest order requirement to the leaf node.
#[derive(Default)]
struct OrderHintVisitor {
order_expr: Option<Vec<Sort>>,
}
impl TreeNodeVisitor for OrderHintVisitor {
type N = LogicalPlan;
fn pre_visit(&mut self, node: &Self::N) -> DataFusionResult<VisitRecursion> {
if let LogicalPlan::Sort(sort) = node {
let mut exprs = vec![];
for expr in &sort.expr {
if let Expr::Sort(sort_expr) = expr {
exprs.push(sort_expr.clone());
}
}
self.order_expr = Some(exprs);
}
Ok(VisitRecursion::Continue)
}
}
#[cfg(test)]
mod test {
use std::sync::Arc;
use datafusion_expr::{col, LogicalPlanBuilder};
use datafusion_optimizer::OptimizerContext;
use table::table::numbers::NumbersTable;
use super::*;
#[test]
#[allow(clippy::bool_assert_comparison)]
fn set_order_hint() {
let numbers_table = Arc::new(NumbersTable::new(0)) as _;
let adapter = Arc::new(DfTableProviderAdapter::new(numbers_table));
let table_source = Arc::new(DefaultTableSource::new(adapter.clone()));
let plan = LogicalPlanBuilder::scan_with_filters("t", table_source, None, vec![])
.unwrap()
.sort(vec![col("number").sort(true, false)])
.unwrap()
.sort(vec![col("number").sort(false, true)])
.unwrap()
.build()
.unwrap();
let context = OptimizerContext::default();
OrderHintRule.try_optimize(&plan, &context).unwrap();
// should read the first (with `.sort(true, false)`) sort option
let scan_req = adapter.get_scan_req();
assert_eq!("number", &scan_req.output_ordering.clone().unwrap()[0].name);
assert_eq!(
true,
!scan_req.output_ordering.clone().unwrap()[0]
.options
.descending // the previous parameter is `asc`
);
assert_eq!(
false,
scan_req.output_ordering.unwrap()[0].options.nulls_first
);
}
}

View File

@@ -0,0 +1,410 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::str::FromStr;
use common_time::timestamp::{TimeUnit, Timestamp};
use datafusion::config::ConfigOptions;
use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRewriter};
use datafusion_common::{DFSchemaRef, DataFusionError, Result, ScalarValue};
use datafusion_expr::expr::InList;
use datafusion_expr::{
Between, BinaryExpr, Expr, ExprSchemable, Filter, LogicalPlan, Operator, TableScan,
};
use datafusion_optimizer::analyzer::AnalyzerRule;
use datatypes::arrow::compute;
use datatypes::arrow::datatypes::DataType;
/// TypeConversionRule converts some literal values in logical plan to other types according
/// to data type of corresponding columns.
/// Specifically:
/// - string literal of timestamp is converted to `Expr::Literal(ScalarValue::TimestampMillis)`
/// - string literal of boolean is converted to `Expr::Literal(ScalarValue::Boolean)`
pub struct TypeConversionRule;
impl AnalyzerRule for TypeConversionRule {
// TODO(ruihang): fix this warning
#[allow(deprecated)]
fn analyze(&self, plan: LogicalPlan, _config: &ConfigOptions) -> Result<LogicalPlan> {
let schemas = plan.all_schemas().into_iter().cloned().collect::<Vec<_>>();
plan.transform(&|plan| match plan {
LogicalPlan::Filter(filter) => {
let mut converter = TypeConverter {
schemas: schemas.clone(),
};
let rewritten = filter.predicate.clone().rewrite(&mut converter)?;
Ok(Transformed::Yes(LogicalPlan::Filter(Filter::try_new(
rewritten,
filter.input,
)?)))
}
LogicalPlan::TableScan(TableScan {
table_name,
source,
projection,
projected_schema,
filters,
fetch,
}) => {
let mut converter = TypeConverter {
schemas: schemas.clone(),
};
let rewrite_filters = filters
.into_iter()
.map(|e| e.rewrite(&mut converter))
.collect::<Result<Vec<_>>>()?;
Ok(Transformed::Yes(LogicalPlan::TableScan(TableScan {
table_name: table_name.clone(),
source: source.clone(),
projection,
projected_schema,
filters: rewrite_filters,
fetch,
})))
}
LogicalPlan::Projection { .. }
| LogicalPlan::Window { .. }
| LogicalPlan::Aggregate { .. }
| LogicalPlan::Repartition { .. }
| LogicalPlan::Extension { .. }
| LogicalPlan::Sort { .. }
| LogicalPlan::Explain { .. }
| LogicalPlan::Limit { .. }
| LogicalPlan::Union { .. }
| LogicalPlan::Join { .. }
| LogicalPlan::CrossJoin { .. }
| LogicalPlan::Distinct { .. }
| LogicalPlan::Values { .. }
| LogicalPlan::Analyze { .. } => {
let mut converter = TypeConverter {
schemas: plan.all_schemas().into_iter().cloned().collect(),
};
let inputs = plan.inputs().into_iter().cloned().collect::<Vec<_>>();
let expr = plan
.expressions()
.into_iter()
.map(|e| e.rewrite(&mut converter))
.collect::<Result<Vec<_>>>()?;
datafusion_expr::utils::from_plan(&plan, &expr, &inputs).map(Transformed::Yes)
}
LogicalPlan::Subquery { .. }
| LogicalPlan::SubqueryAlias { .. }
| LogicalPlan::EmptyRelation(_)
| LogicalPlan::Prepare(_)
| LogicalPlan::Dml(_)
| LogicalPlan::DescribeTable(_)
| LogicalPlan::Unnest(_)
| LogicalPlan::Statement(_)
| LogicalPlan::Ddl(_) => Ok(Transformed::No(plan)),
})
}
fn name(&self) -> &str {
"TypeConversionRule"
}
}
struct TypeConverter {
schemas: Vec<DFSchemaRef>,
}
impl TypeConverter {
fn column_type(&self, expr: &Expr) -> Option<DataType> {
if let Expr::Column(_) = expr {
for schema in &self.schemas {
if let Ok(v) = expr.get_type(schema) {
return Some(v);
}
}
}
None
}
fn cast_scalar_value(value: &ScalarValue, target_type: &DataType) -> Result<ScalarValue> {
match (target_type, value) {
(DataType::Timestamp(_, _), ScalarValue::Utf8(Some(v))) => string_to_timestamp_ms(v),
(DataType::Boolean, ScalarValue::Utf8(Some(v))) => match v.to_lowercase().as_str() {
"true" => Ok(ScalarValue::Boolean(Some(true))),
"false" => Ok(ScalarValue::Boolean(Some(false))),
_ => Ok(ScalarValue::Boolean(None)),
},
(target_type, value) => {
let value_arr = value.to_array();
let arr =
compute::cast(&value_arr, target_type).map_err(DataFusionError::ArrowError)?;
ScalarValue::try_from_array(
&arr,
0, // index: Converts a value in `array` at `index` into a ScalarValue
)
}
}
}
fn convert_type<'b>(&self, mut left: &'b Expr, mut right: &'b Expr) -> Result<(Expr, Expr)> {
let left_type = self.column_type(left);
let right_type = self.column_type(right);
let mut reverse = false;
let left_type = match (&left_type, &right_type) {
(Some(v), None) => v,
(None, Some(v)) => {
reverse = true;
std::mem::swap(&mut left, &mut right);
v
}
_ => return Ok((left.clone(), right.clone())),
};
match (left, right) {
(Expr::Column(col), Expr::Literal(value)) => {
let casted_right = Self::cast_scalar_value(value, left_type)?;
if casted_right.is_null() {
return Err(DataFusionError::Plan(format!(
"column:{col:?} value:{value:?} is invalid",
)));
}
if reverse {
Ok((Expr::Literal(casted_right), left.clone()))
} else {
Ok((left.clone(), Expr::Literal(casted_right)))
}
}
_ => Ok((left.clone(), right.clone())),
}
}
}
impl TreeNodeRewriter for TypeConverter {
type N = Expr;
fn mutate(&mut self, expr: Expr) -> Result<Expr> {
let new_expr = match expr {
Expr::BinaryExpr(BinaryExpr { left, op, right }) => match op {
Operator::Eq
| Operator::NotEq
| Operator::Lt
| Operator::LtEq
| Operator::Gt
| Operator::GtEq => {
let (left, right) = self.convert_type(&left, &right)?;
Expr::BinaryExpr(BinaryExpr {
left: Box::new(left),
op,
right: Box::new(right),
})
}
_ => Expr::BinaryExpr(BinaryExpr { left, op, right }),
},
Expr::Between(Between {
expr,
negated,
low,
high,
}) => {
let (expr, low) = self.convert_type(&expr, &low)?;
let (expr, high) = self.convert_type(&expr, &high)?;
Expr::Between(Between {
expr: Box::new(expr),
negated,
low: Box::new(low),
high: Box::new(high),
})
}
Expr::InList(InList {
expr,
list,
negated,
}) => {
let mut list_expr = Vec::with_capacity(list.len());
for e in list {
let (_, expr_conversion) = self.convert_type(&expr, &e)?;
list_expr.push(expr_conversion);
}
Expr::InList(InList {
expr,
list: list_expr,
negated,
})
}
Expr::Literal(value) => match value {
ScalarValue::TimestampSecond(Some(i), _) => {
timestamp_to_timestamp_ms_expr(i, TimeUnit::Second)
}
ScalarValue::TimestampMillisecond(Some(i), _) => {
timestamp_to_timestamp_ms_expr(i, TimeUnit::Millisecond)
}
ScalarValue::TimestampMicrosecond(Some(i), _) => {
timestamp_to_timestamp_ms_expr(i, TimeUnit::Microsecond)
}
ScalarValue::TimestampNanosecond(Some(i), _) => {
timestamp_to_timestamp_ms_expr(i, TimeUnit::Nanosecond)
}
_ => Expr::Literal(value),
},
expr => expr,
};
Ok(new_expr)
}
}
fn timestamp_to_timestamp_ms_expr(val: i64, unit: TimeUnit) -> Expr {
let timestamp = match unit {
TimeUnit::Second => val * 1_000,
TimeUnit::Millisecond => val,
TimeUnit::Microsecond => val / 1_000,
TimeUnit::Nanosecond => val / 1_000 / 1_000,
};
Expr::Literal(ScalarValue::TimestampMillisecond(Some(timestamp), None))
}
fn string_to_timestamp_ms(string: &str) -> Result<ScalarValue> {
Ok(ScalarValue::TimestampMillisecond(
Some(
Timestamp::from_str(string)
.map(|t| t.value() / 1_000_000)
.map_err(|e| DataFusionError::External(Box::new(e)))?,
),
None,
))
}
#[cfg(test)]
mod tests {
use std::collections::HashMap;
use std::sync::Arc;
use datafusion_common::{Column, DFField, DFSchema};
use datafusion_sql::TableReference;
use super::*;
#[test]
fn test_string_to_timestamp_ms() {
assert!(matches!(
string_to_timestamp_ms("2022-02-02 19:00:00+08:00").unwrap(),
ScalarValue::TimestampMillisecond(Some(1643799600000), None)
));
assert!(matches!(
string_to_timestamp_ms("2009-02-13 23:31:30Z").unwrap(),
ScalarValue::TimestampMillisecond(Some(1234567890000), None)
));
}
#[test]
fn test_timestamp_to_timestamp_ms_expr() {
assert!(matches!(
timestamp_to_timestamp_ms_expr(123, TimeUnit::Second),
Expr::Literal(ScalarValue::TimestampMillisecond(Some(123000), None))
));
assert!(matches!(
timestamp_to_timestamp_ms_expr(123, TimeUnit::Millisecond),
Expr::Literal(ScalarValue::TimestampMillisecond(Some(123), None))
));
assert!(matches!(
timestamp_to_timestamp_ms_expr(123, TimeUnit::Microsecond),
Expr::Literal(ScalarValue::TimestampMillisecond(Some(0), None))
));
assert!(matches!(
timestamp_to_timestamp_ms_expr(1230, TimeUnit::Microsecond),
Expr::Literal(ScalarValue::TimestampMillisecond(Some(1), None))
));
assert!(matches!(
timestamp_to_timestamp_ms_expr(123000, TimeUnit::Microsecond),
Expr::Literal(ScalarValue::TimestampMillisecond(Some(123), None))
));
assert!(matches!(
timestamp_to_timestamp_ms_expr(1230, TimeUnit::Nanosecond),
Expr::Literal(ScalarValue::TimestampMillisecond(Some(0), None))
));
assert!(matches!(
timestamp_to_timestamp_ms_expr(123_000_000, TimeUnit::Nanosecond),
Expr::Literal(ScalarValue::TimestampMillisecond(Some(123), None))
));
}
#[test]
fn test_convert_timestamp_str() {
use datatypes::arrow::datatypes::TimeUnit as ArrowTimeUnit;
let schema_ref = Arc::new(
DFSchema::new_with_metadata(
vec![DFField::new(
None::<TableReference>,
"ts",
DataType::Timestamp(ArrowTimeUnit::Millisecond, None),
true,
)],
HashMap::new(),
)
.unwrap(),
);
let mut converter = TypeConverter {
schemas: vec![schema_ref],
};
assert_eq!(
Expr::Column(Column::from_name("ts")).gt(Expr::Literal(
ScalarValue::TimestampMillisecond(Some(1599514949000), None)
)),
converter
.mutate(
Expr::Column(Column::from_name("ts")).gt(Expr::Literal(ScalarValue::Utf8(
Some("2020-09-08T05:42:29+08:00".to_string()),
)))
)
.unwrap()
);
}
#[test]
fn test_convert_bool() {
let col_name = "is_valid";
let schema_ref = Arc::new(
DFSchema::new_with_metadata(
vec![DFField::new(
None::<TableReference>,
col_name,
DataType::Boolean,
false,
)],
HashMap::new(),
)
.unwrap(),
);
let mut converter = TypeConverter {
schemas: vec![schema_ref],
};
assert_eq!(
Expr::Column(Column::from_name(col_name))
.eq(Expr::Literal(ScalarValue::Boolean(Some(true)))),
converter
.mutate(
Expr::Column(Column::from_name(col_name))
.eq(Expr::Literal(ScalarValue::Utf8(Some("true".to_string()))))
)
.unwrap()
);
}
}

View File

@@ -30,10 +30,12 @@ use datafusion::physical_plan::planner::DefaultPhysicalPlanner;
use datafusion::physical_plan::{ExecutionPlan, PhysicalPlanner};
use datafusion_expr::LogicalPlan as DfLogicalPlan;
use datafusion_optimizer::analyzer::Analyzer;
use datafusion_optimizer::optimizer::Optimizer;
use promql::extension_plan::PromExtensionPlanner;
use crate::dist_plan::{DistExtensionPlanner, DistPlannerAnalyzer};
use crate::optimizer::TypeConversionRule;
use crate::optimizer::order_hint::OrderHintRule;
use crate::optimizer::type_conversion::TypeConversionRule;
use crate::query_engine::options::QueryOptions;
/// Query engine global state
@@ -69,6 +71,8 @@ impl QueryEngineState {
analyzer.rules.insert(0, Arc::new(DistPlannerAnalyzer));
}
analyzer.rules.insert(0, Arc::new(TypeConversionRule));
let mut optimizer = Optimizer::new();
optimizer.rules.push(Arc::new(OrderHintRule));
let session_state = SessionState::with_config_rt_and_catalog_list(
session_config,
@@ -76,6 +80,7 @@ impl QueryEngineState {
Arc::new(MemoryCatalogList::default()), // pass a dummy catalog list
)
.with_analyzer_rules(analyzer.rules)
.with_optimizer_rules(optimizer.rules)
.with_query_planner(Arc::new(DfQueryPlanner::new()));
let df_context = SessionContext::with_state(session_state);

View File

@@ -38,6 +38,7 @@ use crate::window_infer::{PlainWindowInference, WindowInfer};
pub struct ChunkReaderImpl {
schema: ProjectedSchemaRef,
batch_reader: BoxedBatchReader,
output_ordering: Option<Vec<OrderOption>>,
}
#[async_trait]
@@ -62,13 +63,22 @@ impl ChunkReader for ChunkReaderImpl {
};
self.schema.batch_to_chunk(&batch)
}
fn output_ordering(&self) -> Option<Vec<OrderOption>> {
self.output_ordering.clone()
}
}
impl ChunkReaderImpl {
pub fn new(schema: ProjectedSchemaRef, batch_reader: BoxedBatchReader) -> ChunkReaderImpl {
pub fn new(
schema: ProjectedSchemaRef,
batch_reader: BoxedBatchReader,
output_ordering: Option<Vec<OrderOption>>,
) -> ChunkReaderImpl {
ChunkReaderImpl {
schema,
batch_reader,
output_ordering,
}
}
@@ -161,12 +171,16 @@ impl ChunkReaderBuilder {
self
}
/// Try to infer time window from output ordering. If the result
/// is `None` means the output ordering is not obeyed, otherwise
/// means the output ordering is obeyed and is same with request.
fn infer_time_windows(&self, output_ordering: &[OrderOption]) -> Option<Vec<TimestampRange>> {
if output_ordering.is_empty() {
return None;
}
let OrderOption { index, options } = &output_ordering[0];
if *index != self.schema.timestamp_index() {
let OrderOption { name, options } = &output_ordering[0];
if name != self.schema.timestamp_column_name() {
return None;
}
let memtable_stats = self.memtables.iter().map(|m| m.stats()).collect::<Vec<_>>();
@@ -238,15 +252,17 @@ impl ChunkReaderBuilder {
);
self.iter_ctx.projected_schema = Some(schema.clone());
let mut output_ordering = None;
let reader = if let Some(ordering) = self.output_ordering.take() &&
let Some(windows) = self.infer_time_windows(&ordering) {
self.build_windowed(&schema, &time_range_predicate, windows, ordering)
.await?
output_ordering = Some(ordering.clone());
self.build_windowed(&schema, &time_range_predicate, windows, ordering)
.await?
} else {
self.build_reader(&schema, &time_range_predicate).await?
};
Ok(ChunkReaderImpl::new(schema, reader))
Ok(ChunkReaderImpl::new(schema, reader, output_ordering))
}
/// Build time range predicate from schema and filters.

View File

@@ -22,7 +22,7 @@ use snafu::ResultExt;
use crate::error::{self, Result};
use crate::read::{Batch, BatchReader};
use crate::schema::ProjectedSchemaRef;
use crate::schema::{ProjectedSchemaRef, StoreSchema};
/// [WindowedReader] provides a windowed record batch reader that scans all rows within a window
/// at a time and sort these rows ordered in `[<timestamp>, <PK>]` order.
@@ -102,8 +102,8 @@ fn sort_by_rows(
arrays: Vec<ArrayRef>,
order_options: &[OrderOption],
) -> Result<Vec<ArrayRef>> {
let sort_columns = build_sorted_columns(order_options);
let store_schema = schema.schema_to_read();
let sort_columns = build_sorted_columns(store_schema, order_options);
// Convert columns to rows to speed lexicographic sort
// TODO(hl): maybe optimize to lexsort_to_index when only timestamp column is involved.
let mut row_converter = RowConverter::new(
@@ -150,9 +150,9 @@ fn sort_by_rows(
/// Builds sorted columns from `order_options`.
/// Returns a vector of columns indices to sort and sort orders (true means descending order).
fn build_sorted_columns(order_options: &[OrderOption]) -> Vec<(usize, bool)> {
fn build_sorted_columns(schema: &StoreSchema, order_options: &[OrderOption]) -> Vec<(usize, bool)> {
order_options
.iter()
.map(|o| (o.index, o.options.descending))
.map(|o| (schema.column_index(&o.name), o.options.descending))
.collect()
}

View File

@@ -708,7 +708,7 @@ async fn test_read_by_chunk_reader() {
)
.await
.check(vec![OrderOption {
index: 0,
name: "timestamp".to_string(),
options: SortOptions {
descending: true,
nulls_first: true,
@@ -737,7 +737,7 @@ async fn test_read_by_chunk_reader() {
)
.await
.check(vec![OrderOption {
index: 0,
name: "timestamp".to_string(),
options: SortOptions {
descending: true,
nulls_first: true,
@@ -768,7 +768,7 @@ async fn test_read_by_chunk_reader() {
)
.await
.check(vec![OrderOption {
index: 0,
name: "timestamp".to_string(),
options: SortOptions {
descending: true,
nulls_first: true,
@@ -799,7 +799,7 @@ async fn test_read_by_chunk_reader() {
)
.await
.check(vec![OrderOption {
index: 0,
name: "timestamp".to_string(),
options: SortOptions {
descending: false,
nulls_first: true,

View File

@@ -135,6 +135,11 @@ impl RegionSchema {
self.store_schema.timestamp_index()
}
#[inline]
pub(crate) fn timestamp_column_name(&self) -> &str {
self.store_schema.column_name(self.timestamp_index())
}
#[inline]
pub(crate) fn value_indices(&self) -> impl Iterator<Item = usize> {
self.store_schema.value_indices()

View File

@@ -177,6 +177,13 @@ impl StoreSchema {
&self.schema.column_schemas()[idx].name
}
/// # Panic
/// Panics if `name` is not a valid column name.
#[inline]
pub(crate) fn column_index(&self, name: &str) -> usize {
self.schema.column_index_by_name(name).unwrap()
}
#[inline]
pub(crate) fn num_columns(&self) -> usize {
self.schema.num_columns()

View File

@@ -14,6 +14,7 @@
use async_trait::async_trait;
use common_error::ext::ErrorExt;
use common_recordbatch::OrderOption;
use datatypes::vectors::VectorRef;
use crate::storage::SchemaRef;
@@ -45,4 +46,8 @@ pub trait ChunkReader: Send {
// project the chunk according to required projection.
fn project_chunk(&self, chunk: Chunk) -> Chunk;
fn output_ordering(&self) -> Option<Vec<OrderOption>> {
None
}
}

View File

@@ -4,6 +4,9 @@ version.workspace = true
edition.workspace = true
license.workspace = true
[features]
testing = []
[dependencies]
anymap = "1.0.0-beta.2"
async-trait = "0.1"

View File

@@ -53,6 +53,11 @@ impl DfTableProviderAdapter {
pub fn with_ordering_hint(&self, order_opts: &[OrderOption]) {
self.scan_req.lock().unwrap().output_ordering = Some(order_opts.to_vec());
}
#[cfg(feature = "testing")]
pub fn get_scan_req(&self) -> ScanRequest {
self.scan_req.lock().unwrap().clone()
}
}
#[async_trait::async_trait]
@@ -96,8 +101,8 @@ impl TableProvider for DfTableProviderAdapter {
order_opts
.iter()
.map(|order_opt| {
let col_name = schema.column_name_by_index(order_opt.index);
let col_expr = Arc::new(Column::new(col_name, order_opt.index));
let col_index = schema.column_index_by_name(&order_opt.name).unwrap();
let col_expr = Arc::new(Column::new(&order_opt.name, col_index));
PhysicalSortExpr {
expr: col_expr,
options: order_opt.options,