simplification

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ruihang Xia
2026-01-18 09:24:02 +08:00
parent e5396b9c30
commit d17a1b91f8
17 changed files with 140 additions and 362 deletions

View File

@@ -102,6 +102,6 @@ pub fn create_region_request_builder_from_raw_table_info(
raw_table_info: &RawTableInfo,
physical_table_id: TableId,
) -> Result<CreateRequestBuilder> {
let template = build_template_from_raw_table_info(raw_table_info, true)?;
let template = build_template_from_raw_table_info(raw_table_info, false)?;
Ok(CreateRequestBuilder::new(template, Some(physical_table_id)))
}

View File

@@ -150,7 +150,7 @@ fn create_region_request_from_raw_table_info(
raw_table_info: &RawTableInfo,
physical_table_id: TableId,
) -> Result<CreateRequestBuilder> {
let template = build_template_from_raw_table_info(raw_table_info, true)?;
let template = build_template_from_raw_table_info(raw_table_info, false)?;
Ok(CreateRequestBuilder::new(template, Some(physical_table_id)))
}

View File

@@ -19,9 +19,7 @@ use common_telemetry::{debug, error, tracing};
use datafusion::logical_expr::{self, Expr};
use snafu::{OptionExt, ResultExt};
use store_api::metadata::{RegionMetadataBuilder, RegionMetadataRef};
use store_api::metric_engine_consts::{
DATA_SCHEMA_TABLE_ID_COLUMN_NAME, is_metric_engine_internal_column,
};
use store_api::metric_engine_consts::DATA_SCHEMA_TABLE_ID_COLUMN_NAME;
use store_api::region_engine::{RegionEngine, RegionScannerRef};
use store_api::storage::{RegionId, ScanRequest, SequenceNumber};
@@ -220,10 +218,7 @@ impl MetricEngineInner {
.get_metadata(data_region_id)
.await
.context(MitoReadOperationSnafu)?;
for name in logical_columns
.into_iter()
.filter(|name| !is_metric_engine_internal_column(name))
{
for name in logical_columns {
// Safety: logical columns is a strict subset of physical columns
projection.push(physical_metadata.column_index_by_name(&name).unwrap());
}

View File

@@ -111,7 +111,7 @@ impl RowModifier {
.encode_to_vec(internal_columns.into_iter(), &mut buffer)
.context(EncodePrimaryKeySnafu)?;
self.codec
.encode_to_vec(row_iter.user_primary_keys(), &mut buffer)
.encode_to_vec(row_iter.primary_keys(), &mut buffer)
.context(EncodePrimaryKeySnafu)?;
values.push(ValueData::BinaryValue(buffer.clone()).into());
@@ -138,50 +138,27 @@ impl RowModifier {
/// Modifies rows with dense primary key encoding.
/// It adds two columns(`__table_id`, `__tsid`) to the row.
fn modify_rows_dense(&self, mut iter: RowsIter, table_ids: TableIdInput<'_>) -> Result<Rows> {
let table_id_index = iter
.rows
.schema
.iter()
.position(|col| col.column_name == DATA_SCHEMA_TABLE_ID_COLUMN_NAME);
let tsid_index = iter
.rows
.schema
.iter()
.position(|col| col.column_name == DATA_SCHEMA_TSID_COLUMN_NAME);
if table_id_index.is_none() {
iter.rows.schema.push(ColumnSchema {
column_name: DATA_SCHEMA_TABLE_ID_COLUMN_NAME.to_string(),
datatype: ColumnDataType::Uint32 as i32,
semantic_type: SemanticType::Tag as _,
datatype_extension: None,
options: None,
});
}
if tsid_index.is_none() {
iter.rows.schema.push(ColumnSchema {
column_name: DATA_SCHEMA_TSID_COLUMN_NAME.to_string(),
datatype: ColumnDataType::Uint64 as i32,
semantic_type: SemanticType::Tag as _,
datatype_extension: None,
options: None,
});
}
// add table_name column
iter.rows.schema.push(ColumnSchema {
column_name: DATA_SCHEMA_TABLE_ID_COLUMN_NAME.to_string(),
datatype: ColumnDataType::Uint32 as i32,
semantic_type: SemanticType::Tag as _,
datatype_extension: None,
options: None,
});
// add tsid column
iter.rows.schema.push(ColumnSchema {
column_name: DATA_SCHEMA_TSID_COLUMN_NAME.to_string(),
datatype: ColumnDataType::Uint64 as i32,
semantic_type: SemanticType::Tag as _,
datatype_extension: None,
options: None,
});
for (row_index, row_iter) in iter.iter_mut().enumerate() {
let table_id = table_ids.table_id_for_row(row_index);
let (table_id_value, tsid) = Self::fill_internal_columns(table_id, &row_iter);
if let Some(table_id_index) = table_id_index {
row_iter.row.values[table_id_index] = table_id_value;
} else {
row_iter.row.values.push(table_id_value);
}
if let Some(tsid_index) = tsid_index {
row_iter.row.values[tsid_index] = tsid;
} else {
row_iter.row.values.push(tsid);
}
row_iter.row.values.push(table_id_value);
row_iter.row.values.push(tsid);
}
Ok(iter.rows)
@@ -205,15 +182,7 @@ impl RowModifier {
let ts_id = if !iter.has_null_labels() {
// No null labels in row, we can safely reuse the precomputed label name hash.
let mut ts_id_gen = TsidGenerator::new(iter.index.label_name_hash);
for (name, value) in iter.primary_keys_with_name() {
// Internal columns are not part of TSID generation.
// They may appear when request rows are derived from scans that include them.
if matches!(
name.as_str(),
DATA_SCHEMA_TABLE_ID_COLUMN_NAME | DATA_SCHEMA_TSID_COLUMN_NAME
) {
continue;
}
for (_, value) in iter.primary_keys_with_name() {
// The type is checked before. So only null is ignored.
if let Some(ValueData::StringValue(string)) = &value.value_data {
ts_id_gen.write_str(string);
@@ -230,13 +199,6 @@ impl RowModifier {
let mut hasher = TsidGenerator::default();
// 1. Find out label names with non-null values and get the hash.
for (name, value) in iter.primary_keys_with_name() {
// Internal columns are not part of TSID generation.
if matches!(
name.as_str(),
DATA_SCHEMA_TABLE_ID_COLUMN_NAME | DATA_SCHEMA_TSID_COLUMN_NAME
) {
continue;
}
// The type is checked before. So only null is ignored.
if let Some(ValueData::StringValue(_)) = &value.value_data {
hasher.write_str(name);
@@ -246,14 +208,7 @@ impl RowModifier {
// 2. Use label name hash as seed and continue with label values.
let mut final_hasher = TsidGenerator::new(label_name_hash);
for (name, value) in iter.primary_keys_with_name() {
// Internal columns are not part of TSID generation.
if matches!(
name.as_str(),
DATA_SCHEMA_TABLE_ID_COLUMN_NAME | DATA_SCHEMA_TSID_COLUMN_NAME
) {
continue;
}
for (_, value) in iter.primary_keys_with_name() {
if let Some(ValueData::StringValue(value)) = &value.value_data {
final_hasher.write_str(value);
}
@@ -413,13 +368,6 @@ pub struct RowIter<'a> {
}
impl RowIter<'_> {
fn is_internal_column(&self, idx: &ValueIndex) -> bool {
matches!(
self.schema[idx.index].column_name.as_str(),
DATA_SCHEMA_TABLE_ID_COLUMN_NAME | DATA_SCHEMA_TSID_COLUMN_NAME
)
}
/// Returns the primary keys with their names.
fn primary_keys_with_name(&self) -> impl Iterator<Item = (&String, &Value)> {
self.index.indices[..self.index.num_primary_key_column]
@@ -436,9 +384,7 @@ impl RowIter<'_> {
fn has_null_labels(&self) -> bool {
self.index.indices[..self.index.num_primary_key_column]
.iter()
.any(|idx| {
!self.is_internal_column(idx) && self.row.values[idx.index].value_data.is_none()
})
.any(|idx| self.row.values[idx.index].value_data.is_none())
}
/// Returns the primary keys.
@@ -456,13 +402,6 @@ impl RowIter<'_> {
})
}
/// Returns the primary keys excluding reserved internal columns.
pub fn user_primary_keys(&self) -> impl Iterator<Item = (ColumnId, ValueRef<'_>)> {
self.primary_keys().filter(|(column_id, _)| {
*column_id != ReservedColumnId::table_id() && *column_id != ReservedColumnId::tsid()
})
}
/// Returns the remaining columns.
fn remaining(&mut self) -> impl Iterator<Item = Value> + '_ {
self.index.indices[self.index.num_primary_key_column..]
@@ -552,59 +491,6 @@ mod tests {
assert_eq!(result.schema, expected_sparse_schema());
}
#[test]
fn test_encode_sparse_ignores_input_tsid_column() {
let name_to_column_id = test_name_to_column_id();
let encoder = RowModifier::default();
let table_id = 1025;
let rows_without_tsid = Rows {
schema: test_schema(),
rows: vec![test_row("greptimedb", "127.0.0.1")],
};
let mut schema_with_tsid = test_schema();
schema_with_tsid.push(ColumnSchema {
column_name: DATA_SCHEMA_TSID_COLUMN_NAME.to_string(),
datatype: ColumnDataType::Uint64 as i32,
semantic_type: SemanticType::Tag as _,
datatype_extension: None,
options: None,
});
let rows_with_tsid = Rows {
schema: schema_with_tsid,
rows: vec![Row {
values: vec![
ValueData::StringValue("greptimedb".to_string()).into(),
ValueData::StringValue("127.0.0.1".to_string()).into(),
ValueData::U64Value(123).into(),
],
}],
};
let result_without_tsid = encoder
.modify_rows(
RowsIter::new(rows_without_tsid, &name_to_column_id),
TableIdInput::Single(table_id),
PrimaryKeyEncoding::Sparse,
)
.unwrap();
let result_with_tsid = encoder
.modify_rows(
RowsIter::new(rows_with_tsid, &name_to_column_id),
TableIdInput::Single(table_id),
PrimaryKeyEncoding::Sparse,
)
.unwrap();
assert_eq!(result_without_tsid.schema, expected_sparse_schema());
assert_eq!(result_with_tsid.schema, expected_sparse_schema());
assert_eq!(
result_without_tsid.rows[0].values,
result_with_tsid.rows[0].values
);
}
fn expected_sparse_schema() -> Vec<ColumnSchema> {
vec![ColumnSchema {
column_name: PRIMARY_KEY_COLUMN_NAME.to_string(),

View File

@@ -27,7 +27,6 @@ use futures_util::future;
use partition::manager::PartitionRuleManagerRef;
use session::context::QueryContextRef;
use snafu::{OptionExt, ResultExt, ensure};
use store_api::metric_engine_consts::is_metric_engine_internal_column;
use table::TableRef;
use table::requests::DeleteRequest as TableDeleteRequest;
@@ -100,12 +99,9 @@ impl Deleter {
pub async fn handle_table_delete(
&self,
mut request: TableDeleteRequest,
request: TableDeleteRequest,
ctx: QueryContextRef,
) -> Result<AffectedRows> {
request
.key_column_values
.retain(|col, _| !is_metric_engine_internal_column(col));
let catalog = request.catalog_name.as_str();
let schema = request.schema_name.as_str();
let table = request.table_name.as_str();
@@ -231,7 +227,6 @@ impl Deleter {
.table_info()
.meta
.row_key_column_names()
.filter(|name| !is_metric_engine_internal_column(name))
.cloned()
.chain(iter::once(time_index))
.collect();

View File

@@ -38,16 +38,7 @@ impl<'a> TableToRegion<'a> {
pub async fn convert(&self, request: TableDeleteRequest) -> Result<RegionDeleteRequests> {
let row_count = row_count(&request.key_column_values)?;
let schema = column_schema(self.table_info, &request.key_column_values)?;
let vectors = schema
.iter()
.map(|col| {
request
.key_column_values
.get(&col.column_name)
.expect("schema column must exist in delete request")
})
.collect::<Vec<_>>();
let rows = api::helper::vectors_to_rows(vectors.into_iter(), row_count);
let rows = api::helper::vectors_to_rows(request.key_column_values.values(), row_count);
let rows = Rows { schema, rows };
let requests = Partitioner::new(self.partition_manager)

View File

@@ -36,7 +36,6 @@ use snafu::{OptionExt, ResultExt, ensure};
use sql::ast::ObjectNamePartExt;
use sql::statements::insert::Insert;
use sqlparser::ast::{ObjectName, Value as SqlValue};
use store_api::metric_engine_consts::is_metric_engine_internal_column;
use table::TableRef;
use table::metadata::TableInfoRef;
@@ -384,7 +383,6 @@ fn column_names<'a>(stmt: &'a Insert, table_schema: &'a SchemaRef) -> Vec<&'a St
table_schema
.column_schemas()
.iter()
.filter(|column| !is_metric_engine_internal_column(&column.name))
.map(|column| &column.name)
.collect()
}

View File

@@ -46,7 +46,6 @@ use futures_util::StreamExt;
use session::context::QueryContextRef;
use snafu::{OptionExt, ResultExt, ensure};
use sqlparser::ast::AnalyzeFormat;
use store_api::metric_engine_consts::is_metric_engine_internal_column;
use table::TableRef;
use table::requests::{DeleteRequest, InsertRequest};
use tracing::Span;
@@ -201,7 +200,6 @@ impl DatafusionQueryEngine {
let rowkey_columns = table_info
.meta
.row_key_column_names()
.filter(|name| !is_metric_engine_internal_column(name))
.collect::<Vec<&String>>();
let column_vectors = column_vectors
.into_iter()

View File

@@ -40,7 +40,6 @@ use sql::statements::explain::ExplainStatement;
use sql::statements::query::Query;
use sql::statements::statement::Statement;
use sql::statements::tql::Tql;
use store_api::metric_engine_consts::is_metric_engine_internal_column;
use crate::error::{
CteColumnSchemaMismatchSnafu, PlanSqlSnafu, QueryPlanSnafu, Result, SqlSnafu,
@@ -233,35 +232,7 @@ impl DfLogicalPlanner {
.optimize_by_extension_rules(plan, &context)?;
common_telemetry::debug!("Logical planner, optimize result: {plan}");
Self::strip_metric_engine_internal_columns(plan)
}
fn strip_metric_engine_internal_columns(plan: LogicalPlan) -> Result<LogicalPlan> {
let schema = plan.schema();
if !schema
.fields()
.iter()
.any(|field| is_metric_engine_internal_column(field.name()))
{
return Ok(plan);
}
let project_exprs = schema
.fields()
.iter()
.filter(|field| !is_metric_engine_internal_column(field.name()))
.map(|field| col(field.name()))
.collect::<Vec<_>>();
if project_exprs.is_empty() {
return Ok(plan);
}
LogicalPlanBuilder::from(plan)
.project(project_exprs)
.context(PlanSqlSnafu)?
.build()
.context(PlanSqlSnafu)
Ok(plan)
}
/// Generate a relational expression from a SQL expression

View File

@@ -47,7 +47,7 @@ use datafusion::sql::TableReference;
use datafusion_common::{DFSchema, NullEquality};
use datafusion_expr::expr::WindowFunctionParams;
use datafusion_expr::utils::conjunction;
use datafusion_expr::{ExprSchemable, Literal, SortExpr, col, lit};
use datafusion_expr::{ExprSchemable, Literal, SortExpr, TableSource, col, lit};
use datatypes::arrow::datatypes::{DataType as ArrowDataType, TimeUnit as ArrowTimeUnit};
use datatypes::data_type::ConcreteDataType;
use itertools::Itertools;
@@ -139,13 +139,12 @@ struct PromPlannerContext {
time_index_column: Option<String>,
field_columns: Vec<String>,
tag_columns: Vec<String>,
/// Metric engine internal series identifier column (`__tsid`).
/// Use metric engine internal series identifier column (`__tsid`) as series key.
///
/// This column is optional: it is present only when the underlying table schema contains
/// [`DATA_SCHEMA_TSID_COLUMN_NAME`] with `UInt64` type. The planner uses it internally as the
/// series key for plans like [`SeriesDivide`] when available, and strips it from the final
/// output.
tsid_column: Option<String>,
/// This is enabled only when the underlying scan can provide `__tsid` (`UInt64`). The planner
/// uses it internally (e.g. as the series key for [`SeriesDivide`]) and strips it from the
/// final output.
use_tsid: bool,
/// The matcher for field columns `__field__`.
field_column_matcher: Option<Vec<Matcher>>,
/// The matcher for selectors (normal matchers).
@@ -172,7 +171,7 @@ impl PromPlannerContext {
self.time_index_column = None;
self.field_columns = vec![];
self.tag_columns = vec![];
self.tsid_column = None;
self.use_tsid = false;
self.field_column_matcher = None;
self.selector_matcher.clear();
self.schema_name = None;
@@ -385,10 +384,8 @@ impl PromPlanner {
)
.alias(DATA_SCHEMA_TSID_COLUMN_NAME),
);
self.ctx.tsid_column = Some(DATA_SCHEMA_TSID_COLUMN_NAME.to_string());
} else {
self.ctx.tsid_column = None;
}
self.ctx.use_tsid = keep_tsid;
// create plan
let builder = LogicalPlanBuilder::from(input);
@@ -443,7 +440,7 @@ impl PromPlanner {
field.name() == DATA_SCHEMA_TSID_COLUMN_NAME
&& field.data_type() == &ArrowDataType::UInt64
});
self.ctx.tsid_column = input_has_tsid.then_some(DATA_SCHEMA_TSID_COLUMN_NAME.to_string());
self.ctx.use_tsid = input_has_tsid;
let group_exprs = self.agg_modifier_to_col(input.schema(), modifier, false)?;
@@ -495,9 +492,10 @@ impl PromPlanner {
.chain(self.create_tag_column_exprs()?)
.chain(
self.ctx
.tsid_column
.as_ref()
.map(|_| DfExpr::Column(Column::from_name(DATA_SCHEMA_TSID_COLUMN_NAME)))
.use_tsid
.then_some(DfExpr::Column(Column::from_name(
DATA_SCHEMA_TSID_COLUMN_NAME,
)))
.into_iter(),
)
.chain(Some(self.create_time_index_column_expr()?));
@@ -1197,11 +1195,10 @@ impl PromPlanner {
.chain(self.create_tag_column_exprs()?)
.chain(
self.ctx
.tsid_column
.as_ref()
.map(|_| {
DfExpr::Column(Column::new_unqualified(DATA_SCHEMA_TSID_COLUMN_NAME))
})
.use_tsid
.then_some(DfExpr::Column(Column::new_unqualified(
DATA_SCHEMA_TSID_COLUMN_NAME,
)))
.into_iter(),
)
.chain(Some(self.create_time_index_column_expr()?))
@@ -1216,13 +1213,13 @@ impl PromPlanner {
}
// make sort plan
let series_key_columns = if self.ctx.tsid_column.is_some() {
let series_key_columns = if self.ctx.use_tsid {
vec![DATA_SCHEMA_TSID_COLUMN_NAME.to_string()]
} else {
self.ctx.tag_columns.clone()
};
let sort_exprs = if self.ctx.tsid_column.is_some() {
let sort_exprs = if self.ctx.use_tsid {
vec![
DfExpr::Column(Column::from_name(DATA_SCHEMA_TSID_COLUMN_NAME)).sort(true, true),
self.create_time_index_column_expr()?.sort(true, true),
@@ -1441,6 +1438,18 @@ impl PromPlanner {
.map(|field| field.name().clone())
}
fn table_from_source(&self, source: &Arc<dyn TableSource>) -> Result<table::TableRef> {
Ok(source
.as_any()
.downcast_ref::<DefaultTableSource>()
.context(UnknownTableSnafu)?
.table_provider
.as_any()
.downcast_ref::<DfTableProviderAdapter>()
.context(UnknownTableSnafu)?
.table())
}
fn table_ref(&self) -> Result<TableReference> {
let table_name = self
.ctx
@@ -1528,17 +1537,9 @@ impl PromPlanner {
.await
.context(CatalogSnafu)?;
let logical_table = provider
.as_any()
.downcast_ref::<DefaultTableSource>()
.context(UnknownTableSnafu)?
.table_provider
.as_any()
.downcast_ref::<DfTableProviderAdapter>()
.context(UnknownTableSnafu)?
.table();
let logical_table = self.table_from_source(&provider)?;
let mut scan_table_ref = table_ref.clone();
let scan_table_ref = table_ref.clone();
let mut scan_provider = provider;
let mut table_id_filter: Option<u32> = None;
@@ -1574,15 +1575,7 @@ impl PromPlanner {
if !Arc::ptr_eq(&physical_provider, &scan_provider) {
// Only rewrite when internal columns exist in physical schema.
let physical_table = physical_provider
.as_any()
.downcast_ref::<DefaultTableSource>()
.context(UnknownTableSnafu)?
.table_provider
.as_any()
.downcast_ref::<DfTableProviderAdapter>()
.context(UnknownTableSnafu)?
.table();
let physical_table = self.table_from_source(&physical_provider)?;
let has_table_id = physical_table
.schema()
@@ -1594,29 +1587,20 @@ impl PromPlanner {
.is_some_and(|col| matches!(col.data_type, ConcreteDataType::UInt64(_)));
if has_table_id && has_tsid {
scan_table_ref = physical_table_ref;
scan_provider = physical_provider;
table_id_filter = Some(logical_table.table_info().ident.table_id);
}
}
}
let scan_table = scan_provider
.as_any()
.downcast_ref::<DefaultTableSource>()
.context(UnknownTableSnafu)?
.table_provider
.as_any()
.downcast_ref::<DfTableProviderAdapter>()
.context(UnknownTableSnafu)?
.table();
let scan_table = self.table_from_source(&scan_provider)?;
let use_tsid = table_id_filter.is_some()
&& scan_table
.schema()
.column_schema_by_name(DATA_SCHEMA_TSID_COLUMN_NAME)
.is_some_and(|col| matches!(col.data_type, ConcreteDataType::UInt64(_)));
self.ctx.tsid_column = use_tsid.then_some(DATA_SCHEMA_TSID_COLUMN_NAME.to_string());
self.ctx.use_tsid = use_tsid;
let is_time_index_ms = scan_table
.schema()
@@ -1686,14 +1670,11 @@ impl PromPlanner {
.chain(self.create_tag_column_exprs()?)
.chain(
self.ctx
.tsid_column
.as_ref()
.map(|_| {
DfExpr::Column(Column::new(
Some(scan_table_ref.clone()),
DATA_SCHEMA_TSID_COLUMN_NAME.to_string(),
))
})
.use_tsid
.then_some(DfExpr::Column(Column::new(
Some(scan_table_ref.clone()),
DATA_SCHEMA_TSID_COLUMN_NAME.to_string(),
)))
.into_iter(),
)
.chain(Some(DfExpr::Alias(Alias {
@@ -1726,9 +1707,10 @@ impl PromPlanner {
.chain(self.create_tag_column_exprs()?)
.chain(
self.ctx
.tsid_column
.as_ref()
.map(|_| DfExpr::Column(Column::from_name(DATA_SCHEMA_TSID_COLUMN_NAME)))
.use_tsid
.then_some(DfExpr::Column(Column::from_name(
DATA_SCHEMA_TSID_COLUMN_NAME,
)))
.into_iter(),
)
.chain(Some(self.create_time_index_column_expr()?))
@@ -1752,22 +1734,14 @@ impl PromPlanner {
/// Returns a logical plan for an empty metric.
async fn setup_context(&mut self) -> Result<Option<LogicalPlan>> {
let table_ref = self.table_ref()?;
let table = match self.table_provider.resolve_table(table_ref.clone()).await {
let source = match self.table_provider.resolve_table(table_ref.clone()).await {
Err(e) if e.status_code() == StatusCode::TableNotFound => {
let plan = self.setup_context_for_empty_metric()?;
return Ok(Some(plan));
}
res => res.context(CatalogSnafu)?,
};
let table = table
.as_any()
.downcast_ref::<DefaultTableSource>()
.context(UnknownTableSnafu)?
.table_provider
.as_any()
.downcast_ref::<DfTableProviderAdapter>()
.context(UnknownTableSnafu)?
.table();
let table = self.table_from_source(&source)?;
// set time index column name
let time_index = table
@@ -1802,7 +1776,7 @@ impl PromPlanner {
.collect();
self.ctx.tag_columns = tags;
self.ctx.tsid_column = None;
self.ctx.use_tsid = false;
Ok(None)
}
@@ -1814,7 +1788,7 @@ impl PromPlanner {
self.ctx.reset_table_name_and_schema();
self.ctx.tag_columns = vec![];
self.ctx.field_columns = vec![DEFAULT_FIELD_COLUMN.to_string()];
self.ctx.tsid_column = None;
self.ctx.use_tsid = false;
// The table doesn't have any data, so we set start to 0 and end to -1.
let plan = LogicalPlan::Extension(Extension {
@@ -3222,7 +3196,7 @@ impl PromPlanner {
.chain([left_time_index.clone()])
.collect::<Vec<_>>();
self.ctx.time_index_column = Some(left_time_index.clone());
self.ctx.tsid_column = left_context.tsid_column.clone();
self.ctx.use_tsid = left_context.use_tsid;
// alias right time index column if necessary
if left_context.time_index_column != right_context.time_index_column {
@@ -3493,8 +3467,7 @@ impl PromPlanner {
self.ctx.time_index_column = Some(left_time_index_column);
self.ctx.tag_columns = all_tags.into_iter().collect();
self.ctx.field_columns = vec![left_field_col.clone()];
self.ctx.tsid_column =
(left_has_tsid && right_has_tsid).then_some(DATA_SCHEMA_TSID_COLUMN_NAME.to_string());
self.ctx.use_tsid = left_has_tsid && right_has_tsid;
Ok(result)
}
@@ -4338,6 +4311,35 @@ mod test {
);
}
#[tokio::test]
async fn physical_table_name_is_not_leaked_in_plan() {
let prom_expr = parser::parse("some_metric").unwrap();
let eval_stmt = EvalStmt {
expr: prom_expr,
start: UNIX_EPOCH,
end: UNIX_EPOCH
.checked_add(Duration::from_secs(100_000))
.unwrap(),
interval: Duration::from_secs(5),
lookback_delta: Duration::from_secs(1),
};
let table_provider = build_test_table_provider_with_tsid(
&[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
1,
1,
)
.await;
let plan =
PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
.await
.unwrap();
let plan_str = plan.display_indent_schema().to_string();
assert!(plan_str.contains("TableScan: some_metric"));
assert!(!plan_str.contains("TableScan: phy"));
}
#[tokio::test]
async fn tsid_is_not_used_when_physical_table_is_missing() {
let prom_expr = parser::parse("some_metric").unwrap();

View File

@@ -250,9 +250,8 @@ fn create_table_constraints(
column: Ident::with_quote(quote_style, column_name),
});
}
let primary_key_columns = primary_key_columns_for_show_create(table_meta, engine);
if !primary_key_columns.is_empty() {
let columns = primary_key_columns
if !table_meta.primary_key_indices.is_empty() {
let columns = primary_key_columns_for_show_create(table_meta, engine)
.into_iter()
.map(|name| Ident::with_quote(quote_style, name))
.collect();
@@ -315,7 +314,6 @@ mod tests {
use datatypes::schema::{
FulltextOptions, Schema, SchemaRef, SkippingIndexOptions, VectorIndexOptions,
};
use store_api::metric_engine_consts::DATA_SCHEMA_TSID_COLUMN_NAME;
use table::metadata::*;
use table::requests::{
FILE_TABLE_FORMAT_KEY, FILE_TABLE_LOCATION_KEY, FILE_TABLE_META_KEY, TableOptions,
@@ -484,51 +482,4 @@ WITH(
sql
);
}
#[test]
fn test_show_create_metric_table_empty_primary_key_is_omitted() {
let schema = vec![
ColumnSchema::new(
"greptime_timestamp",
ConcreteDataType::timestamp_millisecond_datatype(),
false,
)
.with_time_index(true),
ColumnSchema::new("greptime_value", ConcreteDataType::float64_datatype(), true),
ColumnSchema::new(
DATA_SCHEMA_TSID_COLUMN_NAME,
ConcreteDataType::uint64_datatype(),
false,
),
];
let table_schema = SchemaRef::new(Schema::new(schema));
let meta = TableMetaBuilder::empty()
.schema(table_schema)
.primary_key_indices(vec![2])
.value_indices(vec![0, 1])
.engine("metric".to_string())
.next_column_id(0)
.options(Default::default())
.created_on(Default::default())
.build()
.unwrap();
let info = Arc::new(
TableInfoBuilder::default()
.table_id(1024)
.table_version(0 as TableVersion)
.name("test_metric_table")
.schema_name("public".to_string())
.catalog_name("greptime".to_string())
.desc(None)
.table_type(TableType::Base)
.meta(meta)
.build()
.unwrap(),
);
let stmt = create_table_stmt(&info, None, '"').unwrap();
let sql = format!("\n{}", stmt);
assert!(!sql.contains("PRIMARY KEY"));
}
}

View File

@@ -34,7 +34,6 @@ use datafusion_expr::LogicalPlan;
use openmetrics_parser::{MetricsExposition, PrometheusType, PrometheusValue};
use snafu::{OptionExt, ResultExt};
use snap::raw::{Decoder, Encoder};
use store_api::metric_engine_consts::is_metric_engine_internal_column;
use crate::error::{self, Result};
use crate::row_writer::{self, MultiTableData};
@@ -270,9 +269,7 @@ fn collect_timeseries_ids(table_name: &str, recordbatch: &RecordBatch) -> Vec<Ti
let columns = column_names
.enumerate()
.filter(|(_, column_name)| {
*column_name != greptime_timestamp()
&& *column_name != greptime_value()
&& !is_metric_engine_internal_column(column_name.as_str())
*column_name != greptime_timestamp() && *column_name != greptime_value()
})
.map(|(i, column_name)| {
(

View File

@@ -1,7 +1,3 @@
USE public;
Affected Rows: 0
CREATE TABLE system_metrics (
host STRING,
idc STRING,

View File

@@ -1,5 +1,3 @@
USE public;
CREATE TABLE system_metrics (
host STRING,
idc STRING,

View File

@@ -64,14 +64,14 @@ DESC TABLE phy;
SELECT ts, val, __tsid, host, job FROM phy;
+-------------------------+-----+-------+------+
| ts | val | host | job |
+-------------------------+-----+-------+------+
| 1970-01-01T00:00:00.001 | 1.0 | host2 | |
| 1970-01-01T00:00:00 | 0.0 | host1 | |
| 1970-01-01T00:00:00 | 0.0 | | job1 |
| 1970-01-01T00:00:00.001 | 1.0 | | job2 |
+-------------------------+-----+-------+------+
+-------------------------+-----+----------------------+-------+------+
| ts | val | __tsid | host | job |
+-------------------------+-----+----------------------+-------+------+
| 1970-01-01T00:00:00.001 | 1.0 | 7947983149541006936 | host2 | |
| 1970-01-01T00:00:00 | 0.0 | 13882403126406556045 | host1 | |
| 1970-01-01T00:00:00 | 0.0 | 6248409809737953425 | | job1 |
| 1970-01-01T00:00:00.001 | 1.0 | 12867770218286207316 | | job2 |
+-------------------------+-----+----------------------+-------+------+
DROP TABLE phy;

View File

@@ -780,14 +780,14 @@ tql eval(1000, 2000, '300s') unknown_metric or node_network_transmit_bytes_total
-- SQLNESS SORT_RESULT 3 1
tql eval(1000, 2000, '300s') sum by (cloud, tag0, tag1) (node_network_transmit_bytes_total) or unknown_metric;
+---------------------+---------+-----------------------------------+
| greptime_timestamp | cloud | sum(test_physical.greptime_value) |
+---------------------+---------+-----------------------------------+
| 1970-01-01T00:16:40 | cloud-1 | 2500.0 |
| 1970-01-01T00:16:40 | cloud-2 | 800.0 |
| 1970-01-01T00:21:40 | cloud-1 | 4500.0 |
| 1970-01-01T00:21:40 | cloud-2 | 1800.0 |
+---------------------+---------+-----------------------------------+
+---------------------+---------+-------------------------------------------------------+
| greptime_timestamp | cloud | sum(node_network_transmit_bytes_total.greptime_value) |
+---------------------+---------+-------------------------------------------------------+
| 1970-01-01T00:16:40 | cloud-1 | 2500.0 |
| 1970-01-01T00:16:40 | cloud-2 | 800.0 |
| 1970-01-01T00:21:40 | cloud-1 | 4500.0 |
| 1970-01-01T00:21:40 | cloud-2 | 1800.0 |
+---------------------+---------+-------------------------------------------------------+
-- Or with unknown label and metric.
-- SQLNESS SORT_RESULT 3 1
@@ -805,14 +805,14 @@ tql eval(1000, 2000, '300s') unknown_metric or unknown_metric1 or sum by (cloud,
-- SQLNESS SORT_RESULT 3 1
tql eval(1000, 2000, '300s') sum by (cloud, tag0, tag1) (node_network_transmit_bytes_total) or sum by (cloud, tag0, tag1) (unknown_metric);
+---------------------+---------+-----------------------------------+
| greptime_timestamp | cloud | sum(test_physical.greptime_value) |
+---------------------+---------+-----------------------------------+
| 1970-01-01T00:16:40 | cloud-1 | 2500.0 |
| 1970-01-01T00:16:40 | cloud-2 | 800.0 |
| 1970-01-01T00:21:40 | cloud-1 | 4500.0 |
| 1970-01-01T00:21:40 | cloud-2 | 1800.0 |
+---------------------+---------+-----------------------------------+
+---------------------+---------+-------------------------------------------------------+
| greptime_timestamp | cloud | sum(node_network_transmit_bytes_total.greptime_value) |
+---------------------+---------+-------------------------------------------------------+
| 1970-01-01T00:16:40 | cloud-1 | 2500.0 |
| 1970-01-01T00:16:40 | cloud-2 | 800.0 |
| 1970-01-01T00:21:40 | cloud-1 | 4500.0 |
| 1970-01-01T00:21:40 | cloud-2 | 1800.0 |
+---------------------+---------+-------------------------------------------------------+
-- Or with unknown label dst_namespace.
-- SQLNESS SORT_RESULT 3 1

View File

@@ -47,10 +47,10 @@ TQL ANALYZE (0, 10, '5s') sum(tsid_metric);
|_|_|_|
| 1_| 0_|_SortPreservingMergeExec: [ts@0 ASC NULLS LAST] REDACTED
|_|_|_SortExec: expr=[ts@0 ASC NULLS LAST], preserve_partitioning=[true] REDACTED
|_|_|_AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[sum(tsid_physical.val)] REDACTED
|_|_|_AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[sum(tsid_metric.val)] REDACTED
|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|_|_|_RepartitionExec: partitioning=REDACTED
|_|_|_AggregateExec: mode=Partial, gby=[ts@1 as ts], aggr=[sum(tsid_physical.val)] REDACTED
|_|_|_AggregateExec: mode=Partial, gby=[ts@1 as ts], aggr=[sum(tsid_metric.val)] REDACTED
|_|_|_ProjectionExec: expr=[val@0 as val, ts@2 as ts] REDACTED
|_|_|_PromInstantManipulateExec: range=[0..10000], lookback=[300000], interval=[5000], time index=[ts] REDACTED
|_|_|_PromSeriesDivideExec: tags=["__tsid"] REDACTED
@@ -81,10 +81,10 @@ TQL ANALYZE (0, 10, '5s') sum by (job, instance) (tsid_metric);
|_|_|_|
| 1_| 0_|_SortPreservingMergeExec: [job@0 ASC NULLS LAST, instance@1 ASC NULLS LAST, ts@2 ASC NULLS LAST] REDACTED
|_|_|_SortExec: expr=[job@0 ASC NULLS LAST, instance@1 ASC NULLS LAST, ts@2 ASC NULLS LAST], preserve_partitioning=[true] REDACTED
|_|_|_AggregateExec: mode=FinalPartitioned, gby=[job@0 as job, instance@1 as instance, ts@2 as ts], aggr=[sum(tsid_physical.val), __tsid] REDACTED
|_|_|_AggregateExec: mode=FinalPartitioned, gby=[job@0 as job, instance@1 as instance, ts@2 as ts], aggr=[sum(tsid_metric.val), __tsid] REDACTED
|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|_|_|_RepartitionExec: partitioning=REDACTED
|_|_|_AggregateExec: mode=Partial, gby=[job@2 as job, instance@1 as instance, ts@4 as ts], aggr=[sum(tsid_physical.val), __tsid] REDACTED
|_|_|_AggregateExec: mode=Partial, gby=[job@2 as job, instance@1 as instance, ts@4 as ts], aggr=[sum(tsid_metric.val), __tsid] REDACTED
|_|_|_PromInstantManipulateExec: range=[0..10000], lookback=[300000], interval=[5000], time index=[ts] REDACTED
|_|_|_PromSeriesDivideExec: tags=["__tsid"] REDACTED
|_|_|_SortExec: expr=[__tsid@3 ASC, ts@4 ASC], preserve_partitioning=[true] REDACTED