From fe2a9f91d40918bf8415f7e3ecfd22c0d58cca24 Mon Sep 17 00:00:00 2001
From: Ruihang Xia
Date: Fri, 20 Mar 2026 07:06:11 +0800
Subject: [PATCH] use tsid in manipulate plan, resolve_tag_columns walks whole plan

Signed-off-by: Ruihang Xia
---
 .../src/extension_plan/instant_manipulate.rs  | 93 +++++++++++++++++--
 src/query/src/promql/planner.rs               | 26 +++++-
 2 files changed, 111 insertions(+), 8 deletions(-)

diff --git a/src/promql/src/extension_plan/instant_manipulate.rs b/src/promql/src/extension_plan/instant_manipulate.rs
index 14cf629c39..1e01166689 100644
--- a/src/promql/src/extension_plan/instant_manipulate.rs
+++ b/src/promql/src/extension_plan/instant_manipulate.rs
@@ -225,14 +225,19 @@ impl InstantManipulate {
             return tag_columns.to_vec();
         }
 
-        let LogicalPlan::Extension(Extension { node }) = input else {
-            return Vec::new();
-        };
+        Self::find_series_divide_tags(input).unwrap_or_default()
+    }
 
-        node.as_any()
-            .downcast_ref::<SeriesDivide>()
-            .map(|series_divide| series_divide.tags().to_vec())
-            .unwrap_or_default()
+    fn find_series_divide_tags(plan: &LogicalPlan) -> Option<Vec<String>> {
+        if let LogicalPlan::Extension(Extension { node }) = plan
+            && let Some(series_divide) = node.as_any().downcast_ref::<SeriesDivide>()
+        {
+            return Some(series_divide.tags().to_vec());
+        }
+
+        plan.inputs()
+            .into_iter()
+            .find_map(Self::find_series_divide_tags)
     }
 
     pub fn to_execution_plan(&self, exec_input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
@@ -780,6 +785,80 @@ mod test {
         assert_eq!(plan.tag_columns, vec!["__tsid".to_string()]);
     }
 
+    #[test]
+    fn rebuild_should_recover_tag_columns_from_series_normalize_input() {
+        let df_schema = prepare_test_data().schema().to_dfschema_ref().unwrap();
+        let input = LogicalPlan::EmptyRelation(EmptyRelation {
+            produce_one_row: false,
+            schema: df_schema,
+        });
+        let series_divide = LogicalPlan::Extension(Extension {
+            node: Arc::new(SeriesDivide::new(
+                vec!["__tsid".to_string()],
+                TIME_INDEX_COLUMN.to_string(),
+                input,
+            )),
+        });
+        let series_normalize = LogicalPlan::Extension(Extension {
+            node: Arc::new(crate::extension_plan::SeriesNormalize::new(
+                0,
+                TIME_INDEX_COLUMN,
+                false,
+                vec!["__tsid".to_string()],
+                series_divide,
+            )),
+        });
+        let bytes = InstantManipulate::new(
+            0,
+            0,
+            0,
+            0,
+            TIME_INDEX_COLUMN.to_string(),
+            vec!["__tsid".to_string()],
+            Some("value".to_string()),
+            series_normalize.clone(),
+        )
+        .serialize();
+        let plan = InstantManipulate::deserialize(&bytes)
+            .unwrap()
+            .with_exprs_and_inputs(vec![], vec![series_normalize])
+            .unwrap();
+
+        assert_eq!(plan.tag_columns, vec!["__tsid".to_string()]);
+    }
+
+    #[test]
+    fn to_execution_plan_enables_tsid_fast_path() {
+        let schema = Arc::new(Schema::new(vec![
+            Field::new(
+                TIME_INDEX_COLUMN,
+                DataType::Timestamp(datafusion::arrow::datatypes::TimeUnit::Millisecond, None),
+                false,
+            ),
+            Field::new("value", DataType::Float64, true),
+        ]));
+        let exec_input: Arc<dyn ExecutionPlan> = Arc::new(DataSourceExec::new(Arc::new(
+            MemorySourceConfig::try_new(&[], schema, None).unwrap(),
+        )));
+
+        let exec = InstantManipulate::new(
+            0,
+            0,
+            0,
+            0,
+            TIME_INDEX_COLUMN.to_string(),
+            vec!["__tsid".to_string()],
+            Some("value".to_string()),
+            LogicalPlan::EmptyRelation(EmptyRelation {
+                produce_one_row: false,
+                schema: Arc::new(datafusion::common::DFSchema::empty()),
+            }),
+        )
+        .to_execution_plan(exec_input);
+
+        assert!(format!("{exec:?}").contains("reuse_all_non_sample_columns: true"));
+    }
+
     #[tokio::test]
     async fn tsid_fast_path_reuses_non_sample_columns_when_output_grows() {
         let schema = Arc::new(Schema::new(vec![
diff --git a/src/query/src/promql/planner.rs b/src/query/src/promql/planner.rs
index 0eff87952c..256b9d5abf 100644
--- a/src/query/src/promql/planner.rs
+++ b/src/query/src/promql/planner.rs
@@ -915,7 +915,11 @@ impl PromPlanner {
                     .time_index_column
                     .clone()
                     .expect("time index should be set in `setup_context`"),
-                self.ctx.tag_columns.clone(),
+                if self.ctx.use_tsid {
+                    vec![DATA_SCHEMA_TSID_COLUMN_NAME.to_string()]
+                } else {
+                    self.ctx.tag_columns.clone()
+                },
                 self.ctx.field_columns.first().cloned(),
                 normalize,
             );
@@ -4021,6 +4025,10 @@ mod test {
     use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
     use common_query::prelude::greptime_timestamp;
     use common_query::test_util::DummyDecoder;
+    use datafusion::arrow::datatypes::Schema as ArrowSchema;
+    use datafusion::datasource::memory::MemorySourceConfig;
+    use datafusion::datasource::source::DataSourceExec;
+    use datafusion::logical_expr::Extension;
     use datatypes::prelude::ConcreteDataType;
     use datatypes::schema::{ColumnSchema, Schema};
     use promql_parser::label::Labels;
@@ -4033,6 +4041,16 @@ mod test {
     use crate::options::QueryOptions;
     use crate::parser::QueryLanguageParser;
 
+    fn find_instant_manipulate(plan: &LogicalPlan) -> Option<&InstantManipulate> {
+        if let LogicalPlan::Extension(Extension { node }) = plan
+            && let Some(instant_manipulate) = node.as_any().downcast_ref::<InstantManipulate>()
+        {
+            return Some(instant_manipulate);
+        }
+
+        plan.inputs().into_iter().find_map(find_instant_manipulate)
+    }
+
     fn build_query_engine_state() -> QueryEngineState {
         QueryEngineState::new(
             new_memory_catalog_manager().unwrap(),
@@ -4657,6 +4675,12 @@ mod test {
                 .iter()
                 .any(|field| field.name() == DATA_SCHEMA_TSID_COLUMN_NAME)
         );
+
+        let manipulate = find_instant_manipulate(&plan).unwrap();
+        let exec = manipulate.to_execution_plan(Arc::new(DataSourceExec::new(Arc::new(
+            MemorySourceConfig::try_new(&[], Arc::new(ArrowSchema::empty()), None).unwrap(),
+        ))));
+        assert!(format!("{exec:?}").contains("reuse_all_non_sample_columns: true"));
     }
 
     #[tokio::test]