diff --git a/Cargo.lock b/Cargo.lock
index 67bc70a25d..a82f6fe7ef 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2275,9 +2275,9 @@ dependencies = [
 
 [[package]]
 name = "csv"
-version = "1.2.1"
+version = "1.2.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "0b015497079b9a9d69c02ad25de6c0a6edef051ea6360a327d0bd05802ef64ad"
+checksum = "626ae34994d3d8d668f4269922248239db4ae42d538b14c398b74a52208e8086"
 dependencies = [
  "csv-core",
  "itoa",
diff --git a/src/catalog/src/information_schema.rs b/src/catalog/src/information_schema.rs
index 95f253da1d..454748b36c 100644
--- a/src/catalog/src/information_schema.rs
+++ b/src/catalog/src/information_schema.rs
@@ -163,6 +163,7 @@ impl Table for InformationTable {
         let stream = RecordBatchStreamAdaptor {
             schema: projected_schema,
             stream: Box::pin(stream),
+            output_ordering: None,
         };
         Ok(Box::pin(stream))
     }
diff --git a/src/common/recordbatch/src/lib.rs b/src/common/recordbatch/src/lib.rs
index 632a88267e..91072c872e 100644
--- a/src/common/recordbatch/src/lib.rs
+++ b/src/common/recordbatch/src/lib.rs
@@ -43,9 +43,9 @@ pub trait RecordBatchStream: Stream<Item = Result<RecordBatch>> {
 
 pub type SendableRecordBatchStream = Pin<Box<dyn RecordBatchStream + Send>>;
 
-#[derive(Debug, Clone, Copy)]
+#[derive(Debug, Clone)]
 pub struct OrderOption {
-    pub index: usize,
+    pub name: String,
     pub options: SortOptions,
 }
 
@@ -196,12 +196,17 @@ impl Stream for SimpleRecordBatchStream {
 pub struct RecordBatchStreamAdaptor {
     pub schema: SchemaRef,
     pub stream: Pin<Box<dyn Stream<Item = Result<RecordBatch>> + Send>>,
+    pub output_ordering: Option<Vec<OrderOption>>,
 }
 
 impl RecordBatchStream for RecordBatchStreamAdaptor {
     fn schema(&self) -> SchemaRef {
         self.schema.clone()
     }
+
+    fn output_ordering(&self) -> Option<&[OrderOption]> {
+        self.output_ordering.as_deref()
+    }
 }
 
 impl Stream for RecordBatchStreamAdaptor {
diff --git a/src/frontend/src/table.rs b/src/frontend/src/table.rs
index 3058011d53..74d02dc234 100644
--- a/src/frontend/src/table.rs
+++ b/src/frontend/src/table.rs
@@ -222,7 +222,11 @@ impl Table for DistTable {
                 }
             },
         );
-        let record_batch_stream = RecordBatchStreamAdaptor { schema, stream };
+        let record_batch_stream = RecordBatchStreamAdaptor {
+            schema,
+            stream,
+            output_ordering: None,
+        };
 
         Ok(Box::pin(record_batch_stream))
     }
diff --git a/src/mito/src/table.rs b/src/mito/src/table.rs
index a5ecf8fa92..849e5a702d 100644
--- a/src/mito/src/table.rs
+++ b/src/mito/src/table.rs
@@ -208,7 +208,11 @@ impl Table for MitoTable {
             }
         });
 
-        let stream = Box::pin(RecordBatchStreamAdaptor { schema, stream });
+        let stream = Box::pin(RecordBatchStreamAdaptor {
+            schema,
+            stream,
+            output_ordering: None,
+        });
 
         Ok(Arc::new(StreamScanAdapter::new(stream)))
     }
@@ -275,6 +279,7 @@ impl Table for MitoTable {
         })?;
 
         let schema = stream_schema.clone();
+        let output_ordering = readers.get(0).and_then(|reader| reader.output_ordering());
 
         let stream = Box::pin(async_stream::try_stream! {
             for mut reader in readers {
@@ -285,7 +290,11 @@ impl Table for MitoTable {
             }
         });
 
-        Ok(Box::pin(RecordBatchStreamAdaptor { schema, stream }))
+        Ok(Box::pin(RecordBatchStreamAdaptor {
+            schema,
+            stream,
+            output_ordering,
+        }))
     }
 
     fn supports_filters_pushdown(&self, filters: &[&Expr]) -> TableResult<Vec<FilterPushDownType>> {
diff --git a/src/query/Cargo.toml b/src/query/Cargo.toml
index 14339b0394..2ffb1b0980 100644
--- a/src/query/Cargo.toml
+++ b/src/query/Cargo.toml
@@ -56,4 +56,5 @@ statrs = "0.16"
 stats-cli = "3.0"
 store-api = { path = "../store-api" }
 streaming-stats = "0.2"
+table = { path = "../table", features = ["testing"] }
 tokio-stream = "0.1"
diff --git a/src/query/src/optimizer.rs b/src/query/src/optimizer.rs
index 011d72d8b1..450ca7e767 100644
--- a/src/query/src/optimizer.rs
+++ b/src/query/src/optimizer.rs
@@ -12,399 +12,5 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use std::str::FromStr;
-
-use common_time::timestamp::{TimeUnit, Timestamp};
-use datafusion::config::ConfigOptions;
-use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRewriter};
-use datafusion_common::{DFSchemaRef, DataFusionError, Result, ScalarValue};
-use datafusion_expr::expr::InList;
-use datafusion_expr::{
-    Between, BinaryExpr, Expr, ExprSchemable, Filter, LogicalPlan, Operator, TableScan,
-};
-use datafusion_optimizer::analyzer::AnalyzerRule;
-use datatypes::arrow::compute;
-use datatypes::arrow::datatypes::DataType;
-
-/// TypeConversionRule converts some literal values in logical plan to other types according
-/// to data type of corresponding columns.
-/// Specifically:
-/// - string literal of timestamp is converted to `Expr::Literal(ScalarValue::TimestampMillis)`
-/// - string literal of boolean is converted to `Expr::Literal(ScalarValue::Boolean)`
-pub struct TypeConversionRule;
-
-impl AnalyzerRule for TypeConversionRule {
-    // TODO(ruihang): fix this warning
-    #[allow(deprecated)]
-    fn analyze(&self, plan: LogicalPlan, _config: &ConfigOptions) -> Result<LogicalPlan> {
-        let schemas = plan.all_schemas().into_iter().cloned().collect::<Vec<_>>();
-        plan.transform(&|plan| match plan {
-            LogicalPlan::Filter(filter) => {
-                let mut converter = TypeConverter {
-                    schemas: schemas.clone(),
-                };
-                let rewritten = filter.predicate.clone().rewrite(&mut converter)?;
-                Ok(Transformed::Yes(LogicalPlan::Filter(Filter::try_new(
-                    rewritten,
-                    filter.input,
-                )?)))
-            }
-            LogicalPlan::TableScan(TableScan {
-                table_name,
-                source,
-                projection,
-                projected_schema,
-                filters,
-                fetch,
-            }) => {
-                let mut converter = TypeConverter {
-                    schemas: schemas.clone(),
-                };
-                let rewrite_filters = filters
-                    .into_iter()
-                    .map(|e| e.rewrite(&mut converter))
-                    .collect::<Result<Vec<_>>>()?;
-                Ok(Transformed::Yes(LogicalPlan::TableScan(TableScan {
-                    table_name: table_name.clone(),
-                    source: source.clone(),
-                    projection,
-                    projected_schema,
-                    filters: rewrite_filters,
-                    fetch,
-                })))
-            }
-            LogicalPlan::Projection { .. }
-            | LogicalPlan::Window { .. }
-            | LogicalPlan::Aggregate { .. }
-            | LogicalPlan::Repartition { .. }
-            | LogicalPlan::Extension { .. }
-            | LogicalPlan::Sort { .. }
-            | LogicalPlan::Explain { .. }
-            | LogicalPlan::Limit { .. }
-            | LogicalPlan::Union { .. }
-            | LogicalPlan::Join { .. }
-            | LogicalPlan::CrossJoin { .. }
-            | LogicalPlan::Distinct { .. }
-            | LogicalPlan::Values { .. }
-            | LogicalPlan::Analyze { .. } => {
-                let mut converter = TypeConverter {
-                    schemas: plan.all_schemas().into_iter().cloned().collect(),
-                };
-                let inputs = plan.inputs().into_iter().cloned().collect::<Vec<_>>();
-                let expr = plan
-                    .expressions()
-                    .into_iter()
-                    .map(|e| e.rewrite(&mut converter))
-                    .collect::<Result<Vec<_>>>()?;
-
-                datafusion_expr::utils::from_plan(&plan, &expr, &inputs).map(Transformed::Yes)
-            }
-
-            LogicalPlan::Subquery { .. }
-            | LogicalPlan::SubqueryAlias { .. }
-            | LogicalPlan::EmptyRelation(_)
-            | LogicalPlan::Prepare(_)
-            | LogicalPlan::Dml(_)
-            | LogicalPlan::DescribeTable(_)
-            | LogicalPlan::Unnest(_)
-            | LogicalPlan::Statement(_)
-            | LogicalPlan::Ddl(_) => Ok(Transformed::No(plan)),
-        })
-    }
-
-    fn name(&self) -> &str {
-        "TypeConversionRule"
-    }
-}
-
-struct TypeConverter {
-    schemas: Vec<DFSchemaRef>,
-}
-
-impl TypeConverter {
-    fn column_type(&self, expr: &Expr) -> Option<DataType> {
-        if let Expr::Column(_) = expr {
-            for schema in &self.schemas {
-                if let Ok(v) = expr.get_type(schema) {
-                    return Some(v);
-                }
-            }
-        }
-        None
-    }
-
-    fn cast_scalar_value(value: &ScalarValue, target_type: &DataType) -> Result<ScalarValue> {
-        match (target_type, value) {
-            (DataType::Timestamp(_, _), ScalarValue::Utf8(Some(v))) => string_to_timestamp_ms(v),
-            (DataType::Boolean, ScalarValue::Utf8(Some(v))) => match v.to_lowercase().as_str() {
-                "true" => Ok(ScalarValue::Boolean(Some(true))),
-                "false" => Ok(ScalarValue::Boolean(Some(false))),
-                _ => Ok(ScalarValue::Boolean(None)),
-            },
-            (target_type, value) => {
-                let value_arr = value.to_array();
-                let arr =
-                    compute::cast(&value_arr, target_type).map_err(DataFusionError::ArrowError)?;
-
-                ScalarValue::try_from_array(
-                    &arr,
-                    0, // index: Converts a value in `array` at `index` into a ScalarValue
-                )
-            }
-        }
-    }
-
-    fn convert_type<'b>(&self, mut left: &'b Expr, mut right: &'b Expr) -> Result<(Expr, Expr)> {
-        let left_type = self.column_type(left);
-        let right_type = self.column_type(right);
-
-        let mut reverse = false;
-        let left_type = match (&left_type, &right_type) {
-            (Some(v), None) => v,
-            (None, Some(v)) => {
-                reverse = true;
-                std::mem::swap(&mut left, &mut right);
-                v
-            }
-            _ => return Ok((left.clone(), right.clone())),
-        };
-
-        match (left, right) {
-            (Expr::Column(col), Expr::Literal(value)) => {
-                let casted_right = Self::cast_scalar_value(value, left_type)?;
-                if casted_right.is_null() {
-                    return Err(DataFusionError::Plan(format!(
-                        "column:{col:?} value:{value:?} is invalid",
-                    )));
-                }
-                if reverse {
-                    Ok((Expr::Literal(casted_right), left.clone()))
-                } else {
-                    Ok((left.clone(), Expr::Literal(casted_right)))
-                }
-            }
-            _ => Ok((left.clone(), right.clone())),
-        }
-    }
-}
-
-impl TreeNodeRewriter for TypeConverter {
-    type N = Expr;
-
-    fn mutate(&mut self, expr: Expr) -> Result<Expr> {
-        let new_expr = match expr {
-            Expr::BinaryExpr(BinaryExpr { left, op, right }) => match op {
-                Operator::Eq
-                | Operator::NotEq
-                | Operator::Lt
-                | Operator::LtEq
-                | Operator::Gt
-                | Operator::GtEq => {
-                    let (left, right) = self.convert_type(&left, &right)?;
-                    Expr::BinaryExpr(BinaryExpr {
-                        left: Box::new(left),
-                        op,
-                        right: Box::new(right),
-                    })
-                }
-                _ => Expr::BinaryExpr(BinaryExpr { left, op, right }),
-            },
-            Expr::Between(Between {
-                expr,
-                negated,
-                low,
-                high,
-            }) => {
-                let (expr, low) = self.convert_type(&expr, &low)?;
-                let (expr, high) = self.convert_type(&expr, &high)?;
-                Expr::Between(Between {
-                    expr: Box::new(expr),
-                    negated,
-                    low: Box::new(low),
-                    high: Box::new(high),
-                })
-            }
-            Expr::InList(InList {
-                expr,
-                list,
-                negated,
-            }) => {
-                let mut list_expr = Vec::with_capacity(list.len());
-                for e in list {
-                    let (_, expr_conversion) = self.convert_type(&expr, &e)?;
-                    list_expr.push(expr_conversion);
-                }
-                Expr::InList(InList {
-                    expr,
-                    list: list_expr,
-                    negated,
-                })
-            }
-            Expr::Literal(value) => match value {
-                ScalarValue::TimestampSecond(Some(i), _) => {
-                    timestamp_to_timestamp_ms_expr(i, TimeUnit::Second)
-                }
-                ScalarValue::TimestampMillisecond(Some(i), _) => {
-                    timestamp_to_timestamp_ms_expr(i, TimeUnit::Millisecond)
-                }
-
-                ScalarValue::TimestampMicrosecond(Some(i), _) => {
-                    timestamp_to_timestamp_ms_expr(i, TimeUnit::Microsecond)
-                }
-                ScalarValue::TimestampNanosecond(Some(i), _) => {
-                    timestamp_to_timestamp_ms_expr(i, TimeUnit::Nanosecond)
-                }
-                _ => Expr::Literal(value),
-            },
-            expr => expr,
-        };
-        Ok(new_expr)
-    }
-}
-
-fn timestamp_to_timestamp_ms_expr(val: i64, unit: TimeUnit) -> Expr {
-    let timestamp = match unit {
-        TimeUnit::Second => val * 1_000,
-        TimeUnit::Millisecond => val,
-        TimeUnit::Microsecond => val / 1_000,
-        TimeUnit::Nanosecond => val / 1_000 / 1_000,
-    };
-
-    Expr::Literal(ScalarValue::TimestampMillisecond(Some(timestamp), None))
-}
-
-fn string_to_timestamp_ms(string: &str) -> Result<ScalarValue> {
-    Ok(ScalarValue::TimestampMillisecond(
-        Some(
-            Timestamp::from_str(string)
-                .map(|t| t.value() / 1_000_000)
-                .map_err(|e| DataFusionError::External(Box::new(e)))?,
-        ),
-        None,
-    ))
-}
-
-#[cfg(test)]
-mod tests {
-    use std::collections::HashMap;
-    use std::sync::Arc;
-
-    use datafusion_common::{Column, DFField, DFSchema};
-    use datafusion_sql::TableReference;
-
-    use super::*;
-
-    #[test]
-    fn test_string_to_timestamp_ms() {
-        assert!(matches!(
-            string_to_timestamp_ms("2022-02-02 19:00:00+08:00").unwrap(),
-            ScalarValue::TimestampMillisecond(Some(1643799600000), None)
-        ));
-        assert!(matches!(
-            string_to_timestamp_ms("2009-02-13 23:31:30Z").unwrap(),
-            ScalarValue::TimestampMillisecond(Some(1234567890000), None)
-        ));
-    }
-
-    #[test]
-    fn test_timestamp_to_timestamp_ms_expr() {
-        assert!(matches!(
-            timestamp_to_timestamp_ms_expr(123, TimeUnit::Second),
-            Expr::Literal(ScalarValue::TimestampMillisecond(Some(123000), None))
-        ));
-
-        assert!(matches!(
-            timestamp_to_timestamp_ms_expr(123, TimeUnit::Millisecond),
-            Expr::Literal(ScalarValue::TimestampMillisecond(Some(123), None))
-        ));
-
-        assert!(matches!(
-            timestamp_to_timestamp_ms_expr(123, TimeUnit::Microsecond),
-            Expr::Literal(ScalarValue::TimestampMillisecond(Some(0), None))
-        ));
-
-        assert!(matches!(
-            timestamp_to_timestamp_ms_expr(1230, TimeUnit::Microsecond),
-            Expr::Literal(ScalarValue::TimestampMillisecond(Some(1), None))
-        ));
-
-        assert!(matches!(
-            timestamp_to_timestamp_ms_expr(123000, TimeUnit::Microsecond),
-            Expr::Literal(ScalarValue::TimestampMillisecond(Some(123), None))
-        ));
-
-        assert!(matches!(
-            timestamp_to_timestamp_ms_expr(1230, TimeUnit::Nanosecond),
-            Expr::Literal(ScalarValue::TimestampMillisecond(Some(0), None))
-        ));
-        assert!(matches!(
-            timestamp_to_timestamp_ms_expr(123_000_000, TimeUnit::Nanosecond),
-            Expr::Literal(ScalarValue::TimestampMillisecond(Some(123), None))
-        ));
-    }
-
-    #[test]
-    fn test_convert_timestamp_str() {
-        use datatypes::arrow::datatypes::TimeUnit as ArrowTimeUnit;
-
-        let schema_ref = Arc::new(
-            DFSchema::new_with_metadata(
-                vec![DFField::new(
-                    None::<TableReference>,
-                    "ts",
-                    DataType::Timestamp(ArrowTimeUnit::Millisecond, None),
-                    true,
-                )],
-                HashMap::new(),
-            )
-            .unwrap(),
-        );
-        let mut converter = TypeConverter {
-            schemas: vec![schema_ref],
-        };
-
-        assert_eq!(
Expr::Column(Column::from_name("ts")).gt(Expr::Literal( - ScalarValue::TimestampMillisecond(Some(1599514949000), None) - )), - converter - .mutate( - Expr::Column(Column::from_name("ts")).gt(Expr::Literal(ScalarValue::Utf8( - Some("2020-09-08T05:42:29+08:00".to_string()), - ))) - ) - .unwrap() - ); - } - - #[test] - fn test_convert_bool() { - let col_name = "is_valid"; - let schema_ref = Arc::new( - DFSchema::new_with_metadata( - vec![DFField::new( - None::, - col_name, - DataType::Boolean, - false, - )], - HashMap::new(), - ) - .unwrap(), - ); - let mut converter = TypeConverter { - schemas: vec![schema_ref], - }; - - assert_eq!( - Expr::Column(Column::from_name(col_name)) - .eq(Expr::Literal(ScalarValue::Boolean(Some(true)))), - converter - .mutate( - Expr::Column(Column::from_name(col_name)) - .eq(Expr::Literal(ScalarValue::Utf8(Some("true".to_string())))) - ) - .unwrap() - ); - } -} +pub mod order_hint; +pub mod type_conversion; diff --git a/src/query/src/optimizer/order_hint.rs b/src/query/src/optimizer/order_hint.rs new file mode 100644 index 0000000000..6fd534e796 --- /dev/null +++ b/src/query/src/optimizer/order_hint.rs @@ -0,0 +1,168 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use arrow_schema::SortOptions; +use common_recordbatch::OrderOption; +use datafusion::datasource::DefaultTableSource; +use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeVisitor, VisitRecursion}; +use datafusion_common::Result as DataFusionResult; +use datafusion_expr::expr::Sort; +use datafusion_expr::{Expr, LogicalPlan}; +use datafusion_optimizer::{OptimizerConfig, OptimizerRule}; +use table::table::adapter::DfTableProviderAdapter; + +/// This rule will pass the nearest order requirement to the leaf table +/// scan node as ordering hint. 
+pub struct OrderHintRule;
+
+impl OptimizerRule for OrderHintRule {
+    fn try_optimize(
+        &self,
+        plan: &LogicalPlan,
+        _config: &dyn OptimizerConfig,
+    ) -> DataFusionResult<Option<LogicalPlan>> {
+        Self::optimize(plan).map(Some)
+    }
+
+    fn name(&self) -> &str {
+        "OrderHintRule"
+    }
+}
+
+impl OrderHintRule {
+    fn optimize(plan: &LogicalPlan) -> DataFusionResult<LogicalPlan> {
+        let mut visitor = OrderHintVisitor::default();
+        plan.visit(&mut visitor)?;
+
+        if let Some(order_expr) = visitor.order_expr.take() {
+            plan.clone()
+                .transform_down(&|plan| Self::set_ordering_hint(plan, &order_expr))
+        } else {
+            Ok(plan.clone())
+        }
+    }
+
+    fn set_ordering_hint(
+        plan: LogicalPlan,
+        order_expr: &[Sort],
+    ) -> DataFusionResult<Transformed<LogicalPlan>> {
+        match &plan {
+            LogicalPlan::TableScan(table_scan) => {
+                let mut transformed = false;
+                if let Some(source) = table_scan
+                    .source
+                    .as_any()
+                    .downcast_ref::<DefaultTableSource>()
+                {
+                    if let Some(adapter) = source
+                        .table_provider
+                        .as_any()
+                        .downcast_ref::<DfTableProviderAdapter>()
+                    {
+                        let mut opts = Vec::with_capacity(order_expr.len());
+                        for sort in order_expr {
+                            let name = match sort.expr.try_into_col() {
+                                Ok(col) => col.name,
+                                Err(_) => return Ok(Transformed::No(plan)),
+                            };
+                            opts.push(OrderOption {
+                                name,
+                                options: SortOptions {
+                                    descending: !sort.asc,
+                                    nulls_first: sort.nulls_first,
+                                },
+                            })
+                        }
+                        adapter.with_ordering_hint(&opts);
+                        transformed = true;
+                    }
+                }
+                if transformed {
+                    Ok(Transformed::Yes(plan))
+                } else {
+                    Ok(Transformed::No(plan))
+                }
+            }
+            _ => Ok(Transformed::No(plan)),
+        }
+    }
+}
+
+/// Find the closest order requirement to the leaf node.
+#[derive(Default)]
+struct OrderHintVisitor {
+    order_expr: Option<Vec<Sort>>,
+}
+
+impl TreeNodeVisitor for OrderHintVisitor {
+    type N = LogicalPlan;
+
+    fn pre_visit(&mut self, node: &Self::N) -> DataFusionResult<VisitRecursion> {
+        if let LogicalPlan::Sort(sort) = node {
+            let mut exprs = vec![];
+            for expr in &sort.expr {
+                if let Expr::Sort(sort_expr) = expr {
+                    exprs.push(sort_expr.clone());
+                }
+            }
+            self.order_expr = Some(exprs);
+        }
+        Ok(VisitRecursion::Continue)
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use std::sync::Arc;
+
+    use datafusion_expr::{col, LogicalPlanBuilder};
+    use datafusion_optimizer::OptimizerContext;
+    use table::table::numbers::NumbersTable;
+
+    use super::*;
+
+    #[test]
+    #[allow(clippy::bool_assert_comparison)]
+    fn set_order_hint() {
+        let numbers_table = Arc::new(NumbersTable::new(0)) as _;
+        let adapter = Arc::new(DfTableProviderAdapter::new(numbers_table));
+        let table_source = Arc::new(DefaultTableSource::new(adapter.clone()));
+
+        let plan = LogicalPlanBuilder::scan_with_filters("t", table_source, None, vec![])
+            .unwrap()
+            .sort(vec![col("number").sort(true, false)])
+            .unwrap()
+            .sort(vec![col("number").sort(false, true)])
+            .unwrap()
+            .build()
+            .unwrap();
+
+        let context = OptimizerContext::default();
+        OrderHintRule.try_optimize(&plan, &context).unwrap();
+
+        // should read the first (with `.sort(true, false)`) sort option
+        let scan_req = adapter.get_scan_req();
+        assert_eq!("number", &scan_req.output_ordering.clone().unwrap()[0].name);
+        assert_eq!(
+            true,
+            !scan_req.output_ordering.clone().unwrap()[0]
+                .options
+                .descending // `descending` is the inverse of the `asc` sort parameter
+        );
+        assert_eq!(
+            false,
+            scan_req.output_ordering.unwrap()[0].options.nulls_first
+        );
+    }
+}
diff --git a/src/query/src/optimizer/type_conversion.rs b/src/query/src/optimizer/type_conversion.rs
new file mode 100644
index 0000000000..011d72d8b1
--- /dev/null
+++ b/src/query/src/optimizer/type_conversion.rs
@@ -0,0 +1,410 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::str::FromStr;
+
+use common_time::timestamp::{TimeUnit, Timestamp};
+use datafusion::config::ConfigOptions;
+use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRewriter};
+use datafusion_common::{DFSchemaRef, DataFusionError, Result, ScalarValue};
+use datafusion_expr::expr::InList;
+use datafusion_expr::{
+    Between, BinaryExpr, Expr, ExprSchemable, Filter, LogicalPlan, Operator, TableScan,
+};
+use datafusion_optimizer::analyzer::AnalyzerRule;
+use datatypes::arrow::compute;
+use datatypes::arrow::datatypes::DataType;
+
+/// TypeConversionRule converts some literal values in logical plan to other types according
+/// to data type of corresponding columns.
+/// Specifically:
+/// - string literal of timestamp is converted to `Expr::Literal(ScalarValue::TimestampMillis)`
+/// - string literal of boolean is converted to `Expr::Literal(ScalarValue::Boolean)`
+pub struct TypeConversionRule;
+
+impl AnalyzerRule for TypeConversionRule {
+    // TODO(ruihang): fix this warning
+    #[allow(deprecated)]
+    fn analyze(&self, plan: LogicalPlan, _config: &ConfigOptions) -> Result<LogicalPlan> {
+        let schemas = plan.all_schemas().into_iter().cloned().collect::<Vec<_>>();
+        plan.transform(&|plan| match plan {
+            LogicalPlan::Filter(filter) => {
+                let mut converter = TypeConverter {
+                    schemas: schemas.clone(),
+                };
+                let rewritten = filter.predicate.clone().rewrite(&mut converter)?;
+                Ok(Transformed::Yes(LogicalPlan::Filter(Filter::try_new(
+                    rewritten,
+                    filter.input,
+                )?)))
+            }
+            LogicalPlan::TableScan(TableScan {
+                table_name,
+                source,
+                projection,
+                projected_schema,
+                filters,
+                fetch,
+            }) => {
+                let mut converter = TypeConverter {
+                    schemas: schemas.clone(),
+                };
+                let rewrite_filters = filters
+                    .into_iter()
+                    .map(|e| e.rewrite(&mut converter))
+                    .collect::<Result<Vec<_>>>()?;
+                Ok(Transformed::Yes(LogicalPlan::TableScan(TableScan {
+                    table_name: table_name.clone(),
+                    source: source.clone(),
+                    projection,
+                    projected_schema,
+                    filters: rewrite_filters,
+                    fetch,
+                })))
+            }
+            LogicalPlan::Projection { .. }
+            | LogicalPlan::Window { .. }
+            | LogicalPlan::Aggregate { .. }
+            | LogicalPlan::Repartition { .. }
+            | LogicalPlan::Extension { .. }
+            | LogicalPlan::Sort { .. }
+            | LogicalPlan::Explain { .. }
+            | LogicalPlan::Limit { .. }
+            | LogicalPlan::Union { .. }
+            | LogicalPlan::Join { .. }
+            | LogicalPlan::CrossJoin { .. }
+            | LogicalPlan::Distinct { .. }
+            | LogicalPlan::Values { .. }
+            | LogicalPlan::Analyze { .. } => {
+                let mut converter = TypeConverter {
+                    schemas: plan.all_schemas().into_iter().cloned().collect(),
+                };
+                let inputs = plan.inputs().into_iter().cloned().collect::<Vec<_>>();
+                let expr = plan
+                    .expressions()
+                    .into_iter()
+                    .map(|e| e.rewrite(&mut converter))
+                    .collect::<Result<Vec<_>>>()?;
+
+                datafusion_expr::utils::from_plan(&plan, &expr, &inputs).map(Transformed::Yes)
+            }
+
+            LogicalPlan::Subquery { .. }
+            | LogicalPlan::SubqueryAlias { .. }
+            | LogicalPlan::EmptyRelation(_)
+            | LogicalPlan::Prepare(_)
+            | LogicalPlan::Dml(_)
+            | LogicalPlan::DescribeTable(_)
+            | LogicalPlan::Unnest(_)
+            | LogicalPlan::Statement(_)
+            | LogicalPlan::Ddl(_) => Ok(Transformed::No(plan)),
+        })
+    }
+
+    fn name(&self) -> &str {
+        "TypeConversionRule"
+    }
+}
+
+struct TypeConverter {
+    schemas: Vec<DFSchemaRef>,
+}
+
+impl TypeConverter {
+    fn column_type(&self, expr: &Expr) -> Option<DataType> {
+        if let Expr::Column(_) = expr {
+            for schema in &self.schemas {
+                if let Ok(v) = expr.get_type(schema) {
+                    return Some(v);
+                }
+            }
+        }
+        None
+    }
+
+    fn cast_scalar_value(value: &ScalarValue, target_type: &DataType) -> Result<ScalarValue> {
+        match (target_type, value) {
+            (DataType::Timestamp(_, _), ScalarValue::Utf8(Some(v))) => string_to_timestamp_ms(v),
+            (DataType::Boolean, ScalarValue::Utf8(Some(v))) => match v.to_lowercase().as_str() {
+                "true" => Ok(ScalarValue::Boolean(Some(true))),
+                "false" => Ok(ScalarValue::Boolean(Some(false))),
+                _ => Ok(ScalarValue::Boolean(None)),
+            },
+            (target_type, value) => {
+                let value_arr = value.to_array();
+                let arr =
+                    compute::cast(&value_arr, target_type).map_err(DataFusionError::ArrowError)?;
+
+                ScalarValue::try_from_array(
+                    &arr,
+                    0, // index: Converts a value in `array` at `index` into a ScalarValue
+                )
+            }
+        }
+    }
+
+    fn convert_type<'b>(&self, mut left: &'b Expr, mut right: &'b Expr) -> Result<(Expr, Expr)> {
+        let left_type = self.column_type(left);
+        let right_type = self.column_type(right);
+
+        let mut reverse = false;
+        let left_type = match (&left_type, &right_type) {
+            (Some(v), None) => v,
+            (None, Some(v)) => {
+                reverse = true;
+                std::mem::swap(&mut left, &mut right);
+                v
+            }
+            _ => return Ok((left.clone(), right.clone())),
+        };
+
+        match (left, right) {
+            (Expr::Column(col), Expr::Literal(value)) => {
+                let casted_right = Self::cast_scalar_value(value, left_type)?;
+                if casted_right.is_null() {
+                    return Err(DataFusionError::Plan(format!(
+                        "column:{col:?} value:{value:?} is invalid",
+                    )));
+                }
+                if reverse {
+                    Ok((Expr::Literal(casted_right), left.clone()))
+                } else {
+                    Ok((left.clone(), Expr::Literal(casted_right)))
+                }
+            }
+            _ => Ok((left.clone(), right.clone())),
+        }
+    }
+}
+
+impl TreeNodeRewriter for TypeConverter {
+    type N = Expr;
+
+    fn mutate(&mut self, expr: Expr) -> Result<Expr> {
+        let new_expr = match expr {
+            Expr::BinaryExpr(BinaryExpr { left, op, right }) => match op {
+                Operator::Eq
+                | Operator::NotEq
+                | Operator::Lt
+                | Operator::LtEq
+                | Operator::Gt
+                | Operator::GtEq => {
+                    let (left, right) = self.convert_type(&left, &right)?;
+                    Expr::BinaryExpr(BinaryExpr {
+                        left: Box::new(left),
+                        op,
+                        right: Box::new(right),
+                    })
+                }
+                _ => Expr::BinaryExpr(BinaryExpr { left, op, right }),
+            },
+            Expr::Between(Between {
+                expr,
+                negated,
+                low,
+                high,
+            }) => {
+                let (expr, low) = self.convert_type(&expr, &low)?;
+                let (expr, high) = self.convert_type(&expr, &high)?;
+                Expr::Between(Between {
+                    expr: Box::new(expr),
+                    negated,
+                    low: Box::new(low),
+                    high: Box::new(high),
+                })
+            }
+            Expr::InList(InList {
+                expr,
+                list,
+                negated,
+            }) => {
+                let mut list_expr = Vec::with_capacity(list.len());
+                for e in list {
+                    let (_, expr_conversion) = self.convert_type(&expr, &e)?;
+                    list_expr.push(expr_conversion);
+                }
+                Expr::InList(InList {
+                    expr,
+                    list: list_expr,
+                    negated,
+                })
+            }
+            Expr::Literal(value) => match value {
+                ScalarValue::TimestampSecond(Some(i), _) => {
+                    timestamp_to_timestamp_ms_expr(i, TimeUnit::Second)
+                }
+                ScalarValue::TimestampMillisecond(Some(i), _) => {
+                    timestamp_to_timestamp_ms_expr(i, TimeUnit::Millisecond)
+                }
+
+                ScalarValue::TimestampMicrosecond(Some(i), _) => {
+                    timestamp_to_timestamp_ms_expr(i, TimeUnit::Microsecond)
+                }
+                ScalarValue::TimestampNanosecond(Some(i), _) => {
+                    timestamp_to_timestamp_ms_expr(i, TimeUnit::Nanosecond)
+                }
+                _ => Expr::Literal(value),
+            },
+            expr => expr,
+        };
+        Ok(new_expr)
+    }
+}
+
+fn timestamp_to_timestamp_ms_expr(val: i64, unit: TimeUnit) -> Expr {
+    let timestamp = match unit {
+        TimeUnit::Second => val * 1_000,
+        TimeUnit::Millisecond => val,
+        TimeUnit::Microsecond => val / 1_000,
+        TimeUnit::Nanosecond => val / 1_000 / 1_000,
+    };
+
+    Expr::Literal(ScalarValue::TimestampMillisecond(Some(timestamp), None))
+}
+
+fn string_to_timestamp_ms(string: &str) -> Result<ScalarValue> {
+    Ok(ScalarValue::TimestampMillisecond(
+        Some(
+            Timestamp::from_str(string)
+                .map(|t| t.value() / 1_000_000)
+                .map_err(|e| DataFusionError::External(Box::new(e)))?,
+        ),
+        None,
+    ))
+}
+
+#[cfg(test)]
+mod tests {
+    use std::collections::HashMap;
+    use std::sync::Arc;
+
+    use datafusion_common::{Column, DFField, DFSchema};
+    use datafusion_sql::TableReference;
+
+    use super::*;
+
+    #[test]
+    fn test_string_to_timestamp_ms() {
+        assert!(matches!(
+            string_to_timestamp_ms("2022-02-02 19:00:00+08:00").unwrap(),
+            ScalarValue::TimestampMillisecond(Some(1643799600000), None)
+        ));
+        assert!(matches!(
+            string_to_timestamp_ms("2009-02-13 23:31:30Z").unwrap(),
+            ScalarValue::TimestampMillisecond(Some(1234567890000), None)
+        ));
+    }
+
+    #[test]
+    fn test_timestamp_to_timestamp_ms_expr() {
+        assert!(matches!(
+            timestamp_to_timestamp_ms_expr(123, TimeUnit::Second),
+            Expr::Literal(ScalarValue::TimestampMillisecond(Some(123000), None))
+        ));
+
+        assert!(matches!(
+            timestamp_to_timestamp_ms_expr(123, TimeUnit::Millisecond),
+            Expr::Literal(ScalarValue::TimestampMillisecond(Some(123), None))
+        ));
+
+        assert!(matches!(
+            timestamp_to_timestamp_ms_expr(123, TimeUnit::Microsecond),
+            Expr::Literal(ScalarValue::TimestampMillisecond(Some(0), None))
+        ));
+
+        assert!(matches!(
+            timestamp_to_timestamp_ms_expr(1230, TimeUnit::Microsecond),
+            Expr::Literal(ScalarValue::TimestampMillisecond(Some(1), None))
+        ));
+
+        assert!(matches!(
+            timestamp_to_timestamp_ms_expr(123000, TimeUnit::Microsecond),
+            Expr::Literal(ScalarValue::TimestampMillisecond(Some(123), None))
+        ));
+
+        assert!(matches!(
+            timestamp_to_timestamp_ms_expr(1230, TimeUnit::Nanosecond),
+            Expr::Literal(ScalarValue::TimestampMillisecond(Some(0), None))
+        ));
+        assert!(matches!(
+            timestamp_to_timestamp_ms_expr(123_000_000, TimeUnit::Nanosecond),
+            Expr::Literal(ScalarValue::TimestampMillisecond(Some(123), None))
+        ));
+    }
+
+    #[test]
+    fn test_convert_timestamp_str() {
+        use datatypes::arrow::datatypes::TimeUnit as ArrowTimeUnit;
+
+        let schema_ref = Arc::new(
+            DFSchema::new_with_metadata(
+                vec![DFField::new(
+                    None::<TableReference>,
+                    "ts",
+                    DataType::Timestamp(ArrowTimeUnit::Millisecond, None),
+                    true,
+                )],
+                HashMap::new(),
+            )
+            .unwrap(),
+        );
+        let mut converter = TypeConverter {
+            schemas: vec![schema_ref],
+        };
+
+        assert_eq!(
+            Expr::Column(Column::from_name("ts")).gt(Expr::Literal(
+                ScalarValue::TimestampMillisecond(Some(1599514949000), None)
+            )),
+            converter
+                .mutate(
+                    Expr::Column(Column::from_name("ts")).gt(Expr::Literal(ScalarValue::Utf8(
+                        Some("2020-09-08T05:42:29+08:00".to_string()),
+                    )))
+                )
+                .unwrap()
+        );
+    }
+
+    #[test]
+    fn test_convert_bool() {
+        let col_name = "is_valid";
+        let schema_ref = Arc::new(
+            DFSchema::new_with_metadata(
+                vec![DFField::new(
+                    None::<TableReference>,
+                    col_name,
+                    DataType::Boolean,
+                    false,
+                )],
+                HashMap::new(),
+            )
+            .unwrap(),
+        );
+        let mut converter = TypeConverter {
+            schemas: vec![schema_ref],
+        };
+
+        assert_eq!(
+            Expr::Column(Column::from_name(col_name))
+                .eq(Expr::Literal(ScalarValue::Boolean(Some(true)))),
+            converter
+                .mutate(
+                    Expr::Column(Column::from_name(col_name))
+                        .eq(Expr::Literal(ScalarValue::Utf8(Some("true".to_string()))))
+                )
+                .unwrap()
+        );
+    }
+}
diff --git a/src/query/src/query_engine/state.rs b/src/query/src/query_engine/state.rs
index 33ee3a602e..5dbe33f8cb 100644
--- a/src/query/src/query_engine/state.rs
+++ b/src/query/src/query_engine/state.rs
@@ -30,10 +30,12 @@ use datafusion::physical_plan::planner::DefaultPhysicalPlanner;
 use datafusion::physical_plan::{ExecutionPlan, PhysicalPlanner};
 use datafusion_expr::LogicalPlan as DfLogicalPlan;
 use datafusion_optimizer::analyzer::Analyzer;
+use datafusion_optimizer::optimizer::Optimizer;
 use promql::extension_plan::PromExtensionPlanner;
 
 use crate::dist_plan::{DistExtensionPlanner, DistPlannerAnalyzer};
-use crate::optimizer::TypeConversionRule;
+use crate::optimizer::order_hint::OrderHintRule;
+use crate::optimizer::type_conversion::TypeConversionRule;
 use crate::query_engine::options::QueryOptions;
 
 /// Query engine global state
@@ -69,6 +71,8 @@ impl QueryEngineState {
             analyzer.rules.insert(0, Arc::new(DistPlannerAnalyzer));
         }
         analyzer.rules.insert(0, Arc::new(TypeConversionRule));
+        let mut optimizer = Optimizer::new();
+        optimizer.rules.push(Arc::new(OrderHintRule));
 
         let session_state = SessionState::with_config_rt_and_catalog_list(
             session_config,
@@ -76,6 +80,7 @@
             Arc::new(MemoryCatalogList::default()), // pass a dummy catalog list
         )
        .with_analyzer_rules(analyzer.rules)
+        .with_optimizer_rules(optimizer.rules)
        .with_query_planner(Arc::new(DfQueryPlanner::new()));
 
         let df_context = SessionContext::with_state(session_state);
diff --git a/src/storage/src/chunk.rs b/src/storage/src/chunk.rs
index ef60b8cd3a..b98a5e264f 100644
--- a/src/storage/src/chunk.rs
+++ b/src/storage/src/chunk.rs
@@ -38,6 +38,7 @@ use crate::window_infer::{PlainWindowInference, WindowInfer};
 pub struct ChunkReaderImpl {
     schema: ProjectedSchemaRef,
     batch_reader: BoxedBatchReader,
+    output_ordering: Option<Vec<OrderOption>>,
 }
 
 #[async_trait]
@@ -62,13 +63,22 @@ impl ChunkReader for ChunkReaderImpl {
         };
         self.schema.batch_to_chunk(&batch)
     }
+
+    fn output_ordering(&self) -> Option<Vec<OrderOption>> {
+        self.output_ordering.clone()
+    }
 }
 
 impl ChunkReaderImpl {
-    pub fn new(schema: ProjectedSchemaRef, batch_reader: BoxedBatchReader) -> ChunkReaderImpl {
+    pub fn new(
+        schema: ProjectedSchemaRef,
+        batch_reader: BoxedBatchReader,
+        output_ordering: Option<Vec<OrderOption>>,
+    ) -> ChunkReaderImpl {
         ChunkReaderImpl {
             schema,
             batch_reader,
+            output_ordering,
         }
     }
 
@@ -161,12 +171,16 @@ impl ChunkReaderBuilder {
         self
     }
 
+    /// Try to infer time windows from the output ordering. A `None` result
+    /// means the output ordering cannot be obeyed; otherwise the output
+    /// ordering is obeyed and is the same as requested.
     fn infer_time_windows(&self, output_ordering: &[OrderOption]) -> Option<Vec<TimestampRange>> {
         if output_ordering.is_empty() {
             return None;
         }
-        let OrderOption { index, options } = &output_ordering[0];
-        if *index != self.schema.timestamp_index() {
+        let OrderOption { name, options } = &output_ordering[0];
+
+        if name != self.schema.timestamp_column_name() {
             return None;
         }
         let memtable_stats = self.memtables.iter().map(|m| m.stats()).collect::<Vec<_>>();
@@ -238,15 +252,17 @@ impl ChunkReaderBuilder {
         );
         self.iter_ctx.projected_schema = Some(schema.clone());
 
+        let mut output_ordering = None;
         let reader = if let Some(ordering) = self.output_ordering.take()
             && let Some(windows) = self.infer_time_windows(&ordering)
         {
-            self.build_windowed(&schema, &time_range_predicate, windows, ordering)
-                .await?
+            output_ordering = Some(ordering.clone());
+            self.build_windowed(&schema, &time_range_predicate, windows, ordering)
+                .await?
         } else {
             self.build_reader(&schema, &time_range_predicate).await?
         };
 
-        Ok(ChunkReaderImpl::new(schema, reader))
+        Ok(ChunkReaderImpl::new(schema, reader, output_ordering))
     }
 
     /// Build time range predicate from schema and filters.
diff --git a/src/storage/src/read/windowed.rs b/src/storage/src/read/windowed.rs
index a7a0025f42..44290105bf 100644
--- a/src/storage/src/read/windowed.rs
+++ b/src/storage/src/read/windowed.rs
@@ -22,7 +22,7 @@ use snafu::ResultExt;
 
 use crate::error::{self, Result};
 use crate::read::{Batch, BatchReader};
-use crate::schema::ProjectedSchemaRef;
+use crate::schema::{ProjectedSchemaRef, StoreSchema};
 
 /// [WindowedReader] provides a windowed record batch reader that scans all rows within a window
 /// at a time and sort these rows ordered in `[<timestamp>, <PK>]` order.
@@ -102,8 +102,8 @@ fn sort_by_rows(
     arrays: Vec<ArrayRef>,
     order_options: &[OrderOption],
 ) -> Result<Vec<ArrayRef>> {
-    let sort_columns = build_sorted_columns(order_options);
     let store_schema = schema.schema_to_read();
+    let sort_columns = build_sorted_columns(store_schema, order_options);
     // Convert columns to rows to speed lexicographic sort
     // TODO(hl): maybe optimize to lexsort_to_index when only timestamp column is involved.
     let mut row_converter = RowConverter::new(
@@ -150,9 +150,9 @@
 
 /// Builds sorted columns from `order_options`.
 /// Returns a vector of columns indices to sort and sort orders (true means descending order).
-fn build_sorted_columns(order_options: &[OrderOption]) -> Vec<(usize, bool)> {
+fn build_sorted_columns(schema: &StoreSchema, order_options: &[OrderOption]) -> Vec<(usize, bool)> {
     order_options
         .iter()
-        .map(|o| (o.index, o.options.descending))
+        .map(|o| (schema.column_index(&o.name), o.options.descending))
         .collect()
 }
diff --git a/src/storage/src/region/tests.rs b/src/storage/src/region/tests.rs
index 1d4c8eae6c..629a0c364c 100644
--- a/src/storage/src/region/tests.rs
+++ b/src/storage/src/region/tests.rs
@@ -708,7 +708,7 @@ async fn test_read_by_chunk_reader() {
     )
     .await
     .check(vec![OrderOption {
-        index: 0,
+        name: "timestamp".to_string(),
         options: SortOptions {
             descending: true,
             nulls_first: true,
@@ -737,7 +737,7 @@ async fn test_read_by_chunk_reader() {
     )
     .await
     .check(vec![OrderOption {
-        index: 0,
+        name: "timestamp".to_string(),
         options: SortOptions {
             descending: true,
             nulls_first: true,
@@ -768,7 +768,7 @@ async fn test_read_by_chunk_reader() {
     )
     .await
     .check(vec![OrderOption {
-        index: 0,
+        name: "timestamp".to_string(),
         options: SortOptions {
             descending: true,
             nulls_first: true,
@@ -799,7 +799,7 @@ async fn test_read_by_chunk_reader() {
     )
     .await
     .check(vec![OrderOption {
-        index: 0,
+        name: "timestamp".to_string(),
         options: SortOptions {
             descending: false,
             nulls_first: true,
diff --git a/src/storage/src/schema/region.rs b/src/storage/src/schema/region.rs
index 876b0b035d..aa8de8587f 100644
--- a/src/storage/src/schema/region.rs
+++ b/src/storage/src/schema/region.rs
@@ -135,6 +135,11 @@ impl RegionSchema {
         self.store_schema.timestamp_index()
     }
 
+    #[inline]
+    pub(crate) fn timestamp_column_name(&self) -> &str {
+        self.store_schema.column_name(self.timestamp_index())
+    }
+
     #[inline]
     pub(crate) fn value_indices(&self) -> impl Iterator<Item = usize> {
         self.store_schema.value_indices()
diff --git a/src/storage/src/schema/store.rs b/src/storage/src/schema/store.rs
index 4fc99f4ee9..27401de405 100644
--- a/src/storage/src/schema/store.rs
+++ b/src/storage/src/schema/store.rs
@@ -177,6 +177,13 @@ impl StoreSchema {
         &self.schema.column_schemas()[idx].name
     }
 
+    /// # Panic
+    /// Panics if `name` is not a valid column name.
+    #[inline]
+    pub(crate) fn column_index(&self, name: &str) -> usize {
+        self.schema.column_index_by_name(name).unwrap()
+    }
+
     #[inline]
     pub(crate) fn num_columns(&self) -> usize {
         self.schema.num_columns()
diff --git a/src/store-api/src/storage/chunk.rs b/src/store-api/src/storage/chunk.rs
index 24c391f703..b1db1bb653 100644
--- a/src/store-api/src/storage/chunk.rs
+++ b/src/store-api/src/storage/chunk.rs
@@ -14,6 +14,7 @@
 
 use async_trait::async_trait;
 use common_error::ext::ErrorExt;
+use common_recordbatch::OrderOption;
 use datatypes::vectors::VectorRef;
 
 use crate::storage::SchemaRef;
@@ -45,4 +46,8 @@ pub trait ChunkReader: Send {
 
     // project the chunk according to required projection.
     fn project_chunk(&self, chunk: Chunk) -> Chunk;
+
+    fn output_ordering(&self) -> Option<Vec<OrderOption>> {
+        None
+    }
 }
diff --git a/src/table/Cargo.toml b/src/table/Cargo.toml
index b0c5b1ee98..77ae09e1df 100644
--- a/src/table/Cargo.toml
+++ b/src/table/Cargo.toml
@@ -4,6 +4,9 @@ version.workspace = true
 edition.workspace = true
 license.workspace = true
 
+[features]
+testing = []
+
 [dependencies]
 anymap = "1.0.0-beta.2"
 async-trait = "0.1"
diff --git a/src/table/src/table/adapter.rs b/src/table/src/table/adapter.rs
index a9e1b5a4c5..7c09940348 100644
--- a/src/table/src/table/adapter.rs
+++ b/src/table/src/table/adapter.rs
@@ -53,6 +53,11 @@ impl DfTableProviderAdapter {
     pub fn with_ordering_hint(&self, order_opts: &[OrderOption]) {
         self.scan_req.lock().unwrap().output_ordering = Some(order_opts.to_vec());
     }
+
+    #[cfg(feature = "testing")]
+    pub fn get_scan_req(&self) -> ScanRequest {
+        self.scan_req.lock().unwrap().clone()
+    }
 }
 
 #[async_trait::async_trait]
@@ -96,8 +101,8 @@ impl TableProvider for DfTableProviderAdapter {
             order_opts
                 .iter()
                 .map(|order_opt| {
-                    let col_name = schema.column_name_by_index(order_opt.index);
-                    let col_expr = Arc::new(Column::new(col_name, order_opt.index));
+                    let col_index = schema.column_index_by_name(&order_opt.name).unwrap();
+                    let col_expr = Arc::new(Column::new(&order_opt.name, col_index));
                     PhysicalSortExpr {
                         expr: col_expr,
                         options: order_opt.options,
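
Reviewer's note, outside the patch itself: a minimal sketch of what the reworked, name-based hint looks like at a call site. `OrderOption` and its fields are exactly as introduced above; the column name `"ts"` and the helper function are purely illustrative.

```rust
use arrow_schema::SortOptions;
use common_recordbatch::OrderOption;

// Hypothetical call site: express "ORDER BY ts DESC NULLS FIRST" as a scan hint.
// Naming the column (rather than carrying its projected index, as before this
// patch) keeps the hint valid however the scan's projection reorders or drops
// columns; consumers resolve the index late, e.g. via `column_index_by_name`.
fn ts_desc_hint() -> Vec<OrderOption> {
    vec![OrderOption {
        name: "ts".to_string(),
        options: SortOptions {
            descending: true,
            nulls_first: true,
        },
    }]
}
```

This late-binding choice is also why `OrderOption` drops `Copy` in this patch: `String` is not `Copy`, so hints are now cloned explicitly where needed.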