diff --git a/Cargo.lock b/Cargo.lock index 523b30086b..dc2a2404f8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9670,10 +9670,12 @@ dependencies = [ "common-error", "common-telemetry", "datafusion", + "datafusion-common", "datafusion-expr", "datafusion-substrait", "datatypes", "futures", + "promql", "prost", "session", "snafu", diff --git a/src/cmd/src/cli/bench.rs b/src/cmd/src/cli/bench.rs index 2351aaee9c..6b2b009a2e 100644 --- a/src/cmd/src/cli/bench.rs +++ b/src/cmd/src/cli/bench.rs @@ -167,6 +167,7 @@ fn create_table_info(table_id: TableId, table_name: TableName) -> RawTableInfo { value_indices: vec![], options: Default::default(), region_numbers: (1..=100).collect(), + partition_key_indices: vec![], }; RawTableInfo { diff --git a/src/cmd/src/cli/repl.rs b/src/cmd/src/cli/repl.rs index ce8e0fcc3a..8c10d978b2 100644 --- a/src/cmd/src/cli/repl.rs +++ b/src/cmd/src/cli/repl.rs @@ -177,7 +177,7 @@ impl Repl { query_engine.optimize(&plan).context(PlanStatementSnafu)?; let plan = DFLogicalSubstraitConvertor {} - .encode(plan) + .encode(&plan) .context(SubstraitEncodeLogicalPlanSnafu)?; self.database.logical_plan(plan.to_vec(), None).await diff --git a/src/common/meta/src/helper.rs b/src/common/meta/src/helper.rs index acfc55969a..a2dbad814d 100644 --- a/src/common/meta/src/helper.rs +++ b/src/common/meta/src/helper.rs @@ -154,6 +154,7 @@ define_catalog_value!(TableGlobalValue, CatalogValue, SchemaValue); #[cfg(test)] mod tests { + use super::*; #[test] diff --git a/src/common/meta/src/key/table_info.rs b/src/common/meta/src/key/table_info.rs index e1ceb4698b..c7650931f2 100644 --- a/src/common/meta/src/key/table_info.rs +++ b/src/common/meta/src/key/table_info.rs @@ -287,6 +287,7 @@ mod tests { value_indices: vec![2, 3], options: Default::default(), region_numbers: vec![1], + partition_key_indices: vec![], }; RawTableInfo { diff --git a/src/common/substrait/Cargo.toml b/src/common/substrait/Cargo.toml index bbd86dcaa3..4a3f2be2c1 100644 --- a/src/common/substrait/Cargo.toml +++ b/src/common/substrait/Cargo.toml @@ -13,10 +13,12 @@ common-catalog = { path = "../catalog" } common-error = { path = "../error" } common-telemetry = { path = "../telemetry" } datafusion.workspace = true +datafusion-common.workspace = true datafusion-expr.workspace = true datafusion-substrait.workspace = true datatypes = { path = "../../datatypes" } futures = "0.3" +promql = { path = "../../promql" } prost.workspace = true session = { path = "../../session" } snafu.workspace = true diff --git a/src/common/substrait/src/df_substrait.rs b/src/common/substrait/src/df_substrait.rs index 34db1cd39c..b90301f8cc 100644 --- a/src/common/substrait/src/df_substrait.rs +++ b/src/common/substrait/src/df_substrait.rs @@ -28,6 +28,7 @@ use snafu::ResultExt; use substrait_proto::proto::Plan; use crate::error::{DecodeDfPlanSnafu, DecodeRelSnafu, EncodeDfPlanSnafu, EncodeRelSnafu, Error}; +use crate::extension_serializer::ExtensionSerializer; use crate::SubstraitPlan; pub struct DFLogicalSubstraitConvertor; @@ -46,7 +47,8 @@ impl SubstraitPlan for DFLogicalSubstraitConvertor { schema: &str, ) -> Result { let state_config = SessionConfig::new().with_default_catalog_and_schema(catalog, schema); - let state = SessionState::with_config_rt(state_config, Arc::new(RuntimeEnv::default())); + let state = SessionState::with_config_rt(state_config, Arc::new(RuntimeEnv::default())) + .with_serializer_registry(Arc::new(ExtensionSerializer)); let mut context = SessionContext::with_state(state); context.register_catalog_list(catalog_list); let plan = Plan::decode(message).context(DecodeRelSnafu)?; @@ -56,11 +58,14 @@ impl SubstraitPlan for DFLogicalSubstraitConvertor { Ok(df_plan) } - fn encode(&self, plan: Self::Plan) -> Result { + fn encode(&self, plan: &Self::Plan) -> Result { let mut buf = BytesMut::new(); - let context = SessionContext::new(); + let session_state = + SessionState::with_config_rt(SessionConfig::new(), Arc::new(RuntimeEnv::default())) + .with_serializer_registry(Arc::new(ExtensionSerializer)); + let context = SessionContext::with_state(session_state); - let substrait_plan = to_substrait_plan(&plan, &context).context(EncodeDfPlanSnafu)?; + let substrait_plan = to_substrait_plan(plan, &context).context(EncodeDfPlanSnafu)?; substrait_plan.encode(&mut buf).context(EncodeRelSnafu)?; Ok(buf.freeze()) diff --git a/src/query/src/extension_serializer.rs b/src/common/substrait/src/extension_serializer.rs similarity index 98% rename from src/query/src/extension_serializer.rs rename to src/common/substrait/src/extension_serializer.rs index c94668a6c7..926912230f 100644 --- a/src/query/src/extension_serializer.rs +++ b/src/common/substrait/src/extension_serializer.rs @@ -60,6 +60,7 @@ impl SerializerRegistry for ExtensionSerializer { name if name == EmptyMetric::name() => Err(DataFusionError::Substrait( "EmptyMetric should not be serialized".to_string(), )), + name if name == "MergeScan" => Ok(vec![]), other => Err(DataFusionError::NotImplemented(format!( "Serizlize logical plan for {}", other diff --git a/src/common/substrait/src/lib.rs b/src/common/substrait/src/lib.rs index ebefcb662e..24bff0f0f2 100644 --- a/src/common/substrait/src/lib.rs +++ b/src/common/substrait/src/lib.rs @@ -17,6 +17,7 @@ mod df_substrait; pub mod error; +pub mod extension_serializer; use std::sync::Arc; @@ -40,5 +41,5 @@ pub trait SubstraitPlan { schema: &str, ) -> Result; - fn encode(&self, plan: Self::Plan) -> Result; + fn encode(&self, plan: &Self::Plan) -> Result; } diff --git a/src/datanode/src/lib.rs b/src/datanode/src/lib.rs index 0d6cfdd34c..a4e6bb0df1 100644 --- a/src/datanode/src/lib.rs +++ b/src/datanode/src/lib.rs @@ -25,7 +25,8 @@ mod mock; pub mod server; pub mod sql; mod store; -#[cfg(test)] -mod tests; use greptimedb_telemetry::telemetry; + +#[cfg(test)] +mod tests; diff --git a/src/frontend/src/error.rs b/src/frontend/src/error.rs index ca2284dedb..52f48769d5 100644 --- a/src/frontend/src/error.rs +++ b/src/frontend/src/error.rs @@ -266,8 +266,8 @@ pub enum Error { location: Location, }, - #[snafu(display("Cannot find primary key column by name: {}", msg))] - PrimaryKeyNotFound { msg: String, location: Location }, + #[snafu(display("Cannot find column by name: {}", msg))] + ColumnNotFound { msg: String, location: Location }, #[snafu(display("Failed to execute statement, source: {}", source))] ExecuteStatement { @@ -592,7 +592,7 @@ impl ErrorExt for Error { | Error::CatalogNotFound { .. } | Error::SchemaNotFound { .. } | Error::SchemaExists { .. } - | Error::PrimaryKeyNotFound { .. } + | Error::ColumnNotFound { .. } | Error::MissingMetasrvOpts { .. } | Error::BuildRegex { .. } | Error::InvalidSchema { .. } diff --git a/src/frontend/src/instance/distributed.rs b/src/frontend/src/instance/distributed.rs index cb7abdf47b..d7814023e2 100644 --- a/src/frontend/src/instance/distributed.rs +++ b/src/frontend/src/instance/distributed.rs @@ -35,7 +35,7 @@ use common_error::ext::BoxedError; use common_meta::helper::{SchemaKey, SchemaValue}; use common_meta::peer::Peer; use common_meta::rpc::ddl::{DdlTask, SubmitDdlTaskRequest, SubmitDdlTaskResponse}; -use common_meta::rpc::router::{Partition as MetaPartition, RouteRequest}; +use common_meta::rpc::router::{Partition, Partition as MetaPartition, RouteRequest}; use common_meta::rpc::store::CompareAndPutRequest; use common_meta::table_name::TableName; use common_query::Output; @@ -65,7 +65,7 @@ use table::TableRef; use crate::catalog::FrontendCatalogManager; use crate::error::{ self, AlterExprToRequestSnafu, CatalogEntrySerdeSnafu, CatalogSnafu, ColumnDataTypeSnafu, - DeserializePartitionSnafu, InvokeDatanodeSnafu, ParseSqlSnafu, PrimaryKeyNotFoundSnafu, + ColumnNotFoundSnafu, DeserializePartitionSnafu, InvokeDatanodeSnafu, ParseSqlSnafu, RequestDatanodeSnafu, RequestMetaSnafu, Result, SchemaExistsSnafu, StartMetaClientSnafu, TableAlreadyExistSnafu, TableNotFoundSnafu, TableSnafu, ToTableDeleteRequestSnafu, UnrecognizedTableOptionSnafu, @@ -108,7 +108,9 @@ impl DistInstance { &create_table.table_name, ); - let mut table_info = create_table_info(create_table)?; + let (partitions, partition_cols) = parse_partitions(create_table, partitions)?; + + let mut table_info = create_table_info(create_table, partition_cols)?; let resp = self .create_table_procedure(create_table, partitions, table_info.clone()) @@ -562,10 +564,9 @@ impl DistInstance { async fn create_table_procedure( &self, create_table: &CreateTableExpr, - partitions: Option, + partitions: Vec, table_info: RawTableInfo, ) -> Result { - let partitions = parse_partitions(create_table, partitions)?; let partitions = partitions.into_iter().map(Into::into).collect(); let request = SubmitDdlTaskRequest { @@ -759,7 +760,10 @@ fn create_partitions_stmt(partitions: Vec) -> Result Result { +fn create_table_info( + create_table: &CreateTableExpr, + partition_columns: Vec, +) -> Result { let mut column_schemas = Vec::with_capacity(create_table.column_defs.len()); let mut column_name_to_index_map = HashMap::new(); @@ -791,7 +795,17 @@ fn create_table_info(create_table: &CreateTableExpr) -> Result { column_name_to_index_map .get(name) .cloned() - .context(PrimaryKeyNotFoundSnafu { msg: name }) + .context(ColumnNotFoundSnafu { msg: name }) + }) + .collect::>>()?; + + let partition_key_indices = partition_columns + .into_iter() + .map(|col_name| { + column_name_to_index_map + .get(&col_name) + .cloned() + .context(ColumnNotFoundSnafu { msg: col_name }) }) .collect::>>()?; @@ -806,6 +820,7 @@ fn create_table_info(create_table: &CreateTableExpr) -> Result { options: TableOptions::try_from(&create_table.table_options) .context(UnrecognizedTableOptionSnafu)?, created_on: DateTime::default(), + partition_key_indices, }; let desc = if create_table.desc.is_empty() { @@ -833,17 +848,20 @@ fn create_table_info(create_table: &CreateTableExpr) -> Result { fn parse_partitions( create_table: &CreateTableExpr, partitions: Option, -) -> Result> { +) -> Result<(Vec, Vec)> { // If partitions are not defined by user, use the timestamp column (which has to be existed) as // the partition column, and create only one partition. - let partition_columns = find_partition_columns(create_table, &partitions)?; + let partition_columns = find_partition_columns(&partitions)?; let partition_entries = find_partition_entries(create_table, &partitions, &partition_columns)?; - partition_entries - .into_iter() - .map(|x| MetaPartition::try_from(PartitionDef::new(partition_columns.clone(), x))) - .collect::>() - .context(DeserializePartitionSnafu) + Ok(( + partition_entries + .into_iter() + .map(|x| MetaPartition::try_from(PartitionDef::new(partition_columns.clone(), x))) + .collect::>() + .context(DeserializePartitionSnafu)?, + partition_columns, + )) } fn find_partition_entries( @@ -895,10 +913,7 @@ fn find_partition_entries( Ok(entries) } -fn find_partition_columns( - create_table: &CreateTableExpr, - partitions: &Option, -) -> Result> { +fn find_partition_columns(partitions: &Option) -> Result> { let columns = if let Some(partitions) = partitions { partitions .column_list @@ -906,7 +921,7 @@ fn find_partition_columns( .map(|x| x.value.clone()) .collect::>() } else { - vec![create_table.time_index.clone()] + vec![] }; Ok(columns) } @@ -952,7 +967,7 @@ ENGINE=mito", match &result[0] { Statement::CreateTable(c) => { let expr = expr_factory::create_to_expr(c, QueryContext::arc()).unwrap(); - let partitions = parse_partitions(&expr, c.partitions.clone()).unwrap(); + let (partitions, _) = parse_partitions(&expr, c.partitions.clone()).unwrap(); let json = serde_json::to_string(&partitions).unwrap(); assert_eq!(json, expected); } diff --git a/src/frontend/src/table/scan.rs b/src/frontend/src/table/scan.rs index ac3141c4b0..510b1379b1 100644 --- a/src/frontend/src/table/scan.rs +++ b/src/frontend/src/table/scan.rs @@ -55,7 +55,7 @@ impl DatanodeInstance { let logical_plan = self.build_logical_plan(&plan)?; let substrait_plan = DFLogicalSubstraitConvertor - .encode(logical_plan) + .encode(&logical_plan) .context(error::EncodeSubstraitLogicalPlanSnafu)?; let result = self diff --git a/src/meta-srv/src/lib.rs b/src/meta-srv/src/lib.rs index a45764609f..2058de1fa5 100644 --- a/src/meta-srv/src/lib.rs +++ b/src/meta-srv/src/lib.rs @@ -40,8 +40,9 @@ pub mod table_routes; pub use crate::error::Result; mod inactive_node_manager; -#[cfg(test)] -mod test_util; mod greptimedb_telemetry; use greptimedb_telemetry::telemetry; + +#[cfg(test)] +mod test_util; diff --git a/src/meta-srv/src/table_routes.rs b/src/meta-srv/src/table_routes.rs index 123b112cef..9f5e76fdbd 100644 --- a/src/meta-srv/src/table_routes.rs +++ b/src/meta-srv/src/table_routes.rs @@ -140,6 +140,7 @@ pub(crate) mod tests { engine_options: HashMap::new(), options: TableOptions::default(), created_on: DateTime::default(), + partition_key_indices: vec![], }, table_type: TableType::Base, }; diff --git a/src/mito/src/manifest/action.rs b/src/mito/src/manifest/action.rs index 288e85dd3c..03f553ce10 100644 --- a/src/mito/src/manifest/action.rs +++ b/src/mito/src/manifest/action.rs @@ -182,7 +182,7 @@ mod tests { // modification to manifest-related structs is compatible with older manifests. #[test] fn test_table_manifest_compatibility() { - let table_change = r#"{"table_info":{"ident":{"table_id":0,"version":0},"name":"demo","desc":null,"catalog_name":"greptime","schema_name":"public","meta":{"schema":{"column_schemas":[{"name":"host","data_type":{"String":null},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},{"name":"cpu","data_type":{"Float64":{}},"is_nullable":true,"is_time_index":false,"default_constraint":null,"metadata":{}},{"name":"memory","data_type":{"Float64":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},{"name":"ts","data_type":{"Timestamp":{"Millisecond":null}},"is_nullable":true,"is_time_index":true,"default_constraint":null,"metadata":{"greptime:time_index":"true"}}],"timestamp_index":3,"version":0},"primary_key_indices":[0],"value_indices":[1,2,3],"engine":"mito","next_column_id":1,"region_numbers":[],"engine_options":{},"options":{"write_buffer_size":null,"ttl":null,"extra_options":{}},"created_on":"2023-03-06T08:50:34.662020Z"},"table_type":"Base"}}"#; + let table_change = r#"{"table_info":{"ident":{"table_id":0,"version":0},"name":"demo","desc":null,"catalog_name":"greptime","schema_name":"public","meta":{"schema":{"column_schemas":[{"name":"host","data_type":{"String":null},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},{"name":"cpu","data_type":{"Float64":{}},"is_nullable":true,"is_time_index":false,"default_constraint":null,"metadata":{}},{"name":"memory","data_type":{"Float64":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},{"name":"ts","data_type":{"Timestamp":{"Millisecond":null}},"is_nullable":true,"is_time_index":true,"default_constraint":null,"metadata":{"greptime:time_index":"true"}}],"timestamp_index":3,"version":0},"primary_key_indices":[0],"value_indices":[1,2,3],"engine":"mito","partition_key_indices":[],"next_column_id":1,"region_numbers":[],"engine_options":{},"options":{"write_buffer_size":null,"ttl":null,"extra_options":{}},"created_on":"2023-03-06T08:50:34.662020Z"},"table_type":"Base"}}"#; let _ = serde_json::from_str::(table_change).unwrap(); let table_remove = diff --git a/src/partition/src/columns.rs b/src/partition/src/columns.rs index 99d720adc0..29bee0cab9 100644 --- a/src/partition/src/columns.rs +++ b/src/partition/src/columns.rs @@ -167,45 +167,46 @@ impl PartitionRule for RangeColumnsPartitionRule { } fn find_regions_by_exprs(&self, exprs: &[PartitionExpr]) -> Result> { - let regions = if exprs.iter().all(|x| self.column_list.contains(&x.column)) { - let PartitionExpr { - column: _, - op, - value, - } = exprs - .iter() - .find(|x| x.column == self.column_list[0]) - // "unwrap" is safe because we have checked that "self.column_list" contains all columns in "exprs" - .unwrap(); + let regions = + if !exprs.is_empty() && exprs.iter().all(|x| self.column_list.contains(&x.column)) { + let PartitionExpr { + column: _, + op, + value, + } = exprs + .iter() + .find(|x| x.column == self.column_list[0]) + // "unwrap" is safe because we have checked that "self.column_list" contains all columns in "exprs" + .unwrap(); - let regions = &self.first_column_regions; - match self - .first_column_bounds - .binary_search(&PartitionBound::Value(value.clone())) - { - Ok(i) => match op { - Operator::Lt => ®ions[..=i], - Operator::LtEq => ®ions[..=(i + 1)], - Operator::Eq => ®ions[(i + 1)..=(i + 1)], - Operator::Gt | Operator::GtEq => ®ions[(i + 1)..], - Operator::NotEq => ®ions[..], - _ => unimplemented!(), - }, - Err(i) => match op { - Operator::Lt | Operator::LtEq => ®ions[..=i], - Operator::Eq => ®ions[i..=i], - Operator::Gt | Operator::GtEq => ®ions[i..], - Operator::NotEq => ®ions[..], - _ => unimplemented!(), - }, - } - .iter() - .flatten() - .cloned() - .collect::>() - } else { - self.regions.clone() - }; + let regions = &self.first_column_regions; + match self + .first_column_bounds + .binary_search(&PartitionBound::Value(value.clone())) + { + Ok(i) => match op { + Operator::Lt => ®ions[..=i], + Operator::LtEq => ®ions[..=(i + 1)], + Operator::Eq => ®ions[(i + 1)..=(i + 1)], + Operator::Gt | Operator::GtEq => ®ions[(i + 1)..], + Operator::NotEq => ®ions[..], + _ => unimplemented!(), + }, + Err(i) => match op { + Operator::Lt | Operator::LtEq => ®ions[..=i], + Operator::Eq => ®ions[i..=i], + Operator::Gt | Operator::GtEq => ®ions[i..], + Operator::NotEq => ®ions[..], + _ => unimplemented!(), + }, + } + .iter() + .flatten() + .cloned() + .collect::>() + } else { + self.regions.clone() + }; Ok(regions) } } diff --git a/src/partition/src/manager.rs b/src/partition/src/manager.rs index 699d643527..60cea2f8fa 100644 --- a/src/partition/src/manager.rs +++ b/src/partition/src/manager.rs @@ -163,13 +163,6 @@ impl PartitionRuleManager { let partitions = self.find_table_partitions(table_id).await?; let partition_columns = partitions[0].partition.partition_columns(); - ensure!( - !partition_columns.is_empty(), - error::InvalidTableRouteDataSnafu { - table_id, - err_msg: "no partition columns found" - } - ); let regions = partitions .iter() diff --git a/src/partition/src/splitter.rs b/src/partition/src/splitter.rs index 0df9f71233..8eaf5da592 100644 --- a/src/partition/src/splitter.rs +++ b/src/partition/src/splitter.rs @@ -51,6 +51,12 @@ impl WriteSplitter { let row_nums = check_req(&insert)?; let mut insert = insert; let partition_columns = self.partition_rule.partition_columns(); + if partition_columns.is_empty() { + // If no partition column, all rows are inserted into the first region. + let mut split = HashMap::new(); + split.insert(0, insert); + return Ok(split); + } let missing_columns = schema .column_schemas() .iter() @@ -87,6 +93,12 @@ impl WriteSplitter { Self::validate_delete_request(&request)?; let partition_columns = self.partition_rule.partition_columns(); + if partition_columns.is_empty() { + // If no partition column, all requests are sent to the first region. + let mut split = HashMap::new(); + split.insert(0, request); + return Ok(split); + } let values = find_partitioning_values(&request.key_column_values, &partition_columns)?; let regional_value_indexes = self.split_partitioning_values(&values)?; @@ -280,7 +292,8 @@ mod tests { use datatypes::types::{BooleanType, Int16Type, StringType}; use datatypes::value::Value; use datatypes::vectors::{ - BooleanVectorBuilder, Int16VectorBuilder, MutableVector, StringVectorBuilder, Vector, + BooleanVectorBuilder, Int16VectorBuilder, MutableVector, StringVector, StringVectorBuilder, + Vector, }; use serde::{Deserialize, Serialize}; use store_api::storage::RegionNumber; @@ -359,7 +372,7 @@ mod tests { } #[test] - fn test_writer_spliter_without_partition_columns() { + fn test_writer_spliter_with_id_partition_columns() { let (mock_schema, insert) = mock_schema_and_insert_request_without_partition_columns(); let rule = Arc::new(MockPartitionRule) as PartitionRuleRef; let spliter = WriteSplitter::with_partition_rule(rule); @@ -392,6 +405,53 @@ mod tests { ); } + #[test] + fn test_writer_spliter_without_partition_columns() { + let (mock_schema, insert) = mock_schema_and_insert_request_without_partition_columns(); + let rule = Arc::new(EmptyPartitionRule) as PartitionRuleRef; + let spliter = WriteSplitter::with_partition_rule(rule); + let ret = spliter.split_insert(insert, &mock_schema).unwrap(); + + assert_eq!(1, ret.len()); + + let r1_insert = ret.get(&0).unwrap(); + + assert_eq!("demo", r1_insert.table_name); + + let r1_columns = &r1_insert.columns_values; + assert_eq!(2, r1_columns.len()); + assert_columns( + r1_columns, + &[ + ( + "host", + &[Value::from("host1"), Value::Null, Value::from("host3")], + ), + ( + "enable_reboot", + &[Value::from(true), Value::from(false), Value::from(true)], + ), + ], + ); + } + + #[test] + fn test_delete_spliter_without_partition_columns() { + let mut key_column_values = HashMap::new(); + key_column_values.insert( + "host".to_string(), + Arc::new(StringVector::from(vec!["localhost"])) as _, + ); + let delete = DeleteRequest { key_column_values }; + let rule = Arc::new(EmptyPartitionRule) as PartitionRuleRef; + let spliter = WriteSplitter::with_partition_rule(rule); + let ret = spliter + .split_delete(delete, vec![&String::from("host")]) + .unwrap(); + + assert_eq!(1, ret.len()); + } + #[test] fn test_partition_insert_request() { let insert = mock_insert_request(); @@ -631,4 +691,28 @@ mod tests { unimplemented!() } } + + #[derive(Debug, Serialize, Deserialize)] + struct EmptyPartitionRule; + + // PARTITION BY LIST COLUMNS() ( + // PARTITION r0 VALUES LESS THAN MAXVALUE, + // ); + impl PartitionRule for EmptyPartitionRule { + fn as_any(&self) -> &dyn Any { + self + } + + fn partition_columns(&self) -> Vec { + vec![] + } + + fn find_region(&self, _: &[Value]) -> Result { + Ok(0) + } + + fn find_regions_by_exprs(&self, _: &[PartitionExpr]) -> Result, Error> { + unimplemented!() + } + } } diff --git a/src/query/src/dist_plan.rs b/src/query/src/dist_plan.rs index 0aa086eb43..ca1480c1bd 100644 --- a/src/query/src/dist_plan.rs +++ b/src/query/src/dist_plan.rs @@ -19,4 +19,5 @@ mod planner; mod utils; pub use analyzer::DistPlannerAnalyzer; +pub use merge_scan::MergeScanLogicalPlan; pub use planner::DistExtensionPlanner; diff --git a/src/query/src/dist_plan/analyzer.rs b/src/query/src/dist_plan/analyzer.rs index 9095ddb6e4..8b8c5a6e29 100644 --- a/src/query/src/dist_plan/analyzer.rs +++ b/src/query/src/dist_plan/analyzer.rs @@ -14,10 +14,14 @@ use std::sync::{Arc, Mutex}; +use datafusion::datasource::DefaultTableSource; use datafusion_common::config::ConfigOptions; use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeVisitor, VisitRecursion}; use datafusion_expr::{Extension, LogicalPlan}; use datafusion_optimizer::analyzer::AnalyzerRule; +use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan}; +use table::metadata::TableType; +use table::table::adapter::DfTableProviderAdapter; use crate::dist_plan::commutativity::{ partial_commutative_transformer, Categorizer, Commutativity, @@ -161,6 +165,8 @@ struct CommutativeVisitor { next_stage: Vec, // hash of the stop node stop_node: Option, + /// Partition columns of current visiting table + current_partition_cols: Option>, } impl TreeNodeVisitor for CommutativeVisitor { @@ -170,18 +176,46 @@ impl TreeNodeVisitor for CommutativeVisitor { // find the first merge scan and stop traversing down // todo: check if it works for join Ok(match plan { - LogicalPlan::Extension(ext) => { - if ext.node.name() == MergeScanLogicalPlan::name() { - VisitRecursion::Skip - } else { - VisitRecursion::Continue + LogicalPlan::TableScan(table_scan) => { + // TODO(ruihang): spawn a sub visitor to retrieve partition columns + if let Some(source) = table_scan + .source + .as_any() + .downcast_ref::() + { + if let Some(provider) = source + .table_provider + .as_any() + .downcast_ref::() + { + if provider.table().table_type() == TableType::Base { + let info = provider.table().table_info(); + let partition_key_indices = info.meta.partition_key_indices.clone(); + let schema = info.meta.schema.clone(); + let partition_cols = partition_key_indices + .into_iter() + .map(|index| schema.column_name_by_index(index).to_string()) + .collect::>(); + self.current_partition_cols = Some(partition_cols); + } + } } + VisitRecursion::Continue } _ => VisitRecursion::Continue, }) } fn post_visit(&mut self, plan: &LogicalPlan) -> datafusion_common::Result { + if DFLogicalSubstraitConvertor.encode(plan).is_err() { + common_telemetry::info!( + "substrait error: {:?}", + DFLogicalSubstraitConvertor.encode(plan) + ); + self.stop_node = Some(utils::hash_plan(plan)); + return Ok(VisitRecursion::Stop); + } + match Categorizer::check_plan(plan) { Commutativity::Commutative => {} Commutativity::PartialCommutative => { @@ -201,6 +235,16 @@ impl TreeNodeVisitor for CommutativeVisitor { self.next_stage.push(plan) } }, + Commutativity::CheckPartition => { + if let Some(partition_cols) = &self.current_partition_cols + && partition_cols.is_empty() { + // no partition columns, and can be encoded skip + return Ok(VisitRecursion::Continue); + } else { + self.stop_node = Some(utils::hash_plan(plan)); + return Ok(VisitRecursion::Stop); + } + }, Commutativity::NonCommutative | Commutativity::Unimplemented | Commutativity::Unsupported => { @@ -218,6 +262,7 @@ impl CommutativeVisitor { Self { next_stage: vec![], stop_node: None, + current_partition_cols: None, } } } diff --git a/src/query/src/dist_plan/commutativity.rs b/src/query/src/dist_plan/commutativity.rs index 8f63aac051..f7a1d52418 100644 --- a/src/query/src/dist_plan/commutativity.rs +++ b/src/query/src/dist_plan/commutativity.rs @@ -19,6 +19,8 @@ use promql::extension_plan::{ EmptyMetric, InstantManipulate, RangeManipulate, SeriesDivide, SeriesNormalize, }; +use crate::dist_plan::MergeScanLogicalPlan; + #[allow(dead_code)] pub enum Commutativity { Commutative, @@ -27,6 +29,7 @@ pub enum Commutativity { TransformedCommutative(Option), NonCommutative, Unimplemented, + CheckPartition, /// For unrelated plans like DDL Unsupported, } @@ -36,7 +39,15 @@ pub struct Categorizer {} impl Categorizer { pub fn check_plan(plan: &LogicalPlan) -> Commutativity { match plan { - LogicalPlan::Projection(_) => Commutativity::Unimplemented, + LogicalPlan::Projection(proj) => { + for expr in &proj.expr { + let commutativity = Self::check_expr(expr); + if !matches!(commutativity, Commutativity::Commutative) { + return commutativity; + } + } + Commutativity::Commutative + } // TODO(ruihang): Change this to Commutative once Like is supported in substrait LogicalPlan::Filter(filter) => Self::check_expr(&filter.predicate), LogicalPlan::Window(_) => Commutativity::Unimplemented, @@ -44,7 +55,7 @@ impl Categorizer { // check all children exprs and uses the strictest level Commutativity::Unimplemented } - LogicalPlan::Sort(_) => Commutativity::NonCommutative, + LogicalPlan::Sort(_) => Commutativity::Unimplemented, LogicalPlan::Join(_) => Commutativity::NonCommutative, LogicalPlan::CrossJoin(_) => Commutativity::NonCommutative, LogicalPlan::Repartition(_) => { @@ -52,7 +63,7 @@ impl Categorizer { Commutativity::Unimplemented } LogicalPlan::Union(_) => Commutativity::Unimplemented, - LogicalPlan::TableScan(_) => Commutativity::NonCommutative, + LogicalPlan::TableScan(_) => Commutativity::CheckPartition, LogicalPlan::EmptyRelation(_) => Commutativity::NonCommutative, LogicalPlan::Subquery(_) => Commutativity::Unimplemented, LogicalPlan::SubqueryAlias(_) => Commutativity::Unimplemented, @@ -79,7 +90,8 @@ impl Categorizer { || name == InstantManipulate::name() || name == SeriesNormalize::name() || name == RangeManipulate::name() - || name == SeriesDivide::name() => + || name == SeriesDivide::name() + || name == MergeScanLogicalPlan::name() => { Commutativity::Commutative } @@ -89,8 +101,7 @@ impl Categorizer { pub fn check_expr(expr: &Expr) -> Commutativity { match expr { - Expr::Alias(_, _) - | Expr::Column(_) + Expr::Column(_) | Expr::ScalarVariable(_, _) | Expr::Literal(_) | Expr::BinaryExpr(_) @@ -124,7 +135,9 @@ impl Categorizer { | Expr::InSubquery(_) | Expr::ScalarSubquery(_) | Expr::Wildcard => Commutativity::Unimplemented, - Expr::QualifiedWildcard { .. } + + Expr::Alias(_, _) + | Expr::QualifiedWildcard { .. } | Expr::GroupingSet(_) | Expr::Placeholder(_) | Expr::OuterReferenceColumn(_, _) => Commutativity::Unimplemented, diff --git a/src/query/src/dist_plan/merge_scan.rs b/src/query/src/dist_plan/merge_scan.rs index 1954d14a3f..236e32129b 100644 --- a/src/query/src/dist_plan/merge_scan.rs +++ b/src/query/src/dist_plan/merge_scan.rs @@ -15,7 +15,7 @@ use std::any::Any; use std::sync::Arc; -use arrow_schema::SchemaRef as ArrowSchemaRef; +use arrow_schema::{Schema as ArrowSchema, SchemaRef as ArrowSchemaRef}; use async_stream::try_stream; use client::client_manager::DatanodeClients; use client::Database; @@ -28,12 +28,13 @@ use common_query::Output; use common_recordbatch::adapter::DfRecordBatchStreamAdapter; use common_recordbatch::error::ExternalSnafu; use common_recordbatch::{ - DfSendableRecordBatchStream, RecordBatchStreamAdaptor, SendableRecordBatchStream, + DfSendableRecordBatchStream, RecordBatch, RecordBatchStreamAdaptor, SendableRecordBatchStream, }; use datafusion::physical_plan::{DisplayFormatType, ExecutionPlan, Partitioning}; use datafusion_common::{DataFusionError, Result, Statistics}; use datafusion_expr::{Extension, LogicalPlan, UserDefinedLogicalNodeCore}; use datafusion_physical_expr::PhysicalSortExpr; +use datatypes::schema::Schema; use futures_util::StreamExt; use snafu::ResultExt; @@ -123,6 +124,8 @@ impl MergeScanExec { arrow_schema: ArrowSchemaRef, clients: Arc, ) -> Self { + // remove all metadata + let arrow_schema = Arc::new(ArrowSchema::new(arrow_schema.fields().to_vec())); Self { table, peers, @@ -163,12 +166,12 @@ impl MergeScanExec { } Output::RecordBatches(record_batches) => { for batch in record_batches.into_iter() { - yield batch; + yield Self::remove_metadata_from_record_batch(batch); } } Output::Stream(mut stream) => { while let Some(batch) = stream.next().await { - yield batch?; + yield Self::remove_metadata_from_record_batch(batch?); } } } @@ -186,6 +189,15 @@ impl MergeScanExec { output_ordering: None, })) } + + fn remove_metadata_from_record_batch(batch: RecordBatch) -> RecordBatch { + let schema = ArrowSchema::new(batch.schema.arrow_schema().fields().to_vec()); + RecordBatch::new( + Arc::new(Schema::try_from(schema).unwrap()), + batch.columns().iter().cloned(), + ) + .unwrap() + } } impl ExecutionPlan for MergeScanExec { diff --git a/src/query/src/dist_plan/planner.rs b/src/query/src/dist_plan/planner.rs index 020f8a326f..898bca3514 100644 --- a/src/query/src/dist_plan/planner.rs +++ b/src/query/src/dist_plan/planner.rs @@ -82,12 +82,12 @@ impl ExtensionPlanner for DistExtensionPlanner { } else { // TODO(ruihang): generate different execution plans for different variant merge operation let input_plan = merge_scan.input(); + let input_physical_plan = planner + .create_physical_plan(input_plan, session_state) + .await?; let Some(table_name) = self.get_table_name(input_plan)? else { // no relation found in input plan, going to execute them locally - return planner - .create_physical_plan(input_plan, session_state) - .await - .map(Some); + return Ok(Some(input_physical_plan)); }; if table_name.schema_name == INFORMATION_SCHEMA_NAME { @@ -97,10 +97,10 @@ impl ExtensionPlanner for DistExtensionPlanner { .map(Some); } - let input_schema = input_plan.schema().clone(); + let input_schema = input_physical_plan.schema().clone(); let input_plan = self.set_table_name(&table_name, input_plan.clone())?; let substrait_plan: Bytes = DFLogicalSubstraitConvertor - .encode(input_plan.clone()) + .encode(&input_plan) .context(error::EncodeSubstraitLogicalPlanSnafu)? .into(); let peers = self.get_peers(&table_name).await; @@ -110,7 +110,7 @@ impl ExtensionPlanner for DistExtensionPlanner { table_name, peers, substrait_plan, - Arc::new(input_schema.as_ref().into()), + input_schema, self.clients.clone(), ); diff --git a/src/query/src/lib.rs b/src/query/src/lib.rs index a71d6b88d7..5b04c43623 100644 --- a/src/query/src/lib.rs +++ b/src/query/src/lib.rs @@ -19,7 +19,6 @@ pub mod datafusion; pub mod dist_plan; pub mod error; pub mod executor; -pub mod extension_serializer; pub mod logical_optimizer; mod metrics; mod optimizer; diff --git a/src/query/src/query_engine/state.rs b/src/query/src/query_engine/state.rs index deff2533fc..e20ed083e4 100644 --- a/src/query/src/query_engine/state.rs +++ b/src/query/src/query_engine/state.rs @@ -39,11 +39,11 @@ use datafusion_optimizer::analyzer::Analyzer; use datafusion_optimizer::optimizer::Optimizer; use partition::manager::PartitionRuleManager; use promql::extension_plan::PromExtensionPlanner; +use substrait::extension_serializer::ExtensionSerializer; use table::table::adapter::DfTableProviderAdapter; use table::TableRef; use crate::dist_plan::{DistExtensionPlanner, DistPlannerAnalyzer}; -use crate::extension_serializer::ExtensionSerializer; use crate::optimizer::order_hint::OrderHintRule; use crate::optimizer::type_conversion::TypeConversionRule; use crate::query_engine::options::QueryOptions; diff --git a/src/table/src/metadata.rs b/src/table/src/metadata.rs index fb8c84fb51..3c8ae3106d 100644 --- a/src/table/src/metadata.rs +++ b/src/table/src/metadata.rs @@ -115,6 +115,8 @@ pub struct TableMeta { pub options: TableOptions, #[builder(default = "Utc::now()")] pub created_on: DateTime, + #[builder(default = "Vec::new()")] + pub partition_key_indices: Vec, } impl TableMetaBuilder { @@ -518,6 +520,7 @@ pub struct RawTableMeta { pub engine_options: HashMap, pub options: TableOptions, pub created_on: DateTime, + pub partition_key_indices: Vec, } impl From for RawTableMeta { @@ -532,6 +535,7 @@ impl From for RawTableMeta { engine_options: meta.engine_options, options: meta.options, created_on: meta.created_on, + partition_key_indices: meta.partition_key_indices, } } } @@ -550,6 +554,7 @@ impl TryFrom for TableMeta { engine_options: raw.engine_options, options: raw.options, created_on: raw.created_on, + partition_key_indices: raw.partition_key_indices, }) } } diff --git a/tests-integration/src/tests/instance_test.rs b/tests-integration/src/tests/instance_test.rs index 2ccaa7b88c..e2d4e94b5e 100644 --- a/tests-integration/src/tests/instance_test.rs +++ b/tests-integration/src/tests/instance_test.rs @@ -108,7 +108,7 @@ async fn test_show_create_table(instance: Arc) { | | ts BIGINT NOT NULL, | | | TIME INDEX (ts) | | | ) | -| | PARTITION BY RANGE COLUMNS (ts) ( | +| | PARTITION BY RANGE COLUMNS () ( | | | PARTITION r0 VALUES LESS THAN (MAXVALUE) | | | ) | | | ENGINE=mito | diff --git a/tests/cases/distributed/explain/order_by.result b/tests/cases/distributed/explain/order_by.result index 4ad9b4cf40..b3413f906d 100644 --- a/tests/cases/distributed/explain/order_by.result +++ b/tests/cases/distributed/explain/order_by.result @@ -14,8 +14,9 @@ EXPLAIN SELECT DISTINCT i%2 FROM integers ORDER BY 1; +-+-+ | logical_plan_| Sort: integers.i % Int64(2) ASC NULLS LAST_| |_|_Aggregate: groupBy=[[integers.i % Int64(2)]], aggr=[[]]_| -|_|_Projection: integers.i % Int64(2)_| |_|_MergeScan [is_placeholder=false]_| +|_|_Aggregate: groupBy=[[integers.i % Int64(2)]], aggr=[[]]_| +|_|_Projection: integers.i % Int64(2)_| |_|_TableScan: integers projection=[i]_| | physical_plan | SortPreservingMergeExec: [integers.i % Int64(2)@0 ASC NULLS LAST]_| |_|_SortExec: expr=[integers.i % Int64(2)@0 ASC NULLS LAST]_| @@ -23,7 +24,6 @@ EXPLAIN SELECT DISTINCT i%2 FROM integers ORDER BY 1; |_|_CoalesceBatchesExec: target_batch_size=8192_| |_|_RepartitionExec: partitioning=REDACTED |_|_AggregateExec: mode=Partial, gby=[integers.i % Int64(2)@0 as integers.i % Int64(2)], aggr=[]_| -|_|_ProjectionExec: expr=[i@0 % 2 as integers.i % Int64(2)]_| |_|_RepartitionExec: partitioning=REDACTED |_|_MergeScanExec: peers=[REDACTED |_|_| @@ -48,11 +48,9 @@ EXPLAIN SELECT a, b FROM test ORDER BY a, b; | plan_type_| plan_| +-+-+ | logical_plan_| Sort: test.a ASC NULLS LAST, test.b ASC NULLS LAST_| -|_|_Projection: test.a, test.b_| |_|_MergeScan [is_placeholder=false]_| -|_|_TableScan: test projection=[a, b, t]_| +|_|_TableScan: test projection=[a, b]_| | physical_plan | SortExec: expr=[a@0 ASC NULLS LAST,b@1 ASC NULLS LAST] | -|_|_ProjectionExec: expr=[a@0 as a, b@1 as b]_| |_|_MergeScanExec: peers=[REDACTED |_|_| +-+-+ @@ -69,9 +67,9 @@ EXPLAIN SELECT DISTINCT a, b FROM test ORDER BY a, b; +-+-+ | logical_plan_| Sort: test.a ASC NULLS LAST, test.b ASC NULLS LAST_| |_|_Aggregate: groupBy=[[test.a, test.b]], aggr=[[]]_| -|_|_Projection: test.a, test.b_| |_|_MergeScan [is_placeholder=false]_| -|_|_TableScan: test projection=[a, b, t]_| +|_|_Aggregate: groupBy=[[test.a, test.b]], aggr=[[]]_| +|_|_TableScan: test projection=[a, b]_| | physical_plan | SortPreservingMergeExec: [a@0 ASC NULLS LAST,b@1 ASC NULLS LAST]_| |_|_SortExec: expr=[a@0 ASC NULLS LAST,b@1 ASC NULLS LAST]_| |_|_AggregateExec: mode=FinalPartitioned, gby=[a@0 as a, b@1 as b], aggr=[]_| @@ -79,7 +77,6 @@ EXPLAIN SELECT DISTINCT a, b FROM test ORDER BY a, b; |_|_RepartitionExec: partitioning=REDACTED |_|_AggregateExec: mode=Partial, gby=[a@0 as a, b@1 as b], aggr=[]_| |_|_RepartitionExec: partitioning=REDACTED -|_|_ProjectionExec: expr=[a@0 as a, b@1 as b]_| |_|_MergeScanExec: peers=[REDACTED |_|_| +-+-+ diff --git a/tests/cases/distributed/explain/single_partition.result b/tests/cases/distributed/explain/single_partition.result new file mode 100644 index 0000000000..afc92613cd --- /dev/null +++ b/tests/cases/distributed/explain/single_partition.result @@ -0,0 +1,72 @@ +CREATE TABLE single_partition(i DOUBLE, j TIMESTAMP TIME INDEX, k STRING PRIMARY KEY); + +Affected Rows: 0 + +-- SQLNESS REPLACE (-+) - +-- SQLNESS REPLACE (\s\s+) _ +-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED +-- SQLNESS REPLACE (Hash.*) REDACTED +-- SQLNESS REPLACE (peer-.*) REDACTED +EXPLAIN SELECT COUNT(*) FROM single_partition; + ++-+-+ +| plan_type_| plan_| ++-+-+ +| logical_plan_| Aggregate: groupBy=[[]], aggr=[[COUNT(UInt8(1))]]_| +|_|_Projection:_| +|_|_MergeScan [is_placeholder=false]_| +|_|_TableScan: single_partition projection=[i, j, k]_| +| physical_plan | AggregateExec: mode=Final, gby=[], aggr=[COUNT(UInt8(1))]_| +|_|_CoalescePartitionsExec_| +|_|_AggregateExec: mode=Partial, gby=[], aggr=[COUNT(UInt8(1))]_| +|_|_RepartitionExec: partitioning=REDACTED +|_|_ProjectionExec: expr=[]_| +|_|_MergeScanExec: peers=[REDACTED +|_|_| ++-+-+ + +-- SQLNESS REPLACE (-+) - +-- SQLNESS REPLACE (\s\s+) _ +-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED +-- SQLNESS REPLACE (Hash.*) REDACTED +-- SQLNESS REPLACE (peer-.*) REDACTED +EXPLAIN SELECT SUM(i) FROM single_partition; + ++-+-+ +| plan_type_| plan_| ++-+-+ +| logical_plan_| Aggregate: groupBy=[[]], aggr=[[SUM(single_partition.i)]]_| +|_|_Projection: single_partition.i_| +|_|_MergeScan [is_placeholder=false]_| +|_|_TableScan: single_partition projection=[i, j, k]_| +| physical_plan | AggregateExec: mode=Final, gby=[], aggr=[SUM(single_partition.i)]_| +|_|_CoalescePartitionsExec_| +|_|_AggregateExec: mode=Partial, gby=[], aggr=[SUM(single_partition.i)]_| +|_|_RepartitionExec: partitioning=REDACTED +|_|_ProjectionExec: expr=[i@0 as i]_| +|_|_MergeScanExec: peers=[REDACTED +|_|_| ++-+-+ + +-- SQLNESS REPLACE (-+) - +-- SQLNESS REPLACE (\s\s+) _ +-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED +-- SQLNESS REPLACE (Hash.*) REDACTED +-- SQLNESS REPLACE (peer-.*) REDACTED +EXPLAIN SELECT * FROM single_partition ORDER BY i DESC; + ++-+-+ +| plan_type_| plan_| ++-+-+ +| logical_plan_| Sort: single_partition.i DESC NULLS FIRST_| +|_|_MergeScan [is_placeholder=false]_| +|_|_TableScan: single_partition projection=[i, j, k] | +| physical_plan | SortExec: expr=[i@0 DESC]_| +|_|_MergeScanExec: peers=[REDACTED +|_|_| ++-+-+ + +drop table single_partition; + +Affected Rows: 1 + diff --git a/tests/cases/distributed/explain/single_partition.sql b/tests/cases/distributed/explain/single_partition.sql new file mode 100644 index 0000000000..9036d82442 --- /dev/null +++ b/tests/cases/distributed/explain/single_partition.sql @@ -0,0 +1,24 @@ +CREATE TABLE single_partition(i DOUBLE, j TIMESTAMP TIME INDEX, k STRING PRIMARY KEY); + +-- SQLNESS REPLACE (-+) - +-- SQLNESS REPLACE (\s\s+) _ +-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED +-- SQLNESS REPLACE (Hash.*) REDACTED +-- SQLNESS REPLACE (peer-.*) REDACTED +EXPLAIN SELECT COUNT(*) FROM single_partition; + +-- SQLNESS REPLACE (-+) - +-- SQLNESS REPLACE (\s\s+) _ +-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED +-- SQLNESS REPLACE (Hash.*) REDACTED +-- SQLNESS REPLACE (peer-.*) REDACTED +EXPLAIN SELECT SUM(i) FROM single_partition; + +-- SQLNESS REPLACE (-+) - +-- SQLNESS REPLACE (\s\s+) _ +-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED +-- SQLNESS REPLACE (Hash.*) REDACTED +-- SQLNESS REPLACE (peer-.*) REDACTED +EXPLAIN SELECT * FROM single_partition ORDER BY i DESC; + +drop table single_partition; diff --git a/tests/cases/distributed/optimizer/filter_push_down.result b/tests/cases/distributed/optimizer/filter_push_down.result index 4c06353723..427b8c1a54 100644 --- a/tests/cases/distributed/optimizer/filter_push_down.result +++ b/tests/cases/distributed/optimizer/filter_push_down.result @@ -110,7 +110,12 @@ SELECT * FROM integers WHERE i NOT IN ((SELECT i FROM integers WHERE i=1)) ORDER SELECT * FROM integers WHERE i IN ((SELECT i FROM integers)) AND i<3 ORDER BY i; -Error: 3001(EngineExecuteQuery), No field named __correlated_sq_3.i. Valid fields are integers.i, integers.j. ++---+---+ +| i | j | ++---+---+ +| 1 | 1 | +| 2 | 2 | ++---+---+ SELECT i1.i,i2.i FROM integers i1, integers i2 WHERE i IN ((SELECT i FROM integers)) AND i1.i=i2.i ORDER BY 1; diff --git a/tests/cases/distributed/optimizer/order_by.result b/tests/cases/distributed/optimizer/order_by.result index c1fbc190c4..3d130d366f 100644 --- a/tests/cases/distributed/optimizer/order_by.result +++ b/tests/cases/distributed/optimizer/order_by.result @@ -1,45 +1,39 @@ explain select * from numbers; ++---------------+------------------------------------------+ +| plan_type | plan | ++---------------+------------------------------------------+ +| logical_plan | MergeScan [is_placeholder=false] | +| | TableScan: numbers projection=[number] | +| physical_plan | ExecutionPlan(PlaceHolder) | +| | | ++---------------+------------------------------------------+ + +explain select * from numbers order by number desc; + +---------------+--------------------------------------------+ | plan_type | plan | +---------------+--------------------------------------------+ -| logical_plan | Projection: numbers.number | +| logical_plan | Sort: numbers.number DESC NULLS FIRST | | | MergeScan [is_placeholder=false] | | | TableScan: numbers projection=[number] | -| physical_plan | ProjectionExec: expr=[number@0 as number] | +| physical_plan | SortExec: expr=[number@0 DESC] | | | ExecutionPlan(PlaceHolder) | | | | +---------------+--------------------------------------------+ -explain select * from numbers order by number desc; - -+---------------+----------------------------------------------+ -| plan_type | plan | -+---------------+----------------------------------------------+ -| logical_plan | Sort: numbers.number DESC NULLS FIRST | -| | Projection: numbers.number | -| | MergeScan [is_placeholder=false] | -| | TableScan: numbers projection=[number] | -| physical_plan | SortExec: expr=[number@0 DESC] | -| | ProjectionExec: expr=[number@0 as number] | -| | ExecutionPlan(PlaceHolder) | -| | | -+---------------+----------------------------------------------+ - explain select * from numbers order by number asc; -+---------------+----------------------------------------------+ -| plan_type | plan | -+---------------+----------------------------------------------+ -| logical_plan | Sort: numbers.number ASC NULLS LAST | -| | Projection: numbers.number | -| | MergeScan [is_placeholder=false] | -| | TableScan: numbers projection=[number] | -| physical_plan | SortExec: expr=[number@0 ASC NULLS LAST] | -| | ProjectionExec: expr=[number@0 as number] | -| | ExecutionPlan(PlaceHolder) | -| | | -+---------------+----------------------------------------------+ ++---------------+--------------------------------------------+ +| plan_type | plan | ++---------------+--------------------------------------------+ +| logical_plan | Sort: numbers.number ASC NULLS LAST | +| | MergeScan [is_placeholder=false] | +| | TableScan: numbers projection=[number] | +| physical_plan | SortExec: expr=[number@0 ASC NULLS LAST] | +| | ExecutionPlan(PlaceHolder) | +| | | ++---------------+--------------------------------------------+ explain select * from numbers order by number desc limit 10; @@ -48,13 +42,11 @@ explain select * from numbers order by number desc limit 10; +---------------+---------------------------------------------------+ | logical_plan | Limit: skip=0, fetch=10 | | | Sort: numbers.number DESC NULLS FIRST, fetch=10 | -| | Projection: numbers.number | -| | MergeScan [is_placeholder=false] | -| | TableScan: numbers projection=[number] | +| | MergeScan [is_placeholder=false] | +| | TableScan: numbers projection=[number] | | physical_plan | GlobalLimitExec: skip=0, fetch=10 | | | SortExec: fetch=10, expr=[number@0 DESC] | -| | ProjectionExec: expr=[number@0 as number] | -| | ExecutionPlan(PlaceHolder) | +| | ExecutionPlan(PlaceHolder) | | | | +---------------+---------------------------------------------------+ @@ -65,13 +57,11 @@ explain select * from numbers order by number asc limit 10; +---------------+------------------------------------------------------+ | logical_plan | Limit: skip=0, fetch=10 | | | Sort: numbers.number ASC NULLS LAST, fetch=10 | -| | Projection: numbers.number | -| | MergeScan [is_placeholder=false] | -| | TableScan: numbers projection=[number] | +| | MergeScan [is_placeholder=false] | +| | TableScan: numbers projection=[number] | | physical_plan | GlobalLimitExec: skip=0, fetch=10 | | | SortExec: fetch=10, expr=[number@0 ASC NULLS LAST] | -| | ProjectionExec: expr=[number@0 as number] | -| | ExecutionPlan(PlaceHolder) | +| | ExecutionPlan(PlaceHolder) | | | | +---------------+------------------------------------------------------+ diff --git a/tests/cases/distributed/tql-explain-analyze/explain.result b/tests/cases/distributed/tql-explain-analyze/explain.result index 2be8b54bfa..f9824c7bc6 100644 --- a/tests/cases/distributed/tql-explain-analyze/explain.result +++ b/tests/cases/distributed/tql-explain-analyze/explain.result @@ -55,15 +55,14 @@ TQL EXPLAIN host_load1{__field__="val"}; | | PromSeriesNormalize: offset=[0], time index=[ts], filter NaN: [false] | | | PromSeriesDivide: tags=["collector", "host"] | | | Sort: host_load1.collector DESC NULLS LAST, host_load1.host DESC NULLS LAST, host_load1.ts DESC NULLS LAST | -| | Projection: host_load1.val, host_load1.collector, host_load1.host, host_load1.ts | -| | MergeScan [is_placeholder=false] | +| | MergeScan [is_placeholder=false] | +| | Projection: host_load1.val, host_load1.collector, host_load1.host, host_load1.ts | | | TableScan: host_load1 projection=[ts, collector, host, val], partial_filters=[ts >= TimestampMillisecond(-300000, None), ts <= TimestampMillisecond(300000, None)] | | physical_plan | PromInstantManipulateExec: range=[0..0], lookback=[300000], interval=[300000], time index=[ts] | | | PromSeriesNormalizeExec: offset=[0], time index=[ts], filter NaN: [false] | | | RepartitionExec: partitioning=REDACTED | | PromSeriesDivideExec: tags=["collector", "host"] | -| | ProjectionExec: expr=[val@3 as val, collector@1 as collector, host@2 as host, ts@0 as ts] | -| | MergeScanExec: peers=[REDACTED +| | MergeScanExec: peers=[REDACTED | | | +---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ diff --git a/tests/cases/standalone/common/limit/limit.result b/tests/cases/standalone/limit/limit.result similarity index 100% rename from tests/cases/standalone/common/limit/limit.result rename to tests/cases/standalone/limit/limit.result diff --git a/tests/cases/standalone/common/limit/limit.sql b/tests/cases/standalone/limit/limit.sql similarity index 100% rename from tests/cases/standalone/common/limit/limit.sql rename to tests/cases/standalone/limit/limit.sql