From 50318d596d8bf48ef92aa707cd33d7f262ab5807 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Sat, 17 Jan 2026 23:55:49 +0800 Subject: [PATCH] expose tsid on logical table's schema and use it on planner Signed-off-by: Ruihang Xia --- .../meta/src/ddl/create_logical_tables.rs | 1 + .../create_logical_tables/region_request.rs | 2 +- .../create_logical_tables/update_metadata.rs | 73 +++- .../src/ddl/tests/alter_logical_tables.rs | 2 + .../reconcile_regions.rs | 2 +- src/metric-engine/src/engine/read.rs | 11 +- .../src/engine/region_metadata.rs | 18 + src/metric-engine/src/row_modifier.rs | 158 ++++++-- src/operator/src/delete.rs | 7 +- .../src/req_convert/delete/table_to_region.rs | 11 +- .../src/req_convert/insert/stmt_to_region.rs | 2 + src/query/src/datafusion.rs | 2 + src/query/src/planner.rs | 31 +- src/query/src/promql/planner.rs | 358 +++++++++++++++++- src/query/src/sql/show_create_table.rs | 53 ++- src/servers/src/prom_store.rs | 5 +- .../common/alter/alter_metric_table.result | 4 + tests/cases/standalone/common/basic.result | 4 + tests/cases/standalone/common/basic.sql | 2 + .../common/create/create_metric_table.result | 2 + .../create/metric_engine_partition.result | 3 +- .../common/insert/logical_metric_table.result | 16 +- .../tql-explain-analyze/tsid_column.result | 105 +++++ .../tql-explain-analyze/tsid_column.sql | 47 +++ 24 files changed, 868 insertions(+), 51 deletions(-) create mode 100644 tests/cases/standalone/tql-explain-analyze/tsid_column.result create mode 100644 tests/cases/standalone/tql-explain-analyze/tsid_column.sql diff --git a/src/common/meta/src/ddl/create_logical_tables.rs b/src/common/meta/src/ddl/create_logical_tables.rs index 0fcb5c8d62..a345ca8b57 100644 --- a/src/common/meta/src/ddl/create_logical_tables.rs +++ b/src/common/meta/src/ddl/create_logical_tables.rs @@ -138,6 +138,7 @@ impl CreateLogicalTablesProcedure { /// Abort(not-retry): /// - Failed to create table metadata. pub async fn on_create_metadata(&mut self) -> Result { + self.add_tsid_column_to_logical_tables(); self.update_physical_table_metadata().await?; let table_ids = self.create_logical_tables_metadata().await?; diff --git a/src/common/meta/src/ddl/create_logical_tables/region_request.rs b/src/common/meta/src/ddl/create_logical_tables/region_request.rs index ea204078d3..1780ef739b 100644 --- a/src/common/meta/src/ddl/create_logical_tables/region_request.rs +++ b/src/common/meta/src/ddl/create_logical_tables/region_request.rs @@ -102,6 +102,6 @@ pub fn create_region_request_builder_from_raw_table_info( raw_table_info: &RawTableInfo, physical_table_id: TableId, ) -> Result { - let template = build_template_from_raw_table_info(raw_table_info, false)?; + let template = build_template_from_raw_table_info(raw_table_info, true)?; Ok(CreateRequestBuilder::new(template, Some(physical_table_id))) } diff --git a/src/common/meta/src/ddl/create_logical_tables/update_metadata.rs b/src/common/meta/src/ddl/create_logical_tables/update_metadata.rs index cd24d07a78..fdd5d6eff8 100644 --- a/src/common/meta/src/ddl/create_logical_tables/update_metadata.rs +++ b/src/common/meta/src/ddl/create_logical_tables/update_metadata.rs @@ -17,7 +17,9 @@ use std::ops::Deref; use common_telemetry::{info, warn}; use itertools::Itertools; use snafu::OptionExt; -use table::metadata::TableId; +use store_api::metric_engine_consts::DATA_SCHEMA_TSID_COLUMN_NAME; +use store_api::storage::consts::ReservedColumnId; +use table::metadata::{RawTableInfo, TableId}; use table::table_name::TableName; use crate::cache_invalidator::Context; @@ -27,6 +29,20 @@ use crate::error::{Result, TableInfoNotFoundSnafu}; use crate::instruction::CacheIdent; impl CreateLogicalTablesProcedure { + pub(crate) fn add_tsid_column_to_logical_tables(&mut self) { + for (task, table_id_already_exists) in self + .data + .tasks + .iter_mut() + .zip(self.data.table_ids_already_exists.iter()) + { + if table_id_already_exists.is_some() { + continue; + } + add_tsid_column_to_raw_table_info(&mut task.table_info); + } + } + pub(crate) async fn update_physical_table_metadata(&mut self) -> Result<()> { if self.data.physical_columns.is_empty() { warn!( @@ -128,3 +144,58 @@ impl CreateLogicalTablesProcedure { Ok(table_ids) } } + +fn add_tsid_column_to_raw_table_info(table_info: &mut RawTableInfo) { + if table_info + .meta + .schema + .column_schemas + .iter() + .any(|col| col.name == DATA_SCHEMA_TSID_COLUMN_NAME) + { + return; + } + + let should_update_column_ids = + table_info.meta.column_ids.len() == table_info.meta.schema.column_schemas.len(); + let column_index = table_info.meta.schema.column_schemas.len(); + table_info + .meta + .schema + .column_schemas + .push(datatypes::schema::ColumnSchema::new( + DATA_SCHEMA_TSID_COLUMN_NAME, + datatypes::prelude::ConcreteDataType::uint64_datatype(), + false, + )); + table_info.meta.primary_key_indices.push(column_index); + if should_update_column_ids { + table_info.meta.column_ids.push(ReservedColumnId::tsid()); + } + table_info.sort_columns(); +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::ddl::test_util::test_create_logical_table_task; + + #[test] + fn add_tsid_preserves_column_ids_when_present() { + let mut task = test_create_logical_table_task("foo"); + let schema_len = task.table_info.meta.schema.column_schemas.len(); + task.table_info.meta.column_ids = (0..schema_len as u32).collect(); + + add_tsid_column_to_raw_table_info(&mut task.table_info); + + assert_eq!( + task.table_info.meta.column_ids.len(), + task.table_info.meta.schema.column_schemas.len() + ); + let name_to_ids = task.table_info.name_to_ids().unwrap(); + assert_eq!( + name_to_ids.get(DATA_SCHEMA_TSID_COLUMN_NAME), + Some(&ReservedColumnId::tsid()) + ); + } +} diff --git a/src/common/meta/src/ddl/tests/alter_logical_tables.rs b/src/common/meta/src/ddl/tests/alter_logical_tables.rs index 139f90eed2..ce60dc021a 100644 --- a/src/common/meta/src/ddl/tests/alter_logical_tables.rs +++ b/src/common/meta/src/ddl/tests/alter_logical_tables.rs @@ -553,6 +553,7 @@ async fn test_on_part_duplicate_alter_request() { assert_eq!( table1_cols, vec![ + "__tsid".to_string(), "col_0".to_string(), "cpu".to_string(), "host".to_string(), @@ -572,6 +573,7 @@ async fn test_on_part_duplicate_alter_request() { assert_eq!( table2_cols, vec![ + "__tsid".to_string(), "col_0".to_string(), "cpu".to_string(), "host".to_string(), diff --git a/src/common/meta/src/reconciliation/reconcile_logical_tables/reconcile_regions.rs b/src/common/meta/src/reconciliation/reconcile_logical_tables/reconcile_regions.rs index 598fae4781..b63ef2e15b 100644 --- a/src/common/meta/src/reconciliation/reconcile_logical_tables/reconcile_regions.rs +++ b/src/common/meta/src/reconciliation/reconcile_logical_tables/reconcile_regions.rs @@ -150,7 +150,7 @@ fn create_region_request_from_raw_table_info( raw_table_info: &RawTableInfo, physical_table_id: TableId, ) -> Result { - let template = build_template_from_raw_table_info(raw_table_info, false)?; + let template = build_template_from_raw_table_info(raw_table_info, true)?; Ok(CreateRequestBuilder::new(template, Some(physical_table_id))) } diff --git a/src/metric-engine/src/engine/read.rs b/src/metric-engine/src/engine/read.rs index 13ae461db1..32bbaa6d4f 100644 --- a/src/metric-engine/src/engine/read.rs +++ b/src/metric-engine/src/engine/read.rs @@ -19,7 +19,9 @@ use common_telemetry::{debug, error, tracing}; use datafusion::logical_expr::{self, Expr}; use snafu::{OptionExt, ResultExt}; use store_api::metadata::{RegionMetadataBuilder, RegionMetadataRef}; -use store_api::metric_engine_consts::DATA_SCHEMA_TABLE_ID_COLUMN_NAME; +use store_api::metric_engine_consts::{ + DATA_SCHEMA_TABLE_ID_COLUMN_NAME, is_metric_engine_internal_column, +}; use store_api::region_engine::{RegionEngine, RegionScannerRef}; use store_api::storage::{RegionId, ScanRequest, SequenceNumber}; @@ -218,7 +220,10 @@ impl MetricEngineInner { .get_metadata(data_region_id) .await .context(MitoReadOperationSnafu)?; - for name in logical_columns { + for name in logical_columns + .into_iter() + .filter(|name| !is_metric_engine_internal_column(name)) + { // Safety: logical columns is a strict subset of physical columns projection.push(physical_metadata.column_index_by_name(&name).unwrap()); } @@ -338,7 +343,7 @@ mod test { .await .unwrap(); - assert_eq!(scan_req.projection.unwrap(), vec![11, 10, 9, 8, 0, 1, 4]); + assert_eq!(scan_req.projection.unwrap(), vec![11, 10, 9, 8, 3, 0, 1]); assert_eq!(scan_req.filters.len(), 1); assert_eq!( scan_req.filters[0], diff --git a/src/metric-engine/src/engine/region_metadata.rs b/src/metric-engine/src/engine/region_metadata.rs index f8e0dd8dc3..53910cfd2f 100644 --- a/src/metric-engine/src/engine/region_metadata.rs +++ b/src/metric-engine/src/engine/region_metadata.rs @@ -16,13 +16,30 @@ use std::collections::HashMap; +use api::v1::SemanticType; +use datatypes::prelude::ConcreteDataType; +use datatypes::schema::ColumnSchema; use store_api::metadata::ColumnMetadata; +use store_api::metric_engine_consts::DATA_SCHEMA_TSID_COLUMN_NAME; use store_api::storage::RegionId; +use store_api::storage::consts::ReservedColumnId; use crate::engine::MetricEngineInner; use crate::error::Result; impl MetricEngineInner { + fn tsid_column_metadata() -> ColumnMetadata { + ColumnMetadata { + column_schema: ColumnSchema::new( + DATA_SCHEMA_TSID_COLUMN_NAME, + ConcreteDataType::uint64_datatype(), + false, + ), + semantic_type: SemanticType::Tag, + column_id: ReservedColumnId::tsid(), + } + } + /// Load column metadata of a logical region. /// /// The return value is ordered on column name. @@ -54,6 +71,7 @@ impl MetricEngineInner { .await? .into_iter() .map(|(_, column_metadata)| column_metadata) + .chain(std::iter::once(Self::tsid_column_metadata())) .collect::>(); // Update cache diff --git a/src/metric-engine/src/row_modifier.rs b/src/metric-engine/src/row_modifier.rs index d5ed1cc9b0..df840afb70 100644 --- a/src/metric-engine/src/row_modifier.rs +++ b/src/metric-engine/src/row_modifier.rs @@ -111,7 +111,7 @@ impl RowModifier { .encode_to_vec(internal_columns.into_iter(), &mut buffer) .context(EncodePrimaryKeySnafu)?; self.codec - .encode_to_vec(row_iter.primary_keys(), &mut buffer) + .encode_to_vec(row_iter.user_primary_keys(), &mut buffer) .context(EncodePrimaryKeySnafu)?; values.push(ValueData::BinaryValue(buffer.clone()).into()); @@ -138,27 +138,50 @@ impl RowModifier { /// Modifies rows with dense primary key encoding. /// It adds two columns(`__table_id`, `__tsid`) to the row. fn modify_rows_dense(&self, mut iter: RowsIter, table_ids: TableIdInput<'_>) -> Result { - // add table_name column - iter.rows.schema.push(ColumnSchema { - column_name: DATA_SCHEMA_TABLE_ID_COLUMN_NAME.to_string(), - datatype: ColumnDataType::Uint32 as i32, - semantic_type: SemanticType::Tag as _, - datatype_extension: None, - options: None, - }); - // add tsid column - iter.rows.schema.push(ColumnSchema { - column_name: DATA_SCHEMA_TSID_COLUMN_NAME.to_string(), - datatype: ColumnDataType::Uint64 as i32, - semantic_type: SemanticType::Tag as _, - datatype_extension: None, - options: None, - }); + let table_id_index = iter + .rows + .schema + .iter() + .position(|col| col.column_name == DATA_SCHEMA_TABLE_ID_COLUMN_NAME); + let tsid_index = iter + .rows + .schema + .iter() + .position(|col| col.column_name == DATA_SCHEMA_TSID_COLUMN_NAME); + + if table_id_index.is_none() { + iter.rows.schema.push(ColumnSchema { + column_name: DATA_SCHEMA_TABLE_ID_COLUMN_NAME.to_string(), + datatype: ColumnDataType::Uint32 as i32, + semantic_type: SemanticType::Tag as _, + datatype_extension: None, + options: None, + }); + } + if tsid_index.is_none() { + iter.rows.schema.push(ColumnSchema { + column_name: DATA_SCHEMA_TSID_COLUMN_NAME.to_string(), + datatype: ColumnDataType::Uint64 as i32, + semantic_type: SemanticType::Tag as _, + datatype_extension: None, + options: None, + }); + } + for (row_index, row_iter) in iter.iter_mut().enumerate() { let table_id = table_ids.table_id_for_row(row_index); let (table_id_value, tsid) = Self::fill_internal_columns(table_id, &row_iter); - row_iter.row.values.push(table_id_value); - row_iter.row.values.push(tsid); + if let Some(table_id_index) = table_id_index { + row_iter.row.values[table_id_index] = table_id_value; + } else { + row_iter.row.values.push(table_id_value); + } + + if let Some(tsid_index) = tsid_index { + row_iter.row.values[tsid_index] = tsid; + } else { + row_iter.row.values.push(tsid); + } } Ok(iter.rows) @@ -182,7 +205,15 @@ impl RowModifier { let ts_id = if !iter.has_null_labels() { // No null labels in row, we can safely reuse the precomputed label name hash. let mut ts_id_gen = TsidGenerator::new(iter.index.label_name_hash); - for (_, value) in iter.primary_keys_with_name() { + for (name, value) in iter.primary_keys_with_name() { + // Internal columns are not part of TSID generation. + // They may appear when request rows are derived from scans that include them. + if matches!( + name.as_str(), + DATA_SCHEMA_TABLE_ID_COLUMN_NAME | DATA_SCHEMA_TSID_COLUMN_NAME + ) { + continue; + } // The type is checked before. So only null is ignored. if let Some(ValueData::StringValue(string)) = &value.value_data { ts_id_gen.write_str(string); @@ -199,6 +230,13 @@ impl RowModifier { let mut hasher = TsidGenerator::default(); // 1. Find out label names with non-null values and get the hash. for (name, value) in iter.primary_keys_with_name() { + // Internal columns are not part of TSID generation. + if matches!( + name.as_str(), + DATA_SCHEMA_TABLE_ID_COLUMN_NAME | DATA_SCHEMA_TSID_COLUMN_NAME + ) { + continue; + } // The type is checked before. So only null is ignored. if let Some(ValueData::StringValue(_)) = &value.value_data { hasher.write_str(name); @@ -208,7 +246,14 @@ impl RowModifier { // 2. Use label name hash as seed and continue with label values. let mut final_hasher = TsidGenerator::new(label_name_hash); - for (_, value) in iter.primary_keys_with_name() { + for (name, value) in iter.primary_keys_with_name() { + // Internal columns are not part of TSID generation. + if matches!( + name.as_str(), + DATA_SCHEMA_TABLE_ID_COLUMN_NAME | DATA_SCHEMA_TSID_COLUMN_NAME + ) { + continue; + } if let Some(ValueData::StringValue(value)) = &value.value_data { final_hasher.write_str(value); } @@ -368,6 +413,13 @@ pub struct RowIter<'a> { } impl RowIter<'_> { + fn is_internal_column(&self, idx: &ValueIndex) -> bool { + matches!( + self.schema[idx.index].column_name.as_str(), + DATA_SCHEMA_TABLE_ID_COLUMN_NAME | DATA_SCHEMA_TSID_COLUMN_NAME + ) + } + /// Returns the primary keys with their names. fn primary_keys_with_name(&self) -> impl Iterator { self.index.indices[..self.index.num_primary_key_column] @@ -384,7 +436,9 @@ impl RowIter<'_> { fn has_null_labels(&self) -> bool { self.index.indices[..self.index.num_primary_key_column] .iter() - .any(|idx| self.row.values[idx.index].value_data.is_none()) + .any(|idx| { + !self.is_internal_column(idx) && self.row.values[idx.index].value_data.is_none() + }) } /// Returns the primary keys. @@ -402,6 +456,13 @@ impl RowIter<'_> { }) } + /// Returns the primary keys excluding reserved internal columns. + pub fn user_primary_keys(&self) -> impl Iterator)> { + self.primary_keys().filter(|(column_id, _)| { + *column_id != ReservedColumnId::table_id() && *column_id != ReservedColumnId::tsid() + }) + } + /// Returns the remaining columns. fn remaining(&mut self) -> impl Iterator + '_ { self.index.indices[self.index.num_primary_key_column..] @@ -491,6 +552,59 @@ mod tests { assert_eq!(result.schema, expected_sparse_schema()); } + #[test] + fn test_encode_sparse_ignores_input_tsid_column() { + let name_to_column_id = test_name_to_column_id(); + let encoder = RowModifier::default(); + let table_id = 1025; + + let rows_without_tsid = Rows { + schema: test_schema(), + rows: vec![test_row("greptimedb", "127.0.0.1")], + }; + + let mut schema_with_tsid = test_schema(); + schema_with_tsid.push(ColumnSchema { + column_name: DATA_SCHEMA_TSID_COLUMN_NAME.to_string(), + datatype: ColumnDataType::Uint64 as i32, + semantic_type: SemanticType::Tag as _, + datatype_extension: None, + options: None, + }); + let rows_with_tsid = Rows { + schema: schema_with_tsid, + rows: vec![Row { + values: vec![ + ValueData::StringValue("greptimedb".to_string()).into(), + ValueData::StringValue("127.0.0.1".to_string()).into(), + ValueData::U64Value(123).into(), + ], + }], + }; + + let result_without_tsid = encoder + .modify_rows( + RowsIter::new(rows_without_tsid, &name_to_column_id), + TableIdInput::Single(table_id), + PrimaryKeyEncoding::Sparse, + ) + .unwrap(); + let result_with_tsid = encoder + .modify_rows( + RowsIter::new(rows_with_tsid, &name_to_column_id), + TableIdInput::Single(table_id), + PrimaryKeyEncoding::Sparse, + ) + .unwrap(); + + assert_eq!(result_without_tsid.schema, expected_sparse_schema()); + assert_eq!(result_with_tsid.schema, expected_sparse_schema()); + assert_eq!( + result_without_tsid.rows[0].values, + result_with_tsid.rows[0].values + ); + } + fn expected_sparse_schema() -> Vec { vec![ColumnSchema { column_name: PRIMARY_KEY_COLUMN_NAME.to_string(), diff --git a/src/operator/src/delete.rs b/src/operator/src/delete.rs index 1e9fef919a..a85e370abd 100644 --- a/src/operator/src/delete.rs +++ b/src/operator/src/delete.rs @@ -27,6 +27,7 @@ use futures_util::future; use partition::manager::PartitionRuleManagerRef; use session::context::QueryContextRef; use snafu::{OptionExt, ResultExt, ensure}; +use store_api::metric_engine_consts::is_metric_engine_internal_column; use table::TableRef; use table::requests::DeleteRequest as TableDeleteRequest; @@ -99,9 +100,12 @@ impl Deleter { pub async fn handle_table_delete( &self, - request: TableDeleteRequest, + mut request: TableDeleteRequest, ctx: QueryContextRef, ) -> Result { + request + .key_column_values + .retain(|col, _| !is_metric_engine_internal_column(col)); let catalog = request.catalog_name.as_str(); let schema = request.schema_name.as_str(); let table = request.table_name.as_str(); @@ -227,6 +231,7 @@ impl Deleter { .table_info() .meta .row_key_column_names() + .filter(|name| !is_metric_engine_internal_column(name)) .cloned() .chain(iter::once(time_index)) .collect(); diff --git a/src/operator/src/req_convert/delete/table_to_region.rs b/src/operator/src/req_convert/delete/table_to_region.rs index d68a8987cb..1ebfe896af 100644 --- a/src/operator/src/req_convert/delete/table_to_region.rs +++ b/src/operator/src/req_convert/delete/table_to_region.rs @@ -38,7 +38,16 @@ impl<'a> TableToRegion<'a> { pub async fn convert(&self, request: TableDeleteRequest) -> Result { let row_count = row_count(&request.key_column_values)?; let schema = column_schema(self.table_info, &request.key_column_values)?; - let rows = api::helper::vectors_to_rows(request.key_column_values.values(), row_count); + let vectors = schema + .iter() + .map(|col| { + request + .key_column_values + .get(&col.column_name) + .expect("schema column must exist in delete request") + }) + .collect::>(); + let rows = api::helper::vectors_to_rows(vectors.into_iter(), row_count); let rows = Rows { schema, rows }; let requests = Partitioner::new(self.partition_manager) diff --git a/src/operator/src/req_convert/insert/stmt_to_region.rs b/src/operator/src/req_convert/insert/stmt_to_region.rs index e2e0969035..df11dc3b3c 100644 --- a/src/operator/src/req_convert/insert/stmt_to_region.rs +++ b/src/operator/src/req_convert/insert/stmt_to_region.rs @@ -36,6 +36,7 @@ use snafu::{OptionExt, ResultExt, ensure}; use sql::ast::ObjectNamePartExt; use sql::statements::insert::Insert; use sqlparser::ast::{ObjectName, Value as SqlValue}; +use store_api::metric_engine_consts::is_metric_engine_internal_column; use table::TableRef; use table::metadata::TableInfoRef; @@ -383,6 +384,7 @@ fn column_names<'a>(stmt: &'a Insert, table_schema: &'a SchemaRef) -> Vec<&'a St table_schema .column_schemas() .iter() + .filter(|column| !is_metric_engine_internal_column(&column.name)) .map(|column| &column.name) .collect() } diff --git a/src/query/src/datafusion.rs b/src/query/src/datafusion.rs index ef597ecc38..067298ee18 100644 --- a/src/query/src/datafusion.rs +++ b/src/query/src/datafusion.rs @@ -46,6 +46,7 @@ use futures_util::StreamExt; use session::context::QueryContextRef; use snafu::{OptionExt, ResultExt, ensure}; use sqlparser::ast::AnalyzeFormat; +use store_api::metric_engine_consts::is_metric_engine_internal_column; use table::TableRef; use table::requests::{DeleteRequest, InsertRequest}; use tracing::Span; @@ -200,6 +201,7 @@ impl DatafusionQueryEngine { let rowkey_columns = table_info .meta .row_key_column_names() + .filter(|name| !is_metric_engine_internal_column(name)) .collect::>(); let column_vectors = column_vectors .into_iter() diff --git a/src/query/src/planner.rs b/src/query/src/planner.rs index faba24a742..331f8805ce 100644 --- a/src/query/src/planner.rs +++ b/src/query/src/planner.rs @@ -40,6 +40,7 @@ use sql::statements::explain::ExplainStatement; use sql::statements::query::Query; use sql::statements::statement::Statement; use sql::statements::tql::Tql; +use store_api::metric_engine_consts::is_metric_engine_internal_column; use crate::error::{ CteColumnSchemaMismatchSnafu, PlanSqlSnafu, QueryPlanSnafu, Result, SqlSnafu, @@ -232,7 +233,35 @@ impl DfLogicalPlanner { .optimize_by_extension_rules(plan, &context)?; common_telemetry::debug!("Logical planner, optimize result: {plan}"); - Ok(plan) + Self::strip_metric_engine_internal_columns(plan) + } + + fn strip_metric_engine_internal_columns(plan: LogicalPlan) -> Result { + let schema = plan.schema(); + if !schema + .fields() + .iter() + .any(|field| is_metric_engine_internal_column(field.name())) + { + return Ok(plan); + } + + let project_exprs = schema + .fields() + .iter() + .filter(|field| !is_metric_engine_internal_column(field.name())) + .map(|field| col(field.name())) + .collect::>(); + + if project_exprs.is_empty() { + return Ok(plan); + } + + LogicalPlanBuilder::from(plan) + .project(project_exprs) + .context(PlanSqlSnafu)? + .build() + .context(PlanSqlSnafu) } /// Generate a relational expression from a SQL expression diff --git a/src/query/src/promql/planner.rs b/src/query/src/promql/planner.rs index 61f5aeff43..75a94ac166 100644 --- a/src/query/src/promql/planner.rs +++ b/src/query/src/promql/planner.rs @@ -138,6 +138,13 @@ struct PromPlannerContext { time_index_column: Option, field_columns: Vec, tag_columns: Vec, + /// Metric engine internal series identifier column (`__tsid`). + /// + /// This column is optional: it is present only when the underlying table schema contains + /// [`DATA_SCHEMA_TSID_COLUMN_NAME`] with `UInt64` type. The planner uses it internally as the + /// series key for plans like [`SeriesDivide`] when available, and strips it from the final + /// output. + tsid_column: Option, /// The matcher for field columns `__field__`. field_column_matcher: Option>, /// The matcher for selectors (normal matchers). @@ -164,6 +171,7 @@ impl PromPlannerContext { self.time_index_column = None; self.field_columns = vec![]; self.tag_columns = vec![]; + self.tsid_column = None; self.field_column_matcher = None; self.selector_matcher.clear(); self.schema_name = None; @@ -204,11 +212,14 @@ impl PromPlanner { .await?; // Apply alias if provided - if let Some(alias_name) = alias { - planner.apply_alias_projection(plan, alias_name) + let plan = if let Some(alias_name) = alias { + planner.apply_alias_projection(plan, alias_name)? } else { - Ok(plan) - } + plan + }; + + // Never leak internal series identifier to output. + planner.strip_tsid_column(plan) } #[cfg(test)] @@ -342,19 +353,42 @@ impl PromPlanner { } = aggr_expr; let input = self.prom_expr_to_plan(expr, query_engine_state).await?; + let input_has_tsid = input.schema().fields().iter().any(|field| { + field.name() == DATA_SCHEMA_TSID_COLUMN_NAME + && field.data_type() == &ArrowDataType::UInt64 + }); match (*op).id() { token::T_TOPK | token::T_BOTTOMK => { self.prom_topk_bottomk_to_plan(aggr_expr, input).await } _ => { + let input_tag_columns = self.ctx.tag_columns.clone(); // calculate columns to group by // Need to append time index column into group by columns let mut group_exprs = self.agg_modifier_to_col(input.schema(), modifier, true)?; // convert op and value columns to aggregate exprs - let (aggr_exprs, prev_field_exprs) = + let (mut aggr_exprs, prev_field_exprs) = self.create_aggregate_exprs(*op, param, &input)?; + let keep_tsid = op.id() != token::T_COUNT_VALUES + && input_has_tsid + && input_tag_columns.iter().collect::>() + == self.ctx.tag_columns.iter().collect::>(); + + if keep_tsid { + aggr_exprs.push( + first_value( + DfExpr::Column(Column::from_name(DATA_SCHEMA_TSID_COLUMN_NAME)), + vec![], + ) + .alias(DATA_SCHEMA_TSID_COLUMN_NAME), + ); + self.ctx.tsid_column = Some(DATA_SCHEMA_TSID_COLUMN_NAME.to_string()); + } else { + self.ctx.tsid_column = None; + } + // create plan let builder = LogicalPlanBuilder::from(input); let builder = if op.id() == token::T_COUNT_VALUES { @@ -404,6 +438,12 @@ impl PromPlanner { .. } = aggr_expr; + let input_has_tsid = input.schema().fields().iter().any(|field| { + field.name() == DATA_SCHEMA_TSID_COLUMN_NAME + && field.data_type() == &ArrowDataType::UInt64 + }); + self.ctx.tsid_column = input_has_tsid.then_some(DATA_SCHEMA_TSID_COLUMN_NAME.to_string()); + let group_exprs = self.agg_modifier_to_col(input.schema(), modifier, false)?; let val = Self::get_param_as_literal_expr(param, Some(*op), Some(ArrowDataType::Float64))?; @@ -452,6 +492,13 @@ impl PromPlanner { .create_field_column_exprs()? .into_iter() .chain(self.create_tag_column_exprs()?) + .chain( + self.ctx + .tsid_column + .as_ref() + .map(|_| DfExpr::Column(Column::from_name(DATA_SCHEMA_TSID_COLUMN_NAME))) + .into_iter(), + ) .chain(Some(self.create_time_index_column_expr()?)); LogicalPlanBuilder::from(input) @@ -1147,6 +1194,15 @@ impl PromPlanner { .into_iter() .map(|col| DfExpr::Column(Column::new_unqualified(col))) .chain(self.create_tag_column_exprs()?) + .chain( + self.ctx + .tsid_column + .as_ref() + .map(|_| { + DfExpr::Column(Column::new_unqualified(DATA_SCHEMA_TSID_COLUMN_NAME)) + }) + .into_iter(), + ) .chain(Some(self.create_time_index_column_expr()?)) .collect::>(); @@ -1159,8 +1215,23 @@ impl PromPlanner { } // make sort plan + let series_key_columns = if self.ctx.tsid_column.is_some() { + vec![DATA_SCHEMA_TSID_COLUMN_NAME.to_string()] + } else { + self.ctx.tag_columns.clone() + }; + + let sort_exprs = if self.ctx.tsid_column.is_some() { + vec![ + DfExpr::Column(Column::from_name(DATA_SCHEMA_TSID_COLUMN_NAME)).sort(true, true), + self.create_time_index_column_expr()?.sort(true, true), + ] + } else { + self.create_tag_and_time_index_column_sort_exprs()? + }; + let sort_plan = LogicalPlanBuilder::from(table_scan) - .sort(self.create_tag_and_time_index_column_sort_exprs()?) + .sort(sort_exprs) .context(DataFusionPlanningSnafu)? .build() .context(DataFusionPlanningSnafu)?; @@ -1175,7 +1246,7 @@ impl PromPlanner { })?; let divide_plan = LogicalPlan::Extension(Extension { node: Arc::new(SeriesDivide::new( - self.ctx.tag_columns.clone(), + series_key_columns.clone(), time_index_column, sort_plan, )), @@ -1194,7 +1265,7 @@ impl PromPlanner { table: table_ref.to_quoted_string(), })?, is_range_selector, - self.ctx.tag_columns.clone(), + series_key_columns, divide_plan, ); let logical_plan = LogicalPlan::Extension(Extension { @@ -1486,6 +1557,18 @@ impl PromPlanner { .iter() .map(|col| DfExpr::Column(Column::new(Some(table_ref.clone()), col.clone()))) .chain(self.create_tag_column_exprs()?) + .chain( + self.ctx + .tsid_column + .as_ref() + .map(|_| { + DfExpr::Column(Column::new( + Some(table_ref.clone()), + DATA_SCHEMA_TSID_COLUMN_NAME.to_string(), + )) + }) + .into_iter(), + ) .chain(Some(DfExpr::Alias(Alias { expr: Box::new(DfExpr::Cast(Cast { expr: Box::new(self.create_time_index_column_expr()?), @@ -1571,6 +1654,15 @@ impl PromPlanner { .collect(); self.ctx.tag_columns = tags; + // Set internal tsid column if available from underlying storage engine. + self.ctx.tsid_column = table + .schema() + .column_schema_by_name(DATA_SCHEMA_TSID_COLUMN_NAME) + .and_then(|col| { + matches!(col.data_type, ConcreteDataType::UInt64(_)) + .then_some(DATA_SCHEMA_TSID_COLUMN_NAME.to_string()) + }); + Ok(None) } @@ -1581,6 +1673,7 @@ impl PromPlanner { self.ctx.reset_table_name_and_schema(); self.ctx.tag_columns = vec![]; self.ctx.field_columns = vec![DEFAULT_FIELD_COLUMN.to_string()]; + self.ctx.tsid_column = None; // The table doesn't have any data, so we set start to 0 and end to -1. let plan = LogicalPlan::Extension(Extension { @@ -2988,6 +3081,7 @@ impl PromPlanner { .chain([left_time_index.clone()]) .collect::>(); self.ctx.time_index_column = Some(left_time_index.clone()); + self.ctx.tsid_column = left_context.tsid_column.clone(); // alias right time index column if necessary if left_context.time_index_column != right_context.time_index_column { @@ -3127,6 +3221,16 @@ impl PromPlanner { // Take the name of first field column. The length is checked above. let left_field_col = left_context.field_columns.first().unwrap(); let right_field_col = right_context.field_columns.first().unwrap(); + let left_has_tsid = left + .schema() + .fields() + .iter() + .any(|field| field.name() == DATA_SCHEMA_TSID_COLUMN_NAME); + let right_has_tsid = right + .schema() + .fields() + .iter() + .any(|field| field.name() == DATA_SCHEMA_TSID_COLUMN_NAME); // step 0: fill all columns in output schema let mut all_columns_set = left @@ -3136,6 +3240,11 @@ impl PromPlanner { .chain(right.schema().fields().iter()) .map(|field| field.name().clone()) .collect::>(); + // Keep `__tsid` only when both sides contain it, otherwise it may break schema alignment + // (e.g. `unknown_metric or some_metric`). + if !(left_has_tsid && right_has_tsid) { + all_columns_set.remove(DATA_SCHEMA_TSID_COLUMN_NAME); + } // remove time index column all_columns_set.remove(&left_time_index_column); all_columns_set.remove(&right_time_index_column); @@ -3243,6 +3352,8 @@ impl PromPlanner { self.ctx.time_index_column = Some(left_time_index_column); self.ctx.tag_columns = all_tags.into_iter().collect(); self.ctx.field_columns = vec![left_field_col.clone()]; + self.ctx.tsid_column = + (left_has_tsid && right_has_tsid).then_some(DATA_SCHEMA_TSID_COLUMN_NAME.to_string()); Ok(result) } @@ -3349,6 +3460,30 @@ impl PromPlanner { Ok(fn_expr) } + fn strip_tsid_column(&self, plan: LogicalPlan) -> Result { + let schema = plan.schema(); + if !schema + .fields() + .iter() + .any(|field| field.name() == DATA_SCHEMA_TSID_COLUMN_NAME) + { + return Ok(plan); + } + + let project_exprs = schema + .fields() + .iter() + .filter(|field| field.name() != DATA_SCHEMA_TSID_COLUMN_NAME) + .map(|field| Ok(DfExpr::Column(Column::from_name(field.name().clone())))) + .collect::>>()?; + + LogicalPlanBuilder::from(plan) + .project(project_exprs) + .context(DataFusionPlanningSnafu)? + .build() + .context(DataFusionPlanningSnafu) + } + /// Apply an alias to the query result by adding a projection with the alias name fn apply_alias_projection( &mut self, @@ -3517,6 +3652,84 @@ mod test { ) } + async fn build_test_table_provider_with_tsid( + table_name_tuples: &[(String, String)], + num_tag: usize, + num_field: usize, + ) -> DfTableSourceProvider { + let catalog_list = MemoryCatalogManager::with_default_setup(); + for (schema_name, table_name) in table_name_tuples { + let mut columns = vec![]; + for i in 0..num_tag { + columns.push(ColumnSchema::new( + format!("tag_{i}"), + ConcreteDataType::string_datatype(), + false, + )); + } + columns.push( + ColumnSchema::new( + "timestamp".to_string(), + ConcreteDataType::timestamp_millisecond_datatype(), + false, + ) + .with_time_index(true), + ); + for i in 0..num_field { + columns.push(ColumnSchema::new( + format!("field_{i}"), + ConcreteDataType::float64_datatype(), + true, + )); + } + columns.push(ColumnSchema::new( + DATA_SCHEMA_TSID_COLUMN_NAME.to_string(), + ConcreteDataType::uint64_datatype(), + false, + )); + + let schema = Arc::new(Schema::new(columns)); + + let tsid_idx = num_tag + 1 + num_field; + let mut primary_key_indices = (0..num_tag).collect::>(); + primary_key_indices.push(tsid_idx); + + let table_meta = TableMetaBuilder::empty() + .schema(schema) + .primary_key_indices(primary_key_indices) + .value_indices((num_tag + 1..num_tag + 1 + num_field).collect()) + .next_column_id(1024) + .build() + .unwrap(); + let table_info = TableInfoBuilder::default() + .name(table_name.clone()) + .meta(table_meta) + .build() + .unwrap(); + let table = EmptyTable::from_table_info(&table_info); + + assert!( + catalog_list + .register_table_sync(RegisterTableRequest { + catalog: DEFAULT_CATALOG_NAME.to_string(), + schema: schema_name.clone(), + table_name: table_name.clone(), + table_id: 1024, + table, + }) + .is_ok() + ); + } + + DfTableSourceProvider::new( + catalog_list, + false, + QueryContext::arc(), + DummyDecoder::arc(), + false, + ) + } + async fn build_test_table_provider_with_fields( table_name_tuples: &[(String, String)], tags: &[&str], @@ -3876,6 +4089,135 @@ mod test { do_aggregate_expr_plan("sum", "sum").await; } + #[tokio::test] + async fn tsid_is_used_for_series_divide_when_available() { + let prom_expr = parser::parse("some_metric").unwrap(); + let eval_stmt = EvalStmt { + expr: prom_expr, + start: UNIX_EPOCH, + end: UNIX_EPOCH + .checked_add(Duration::from_secs(100_000)) + .unwrap(), + interval: Duration::from_secs(5), + lookback_delta: Duration::from_secs(1), + }; + + let table_provider = build_test_table_provider_with_tsid( + &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())], + 1, + 1, + ) + .await; + let plan = + PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state()) + .await + .unwrap(); + + let plan_str = plan.display_indent_schema().to_string(); + assert!(plan_str.contains("PromSeriesDivide: tags=[\"__tsid\"]")); + assert!(plan_str.contains("__tsid ASC NULLS FIRST")); + assert!( + !plan + .schema() + .fields() + .iter() + .any(|field| field.name() == DATA_SCHEMA_TSID_COLUMN_NAME) + ); + } + + #[tokio::test] + async fn tsid_is_carried_only_when_aggregate_preserves_label_set() { + let prom_expr = parser::parse("sum by (tag_0) (some_metric)").unwrap(); + let eval_stmt = EvalStmt { + expr: prom_expr, + start: UNIX_EPOCH, + end: UNIX_EPOCH + .checked_add(Duration::from_secs(100_000)) + .unwrap(), + interval: Duration::from_secs(5), + lookback_delta: Duration::from_secs(1), + }; + + let table_provider = build_test_table_provider_with_tsid( + &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())], + 1, + 1, + ) + .await; + let plan = + PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state()) + .await + .unwrap(); + + let plan_str = plan.display_indent_schema().to_string(); + assert!(plan_str.contains("first_value") && plan_str.contains("__tsid")); + assert!( + !plan + .schema() + .fields() + .iter() + .any(|field| field.name() == DATA_SCHEMA_TSID_COLUMN_NAME) + ); + + // Merging aggregate: label set is reduced, tsid should not be carried. + let prom_expr = parser::parse("sum(some_metric)").unwrap(); + let eval_stmt = EvalStmt { + expr: prom_expr, + start: UNIX_EPOCH, + end: UNIX_EPOCH + .checked_add(Duration::from_secs(100_000)) + .unwrap(), + interval: Duration::from_secs(5), + lookback_delta: Duration::from_secs(1), + }; + let table_provider = build_test_table_provider_with_tsid( + &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())], + 1, + 1, + ) + .await; + let plan = + PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state()) + .await + .unwrap(); + let plan_str = plan.display_indent_schema().to_string(); + assert!(!plan_str.contains("first_value")); + } + + #[tokio::test] + async fn or_operator_with_unknown_metric_does_not_require_tsid() { + let prom_expr = parser::parse("unknown_metric or some_metric").unwrap(); + let eval_stmt = EvalStmt { + expr: prom_expr, + start: UNIX_EPOCH, + end: UNIX_EPOCH + .checked_add(Duration::from_secs(100_000)) + .unwrap(), + interval: Duration::from_secs(5), + lookback_delta: Duration::from_secs(1), + }; + + let table_provider = build_test_table_provider_with_tsid( + &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())], + 1, + 1, + ) + .await; + + let plan = + PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state()) + .await + .unwrap(); + + assert!( + !plan + .schema() + .fields() + .iter() + .any(|field| field.name() == DATA_SCHEMA_TSID_COLUMN_NAME) + ); + } + #[tokio::test] async fn aggregate_avg() { do_aggregate_expr_plan("avg", "avg").await; diff --git a/src/query/src/sql/show_create_table.rs b/src/query/src/sql/show_create_table.rs index ee3049c9f7..8f538dc982 100644 --- a/src/query/src/sql/show_create_table.rs +++ b/src/query/src/sql/show_create_table.rs @@ -250,8 +250,9 @@ fn create_table_constraints( column: Ident::with_quote(quote_style, column_name), }); } - if !table_meta.primary_key_indices.is_empty() { - let columns = primary_key_columns_for_show_create(table_meta, engine) + let primary_key_columns = primary_key_columns_for_show_create(table_meta, engine); + if !primary_key_columns.is_empty() { + let columns = primary_key_columns .into_iter() .map(|name| Ident::with_quote(quote_style, name)) .collect(); @@ -314,6 +315,7 @@ mod tests { use datatypes::schema::{ FulltextOptions, Schema, SchemaRef, SkippingIndexOptions, VectorIndexOptions, }; + use store_api::metric_engine_consts::DATA_SCHEMA_TSID_COLUMN_NAME; use table::metadata::*; use table::requests::{ FILE_TABLE_FORMAT_KEY, FILE_TABLE_LOCATION_KEY, FILE_TABLE_META_KEY, TableOptions, @@ -482,4 +484,51 @@ WITH( sql ); } + + #[test] + fn test_show_create_metric_table_empty_primary_key_is_omitted() { + let schema = vec![ + ColumnSchema::new( + "greptime_timestamp", + ConcreteDataType::timestamp_millisecond_datatype(), + false, + ) + .with_time_index(true), + ColumnSchema::new("greptime_value", ConcreteDataType::float64_datatype(), true), + ColumnSchema::new( + DATA_SCHEMA_TSID_COLUMN_NAME, + ConcreteDataType::uint64_datatype(), + false, + ), + ]; + let table_schema = SchemaRef::new(Schema::new(schema)); + let meta = TableMetaBuilder::empty() + .schema(table_schema) + .primary_key_indices(vec![2]) + .value_indices(vec![0, 1]) + .engine("metric".to_string()) + .next_column_id(0) + .options(Default::default()) + .created_on(Default::default()) + .build() + .unwrap(); + + let info = Arc::new( + TableInfoBuilder::default() + .table_id(1024) + .table_version(0 as TableVersion) + .name("test_metric_table") + .schema_name("public".to_string()) + .catalog_name("greptime".to_string()) + .desc(None) + .table_type(TableType::Base) + .meta(meta) + .build() + .unwrap(), + ); + + let stmt = create_table_stmt(&info, None, '"').unwrap(); + let sql = format!("\n{}", stmt); + assert!(!sql.contains("PRIMARY KEY")); + } } diff --git a/src/servers/src/prom_store.rs b/src/servers/src/prom_store.rs index 487bd59812..db6f40db1a 100644 --- a/src/servers/src/prom_store.rs +++ b/src/servers/src/prom_store.rs @@ -34,6 +34,7 @@ use datafusion_expr::LogicalPlan; use openmetrics_parser::{MetricsExposition, PrometheusType, PrometheusValue}; use snafu::{OptionExt, ResultExt}; use snap::raw::{Decoder, Encoder}; +use store_api::metric_engine_consts::is_metric_engine_internal_column; use crate::error::{self, Result}; use crate::row_writer::{self, MultiTableData}; @@ -269,7 +270,9 @@ fn collect_timeseries_ids(table_name: &str, recordbatch: &RecordBatch) -> Vec