perf: preserve tsid distribution through merge scans

This commit is contained in:
Ruihang Xia
2026-03-29 08:44:05 +08:00
parent 556f7e74fc
commit 6659a932f4
6 changed files with 772 additions and 107 deletions

View File

@@ -27,6 +27,7 @@ use datafusion_expr::utils::expr_to_columns;
use datafusion_expr::{Expr, LogicalPlan, LogicalPlanBuilder, Subquery, col as col_fn};
use datafusion_optimizer::analyzer::AnalyzerRule;
use promql::extension_plan::SeriesDivide;
use store_api::metric_engine_consts::DATA_SCHEMA_TSID_COLUMN_NAME;
use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};
use table::metadata::TableType;
use table::table::adapter::DfTableProviderAdapter;
@@ -509,6 +510,20 @@ impl PlanRewriter {
.map(|index| schema.column_name_by_index(index).to_string())
.collect::<Vec<String>>();
// Metric engine scans can project the internal `__tsid` column even though it is
// not part of the logical table schema. Equal `__tsid` values always belong to the
// same series, so carry it as an additional partition key when it is available.
if plan
.schema()
.index_of_column_by_name(None, DATA_SCHEMA_TSID_COLUMN_NAME)
.is_some()
&& !partition_cols
.iter()
.any(|col| col == DATA_SCHEMA_TSID_COLUMN_NAME)
{
partition_cols.push(DATA_SCHEMA_TSID_COLUMN_NAME.to_string());
}
let partition_rules = table.partition_rules();
let exist_phy_part_cols_not_in_logical_table = partition_rules
.map(|r| !r.extra_phy_cols_not_in_logical_table.is_empty())

View File

@@ -21,6 +21,7 @@ use arrow_schema::{Schema as ArrowSchema, SchemaRef as ArrowSchemaRef, SortOptio
use async_stream::stream;
use common_catalog::parse_catalog_and_schema_from_db_string;
use common_plugins::GREPTIME_EXEC_READ_COST;
use common_query::prelude::greptime_timestamp;
use common_query::request::QueryRequest;
use common_recordbatch::adapter::RecordBatchMetrics;
use common_telemetry::tracing_context::TracingContext;
@@ -37,12 +38,15 @@ use datafusion::physical_plan::{
use datafusion_common::{Column as ColumnExpr, DataFusionError, Result};
use datafusion_expr::{Expr, Extension, LogicalPlan, UserDefinedLogicalNodeCore};
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr::{Distribution, EquivalenceProperties, PhysicalSortExpr};
use datafusion_physical_expr::{
Distribution, EquivalenceProperties, LexOrdering, PhysicalSortExpr,
};
use futures_util::StreamExt;
use greptime_proto::v1::region::RegionRequestHeader;
use meter_core::data::ReadItem;
use meter_macros::read_meter;
use session::context::QueryContextRef;
use store_api::metric_engine_consts::DATA_SCHEMA_TSID_COLUMN_NAME;
use store_api::storage::RegionId;
use table::table_name::TableName;
use tokio::time::Instant;
@@ -160,12 +164,168 @@ impl std::fmt::Debug for MergeScanExec {
}
impl MergeScanExec {
fn default_partition_exprs(
session_state: &SessionState,
plan: &LogicalPlan,
partition_cols: &AliasMapping,
) -> Vec<Arc<dyn datafusion_physical_expr::PhysicalExpr>> {
partition_cols
.iter()
.filter_map(|col| Self::partition_expr_for_alias(session_state, plan, col.1.first()))
.collect()
}
fn partition_expr_for_alias(
session_state: &SessionState,
plan: &LogicalPlan,
column: Option<&ColumnExpr>,
) -> Option<Arc<dyn datafusion_physical_expr::PhysicalExpr>> {
let column = column?;
session_state
.create_physical_expr(
Expr::Column(ColumnExpr::new_unqualified(column.name().to_string())),
plan.schema(),
)
.ok()
}
fn prefer_tsid_partition_exprs(
session_state: &SessionState,
plan: &LogicalPlan,
partition_cols: &AliasMapping,
) -> Option<Vec<Arc<dyn datafusion_physical_expr::PhysicalExpr>>> {
if !Self::is_promql_tsid_ordered_plan(plan) {
return None;
}
let tsid_aliases = partition_cols.get(DATA_SCHEMA_TSID_COLUMN_NAME)?;
let tsid_expr = Self::partition_expr_for_alias(session_state, plan, tsid_aliases.first())?;
Some(vec![tsid_expr])
}
fn is_promql_tsid_ordered_plan(plan: &LogicalPlan) -> bool {
match plan {
LogicalPlan::Sort(sort) => {
if sort.expr.len() != 2 {
return false;
}
let [tsid_sort, time_sort] = sort.expr.as_slice() else {
return false;
};
Self::is_ascending_nulls_first_sort(tsid_sort, DATA_SCHEMA_TSID_COLUMN_NAME)
&& Self::is_ascending_nulls_first_sort(time_sort, greptime_timestamp())
}
LogicalPlan::Projection(projection) => {
Self::is_promql_tsid_ordered_plan(projection.input.as_ref())
}
LogicalPlan::Filter(filter) => Self::is_promql_tsid_ordered_plan(filter.input.as_ref()),
LogicalPlan::SubqueryAlias(alias) => {
Self::is_promql_tsid_ordered_plan(alias.input.as_ref())
}
LogicalPlan::Extension(extension)
if matches!(
extension.node.name(),
"PromInstantManipulate"
| "PromSeriesDivide"
| "PromNormalize"
| "PromScalarCalculate"
| "PromRangeManipulate"
) =>
{
extension
.node
.inputs()
.first()
.is_some_and(|input| Self::is_promql_tsid_ordered_plan(input))
}
_ => false,
}
}
fn is_ascending_nulls_first_sort(
sort_expr: &datafusion_expr::expr::Sort,
column: &str,
) -> bool {
sort_expr.asc
&& sort_expr.nulls_first
&& matches!(
sort_expr.expr.try_as_col(),
Some(col) if col.name == column
)
}
pub(crate) fn logical_sort_ordering(
session_state: &SessionState,
plan: &LogicalPlan,
) -> Result<Option<LexOrdering>> {
if Self::is_promql_tsid_ordered_plan(plan) {
let tsid_expr = session_state.create_physical_expr(
Expr::Column(ColumnExpr::new_unqualified(
DATA_SCHEMA_TSID_COLUMN_NAME.to_string(),
)),
plan.schema(),
)?;
let time_expr = session_state.create_physical_expr(
Expr::Column(ColumnExpr::new_unqualified(
greptime_timestamp().to_string(),
)),
plan.schema(),
)?;
return Ok(LexOrdering::new(vec![
PhysicalSortExpr::new(
tsid_expr,
SortOptions {
descending: false,
nulls_first: true,
},
),
PhysicalSortExpr::new(
time_expr,
SortOptions {
descending: false,
nulls_first: true,
},
),
]));
}
let LogicalPlan::Sort(sort) = plan else {
return Ok(None);
};
let lex_ordering = LexOrdering::new(
sort.expr
.iter()
.map(|sort_expr| {
let physical_expr = session_state
.create_physical_expr(sort_expr.expr.clone(), plan.schema())?;
Ok(PhysicalSortExpr::new(
physical_expr,
SortOptions {
descending: !sort_expr.asc,
nulls_first: sort_expr.nulls_first,
},
))
})
.collect::<Result<Vec<_>>>()?,
)
.ok_or_else(|| {
DataFusionError::Internal(format!(
"Sort plan must contain at least one expression: {plan}"
))
})?;
Ok(Some(lex_ordering))
}
#[allow(clippy::too_many_arguments)]
pub fn new(
session_state: &SessionState,
table: TableName,
regions: Vec<RegionId>,
plan: LogicalPlan,
remote_orderings: Vec<LexOrdering>,
arrow_schema: &ArrowSchema,
region_query_handler: RegionQueryHandlerRef,
query_ctx: QueryContextRef,
@@ -184,46 +344,23 @@ impl MergeScanExec {
// break the ordering on merging (of MergeScan).
//
// Otherwise, we need to use the default ordering.
let eq_properties = if let LogicalPlan::Sort(sort) = &plan
&& target_partition >= regions.len()
{
let lex_ordering = sort
.expr
.iter()
.map(|sort_expr| {
let physical_expr = session_state
.create_physical_expr(sort_expr.expr.clone(), plan.schema())?;
Ok(PhysicalSortExpr::new(
physical_expr,
SortOptions {
descending: !sort_expr.asc,
nulls_first: sort_expr.nulls_first,
},
))
})
.collect::<Result<Vec<_>>>()?;
EquivalenceProperties::new_with_orderings(arrow_schema.clone(), vec![lex_ordering])
let eq_properties = if target_partition >= regions.len() {
if !remote_orderings.is_empty() {
EquivalenceProperties::new_with_orderings(arrow_schema.clone(), remote_orderings)
} else if let Some(ordering) = Self::logical_sort_ordering(session_state, &plan)? {
EquivalenceProperties::new_with_orderings(arrow_schema.clone(), vec![ordering])
} else {
EquivalenceProperties::new(arrow_schema.clone())
}
} else {
EquivalenceProperties::new(arrow_schema.clone())
};
let partition_exprs = partition_cols
.iter()
.filter_map(|col| {
if let Some(first_alias) = col.1.first() {
session_state
.create_physical_expr(
Expr::Column(ColumnExpr::new_unqualified(
first_alias.name().to_string(),
)),
plan.schema(),
)
.ok()
} else {
None
}
})
.collect();
let partition_exprs =
Self::prefer_tsid_partition_exprs(session_state, &plan, &partition_cols)
.unwrap_or_else(|| {
Self::default_partition_exprs(session_state, &plan, &partition_cols)
});
let partitioning = Partitioning::Hash(partition_exprs, target_partition);
let properties = Arc::new(PlanProperties::new(
@@ -421,31 +558,49 @@ impl MergeScanExec {
return None;
};
if let Partitioning::Hash(curr_dist, _) = &self.properties.partitioning
&& curr_dist == &hash_exprs
{
// No need to change the distribution
return None;
}
let all_partition_col_aliases: HashSet<_> = self
.partition_cols
.values()
.flat_map(|aliases| aliases.iter().map(|c| c.name()))
.collect();
let mut overlaps = vec![];
for expr in &hash_exprs {
if let Some(col_expr) = expr.as_any().downcast_ref::<Column>()
&& all_partition_col_aliases.contains(col_expr.name())
{
overlaps.push(expr.clone());
}
let mut overlaps = hash_exprs
.iter()
.filter(|expr| {
expr.as_any()
.downcast_ref::<Column>()
.is_some_and(|col_expr| all_partition_col_aliases.contains(col_expr.name()))
})
.cloned()
.collect::<Vec<_>>();
// Metric-engine scans can satisfy any hash distribution that includes `__tsid`.
// Equal requested keys must also share the same `__tsid`, and equal `__tsid` values are
// guaranteed to stay co-located across MergeScan partitions. Advertise the full requested
// distribution so EnforceDistribution can skip redundant reshuffles.
if self
.arrow_schema
.column_with_name(DATA_SCHEMA_TSID_COLUMN_NAME)
.is_some()
&& hash_exprs.iter().any(|expr| {
expr.as_any()
.downcast_ref::<Column>()
.is_some_and(|col_expr| col_expr.name() == DATA_SCHEMA_TSID_COLUMN_NAME)
})
{
overlaps = hash_exprs.clone();
}
if overlaps.is_empty() {
return None;
}
if let Partitioning::Hash(curr_dist, _) = &self.properties.partitioning
&& curr_dist == &overlaps
{
// No need to change the distribution.
return None;
}
Some(Self {
table: self.table.clone(),
regions: self.regions.clone(),
@@ -494,6 +649,213 @@ impl MergeScanExec {
}
}
#[cfg(test)]
mod tests {
use std::collections::{BTreeMap, BTreeSet};
use arrow_schema::{DataType, Field, Schema, TimeUnit};
use async_trait::async_trait;
use common_query::request::QueryRequest;
use common_recordbatch::SendableRecordBatchStream;
use datafusion::execution::SessionStateBuilder;
use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
use datafusion_common::ToDFSchema;
use datafusion_expr::{EmptyRelation, LogicalPlan, LogicalPlanBuilder, col};
use datafusion_physical_expr::{EquivalenceProperties, PhysicalExpr};
use session::ReadPreference;
use session::context::QueryContext;
use super::*;
use crate::error::Result as QueryResult;
use crate::region_query::RegionQueryHandler;
struct NoopRegionQueryHandler;
#[async_trait]
impl RegionQueryHandler for NoopRegionQueryHandler {
async fn do_get(
&self,
_read_preference: ReadPreference,
_request: QueryRequest,
) -> QueryResult<SendableRecordBatchStream> {
unreachable!("merge scan distribution tests should not execute remote queries")
}
}
#[test]
fn try_with_new_distribution_satisfies_tsid_hash_requirements() {
let merge_scan = test_merge_scan_exec(
BTreeMap::from([
(
"host".to_string(),
BTreeSet::from([ColumnExpr::from_name("host")]),
),
(
DATA_SCHEMA_TSID_COLUMN_NAME.to_string(),
BTreeSet::from([ColumnExpr::from_name(DATA_SCHEMA_TSID_COLUMN_NAME)]),
),
]),
vec![
partition_column("host", 0),
partition_column(DATA_SCHEMA_TSID_COLUMN_NAME, 1),
],
);
let optimized = merge_scan
.try_with_new_distribution(Distribution::HashPartitioned(vec![
partition_column(DATA_SCHEMA_TSID_COLUMN_NAME, 1),
partition_column("greptime_timestamp", 2),
]))
.unwrap();
let Partitioning::Hash(ref exprs, partition_count) = optimized.properties.partitioning
else {
panic!("expected hash partitioning");
};
assert_eq!(partition_count, 32);
assert_eq!(
column_names(&exprs),
vec![DATA_SCHEMA_TSID_COLUMN_NAME, "greptime_timestamp"]
);
}
#[test]
fn try_with_new_distribution_keeps_regular_partition_overlap() {
let merge_scan = test_merge_scan_exec(
BTreeMap::from([(
"host".to_string(),
BTreeSet::from([ColumnExpr::from_name("host")]),
)]),
vec![partition_column("greptime_timestamp", 2)],
);
let optimized = merge_scan
.try_with_new_distribution(Distribution::HashPartitioned(vec![
partition_column("host", 0),
partition_column("greptime_timestamp", 2),
]))
.unwrap();
let Partitioning::Hash(ref exprs, partition_count) = optimized.properties.partitioning
else {
panic!("expected hash partitioning");
};
assert_eq!(partition_count, 32);
assert_eq!(column_names(&exprs), vec!["host"]);
}
#[test]
fn new_prefers_tsid_partitioning_for_promql_tsid_sort() {
let session_state = SessionStateBuilder::new().with_default_features().build();
let schema = Arc::new(Schema::new(vec![
Field::new("host", DataType::Utf8, true),
Field::new(DATA_SCHEMA_TSID_COLUMN_NAME, DataType::UInt64, false),
Field::new(
greptime_timestamp(),
DataType::Timestamp(TimeUnit::Millisecond, None),
false,
),
Field::new("greptime_value", DataType::Float64, true),
]));
let partition_cols = BTreeMap::from([
(
"host".to_string(),
BTreeSet::from([ColumnExpr::from_name("host")]),
),
(
DATA_SCHEMA_TSID_COLUMN_NAME.to_string(),
BTreeSet::from([ColumnExpr::from_name(DATA_SCHEMA_TSID_COLUMN_NAME)]),
),
]);
let plan = promql_tsid_sorted_plan(schema.clone());
let merge_scan = MergeScanExec::new(
&session_state,
TableName::new("greptime", "public", "test"),
vec![RegionId::new(1, 0), RegionId::new(1, 1)],
plan,
vec![],
schema.as_ref(),
Arc::new(NoopRegionQueryHandler),
QueryContext::arc(),
32,
partition_cols,
)
.unwrap();
let Partitioning::Hash(ref exprs, partition_count) = merge_scan.properties.partitioning
else {
panic!("expected hash partitioning");
};
assert_eq!(partition_count, 32);
assert_eq!(column_names(exprs), vec![DATA_SCHEMA_TSID_COLUMN_NAME]);
}
fn test_merge_scan_exec(
partition_cols: AliasMapping,
current_partition_exprs: Vec<Arc<dyn PhysicalExpr>>,
) -> MergeScanExec {
let schema = Arc::new(Schema::new(vec![
Field::new("host", DataType::Utf8, true),
Field::new(DATA_SCHEMA_TSID_COLUMN_NAME, DataType::UInt64, false),
Field::new(
"greptime_timestamp",
DataType::Timestamp(TimeUnit::Millisecond, None),
false,
),
Field::new("greptime_value", DataType::Float64, true),
]));
let plan = LogicalPlanBuilder::empty(false).build().unwrap();
MergeScanExec {
table: TableName::new("greptime", "public", "test"),
regions: vec![RegionId::new(1, 0), RegionId::new(1, 1)],
plan,
arrow_schema: schema.clone(),
region_query_handler: Arc::new(NoopRegionQueryHandler),
metric: ExecutionPlanMetricsSet::new(),
properties: Arc::new(PlanProperties::new(
EquivalenceProperties::new(schema),
Partitioning::Hash(current_partition_exprs, 32),
EmissionType::Incremental,
Boundedness::Bounded,
)),
sub_stage_metrics: Arc::default(),
partition_metrics: Arc::default(),
query_ctx: QueryContext::arc(),
target_partition: 32,
partition_cols,
}
}
fn partition_column(name: &str, index: usize) -> Arc<dyn PhysicalExpr> {
Arc::new(Column::new(name, index))
}
fn column_names(exprs: &[Arc<dyn PhysicalExpr>]) -> Vec<&str> {
exprs
.iter()
.map(|expr| expr.as_any().downcast_ref::<Column>().unwrap().name())
.collect()
}
fn promql_tsid_sorted_plan(schema: Arc<Schema>) -> LogicalPlan {
let input = LogicalPlan::EmptyRelation(EmptyRelation {
produce_one_row: false,
schema: schema.to_dfschema_ref().unwrap(),
});
LogicalPlanBuilder::from(input)
.sort(vec![
col(DATA_SCHEMA_TSID_COLUMN_NAME).sort(true, true),
col(greptime_timestamp()).sort(true, true),
])
.unwrap()
.build()
.unwrap()
}
}
/// Metrics for a region of a partition.
#[derive(Debug, Clone)]
struct RegionMetrics {

View File

@@ -20,17 +20,22 @@ use ahash::HashMap;
use async_trait::async_trait;
use catalog::CatalogManagerRef;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_query::prelude::greptime_timestamp;
use datafusion::arrow::compute::SortOptions;
use datafusion::common::Result;
use datafusion::datasource::DefaultTableSource;
use datafusion::execution::context::SessionState;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::physical_plan::{ExecutionPlan, ExecutionPlanProperties};
use datafusion::physical_planner::{ExtensionPlanner, PhysicalPlanner};
use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion, TreeNodeVisitor};
use datafusion_common::{DataFusionError, TableReference};
use datafusion_expr::{LogicalPlan, UserDefinedLogicalNode};
use datafusion_physical_expr::expressions::Column as PhysicalColumn;
use datafusion_physical_expr::{LexOrdering, PhysicalSortExpr};
use partition::manager::{PartitionRuleManagerRef, create_partitions_from_region_routes};
use session::context::QueryContext;
use snafu::{OptionExt, ResultExt};
use store_api::metric_engine_consts::DATA_SCHEMA_TSID_COLUMN_NAME;
use store_api::storage::RegionId;
pub use table::metadata::TableType;
use table::table::adapter::DfTableProviderAdapter;
@@ -162,6 +167,51 @@ impl ExtensionPlanner for DistExtensionPlanner {
return fallback(optimized_plan).await;
};
let remote_orderings = if let Ok(optimized_remote_plan) =
self.optimize_input_logical_plan(session_state, input_plan)
{
planner
.create_physical_plan(&optimized_remote_plan, session_state)
.await
.ok()
.map(|plan| {
let mut remote_orderings = Vec::new();
if let Some(preferred_ordering) =
tsid_time_ordering(plan.as_ref()).filter(|ordering| {
plan.equivalence_properties()
.ordering_satisfy(ordering.clone())
.unwrap_or(false)
})
{
remote_orderings.push(preferred_ordering);
}
if let Some(preferred_ordering) =
MergeScanExec::logical_sort_ordering(session_state, input_plan)
.ok()
.flatten()
.filter(|ordering| {
plan.equivalence_properties()
.ordering_satisfy(ordering.clone())
.unwrap_or(false)
})
{
remote_orderings.push(preferred_ordering);
}
for ordering in plan.equivalence_properties().oeq_class().iter().cloned() {
if !remote_orderings.contains(&ordering) {
remote_orderings.push(ordering);
}
}
remote_orderings
})
.unwrap_or_default()
} else {
Vec::new()
};
// TODO(ruihang): generate different execution plans for different variant merge operation
let schema = optimized_plan.schema().as_arrow();
let query_ctx = session_state
@@ -173,6 +223,7 @@ impl ExtensionPlanner for DistExtensionPlanner {
table_name,
regions,
input_plan.clone(),
remote_orderings,
schema,
self.region_query_handler.clone(),
query_ctx,
@@ -183,6 +234,31 @@ impl ExtensionPlanner for DistExtensionPlanner {
}
}
fn tsid_time_ordering(plan: &dyn ExecutionPlan) -> Option<LexOrdering> {
let schema = plan.schema();
let tsid_index = schema.index_of(DATA_SCHEMA_TSID_COLUMN_NAME).ok()?;
let time_index = schema.index_of(greptime_timestamp()).ok()?;
LexOrdering::new(vec![
PhysicalSortExpr::new(
Arc::new(PhysicalColumn::new(
DATA_SCHEMA_TSID_COLUMN_NAME,
tsid_index,
)),
SortOptions {
descending: false,
nulls_first: true,
},
),
PhysicalSortExpr::new(
Arc::new(PhysicalColumn::new(greptime_timestamp(), time_index)),
SortOptions {
descending: false,
nulls_first: true,
},
),
])
}
impl DistExtensionPlanner {
/// Extract fully resolved table name from logical plan
fn extract_full_table_name(plan: &LogicalPlan) -> Result<Option<TableName>> {

View File

@@ -17,8 +17,10 @@ use std::sync::Arc;
use datafusion::config::ConfigOptions;
use datafusion::physical_optimizer::PhysicalOptimizerRule;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::physical_plan::projection::ProjectionExec;
use datafusion_common::Result as DfResult;
use datafusion_physical_expr::Distribution;
use datafusion_physical_expr::utils::map_columns_before_projection;
use crate::dist_plan::MergeScanExec;
@@ -83,7 +85,9 @@ impl PassDistribution {
let mut new_children = Vec::with_capacity(children.len());
for (idx, child) in children.into_iter().enumerate() {
let child_req = match required.get(idx) {
Some(Distribution::UnspecifiedDistribution) => None,
Some(Distribution::UnspecifiedDistribution) => {
Self::propagate_unspecified_child_requirement(plan.as_ref(), idx, &current_req)
}
None => current_req.clone(),
Some(req) => Some(req.clone()),
};
@@ -103,4 +107,200 @@ impl PassDistribution {
plan.with_new_children(new_children)
}
}
fn propagate_unspecified_child_requirement(
plan: &dyn ExecutionPlan,
idx: usize,
current_req: &Option<Distribution>,
) -> Option<Distribution> {
if idx != 0 {
return None;
}
let Some(Distribution::HashPartitioned(required_exprs)) = current_req else {
return None;
};
let projection = plan.as_any().downcast_ref::<ProjectionExec>()?;
let proj_exprs = projection
.expr()
.iter()
.map(|expr| (Arc::clone(&expr.expr), expr.alias.clone()))
.collect::<Vec<_>>();
let mapped = map_columns_before_projection(required_exprs, &proj_exprs);
(mapped.len() == required_exprs.len()).then_some(Distribution::HashPartitioned(mapped))
}
}
#[cfg(test)]
mod tests {
use std::collections::{BTreeMap, BTreeSet};
use arrow_schema::{DataType, Field, Schema, SchemaRef, TimeUnit};
use async_trait::async_trait;
use common_query::request::QueryRequest;
use common_recordbatch::SendableRecordBatchStream;
use datafusion::common::NullEquality;
use datafusion::execution::SessionStateBuilder;
use datafusion::physical_optimizer::PhysicalOptimizerRule;
use datafusion::physical_plan::joins::{HashJoinExec, PartitionMode};
use datafusion::physical_plan::projection::{ProjectionExec, ProjectionExpr};
use datafusion::physical_plan::{ExecutionPlanProperties, Partitioning};
use datafusion_expr::{JoinType, LogicalPlanBuilder};
use datafusion_physical_expr::PhysicalExpr;
use datafusion_physical_expr::expressions::Column as PhysicalColumn;
use session::ReadPreference;
use session::context::QueryContext;
use store_api::metric_engine_consts::DATA_SCHEMA_TSID_COLUMN_NAME;
use store_api::storage::RegionId;
use table::table_name::TableName;
use super::*;
use crate::error::Result as QueryResult;
use crate::region_query::RegionQueryHandler;
struct NoopRegionQueryHandler;
#[async_trait]
impl RegionQueryHandler for NoopRegionQueryHandler {
async fn do_get(
&self,
_read_preference: ReadPreference,
_request: QueryRequest,
) -> QueryResult<SendableRecordBatchStream> {
unreachable!("pass distribution tests should not execute remote queries")
}
}
#[test]
fn passes_hash_requirement_through_projection_to_merge_scan() {
let schema = test_schema();
let left_merge_scan = Arc::new(test_merge_scan_exec(schema.clone()));
let right_merge_scan = Arc::new(test_merge_scan_exec(schema.clone()));
let left_projection = Arc::new(
ProjectionExec::try_new(
vec![
ProjectionExpr::new(partition_column("greptime_value", 3), "greptime_value"),
ProjectionExpr::new(
partition_column(DATA_SCHEMA_TSID_COLUMN_NAME, 1),
DATA_SCHEMA_TSID_COLUMN_NAME,
),
ProjectionExpr::new(
partition_column("greptime_timestamp", 2),
"greptime_timestamp",
),
],
left_merge_scan,
)
.unwrap(),
) as Arc<dyn datafusion::physical_plan::ExecutionPlan>;
let join = Arc::new(
HashJoinExec::try_new(
left_projection,
right_merge_scan,
vec![
(
partition_column(DATA_SCHEMA_TSID_COLUMN_NAME, 1),
partition_column(DATA_SCHEMA_TSID_COLUMN_NAME, 1),
),
(
partition_column("greptime_timestamp", 2),
partition_column("greptime_timestamp", 2),
),
],
None,
&JoinType::Inner,
None,
PartitionMode::Partitioned,
NullEquality::NullEqualsNull,
false,
)
.unwrap(),
) as Arc<dyn datafusion::physical_plan::ExecutionPlan>;
let optimized = PassDistribution
.optimize(join, &ConfigOptions::default())
.unwrap();
let hash_join = optimized.as_any().downcast_ref::<HashJoinExec>().unwrap();
let left_projection = hash_join
.left()
.as_any()
.downcast_ref::<ProjectionExec>()
.unwrap();
let left_partitioning = left_projection.input().output_partitioning();
let right_partitioning = hash_join.right().output_partitioning();
let Partitioning::Hash(left_exprs, left_count) = left_partitioning else {
panic!("expected left merge scan hash partitioning");
};
let Partitioning::Hash(right_exprs, right_count) = right_partitioning else {
panic!("expected right merge scan hash partitioning");
};
assert_eq!(*left_count, 32);
assert_eq!(*right_count, 32);
assert_eq!(
column_names(left_exprs),
vec![DATA_SCHEMA_TSID_COLUMN_NAME, "greptime_timestamp"]
);
assert_eq!(
column_names(right_exprs),
vec![DATA_SCHEMA_TSID_COLUMN_NAME, "greptime_timestamp"]
);
}
fn test_merge_scan_exec(schema: SchemaRef) -> MergeScanExec {
let session_state = SessionStateBuilder::new().with_default_features().build();
let partition_cols = BTreeMap::from([(
DATA_SCHEMA_TSID_COLUMN_NAME.to_string(),
BTreeSet::from([datafusion_common::Column::from_name(
DATA_SCHEMA_TSID_COLUMN_NAME,
)]),
)]);
let plan = LogicalPlanBuilder::empty(false).build().unwrap();
MergeScanExec::new(
&session_state,
TableName::new("greptime", "public", "test"),
vec![RegionId::new(1, 0), RegionId::new(1, 1)],
plan,
vec![],
schema.as_ref(),
Arc::new(NoopRegionQueryHandler),
QueryContext::arc(),
32,
partition_cols,
)
.unwrap()
}
fn test_schema() -> SchemaRef {
Arc::new(Schema::new(vec![
Field::new("host", DataType::Utf8, true),
Field::new(DATA_SCHEMA_TSID_COLUMN_NAME, DataType::UInt64, false),
Field::new(
"greptime_timestamp",
DataType::Timestamp(TimeUnit::Millisecond, None),
false,
),
Field::new("greptime_value", DataType::Float64, true),
]))
}
fn partition_column(name: &str, index: usize) -> Arc<dyn PhysicalExpr> {
Arc::new(PhysicalColumn::new(name, index))
}
fn column_names(exprs: &[Arc<dyn PhysicalExpr>]) -> Vec<&str> {
exprs
.iter()
.map(|expr| {
expr.as_any()
.downcast_ref::<PhysicalColumn>()
.unwrap()
.name()
})
.collect()
}
}

View File

@@ -100,31 +100,35 @@ TQL EXPLAIN (
)
);
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | HistogramFold: le=le, field=sum(prom_avg_over_time(ts_range,v)), quantile=0.5 |
| | MergeScan [is_placeholder=false, remote_input=[ |
| | Sort: test_tsid.le ASC NULLS LAST, test_tsid.tag4 ASC NULLS LAST, test_tsid.tag5 ASC NULLS LAST, test_tsid.ts ASC NULLS LAST |
| | Aggregate: groupBy=[[test_tsid.le, test_tsid.tag4, test_tsid.tag5, test_tsid.ts]], aggr=[[sum(prom_avg_over_time(ts_range,v))]] |
| | Filter: prom_avg_over_time(ts_range,v) IS NOT NULL |
| | Projection: test_tsid.ts, prom_avg_over_time(ts_range, v) AS prom_avg_over_time(ts_range,v), test_tsid.le, test_tsid.tag1, test_tsid.tag2, test_tsid.tag4, test_tsid.tag5, test_tsid.tag6, test_tsid.tag7, test_tsid.tag8 |
| | PromRangeManipulate: req range=[1769139000000..1769139900000], interval=[60000], eval range=[1800000], time index=[ts], values=["v"] |
| | PromSeriesNormalize: offset=[0], time index=[ts], filter NaN: [true] |
| | PromSeriesDivide: tags=["__tsid"] |
| | Sort: test_tsid.__tsid ASC NULLS FIRST, test_tsid.ts ASC NULLS FIRST |
| | Filter: test_tsid.ts >= TimestampMillisecond(1769137200001, None) AND test_tsid.ts <= TimestampMillisecond(1769139900000, None) |
| | Projection: test_tsid.v, test_tsid.le, test_tsid.tag1, test_tsid.tag2, test_tsid.tag4, test_tsid.tag5, test_tsid.tag6, test_tsid.tag7, test_tsid.tag8, test_tsid.__tsid, test_tsid.ts |
| | SubqueryAlias: test_tsid |
| | Filter: phy.__table_id=UInt32(REDACTED) |
| | TableScan: phy projection=[ts, v, tag1, tag2, le, tag4, tag5, tag6, tag7, tag8, __table_id, __tsid] |
| | ]] |
| physical_plan | HistogramFoldExec: le=@0, field=@4, quantile=0.5 |
| | SortExec: expr=[tag4@1 ASC NULLS LAST, tag5@2 ASC NULLS LAST, ts@3 ASC NULLS LAST, CAST(le@0 AS Float64) ASC NULLS LAST], preserve_partitioning=[true] |
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | HistogramFold: le=le, field=sum(prom_avg_over_time(ts_range,v)), quantile=0.5 |
| | Sort: test_tsid.le ASC NULLS LAST, test_tsid.tag4 ASC NULLS LAST, test_tsid.tag5 ASC NULLS LAST, test_tsid.ts ASC NULLS LAST |
| | Aggregate: groupBy=[[test_tsid.le, test_tsid.tag4, test_tsid.tag5, test_tsid.ts]], aggr=[[__sum_merge(__sum_state(prom_avg_over_time(ts_range,v))) AS sum(prom_avg_over_time(ts_range,v))]] |
| | MergeScan [is_placeholder=false, remote_input=[ |
| | Aggregate: groupBy=[[test_tsid.le, test_tsid.tag4, test_tsid.tag5, test_tsid.ts]], aggr=[[__sum_state(prom_avg_over_time(ts_range,v))]] |
| | Filter: prom_avg_over_time(ts_range,v) IS NOT NULL |
| | Projection: test_tsid.ts, prom_avg_over_time(ts_range, v) AS prom_avg_over_time(ts_range,v), test_tsid.le, test_tsid.tag1, test_tsid.tag2, test_tsid.tag4, test_tsid.tag5, test_tsid.tag6, test_tsid.tag7, test_tsid.tag8 |
| | PromRangeManipulate: req range=[1769139000000..1769139900000], interval=[60000], eval range=[1800000], time index=[ts], values=["v"] |
| | PromSeriesNormalize: offset=[0], time index=[ts], filter NaN: [true] |
| | PromSeriesDivide: tags=["__tsid"] |
| | Sort: test_tsid.__tsid ASC NULLS FIRST, test_tsid.ts ASC NULLS FIRST |
| | Filter: test_tsid.ts >= TimestampMillisecond(1769137200001, None) AND test_tsid.ts <= TimestampMillisecond(1769139900000, None) |
| | Projection: test_tsid.v, test_tsid.le, test_tsid.tag1, test_tsid.tag2, test_tsid.tag4, test_tsid.tag5, test_tsid.tag6, test_tsid.tag7, test_tsid.tag8, test_tsid.__tsid, test_tsid.ts |
| | SubqueryAlias: test_tsid |
| | Filter: phy.__table_id=UInt32(REDACTED) |
| | TableScan: phy projection=[ts, v, tag1, tag2, le, tag4, tag5, tag6, tag7, tag8, __table_id, __tsid] |
| | ]] |
| physical_plan | HistogramFoldExec: le=@0, field=@4, quantile=0.5 |
| | SortExec: expr=[tag4@1 ASC NULLS LAST, tag5@2 ASC NULLS LAST, ts@3 ASC NULLS LAST, CAST(le@0 AS Float64) ASC NULLS LAST], preserve_partitioning=[true] |
| | RepartitionExec: REDACTED
| | MergeScanExec: REDACTED
| | |
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| | AggregateExec: mode=FinalPartitioned, gby=[le@0 as le, tag4@1 as tag4, tag5@2 as tag5, ts@3 as ts], aggr=[sum(prom_avg_over_time(ts_range,v))] |
| | RepartitionExec: REDACTED
| | AggregateExec: mode=Partial, gby=[le@0 as le, tag4@1 as tag4, tag5@2 as tag5, ts@3 as ts], aggr=[sum(prom_avg_over_time(ts_range,v))] |
| | MergeScanExec: REDACTED
| | |
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
CREATE FLOW IF NOT EXISTS test_tsid
SINK TO 'test_tsid_output'

View File

@@ -42,14 +42,16 @@ TQL ANALYZE (0, 10, '5s') sum(tsid_metric);
+-+-+-+
| stage | node | plan_|
+-+-+-+
| 0_| 0_|_CooperativeExec REDACTED
|_|_|_MergeScanExec: REDACTED
|_|_|_|
| 1_| 0_|_SortPreservingMergeExec: [ts@0 ASC NULLS LAST] REDACTED
| 0_| 0_|_SortPreservingMergeExec: [ts@0 ASC NULLS LAST] REDACTED
|_|_|_SortExec: expr=[ts@0 ASC NULLS LAST], preserve_partitioning=[true] REDACTED
|_|_|_AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[sum(tsid_metric.val)] REDACTED
|_|_|_RepartitionExec: partitioning=REDACTED
|_|_|_AggregateExec: mode=Partial, gby=[ts@1 as ts], aggr=[sum(tsid_metric.val)] REDACTED
|_|_|_AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[sum(tsid_metric.val)] REDACTED
|_|_|_MergeScanExec: REDACTED
|_|_|_|
| 1_| 0_|_AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[__sum_state(tsid_metric.val)] REDACTED
|_|_|_RepartitionExec: partitioning=REDACTED
|_|_|_AggregateExec: mode=Partial, gby=[ts@1 as ts], aggr=[__sum_state(tsid_metric.val)] REDACTED
|_|_|_ProjectionExec: expr=[val@0 as val, ts@2 as ts] REDACTED
|_|_|_PromInstantManipulateExec: range=[0..10000], lookback=[300000], interval=[5000], time index=[ts] REDACTED
|_|_|_PromSeriesDivideExec: tags=["__tsid"] REDACTED
@@ -71,14 +73,16 @@ TQL ANALYZE (0, 10, '5s') sum by (job, instance) (tsid_metric);
+-+-+-+
| stage | node | plan_|
+-+-+-+
| 0_| 0_|_CooperativeExec REDACTED
|_|_|_MergeScanExec: REDACTED
|_|_|_|
| 1_| 0_|_SortPreservingMergeExec: [job@0 ASC NULLS LAST, instance@1 ASC NULLS LAST, ts@2 ASC NULLS LAST] REDACTED
| 0_| 0_|_SortPreservingMergeExec: [job@0 ASC NULLS LAST, instance@1 ASC NULLS LAST, ts@2 ASC NULLS LAST] REDACTED
|_|_|_SortExec: expr=[job@0 ASC NULLS LAST, instance@1 ASC NULLS LAST, ts@2 ASC NULLS LAST], preserve_partitioning=[true] REDACTED
|_|_|_AggregateExec: mode=FinalPartitioned, gby=[job@0 as job, instance@1 as instance, ts@2 as ts], aggr=[sum(tsid_metric.val), __tsid] REDACTED
|_|_|_RepartitionExec: partitioning=REDACTED
|_|_|_AggregateExec: mode=Partial, gby=[job@2 as job, instance@1 as instance, ts@4 as ts], aggr=[sum(tsid_metric.val), __tsid] REDACTED
|_|_|_AggregateExec: mode=Partial, gby=[job@0 as job, instance@1 as instance, ts@2 as ts], aggr=[sum(tsid_metric.val), __tsid] REDACTED
|_|_|_MergeScanExec: REDACTED
|_|_|_|
| 1_| 0_|_AggregateExec: mode=FinalPartitioned, gby=[job@0 as job, instance@1 as instance, ts@2 as ts], aggr=[__sum_state(tsid_metric.val), __first_value_state(tsid_metric.__tsid)] REDACTED
|_|_|_RepartitionExec: partitioning=REDACTED
|_|_|_AggregateExec: mode=Partial, gby=[job@2 as job, instance@1 as instance, ts@4 as ts], aggr=[__sum_state(tsid_metric.val), __first_value_state(tsid_metric.__tsid)] REDACTED
|_|_|_PromInstantManipulateExec: range=[0..10000], lookback=[300000], interval=[5000], time index=[ts] REDACTED
|_|_|_PromSeriesDivideExec: tags=["__tsid"] REDACTED
|_|_|_ProjectionExec: expr=[val@1 as val, instance@3 as instance, job@4 as job, __tsid@2 as __tsid, ts@0 as ts] REDACTED
@@ -103,12 +107,6 @@ TQL ANALYZE (0, 10, '5s') sum(irate(tsid_metric[1h])) / scalar(count(count(tsid
|_|_|_REDACTED
|_|_|_ScalarCalculateExec: tags=[] REDACTED
|_|_|_CoalescePartitionsExec REDACTED
|_|_|_MergeScanExec: REDACTED
|_|_|_CooperativeExec REDACTED
|_|_|_MergeScanExec: REDACTED
|_|_|_|
| 1_| 0_|_SortPreservingMergeExec: [ts@0 ASC NULLS LAST] REDACTED
|_|_|_SortExec: expr=[ts@0 ASC NULLS LAST], preserve_partitioning=[true] REDACTED
|_|_|_AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[count(count(tsid_metric.val))] REDACTED
|_|_|_RepartitionExec: partitioning=REDACTED
|_|_|_AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[count(count(tsid_metric.val))] REDACTED
@@ -116,17 +114,25 @@ TQL ANALYZE (0, 10, '5s') sum(irate(tsid_metric[1h])) / scalar(count(count(tsid
|_|_|_AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts, job@1 as job], aggr=[] REDACTED
|_|_|_RepartitionExec: partitioning=REDACTED
|_|_|_AggregateExec: mode=Partial, gby=[ts@0 as ts, job@1 as job], aggr=[] REDACTED
|_|_|_MergeScanExec: REDACTED
|_|_|_SortExec: expr=[ts@0 ASC NULLS LAST], preserve_partitioning=[true] REDACTED
|_|_|_AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[sum(prom_irate(ts_range,val))] REDACTED
|_|_|_RepartitionExec: partitioning=REDACTED
|_|_|_AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[sum(prom_irate(ts_range,val))] REDACTED
|_|_|_MergeScanExec: REDACTED
|_|_|_|
| 1_| 0_|_AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts, job@1 as job], aggr=[] REDACTED
|_|_|_RepartitionExec: partitioning=REDACTED
|_|_|_AggregateExec: mode=Partial, gby=[ts@0 as ts, job@1 as job], aggr=[] REDACTED
|_|_|_ProjectionExec: expr=[ts@3 as ts, job@1 as job] REDACTED
|_|_|_PromInstantManipulateExec: range=[0..10000], lookback=[300000], interval=[5000], time index=[ts] REDACTED
|_|_|_PromSeriesDivideExec: tags=["__tsid"] REDACTED
|_|_|_ProjectionExec: expr=[val@1 as val, job@3 as job, __tsid@2 as __tsid, ts@0 as ts] REDACTED
|_|_|_SeriesScan: region=REDACTED, "partition_count":{"count":1, "mem_ranges":1, "files":0, "file_ranges":0}, "distribution":"PerSeries" REDACTED
|_|_|_|
| 1_| 0_|_SortPreservingMergeExec: [ts@0 ASC NULLS LAST] REDACTED
|_|_|_SortExec: expr=[ts@0 ASC NULLS LAST], preserve_partitioning=[true] REDACTED
|_|_|_AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[sum(prom_irate(ts_range,val))] REDACTED
| 1_| 0_|_AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[__sum_state(prom_irate(ts_range,val))] REDACTED
|_|_|_RepartitionExec: partitioning=REDACTED
|_|_|_AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[sum(prom_irate(ts_range,val))] REDACTED
|_|_|_AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[__sum_state(prom_irate(ts_range,val))] REDACTED
|_|_|_FilterExec: prom_irate(ts_range,val)@1 IS NOT NULL REDACTED
|_|_|_ProjectionExec: expr=[ts@2 as ts, prom_irate(ts_range@3, val@0) as prom_irate(ts_range,val)] REDACTED
|_|_|_PromRangeManipulateExec: req range=[0..10000], interval=[5000], eval range=[3600000], time index=[ts] REDACTED
@@ -154,12 +160,6 @@ TQL ANALYZE (0, 10, '5s') sum(irate(tsid_metric[1h])) / scalar(count(sum(tsid_m
|_|_|_REDACTED
|_|_|_ScalarCalculateExec: tags=[] REDACTED
|_|_|_CoalescePartitionsExec REDACTED
|_|_|_MergeScanExec: REDACTED
|_|_|_CooperativeExec REDACTED
|_|_|_MergeScanExec: REDACTED
|_|_|_|
| 1_| 0_|_SortPreservingMergeExec: [ts@0 ASC NULLS LAST] REDACTED
|_|_|_SortExec: expr=[ts@0 ASC NULLS LAST], preserve_partitioning=[true] REDACTED
|_|_|_AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[count(sum(tsid_metric.val))] REDACTED
|_|_|_RepartitionExec: partitioning=REDACTED
|_|_|_AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[count(sum(tsid_metric.val))] REDACTED
@@ -167,6 +167,16 @@ TQL ANALYZE (0, 10, '5s') sum(irate(tsid_metric[1h])) / scalar(count(sum(tsid_m
|_|_|_AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts, job@1 as job], aggr=[] REDACTED
|_|_|_RepartitionExec: partitioning=REDACTED
|_|_|_AggregateExec: mode=Partial, gby=[ts@0 as ts, job@1 as job], aggr=[] REDACTED
|_|_|_MergeScanExec: REDACTED
|_|_|_SortExec: expr=[ts@0 ASC NULLS LAST], preserve_partitioning=[true] REDACTED
|_|_|_AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[sum(prom_irate(ts_range,val))] REDACTED
|_|_|_RepartitionExec: partitioning=REDACTED
|_|_|_AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[sum(prom_irate(ts_range,val))] REDACTED
|_|_|_MergeScanExec: REDACTED
|_|_|_|
| 1_| 0_|_AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts, job@1 as job], aggr=[] REDACTED
|_|_|_RepartitionExec: partitioning=REDACTED
|_|_|_AggregateExec: mode=Partial, gby=[ts@0 as ts, job@1 as job], aggr=[] REDACTED
|_|_|_ProjectionExec: expr=[ts@1 as ts, job@0 as job] REDACTED
|_|_|_FilterExec: val@0 IS NOT NULL, projection=[job@1, ts@2] REDACTED
|_|_|_ProjectionExec: expr=[val@0 as val, job@1 as job, ts@3 as ts] REDACTED
@@ -175,11 +185,9 @@ TQL ANALYZE (0, 10, '5s') sum(irate(tsid_metric[1h])) / scalar(count(sum(tsid_m
|_|_|_ProjectionExec: expr=[val@1 as val, job@3 as job, __tsid@2 as __tsid, ts@0 as ts] REDACTED
|_|_|_SeriesScan: region=REDACTED, "partition_count":{"count":1, "mem_ranges":1, "files":0, "file_ranges":0}, "distribution":"PerSeries" REDACTED
|_|_|_|
| 1_| 0_|_SortPreservingMergeExec: [ts@0 ASC NULLS LAST] REDACTED
|_|_|_SortExec: expr=[ts@0 ASC NULLS LAST], preserve_partitioning=[true] REDACTED
|_|_|_AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[sum(prom_irate(ts_range,val))] REDACTED
| 1_| 0_|_AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[__sum_state(prom_irate(ts_range,val))] REDACTED
|_|_|_RepartitionExec: partitioning=REDACTED
|_|_|_AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[sum(prom_irate(ts_range,val))] REDACTED
|_|_|_AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[__sum_state(prom_irate(ts_range,val))] REDACTED
|_|_|_FilterExec: prom_irate(ts_range,val)@1 IS NOT NULL REDACTED
|_|_|_ProjectionExec: expr=[ts@2 as ts, prom_irate(ts_range@3, val@0) as prom_irate(ts_range,val)] REDACTED
|_|_|_PromRangeManipulateExec: req range=[0..10000], interval=[5000], eval range=[3600000], time index=[ts] REDACTED