simplification

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ruihang Xia
2026-04-02 07:19:21 +08:00
parent dba2b571e7
commit 0e7ccfadf7
11 changed files with 107 additions and 1330 deletions

View File

@@ -397,20 +397,6 @@ impl ExecutionPlan for SeriesDivideExec {
if self.tag_columns.is_empty() {
return vec![Distribution::SinglePartition];
}
// If the upstream already produces a single partition, `SeriesDivide` can safely process
// the full ordered stream directly. Repartitioning a single partition into the global
// target partition count only adds shuffle/merge overhead without improving correctness.
if self
.input
.properties()
.output_partitioning()
.partition_count()
<= 1
{
return vec![Distribution::UnspecifiedDistribution];
}
let schema = self.input.schema();
vec![Distribution::HashPartitioned(
self.tag_columns
@@ -754,45 +740,6 @@ mod test {
))
}
/// Builds an in-memory source with two partitions, one record batch per partition.
///
/// Partition 0 holds `host = "000"`, `path = "foo"` at timestamps 1000 and 2000;
/// partition 1 holds `host = "001"`, `path = "bar"` at timestamps 3000 and 4000.
fn prepare_multi_partition_test_data() -> DataSourceExec {
    let schema = Arc::new(Schema::new(vec![
        Field::new("host", DataType::Utf8, true),
        Field::new("path", DataType::Utf8, true),
        Field::new(
            "time_index",
            DataType::Timestamp(datafusion::arrow::datatypes::TimeUnit::Millisecond, None),
            false,
        ),
    ]));

    // Each batch repeats one (host, path) pair over two rows; only the timestamps differ.
    let series_batch = |host: &str, path: &str, timestamps: Vec<i64>| {
        RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(StringArray::from(vec![host, host])) as _,
                Arc::new(StringArray::from(vec![path, path])) as _,
                Arc::new(datafusion::arrow::array::TimestampMillisecondArray::from(
                    timestamps,
                )) as _,
            ],
        )
        .unwrap()
    };
    let partitions = [
        vec![series_batch("000", "foo", vec![1000, 2000])],
        vec![series_batch("001", "bar", vec![3000, 4000])],
    ];

    DataSourceExec::new(Arc::new(
        MemorySourceConfig::try_new(&partitions, schema, None).unwrap(),
    ))
}
#[test]
fn pruning_should_keep_tags_and_time_index_columns_for_exec() {
let df_schema = prepare_test_data().schema().to_dfschema_ref().unwrap();
@@ -813,38 +760,6 @@ mod test {
assert_eq!(required.as_slice(), &[0, 1, 2]);
}
/// With one tag column and the `prepare_test_data` input, the exec must report
/// `Distribution::UnspecifiedDistribution` as its only input requirement.
#[test]
fn single_partition_input_skips_repartition_requirement() {
    let exec = SeriesDivideExec {
        tag_columns: vec![String::from("host")],
        time_index_column: String::from("time_index"),
        input: Arc::new(prepare_test_data()),
        metric: ExecutionPlanMetricsSet::new(),
    };

    assert!(matches!(
        exec.required_input_distribution().as_slice(),
        [Distribution::UnspecifiedDistribution]
    ));
}
/// With one tag column and the `prepare_multi_partition_test_data` input, the exec must
/// report a single `Distribution::HashPartitioned` input requirement.
#[test]
fn multi_partition_input_still_requires_hash_partitioning() {
    let exec = SeriesDivideExec {
        tag_columns: vec![String::from("host")],
        time_index_column: String::from("time_index"),
        input: Arc::new(prepare_multi_partition_test_data()),
        metric: ExecutionPlanMetricsSet::new(),
    };

    assert!(matches!(
        exec.required_input_distribution().as_slice(),
        [Distribution::HashPartitioned(_)]
    ));
}
#[tokio::test]
async fn overall_data() {
let memory_exec = Arc::new(prepare_test_data());

View File

@@ -27,7 +27,6 @@ use datafusion_expr::utils::expr_to_columns;
use datafusion_expr::{Expr, LogicalPlan, LogicalPlanBuilder, Subquery, col as col_fn};
use datafusion_optimizer::analyzer::AnalyzerRule;
use promql::extension_plan::SeriesDivide;
use store_api::metric_engine_consts::DATA_SCHEMA_TSID_COLUMN_NAME;
use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};
use table::metadata::TableType;
use table::table::adapter::DfTableProviderAdapter;
@@ -510,20 +509,6 @@ impl PlanRewriter {
.map(|index| schema.column_name_by_index(index).to_string())
.collect::<Vec<String>>();
// Metric engine scans can project the internal `__tsid` column even though it is
// not part of the logical table schema. Equal `__tsid` values always belong to the
// same series, so carry it as an additional partition key when it is available.
if plan
.schema()
.index_of_column_by_name(None, DATA_SCHEMA_TSID_COLUMN_NAME)
.is_some()
&& !partition_cols
.iter()
.any(|col| col == DATA_SCHEMA_TSID_COLUMN_NAME)
{
partition_cols.push(DATA_SCHEMA_TSID_COLUMN_NAME.to_string());
}
let partition_rules = table.partition_rules();
let exist_phy_part_cols_not_in_logical_table = partition_rules
.map(|r| !r.extra_phy_cols_not_in_logical_table.is_empty())

View File

@@ -37,18 +37,12 @@ use datafusion::physical_plan::{
use datafusion_common::{Column as ColumnExpr, DataFusionError, Result};
use datafusion_expr::{Expr, Extension, LogicalPlan, UserDefinedLogicalNodeCore};
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr::{
Distribution, EquivalenceProperties, LexOrdering, PhysicalSortExpr,
};
use datafusion_physical_expr::{Distribution, EquivalenceProperties, PhysicalSortExpr};
use futures_util::StreamExt;
use greptime_proto::v1::region::RegionRequestHeader;
use meter_core::data::ReadItem;
use meter_macros::read_meter;
use promql::extension_plan::{
InstantManipulate, RangeManipulate, ScalarCalculate, SeriesDivide, SeriesNormalize,
};
use session::context::QueryContextRef;
use store_api::metric_engine_consts::DATA_SCHEMA_TSID_COLUMN_NAME;
use store_api::storage::RegionId;
use table::table_name::TableName;
use tokio::time::Instant;
@@ -166,185 +160,12 @@ impl std::fmt::Debug for MergeScanExec {
}
impl MergeScanExec {
/// Builds one physical partition expression per entry of `partition_cols`.
///
/// Each entry contributes its first alias. Entries without an alias, or whose alias cannot
/// be turned into a physical expression against `plan`'s schema, are silently dropped.
fn default_partition_exprs(
    session_state: &SessionState,
    plan: &LogicalPlan,
    partition_cols: &AliasMapping,
) -> Vec<Arc<dyn datafusion_physical_expr::PhysicalExpr>> {
    let mut exprs = Vec::new();
    for (_, aliases) in partition_cols.iter() {
        if let Some(expr) = Self::partition_expr_for_alias(session_state, plan, aliases.first())
        {
            exprs.push(expr);
        }
    }
    exprs
}
/// Resolves an optional alias column into a physical expression over `plan`'s schema.
///
/// The column is looked up by its unqualified name. Returns `None` when `column` is `None`
/// or when the physical expression cannot be created (the error is discarded).
fn partition_expr_for_alias(
    session_state: &SessionState,
    plan: &LogicalPlan,
    column: Option<&ColumnExpr>,
) -> Option<Arc<dyn datafusion_physical_expr::PhysicalExpr>> {
    column.and_then(|alias| {
        let unqualified = ColumnExpr::new_unqualified(alias.name().to_string());
        session_state
            .create_physical_expr(Expr::Column(unqualified), plan.schema())
            .ok()
    })
}
/// Returns a single-element partition expression list holding only the tsid column.
///
/// Yields `None` unless all of the following hold: `plan` matches the tsid/time-index sort
/// shape recognised by `promql_tsid_ordered_time_index`, `partition_cols` has an entry for
/// `DATA_SCHEMA_TSID_COLUMN_NAME`, and that entry's first alias resolves to a physical
/// expression.
fn prefer_tsid_partition_exprs(
    session_state: &SessionState,
    plan: &LogicalPlan,
    partition_cols: &AliasMapping,
) -> Option<Vec<Arc<dyn datafusion_physical_expr::PhysicalExpr>>> {
    // Only the presence of the time index matters here, not its name.
    let _time_index = Self::promql_tsid_ordered_time_index(plan)?;

    partition_cols
        .get(DATA_SCHEMA_TSID_COLUMN_NAME)
        .and_then(|aliases| Self::partition_expr_for_alias(session_state, plan, aliases.first()))
        .map(|tsid_expr| vec![tsid_expr])
}
/// Finds the time index column of a plan that is sorted by `(tsid, time index)`.
///
/// The search descends through `Projection`, `Filter`, `SubqueryAlias` and the PromQL
/// extension nodes accepted by `is_promql_passthrough_node` (following their first input)
/// until it reaches a `Sort` with exactly two keys, both ascending with nulls first, whose
/// first key is `DATA_SCHEMA_TSID_COLUMN_NAME`. The second key's column name is the result.
///
/// At every level on the way back up, the plan's own schema must still expose both the tsid
/// column and the time index column; otherwise `None` is returned.
fn promql_tsid_ordered_time_index(plan: &LogicalPlan) -> Option<String> {
    let candidate = match plan {
        LogicalPlan::Sort(sort) => match sort.expr.as_slice() {
            // The slice pattern only matches a sort with exactly two keys.
            [tsid_sort, time_sort] => {
                let first_key = Self::ascending_nulls_first_sort_column(tsid_sort)?;
                let second_key = Self::ascending_nulls_first_sort_column(time_sort)?;
                if first_key == DATA_SCHEMA_TSID_COLUMN_NAME {
                    Some(second_key)
                } else {
                    None
                }
            }
            _ => None,
        },
        LogicalPlan::Projection(projection) => {
            Self::promql_tsid_ordered_time_index(projection.input.as_ref())
        }
        LogicalPlan::Filter(filter) => {
            Self::promql_tsid_ordered_time_index(filter.input.as_ref())
        }
        LogicalPlan::SubqueryAlias(alias) => {
            Self::promql_tsid_ordered_time_index(alias.input.as_ref())
        }
        LogicalPlan::Extension(extension) if Self::is_promql_passthrough_node(extension) => {
            extension
                .node
                .inputs()
                .into_iter()
                .next()
                .and_then(|child| Self::promql_tsid_ordered_time_index(child))
        }
        _ => None,
    };
    let time_index_column = candidate?;

    // A node above the sort may have projected either column away.
    let still_visible = Self::schema_exposes_column(plan, DATA_SCHEMA_TSID_COLUMN_NAME)
        && Self::schema_exposes_column(plan, &time_index_column);
    if still_visible {
        Some(time_index_column)
    } else {
        None
    }
}
/// Reports whether the extension node is one of `InstantManipulate`, `SeriesDivide`,
/// `SeriesNormalize`, `ScalarCalculate` or `RangeManipulate`.
fn is_promql_passthrough_node(extension: &Extension) -> bool {
    let any_node = extension.node.as_any();
    [
        any_node.is::<InstantManipulate>(),
        any_node.is::<SeriesDivide>(),
        any_node.is::<SeriesNormalize>(),
        any_node.is::<ScalarCalculate>(),
        any_node.is::<RangeManipulate>(),
    ]
    .contains(&true)
}
/// Returns the column name of a sort key that is ascending with nulls first.
///
/// Yields `None` for any other direction or null placement, and for keys whose expression
/// is not a plain column.
fn ascending_nulls_first_sort_column(
    sort_expr: &datafusion_expr::expr::Sort,
) -> Option<String> {
    if !sort_expr.asc || !sort_expr.nulls_first {
        return None;
    }
    let column = sort_expr.expr.try_as_col()?;
    Some(column.name.clone())
}
/// Reports whether `plan`'s output schema contains a column named `column_name`,
/// looked up without a qualifier.
fn schema_exposes_column(plan: &LogicalPlan, column_name: &str) -> bool {
    let position = plan.schema().index_of_column_by_name(None, column_name);
    position.is_some()
}
/// Derives a physical ordering from the logical plan, if it implies one.
///
/// Two shapes are recognised, in this order:
/// 1. A plan accepted by `promql_tsid_ordered_time_index`: the ordering is the tsid column
///    followed by the time index column, both ascending with nulls first. If either column
///    is not exposed by `plan`'s schema, `Ok(None)` is returned.
/// 2. A top-level `LogicalPlan::Sort`: each sort key is converted to a physical sort
///    expression with the same direction and null placement.
///
/// Any other plan yields `Ok(None)`.
///
/// # Errors
///
/// Propagates failures from creating physical expressions, and returns an internal error
/// when a `Sort` plan produces no ordering.
pub(crate) fn logical_sort_ordering(
    session_state: &SessionState,
    plan: &LogicalPlan,
) -> Result<Option<LexOrdering>> {
    if let Some(time_index_column) = Self::promql_tsid_ordered_time_index(plan) {
        let both_exposed = Self::schema_exposes_column(plan, DATA_SCHEMA_TSID_COLUMN_NAME)
            && Self::schema_exposes_column(plan, &time_index_column);
        if !both_exposed {
            return Ok(None);
        }

        let asc_nulls_first = SortOptions {
            descending: false,
            nulls_first: true,
        };
        // The tsid column comes first, then the time index; both share the same options.
        let sort_exprs = [DATA_SCHEMA_TSID_COLUMN_NAME.to_string(), time_index_column]
            .into_iter()
            .map(|name| {
                let physical_expr = session_state.create_physical_expr(
                    Expr::Column(ColumnExpr::new_unqualified(name)),
                    plan.schema(),
                )?;
                Ok(PhysicalSortExpr::new(physical_expr, asc_nulls_first))
            })
            .collect::<Result<Vec<_>>>()?;
        return Ok(LexOrdering::new(sort_exprs));
    }

    let sort = match plan {
        LogicalPlan::Sort(sort) => sort,
        _ => return Ok(None),
    };

    let mut sort_exprs = Vec::with_capacity(sort.expr.len());
    for sort_expr in &sort.expr {
        let physical_expr =
            session_state.create_physical_expr(sort_expr.expr.clone(), plan.schema())?;
        let options = SortOptions {
            descending: !sort_expr.asc,
            nulls_first: sort_expr.nulls_first,
        };
        sort_exprs.push(PhysicalSortExpr::new(physical_expr, options));
    }

    match LexOrdering::new(sort_exprs) {
        Some(lex_ordering) => Ok(Some(lex_ordering)),
        None => Err(DataFusionError::Internal(format!(
            "Sort plan must contain at least one expression: {plan}"
        ))),
    }
}
#[allow(clippy::too_many_arguments)]
pub fn new(
session_state: &SessionState,
table: TableName,
regions: Vec<RegionId>,
plan: LogicalPlan,
remote_orderings: Vec<LexOrdering>,
arrow_schema: &ArrowSchema,
region_query_handler: RegionQueryHandlerRef,
query_ctx: QueryContextRef,
@@ -363,23 +184,46 @@ impl MergeScanExec {
// break the ordering on merging (of MergeScan).
//
// Otherwise, we need to use the default ordering.
let eq_properties = if target_partition >= regions.len() {
if !remote_orderings.is_empty() {
EquivalenceProperties::new_with_orderings(arrow_schema.clone(), remote_orderings)
} else if let Some(ordering) = Self::logical_sort_ordering(session_state, &plan)? {
EquivalenceProperties::new_with_orderings(arrow_schema.clone(), vec![ordering])
} else {
EquivalenceProperties::new(arrow_schema.clone())
}
let eq_properties = if let LogicalPlan::Sort(sort) = &plan
&& target_partition >= regions.len()
{
let lex_ordering = sort
.expr
.iter()
.map(|sort_expr| {
let physical_expr = session_state
.create_physical_expr(sort_expr.expr.clone(), plan.schema())?;
Ok(PhysicalSortExpr::new(
physical_expr,
SortOptions {
descending: !sort_expr.asc,
nulls_first: sort_expr.nulls_first,
},
))
})
.collect::<Result<Vec<_>>>()?;
EquivalenceProperties::new_with_orderings(arrow_schema.clone(), vec![lex_ordering])
} else {
EquivalenceProperties::new(arrow_schema.clone())
};
let partition_exprs =
Self::prefer_tsid_partition_exprs(session_state, &plan, &partition_cols)
.unwrap_or_else(|| {
Self::default_partition_exprs(session_state, &plan, &partition_cols)
});
let partition_exprs = partition_cols
.iter()
.filter_map(|col| {
if let Some(first_alias) = col.1.first() {
session_state
.create_physical_expr(
Expr::Column(ColumnExpr::new_unqualified(
first_alias.name().to_string(),
)),
plan.schema(),
)
.ok()
} else {
None
}
})
.collect();
let partitioning = Partitioning::Hash(partition_exprs, target_partition);
let properties = Arc::new(PlanProperties::new(
@@ -577,49 +421,31 @@ impl MergeScanExec {
return None;
};
if let Partitioning::Hash(curr_dist, _) = &self.properties.partitioning
&& curr_dist == &hash_exprs
{
// No need to change the distribution
return None;
}
let all_partition_col_aliases: HashSet<_> = self
.partition_cols
.values()
.flat_map(|aliases| aliases.iter().map(|c| c.name()))
.collect();
let mut overlaps = hash_exprs
.iter()
.filter(|expr| {
expr.as_any()
.downcast_ref::<Column>()
.is_some_and(|col_expr| all_partition_col_aliases.contains(col_expr.name()))
})
.cloned()
.collect::<Vec<_>>();
// Metric-engine scans can satisfy any hash distribution that includes `__tsid`.
// Equal requested keys must also share the same `__tsid`, and equal `__tsid` values are
// guaranteed to stay co-located across MergeScan partitions. Advertise the full requested
// distribution so EnforceDistribution can skip redundant reshuffles.
if self
.arrow_schema
.column_with_name(DATA_SCHEMA_TSID_COLUMN_NAME)
.is_some()
&& hash_exprs.iter().any(|expr| {
expr.as_any()
.downcast_ref::<Column>()
.is_some_and(|col_expr| col_expr.name() == DATA_SCHEMA_TSID_COLUMN_NAME)
})
{
overlaps = hash_exprs.clone();
let mut overlaps = vec![];
for expr in &hash_exprs {
if let Some(col_expr) = expr.as_any().downcast_ref::<Column>()
&& all_partition_col_aliases.contains(col_expr.name())
{
overlaps.push(expr.clone());
}
}
if overlaps.is_empty() {
return None;
}
if let Partitioning::Hash(curr_dist, _) = &self.properties.partitioning
&& curr_dist == &overlaps
{
// No need to change the distribution.
return None;
}
Some(Self {
table: self.table.clone(),
regions: self.regions.clone(),
@@ -668,278 +494,6 @@ impl MergeScanExec {
}
}
#[cfg(test)]
mod tests {
use std::collections::{BTreeMap, BTreeSet};
use arrow_schema::{DataType, Field, Schema, TimeUnit};
use async_trait::async_trait;
use common_query::request::QueryRequest;
use common_recordbatch::SendableRecordBatchStream;
use datafusion::execution::SessionStateBuilder;
use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
use datafusion_common::ToDFSchema;
use datafusion_expr::{EmptyRelation, Extension, LogicalPlan, LogicalPlanBuilder, col};
use datafusion_physical_expr::{EquivalenceProperties, PhysicalExpr};
use promql::extension_plan::{InstantManipulate, SeriesDivide};
use session::ReadPreference;
use session::context::QueryContext;
use super::*;
use crate::error::Result as QueryResult;
use crate::region_query::RegionQueryHandler;
/// Region query handler stub for tests that only inspect plan properties.
struct NoopRegionQueryHandler;
#[async_trait]
impl RegionQueryHandler for NoopRegionQueryHandler {
/// Always panics via `unreachable!`: these tests are not expected to run a remote query.
async fn do_get(
&self,
_read_preference: ReadPreference,
_request: QueryRequest,
) -> QueryResult<SendableRecordBatchStream> {
unreachable!("merge scan distribution tests should not execute remote queries")
}
}
/// A requested hash distribution that includes the tsid column must be adopted in full,
/// even though `greptime_timestamp` is not one of the partition columns.
#[test]
fn try_with_new_distribution_satisfies_tsid_hash_requirements() {
    let mut partition_cols = BTreeMap::new();
    for name in ["host", DATA_SCHEMA_TSID_COLUMN_NAME] {
        partition_cols.insert(
            name.to_string(),
            BTreeSet::from([ColumnExpr::from_name(name)]),
        );
    }
    let current_exprs = vec![
        partition_column("host", 0),
        partition_column(DATA_SCHEMA_TSID_COLUMN_NAME, 1),
    ];
    let merge_scan = test_merge_scan_exec(partition_cols, current_exprs);

    let requested = Distribution::HashPartitioned(vec![
        partition_column(DATA_SCHEMA_TSID_COLUMN_NAME, 1),
        partition_column("greptime_timestamp", 2),
    ]);
    let optimized = merge_scan.try_with_new_distribution(requested).unwrap();

    match &optimized.properties.partitioning {
        Partitioning::Hash(exprs, partition_count) => {
            assert_eq!(*partition_count, 32);
            assert_eq!(
                column_names(exprs),
                vec![DATA_SCHEMA_TSID_COLUMN_NAME, "greptime_timestamp"]
            );
        }
        _ => panic!("expected hash partitioning"),
    }
}
/// Without a tsid key in the request, only the requested columns that are also partition
/// columns (`host` here) must end up in the new hash partitioning.
#[test]
fn try_with_new_distribution_keeps_regular_partition_overlap() {
    let mut partition_cols = BTreeMap::new();
    partition_cols.insert(
        "host".to_string(),
        BTreeSet::from([ColumnExpr::from_name("host")]),
    );
    let merge_scan = test_merge_scan_exec(
        partition_cols,
        vec![partition_column("greptime_timestamp", 2)],
    );

    let requested = Distribution::HashPartitioned(vec![
        partition_column("host", 0),
        partition_column("greptime_timestamp", 2),
    ]);
    let optimized = merge_scan.try_with_new_distribution(requested).unwrap();

    match &optimized.properties.partitioning {
        Partitioning::Hash(exprs, partition_count) => {
            assert_eq!(*partition_count, 32);
            assert_eq!(column_names(exprs), vec!["host"]);
        }
        _ => panic!("expected hash partitioning"),
    }
}
/// For a plan sorted by `(tsid, ts)` under a `SeriesDivide`, `MergeScanExec::new` must hash
/// partition on the tsid column alone, and the logical ordering must be `(tsid, ts)`.
#[test]
fn new_prefers_tsid_partitioning_for_promql_tsid_sort() {
    let session_state = SessionStateBuilder::new().with_default_features().build();
    let schema = Arc::new(Schema::new(vec![
        Field::new("host", DataType::Utf8, true),
        Field::new(DATA_SCHEMA_TSID_COLUMN_NAME, DataType::UInt64, false),
        Field::new(
            "ts",
            DataType::Timestamp(TimeUnit::Millisecond, None),
            false,
        ),
        Field::new("greptime_value", DataType::Float64, true),
    ]));

    let plan = promql_tsid_sorted_plan(schema.clone(), "ts");
    // Derive the ordering first: `plan` is moved into the constructor below.
    let ordering = MergeScanExec::logical_sort_ordering(&session_state, &plan)
        .unwrap()
        .unwrap();

    let mut partition_cols = BTreeMap::new();
    for name in ["host", DATA_SCHEMA_TSID_COLUMN_NAME] {
        partition_cols.insert(
            name.to_string(),
            BTreeSet::from([ColumnExpr::from_name(name)]),
        );
    }

    let merge_scan = MergeScanExec::new(
        &session_state,
        TableName::new("greptime", "public", "test"),
        vec![RegionId::new(1, 0), RegionId::new(1, 1)],
        plan,
        vec![],
        schema.as_ref(),
        Arc::new(NoopRegionQueryHandler),
        QueryContext::arc(),
        32,
        partition_cols,
    )
    .unwrap();

    match &merge_scan.properties.partitioning {
        Partitioning::Hash(exprs, partition_count) => {
            assert_eq!(*partition_count, 32);
            assert_eq!(column_names(exprs), vec![DATA_SCHEMA_TSID_COLUMN_NAME]);
        }
        _ => panic!("expected hash partitioning"),
    }
    assert_eq!(
        ordering_column_names(&ordering),
        vec![DATA_SCHEMA_TSID_COLUMN_NAME, "ts"]
    );
}
/// Once a projection above the sort drops the tsid column, no logical ordering may be
/// reported for the enclosing `InstantManipulate` plan.
#[test]
fn logical_sort_ordering_ignores_projected_away_tsid_columns() {
    let session_state = SessionStateBuilder::new().with_default_features().build();
    let schema = Arc::new(Schema::new(vec![
        Field::new("host", DataType::Utf8, true),
        Field::new(DATA_SCHEMA_TSID_COLUMN_NAME, DataType::UInt64, false),
        Field::new(
            "ts",
            DataType::Timestamp(TimeUnit::Millisecond, None),
            false,
        ),
        Field::new("greptime_value", DataType::Float64, true),
    ]));

    // Keep every column except the tsid one.
    let sorted = promql_tsid_sorted_plan(schema, "ts");
    let projected = LogicalPlanBuilder::from(sorted)
        .project(vec![col("host"), col("ts"), col("greptime_value")])
        .unwrap()
        .build()
        .unwrap();

    let instant_manipulate = InstantManipulate::new(
        0,
        10,
        1,
        1,
        "ts".to_string(),
        Some("greptime_value".to_string()),
        projected,
    );
    let plan = LogicalPlan::Extension(Extension {
        node: Arc::new(instant_manipulate),
    });

    let ordering = MergeScanExec::logical_sort_ordering(&session_state, &plan).unwrap();
    assert!(ordering.is_none());
}
/// Assembles a `MergeScanExec` directly from its fields for distribution tests.
///
/// The exec covers two regions over an empty logical plan, targets 32 partitions, and starts
/// out hash partitioned on `current_partition_exprs`.
fn test_merge_scan_exec(
    partition_cols: AliasMapping,
    current_partition_exprs: Vec<Arc<dyn PhysicalExpr>>,
) -> MergeScanExec {
    const TARGET_PARTITIONS: usize = 32;

    let arrow_schema = Arc::new(Schema::new(vec![
        Field::new("host", DataType::Utf8, true),
        Field::new(DATA_SCHEMA_TSID_COLUMN_NAME, DataType::UInt64, false),
        Field::new(
            "greptime_timestamp",
            DataType::Timestamp(TimeUnit::Millisecond, None),
            false,
        ),
        Field::new("greptime_value", DataType::Float64, true),
    ]));
    let properties = PlanProperties::new(
        EquivalenceProperties::new(arrow_schema.clone()),
        Partitioning::Hash(current_partition_exprs, TARGET_PARTITIONS),
        EmissionType::Incremental,
        Boundedness::Bounded,
    );

    MergeScanExec {
        table: TableName::new("greptime", "public", "test"),
        regions: vec![RegionId::new(1, 0), RegionId::new(1, 1)],
        plan: LogicalPlanBuilder::empty(false).build().unwrap(),
        arrow_schema,
        region_query_handler: Arc::new(NoopRegionQueryHandler),
        metric: ExecutionPlanMetricsSet::new(),
        properties: Arc::new(properties),
        sub_stage_metrics: Arc::default(),
        partition_metrics: Arc::default(),
        query_ctx: QueryContext::arc(),
        target_partition: TARGET_PARTITIONS,
        partition_cols,
    }
}
/// Wraps a physical `Column` with the given name and index as a `PhysicalExpr` trait object.
fn partition_column(name: &str, index: usize) -> Arc<dyn PhysicalExpr> {
    let column = Column::new(name, index);
    Arc::new(column)
}
/// Collects the column name of every expression, in order.
///
/// Panics if any expression is not a physical `Column`.
fn column_names(exprs: &[Arc<dyn PhysicalExpr>]) -> Vec<&str> {
    let mut names = Vec::with_capacity(exprs.len());
    for expr in exprs {
        let column = expr.as_any().downcast_ref::<Column>().unwrap();
        names.push(column.name());
    }
    names
}
/// Collects the column name of every sort key in `ordering`, in order.
///
/// Panics if any sort key's expression is not a physical `Column`.
fn ordering_column_names(ordering: &LexOrdering) -> Vec<&str> {
    let mut names = Vec::new();
    for sort_expr in ordering.iter() {
        let column = sort_expr.expr.as_any().downcast_ref::<Column>().unwrap();
        names.push(column.name());
    }
    names
}
/// Builds `SeriesDivide(Sort(EmptyRelation))` over `schema`.
///
/// The sort keys are the tsid column and `time_index`, both ascending with nulls first; the
/// `SeriesDivide` node uses `host` as its only tag column.
fn promql_tsid_sorted_plan(schema: Arc<Schema>, time_index: &str) -> LogicalPlan {
    let df_schema = schema.to_dfschema_ref().unwrap();
    let scan = LogicalPlan::EmptyRelation(EmptyRelation {
        produce_one_row: false,
        schema: df_schema,
    });

    let sort_keys = [DATA_SCHEMA_TSID_COLUMN_NAME, time_index]
        .into_iter()
        .map(|name| col(name).sort(true, true))
        .collect::<Vec<_>>();
    let sorted = LogicalPlanBuilder::from(scan)
        .sort(sort_keys)
        .unwrap()
        .build()
        .unwrap();

    let series_divide =
        SeriesDivide::new(vec!["host".to_string()], time_index.to_string(), sorted);
    LogicalPlan::Extension(Extension {
        node: Arc::new(series_divide),
    })
}
}
/// Metrics for a region of a partition.
#[derive(Debug, Clone)]
struct RegionMetrics {

View File

@@ -20,22 +20,17 @@ use ahash::HashMap;
use async_trait::async_trait;
use catalog::CatalogManagerRef;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_query::prelude::greptime_timestamp;
use datafusion::arrow::compute::SortOptions;
use datafusion::common::Result;
use datafusion::datasource::DefaultTableSource;
use datafusion::execution::context::SessionState;
use datafusion::physical_plan::{ExecutionPlan, ExecutionPlanProperties};
use datafusion::physical_plan::ExecutionPlan;
use datafusion::physical_planner::{ExtensionPlanner, PhysicalPlanner};
use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion, TreeNodeVisitor};
use datafusion_common::{DataFusionError, TableReference};
use datafusion_expr::{LogicalPlan, UserDefinedLogicalNode};
use datafusion_physical_expr::expressions::Column as PhysicalColumn;
use datafusion_physical_expr::{LexOrdering, PhysicalSortExpr};
use partition::manager::{PartitionRuleManagerRef, create_partitions_from_region_routes};
use session::context::QueryContext;
use snafu::{OptionExt, ResultExt};
use store_api::metric_engine_consts::DATA_SCHEMA_TSID_COLUMN_NAME;
use store_api::storage::RegionId;
pub use table::metadata::TableType;
use table::table::adapter::DfTableProviderAdapter;
@@ -167,51 +162,6 @@ impl ExtensionPlanner for DistExtensionPlanner {
return fallback(optimized_plan).await;
};
let remote_orderings = if let Ok(optimized_remote_plan) =
self.optimize_input_logical_plan(session_state, input_plan)
{
planner
.create_physical_plan(&optimized_remote_plan, session_state)
.await
.ok()
.map(|plan| {
let mut remote_orderings = Vec::new();
if let Some(preferred_ordering) =
tsid_time_ordering(plan.as_ref()).filter(|ordering| {
plan.equivalence_properties()
.ordering_satisfy(ordering.clone())
.unwrap_or(false)
})
{
remote_orderings.push(preferred_ordering);
}
if let Some(preferred_ordering) =
MergeScanExec::logical_sort_ordering(session_state, input_plan)
.ok()
.flatten()
.filter(|ordering| {
plan.equivalence_properties()
.ordering_satisfy(ordering.clone())
.unwrap_or(false)
})
{
remote_orderings.push(preferred_ordering);
}
for ordering in plan.equivalence_properties().oeq_class().iter().cloned() {
if !remote_orderings.contains(&ordering) {
remote_orderings.push(ordering);
}
}
remote_orderings
})
.unwrap_or_default()
} else {
Vec::new()
};
// TODO(ruihang): generate different execution plans for different variant merge operation
let schema = optimized_plan.schema().as_arrow();
let query_ctx = session_state
@@ -223,7 +173,6 @@ impl ExtensionPlanner for DistExtensionPlanner {
table_name,
regions,
input_plan.clone(),
remote_orderings,
schema,
self.region_query_handler.clone(),
query_ctx,
@@ -234,31 +183,6 @@ impl ExtensionPlanner for DistExtensionPlanner {
}
}
/// Builds the ordering `(tsid, greptime_timestamp())` over `plan`'s schema, with both keys
/// ascending and nulls first.
///
/// Returns `None` when the schema lacks either column.
fn tsid_time_ordering(plan: &dyn ExecutionPlan) -> Option<LexOrdering> {
    let schema = plan.schema();
    let tsid_index = schema.index_of(DATA_SCHEMA_TSID_COLUMN_NAME).ok()?;
    let time_index = schema.index_of(greptime_timestamp()).ok()?;

    let asc_nulls_first = SortOptions {
        descending: false,
        nulls_first: true,
    };
    let sort_on = |name: &str, index: usize| {
        PhysicalSortExpr::new(Arc::new(PhysicalColumn::new(name, index)), asc_nulls_first)
    };

    LexOrdering::new(vec![
        sort_on(DATA_SCHEMA_TSID_COLUMN_NAME, tsid_index),
        sort_on(greptime_timestamp(), time_index),
    ])
}
impl DistExtensionPlanner {
/// Extract fully resolved table name from logical plan
fn extract_full_table_name(plan: &LogicalPlan) -> Result<Option<TableName>> {

View File

@@ -23,7 +23,6 @@ pub mod string_normalization;
#[cfg(test)]
pub(crate) mod test_util;
pub mod transcribe_atat;
pub mod tsid_join_repartition;
pub mod type_conversion;
pub mod windowed_sort;

View File

@@ -17,10 +17,8 @@ use std::sync::Arc;
use datafusion::config::ConfigOptions;
use datafusion::physical_optimizer::PhysicalOptimizerRule;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::physical_plan::projection::ProjectionExec;
use datafusion_common::Result as DfResult;
use datafusion_physical_expr::Distribution;
use datafusion_physical_expr::utils::map_columns_before_projection;
use crate::dist_plan::MergeScanExec;
@@ -85,9 +83,7 @@ impl PassDistribution {
let mut new_children = Vec::with_capacity(children.len());
for (idx, child) in children.into_iter().enumerate() {
let child_req = match required.get(idx) {
Some(Distribution::UnspecifiedDistribution) => {
Self::propagate_unspecified_child_requirement(plan.as_ref(), idx, &current_req)
}
Some(Distribution::UnspecifiedDistribution) => None,
None => current_req.clone(),
Some(req) => Some(req.clone()),
};
@@ -107,200 +103,4 @@ impl PassDistribution {
plan.with_new_children(new_children)
}
}
/// Translates the current hash requirement into one for a `ProjectionExec`'s input.
///
/// Only applies to the first child (`idx == 0`) of a `ProjectionExec` when `current_req` is
/// `HashPartitioned`. The required expressions are mapped back through the projection;
/// unless every one of them maps, no requirement is propagated.
fn propagate_unspecified_child_requirement(
    plan: &dyn ExecutionPlan,
    idx: usize,
    current_req: &Option<Distribution>,
) -> Option<Distribution> {
    let required_exprs = match current_req {
        Some(Distribution::HashPartitioned(exprs)) if idx == 0 => exprs,
        _ => return None,
    };
    let projection = plan.as_any().downcast_ref::<ProjectionExec>()?;

    let mut proj_exprs = Vec::with_capacity(projection.expr().len());
    for projected in projection.expr() {
        proj_exprs.push((Arc::clone(&projected.expr), projected.alias.clone()));
    }

    let mapped = map_columns_before_projection(required_exprs, &proj_exprs);
    // A shorter result means some required expression could not be mapped.
    if mapped.len() == required_exprs.len() {
        Some(Distribution::HashPartitioned(mapped))
    } else {
        None
    }
}
}
#[cfg(test)]
mod tests {
use std::collections::{BTreeMap, BTreeSet};
use arrow_schema::{DataType, Field, Schema, SchemaRef, TimeUnit};
use async_trait::async_trait;
use common_query::request::QueryRequest;
use common_recordbatch::SendableRecordBatchStream;
use datafusion::common::NullEquality;
use datafusion::execution::SessionStateBuilder;
use datafusion::physical_optimizer::PhysicalOptimizerRule;
use datafusion::physical_plan::joins::{HashJoinExec, PartitionMode};
use datafusion::physical_plan::projection::{ProjectionExec, ProjectionExpr};
use datafusion::physical_plan::{ExecutionPlanProperties, Partitioning};
use datafusion_expr::{JoinType, LogicalPlanBuilder};
use datafusion_physical_expr::PhysicalExpr;
use datafusion_physical_expr::expressions::Column as PhysicalColumn;
use session::ReadPreference;
use session::context::QueryContext;
use store_api::metric_engine_consts::DATA_SCHEMA_TSID_COLUMN_NAME;
use store_api::storage::RegionId;
use table::table_name::TableName;
use super::*;
use crate::error::Result as QueryResult;
use crate::region_query::RegionQueryHandler;
/// Region query handler stub for tests that only inspect plan properties.
struct NoopRegionQueryHandler;
#[async_trait]
impl RegionQueryHandler for NoopRegionQueryHandler {
/// Always panics via `unreachable!`: these tests are not expected to run a remote query.
async fn do_get(
&self,
_read_preference: ReadPreference,
_request: QueryRequest,
) -> QueryResult<SendableRecordBatchStream> {
unreachable!("pass distribution tests should not execute remote queries")
}
}
/// A partitioned hash join on `(tsid, greptime_timestamp)` must push that hash requirement
/// into both merge scans, including the left one that sits below a projection.
#[test]
fn passes_hash_requirement_through_projection_to_merge_scan() {
    let schema = test_schema();
    let left_scan = Arc::new(test_merge_scan_exec(schema.clone()));
    let right_scan = Arc::new(test_merge_scan_exec(schema.clone()));

    // The projection keeps the value, tsid and timestamp columns under their own names.
    let projected_columns = [
        ("greptime_value", 3),
        (DATA_SCHEMA_TSID_COLUMN_NAME, 1),
        ("greptime_timestamp", 2),
    ]
    .into_iter()
    .map(|(name, index)| ProjectionExpr::new(partition_column(name, index), name))
    .collect::<Vec<_>>();
    let left_side: Arc<dyn datafusion::physical_plan::ExecutionPlan> =
        Arc::new(ProjectionExec::try_new(projected_columns, left_scan).unwrap());

    let join_on = vec![
        (
            partition_column(DATA_SCHEMA_TSID_COLUMN_NAME, 1),
            partition_column(DATA_SCHEMA_TSID_COLUMN_NAME, 1),
        ),
        (
            partition_column("greptime_timestamp", 2),
            partition_column("greptime_timestamp", 2),
        ),
    ];
    let join: Arc<dyn datafusion::physical_plan::ExecutionPlan> = Arc::new(
        HashJoinExec::try_new(
            left_side,
            right_scan,
            join_on,
            None,
            &JoinType::Inner,
            None,
            PartitionMode::Partitioned,
            NullEquality::NullEqualsNull,
            false,
        )
        .unwrap(),
    );

    let optimized = PassDistribution
        .optimize(join, &ConfigOptions::default())
        .unwrap();
    let hash_join = optimized.as_any().downcast_ref::<HashJoinExec>().unwrap();
    let left_projection = hash_join
        .left()
        .as_any()
        .downcast_ref::<ProjectionExec>()
        .unwrap();

    let (left_exprs, left_count) = match left_projection.input().output_partitioning() {
        Partitioning::Hash(exprs, count) => (exprs, count),
        _ => panic!("expected left merge scan hash partitioning"),
    };
    let (right_exprs, right_count) = match hash_join.right().output_partitioning() {
        Partitioning::Hash(exprs, count) => (exprs, count),
        _ => panic!("expected right merge scan hash partitioning"),
    };

    assert_eq!(*left_count, 32);
    assert_eq!(*right_count, 32);
    assert_eq!(
        column_names(left_exprs),
        vec![DATA_SCHEMA_TSID_COLUMN_NAME, "greptime_timestamp"]
    );
    assert_eq!(
        column_names(right_exprs),
        vec![DATA_SCHEMA_TSID_COLUMN_NAME, "greptime_timestamp"]
    );
}
/// Creates a `MergeScanExec` through `MergeScanExec::new` for the pass-distribution tests.
///
/// The exec covers two regions over an empty logical plan, targets 32 partitions, and
/// declares the tsid column as its only partition column.
fn test_merge_scan_exec(schema: SchemaRef) -> MergeScanExec {
    let session_state = SessionStateBuilder::new().with_default_features().build();

    let mut partition_cols = BTreeMap::new();
    partition_cols.insert(
        DATA_SCHEMA_TSID_COLUMN_NAME.to_string(),
        BTreeSet::from([datafusion_common::Column::from_name(
            DATA_SCHEMA_TSID_COLUMN_NAME,
        )]),
    );

    let table = TableName::new("greptime", "public", "test");
    let regions = vec![RegionId::new(1, 0), RegionId::new(1, 1)];
    let empty_plan = LogicalPlanBuilder::empty(false).build().unwrap();

    MergeScanExec::new(
        &session_state,
        table,
        regions,
        empty_plan,
        vec![],
        schema.as_ref(),
        Arc::new(NoopRegionQueryHandler),
        QueryContext::arc(),
        32,
        partition_cols,
    )
    .unwrap()
}
/// Returns the four-column schema shared by these tests: `host`, the tsid column,
/// `greptime_timestamp` (millisecond timestamp) and `greptime_value`.
fn test_schema() -> SchemaRef {
    let timestamp_type = DataType::Timestamp(TimeUnit::Millisecond, None);
    let fields = vec![
        Field::new("host", DataType::Utf8, true),
        Field::new(DATA_SCHEMA_TSID_COLUMN_NAME, DataType::UInt64, false),
        Field::new("greptime_timestamp", timestamp_type, false),
        Field::new("greptime_value", DataType::Float64, true),
    ];
    Arc::new(Schema::new(fields))
}
/// Wraps a physical column with the given name and index as a `PhysicalExpr` trait object.
fn partition_column(name: &str, index: usize) -> Arc<dyn PhysicalExpr> {
    let column = PhysicalColumn::new(name, index);
    Arc::new(column)
}
/// Collects the column name of every expression, in order.
///
/// Panics if any expression is not a `PhysicalColumn`.
fn column_names(exprs: &[Arc<dyn PhysicalExpr>]) -> Vec<&str> {
    let mut names = Vec::with_capacity(exprs.len());
    for expr in exprs {
        let column = expr.as_any().downcast_ref::<PhysicalColumn>().unwrap();
        names.push(column.name());
    }
    names
}
}

View File

@@ -1,383 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::BTreeSet;
use std::sync::Arc;
use datafusion::config::ConfigOptions;
use datafusion::physical_optimizer::PhysicalOptimizerRule;
use datafusion::physical_plan::joins::{HashJoinExec, PartitionMode};
use datafusion::physical_plan::repartition::RepartitionExec;
use datafusion::physical_plan::{ExecutionPlan, ExecutionPlanProperties, Partitioning};
use datafusion_common::Result as DfResult;
use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_physical_expr::PhysicalExpr;
use datafusion_physical_expr::expressions::Column as PhysicalColumn;
use store_api::metric_engine_consts::DATA_SCHEMA_TSID_COLUMN_NAME;
/// Removes repartitions that only strengthen an existing `__tsid` distribution into
/// `(__tsid, time)` for partitioned hash joins.
#[derive(Debug)]
pub struct TsidJoinRepartition;

impl PhysicalOptimizerRule for TsidJoinRepartition {
    /// Walks the plan top-down and replaces every eligible hash join with a copy whose
    /// children are the inputs of its two repartition nodes.
    fn optimize(
        &self,
        plan: Arc<dyn ExecutionPlan>,
        _config: &ConfigOptions,
    ) -> DfResult<Arc<dyn ExecutionPlan>> {
        let rewritten = plan.transform_down(|node| {
            let replacement = join_without_tsid_repartitions(node.as_ref())?;
            Ok(match replacement {
                Some(new_join) => Transformed::yes(new_join),
                None => Transformed::no(node),
            })
        })?;
        Ok(rewritten.data)
    }

    fn name(&self) -> &str {
        "TsidJoinRepartition"
    }

    fn schema_check(&self) -> bool {
        false
    }
}

/// Rebuilds a partitioned hash join directly on top of the inputs of its repartition children.
///
/// Returns `Ok(None)` when `plan` is not a `HashJoinExec` in `Partitioned` mode, when the
/// join keys of either side cannot be named, when either child is not a `RepartitionExec`
/// that `removable_tsid_repartition` accepts, or when the two remaining inputs disagree on
/// partition count.
fn join_without_tsid_repartitions(
    plan: &dyn ExecutionPlan,
) -> DfResult<Option<Arc<dyn ExecutionPlan>>> {
    let Some(hash_join) = plan.as_any().downcast_ref::<HashJoinExec>() else {
        return Ok(None);
    };
    if *hash_join.partition_mode() != PartitionMode::Partitioned {
        return Ok(None);
    }

    let (Some(left_join_keys), Some(right_join_keys)) = (
        join_side_column_names(hash_join.on(), true),
        join_side_column_names(hash_join.on(), false),
    ) else {
        return Ok(None);
    };

    let (Some(left_repartition), Some(right_repartition)) = (
        hash_join.left().as_any().downcast_ref::<RepartitionExec>(),
        hash_join.right().as_any().downcast_ref::<RepartitionExec>(),
    ) else {
        return Ok(None);
    };

    let Some(left_input) = removable_tsid_repartition(left_repartition, &left_join_keys) else {
        return Ok(None);
    };
    let Some(right_input) = removable_tsid_repartition(right_repartition, &right_join_keys)
    else {
        return Ok(None);
    };

    // A partitioned join needs both sides to have the same number of partitions.
    let left_partitions = left_input.output_partitioning().partition_count();
    let right_partitions = right_input.output_partitioning().partition_count();
    if left_partitions != right_partitions {
        return Ok(None);
    }

    let new_join = hash_join
        .builder()
        .with_new_children(vec![left_input, right_input])?
        .recompute_properties()
        .reset_state()
        .build_exec()?;
    Ok(Some(new_join))
}
/// Returns the input of `repartition` when the repartition can be dropped.
///
/// The repartition is removable only if all of the following hold:
/// - it does not preserve order;
/// - both it and its input are hash-partitioned with the same partition count;
/// - the requested hash columns are exactly `join_keys` and include the tsid column;
/// - the input is hashed on the tsid column alone;
/// - the requested columns are a strict extension of the existing ones.
///
/// Columns are compared by name only. Returns `None` if any condition fails or if any
/// partitioning expression is not a plain column.
fn removable_tsid_repartition(
    repartition: &RepartitionExec,
    join_keys: &BTreeSet<String>,
) -> Option<Arc<dyn ExecutionPlan>> {
    if repartition.preserve_order() {
        return None;
    }

    let input = repartition.input();
    let (requested_exprs, existing_exprs) =
        match (repartition.partitioning(), input.output_partitioning()) {
            (
                Partitioning::Hash(requested, requested_count),
                Partitioning::Hash(existing, existing_count),
            ) if requested_count == existing_count => (requested, existing),
            _ => return None,
        };

    let requested = column_names(requested_exprs)?;
    let matches_join_keys = requested == *join_keys;
    if !(matches_join_keys && requested.contains(DATA_SCHEMA_TSID_COLUMN_NAME)) {
        return None;
    }

    // The existing distribution has to be exactly `{tsid}`.
    let existing = column_names(existing_exprs)?;
    let tsid_only = existing.len() == 1 && existing.contains(DATA_SCHEMA_TSID_COLUMN_NAME);
    if !tsid_only {
        return None;
    }

    if requested.len() > existing.len() {
        Some(Arc::clone(input))
    } else {
        None
    }
}
/// Collects the column names used by one side of the join condition `on`.
///
/// Picks the left expression of each pair when `left` is true and the right expression
/// otherwise. Returns `None` as soon as a selected expression is not a plain column.
fn join_side_column_names(
    on: &[(Arc<dyn PhysicalExpr>, Arc<dyn PhysicalExpr>)],
    left: bool,
) -> Option<BTreeSet<String>> {
    let mut names = BTreeSet::new();
    for (left_expr, right_expr) in on {
        let side = if left { left_expr } else { right_expr };
        names.insert(physical_column_name(side)?);
    }
    Some(names)
}
fn column_names(exprs: &[Arc<dyn PhysicalExpr>]) -> Option<BTreeSet<String>> {
exprs.iter().map(physical_column_name).collect()
}
/// Returns the name of `expr` when it is a `PhysicalColumn`, and `None` for any other
/// kind of expression.
fn physical_column_name(expr: &Arc<dyn PhysicalExpr>) -> Option<String> {
    let column = expr.as_any().downcast_ref::<PhysicalColumn>()?;
    Some(column.name().to_owned())
}
#[cfg(test)]
mod tests {
    use std::any::Any;
    use std::sync::Arc;

    use arrow_schema::{DataType, Field, Schema, SchemaRef, TimeUnit};
    use datafusion::common::NullEquality;
    use datafusion::execution::TaskContext;
    use datafusion::physical_optimizer::PhysicalOptimizerRule;
    use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
    use datafusion::physical_plan::joins::HashJoinExec;
    use datafusion::physical_plan::{
        DisplayAs, DisplayFormatType, Partitioning, PlanProperties, SendableRecordBatchStream,
        displayable,
    };
    use datafusion_common::Result as DfResult;
    use datafusion_expr::JoinType;
    use datafusion_physical_expr::EquivalenceProperties;
    use datafusion_physical_expr::expressions::Column as PhysicalColumn;

    use super::*;

    /// Name of the time index column in the test schema.
    const TIMESTAMP_COLUMN: &str = "greptime_timestamp";
    /// Partition count shared by every partitioning built in these tests.
    const PARTITION_COUNT: usize = 32;

    /// Leaf plan that only reports a fixed output partitioning; it is never executed.
    #[derive(Debug)]
    struct PartitionedTestExec {
        properties: Arc<PlanProperties>,
    }

    impl PartitionedTestExec {
        fn new(schema: SchemaRef, partitioning: Partitioning) -> Self {
            let properties = PlanProperties::new(
                EquivalenceProperties::new(schema),
                partitioning,
                EmissionType::Incremental,
                Boundedness::Bounded,
            );
            Self {
                properties: Arc::new(properties),
            }
        }
    }

    impl DisplayAs for PartitionedTestExec {
        fn fmt_as(
            &self,
            _t: DisplayFormatType,
            f: &mut std::fmt::Formatter<'_>,
        ) -> std::fmt::Result {
            f.write_str("PartitionedTestExec")
        }
    }

    impl ExecutionPlan for PartitionedTestExec {
        fn name(&self) -> &str {
            "PartitionedTestExec"
        }

        fn as_any(&self) -> &dyn Any {
            self
        }

        fn properties(&self) -> &Arc<PlanProperties> {
            &self.properties
        }

        fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
            Vec::new()
        }

        fn with_new_children(
            self: Arc<Self>,
            _children: Vec<Arc<dyn ExecutionPlan>>,
        ) -> DfResult<Arc<dyn ExecutionPlan>> {
            Ok(self)
        }

        fn execute(
            &self,
            _partition: usize,
            _context: Arc<TaskContext>,
        ) -> DfResult<SendableRecordBatchStream> {
            unreachable!("optimizer tests should not execute PartitionedTestExec")
        }
    }

    #[test]
    fn removes_repartition_for_tsid_strengthening_join() {
        let schema = test_schema();
        let tsid_only = || vec![partition_column(DATA_SCHEMA_TSID_COLUMN_NAME, 0)];
        let join = tsid_timestamp_join(
            repartitioned_child(schema.clone(), tsid_only()),
            repartitioned_child(schema, tsid_only()),
        );

        let optimized = run_rule(join);

        let hash_join = optimized.as_any().downcast_ref::<HashJoinExec>().unwrap();
        for child in [hash_join.left(), hash_join.right()] {
            assert!(!child.as_any().is::<RepartitionExec>());
        }
        let plan_str = displayable(optimized.as_ref()).indent(false).to_string();
        assert!(!plan_str.contains("RepartitionExec"), "{plan_str}");
    }

    #[test]
    fn keeps_repartition_without_existing_tsid_distribution() {
        let schema = test_schema();
        // The left side is distributed on the timestamp only, so it does not qualify.
        let join = tsid_timestamp_join(
            repartitioned_child(
                schema.clone(),
                vec![partition_column(TIMESTAMP_COLUMN, 1)],
            ),
            repartitioned_child(
                schema,
                vec![partition_column(DATA_SCHEMA_TSID_COLUMN_NAME, 0)],
            ),
        );

        let optimized = run_rule(join);

        let hash_join = optimized.as_any().downcast_ref::<HashJoinExec>().unwrap();
        for child in [hash_join.left(), hash_join.right()] {
            assert!(child.as_any().is::<RepartitionExec>());
        }
    }

    /// Runs `TsidJoinRepartition` over `plan` with default configuration.
    fn run_rule(plan: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
        TsidJoinRepartition
            .optimize(plan, &ConfigOptions::default())
            .unwrap()
    }

    /// Builds a partitioned inner hash join on `(tsid, timestamp)` over the two inputs.
    fn tsid_timestamp_join(
        left: Arc<dyn ExecutionPlan>,
        right: Arc<dyn ExecutionPlan>,
    ) -> Arc<dyn ExecutionPlan> {
        let join_on = vec![
            (
                partition_column(DATA_SCHEMA_TSID_COLUMN_NAME, 0),
                partition_column(DATA_SCHEMA_TSID_COLUMN_NAME, 0),
            ),
            (
                partition_column(TIMESTAMP_COLUMN, 1),
                partition_column(TIMESTAMP_COLUMN, 1),
            ),
        ];
        let join = HashJoinExec::try_new(
            left,
            right,
            join_on,
            None,
            &JoinType::Inner,
            None,
            PartitionMode::Partitioned,
            NullEquality::NullEqualsNull,
            false,
        )
        .unwrap();
        Arc::new(join)
    }

    fn test_schema() -> SchemaRef {
        let fields = vec![
            Field::new(DATA_SCHEMA_TSID_COLUMN_NAME, DataType::UInt64, false),
            Field::new(
                TIMESTAMP_COLUMN,
                DataType::Timestamp(TimeUnit::Millisecond, None),
                false,
            ),
            Field::new("greptime_value", DataType::Float64, true),
        ];
        Arc::new(Schema::new(fields))
    }

    /// Wraps a leaf hashed on `existing_partition_exprs` in a repartition that hashes
    /// on `(tsid, timestamp)` with the same partition count.
    fn repartitioned_child(
        schema: SchemaRef,
        existing_partition_exprs: Vec<Arc<dyn PhysicalExpr>>,
    ) -> Arc<dyn ExecutionPlan> {
        let source = PartitionedTestExec::new(
            schema,
            Partitioning::Hash(existing_partition_exprs, PARTITION_COUNT),
        );
        let requested = Partitioning::Hash(
            vec![
                partition_column(DATA_SCHEMA_TSID_COLUMN_NAME, 0),
                partition_column(TIMESTAMP_COLUMN, 1),
            ],
            PARTITION_COUNT,
        );
        let repartition = RepartitionExec::try_new(Arc::new(source), requested).unwrap();
        Arc::new(repartition)
    }

    fn partition_column(name: &str, index: usize) -> Arc<dyn PhysicalExpr> {
        let column = PhysicalColumn::new(name, index);
        Arc::new(column)
    }
}

View File

@@ -68,7 +68,6 @@ use crate::optimizer::remove_duplicate::RemoveDuplicate;
use crate::optimizer::scan_hint::ScanHintRule;
use crate::optimizer::string_normalization::StringNormalizationRule;
use crate::optimizer::transcribe_atat::TranscribeAtatRule;
use crate::optimizer::tsid_join_repartition::TsidJoinRepartition;
use crate::optimizer::type_conversion::TypeConversionRule;
use crate::optimizer::windowed_sort::WindowedSortPhysicalRule;
use crate::options::QueryOptions as QueryOptionsNew;
@@ -195,8 +194,6 @@ impl QueryEngineState {
physical_optimizer
.rules
.push(Arc::new(WindowedSortPhysicalRule));
// Relax redundant repartitions for tsid-based PromQL joins after distribution is enforced.
physical_optimizer.rules.push(Arc::new(TsidJoinRepartition));
// Explicitly do not perform filter pushdown for windowed sort and part sort.
// (Note that `PartSortExec` creates another new dynamic filter that needs to be pushed down if the dynamic filter optimization is to be used.)
// Benchmarks show it can cause a performance regression due to useless filtering and extra shuffle.

View File

@@ -100,35 +100,31 @@ TQL EXPLAIN (
)
);
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | HistogramFold: le=le, field=sum(prom_avg_over_time(ts_range,v)), quantile=0.5 |
| | Sort: test_tsid.le ASC NULLS LAST, test_tsid.tag4 ASC NULLS LAST, test_tsid.tag5 ASC NULLS LAST, test_tsid.ts ASC NULLS LAST |
| | Aggregate: groupBy=[[test_tsid.le, test_tsid.tag4, test_tsid.tag5, test_tsid.ts]], aggr=[[__sum_merge(__sum_state(prom_avg_over_time(ts_range,v))) AS sum(prom_avg_over_time(ts_range,v))]] |
| | MergeScan [is_placeholder=false, remote_input=[ |
| | Aggregate: groupBy=[[test_tsid.le, test_tsid.tag4, test_tsid.tag5, test_tsid.ts]], aggr=[[__sum_state(prom_avg_over_time(ts_range,v))]] |
| | Filter: prom_avg_over_time(ts_range,v) IS NOT NULL |
| | Projection: test_tsid.ts, prom_avg_over_time(ts_range, v) AS prom_avg_over_time(ts_range,v), test_tsid.le, test_tsid.tag1, test_tsid.tag2, test_tsid.tag4, test_tsid.tag5, test_tsid.tag6, test_tsid.tag7, test_tsid.tag8 |
| | PromRangeManipulate: req range=[1769139000000..1769139900000], interval=[60000], eval range=[1800000], time index=[ts], values=["v"] |
| | PromSeriesNormalize: offset=[0], time index=[ts], filter NaN: [true] |
| | PromSeriesDivide: tags=["__tsid"] |
| | Sort: test_tsid.__tsid ASC NULLS FIRST, test_tsid.ts ASC NULLS FIRST |
| | Filter: test_tsid.ts >= TimestampMillisecond(1769137200001, None) AND test_tsid.ts <= TimestampMillisecond(1769139900000, None) |
| | Projection: test_tsid.v, test_tsid.le, test_tsid.tag1, test_tsid.tag2, test_tsid.tag4, test_tsid.tag5, test_tsid.tag6, test_tsid.tag7, test_tsid.tag8, test_tsid.__tsid, test_tsid.ts |
| | SubqueryAlias: test_tsid |
| | Filter: phy.__table_id=UInt32(REDACTED) |
| | TableScan: phy projection=[ts, v, tag1, tag2, le, tag4, tag5, tag6, tag7, tag8, __table_id, __tsid] |
| | ]] |
| physical_plan | HistogramFoldExec: le=@0, field=@4, quantile=0.5 |
| | SortExec: expr=[tag4@1 ASC NULLS LAST, tag5@2 ASC NULLS LAST, ts@3 ASC NULLS LAST, CAST(le@0 AS Float64) ASC NULLS LAST], preserve_partitioning=[true] |
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | HistogramFold: le=le, field=sum(prom_avg_over_time(ts_range,v)), quantile=0.5 |
| | MergeScan [is_placeholder=false, remote_input=[ |
| | Sort: test_tsid.le ASC NULLS LAST, test_tsid.tag4 ASC NULLS LAST, test_tsid.tag5 ASC NULLS LAST, test_tsid.ts ASC NULLS LAST |
| | Aggregate: groupBy=[[test_tsid.le, test_tsid.tag4, test_tsid.tag5, test_tsid.ts]], aggr=[[sum(prom_avg_over_time(ts_range,v))]] |
| | Filter: prom_avg_over_time(ts_range,v) IS NOT NULL |
| | Projection: test_tsid.ts, prom_avg_over_time(ts_range, v) AS prom_avg_over_time(ts_range,v), test_tsid.le, test_tsid.tag1, test_tsid.tag2, test_tsid.tag4, test_tsid.tag5, test_tsid.tag6, test_tsid.tag7, test_tsid.tag8 |
| | PromRangeManipulate: req range=[1769139000000..1769139900000], interval=[60000], eval range=[1800000], time index=[ts], values=["v"] |
| | PromSeriesNormalize: offset=[0], time index=[ts], filter NaN: [true] |
| | PromSeriesDivide: tags=["__tsid"] |
| | Sort: test_tsid.__tsid ASC NULLS FIRST, test_tsid.ts ASC NULLS FIRST |
| | Filter: test_tsid.ts >= TimestampMillisecond(1769137200001, None) AND test_tsid.ts <= TimestampMillisecond(1769139900000, None) |
| | Projection: test_tsid.v, test_tsid.le, test_tsid.tag1, test_tsid.tag2, test_tsid.tag4, test_tsid.tag5, test_tsid.tag6, test_tsid.tag7, test_tsid.tag8, test_tsid.__tsid, test_tsid.ts |
| | SubqueryAlias: test_tsid |
| | Filter: phy.__table_id=UInt32(REDACTED) |
| | TableScan: phy projection=[ts, v, tag1, tag2, le, tag4, tag5, tag6, tag7, tag8, __table_id, __tsid] |
| | ]] |
| physical_plan | HistogramFoldExec: le=@0, field=@4, quantile=0.5 |
| | SortExec: expr=[tag4@1 ASC NULLS LAST, tag5@2 ASC NULLS LAST, ts@3 ASC NULLS LAST, CAST(le@0 AS Float64) ASC NULLS LAST], preserve_partitioning=[true] |
| | RepartitionExec: REDACTED
| | AggregateExec: mode=FinalPartitioned, gby=[le@0 as le, tag4@1 as tag4, tag5@2 as tag5, ts@3 as ts], aggr=[sum(prom_avg_over_time(ts_range,v))] |
| | RepartitionExec: REDACTED
| | AggregateExec: mode=Partial, gby=[le@0 as le, tag4@1 as tag4, tag5@2 as tag5, ts@3 as ts], aggr=[sum(prom_avg_over_time(ts_range,v))] |
| | MergeScanExec: REDACTED
| | |
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| | MergeScanExec: REDACTED
| | |
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
CREATE FLOW IF NOT EXISTS test_tsid
SINK TO 'test_tsid_output'

View File

@@ -196,7 +196,6 @@ TQL EXPLAIN VERBOSE (0, 10, '5s') test;
|_|_|
| physical_plan after FilterPushdown(Post)_| SAME TEXT AS ABOVE_|
| physical_plan after WindowedSortRule_| SAME TEXT AS ABOVE_|
| physical_plan after TsidJoinRepartition_| SAME TEXT AS ABOVE_|
| physical_plan after MatchesConstantTerm_| SAME TEXT AS ABOVE_|
| physical_plan after RemoveDuplicateRule_| SAME TEXT AS ABOVE_|
| physical_plan after SanityCheckPlan_| SAME TEXT AS ABOVE_|
@@ -343,7 +342,6 @@ TQL EXPLAIN VERBOSE (0, 10, '5s') test AS series;
|_|_|
| physical_plan after FilterPushdown(Post)_| SAME TEXT AS ABOVE_|
| physical_plan after WindowedSortRule_| SAME TEXT AS ABOVE_|
| physical_plan after TsidJoinRepartition_| SAME TEXT AS ABOVE_|
| physical_plan after MatchesConstantTerm_| SAME TEXT AS ABOVE_|
| physical_plan after RemoveDuplicateRule_| SAME TEXT AS ABOVE_|
| physical_plan after SanityCheckPlan_| SAME TEXT AS ABOVE_|

View File

@@ -42,16 +42,14 @@ TQL ANALYZE (0, 10, '5s') sum(tsid_metric);
+-+-+-+
| stage | node | plan_|
+-+-+-+
| 0_| 0_|_SortPreservingMergeExec: [ts@0 ASC NULLS LAST] REDACTED
| 0_| 0_|_CooperativeExec REDACTED
|_|_|_MergeScanExec: REDACTED
|_|_|_|
| 1_| 0_|_SortPreservingMergeExec: [ts@0 ASC NULLS LAST] REDACTED
|_|_|_SortExec: expr=[ts@0 ASC NULLS LAST], preserve_partitioning=[true] REDACTED
|_|_|_AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[sum(tsid_metric.val)] REDACTED
|_|_|_RepartitionExec: partitioning=REDACTED
|_|_|_AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[sum(tsid_metric.val)] REDACTED
|_|_|_MergeScanExec: REDACTED
|_|_|_|
| 1_| 0_|_AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[__sum_state(tsid_metric.val)] REDACTED
|_|_|_RepartitionExec: partitioning=REDACTED
|_|_|_AggregateExec: mode=Partial, gby=[ts@1 as ts], aggr=[__sum_state(tsid_metric.val)] REDACTED
|_|_|_AggregateExec: mode=Partial, gby=[ts@1 as ts], aggr=[sum(tsid_metric.val)] REDACTED
|_|_|_ProjectionExec: expr=[val@0 as val, ts@2 as ts] REDACTED
|_|_|_PromInstantManipulateExec: range=[0..10000], lookback=[300000], interval=[5000], time index=[ts] REDACTED
|_|_|_PromSeriesDivideExec: tags=["__tsid"] REDACTED
@@ -73,16 +71,14 @@ TQL ANALYZE (0, 10, '5s') sum by (job, instance) (tsid_metric);
+-+-+-+
| stage | node | plan_|
+-+-+-+
| 0_| 0_|_SortPreservingMergeExec: [job@0 ASC NULLS LAST, instance@1 ASC NULLS LAST, ts@2 ASC NULLS LAST] REDACTED
| 0_| 0_|_CooperativeExec REDACTED
|_|_|_MergeScanExec: REDACTED
|_|_|_|
| 1_| 0_|_SortPreservingMergeExec: [job@0 ASC NULLS LAST, instance@1 ASC NULLS LAST, ts@2 ASC NULLS LAST] REDACTED
|_|_|_SortExec: expr=[job@0 ASC NULLS LAST, instance@1 ASC NULLS LAST, ts@2 ASC NULLS LAST], preserve_partitioning=[true] REDACTED
|_|_|_AggregateExec: mode=FinalPartitioned, gby=[job@0 as job, instance@1 as instance, ts@2 as ts], aggr=[sum(tsid_metric.val), __tsid] REDACTED
|_|_|_RepartitionExec: partitioning=REDACTED
|_|_|_AggregateExec: mode=Partial, gby=[job@0 as job, instance@1 as instance, ts@2 as ts], aggr=[sum(tsid_metric.val), __tsid] REDACTED
|_|_|_MergeScanExec: REDACTED
|_|_|_|
| 1_| 0_|_AggregateExec: mode=FinalPartitioned, gby=[job@0 as job, instance@1 as instance, ts@2 as ts], aggr=[__sum_state(tsid_metric.val), __first_value_state(tsid_metric.__tsid)] REDACTED
|_|_|_RepartitionExec: partitioning=REDACTED
|_|_|_AggregateExec: mode=Partial, gby=[job@2 as job, instance@1 as instance, ts@4 as ts], aggr=[__sum_state(tsid_metric.val), __first_value_state(tsid_metric.__tsid)] REDACTED
|_|_|_AggregateExec: mode=Partial, gby=[job@2 as job, instance@1 as instance, ts@4 as ts], aggr=[sum(tsid_metric.val), __tsid] REDACTED
|_|_|_PromInstantManipulateExec: range=[0..10000], lookback=[300000], interval=[5000], time index=[ts] REDACTED
|_|_|_PromSeriesDivideExec: tags=["__tsid"] REDACTED
|_|_|_ProjectionExec: expr=[val@1 as val, instance@3 as instance, job@4 as job, __tsid@2 as __tsid, ts@0 as ts] REDACTED
@@ -107,6 +103,12 @@ TQL ANALYZE (0, 10, '5s') sum(irate(tsid_metric[1h])) / scalar(count(count(tsid
|_|_|_REDACTED
|_|_|_ScalarCalculateExec: tags=[] REDACTED
|_|_|_CoalescePartitionsExec REDACTED
|_|_|_MergeScanExec: REDACTED
|_|_|_CooperativeExec REDACTED
|_|_|_MergeScanExec: REDACTED
|_|_|_|
| 1_| 0_|_SortPreservingMergeExec: [ts@0 ASC NULLS LAST] REDACTED
|_|_|_SortExec: expr=[ts@0 ASC NULLS LAST], preserve_partitioning=[true] REDACTED
|_|_|_AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[count(count(tsid_metric.val))] REDACTED
|_|_|_RepartitionExec: partitioning=REDACTED
|_|_|_AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[count(count(tsid_metric.val))] REDACTED
@@ -114,25 +116,17 @@ TQL ANALYZE (0, 10, '5s') sum(irate(tsid_metric[1h])) / scalar(count(count(tsid
|_|_|_AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts, job@1 as job], aggr=[] REDACTED
|_|_|_RepartitionExec: partitioning=REDACTED
|_|_|_AggregateExec: mode=Partial, gby=[ts@0 as ts, job@1 as job], aggr=[] REDACTED
|_|_|_MergeScanExec: REDACTED
|_|_|_SortExec: expr=[ts@0 ASC NULLS LAST], preserve_partitioning=[true] REDACTED
|_|_|_AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[sum(prom_irate(ts_range,val))] REDACTED
|_|_|_RepartitionExec: partitioning=REDACTED
|_|_|_AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[sum(prom_irate(ts_range,val))] REDACTED
|_|_|_MergeScanExec: REDACTED
|_|_|_|
| 1_| 0_|_AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts, job@1 as job], aggr=[] REDACTED
|_|_|_RepartitionExec: partitioning=REDACTED
|_|_|_AggregateExec: mode=Partial, gby=[ts@0 as ts, job@1 as job], aggr=[] REDACTED
|_|_|_ProjectionExec: expr=[ts@3 as ts, job@1 as job] REDACTED
|_|_|_PromInstantManipulateExec: range=[0..10000], lookback=[300000], interval=[5000], time index=[ts] REDACTED
|_|_|_PromSeriesDivideExec: tags=["__tsid"] REDACTED
|_|_|_ProjectionExec: expr=[val@1 as val, job@3 as job, __tsid@2 as __tsid, ts@0 as ts] REDACTED
|_|_|_SeriesScan: region=REDACTED, "partition_count":{"count":1, "mem_ranges":1, "files":0, "file_ranges":0}, "distribution":"PerSeries" REDACTED
|_|_|_|
| 1_| 0_|_AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[__sum_state(prom_irate(ts_range,val))] REDACTED
| 1_| 0_|_SortPreservingMergeExec: [ts@0 ASC NULLS LAST] REDACTED
|_|_|_SortExec: expr=[ts@0 ASC NULLS LAST], preserve_partitioning=[true] REDACTED
|_|_|_AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[sum(prom_irate(ts_range,val))] REDACTED
|_|_|_RepartitionExec: partitioning=REDACTED
|_|_|_AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[__sum_state(prom_irate(ts_range,val))] REDACTED
|_|_|_AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[sum(prom_irate(ts_range,val))] REDACTED
|_|_|_FilterExec: prom_irate(ts_range,val)@1 IS NOT NULL REDACTED
|_|_|_ProjectionExec: expr=[ts@2 as ts, prom_irate(ts_range@3, val@0) as prom_irate(ts_range,val)] REDACTED
|_|_|_PromRangeManipulateExec: req range=[0..10000], interval=[5000], eval range=[3600000], time index=[ts] REDACTED
@@ -160,6 +154,12 @@ TQL ANALYZE (0, 10, '5s') sum(irate(tsid_metric[1h])) / scalar(count(sum(tsid_m
|_|_|_REDACTED
|_|_|_ScalarCalculateExec: tags=[] REDACTED
|_|_|_CoalescePartitionsExec REDACTED
|_|_|_MergeScanExec: REDACTED
|_|_|_CooperativeExec REDACTED
|_|_|_MergeScanExec: REDACTED
|_|_|_|
| 1_| 0_|_SortPreservingMergeExec: [ts@0 ASC NULLS LAST] REDACTED
|_|_|_SortExec: expr=[ts@0 ASC NULLS LAST], preserve_partitioning=[true] REDACTED
|_|_|_AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[count(sum(tsid_metric.val))] REDACTED
|_|_|_RepartitionExec: partitioning=REDACTED
|_|_|_AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[count(sum(tsid_metric.val))] REDACTED
@@ -167,16 +167,6 @@ TQL ANALYZE (0, 10, '5s') sum(irate(tsid_metric[1h])) / scalar(count(sum(tsid_m
|_|_|_AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts, job@1 as job], aggr=[] REDACTED
|_|_|_RepartitionExec: partitioning=REDACTED
|_|_|_AggregateExec: mode=Partial, gby=[ts@0 as ts, job@1 as job], aggr=[] REDACTED
|_|_|_MergeScanExec: REDACTED
|_|_|_SortExec: expr=[ts@0 ASC NULLS LAST], preserve_partitioning=[true] REDACTED
|_|_|_AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[sum(prom_irate(ts_range,val))] REDACTED
|_|_|_RepartitionExec: partitioning=REDACTED
|_|_|_AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[sum(prom_irate(ts_range,val))] REDACTED
|_|_|_MergeScanExec: REDACTED
|_|_|_|
| 1_| 0_|_AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts, job@1 as job], aggr=[] REDACTED
|_|_|_RepartitionExec: partitioning=REDACTED
|_|_|_AggregateExec: mode=Partial, gby=[ts@0 as ts, job@1 as job], aggr=[] REDACTED
|_|_|_ProjectionExec: expr=[ts@1 as ts, job@0 as job] REDACTED
|_|_|_FilterExec: val@0 IS NOT NULL, projection=[job@1, ts@2] REDACTED
|_|_|_ProjectionExec: expr=[val@0 as val, job@1 as job, ts@3 as ts] REDACTED
@@ -185,9 +175,11 @@ TQL ANALYZE (0, 10, '5s') sum(irate(tsid_metric[1h])) / scalar(count(sum(tsid_m
|_|_|_ProjectionExec: expr=[val@1 as val, job@3 as job, __tsid@2 as __tsid, ts@0 as ts] REDACTED
|_|_|_SeriesScan: region=REDACTED, "partition_count":{"count":1, "mem_ranges":1, "files":0, "file_ranges":0}, "distribution":"PerSeries" REDACTED
|_|_|_|
| 1_| 0_|_AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[__sum_state(prom_irate(ts_range,val))] REDACTED
| 1_| 0_|_SortPreservingMergeExec: [ts@0 ASC NULLS LAST] REDACTED
|_|_|_SortExec: expr=[ts@0 ASC NULLS LAST], preserve_partitioning=[true] REDACTED
|_|_|_AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[sum(prom_irate(ts_range,val))] REDACTED
|_|_|_RepartitionExec: partitioning=REDACTED
|_|_|_AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[__sum_state(prom_irate(ts_range,val))] REDACTED
|_|_|_AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[sum(prom_irate(ts_range,val))] REDACTED
|_|_|_FilterExec: prom_irate(ts_range,val)@1 IS NOT NULL REDACTED
|_|_|_ProjectionExec: expr=[ts@2 as ts, prom_irate(ts_range@3, val@0) as prom_irate(ts_range,val)] REDACTED
|_|_|_PromRangeManipulateExec: req range=[0..10000], interval=[5000], eval range=[3600000], time index=[ts] REDACTED