From d17a1b91f8737b18726acc49dafb044758f48e32 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Sun, 18 Jan 2026 09:24:02 +0800 Subject: [PATCH] simplification Signed-off-by: Ruihang Xia --- .../create_logical_tables/region_request.rs | 2 +- .../reconcile_regions.rs | 2 +- src/metric-engine/src/engine/read.rs | 9 +- src/metric-engine/src/row_modifier.rs | 158 +++--------------- src/operator/src/delete.rs | 7 +- .../src/req_convert/delete/table_to_region.rs | 11 +- .../src/req_convert/insert/stmt_to_region.rs | 2 - src/query/src/datafusion.rs | 2 - src/query/src/planner.rs | 31 +--- src/query/src/promql/planner.rs | 158 +++++++++--------- src/query/src/sql/show_create_table.rs | 53 +----- src/servers/src/prom_store.rs | 5 +- tests/cases/standalone/common/basic.result | 4 - tests/cases/standalone/common/basic.sql | 2 - .../common/insert/logical_metric_table.result | 16 +- .../common/promql/set_operation.result | 32 ++-- .../tql-explain-analyze/tsid_column.result | 8 +- 17 files changed, 140 insertions(+), 362 deletions(-) diff --git a/src/common/meta/src/ddl/create_logical_tables/region_request.rs b/src/common/meta/src/ddl/create_logical_tables/region_request.rs index 1780ef739b..ea204078d3 100644 --- a/src/common/meta/src/ddl/create_logical_tables/region_request.rs +++ b/src/common/meta/src/ddl/create_logical_tables/region_request.rs @@ -102,6 +102,6 @@ pub fn create_region_request_builder_from_raw_table_info( raw_table_info: &RawTableInfo, physical_table_id: TableId, ) -> Result { - let template = build_template_from_raw_table_info(raw_table_info, true)?; + let template = build_template_from_raw_table_info(raw_table_info, false)?; Ok(CreateRequestBuilder::new(template, Some(physical_table_id))) } diff --git a/src/common/meta/src/reconciliation/reconcile_logical_tables/reconcile_regions.rs b/src/common/meta/src/reconciliation/reconcile_logical_tables/reconcile_regions.rs index b63ef2e15b..598fae4781 100644 --- a/src/common/meta/src/reconciliation/reconcile_logical_tables/reconcile_regions.rs +++ b/src/common/meta/src/reconciliation/reconcile_logical_tables/reconcile_regions.rs @@ -150,7 +150,7 @@ fn create_region_request_from_raw_table_info( raw_table_info: &RawTableInfo, physical_table_id: TableId, ) -> Result { - let template = build_template_from_raw_table_info(raw_table_info, true)?; + let template = build_template_from_raw_table_info(raw_table_info, false)?; Ok(CreateRequestBuilder::new(template, Some(physical_table_id))) } diff --git a/src/metric-engine/src/engine/read.rs b/src/metric-engine/src/engine/read.rs index 05df27835b..13ae461db1 100644 --- a/src/metric-engine/src/engine/read.rs +++ b/src/metric-engine/src/engine/read.rs @@ -19,9 +19,7 @@ use common_telemetry::{debug, error, tracing}; use datafusion::logical_expr::{self, Expr}; use snafu::{OptionExt, ResultExt}; use store_api::metadata::{RegionMetadataBuilder, RegionMetadataRef}; -use store_api::metric_engine_consts::{ - DATA_SCHEMA_TABLE_ID_COLUMN_NAME, is_metric_engine_internal_column, -}; +use store_api::metric_engine_consts::DATA_SCHEMA_TABLE_ID_COLUMN_NAME; use store_api::region_engine::{RegionEngine, RegionScannerRef}; use store_api::storage::{RegionId, ScanRequest, SequenceNumber}; @@ -220,10 +218,7 @@ impl MetricEngineInner { .get_metadata(data_region_id) .await .context(MitoReadOperationSnafu)?; - for name in logical_columns - .into_iter() - .filter(|name| !is_metric_engine_internal_column(name)) - { + for name in logical_columns { // Safety: logical columns is a strict subset of physical columns projection.push(physical_metadata.column_index_by_name(&name).unwrap()); } diff --git a/src/metric-engine/src/row_modifier.rs b/src/metric-engine/src/row_modifier.rs index df840afb70..d5ed1cc9b0 100644 --- a/src/metric-engine/src/row_modifier.rs +++ b/src/metric-engine/src/row_modifier.rs @@ -111,7 +111,7 @@ impl RowModifier { .encode_to_vec(internal_columns.into_iter(), &mut buffer) .context(EncodePrimaryKeySnafu)?; self.codec - .encode_to_vec(row_iter.user_primary_keys(), &mut buffer) + .encode_to_vec(row_iter.primary_keys(), &mut buffer) .context(EncodePrimaryKeySnafu)?; values.push(ValueData::BinaryValue(buffer.clone()).into()); @@ -138,50 +138,27 @@ impl RowModifier { /// Modifies rows with dense primary key encoding. /// It adds two columns(`__table_id`, `__tsid`) to the row. fn modify_rows_dense(&self, mut iter: RowsIter, table_ids: TableIdInput<'_>) -> Result { - let table_id_index = iter - .rows - .schema - .iter() - .position(|col| col.column_name == DATA_SCHEMA_TABLE_ID_COLUMN_NAME); - let tsid_index = iter - .rows - .schema - .iter() - .position(|col| col.column_name == DATA_SCHEMA_TSID_COLUMN_NAME); - - if table_id_index.is_none() { - iter.rows.schema.push(ColumnSchema { - column_name: DATA_SCHEMA_TABLE_ID_COLUMN_NAME.to_string(), - datatype: ColumnDataType::Uint32 as i32, - semantic_type: SemanticType::Tag as _, - datatype_extension: None, - options: None, - }); - } - if tsid_index.is_none() { - iter.rows.schema.push(ColumnSchema { - column_name: DATA_SCHEMA_TSID_COLUMN_NAME.to_string(), - datatype: ColumnDataType::Uint64 as i32, - semantic_type: SemanticType::Tag as _, - datatype_extension: None, - options: None, - }); - } - + // add table_name column + iter.rows.schema.push(ColumnSchema { + column_name: DATA_SCHEMA_TABLE_ID_COLUMN_NAME.to_string(), + datatype: ColumnDataType::Uint32 as i32, + semantic_type: SemanticType::Tag as _, + datatype_extension: None, + options: None, + }); + // add tsid column + iter.rows.schema.push(ColumnSchema { + column_name: DATA_SCHEMA_TSID_COLUMN_NAME.to_string(), + datatype: ColumnDataType::Uint64 as i32, + semantic_type: SemanticType::Tag as _, + datatype_extension: None, + options: None, + }); for (row_index, row_iter) in iter.iter_mut().enumerate() { let table_id = table_ids.table_id_for_row(row_index); let (table_id_value, tsid) = Self::fill_internal_columns(table_id, &row_iter); - if let Some(table_id_index) = table_id_index { - row_iter.row.values[table_id_index] = table_id_value; - } else { - row_iter.row.values.push(table_id_value); - } - - if let Some(tsid_index) = tsid_index { - row_iter.row.values[tsid_index] = tsid; - } else { - row_iter.row.values.push(tsid); - } + row_iter.row.values.push(table_id_value); + row_iter.row.values.push(tsid); } Ok(iter.rows) @@ -205,15 +182,7 @@ impl RowModifier { let ts_id = if !iter.has_null_labels() { // No null labels in row, we can safely reuse the precomputed label name hash. let mut ts_id_gen = TsidGenerator::new(iter.index.label_name_hash); - for (name, value) in iter.primary_keys_with_name() { - // Internal columns are not part of TSID generation. - // They may appear when request rows are derived from scans that include them. - if matches!( - name.as_str(), - DATA_SCHEMA_TABLE_ID_COLUMN_NAME | DATA_SCHEMA_TSID_COLUMN_NAME - ) { - continue; - } + for (_, value) in iter.primary_keys_with_name() { // The type is checked before. So only null is ignored. if let Some(ValueData::StringValue(string)) = &value.value_data { ts_id_gen.write_str(string); @@ -230,13 +199,6 @@ impl RowModifier { let mut hasher = TsidGenerator::default(); // 1. Find out label names with non-null values and get the hash. for (name, value) in iter.primary_keys_with_name() { - // Internal columns are not part of TSID generation. - if matches!( - name.as_str(), - DATA_SCHEMA_TABLE_ID_COLUMN_NAME | DATA_SCHEMA_TSID_COLUMN_NAME - ) { - continue; - } // The type is checked before. So only null is ignored. if let Some(ValueData::StringValue(_)) = &value.value_data { hasher.write_str(name); @@ -246,14 +208,7 @@ impl RowModifier { // 2. Use label name hash as seed and continue with label values. let mut final_hasher = TsidGenerator::new(label_name_hash); - for (name, value) in iter.primary_keys_with_name() { - // Internal columns are not part of TSID generation. - if matches!( - name.as_str(), - DATA_SCHEMA_TABLE_ID_COLUMN_NAME | DATA_SCHEMA_TSID_COLUMN_NAME - ) { - continue; - } + for (_, value) in iter.primary_keys_with_name() { if let Some(ValueData::StringValue(value)) = &value.value_data { final_hasher.write_str(value); } @@ -413,13 +368,6 @@ pub struct RowIter<'a> { } impl RowIter<'_> { - fn is_internal_column(&self, idx: &ValueIndex) -> bool { - matches!( - self.schema[idx.index].column_name.as_str(), - DATA_SCHEMA_TABLE_ID_COLUMN_NAME | DATA_SCHEMA_TSID_COLUMN_NAME - ) - } - /// Returns the primary keys with their names. fn primary_keys_with_name(&self) -> impl Iterator { self.index.indices[..self.index.num_primary_key_column] @@ -436,9 +384,7 @@ impl RowIter<'_> { fn has_null_labels(&self) -> bool { self.index.indices[..self.index.num_primary_key_column] .iter() - .any(|idx| { - !self.is_internal_column(idx) && self.row.values[idx.index].value_data.is_none() - }) + .any(|idx| self.row.values[idx.index].value_data.is_none()) } /// Returns the primary keys. @@ -456,13 +402,6 @@ impl RowIter<'_> { }) } - /// Returns the primary keys excluding reserved internal columns. - pub fn user_primary_keys(&self) -> impl Iterator)> { - self.primary_keys().filter(|(column_id, _)| { - *column_id != ReservedColumnId::table_id() && *column_id != ReservedColumnId::tsid() - }) - } - /// Returns the remaining columns. fn remaining(&mut self) -> impl Iterator + '_ { self.index.indices[self.index.num_primary_key_column..] @@ -552,59 +491,6 @@ mod tests { assert_eq!(result.schema, expected_sparse_schema()); } - #[test] - fn test_encode_sparse_ignores_input_tsid_column() { - let name_to_column_id = test_name_to_column_id(); - let encoder = RowModifier::default(); - let table_id = 1025; - - let rows_without_tsid = Rows { - schema: test_schema(), - rows: vec![test_row("greptimedb", "127.0.0.1")], - }; - - let mut schema_with_tsid = test_schema(); - schema_with_tsid.push(ColumnSchema { - column_name: DATA_SCHEMA_TSID_COLUMN_NAME.to_string(), - datatype: ColumnDataType::Uint64 as i32, - semantic_type: SemanticType::Tag as _, - datatype_extension: None, - options: None, - }); - let rows_with_tsid = Rows { - schema: schema_with_tsid, - rows: vec![Row { - values: vec![ - ValueData::StringValue("greptimedb".to_string()).into(), - ValueData::StringValue("127.0.0.1".to_string()).into(), - ValueData::U64Value(123).into(), - ], - }], - }; - - let result_without_tsid = encoder - .modify_rows( - RowsIter::new(rows_without_tsid, &name_to_column_id), - TableIdInput::Single(table_id), - PrimaryKeyEncoding::Sparse, - ) - .unwrap(); - let result_with_tsid = encoder - .modify_rows( - RowsIter::new(rows_with_tsid, &name_to_column_id), - TableIdInput::Single(table_id), - PrimaryKeyEncoding::Sparse, - ) - .unwrap(); - - assert_eq!(result_without_tsid.schema, expected_sparse_schema()); - assert_eq!(result_with_tsid.schema, expected_sparse_schema()); - assert_eq!( - result_without_tsid.rows[0].values, - result_with_tsid.rows[0].values - ); - } - fn expected_sparse_schema() -> Vec { vec![ColumnSchema { column_name: PRIMARY_KEY_COLUMN_NAME.to_string(), diff --git a/src/operator/src/delete.rs b/src/operator/src/delete.rs index a85e370abd..1e9fef919a 100644 --- a/src/operator/src/delete.rs +++ b/src/operator/src/delete.rs @@ -27,7 +27,6 @@ use futures_util::future; use partition::manager::PartitionRuleManagerRef; use session::context::QueryContextRef; use snafu::{OptionExt, ResultExt, ensure}; -use store_api::metric_engine_consts::is_metric_engine_internal_column; use table::TableRef; use table::requests::DeleteRequest as TableDeleteRequest; @@ -100,12 +99,9 @@ impl Deleter { pub async fn handle_table_delete( &self, - mut request: TableDeleteRequest, + request: TableDeleteRequest, ctx: QueryContextRef, ) -> Result { - request - .key_column_values - .retain(|col, _| !is_metric_engine_internal_column(col)); let catalog = request.catalog_name.as_str(); let schema = request.schema_name.as_str(); let table = request.table_name.as_str(); @@ -231,7 +227,6 @@ impl Deleter { .table_info() .meta .row_key_column_names() - .filter(|name| !is_metric_engine_internal_column(name)) .cloned() .chain(iter::once(time_index)) .collect(); diff --git a/src/operator/src/req_convert/delete/table_to_region.rs b/src/operator/src/req_convert/delete/table_to_region.rs index 1ebfe896af..d68a8987cb 100644 --- a/src/operator/src/req_convert/delete/table_to_region.rs +++ b/src/operator/src/req_convert/delete/table_to_region.rs @@ -38,16 +38,7 @@ impl<'a> TableToRegion<'a> { pub async fn convert(&self, request: TableDeleteRequest) -> Result { let row_count = row_count(&request.key_column_values)?; let schema = column_schema(self.table_info, &request.key_column_values)?; - let vectors = schema - .iter() - .map(|col| { - request - .key_column_values - .get(&col.column_name) - .expect("schema column must exist in delete request") - }) - .collect::>(); - let rows = api::helper::vectors_to_rows(vectors.into_iter(), row_count); + let rows = api::helper::vectors_to_rows(request.key_column_values.values(), row_count); let rows = Rows { schema, rows }; let requests = Partitioner::new(self.partition_manager) diff --git a/src/operator/src/req_convert/insert/stmt_to_region.rs b/src/operator/src/req_convert/insert/stmt_to_region.rs index df11dc3b3c..e2e0969035 100644 --- a/src/operator/src/req_convert/insert/stmt_to_region.rs +++ b/src/operator/src/req_convert/insert/stmt_to_region.rs @@ -36,7 +36,6 @@ use snafu::{OptionExt, ResultExt, ensure}; use sql::ast::ObjectNamePartExt; use sql::statements::insert::Insert; use sqlparser::ast::{ObjectName, Value as SqlValue}; -use store_api::metric_engine_consts::is_metric_engine_internal_column; use table::TableRef; use table::metadata::TableInfoRef; @@ -384,7 +383,6 @@ fn column_names<'a>(stmt: &'a Insert, table_schema: &'a SchemaRef) -> Vec<&'a St table_schema .column_schemas() .iter() - .filter(|column| !is_metric_engine_internal_column(&column.name)) .map(|column| &column.name) .collect() } diff --git a/src/query/src/datafusion.rs b/src/query/src/datafusion.rs index 067298ee18..ef597ecc38 100644 --- a/src/query/src/datafusion.rs +++ b/src/query/src/datafusion.rs @@ -46,7 +46,6 @@ use futures_util::StreamExt; use session::context::QueryContextRef; use snafu::{OptionExt, ResultExt, ensure}; use sqlparser::ast::AnalyzeFormat; -use store_api::metric_engine_consts::is_metric_engine_internal_column; use table::TableRef; use table::requests::{DeleteRequest, InsertRequest}; use tracing::Span; @@ -201,7 +200,6 @@ impl DatafusionQueryEngine { let rowkey_columns = table_info .meta .row_key_column_names() - .filter(|name| !is_metric_engine_internal_column(name)) .collect::>(); let column_vectors = column_vectors .into_iter() diff --git a/src/query/src/planner.rs b/src/query/src/planner.rs index 331f8805ce..faba24a742 100644 --- a/src/query/src/planner.rs +++ b/src/query/src/planner.rs @@ -40,7 +40,6 @@ use sql::statements::explain::ExplainStatement; use sql::statements::query::Query; use sql::statements::statement::Statement; use sql::statements::tql::Tql; -use store_api::metric_engine_consts::is_metric_engine_internal_column; use crate::error::{ CteColumnSchemaMismatchSnafu, PlanSqlSnafu, QueryPlanSnafu, Result, SqlSnafu, @@ -233,35 +232,7 @@ impl DfLogicalPlanner { .optimize_by_extension_rules(plan, &context)?; common_telemetry::debug!("Logical planner, optimize result: {plan}"); - Self::strip_metric_engine_internal_columns(plan) - } - - fn strip_metric_engine_internal_columns(plan: LogicalPlan) -> Result { - let schema = plan.schema(); - if !schema - .fields() - .iter() - .any(|field| is_metric_engine_internal_column(field.name())) - { - return Ok(plan); - } - - let project_exprs = schema - .fields() - .iter() - .filter(|field| !is_metric_engine_internal_column(field.name())) - .map(|field| col(field.name())) - .collect::>(); - - if project_exprs.is_empty() { - return Ok(plan); - } - - LogicalPlanBuilder::from(plan) - .project(project_exprs) - .context(PlanSqlSnafu)? - .build() - .context(PlanSqlSnafu) + Ok(plan) } /// Generate a relational expression from a SQL expression diff --git a/src/query/src/promql/planner.rs b/src/query/src/promql/planner.rs index bdb8d3b1aa..d368ff3235 100644 --- a/src/query/src/promql/planner.rs +++ b/src/query/src/promql/planner.rs @@ -47,7 +47,7 @@ use datafusion::sql::TableReference; use datafusion_common::{DFSchema, NullEquality}; use datafusion_expr::expr::WindowFunctionParams; use datafusion_expr::utils::conjunction; -use datafusion_expr::{ExprSchemable, Literal, SortExpr, col, lit}; +use datafusion_expr::{ExprSchemable, Literal, SortExpr, TableSource, col, lit}; use datatypes::arrow::datatypes::{DataType as ArrowDataType, TimeUnit as ArrowTimeUnit}; use datatypes::data_type::ConcreteDataType; use itertools::Itertools; @@ -139,13 +139,12 @@ struct PromPlannerContext { time_index_column: Option, field_columns: Vec, tag_columns: Vec, - /// Metric engine internal series identifier column (`__tsid`). + /// Use metric engine internal series identifier column (`__tsid`) as series key. /// - /// This column is optional: it is present only when the underlying table schema contains - /// [`DATA_SCHEMA_TSID_COLUMN_NAME`] with `UInt64` type. The planner uses it internally as the - /// series key for plans like [`SeriesDivide`] when available, and strips it from the final - /// output. - tsid_column: Option, + /// This is enabled only when the underlying scan can provide `__tsid` (`UInt64`). The planner + /// uses it internally (e.g. as the series key for [`SeriesDivide`]) and strips it from the + /// final output. + use_tsid: bool, /// The matcher for field columns `__field__`. field_column_matcher: Option>, /// The matcher for selectors (normal matchers). @@ -172,7 +171,7 @@ impl PromPlannerContext { self.time_index_column = None; self.field_columns = vec![]; self.tag_columns = vec![]; - self.tsid_column = None; + self.use_tsid = false; self.field_column_matcher = None; self.selector_matcher.clear(); self.schema_name = None; @@ -385,10 +384,8 @@ impl PromPlanner { ) .alias(DATA_SCHEMA_TSID_COLUMN_NAME), ); - self.ctx.tsid_column = Some(DATA_SCHEMA_TSID_COLUMN_NAME.to_string()); - } else { - self.ctx.tsid_column = None; } + self.ctx.use_tsid = keep_tsid; // create plan let builder = LogicalPlanBuilder::from(input); @@ -443,7 +440,7 @@ impl PromPlanner { field.name() == DATA_SCHEMA_TSID_COLUMN_NAME && field.data_type() == &ArrowDataType::UInt64 }); - self.ctx.tsid_column = input_has_tsid.then_some(DATA_SCHEMA_TSID_COLUMN_NAME.to_string()); + self.ctx.use_tsid = input_has_tsid; let group_exprs = self.agg_modifier_to_col(input.schema(), modifier, false)?; @@ -495,9 +492,10 @@ impl PromPlanner { .chain(self.create_tag_column_exprs()?) .chain( self.ctx - .tsid_column - .as_ref() - .map(|_| DfExpr::Column(Column::from_name(DATA_SCHEMA_TSID_COLUMN_NAME))) + .use_tsid + .then_some(DfExpr::Column(Column::from_name( + DATA_SCHEMA_TSID_COLUMN_NAME, + ))) .into_iter(), ) .chain(Some(self.create_time_index_column_expr()?)); @@ -1197,11 +1195,10 @@ impl PromPlanner { .chain(self.create_tag_column_exprs()?) .chain( self.ctx - .tsid_column - .as_ref() - .map(|_| { - DfExpr::Column(Column::new_unqualified(DATA_SCHEMA_TSID_COLUMN_NAME)) - }) + .use_tsid + .then_some(DfExpr::Column(Column::new_unqualified( + DATA_SCHEMA_TSID_COLUMN_NAME, + ))) .into_iter(), ) .chain(Some(self.create_time_index_column_expr()?)) @@ -1216,13 +1213,13 @@ impl PromPlanner { } // make sort plan - let series_key_columns = if self.ctx.tsid_column.is_some() { + let series_key_columns = if self.ctx.use_tsid { vec![DATA_SCHEMA_TSID_COLUMN_NAME.to_string()] } else { self.ctx.tag_columns.clone() }; - let sort_exprs = if self.ctx.tsid_column.is_some() { + let sort_exprs = if self.ctx.use_tsid { vec![ DfExpr::Column(Column::from_name(DATA_SCHEMA_TSID_COLUMN_NAME)).sort(true, true), self.create_time_index_column_expr()?.sort(true, true), @@ -1441,6 +1438,18 @@ impl PromPlanner { .map(|field| field.name().clone()) } + fn table_from_source(&self, source: &Arc) -> Result { + Ok(source + .as_any() + .downcast_ref::() + .context(UnknownTableSnafu)? + .table_provider + .as_any() + .downcast_ref::() + .context(UnknownTableSnafu)? + .table()) + } + fn table_ref(&self) -> Result { let table_name = self .ctx @@ -1528,17 +1537,9 @@ impl PromPlanner { .await .context(CatalogSnafu)?; - let logical_table = provider - .as_any() - .downcast_ref::() - .context(UnknownTableSnafu)? - .table_provider - .as_any() - .downcast_ref::() - .context(UnknownTableSnafu)? - .table(); + let logical_table = self.table_from_source(&provider)?; - let mut scan_table_ref = table_ref.clone(); + let scan_table_ref = table_ref.clone(); let mut scan_provider = provider; let mut table_id_filter: Option = None; @@ -1574,15 +1575,7 @@ impl PromPlanner { if !Arc::ptr_eq(&physical_provider, &scan_provider) { // Only rewrite when internal columns exist in physical schema. - let physical_table = physical_provider - .as_any() - .downcast_ref::() - .context(UnknownTableSnafu)? - .table_provider - .as_any() - .downcast_ref::() - .context(UnknownTableSnafu)? - .table(); + let physical_table = self.table_from_source(&physical_provider)?; let has_table_id = physical_table .schema() @@ -1594,29 +1587,20 @@ impl PromPlanner { .is_some_and(|col| matches!(col.data_type, ConcreteDataType::UInt64(_))); if has_table_id && has_tsid { - scan_table_ref = physical_table_ref; scan_provider = physical_provider; table_id_filter = Some(logical_table.table_info().ident.table_id); } } } - let scan_table = scan_provider - .as_any() - .downcast_ref::() - .context(UnknownTableSnafu)? - .table_provider - .as_any() - .downcast_ref::() - .context(UnknownTableSnafu)? - .table(); + let scan_table = self.table_from_source(&scan_provider)?; let use_tsid = table_id_filter.is_some() && scan_table .schema() .column_schema_by_name(DATA_SCHEMA_TSID_COLUMN_NAME) .is_some_and(|col| matches!(col.data_type, ConcreteDataType::UInt64(_))); - self.ctx.tsid_column = use_tsid.then_some(DATA_SCHEMA_TSID_COLUMN_NAME.to_string()); + self.ctx.use_tsid = use_tsid; let is_time_index_ms = scan_table .schema() @@ -1686,14 +1670,11 @@ impl PromPlanner { .chain(self.create_tag_column_exprs()?) .chain( self.ctx - .tsid_column - .as_ref() - .map(|_| { - DfExpr::Column(Column::new( - Some(scan_table_ref.clone()), - DATA_SCHEMA_TSID_COLUMN_NAME.to_string(), - )) - }) + .use_tsid + .then_some(DfExpr::Column(Column::new( + Some(scan_table_ref.clone()), + DATA_SCHEMA_TSID_COLUMN_NAME.to_string(), + ))) .into_iter(), ) .chain(Some(DfExpr::Alias(Alias { @@ -1726,9 +1707,10 @@ impl PromPlanner { .chain(self.create_tag_column_exprs()?) .chain( self.ctx - .tsid_column - .as_ref() - .map(|_| DfExpr::Column(Column::from_name(DATA_SCHEMA_TSID_COLUMN_NAME))) + .use_tsid + .then_some(DfExpr::Column(Column::from_name( + DATA_SCHEMA_TSID_COLUMN_NAME, + ))) .into_iter(), ) .chain(Some(self.create_time_index_column_expr()?)) @@ -1752,22 +1734,14 @@ impl PromPlanner { /// Returns a logical plan for an empty metric. async fn setup_context(&mut self) -> Result> { let table_ref = self.table_ref()?; - let table = match self.table_provider.resolve_table(table_ref.clone()).await { + let source = match self.table_provider.resolve_table(table_ref.clone()).await { Err(e) if e.status_code() == StatusCode::TableNotFound => { let plan = self.setup_context_for_empty_metric()?; return Ok(Some(plan)); } res => res.context(CatalogSnafu)?, }; - let table = table - .as_any() - .downcast_ref::() - .context(UnknownTableSnafu)? - .table_provider - .as_any() - .downcast_ref::() - .context(UnknownTableSnafu)? - .table(); + let table = self.table_from_source(&source)?; // set time index column name let time_index = table @@ -1802,7 +1776,7 @@ impl PromPlanner { .collect(); self.ctx.tag_columns = tags; - self.ctx.tsid_column = None; + self.ctx.use_tsid = false; Ok(None) } @@ -1814,7 +1788,7 @@ impl PromPlanner { self.ctx.reset_table_name_and_schema(); self.ctx.tag_columns = vec![]; self.ctx.field_columns = vec![DEFAULT_FIELD_COLUMN.to_string()]; - self.ctx.tsid_column = None; + self.ctx.use_tsid = false; // The table doesn't have any data, so we set start to 0 and end to -1. let plan = LogicalPlan::Extension(Extension { @@ -3222,7 +3196,7 @@ impl PromPlanner { .chain([left_time_index.clone()]) .collect::>(); self.ctx.time_index_column = Some(left_time_index.clone()); - self.ctx.tsid_column = left_context.tsid_column.clone(); + self.ctx.use_tsid = left_context.use_tsid; // alias right time index column if necessary if left_context.time_index_column != right_context.time_index_column { @@ -3493,8 +3467,7 @@ impl PromPlanner { self.ctx.time_index_column = Some(left_time_index_column); self.ctx.tag_columns = all_tags.into_iter().collect(); self.ctx.field_columns = vec![left_field_col.clone()]; - self.ctx.tsid_column = - (left_has_tsid && right_has_tsid).then_some(DATA_SCHEMA_TSID_COLUMN_NAME.to_string()); + self.ctx.use_tsid = left_has_tsid && right_has_tsid; Ok(result) } @@ -4338,6 +4311,35 @@ mod test { ); } + #[tokio::test] + async fn physical_table_name_is_not_leaked_in_plan() { + let prom_expr = parser::parse("some_metric").unwrap(); + let eval_stmt = EvalStmt { + expr: prom_expr, + start: UNIX_EPOCH, + end: UNIX_EPOCH + .checked_add(Duration::from_secs(100_000)) + .unwrap(), + interval: Duration::from_secs(5), + lookback_delta: Duration::from_secs(1), + }; + + let table_provider = build_test_table_provider_with_tsid( + &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())], + 1, + 1, + ) + .await; + let plan = + PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state()) + .await + .unwrap(); + + let plan_str = plan.display_indent_schema().to_string(); + assert!(plan_str.contains("TableScan: some_metric")); + assert!(!plan_str.contains("TableScan: phy")); + } + #[tokio::test] async fn tsid_is_not_used_when_physical_table_is_missing() { let prom_expr = parser::parse("some_metric").unwrap(); diff --git a/src/query/src/sql/show_create_table.rs b/src/query/src/sql/show_create_table.rs index 8f538dc982..ee3049c9f7 100644 --- a/src/query/src/sql/show_create_table.rs +++ b/src/query/src/sql/show_create_table.rs @@ -250,9 +250,8 @@ fn create_table_constraints( column: Ident::with_quote(quote_style, column_name), }); } - let primary_key_columns = primary_key_columns_for_show_create(table_meta, engine); - if !primary_key_columns.is_empty() { - let columns = primary_key_columns + if !table_meta.primary_key_indices.is_empty() { + let columns = primary_key_columns_for_show_create(table_meta, engine) .into_iter() .map(|name| Ident::with_quote(quote_style, name)) .collect(); @@ -315,7 +314,6 @@ mod tests { use datatypes::schema::{ FulltextOptions, Schema, SchemaRef, SkippingIndexOptions, VectorIndexOptions, }; - use store_api::metric_engine_consts::DATA_SCHEMA_TSID_COLUMN_NAME; use table::metadata::*; use table::requests::{ FILE_TABLE_FORMAT_KEY, FILE_TABLE_LOCATION_KEY, FILE_TABLE_META_KEY, TableOptions, @@ -484,51 +482,4 @@ WITH( sql ); } - - #[test] - fn test_show_create_metric_table_empty_primary_key_is_omitted() { - let schema = vec![ - ColumnSchema::new( - "greptime_timestamp", - ConcreteDataType::timestamp_millisecond_datatype(), - false, - ) - .with_time_index(true), - ColumnSchema::new("greptime_value", ConcreteDataType::float64_datatype(), true), - ColumnSchema::new( - DATA_SCHEMA_TSID_COLUMN_NAME, - ConcreteDataType::uint64_datatype(), - false, - ), - ]; - let table_schema = SchemaRef::new(Schema::new(schema)); - let meta = TableMetaBuilder::empty() - .schema(table_schema) - .primary_key_indices(vec![2]) - .value_indices(vec![0, 1]) - .engine("metric".to_string()) - .next_column_id(0) - .options(Default::default()) - .created_on(Default::default()) - .build() - .unwrap(); - - let info = Arc::new( - TableInfoBuilder::default() - .table_id(1024) - .table_version(0 as TableVersion) - .name("test_metric_table") - .schema_name("public".to_string()) - .catalog_name("greptime".to_string()) - .desc(None) - .table_type(TableType::Base) - .meta(meta) - .build() - .unwrap(), - ); - - let stmt = create_table_stmt(&info, None, '"').unwrap(); - let sql = format!("\n{}", stmt); - assert!(!sql.contains("PRIMARY KEY")); - } } diff --git a/src/servers/src/prom_store.rs b/src/servers/src/prom_store.rs index db6f40db1a..487bd59812 100644 --- a/src/servers/src/prom_store.rs +++ b/src/servers/src/prom_store.rs @@ -34,7 +34,6 @@ use datafusion_expr::LogicalPlan; use openmetrics_parser::{MetricsExposition, PrometheusType, PrometheusValue}; use snafu::{OptionExt, ResultExt}; use snap::raw::{Decoder, Encoder}; -use store_api::metric_engine_consts::is_metric_engine_internal_column; use crate::error::{self, Result}; use crate::row_writer::{self, MultiTableData}; @@ -270,9 +269,7 @@ fn collect_timeseries_ids(table_name: &str, recordbatch: &RecordBatch) -> Vec