feat: prepare for implementing considering partition key in the distributed planner (#2000)

* basic impl

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix frontend logic

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* add sqlness test

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* check substrait compatibility before pushdown

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* going to revert some rules

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* update sqlness result

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix test and clippy

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix compile error

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix typo

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* remove println

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* Apply suggestions from code review

Co-authored-by: Yingwen <realevenyag@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Co-authored-by: Yingwen <realevenyag@gmail.com>
This commit is contained in:
Ruihang Xia
2023-07-31 20:36:23 +08:00
committed by GitHub
parent bddaf265a9
commit 5bd80a74ab
36 changed files with 432 additions and 160 deletions

2
Cargo.lock generated
View File

@@ -9670,10 +9670,12 @@ dependencies = [
"common-error",
"common-telemetry",
"datafusion",
"datafusion-common",
"datafusion-expr",
"datafusion-substrait",
"datatypes",
"futures",
"promql",
"prost",
"session",
"snafu",

View File

@@ -167,6 +167,7 @@ fn create_table_info(table_id: TableId, table_name: TableName) -> RawTableInfo {
value_indices: vec![],
options: Default::default(),
region_numbers: (1..=100).collect(),
partition_key_indices: vec![],
};
RawTableInfo {

View File

@@ -177,7 +177,7 @@ impl Repl {
query_engine.optimize(&plan).context(PlanStatementSnafu)?;
let plan = DFLogicalSubstraitConvertor {}
.encode(plan)
.encode(&plan)
.context(SubstraitEncodeLogicalPlanSnafu)?;
self.database.logical_plan(plan.to_vec(), None).await

View File

@@ -154,6 +154,7 @@ define_catalog_value!(TableGlobalValue, CatalogValue, SchemaValue);
#[cfg(test)]
mod tests {
use super::*;
#[test]

View File

@@ -287,6 +287,7 @@ mod tests {
value_indices: vec![2, 3],
options: Default::default(),
region_numbers: vec![1],
partition_key_indices: vec![],
};
RawTableInfo {

View File

@@ -13,10 +13,12 @@ common-catalog = { path = "../catalog" }
common-error = { path = "../error" }
common-telemetry = { path = "../telemetry" }
datafusion.workspace = true
datafusion-common.workspace = true
datafusion-expr.workspace = true
datafusion-substrait.workspace = true
datatypes = { path = "../../datatypes" }
futures = "0.3"
promql = { path = "../../promql" }
prost.workspace = true
session = { path = "../../session" }
snafu.workspace = true

View File

@@ -28,6 +28,7 @@ use snafu::ResultExt;
use substrait_proto::proto::Plan;
use crate::error::{DecodeDfPlanSnafu, DecodeRelSnafu, EncodeDfPlanSnafu, EncodeRelSnafu, Error};
use crate::extension_serializer::ExtensionSerializer;
use crate::SubstraitPlan;
pub struct DFLogicalSubstraitConvertor;
@@ -46,7 +47,8 @@ impl SubstraitPlan for DFLogicalSubstraitConvertor {
schema: &str,
) -> Result<Self::Plan, Self::Error> {
let state_config = SessionConfig::new().with_default_catalog_and_schema(catalog, schema);
let state = SessionState::with_config_rt(state_config, Arc::new(RuntimeEnv::default()));
let state = SessionState::with_config_rt(state_config, Arc::new(RuntimeEnv::default()))
.with_serializer_registry(Arc::new(ExtensionSerializer));
let mut context = SessionContext::with_state(state);
context.register_catalog_list(catalog_list);
let plan = Plan::decode(message).context(DecodeRelSnafu)?;
@@ -56,11 +58,14 @@ impl SubstraitPlan for DFLogicalSubstraitConvertor {
Ok(df_plan)
}
fn encode(&self, plan: Self::Plan) -> Result<Bytes, Self::Error> {
fn encode(&self, plan: &Self::Plan) -> Result<Bytes, Self::Error> {
let mut buf = BytesMut::new();
let context = SessionContext::new();
let session_state =
SessionState::with_config_rt(SessionConfig::new(), Arc::new(RuntimeEnv::default()))
.with_serializer_registry(Arc::new(ExtensionSerializer));
let context = SessionContext::with_state(session_state);
let substrait_plan = to_substrait_plan(&plan, &context).context(EncodeDfPlanSnafu)?;
let substrait_plan = to_substrait_plan(plan, &context).context(EncodeDfPlanSnafu)?;
substrait_plan.encode(&mut buf).context(EncodeRelSnafu)?;
Ok(buf.freeze())

View File

@@ -60,6 +60,7 @@ impl SerializerRegistry for ExtensionSerializer {
name if name == EmptyMetric::name() => Err(DataFusionError::Substrait(
"EmptyMetric should not be serialized".to_string(),
)),
name if name == "MergeScan" => Ok(vec![]),
other => Err(DataFusionError::NotImplemented(format!(
"Serizlize logical plan for {}",
other

View File

@@ -17,6 +17,7 @@
mod df_substrait;
pub mod error;
pub mod extension_serializer;
use std::sync::Arc;
@@ -40,5 +41,5 @@ pub trait SubstraitPlan {
schema: &str,
) -> Result<Self::Plan, Self::Error>;
fn encode(&self, plan: Self::Plan) -> Result<Bytes, Self::Error>;
fn encode(&self, plan: &Self::Plan) -> Result<Bytes, Self::Error>;
}

View File

@@ -25,7 +25,8 @@ mod mock;
pub mod server;
pub mod sql;
mod store;
#[cfg(test)]
mod tests;
use greptimedb_telemetry::telemetry;
#[cfg(test)]
mod tests;

View File

@@ -266,8 +266,8 @@ pub enum Error {
location: Location,
},
#[snafu(display("Cannot find primary key column by name: {}", msg))]
PrimaryKeyNotFound { msg: String, location: Location },
#[snafu(display("Cannot find column by name: {}", msg))]
ColumnNotFound { msg: String, location: Location },
#[snafu(display("Failed to execute statement, source: {}", source))]
ExecuteStatement {
@@ -592,7 +592,7 @@ impl ErrorExt for Error {
| Error::CatalogNotFound { .. }
| Error::SchemaNotFound { .. }
| Error::SchemaExists { .. }
| Error::PrimaryKeyNotFound { .. }
| Error::ColumnNotFound { .. }
| Error::MissingMetasrvOpts { .. }
| Error::BuildRegex { .. }
| Error::InvalidSchema { .. }

View File

@@ -35,7 +35,7 @@ use common_error::ext::BoxedError;
use common_meta::helper::{SchemaKey, SchemaValue};
use common_meta::peer::Peer;
use common_meta::rpc::ddl::{DdlTask, SubmitDdlTaskRequest, SubmitDdlTaskResponse};
use common_meta::rpc::router::{Partition as MetaPartition, RouteRequest};
use common_meta::rpc::router::{Partition, Partition as MetaPartition, RouteRequest};
use common_meta::rpc::store::CompareAndPutRequest;
use common_meta::table_name::TableName;
use common_query::Output;
@@ -65,7 +65,7 @@ use table::TableRef;
use crate::catalog::FrontendCatalogManager;
use crate::error::{
self, AlterExprToRequestSnafu, CatalogEntrySerdeSnafu, CatalogSnafu, ColumnDataTypeSnafu,
DeserializePartitionSnafu, InvokeDatanodeSnafu, ParseSqlSnafu, PrimaryKeyNotFoundSnafu,
ColumnNotFoundSnafu, DeserializePartitionSnafu, InvokeDatanodeSnafu, ParseSqlSnafu,
RequestDatanodeSnafu, RequestMetaSnafu, Result, SchemaExistsSnafu, StartMetaClientSnafu,
TableAlreadyExistSnafu, TableNotFoundSnafu, TableSnafu, ToTableDeleteRequestSnafu,
UnrecognizedTableOptionSnafu,
@@ -108,7 +108,9 @@ impl DistInstance {
&create_table.table_name,
);
let mut table_info = create_table_info(create_table)?;
let (partitions, partition_cols) = parse_partitions(create_table, partitions)?;
let mut table_info = create_table_info(create_table, partition_cols)?;
let resp = self
.create_table_procedure(create_table, partitions, table_info.clone())
@@ -562,10 +564,9 @@ impl DistInstance {
async fn create_table_procedure(
&self,
create_table: &CreateTableExpr,
partitions: Option<Partitions>,
partitions: Vec<Partition>,
table_info: RawTableInfo,
) -> Result<SubmitDdlTaskResponse> {
let partitions = parse_partitions(create_table, partitions)?;
let partitions = partitions.into_iter().map(Into::into).collect();
let request = SubmitDdlTaskRequest {
@@ -759,7 +760,10 @@ fn create_partitions_stmt(partitions: Vec<PartitionInfo>) -> Result<Option<Parti
}))
}
fn create_table_info(create_table: &CreateTableExpr) -> Result<RawTableInfo> {
fn create_table_info(
create_table: &CreateTableExpr,
partition_columns: Vec<String>,
) -> Result<RawTableInfo> {
let mut column_schemas = Vec::with_capacity(create_table.column_defs.len());
let mut column_name_to_index_map = HashMap::new();
@@ -791,7 +795,17 @@ fn create_table_info(create_table: &CreateTableExpr) -> Result<RawTableInfo> {
column_name_to_index_map
.get(name)
.cloned()
.context(PrimaryKeyNotFoundSnafu { msg: name })
.context(ColumnNotFoundSnafu { msg: name })
})
.collect::<Result<Vec<_>>>()?;
let partition_key_indices = partition_columns
.into_iter()
.map(|col_name| {
column_name_to_index_map
.get(&col_name)
.cloned()
.context(ColumnNotFoundSnafu { msg: col_name })
})
.collect::<Result<Vec<_>>>()?;
@@ -806,6 +820,7 @@ fn create_table_info(create_table: &CreateTableExpr) -> Result<RawTableInfo> {
options: TableOptions::try_from(&create_table.table_options)
.context(UnrecognizedTableOptionSnafu)?,
created_on: DateTime::default(),
partition_key_indices,
};
let desc = if create_table.desc.is_empty() {
@@ -833,17 +848,20 @@ fn create_table_info(create_table: &CreateTableExpr) -> Result<RawTableInfo> {
fn parse_partitions(
create_table: &CreateTableExpr,
partitions: Option<Partitions>,
) -> Result<Vec<MetaPartition>> {
) -> Result<(Vec<MetaPartition>, Vec<String>)> {
// If partitions are not defined by user, no partition column is used (the partition column
// list is empty), and only one partition is created.
let partition_columns = find_partition_columns(create_table, &partitions)?;
let partition_columns = find_partition_columns(&partitions)?;
let partition_entries = find_partition_entries(create_table, &partitions, &partition_columns)?;
partition_entries
.into_iter()
.map(|x| MetaPartition::try_from(PartitionDef::new(partition_columns.clone(), x)))
.collect::<std::result::Result<_, _>>()
.context(DeserializePartitionSnafu)
Ok((
partition_entries
.into_iter()
.map(|x| MetaPartition::try_from(PartitionDef::new(partition_columns.clone(), x)))
.collect::<std::result::Result<_, _>>()
.context(DeserializePartitionSnafu)?,
partition_columns,
))
}
fn find_partition_entries(
@@ -895,10 +913,7 @@ fn find_partition_entries(
Ok(entries)
}
fn find_partition_columns(
create_table: &CreateTableExpr,
partitions: &Option<Partitions>,
) -> Result<Vec<String>> {
fn find_partition_columns(partitions: &Option<Partitions>) -> Result<Vec<String>> {
let columns = if let Some(partitions) = partitions {
partitions
.column_list
@@ -906,7 +921,7 @@ fn find_partition_columns(
.map(|x| x.value.clone())
.collect::<Vec<_>>()
} else {
vec![create_table.time_index.clone()]
vec![]
};
Ok(columns)
}
@@ -952,7 +967,7 @@ ENGINE=mito",
match &result[0] {
Statement::CreateTable(c) => {
let expr = expr_factory::create_to_expr(c, QueryContext::arc()).unwrap();
let partitions = parse_partitions(&expr, c.partitions.clone()).unwrap();
let (partitions, _) = parse_partitions(&expr, c.partitions.clone()).unwrap();
let json = serde_json::to_string(&partitions).unwrap();
assert_eq!(json, expected);
}

View File

@@ -55,7 +55,7 @@ impl DatanodeInstance {
let logical_plan = self.build_logical_plan(&plan)?;
let substrait_plan = DFLogicalSubstraitConvertor
.encode(logical_plan)
.encode(&logical_plan)
.context(error::EncodeSubstraitLogicalPlanSnafu)?;
let result = self

View File

@@ -40,8 +40,9 @@ pub mod table_routes;
pub use crate::error::Result;
mod inactive_node_manager;
#[cfg(test)]
mod test_util;
mod greptimedb_telemetry;
use greptimedb_telemetry::telemetry;
#[cfg(test)]
mod test_util;

View File

@@ -140,6 +140,7 @@ pub(crate) mod tests {
engine_options: HashMap::new(),
options: TableOptions::default(),
created_on: DateTime::default(),
partition_key_indices: vec![],
},
table_type: TableType::Base,
};

View File

@@ -182,7 +182,7 @@ mod tests {
// modification to manifest-related structs is compatible with older manifests.
#[test]
fn test_table_manifest_compatibility() {
let table_change = r#"{"table_info":{"ident":{"table_id":0,"version":0},"name":"demo","desc":null,"catalog_name":"greptime","schema_name":"public","meta":{"schema":{"column_schemas":[{"name":"host","data_type":{"String":null},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},{"name":"cpu","data_type":{"Float64":{}},"is_nullable":true,"is_time_index":false,"default_constraint":null,"metadata":{}},{"name":"memory","data_type":{"Float64":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},{"name":"ts","data_type":{"Timestamp":{"Millisecond":null}},"is_nullable":true,"is_time_index":true,"default_constraint":null,"metadata":{"greptime:time_index":"true"}}],"timestamp_index":3,"version":0},"primary_key_indices":[0],"value_indices":[1,2,3],"engine":"mito","next_column_id":1,"region_numbers":[],"engine_options":{},"options":{"write_buffer_size":null,"ttl":null,"extra_options":{}},"created_on":"2023-03-06T08:50:34.662020Z"},"table_type":"Base"}}"#;
let table_change = r#"{"table_info":{"ident":{"table_id":0,"version":0},"name":"demo","desc":null,"catalog_name":"greptime","schema_name":"public","meta":{"schema":{"column_schemas":[{"name":"host","data_type":{"String":null},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},{"name":"cpu","data_type":{"Float64":{}},"is_nullable":true,"is_time_index":false,"default_constraint":null,"metadata":{}},{"name":"memory","data_type":{"Float64":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},{"name":"ts","data_type":{"Timestamp":{"Millisecond":null}},"is_nullable":true,"is_time_index":true,"default_constraint":null,"metadata":{"greptime:time_index":"true"}}],"timestamp_index":3,"version":0},"primary_key_indices":[0],"value_indices":[1,2,3],"engine":"mito","partition_key_indices":[],"next_column_id":1,"region_numbers":[],"engine_options":{},"options":{"write_buffer_size":null,"ttl":null,"extra_options":{}},"created_on":"2023-03-06T08:50:34.662020Z"},"table_type":"Base"}}"#;
let _ = serde_json::from_str::<TableChange>(table_change).unwrap();
let table_remove =

View File

@@ -167,45 +167,46 @@ impl PartitionRule for RangeColumnsPartitionRule {
}
fn find_regions_by_exprs(&self, exprs: &[PartitionExpr]) -> Result<Vec<RegionNumber>> {
let regions = if exprs.iter().all(|x| self.column_list.contains(&x.column)) {
let PartitionExpr {
column: _,
op,
value,
} = exprs
.iter()
.find(|x| x.column == self.column_list[0])
// "unwrap" is safe because we have checked that "self.column_list" contains all columns in "exprs"
.unwrap();
let regions =
if !exprs.is_empty() && exprs.iter().all(|x| self.column_list.contains(&x.column)) {
let PartitionExpr {
column: _,
op,
value,
} = exprs
.iter()
.find(|x| x.column == self.column_list[0])
// "unwrap" is safe because we have checked that "self.column_list" contains all columns in "exprs"
.unwrap();
let regions = &self.first_column_regions;
match self
.first_column_bounds
.binary_search(&PartitionBound::Value(value.clone()))
{
Ok(i) => match op {
Operator::Lt => &regions[..=i],
Operator::LtEq => &regions[..=(i + 1)],
Operator::Eq => &regions[(i + 1)..=(i + 1)],
Operator::Gt | Operator::GtEq => &regions[(i + 1)..],
Operator::NotEq => &regions[..],
_ => unimplemented!(),
},
Err(i) => match op {
Operator::Lt | Operator::LtEq => &regions[..=i],
Operator::Eq => &regions[i..=i],
Operator::Gt | Operator::GtEq => &regions[i..],
Operator::NotEq => &regions[..],
_ => unimplemented!(),
},
}
.iter()
.flatten()
.cloned()
.collect::<Vec<RegionNumber>>()
} else {
self.regions.clone()
};
let regions = &self.first_column_regions;
match self
.first_column_bounds
.binary_search(&PartitionBound::Value(value.clone()))
{
Ok(i) => match op {
Operator::Lt => &regions[..=i],
Operator::LtEq => &regions[..=(i + 1)],
Operator::Eq => &regions[(i + 1)..=(i + 1)],
Operator::Gt | Operator::GtEq => &regions[(i + 1)..],
Operator::NotEq => &regions[..],
_ => unimplemented!(),
},
Err(i) => match op {
Operator::Lt | Operator::LtEq => &regions[..=i],
Operator::Eq => &regions[i..=i],
Operator::Gt | Operator::GtEq => &regions[i..],
Operator::NotEq => &regions[..],
_ => unimplemented!(),
},
}
.iter()
.flatten()
.cloned()
.collect::<Vec<RegionNumber>>()
} else {
self.regions.clone()
};
Ok(regions)
}
}

View File

@@ -163,13 +163,6 @@ impl PartitionRuleManager {
let partitions = self.find_table_partitions(table_id).await?;
let partition_columns = partitions[0].partition.partition_columns();
ensure!(
!partition_columns.is_empty(),
error::InvalidTableRouteDataSnafu {
table_id,
err_msg: "no partition columns found"
}
);
let regions = partitions
.iter()

View File

@@ -51,6 +51,12 @@ impl WriteSplitter {
let row_nums = check_req(&insert)?;
let mut insert = insert;
let partition_columns = self.partition_rule.partition_columns();
if partition_columns.is_empty() {
// If no partition column, all rows are inserted into the first region.
let mut split = HashMap::new();
split.insert(0, insert);
return Ok(split);
}
let missing_columns = schema
.column_schemas()
.iter()
@@ -87,6 +93,12 @@ impl WriteSplitter {
Self::validate_delete_request(&request)?;
let partition_columns = self.partition_rule.partition_columns();
if partition_columns.is_empty() {
// If no partition column, all requests are sent to the first region.
let mut split = HashMap::new();
split.insert(0, request);
return Ok(split);
}
let values = find_partitioning_values(&request.key_column_values, &partition_columns)?;
let regional_value_indexes = self.split_partitioning_values(&values)?;
@@ -280,7 +292,8 @@ mod tests {
use datatypes::types::{BooleanType, Int16Type, StringType};
use datatypes::value::Value;
use datatypes::vectors::{
BooleanVectorBuilder, Int16VectorBuilder, MutableVector, StringVectorBuilder, Vector,
BooleanVectorBuilder, Int16VectorBuilder, MutableVector, StringVector, StringVectorBuilder,
Vector,
};
use serde::{Deserialize, Serialize};
use store_api::storage::RegionNumber;
@@ -359,7 +372,7 @@ mod tests {
}
#[test]
fn test_writer_spliter_without_partition_columns() {
fn test_writer_spliter_with_id_partition_columns() {
let (mock_schema, insert) = mock_schema_and_insert_request_without_partition_columns();
let rule = Arc::new(MockPartitionRule) as PartitionRuleRef;
let spliter = WriteSplitter::with_partition_rule(rule);
@@ -392,6 +405,53 @@ mod tests {
);
}
#[test]
fn test_writer_spliter_without_partition_columns() {
    // `EmptyPartitionRule` reports no partition columns, so the whole insert
    // request is expected to end up under region number 0.
    let splitter =
        WriteSplitter::with_partition_rule(Arc::new(EmptyPartitionRule) as PartitionRuleRef);
    let (schema, request) = mock_schema_and_insert_request_without_partition_columns();

    let regions = splitter.split_insert(request, &schema).unwrap();
    assert_eq!(1, regions.len());

    // The single entry must be keyed by region 0 and carry both columns with
    // all three rows.
    let region0 = regions.get(&0).unwrap();
    assert_eq!("demo", region0.table_name);
    assert_eq!(2, region0.columns_values.len());
    assert_columns(
        &region0.columns_values,
        &[
            (
                "host",
                &[Value::from("host1"), Value::Null, Value::from("host3")],
            ),
            (
                "enable_reboot",
                &[Value::from(true), Value::from(false), Value::from(true)],
            ),
        ],
    );
}
#[test]
fn test_delete_spliter_without_partition_columns() {
    // A delete request keyed by a single "host" column with one row.
    let host = String::from("host");
    let mut key_column_values = HashMap::new();
    key_column_values.insert(
        host.clone(),
        Arc::new(StringVector::from(vec!["localhost"])) as _,
    );
    let request = DeleteRequest { key_column_values };

    // With a rule that has no partition columns, the request must come back
    // as exactly one per-region entry.
    let splitter =
        WriteSplitter::with_partition_rule(Arc::new(EmptyPartitionRule) as PartitionRuleRef);
    let regions = splitter.split_delete(request, vec![&host]).unwrap();
    assert_eq!(1, regions.len());
}
#[test]
fn test_partition_insert_request() {
let insert = mock_insert_request();
@@ -631,4 +691,28 @@ mod tests {
unimplemented!()
}
}
#[derive(Debug, Serialize, Deserialize)]
struct EmptyPartitionRule;
// PARTITION BY LIST COLUMNS() (
// PARTITION r0 VALUES LESS THAN MAXVALUE,
// );
impl PartitionRule for EmptyPartitionRule {
fn as_any(&self) -> &dyn Any {
self
}
fn partition_columns(&self) -> Vec<String> {
vec![]
}
fn find_region(&self, _: &[Value]) -> Result<RegionNumber, Error> {
Ok(0)
}
fn find_regions_by_exprs(&self, _: &[PartitionExpr]) -> Result<Vec<RegionNumber>, Error> {
unimplemented!()
}
}
}

View File

@@ -19,4 +19,5 @@ mod planner;
mod utils;
pub use analyzer::DistPlannerAnalyzer;
pub use merge_scan::MergeScanLogicalPlan;
pub use planner::DistExtensionPlanner;

View File

@@ -14,10 +14,14 @@
use std::sync::{Arc, Mutex};
use datafusion::datasource::DefaultTableSource;
use datafusion_common::config::ConfigOptions;
use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeVisitor, VisitRecursion};
use datafusion_expr::{Extension, LogicalPlan};
use datafusion_optimizer::analyzer::AnalyzerRule;
use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};
use table::metadata::TableType;
use table::table::adapter::DfTableProviderAdapter;
use crate::dist_plan::commutativity::{
partial_commutative_transformer, Categorizer, Commutativity,
@@ -161,6 +165,8 @@ struct CommutativeVisitor {
next_stage: Vec<LogicalPlan>,
// hash of the stop node
stop_node: Option<u64>,
/// Partition columns of current visiting table
current_partition_cols: Option<Vec<String>>,
}
impl TreeNodeVisitor for CommutativeVisitor {
@@ -170,18 +176,46 @@ impl TreeNodeVisitor for CommutativeVisitor {
// find the first merge scan and stop traversing down
// todo: check if it works for join
Ok(match plan {
LogicalPlan::Extension(ext) => {
if ext.node.name() == MergeScanLogicalPlan::name() {
VisitRecursion::Skip
} else {
VisitRecursion::Continue
LogicalPlan::TableScan(table_scan) => {
// TODO(ruihang): spawn a sub visitor to retrieve partition columns
if let Some(source) = table_scan
.source
.as_any()
.downcast_ref::<DefaultTableSource>()
{
if let Some(provider) = source
.table_provider
.as_any()
.downcast_ref::<DfTableProviderAdapter>()
{
if provider.table().table_type() == TableType::Base {
let info = provider.table().table_info();
let partition_key_indices = info.meta.partition_key_indices.clone();
let schema = info.meta.schema.clone();
let partition_cols = partition_key_indices
.into_iter()
.map(|index| schema.column_name_by_index(index).to_string())
.collect::<Vec<String>>();
self.current_partition_cols = Some(partition_cols);
}
}
}
VisitRecursion::Continue
}
_ => VisitRecursion::Continue,
})
}
fn post_visit(&mut self, plan: &LogicalPlan) -> datafusion_common::Result<VisitRecursion> {
if DFLogicalSubstraitConvertor.encode(plan).is_err() {
common_telemetry::info!(
"substrait error: {:?}",
DFLogicalSubstraitConvertor.encode(plan)
);
self.stop_node = Some(utils::hash_plan(plan));
return Ok(VisitRecursion::Stop);
}
match Categorizer::check_plan(plan) {
Commutativity::Commutative => {}
Commutativity::PartialCommutative => {
@@ -201,6 +235,16 @@ impl TreeNodeVisitor for CommutativeVisitor {
self.next_stage.push(plan)
}
},
Commutativity::CheckPartition => {
if let Some(partition_cols) = &self.current_partition_cols
&& partition_cols.is_empty() {
// no partition columns, and can be encoded skip
return Ok(VisitRecursion::Continue);
} else {
self.stop_node = Some(utils::hash_plan(plan));
return Ok(VisitRecursion::Stop);
}
},
Commutativity::NonCommutative
| Commutativity::Unimplemented
| Commutativity::Unsupported => {
@@ -218,6 +262,7 @@ impl CommutativeVisitor {
Self {
next_stage: vec![],
stop_node: None,
current_partition_cols: None,
}
}
}

View File

@@ -19,6 +19,8 @@ use promql::extension_plan::{
EmptyMetric, InstantManipulate, RangeManipulate, SeriesDivide, SeriesNormalize,
};
use crate::dist_plan::MergeScanLogicalPlan;
#[allow(dead_code)]
pub enum Commutativity {
Commutative,
@@ -27,6 +29,7 @@ pub enum Commutativity {
TransformedCommutative(Option<Transformer>),
NonCommutative,
Unimplemented,
CheckPartition,
/// For unrelated plans like DDL
Unsupported,
}
@@ -36,7 +39,15 @@ pub struct Categorizer {}
impl Categorizer {
pub fn check_plan(plan: &LogicalPlan) -> Commutativity {
match plan {
LogicalPlan::Projection(_) => Commutativity::Unimplemented,
LogicalPlan::Projection(proj) => {
for expr in &proj.expr {
let commutativity = Self::check_expr(expr);
if !matches!(commutativity, Commutativity::Commutative) {
return commutativity;
}
}
Commutativity::Commutative
}
// TODO(ruihang): Change this to Commutative once Like is supported in substrait
LogicalPlan::Filter(filter) => Self::check_expr(&filter.predicate),
LogicalPlan::Window(_) => Commutativity::Unimplemented,
@@ -44,7 +55,7 @@ impl Categorizer {
// check all children exprs and uses the strictest level
Commutativity::Unimplemented
}
LogicalPlan::Sort(_) => Commutativity::NonCommutative,
LogicalPlan::Sort(_) => Commutativity::Unimplemented,
LogicalPlan::Join(_) => Commutativity::NonCommutative,
LogicalPlan::CrossJoin(_) => Commutativity::NonCommutative,
LogicalPlan::Repartition(_) => {
@@ -52,7 +63,7 @@ impl Categorizer {
Commutativity::Unimplemented
}
LogicalPlan::Union(_) => Commutativity::Unimplemented,
LogicalPlan::TableScan(_) => Commutativity::NonCommutative,
LogicalPlan::TableScan(_) => Commutativity::CheckPartition,
LogicalPlan::EmptyRelation(_) => Commutativity::NonCommutative,
LogicalPlan::Subquery(_) => Commutativity::Unimplemented,
LogicalPlan::SubqueryAlias(_) => Commutativity::Unimplemented,
@@ -79,7 +90,8 @@ impl Categorizer {
|| name == InstantManipulate::name()
|| name == SeriesNormalize::name()
|| name == RangeManipulate::name()
|| name == SeriesDivide::name() =>
|| name == SeriesDivide::name()
|| name == MergeScanLogicalPlan::name() =>
{
Commutativity::Commutative
}
@@ -89,8 +101,7 @@ impl Categorizer {
pub fn check_expr(expr: &Expr) -> Commutativity {
match expr {
Expr::Alias(_, _)
| Expr::Column(_)
Expr::Column(_)
| Expr::ScalarVariable(_, _)
| Expr::Literal(_)
| Expr::BinaryExpr(_)
@@ -124,7 +135,9 @@ impl Categorizer {
| Expr::InSubquery(_)
| Expr::ScalarSubquery(_)
| Expr::Wildcard => Commutativity::Unimplemented,
Expr::QualifiedWildcard { .. }
Expr::Alias(_, _)
| Expr::QualifiedWildcard { .. }
| Expr::GroupingSet(_)
| Expr::Placeholder(_)
| Expr::OuterReferenceColumn(_, _) => Commutativity::Unimplemented,

View File

@@ -15,7 +15,7 @@
use std::any::Any;
use std::sync::Arc;
use arrow_schema::SchemaRef as ArrowSchemaRef;
use arrow_schema::{Schema as ArrowSchema, SchemaRef as ArrowSchemaRef};
use async_stream::try_stream;
use client::client_manager::DatanodeClients;
use client::Database;
@@ -28,12 +28,13 @@ use common_query::Output;
use common_recordbatch::adapter::DfRecordBatchStreamAdapter;
use common_recordbatch::error::ExternalSnafu;
use common_recordbatch::{
DfSendableRecordBatchStream, RecordBatchStreamAdaptor, SendableRecordBatchStream,
DfSendableRecordBatchStream, RecordBatch, RecordBatchStreamAdaptor, SendableRecordBatchStream,
};
use datafusion::physical_plan::{DisplayFormatType, ExecutionPlan, Partitioning};
use datafusion_common::{DataFusionError, Result, Statistics};
use datafusion_expr::{Extension, LogicalPlan, UserDefinedLogicalNodeCore};
use datafusion_physical_expr::PhysicalSortExpr;
use datatypes::schema::Schema;
use futures_util::StreamExt;
use snafu::ResultExt;
@@ -123,6 +124,8 @@ impl MergeScanExec {
arrow_schema: ArrowSchemaRef,
clients: Arc<DatanodeClients>,
) -> Self {
// remove all metadata
let arrow_schema = Arc::new(ArrowSchema::new(arrow_schema.fields().to_vec()));
Self {
table,
peers,
@@ -163,12 +166,12 @@ impl MergeScanExec {
}
Output::RecordBatches(record_batches) => {
for batch in record_batches.into_iter() {
yield batch;
yield Self::remove_metadata_from_record_batch(batch);
}
}
Output::Stream(mut stream) => {
while let Some(batch) = stream.next().await {
yield batch?;
yield Self::remove_metadata_from_record_batch(batch?);
}
}
}
@@ -186,6 +189,15 @@ impl MergeScanExec {
output_ordering: None,
}))
}
/// Rebuilds `batch` on top of a schema constructed from its Arrow fields
/// alone, so that metadata attached to the original schema object is not
/// carried over. The fields and the column data are reused as they are.
///
/// # Panics
///
/// Panics if the field-only Arrow schema cannot be converted into a
/// [`Schema`], or if a [`RecordBatch`] cannot be rebuilt from that schema and
/// the batch's own columns.
fn remove_metadata_from_record_batch(batch: RecordBatch) -> RecordBatch {
    // Copy only the field list; `ArrowSchema::new` is not given the old metadata.
    let fields = batch.schema.arrow_schema().fields().to_vec();
    let bare_schema = Schema::try_from(ArrowSchema::new(fields)).unwrap();

    let columns = batch.columns().iter().cloned();
    RecordBatch::new(Arc::new(bare_schema), columns).unwrap()
}
}
impl ExecutionPlan for MergeScanExec {

View File

@@ -82,12 +82,12 @@ impl ExtensionPlanner for DistExtensionPlanner {
} else {
// TODO(ruihang): generate different execution plans for different variant merge operation
let input_plan = merge_scan.input();
let input_physical_plan = planner
.create_physical_plan(input_plan, session_state)
.await?;
let Some(table_name) = self.get_table_name(input_plan)? else {
// no relation found in input plan, going to execute them locally
return planner
.create_physical_plan(input_plan, session_state)
.await
.map(Some);
return Ok(Some(input_physical_plan));
};
if table_name.schema_name == INFORMATION_SCHEMA_NAME {
@@ -97,10 +97,10 @@ impl ExtensionPlanner for DistExtensionPlanner {
.map(Some);
}
let input_schema = input_plan.schema().clone();
let input_schema = input_physical_plan.schema().clone();
let input_plan = self.set_table_name(&table_name, input_plan.clone())?;
let substrait_plan: Bytes = DFLogicalSubstraitConvertor
.encode(input_plan.clone())
.encode(&input_plan)
.context(error::EncodeSubstraitLogicalPlanSnafu)?
.into();
let peers = self.get_peers(&table_name).await;
@@ -110,7 +110,7 @@ impl ExtensionPlanner for DistExtensionPlanner {
table_name,
peers,
substrait_plan,
Arc::new(input_schema.as_ref().into()),
input_schema,
self.clients.clone(),
);

View File

@@ -19,7 +19,6 @@ pub mod datafusion;
pub mod dist_plan;
pub mod error;
pub mod executor;
pub mod extension_serializer;
pub mod logical_optimizer;
mod metrics;
mod optimizer;

View File

@@ -39,11 +39,11 @@ use datafusion_optimizer::analyzer::Analyzer;
use datafusion_optimizer::optimizer::Optimizer;
use partition::manager::PartitionRuleManager;
use promql::extension_plan::PromExtensionPlanner;
use substrait::extension_serializer::ExtensionSerializer;
use table::table::adapter::DfTableProviderAdapter;
use table::TableRef;
use crate::dist_plan::{DistExtensionPlanner, DistPlannerAnalyzer};
use crate::extension_serializer::ExtensionSerializer;
use crate::optimizer::order_hint::OrderHintRule;
use crate::optimizer::type_conversion::TypeConversionRule;
use crate::query_engine::options::QueryOptions;

View File

@@ -115,6 +115,8 @@ pub struct TableMeta {
pub options: TableOptions,
#[builder(default = "Utc::now()")]
pub created_on: DateTime<Utc>,
#[builder(default = "Vec::new()")]
pub partition_key_indices: Vec<usize>,
}
impl TableMetaBuilder {
@@ -518,6 +520,7 @@ pub struct RawTableMeta {
pub engine_options: HashMap<String, String>,
pub options: TableOptions,
pub created_on: DateTime<Utc>,
pub partition_key_indices: Vec<usize>,
}
impl From<TableMeta> for RawTableMeta {
@@ -532,6 +535,7 @@ impl From<TableMeta> for RawTableMeta {
engine_options: meta.engine_options,
options: meta.options,
created_on: meta.created_on,
partition_key_indices: meta.partition_key_indices,
}
}
}
@@ -550,6 +554,7 @@ impl TryFrom<RawTableMeta> for TableMeta {
engine_options: raw.engine_options,
options: raw.options,
created_on: raw.created_on,
partition_key_indices: raw.partition_key_indices,
})
}
}

View File

@@ -108,7 +108,7 @@ async fn test_show_create_table(instance: Arc<dyn MockInstance>) {
| | ts BIGINT NOT NULL, |
| | TIME INDEX (ts) |
| | ) |
| | PARTITION BY RANGE COLUMNS (ts) ( |
| | PARTITION BY RANGE COLUMNS () ( |
| | PARTITION r0 VALUES LESS THAN (MAXVALUE) |
| | ) |
| | ENGINE=mito |

View File

@@ -14,8 +14,9 @@ EXPLAIN SELECT DISTINCT i%2 FROM integers ORDER BY 1;
+-+-+
| logical_plan_| Sort: integers.i % Int64(2) ASC NULLS LAST_|
|_|_Aggregate: groupBy=[[integers.i % Int64(2)]], aggr=[[]]_|
|_|_Projection: integers.i % Int64(2)_|
|_|_MergeScan [is_placeholder=false]_|
|_|_Aggregate: groupBy=[[integers.i % Int64(2)]], aggr=[[]]_|
|_|_Projection: integers.i % Int64(2)_|
|_|_TableScan: integers projection=[i]_|
| physical_plan | SortPreservingMergeExec: [integers.i % Int64(2)@0 ASC NULLS LAST]_|
|_|_SortExec: expr=[integers.i % Int64(2)@0 ASC NULLS LAST]_|
@@ -23,7 +24,6 @@ EXPLAIN SELECT DISTINCT i%2 FROM integers ORDER BY 1;
|_|_CoalesceBatchesExec: target_batch_size=8192_|
|_|_RepartitionExec: partitioning=REDACTED
|_|_AggregateExec: mode=Partial, gby=[integers.i % Int64(2)@0 as integers.i % Int64(2)], aggr=[]_|
|_|_ProjectionExec: expr=[i@0 % 2 as integers.i % Int64(2)]_|
|_|_RepartitionExec: partitioning=REDACTED
|_|_MergeScanExec: peers=[REDACTED
|_|_|
@@ -48,11 +48,9 @@ EXPLAIN SELECT a, b FROM test ORDER BY a, b;
| plan_type_| plan_|
+-+-+
| logical_plan_| Sort: test.a ASC NULLS LAST, test.b ASC NULLS LAST_|
|_|_Projection: test.a, test.b_|
|_|_MergeScan [is_placeholder=false]_|
|_|_TableScan: test projection=[a, b, t]_|
|_|_TableScan: test projection=[a, b]_|
| physical_plan | SortExec: expr=[a@0 ASC NULLS LAST,b@1 ASC NULLS LAST] |
|_|_ProjectionExec: expr=[a@0 as a, b@1 as b]_|
|_|_MergeScanExec: peers=[REDACTED
|_|_|
+-+-+
@@ -69,9 +67,9 @@ EXPLAIN SELECT DISTINCT a, b FROM test ORDER BY a, b;
+-+-+
| logical_plan_| Sort: test.a ASC NULLS LAST, test.b ASC NULLS LAST_|
|_|_Aggregate: groupBy=[[test.a, test.b]], aggr=[[]]_|
|_|_Projection: test.a, test.b_|
|_|_MergeScan [is_placeholder=false]_|
|_|_TableScan: test projection=[a, b, t]_|
|_|_Aggregate: groupBy=[[test.a, test.b]], aggr=[[]]_|
|_|_TableScan: test projection=[a, b]_|
| physical_plan | SortPreservingMergeExec: [a@0 ASC NULLS LAST,b@1 ASC NULLS LAST]_|
|_|_SortExec: expr=[a@0 ASC NULLS LAST,b@1 ASC NULLS LAST]_|
|_|_AggregateExec: mode=FinalPartitioned, gby=[a@0 as a, b@1 as b], aggr=[]_|
@@ -79,7 +77,6 @@ EXPLAIN SELECT DISTINCT a, b FROM test ORDER BY a, b;
|_|_RepartitionExec: partitioning=REDACTED
|_|_AggregateExec: mode=Partial, gby=[a@0 as a, b@1 as b], aggr=[]_|
|_|_RepartitionExec: partitioning=REDACTED
|_|_ProjectionExec: expr=[a@0 as a, b@1 as b]_|
|_|_MergeScanExec: peers=[REDACTED
|_|_|
+-+-+

View File

@@ -0,0 +1,72 @@
CREATE TABLE single_partition(i DOUBLE, j TIMESTAMP TIME INDEX, k STRING PRIMARY KEY);
Affected Rows: 0
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (Hash.*) REDACTED
-- SQLNESS REPLACE (peer-.*) REDACTED
EXPLAIN SELECT COUNT(*) FROM single_partition;
+-+-+
| plan_type_| plan_|
+-+-+
| logical_plan_| Aggregate: groupBy=[[]], aggr=[[COUNT(UInt8(1))]]_|
|_|_Projection:_|
|_|_MergeScan [is_placeholder=false]_|
|_|_TableScan: single_partition projection=[i, j, k]_|
| physical_plan | AggregateExec: mode=Final, gby=[], aggr=[COUNT(UInt8(1))]_|
|_|_CoalescePartitionsExec_|
|_|_AggregateExec: mode=Partial, gby=[], aggr=[COUNT(UInt8(1))]_|
|_|_RepartitionExec: partitioning=REDACTED
|_|_ProjectionExec: expr=[]_|
|_|_MergeScanExec: peers=[REDACTED
|_|_|
+-+-+
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (Hash.*) REDACTED
-- SQLNESS REPLACE (peer-.*) REDACTED
EXPLAIN SELECT SUM(i) FROM single_partition;
+-+-+
| plan_type_| plan_|
+-+-+
| logical_plan_| Aggregate: groupBy=[[]], aggr=[[SUM(single_partition.i)]]_|
|_|_Projection: single_partition.i_|
|_|_MergeScan [is_placeholder=false]_|
|_|_TableScan: single_partition projection=[i, j, k]_|
| physical_plan | AggregateExec: mode=Final, gby=[], aggr=[SUM(single_partition.i)]_|
|_|_CoalescePartitionsExec_|
|_|_AggregateExec: mode=Partial, gby=[], aggr=[SUM(single_partition.i)]_|
|_|_RepartitionExec: partitioning=REDACTED
|_|_ProjectionExec: expr=[i@0 as i]_|
|_|_MergeScanExec: peers=[REDACTED
|_|_|
+-+-+
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (Hash.*) REDACTED
-- SQLNESS REPLACE (peer-.*) REDACTED
EXPLAIN SELECT * FROM single_partition ORDER BY i DESC;
+-+-+
| plan_type_| plan_|
+-+-+
| logical_plan_| Sort: single_partition.i DESC NULLS FIRST_|
|_|_MergeScan [is_placeholder=false]_|
|_|_TableScan: single_partition projection=[i, j, k] |
| physical_plan | SortExec: expr=[i@0 DESC]_|
|_|_MergeScanExec: peers=[REDACTED
|_|_|
+-+-+
drop table single_partition;
Affected Rows: 1

View File

@@ -0,0 +1,24 @@
CREATE TABLE single_partition(i DOUBLE, j TIMESTAMP TIME INDEX, k STRING PRIMARY KEY);
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (Hash.*) REDACTED
-- SQLNESS REPLACE (peer-.*) REDACTED
EXPLAIN SELECT COUNT(*) FROM single_partition;
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (Hash.*) REDACTED
-- SQLNESS REPLACE (peer-.*) REDACTED
EXPLAIN SELECT SUM(i) FROM single_partition;
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (Hash.*) REDACTED
-- SQLNESS REPLACE (peer-.*) REDACTED
EXPLAIN SELECT * FROM single_partition ORDER BY i DESC;
drop table single_partition;

View File

@@ -110,7 +110,12 @@ SELECT * FROM integers WHERE i NOT IN ((SELECT i FROM integers WHERE i=1)) ORDER
SELECT * FROM integers WHERE i IN ((SELECT i FROM integers)) AND i<3 ORDER BY i;
Error: 3001(EngineExecuteQuery), No field named __correlated_sq_3.i. Valid fields are integers.i, integers.j.
+---+---+
| i | j |
+---+---+
| 1 | 1 |
| 2 | 2 |
+---+---+
SELECT i1.i,i2.i FROM integers i1, integers i2 WHERE i IN ((SELECT i FROM integers)) AND i1.i=i2.i ORDER BY 1;

View File

@@ -1,45 +1,39 @@
explain select * from numbers;
+---------------+------------------------------------------+
| plan_type | plan |
+---------------+------------------------------------------+
| logical_plan | MergeScan [is_placeholder=false] |
| | TableScan: numbers projection=[number] |
| physical_plan | ExecutionPlan(PlaceHolder) |
| | |
+---------------+------------------------------------------+
explain select * from numbers order by number desc;
+---------------+--------------------------------------------+
| plan_type | plan |
+---------------+--------------------------------------------+
| logical_plan | Projection: numbers.number |
| logical_plan | Sort: numbers.number DESC NULLS FIRST |
| | MergeScan [is_placeholder=false] |
| | TableScan: numbers projection=[number] |
| physical_plan | ProjectionExec: expr=[number@0 as number] |
| physical_plan | SortExec: expr=[number@0 DESC] |
| | ExecutionPlan(PlaceHolder) |
| | |
+---------------+--------------------------------------------+
explain select * from numbers order by number desc;
+---------------+----------------------------------------------+
| plan_type | plan |
+---------------+----------------------------------------------+
| logical_plan | Sort: numbers.number DESC NULLS FIRST |
| | Projection: numbers.number |
| | MergeScan [is_placeholder=false] |
| | TableScan: numbers projection=[number] |
| physical_plan | SortExec: expr=[number@0 DESC] |
| | ProjectionExec: expr=[number@0 as number] |
| | ExecutionPlan(PlaceHolder) |
| | |
+---------------+----------------------------------------------+
explain select * from numbers order by number asc;
+---------------+----------------------------------------------+
| plan_type | plan |
+---------------+----------------------------------------------+
| logical_plan | Sort: numbers.number ASC NULLS LAST |
| | Projection: numbers.number |
| | MergeScan [is_placeholder=false] |
| | TableScan: numbers projection=[number] |
| physical_plan | SortExec: expr=[number@0 ASC NULLS LAST] |
| | ProjectionExec: expr=[number@0 as number] |
| | ExecutionPlan(PlaceHolder) |
| | |
+---------------+----------------------------------------------+
+---------------+--------------------------------------------+
| plan_type | plan |
+---------------+--------------------------------------------+
| logical_plan | Sort: numbers.number ASC NULLS LAST |
| | MergeScan [is_placeholder=false] |
| | TableScan: numbers projection=[number] |
| physical_plan | SortExec: expr=[number@0 ASC NULLS LAST] |
| | ExecutionPlan(PlaceHolder) |
| | |
+---------------+--------------------------------------------+
explain select * from numbers order by number desc limit 10;
@@ -48,13 +42,11 @@ explain select * from numbers order by number desc limit 10;
+---------------+---------------------------------------------------+
| logical_plan | Limit: skip=0, fetch=10 |
| | Sort: numbers.number DESC NULLS FIRST, fetch=10 |
| | Projection: numbers.number |
| | MergeScan [is_placeholder=false] |
| | TableScan: numbers projection=[number] |
| | MergeScan [is_placeholder=false] |
| | TableScan: numbers projection=[number] |
| physical_plan | GlobalLimitExec: skip=0, fetch=10 |
| | SortExec: fetch=10, expr=[number@0 DESC] |
| | ProjectionExec: expr=[number@0 as number] |
| | ExecutionPlan(PlaceHolder) |
| | ExecutionPlan(PlaceHolder) |
| | |
+---------------+---------------------------------------------------+
@@ -65,13 +57,11 @@ explain select * from numbers order by number asc limit 10;
+---------------+------------------------------------------------------+
| logical_plan | Limit: skip=0, fetch=10 |
| | Sort: numbers.number ASC NULLS LAST, fetch=10 |
| | Projection: numbers.number |
| | MergeScan [is_placeholder=false] |
| | TableScan: numbers projection=[number] |
| | MergeScan [is_placeholder=false] |
| | TableScan: numbers projection=[number] |
| physical_plan | GlobalLimitExec: skip=0, fetch=10 |
| | SortExec: fetch=10, expr=[number@0 ASC NULLS LAST] |
| | ProjectionExec: expr=[number@0 as number] |
| | ExecutionPlan(PlaceHolder) |
| | ExecutionPlan(PlaceHolder) |
| | |
+---------------+------------------------------------------------------+

View File

@@ -55,15 +55,14 @@ TQL EXPLAIN host_load1{__field__="val"};
| | PromSeriesNormalize: offset=[0], time index=[ts], filter NaN: [false] |
| | PromSeriesDivide: tags=["collector", "host"] |
| | Sort: host_load1.collector DESC NULLS LAST, host_load1.host DESC NULLS LAST, host_load1.ts DESC NULLS LAST |
| | Projection: host_load1.val, host_load1.collector, host_load1.host, host_load1.ts |
| | MergeScan [is_placeholder=false] |
| | MergeScan [is_placeholder=false] |
| | Projection: host_load1.val, host_load1.collector, host_load1.host, host_load1.ts |
| | TableScan: host_load1 projection=[ts, collector, host, val], partial_filters=[ts >= TimestampMillisecond(-300000, None), ts <= TimestampMillisecond(300000, None)] |
| physical_plan | PromInstantManipulateExec: range=[0..0], lookback=[300000], interval=[300000], time index=[ts] |
| | PromSeriesNormalizeExec: offset=[0], time index=[ts], filter NaN: [false] |
| | RepartitionExec: partitioning=REDACTED
| | PromSeriesDivideExec: tags=["collector", "host"] |
| | ProjectionExec: expr=[val@3 as val, collector@1 as collector, host@2 as host, ts@0 as ts] |
| | MergeScanExec: peers=[REDACTED
| | MergeScanExec: peers=[REDACTED
| | |
+---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+