From 966ade75654389bf7cb50ea21949dbc9dd343cfa Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Wed, 21 Jan 2026 14:25:36 +0800 Subject: [PATCH] feat: use tsid on promql planner (#7590) * expose tsid on logical table's schema and use it on planner Signed-off-by: Ruihang Xia * detect table type on planner Signed-off-by: Ruihang Xia * simplification Signed-off-by: Ruihang Xia * filter out internal columns Signed-off-by: Ruihang Xia * reset tsid flag Signed-off-by: Ruihang Xia --------- Signed-off-by: Ruihang Xia --- src/query/src/promql/planner.rs | 917 +++++++++++++++++- .../tql-explain-analyze/tsid_column.result | 107 ++ .../tql-explain-analyze/tsid_column.sql | 47 + 3 files changed, 1031 insertions(+), 40 deletions(-) create mode 100644 tests/cases/standalone/tql-explain-analyze/tsid_column.result create mode 100644 tests/cases/standalone/tql-explain-analyze/tsid_column.sql diff --git a/src/query/src/promql/planner.rs b/src/query/src/promql/planner.rs index 61f5aeff43..cc0febb89d 100644 --- a/src/query/src/promql/planner.rs +++ b/src/query/src/promql/planner.rs @@ -47,7 +47,7 @@ use datafusion::sql::TableReference; use datafusion_common::{DFSchema, NullEquality}; use datafusion_expr::expr::WindowFunctionParams; use datafusion_expr::utils::conjunction; -use datafusion_expr::{ExprSchemable, Literal, SortExpr, col, lit}; +use datafusion_expr::{ExprSchemable, Literal, SortExpr, TableSource, col, lit}; use datatypes::arrow::datatypes::{DataType as ArrowDataType, TimeUnit as ArrowTimeUnit}; use datatypes::data_type::ConcreteDataType; use itertools::Itertools; @@ -73,7 +73,8 @@ use promql_parser::parser::{ use regex::{self, Regex}; use snafu::{OptionExt, ResultExt, ensure}; use store_api::metric_engine_consts::{ - DATA_SCHEMA_TABLE_ID_COLUMN_NAME, DATA_SCHEMA_TSID_COLUMN_NAME, + DATA_SCHEMA_TABLE_ID_COLUMN_NAME, DATA_SCHEMA_TSID_COLUMN_NAME, LOGICAL_TABLE_METADATA_KEY, + METRIC_ENGINE_NAME, is_metric_engine_internal_column, }; use table::table::adapter::DfTableProviderAdapter; @@ -138,6 +139,12 @@ struct PromPlannerContext { time_index_column: Option, field_columns: Vec, tag_columns: Vec, + /// Use metric engine internal series identifier column (`__tsid`) as series key. + /// + /// This is enabled only when the underlying scan can provide `__tsid` (`UInt64`). The planner + /// uses it internally (e.g. as the series key for [`SeriesDivide`]) and strips it from the + /// final output. + use_tsid: bool, /// The matcher for field columns `__field__`. field_column_matcher: Option>, /// The matcher for selectors (normal matchers). @@ -164,6 +171,7 @@ impl PromPlannerContext { self.time_index_column = None; self.field_columns = vec![]; self.tag_columns = vec![]; + self.use_tsid = false; self.field_column_matcher = None; self.selector_matcher.clear(); self.schema_name = None; @@ -174,6 +182,7 @@ impl PromPlannerContext { fn reset_table_name_and_schema(&mut self) { self.table_name = Some(String::new()); self.schema_name = None; + self.use_tsid = false; } /// Check if `le` is present in tag columns @@ -204,11 +213,14 @@ impl PromPlanner { .await?; // Apply alias if provided - if let Some(alias_name) = alias { - planner.apply_alias_projection(plan, alias_name) + let plan = if let Some(alias_name) = alias { + planner.apply_alias_projection(plan, alias_name)? } else { - Ok(plan) - } + plan + }; + + // Never leak internal series identifier to output. + planner.strip_tsid_column(plan) } #[cfg(test)] @@ -342,19 +354,40 @@ impl PromPlanner { } = aggr_expr; let input = self.prom_expr_to_plan(expr, query_engine_state).await?; + let input_has_tsid = input.schema().fields().iter().any(|field| { + field.name() == DATA_SCHEMA_TSID_COLUMN_NAME + && field.data_type() == &ArrowDataType::UInt64 + }); match (*op).id() { token::T_TOPK | token::T_BOTTOMK => { self.prom_topk_bottomk_to_plan(aggr_expr, input).await } _ => { + let input_tag_columns = self.ctx.tag_columns.clone(); // calculate columns to group by // Need to append time index column into group by columns let mut group_exprs = self.agg_modifier_to_col(input.schema(), modifier, true)?; // convert op and value columns to aggregate exprs - let (aggr_exprs, prev_field_exprs) = + let (mut aggr_exprs, prev_field_exprs) = self.create_aggregate_exprs(*op, param, &input)?; + let keep_tsid = op.id() != token::T_COUNT_VALUES + && input_has_tsid + && input_tag_columns.iter().collect::>() + == self.ctx.tag_columns.iter().collect::>(); + + if keep_tsid { + aggr_exprs.push( + first_value( + DfExpr::Column(Column::from_name(DATA_SCHEMA_TSID_COLUMN_NAME)), + vec![], + ) + .alias(DATA_SCHEMA_TSID_COLUMN_NAME), + ); + } + self.ctx.use_tsid = keep_tsid; + // create plan let builder = LogicalPlanBuilder::from(input); let builder = if op.id() == token::T_COUNT_VALUES { @@ -404,6 +437,12 @@ impl PromPlanner { .. } = aggr_expr; + let input_has_tsid = input.schema().fields().iter().any(|field| { + field.name() == DATA_SCHEMA_TSID_COLUMN_NAME + && field.data_type() == &ArrowDataType::UInt64 + }); + self.ctx.use_tsid = input_has_tsid; + let group_exprs = self.agg_modifier_to_col(input.schema(), modifier, false)?; let val = Self::get_param_as_literal_expr(param, Some(*op), Some(ArrowDataType::Float64))?; @@ -452,6 +491,14 @@ impl PromPlanner { .create_field_column_exprs()? .into_iter() .chain(self.create_tag_column_exprs()?) + .chain( + self.ctx + .use_tsid + .then_some(DfExpr::Column(Column::from_name( + DATA_SCHEMA_TSID_COLUMN_NAME, + ))) + .into_iter(), + ) .chain(Some(self.create_time_index_column_expr()?)); LogicalPlanBuilder::from(input) @@ -1147,6 +1194,14 @@ impl PromPlanner { .into_iter() .map(|col| DfExpr::Column(Column::new_unqualified(col))) .chain(self.create_tag_column_exprs()?) + .chain( + self.ctx + .use_tsid + .then_some(DfExpr::Column(Column::new_unqualified( + DATA_SCHEMA_TSID_COLUMN_NAME, + ))) + .into_iter(), + ) .chain(Some(self.create_time_index_column_expr()?)) .collect::>(); @@ -1159,8 +1214,23 @@ impl PromPlanner { } // make sort plan + let series_key_columns = if self.ctx.use_tsid { + vec![DATA_SCHEMA_TSID_COLUMN_NAME.to_string()] + } else { + self.ctx.tag_columns.clone() + }; + + let sort_exprs = if self.ctx.use_tsid { + vec![ + DfExpr::Column(Column::from_name(DATA_SCHEMA_TSID_COLUMN_NAME)).sort(true, true), + self.create_time_index_column_expr()?.sort(true, true), + ] + } else { + self.create_tag_and_time_index_column_sort_exprs()? + }; + let sort_plan = LogicalPlanBuilder::from(table_scan) - .sort(self.create_tag_and_time_index_column_sort_exprs()?) + .sort(sort_exprs) .context(DataFusionPlanningSnafu)? .build() .context(DataFusionPlanningSnafu)?; @@ -1175,7 +1245,7 @@ impl PromPlanner { })?; let divide_plan = LogicalPlan::Extension(Extension { node: Arc::new(SeriesDivide::new( - self.ctx.tag_columns.clone(), + series_key_columns.clone(), time_index_column, sort_plan, )), @@ -1194,7 +1264,7 @@ impl PromPlanner { table: table_ref.to_quoted_string(), })?, is_range_selector, - self.ctx.tag_columns.clone(), + series_key_columns, divide_plan, ); let logical_plan = LogicalPlan::Extension(Extension { @@ -1229,6 +1299,9 @@ impl PromPlanner { } let mut exprs = Vec::with_capacity(labels.labels.len()); for label in &labels.labels { + if is_metric_engine_internal_column(label) { + continue; + } // nonexistence label will be ignored if let Some(column_name) = Self::find_case_sensitive_column(input_schema, label) { @@ -1252,6 +1325,10 @@ impl PromPlanner { .map(|f| f.name()) .collect::>(); + // Exclude metric engine internal columns (not PromQL labels) from the implicit + // "without" label set. + all_fields.retain(|col| !is_metric_engine_internal_column(col.as_str())); + // remove "without"-ed fields // nonexistence label will be ignored for label in &labels.labels { @@ -1362,6 +1439,9 @@ impl PromPlanner { } fn find_case_sensitive_column(schema: &DFSchemaRef, column: &str) -> Option { + if is_metric_engine_internal_column(column) { + return None; + } schema .fields() .iter() @@ -1369,6 +1449,18 @@ impl PromPlanner { .map(|field| field.name().clone()) } + fn table_from_source(&self, source: &Arc) -> Result { + Ok(source + .as_any() + .downcast_ref::() + .context(UnknownTableSnafu)? + .table_provider + .as_any() + .downcast_ref::() + .context(UnknownTableSnafu)? + .table()) + } + fn table_ref(&self) -> Result { let table_name = self .ctx @@ -1456,48 +1548,156 @@ impl PromPlanner { .await .context(CatalogSnafu)?; - let is_time_index_ms = provider - .as_any() - .downcast_ref::() - .context(UnknownTableSnafu)? - .table_provider - .as_any() - .downcast_ref::() - .context(UnknownTableSnafu)? - .table() + let logical_table = self.table_from_source(&provider)?; + + let scan_table_ref = table_ref.clone(); + let mut scan_provider = provider; + let mut table_id_filter: Option = None; + + // If it's a metric engine logical table, scan its physical table directly and filter by + // `__table_id = logical_table_id` to get access to internal columns like `__tsid`. + if logical_table.table_info().meta.engine == METRIC_ENGINE_NAME + && let Some(physical_table_name) = logical_table + .table_info() + .meta + .options + .extra_options + .get(LOGICAL_TABLE_METADATA_KEY) + { + let physical_table_ref = if let Some(schema_name) = &self.ctx.schema_name { + TableReference::partial(schema_name.as_str(), physical_table_name.as_str()) + } else { + TableReference::bare(physical_table_name.as_str()) + }; + + let physical_provider = match self + .table_provider + .resolve_table(physical_table_ref.clone()) + .await + { + Ok(provider) => provider, + Err(e) if e.status_code() == StatusCode::TableNotFound => { + // Fall back to scanning the logical table. It still works, but without + // `__tsid` optimization. + scan_provider.clone() + } + Err(e) => return Err(e).context(CatalogSnafu), + }; + + if !Arc::ptr_eq(&physical_provider, &scan_provider) { + // Only rewrite when internal columns exist in physical schema. + let physical_table = self.table_from_source(&physical_provider)?; + + let has_table_id = physical_table + .schema() + .column_schema_by_name(DATA_SCHEMA_TABLE_ID_COLUMN_NAME) + .is_some(); + let has_tsid = physical_table + .schema() + .column_schema_by_name(DATA_SCHEMA_TSID_COLUMN_NAME) + .is_some_and(|col| matches!(col.data_type, ConcreteDataType::UInt64(_))); + + if has_table_id && has_tsid { + scan_provider = physical_provider; + table_id_filter = Some(logical_table.table_info().ident.table_id); + } + } + } + + let scan_table = self.table_from_source(&scan_provider)?; + + let use_tsid = table_id_filter.is_some() + && scan_table + .schema() + .column_schema_by_name(DATA_SCHEMA_TSID_COLUMN_NAME) + .is_some_and(|col| matches!(col.data_type, ConcreteDataType::UInt64(_))); + self.ctx.use_tsid = use_tsid; + + let is_time_index_ms = scan_table .schema() .timestamp_column() .with_context(|| TimeIndexNotFoundSnafu { - table: table_ref.to_quoted_string(), + table: scan_table_ref.to_quoted_string(), })? .data_type == ConcreteDataType::timestamp_millisecond_datatype(); - let mut scan_plan = LogicalPlanBuilder::scan(table_ref.clone(), provider, None) - .context(DataFusionPlanningSnafu)? - .build() - .context(DataFusionPlanningSnafu)?; + let scan_projection = if table_id_filter.is_some() { + let mut required_columns = HashSet::new(); + required_columns.insert(DATA_SCHEMA_TABLE_ID_COLUMN_NAME.to_string()); + required_columns.insert(self.ctx.time_index_column.clone().with_context(|| { + TimeIndexNotFoundSnafu { + table: scan_table_ref.to_quoted_string(), + } + })?); + for col in &self.ctx.tag_columns { + required_columns.insert(col.clone()); + } + for col in &self.ctx.field_columns { + required_columns.insert(col.clone()); + } + if use_tsid { + required_columns.insert(DATA_SCHEMA_TSID_COLUMN_NAME.to_string()); + } + + let arrow_schema = scan_table.schema().arrow_schema().clone(); + Some( + arrow_schema + .fields() + .iter() + .enumerate() + .filter(|(_, field)| required_columns.contains(field.name().as_str())) + .map(|(idx, _)| idx) + .collect::>(), + ) + } else { + None + }; + + let mut scan_plan = + LogicalPlanBuilder::scan(scan_table_ref.clone(), scan_provider, scan_projection) + .context(DataFusionPlanningSnafu)? + .build() + .context(DataFusionPlanningSnafu)?; + + if let Some(table_id) = table_id_filter { + scan_plan = LogicalPlanBuilder::from(scan_plan) + .filter( + DfExpr::Column(Column::from_name(DATA_SCHEMA_TABLE_ID_COLUMN_NAME)) + .eq(lit(table_id)), + ) + .context(DataFusionPlanningSnafu)? + .build() + .context(DataFusionPlanningSnafu)?; + } if !is_time_index_ms { // cast to ms if time_index not in Millisecond precision let expr: Vec<_> = self - .ctx - .field_columns - .iter() - .map(|col| DfExpr::Column(Column::new(Some(table_ref.clone()), col.clone()))) + .create_field_column_exprs()? + .into_iter() .chain(self.create_tag_column_exprs()?) + .chain( + self.ctx + .use_tsid + .then_some(DfExpr::Column(Column::new( + Some(scan_table_ref.clone()), + DATA_SCHEMA_TSID_COLUMN_NAME.to_string(), + ))) + .into_iter(), + ) .chain(Some(DfExpr::Alias(Alias { expr: Box::new(DfExpr::Cast(Cast { expr: Box::new(self.create_time_index_column_expr()?), data_type: ArrowDataType::Timestamp(ArrowTimeUnit::Millisecond, None), })), - relation: Some(table_ref.clone()), + relation: Some(scan_table_ref.clone()), name: self .ctx .time_index_column .as_ref() .with_context(|| TimeIndexNotFoundSnafu { - table: table_ref.to_quoted_string(), + table: scan_table_ref.to_quoted_string(), })? .clone(), metadata: None, @@ -1508,6 +1708,28 @@ impl PromPlanner { .context(DataFusionPlanningSnafu)? .build() .context(DataFusionPlanningSnafu)?; + } else if table_id_filter.is_some() { + // Drop the internal `__table_id` column after filtering. + let project_exprs = self + .create_field_column_exprs()? + .into_iter() + .chain(self.create_tag_column_exprs()?) + .chain( + self.ctx + .use_tsid + .then_some(DfExpr::Column(Column::from_name( + DATA_SCHEMA_TSID_COLUMN_NAME, + ))) + .into_iter(), + ) + .chain(Some(self.create_time_index_column_expr()?)) + .collect::>(); + + scan_plan = LogicalPlanBuilder::from(scan_plan) + .project(project_exprs) + .context(DataFusionPlanningSnafu)? + .build() + .context(DataFusionPlanningSnafu)?; } let result = LogicalPlanBuilder::from(scan_plan) @@ -1521,22 +1743,14 @@ impl PromPlanner { /// Returns a logical plan for an empty metric. async fn setup_context(&mut self) -> Result> { let table_ref = self.table_ref()?; - let table = match self.table_provider.resolve_table(table_ref.clone()).await { + let source = match self.table_provider.resolve_table(table_ref.clone()).await { Err(e) if e.status_code() == StatusCode::TableNotFound => { let plan = self.setup_context_for_empty_metric()?; return Ok(Some(plan)); } res => res.context(CatalogSnafu)?, }; - let table = table - .as_any() - .downcast_ref::() - .context(UnknownTableSnafu)? - .table_provider - .as_any() - .downcast_ref::() - .context(UnknownTableSnafu)? - .table(); + let table = self.table_from_source(&source)?; // set time index column name let time_index = table @@ -1571,6 +1785,8 @@ impl PromPlanner { .collect(); self.ctx.tag_columns = tags; + self.ctx.use_tsid = false; + Ok(None) } @@ -1581,6 +1797,7 @@ impl PromPlanner { self.ctx.reset_table_name_and_schema(); self.ctx.tag_columns = vec![]; self.ctx.field_columns = vec![DEFAULT_FIELD_COLUMN.to_string()]; + self.ctx.use_tsid = false; // The table doesn't have any data, so we set start to 0 and end to -1. let plan = LogicalPlan::Extension(Extension { @@ -2988,6 +3205,7 @@ impl PromPlanner { .chain([left_time_index.clone()]) .collect::>(); self.ctx.time_index_column = Some(left_time_index.clone()); + self.ctx.use_tsid = left_context.use_tsid; // alias right time index column if necessary if left_context.time_index_column != right_context.time_index_column { @@ -3127,6 +3345,16 @@ impl PromPlanner { // Take the name of first field column. The length is checked above. let left_field_col = left_context.field_columns.first().unwrap(); let right_field_col = right_context.field_columns.first().unwrap(); + let left_has_tsid = left + .schema() + .fields() + .iter() + .any(|field| field.name() == DATA_SCHEMA_TSID_COLUMN_NAME); + let right_has_tsid = right + .schema() + .fields() + .iter() + .any(|field| field.name() == DATA_SCHEMA_TSID_COLUMN_NAME); // step 0: fill all columns in output schema let mut all_columns_set = left @@ -3136,6 +3364,11 @@ impl PromPlanner { .chain(right.schema().fields().iter()) .map(|field| field.name().clone()) .collect::>(); + // Keep `__tsid` only when both sides contain it, otherwise it may break schema alignment + // (e.g. `unknown_metric or some_metric`). + if !(left_has_tsid && right_has_tsid) { + all_columns_set.remove(DATA_SCHEMA_TSID_COLUMN_NAME); + } // remove time index column all_columns_set.remove(&left_time_index_column); all_columns_set.remove(&right_time_index_column); @@ -3243,6 +3476,7 @@ impl PromPlanner { self.ctx.time_index_column = Some(left_time_index_column); self.ctx.tag_columns = all_tags.into_iter().collect(); self.ctx.field_columns = vec![left_field_col.clone()]; + self.ctx.use_tsid = left_has_tsid && right_has_tsid; Ok(result) } @@ -3349,6 +3583,30 @@ impl PromPlanner { Ok(fn_expr) } + fn strip_tsid_column(&self, plan: LogicalPlan) -> Result { + let schema = plan.schema(); + if !schema + .fields() + .iter() + .any(|field| field.name() == DATA_SCHEMA_TSID_COLUMN_NAME) + { + return Ok(plan); + } + + let project_exprs = schema + .fields() + .iter() + .filter(|field| field.name() != DATA_SCHEMA_TSID_COLUMN_NAME) + .map(|field| Ok(DfExpr::Column(Column::from_name(field.name().clone())))) + .collect::>>()?; + + LogicalPlanBuilder::from(plan) + .project(project_exprs) + .context(DataFusionPlanningSnafu)? + .build() + .context(DataFusionPlanningSnafu) + } + /// Apply an alias to the query result by adding a projection with the alias name fn apply_alias_projection( &mut self, @@ -3517,6 +3775,156 @@ mod test { ) } + async fn build_test_table_provider_with_tsid( + table_name_tuples: &[(String, String)], + num_tag: usize, + num_field: usize, + ) -> DfTableSourceProvider { + let catalog_list = MemoryCatalogManager::with_default_setup(); + + let physical_table_name = "phy"; + let physical_table_id = 999u32; + + // Register a metric engine physical table with internal columns. + { + let mut columns = vec![ + ColumnSchema::new( + DATA_SCHEMA_TABLE_ID_COLUMN_NAME.to_string(), + ConcreteDataType::uint32_datatype(), + false, + ), + ColumnSchema::new( + DATA_SCHEMA_TSID_COLUMN_NAME.to_string(), + ConcreteDataType::uint64_datatype(), + false, + ), + ]; + for i in 0..num_tag { + columns.push(ColumnSchema::new( + format!("tag_{i}"), + ConcreteDataType::string_datatype(), + false, + )); + } + columns.push( + ColumnSchema::new( + "timestamp".to_string(), + ConcreteDataType::timestamp_millisecond_datatype(), + false, + ) + .with_time_index(true), + ); + for i in 0..num_field { + columns.push(ColumnSchema::new( + format!("field_{i}"), + ConcreteDataType::float64_datatype(), + true, + )); + } + + let schema = Arc::new(Schema::new(columns)); + let primary_key_indices = (0..(2 + num_tag)).collect::>(); + let table_meta = TableMetaBuilder::empty() + .schema(schema) + .primary_key_indices(primary_key_indices) + .value_indices((2 + num_tag..2 + num_tag + 1 + num_field).collect()) + .engine(METRIC_ENGINE_NAME.to_string()) + .next_column_id(1024) + .build() + .unwrap(); + let table_info = TableInfoBuilder::default() + .table_id(physical_table_id) + .name(physical_table_name) + .meta(table_meta) + .build() + .unwrap(); + let table = EmptyTable::from_table_info(&table_info); + + assert!( + catalog_list + .register_table_sync(RegisterTableRequest { + catalog: DEFAULT_CATALOG_NAME.to_string(), + schema: DEFAULT_SCHEMA_NAME.to_string(), + table_name: physical_table_name.to_string(), + table_id: physical_table_id, + table, + }) + .is_ok() + ); + } + + // Register metric engine logical tables without `__tsid`, referencing the physical table. + for (idx, (schema_name, table_name)) in table_name_tuples.iter().enumerate() { + let mut columns = vec![]; + for i in 0..num_tag { + columns.push(ColumnSchema::new( + format!("tag_{i}"), + ConcreteDataType::string_datatype(), + false, + )); + } + columns.push( + ColumnSchema::new( + "timestamp".to_string(), + ConcreteDataType::timestamp_millisecond_datatype(), + false, + ) + .with_time_index(true), + ); + for i in 0..num_field { + columns.push(ColumnSchema::new( + format!("field_{i}"), + ConcreteDataType::float64_datatype(), + true, + )); + } + + let schema = Arc::new(Schema::new(columns)); + let mut options = table::requests::TableOptions::default(); + options.extra_options.insert( + LOGICAL_TABLE_METADATA_KEY.to_string(), + physical_table_name.to_string(), + ); + let table_id = 1024u32 + idx as u32; + let table_meta = TableMetaBuilder::empty() + .schema(schema) + .primary_key_indices((0..num_tag).collect()) + .value_indices((num_tag + 1..num_tag + 1 + num_field).collect()) + .engine(METRIC_ENGINE_NAME.to_string()) + .options(options) + .next_column_id(1024) + .build() + .unwrap(); + let table_info = TableInfoBuilder::default() + .table_id(table_id) + .name(table_name.clone()) + .meta(table_meta) + .build() + .unwrap(); + let table = EmptyTable::from_table_info(&table_info); + + assert!( + catalog_list + .register_table_sync(RegisterTableRequest { + catalog: DEFAULT_CATALOG_NAME.to_string(), + schema: schema_name.clone(), + table_name: table_name.clone(), + table_id, + table, + }) + .is_ok() + ); + } + + DfTableSourceProvider::new( + catalog_list, + false, + QueryContext::arc(), + DummyDecoder::arc(), + false, + ) + } + async fn build_test_table_provider_with_fields( table_name_tuples: &[(String, String)], tags: &[&str], @@ -3876,6 +4284,435 @@ mod test { do_aggregate_expr_plan("sum", "sum").await; } + #[tokio::test] + async fn tsid_is_used_for_series_divide_when_available() { + let prom_expr = parser::parse("some_metric").unwrap(); + let eval_stmt = EvalStmt { + expr: prom_expr, + start: UNIX_EPOCH, + end: UNIX_EPOCH + .checked_add(Duration::from_secs(100_000)) + .unwrap(), + interval: Duration::from_secs(5), + lookback_delta: Duration::from_secs(1), + }; + + let table_provider = build_test_table_provider_with_tsid( + &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())], + 1, + 1, + ) + .await; + let plan = + PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state()) + .await + .unwrap(); + + let plan_str = plan.display_indent_schema().to_string(); + assert!(plan_str.contains("PromSeriesDivide: tags=[\"__tsid\"]")); + assert!(plan_str.contains("__tsid ASC NULLS FIRST")); + assert!( + !plan + .schema() + .fields() + .iter() + .any(|field| field.name() == DATA_SCHEMA_TSID_COLUMN_NAME) + ); + } + + #[tokio::test] + async fn physical_table_name_is_not_leaked_in_plan() { + let prom_expr = parser::parse("some_metric").unwrap(); + let eval_stmt = EvalStmt { + expr: prom_expr, + start: UNIX_EPOCH, + end: UNIX_EPOCH + .checked_add(Duration::from_secs(100_000)) + .unwrap(), + interval: Duration::from_secs(5), + lookback_delta: Duration::from_secs(1), + }; + + let table_provider = build_test_table_provider_with_tsid( + &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())], + 1, + 1, + ) + .await; + let plan = + PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state()) + .await + .unwrap(); + + let plan_str = plan.display_indent_schema().to_string(); + assert!(plan_str.contains("TableScan: some_metric")); + assert!(!plan_str.contains("TableScan: phy")); + } + + #[tokio::test] + async fn sum_without_does_not_group_by_tsid() { + let prom_expr = parser::parse("sum without (tag_0) (some_metric)").unwrap(); + let eval_stmt = EvalStmt { + expr: prom_expr, + start: UNIX_EPOCH, + end: UNIX_EPOCH + .checked_add(Duration::from_secs(100_000)) + .unwrap(), + interval: Duration::from_secs(5), + lookback_delta: Duration::from_secs(1), + }; + + let table_provider = build_test_table_provider_with_tsid( + &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())], + 1, + 1, + ) + .await; + let plan = + PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state()) + .await + .unwrap(); + + let plan_str = plan.display_indent_schema().to_string(); + assert!(plan_str.contains("PromSeriesDivide: tags=[\"__tsid\"]")); + + let aggr_line = plan_str + .lines() + .find(|line| line.contains("Aggregate: groupBy=")) + .unwrap(); + assert!(!aggr_line.contains(DATA_SCHEMA_TSID_COLUMN_NAME)); + } + + #[tokio::test] + async fn topk_without_does_not_partition_by_tsid() { + let prom_expr = parser::parse("topk without (tag_0) (1, some_metric)").unwrap(); + let eval_stmt = EvalStmt { + expr: prom_expr, + start: UNIX_EPOCH, + end: UNIX_EPOCH + .checked_add(Duration::from_secs(100_000)) + .unwrap(), + interval: Duration::from_secs(5), + lookback_delta: Duration::from_secs(1), + }; + + let table_provider = build_test_table_provider_with_tsid( + &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())], + 1, + 1, + ) + .await; + let plan = + PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state()) + .await + .unwrap(); + + let plan_str = plan.display_indent_schema().to_string(); + assert!(plan_str.contains("PromSeriesDivide: tags=[\"__tsid\"]")); + + let window_line = plan_str + .lines() + .find(|line| line.contains("WindowAggr: windowExpr=[[row_number()")) + .unwrap(); + let partition_by = window_line + .split("PARTITION BY [") + .nth(1) + .and_then(|s| s.split("] ORDER BY").next()) + .unwrap(); + assert!(!partition_by.contains(DATA_SCHEMA_TSID_COLUMN_NAME)); + } + + #[tokio::test] + async fn sum_by_does_not_group_by_tsid() { + let prom_expr = parser::parse("sum by (__tsid) (some_metric)").unwrap(); + let eval_stmt = EvalStmt { + expr: prom_expr, + start: UNIX_EPOCH, + end: UNIX_EPOCH + .checked_add(Duration::from_secs(100_000)) + .unwrap(), + interval: Duration::from_secs(5), + lookback_delta: Duration::from_secs(1), + }; + + let table_provider = build_test_table_provider_with_tsid( + &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())], + 1, + 1, + ) + .await; + let plan = + PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state()) + .await + .unwrap(); + + let plan_str = plan.display_indent_schema().to_string(); + assert!(plan_str.contains("PromSeriesDivide: tags=[\"__tsid\"]")); + + let aggr_line = plan_str + .lines() + .find(|line| line.contains("Aggregate: groupBy=")) + .unwrap(); + assert!(!aggr_line.contains(DATA_SCHEMA_TSID_COLUMN_NAME)); + } + + #[tokio::test] + async fn topk_by_does_not_partition_by_tsid() { + let prom_expr = parser::parse("topk by (__tsid) (1, some_metric)").unwrap(); + let eval_stmt = EvalStmt { + expr: prom_expr, + start: UNIX_EPOCH, + end: UNIX_EPOCH + .checked_add(Duration::from_secs(100_000)) + .unwrap(), + interval: Duration::from_secs(5), + lookback_delta: Duration::from_secs(1), + }; + + let table_provider = build_test_table_provider_with_tsid( + &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())], + 1, + 1, + ) + .await; + let plan = + PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state()) + .await + .unwrap(); + + let plan_str = plan.display_indent_schema().to_string(); + assert!(plan_str.contains("PromSeriesDivide: tags=[\"__tsid\"]")); + + let window_line = plan_str + .lines() + .find(|line| line.contains("WindowAggr: windowExpr=[[row_number()")) + .unwrap(); + let partition_by = window_line + .split("PARTITION BY [") + .nth(1) + .and_then(|s| s.split("] ORDER BY").next()) + .unwrap(); + assert!(!partition_by.contains(DATA_SCHEMA_TSID_COLUMN_NAME)); + } + + #[tokio::test] + async fn selector_matcher_on_tsid_does_not_use_internal_column() { + let prom_expr = parser::parse(r#"some_metric{__tsid="123"}"#).unwrap(); + let eval_stmt = EvalStmt { + expr: prom_expr, + start: UNIX_EPOCH, + end: UNIX_EPOCH + .checked_add(Duration::from_secs(100_000)) + .unwrap(), + interval: Duration::from_secs(5), + lookback_delta: Duration::from_secs(1), + }; + + let table_provider = build_test_table_provider_with_tsid( + &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())], + 1, + 1, + ) + .await; + let plan = + PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state()) + .await + .unwrap(); + + fn collect_filter_cols(plan: &LogicalPlan, out: &mut HashSet) { + if let LogicalPlan::Filter(filter) = plan { + datafusion_expr::utils::expr_to_columns(&filter.predicate, out).unwrap(); + } + for input in plan.inputs() { + collect_filter_cols(input, out); + } + } + + let mut filter_cols = HashSet::new(); + collect_filter_cols(&plan, &mut filter_cols); + assert!( + !filter_cols + .iter() + .any(|c| c.name == DATA_SCHEMA_TSID_COLUMN_NAME) + ); + } + + #[tokio::test] + async fn tsid_is_not_used_when_physical_table_is_missing() { + let prom_expr = parser::parse("some_metric").unwrap(); + let eval_stmt = EvalStmt { + expr: prom_expr, + start: UNIX_EPOCH, + end: UNIX_EPOCH + .checked_add(Duration::from_secs(100_000)) + .unwrap(), + interval: Duration::from_secs(5), + lookback_delta: Duration::from_secs(1), + }; + + let catalog_list = MemoryCatalogManager::with_default_setup(); + + // Register a metric engine logical table referencing a missing physical table. + let mut columns = vec![ColumnSchema::new( + "tag_0".to_string(), + ConcreteDataType::string_datatype(), + false, + )]; + columns.push( + ColumnSchema::new( + "timestamp".to_string(), + ConcreteDataType::timestamp_millisecond_datatype(), + false, + ) + .with_time_index(true), + ); + columns.push(ColumnSchema::new( + "field_0".to_string(), + ConcreteDataType::float64_datatype(), + true, + )); + let schema = Arc::new(Schema::new(columns)); + let mut options = table::requests::TableOptions::default(); + options + .extra_options + .insert(LOGICAL_TABLE_METADATA_KEY.to_string(), "phy".to_string()); + let table_meta = TableMetaBuilder::empty() + .schema(schema) + .primary_key_indices(vec![0]) + .value_indices(vec![2]) + .engine(METRIC_ENGINE_NAME.to_string()) + .options(options) + .next_column_id(1024) + .build() + .unwrap(); + let table_info = TableInfoBuilder::default() + .table_id(1024) + .name("some_metric") + .meta(table_meta) + .build() + .unwrap(); + let table = EmptyTable::from_table_info(&table_info); + catalog_list + .register_table_sync(RegisterTableRequest { + catalog: DEFAULT_CATALOG_NAME.to_string(), + schema: DEFAULT_SCHEMA_NAME.to_string(), + table_name: "some_metric".to_string(), + table_id: 1024, + table, + }) + .unwrap(); + + let table_provider = DfTableSourceProvider::new( + catalog_list, + false, + QueryContext::arc(), + DummyDecoder::arc(), + false, + ); + + let plan = + PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state()) + .await + .unwrap(); + + let plan_str = plan.display_indent_schema().to_string(); + assert!(plan_str.contains("PromSeriesDivide: tags=[\"tag_0\"]")); + assert!(!plan_str.contains("PromSeriesDivide: tags=[\"__tsid\"]")); + } + + #[tokio::test] + async fn tsid_is_carried_only_when_aggregate_preserves_label_set() { + let prom_expr = parser::parse("sum by (tag_0) (some_metric)").unwrap(); + let eval_stmt = EvalStmt { + expr: prom_expr, + start: UNIX_EPOCH, + end: UNIX_EPOCH + .checked_add(Duration::from_secs(100_000)) + .unwrap(), + interval: Duration::from_secs(5), + lookback_delta: Duration::from_secs(1), + }; + + let table_provider = build_test_table_provider_with_tsid( + &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())], + 1, + 1, + ) + .await; + let plan = + PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state()) + .await + .unwrap(); + + let plan_str = plan.display_indent_schema().to_string(); + assert!(plan_str.contains("first_value") && plan_str.contains("__tsid")); + assert!( + !plan + .schema() + .fields() + .iter() + .any(|field| field.name() == DATA_SCHEMA_TSID_COLUMN_NAME) + ); + + // Merging aggregate: label set is reduced, tsid should not be carried. + let prom_expr = parser::parse("sum(some_metric)").unwrap(); + let eval_stmt = EvalStmt { + expr: prom_expr, + start: UNIX_EPOCH, + end: UNIX_EPOCH + .checked_add(Duration::from_secs(100_000)) + .unwrap(), + interval: Duration::from_secs(5), + lookback_delta: Duration::from_secs(1), + }; + let table_provider = build_test_table_provider_with_tsid( + &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())], + 1, + 1, + ) + .await; + let plan = + PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state()) + .await + .unwrap(); + let plan_str = plan.display_indent_schema().to_string(); + assert!(!plan_str.contains("first_value")); + } + + #[tokio::test] + async fn or_operator_with_unknown_metric_does_not_require_tsid() { + let prom_expr = parser::parse("unknown_metric or some_metric").unwrap(); + let eval_stmt = EvalStmt { + expr: prom_expr, + start: UNIX_EPOCH, + end: UNIX_EPOCH + .checked_add(Duration::from_secs(100_000)) + .unwrap(), + interval: Duration::from_secs(5), + lookback_delta: Duration::from_secs(1), + }; + + let table_provider = build_test_table_provider_with_tsid( + &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())], + 1, + 1, + ) + .await; + + let plan = + PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state()) + .await + .unwrap(); + + assert!( + !plan + .schema() + .fields() + .iter() + .any(|field| field.name() == DATA_SCHEMA_TSID_COLUMN_NAME) + ); + } + #[tokio::test] async fn aggregate_avg() { do_aggregate_expr_plan("avg", "avg").await; diff --git a/tests/cases/standalone/tql-explain-analyze/tsid_column.result b/tests/cases/standalone/tql-explain-analyze/tsid_column.result new file mode 100644 index 0000000000..d816c8eb16 --- /dev/null +++ b/tests/cases/standalone/tql-explain-analyze/tsid_column.result @@ -0,0 +1,107 @@ +CREATE TABLE tsid_physical ( + ts TIMESTAMP(3) TIME INDEX, + val DOUBLE, +) ENGINE = metric WITH ("physical_metric_table" = ""); + +Affected Rows: 0 + +CREATE TABLE tsid_metric ( + job STRING NULL, + instance STRING NULL, + ts TIMESTAMP(3) NOT NULL, + val DOUBLE NULL, + TIME INDEX (ts), + PRIMARY KEY(job, instance), +) +ENGINE = metric +WITH( + on_physical_table = 'tsid_physical' +); + +Affected Rows: 0 + +INSERT INTO tsid_metric VALUES + ('job1', 'instance1', 0, 1), + ('job1', 'instance2', 0, 2), + ('job1', 'instance1', 5000, 3), + ('job1', 'instance2', 5000, 4), + ('job1', 'instance1', 10000, 5), + ('job1', 'instance2', 10000, 6); + +Affected Rows: 6 + +-- SQLNESS REPLACE (metrics.*) REDACTED +-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED +-- SQLNESS REPLACE (-+) - +-- SQLNESS REPLACE (\s\s+) _ +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED +-- SQLNESS REPLACE (Hash.*) REDACTED +TQL ANALYZE (0, 10, '5s') sum(tsid_metric); + ++-+-+-+ +| stage | node | plan_| ++-+-+-+ +| 0_| 0_|_CooperativeExec REDACTED +|_|_|_MergeScanExec: REDACTED +|_|_|_| +| 1_| 0_|_SortPreservingMergeExec: [ts@0 ASC NULLS LAST] REDACTED +|_|_|_SortExec: expr=[ts@0 ASC NULLS LAST], preserve_partitioning=[true] REDACTED +|_|_|_AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[sum(tsid_metric.val)] REDACTED +|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED +|_|_|_RepartitionExec: partitioning=REDACTED +|_|_|_AggregateExec: mode=Partial, gby=[ts@1 as ts], aggr=[sum(tsid_metric.val)] REDACTED +|_|_|_ProjectionExec: expr=[val@0 as val, ts@2 as ts] REDACTED +|_|_|_PromInstantManipulateExec: range=[0..10000], lookback=[300000], interval=[5000], time index=[ts] REDACTED +|_|_|_PromSeriesDivideExec: tags=["__tsid"] REDACTED +|_|_|_SortExec: expr=[__tsid@1 ASC, ts@2 ASC], preserve_partitioning=[true] REDACTED +|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED +|_|_|_RepartitionExec: partitioning=REDACTED +|_|_|_ProjectionExec: expr=[val@1 as val, __tsid@2 as __tsid, ts@0 as ts] REDACTED +|_|_|_CooperativeExec REDACTED +|_|_|_SeqScan: region=REDACTED, "partition_count":{"count":1, "mem_ranges":1, "files":0, "file_ranges":0} REDACTED +|_|_|_| +|_|_| Total rows: 3_| ++-+-+-+ + +-- SQLNESS REPLACE (metrics.*) REDACTED +-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED +-- SQLNESS REPLACE (-+) - +-- SQLNESS REPLACE (\s\s+) _ +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED +-- SQLNESS REPLACE (Hash.*) REDACTED +TQL ANALYZE (0, 10, '5s') sum by (job, instance) (tsid_metric); + ++-+-+-+ +| stage | node | plan_| ++-+-+-+ +| 0_| 0_|_CooperativeExec REDACTED +|_|_|_MergeScanExec: REDACTED +|_|_|_| +| 1_| 0_|_SortPreservingMergeExec: [job@0 ASC NULLS LAST, instance@1 ASC NULLS LAST, ts@2 ASC NULLS LAST] REDACTED +|_|_|_SortExec: expr=[job@0 ASC NULLS LAST, instance@1 ASC NULLS LAST, ts@2 ASC NULLS LAST], preserve_partitioning=[true] REDACTED +|_|_|_AggregateExec: mode=FinalPartitioned, gby=[job@0 as job, instance@1 as instance, ts@2 as ts], aggr=[sum(tsid_metric.val), __tsid] REDACTED +|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED +|_|_|_RepartitionExec: partitioning=REDACTED +|_|_|_AggregateExec: mode=Partial, gby=[job@2 as job, instance@1 as instance, ts@4 as ts], aggr=[sum(tsid_metric.val), __tsid] REDACTED +|_|_|_PromInstantManipulateExec: range=[0..10000], lookback=[300000], interval=[5000], time index=[ts] REDACTED +|_|_|_PromSeriesDivideExec: tags=["__tsid"] REDACTED +|_|_|_SortExec: expr=[__tsid@3 ASC, ts@4 ASC], preserve_partitioning=[true] REDACTED +|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED +|_|_|_RepartitionExec: partitioning=REDACTED +|_|_|_ProjectionExec: expr=[val@1 as val, instance@3 as instance, job@4 as job, __tsid@2 as __tsid, ts@0 as ts] REDACTED +|_|_|_CooperativeExec REDACTED +|_|_|_SeqScan: region=REDACTED, "partition_count":{"count":1, "mem_ranges":1, "files":0, "file_ranges":0} REDACTED +|_|_|_| +|_|_| Total rows: 6_| ++-+-+-+ + +DROP TABLE tsid_metric; + +Affected Rows: 0 + +DROP TABLE tsid_physical; + +Affected Rows: 0 + diff --git a/tests/cases/standalone/tql-explain-analyze/tsid_column.sql b/tests/cases/standalone/tql-explain-analyze/tsid_column.sql new file mode 100644 index 0000000000..5fd35505d5 --- /dev/null +++ b/tests/cases/standalone/tql-explain-analyze/tsid_column.sql @@ -0,0 +1,47 @@ +CREATE TABLE tsid_physical ( + ts TIMESTAMP(3) TIME INDEX, + val DOUBLE, +) ENGINE = metric WITH ("physical_metric_table" = ""); + +CREATE TABLE tsid_metric ( + job STRING NULL, + instance STRING NULL, + ts TIMESTAMP(3) NOT NULL, + val DOUBLE NULL, + TIME INDEX (ts), + PRIMARY KEY(job, instance), +) +ENGINE = metric +WITH( + on_physical_table = 'tsid_physical' +); + +INSERT INTO tsid_metric VALUES + ('job1', 'instance1', 0, 1), + ('job1', 'instance2', 0, 2), + ('job1', 'instance1', 5000, 3), + ('job1', 'instance2', 5000, 4), + ('job1', 'instance1', 10000, 5), + ('job1', 'instance2', 10000, 6); + +-- SQLNESS REPLACE (metrics.*) REDACTED +-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED +-- SQLNESS REPLACE (-+) - +-- SQLNESS REPLACE (\s\s+) _ +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED +-- SQLNESS REPLACE (Hash.*) REDACTED +TQL ANALYZE (0, 10, '5s') sum(tsid_metric); + +-- SQLNESS REPLACE (metrics.*) REDACTED +-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED +-- SQLNESS REPLACE (-+) - +-- SQLNESS REPLACE (\s\s+) _ +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED +-- SQLNESS REPLACE (Hash.*) REDACTED +TQL ANALYZE (0, 10, '5s') sum by (job, instance) (tsid_metric); + +DROP TABLE tsid_metric; +DROP TABLE tsid_physical; +