mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-06-03 22:00:38 +00:00
expose tsid on logical table's schema and use it on planner
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
@@ -138,6 +138,7 @@ impl CreateLogicalTablesProcedure {
|
||||
/// Abort(not-retry):
|
||||
/// - Failed to create table metadata.
|
||||
pub async fn on_create_metadata(&mut self) -> Result<Status> {
|
||||
self.add_tsid_column_to_logical_tables();
|
||||
self.update_physical_table_metadata().await?;
|
||||
let table_ids = self.create_logical_tables_metadata().await?;
|
||||
|
||||
|
||||
@@ -102,6 +102,6 @@ pub fn create_region_request_builder_from_raw_table_info(
|
||||
raw_table_info: &RawTableInfo,
|
||||
physical_table_id: TableId,
|
||||
) -> Result<CreateRequestBuilder> {
|
||||
let template = build_template_from_raw_table_info(raw_table_info, false)?;
|
||||
let template = build_template_from_raw_table_info(raw_table_info, true)?;
|
||||
Ok(CreateRequestBuilder::new(template, Some(physical_table_id)))
|
||||
}
|
||||
|
||||
@@ -17,7 +17,9 @@ use std::ops::Deref;
|
||||
use common_telemetry::{info, warn};
|
||||
use itertools::Itertools;
|
||||
use snafu::OptionExt;
|
||||
use table::metadata::TableId;
|
||||
use store_api::metric_engine_consts::DATA_SCHEMA_TSID_COLUMN_NAME;
|
||||
use store_api::storage::consts::ReservedColumnId;
|
||||
use table::metadata::{RawTableInfo, TableId};
|
||||
use table::table_name::TableName;
|
||||
|
||||
use crate::cache_invalidator::Context;
|
||||
@@ -27,6 +29,20 @@ use crate::error::{Result, TableInfoNotFoundSnafu};
|
||||
use crate::instruction::CacheIdent;
|
||||
|
||||
impl CreateLogicalTablesProcedure {
|
||||
pub(crate) fn add_tsid_column_to_logical_tables(&mut self) {
|
||||
for (task, table_id_already_exists) in self
|
||||
.data
|
||||
.tasks
|
||||
.iter_mut()
|
||||
.zip(self.data.table_ids_already_exists.iter())
|
||||
{
|
||||
if table_id_already_exists.is_some() {
|
||||
continue;
|
||||
}
|
||||
add_tsid_column_to_raw_table_info(&mut task.table_info);
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn update_physical_table_metadata(&mut self) -> Result<()> {
|
||||
if self.data.physical_columns.is_empty() {
|
||||
warn!(
|
||||
@@ -128,3 +144,58 @@ impl CreateLogicalTablesProcedure {
|
||||
Ok(table_ids)
|
||||
}
|
||||
}
|
||||
|
||||
fn add_tsid_column_to_raw_table_info(table_info: &mut RawTableInfo) {
|
||||
if table_info
|
||||
.meta
|
||||
.schema
|
||||
.column_schemas
|
||||
.iter()
|
||||
.any(|col| col.name == DATA_SCHEMA_TSID_COLUMN_NAME)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
let should_update_column_ids =
|
||||
table_info.meta.column_ids.len() == table_info.meta.schema.column_schemas.len();
|
||||
let column_index = table_info.meta.schema.column_schemas.len();
|
||||
table_info
|
||||
.meta
|
||||
.schema
|
||||
.column_schemas
|
||||
.push(datatypes::schema::ColumnSchema::new(
|
||||
DATA_SCHEMA_TSID_COLUMN_NAME,
|
||||
datatypes::prelude::ConcreteDataType::uint64_datatype(),
|
||||
false,
|
||||
));
|
||||
table_info.meta.primary_key_indices.push(column_index);
|
||||
if should_update_column_ids {
|
||||
table_info.meta.column_ids.push(ReservedColumnId::tsid());
|
||||
}
|
||||
table_info.sort_columns();
|
||||
}
|
||||
|
||||
#[cfg(test)]
mod tests {
    use super::*;
    use crate::ddl::test_util::test_create_logical_table_task;

    /// When `column_ids` covers every schema column, adding the tsid column must keep
    /// both lists the same length and map the tsid name to its reserved id.
    #[test]
    fn add_tsid_preserves_column_ids_when_present() {
        let mut task = test_create_logical_table_task("foo");
        let table_info = &mut task.table_info;
        let column_count = table_info.meta.schema.column_schemas.len();
        table_info.meta.column_ids = (0..column_count as u32).collect();

        add_tsid_column_to_raw_table_info(table_info);

        let meta = &table_info.meta;
        assert_eq!(meta.column_ids.len(), meta.schema.column_schemas.len());

        let tsid_id = table_info
            .name_to_ids()
            .unwrap()
            .get(DATA_SCHEMA_TSID_COLUMN_NAME)
            .copied();
        assert_eq!(tsid_id, Some(ReservedColumnId::tsid()));
    }
}
|
||||
|
||||
@@ -553,6 +553,7 @@ async fn test_on_part_duplicate_alter_request() {
|
||||
assert_eq!(
|
||||
table1_cols,
|
||||
vec![
|
||||
"__tsid".to_string(),
|
||||
"col_0".to_string(),
|
||||
"cpu".to_string(),
|
||||
"host".to_string(),
|
||||
@@ -572,6 +573,7 @@ async fn test_on_part_duplicate_alter_request() {
|
||||
assert_eq!(
|
||||
table2_cols,
|
||||
vec![
|
||||
"__tsid".to_string(),
|
||||
"col_0".to_string(),
|
||||
"cpu".to_string(),
|
||||
"host".to_string(),
|
||||
|
||||
@@ -150,7 +150,7 @@ fn create_region_request_from_raw_table_info(
|
||||
raw_table_info: &RawTableInfo,
|
||||
physical_table_id: TableId,
|
||||
) -> Result<CreateRequestBuilder> {
|
||||
let template = build_template_from_raw_table_info(raw_table_info, false)?;
|
||||
let template = build_template_from_raw_table_info(raw_table_info, true)?;
|
||||
Ok(CreateRequestBuilder::new(template, Some(physical_table_id)))
|
||||
}
|
||||
|
||||
|
||||
@@ -19,7 +19,9 @@ use common_telemetry::{debug, error, tracing};
|
||||
use datafusion::logical_expr::{self, Expr};
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
use store_api::metadata::{RegionMetadataBuilder, RegionMetadataRef};
|
||||
use store_api::metric_engine_consts::DATA_SCHEMA_TABLE_ID_COLUMN_NAME;
|
||||
use store_api::metric_engine_consts::{
|
||||
DATA_SCHEMA_TABLE_ID_COLUMN_NAME, is_metric_engine_internal_column,
|
||||
};
|
||||
use store_api::region_engine::{RegionEngine, RegionScannerRef};
|
||||
use store_api::storage::{RegionId, ScanRequest, SequenceNumber};
|
||||
|
||||
@@ -218,7 +220,10 @@ impl MetricEngineInner {
|
||||
.get_metadata(data_region_id)
|
||||
.await
|
||||
.context(MitoReadOperationSnafu)?;
|
||||
for name in logical_columns {
|
||||
for name in logical_columns
|
||||
.into_iter()
|
||||
.filter(|name| !is_metric_engine_internal_column(name))
|
||||
{
|
||||
// Safety: logical columns is a strict subset of physical columns
|
||||
projection.push(physical_metadata.column_index_by_name(&name).unwrap());
|
||||
}
|
||||
@@ -338,7 +343,7 @@ mod test {
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(scan_req.projection.unwrap(), vec![11, 10, 9, 8, 0, 1, 4]);
|
||||
assert_eq!(scan_req.projection.unwrap(), vec![11, 10, 9, 8, 3, 0, 1]);
|
||||
assert_eq!(scan_req.filters.len(), 1);
|
||||
assert_eq!(
|
||||
scan_req.filters[0],
|
||||
|
||||
@@ -16,13 +16,30 @@
|
||||
|
||||
use std::collections::HashMap;
|
||||
|
||||
use api::v1::SemanticType;
|
||||
use datatypes::prelude::ConcreteDataType;
|
||||
use datatypes::schema::ColumnSchema;
|
||||
use store_api::metadata::ColumnMetadata;
|
||||
use store_api::metric_engine_consts::DATA_SCHEMA_TSID_COLUMN_NAME;
|
||||
use store_api::storage::RegionId;
|
||||
use store_api::storage::consts::ReservedColumnId;
|
||||
|
||||
use crate::engine::MetricEngineInner;
|
||||
use crate::error::Result;
|
||||
|
||||
impl MetricEngineInner {
|
||||
fn tsid_column_metadata() -> ColumnMetadata {
|
||||
ColumnMetadata {
|
||||
column_schema: ColumnSchema::new(
|
||||
DATA_SCHEMA_TSID_COLUMN_NAME,
|
||||
ConcreteDataType::uint64_datatype(),
|
||||
false,
|
||||
),
|
||||
semantic_type: SemanticType::Tag,
|
||||
column_id: ReservedColumnId::tsid(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Load column metadata of a logical region.
|
||||
///
|
||||
/// The return value is ordered on column name.
|
||||
@@ -54,6 +71,7 @@ impl MetricEngineInner {
|
||||
.await?
|
||||
.into_iter()
|
||||
.map(|(_, column_metadata)| column_metadata)
|
||||
.chain(std::iter::once(Self::tsid_column_metadata()))
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
// Update cache
|
||||
|
||||
@@ -111,7 +111,7 @@ impl RowModifier {
|
||||
.encode_to_vec(internal_columns.into_iter(), &mut buffer)
|
||||
.context(EncodePrimaryKeySnafu)?;
|
||||
self.codec
|
||||
.encode_to_vec(row_iter.primary_keys(), &mut buffer)
|
||||
.encode_to_vec(row_iter.user_primary_keys(), &mut buffer)
|
||||
.context(EncodePrimaryKeySnafu)?;
|
||||
|
||||
values.push(ValueData::BinaryValue(buffer.clone()).into());
|
||||
@@ -138,27 +138,50 @@ impl RowModifier {
|
||||
/// Modifies rows with dense primary key encoding.
|
||||
/// It adds two columns(`__table_id`, `__tsid`) to the row.
|
||||
fn modify_rows_dense(&self, mut iter: RowsIter, table_ids: TableIdInput<'_>) -> Result<Rows> {
|
||||
// add table_name column
|
||||
iter.rows.schema.push(ColumnSchema {
|
||||
column_name: DATA_SCHEMA_TABLE_ID_COLUMN_NAME.to_string(),
|
||||
datatype: ColumnDataType::Uint32 as i32,
|
||||
semantic_type: SemanticType::Tag as _,
|
||||
datatype_extension: None,
|
||||
options: None,
|
||||
});
|
||||
// add tsid column
|
||||
iter.rows.schema.push(ColumnSchema {
|
||||
column_name: DATA_SCHEMA_TSID_COLUMN_NAME.to_string(),
|
||||
datatype: ColumnDataType::Uint64 as i32,
|
||||
semantic_type: SemanticType::Tag as _,
|
||||
datatype_extension: None,
|
||||
options: None,
|
||||
});
|
||||
let table_id_index = iter
|
||||
.rows
|
||||
.schema
|
||||
.iter()
|
||||
.position(|col| col.column_name == DATA_SCHEMA_TABLE_ID_COLUMN_NAME);
|
||||
let tsid_index = iter
|
||||
.rows
|
||||
.schema
|
||||
.iter()
|
||||
.position(|col| col.column_name == DATA_SCHEMA_TSID_COLUMN_NAME);
|
||||
|
||||
if table_id_index.is_none() {
|
||||
iter.rows.schema.push(ColumnSchema {
|
||||
column_name: DATA_SCHEMA_TABLE_ID_COLUMN_NAME.to_string(),
|
||||
datatype: ColumnDataType::Uint32 as i32,
|
||||
semantic_type: SemanticType::Tag as _,
|
||||
datatype_extension: None,
|
||||
options: None,
|
||||
});
|
||||
}
|
||||
if tsid_index.is_none() {
|
||||
iter.rows.schema.push(ColumnSchema {
|
||||
column_name: DATA_SCHEMA_TSID_COLUMN_NAME.to_string(),
|
||||
datatype: ColumnDataType::Uint64 as i32,
|
||||
semantic_type: SemanticType::Tag as _,
|
||||
datatype_extension: None,
|
||||
options: None,
|
||||
});
|
||||
}
|
||||
|
||||
for (row_index, row_iter) in iter.iter_mut().enumerate() {
|
||||
let table_id = table_ids.table_id_for_row(row_index);
|
||||
let (table_id_value, tsid) = Self::fill_internal_columns(table_id, &row_iter);
|
||||
row_iter.row.values.push(table_id_value);
|
||||
row_iter.row.values.push(tsid);
|
||||
if let Some(table_id_index) = table_id_index {
|
||||
row_iter.row.values[table_id_index] = table_id_value;
|
||||
} else {
|
||||
row_iter.row.values.push(table_id_value);
|
||||
}
|
||||
|
||||
if let Some(tsid_index) = tsid_index {
|
||||
row_iter.row.values[tsid_index] = tsid;
|
||||
} else {
|
||||
row_iter.row.values.push(tsid);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(iter.rows)
|
||||
@@ -182,7 +205,15 @@ impl RowModifier {
|
||||
let ts_id = if !iter.has_null_labels() {
|
||||
// No null labels in row, we can safely reuse the precomputed label name hash.
|
||||
let mut ts_id_gen = TsidGenerator::new(iter.index.label_name_hash);
|
||||
for (_, value) in iter.primary_keys_with_name() {
|
||||
for (name, value) in iter.primary_keys_with_name() {
|
||||
// Internal columns are not part of TSID generation.
|
||||
// They may appear when request rows are derived from scans that include them.
|
||||
if matches!(
|
||||
name.as_str(),
|
||||
DATA_SCHEMA_TABLE_ID_COLUMN_NAME | DATA_SCHEMA_TSID_COLUMN_NAME
|
||||
) {
|
||||
continue;
|
||||
}
|
||||
// The type is checked before. So only null is ignored.
|
||||
if let Some(ValueData::StringValue(string)) = &value.value_data {
|
||||
ts_id_gen.write_str(string);
|
||||
@@ -199,6 +230,13 @@ impl RowModifier {
|
||||
let mut hasher = TsidGenerator::default();
|
||||
// 1. Find out label names with non-null values and get the hash.
|
||||
for (name, value) in iter.primary_keys_with_name() {
|
||||
// Internal columns are not part of TSID generation.
|
||||
if matches!(
|
||||
name.as_str(),
|
||||
DATA_SCHEMA_TABLE_ID_COLUMN_NAME | DATA_SCHEMA_TSID_COLUMN_NAME
|
||||
) {
|
||||
continue;
|
||||
}
|
||||
// The type is checked before. So only null is ignored.
|
||||
if let Some(ValueData::StringValue(_)) = &value.value_data {
|
||||
hasher.write_str(name);
|
||||
@@ -208,7 +246,14 @@ impl RowModifier {
|
||||
|
||||
// 2. Use label name hash as seed and continue with label values.
|
||||
let mut final_hasher = TsidGenerator::new(label_name_hash);
|
||||
for (_, value) in iter.primary_keys_with_name() {
|
||||
for (name, value) in iter.primary_keys_with_name() {
|
||||
// Internal columns are not part of TSID generation.
|
||||
if matches!(
|
||||
name.as_str(),
|
||||
DATA_SCHEMA_TABLE_ID_COLUMN_NAME | DATA_SCHEMA_TSID_COLUMN_NAME
|
||||
) {
|
||||
continue;
|
||||
}
|
||||
if let Some(ValueData::StringValue(value)) = &value.value_data {
|
||||
final_hasher.write_str(value);
|
||||
}
|
||||
@@ -368,6 +413,13 @@ pub struct RowIter<'a> {
|
||||
}
|
||||
|
||||
impl RowIter<'_> {
|
||||
fn is_internal_column(&self, idx: &ValueIndex) -> bool {
|
||||
matches!(
|
||||
self.schema[idx.index].column_name.as_str(),
|
||||
DATA_SCHEMA_TABLE_ID_COLUMN_NAME | DATA_SCHEMA_TSID_COLUMN_NAME
|
||||
)
|
||||
}
|
||||
|
||||
/// Returns the primary keys with their names.
|
||||
fn primary_keys_with_name(&self) -> impl Iterator<Item = (&String, &Value)> {
|
||||
self.index.indices[..self.index.num_primary_key_column]
|
||||
@@ -384,7 +436,9 @@ impl RowIter<'_> {
|
||||
fn has_null_labels(&self) -> bool {
|
||||
self.index.indices[..self.index.num_primary_key_column]
|
||||
.iter()
|
||||
.any(|idx| self.row.values[idx.index].value_data.is_none())
|
||||
.any(|idx| {
|
||||
!self.is_internal_column(idx) && self.row.values[idx.index].value_data.is_none()
|
||||
})
|
||||
}
|
||||
|
||||
/// Returns the primary keys.
|
||||
@@ -402,6 +456,13 @@ impl RowIter<'_> {
|
||||
})
|
||||
}
|
||||
|
||||
/// Returns the primary keys excluding reserved internal columns.
|
||||
pub fn user_primary_keys(&self) -> impl Iterator<Item = (ColumnId, ValueRef<'_>)> {
|
||||
self.primary_keys().filter(|(column_id, _)| {
|
||||
*column_id != ReservedColumnId::table_id() && *column_id != ReservedColumnId::tsid()
|
||||
})
|
||||
}
|
||||
|
||||
/// Returns the remaining columns.
|
||||
fn remaining(&mut self) -> impl Iterator<Item = Value> + '_ {
|
||||
self.index.indices[self.index.num_primary_key_column..]
|
||||
@@ -491,6 +552,59 @@ mod tests {
|
||||
assert_eq!(result.schema, expected_sparse_schema());
|
||||
}
|
||||
|
||||
/// Sparse encoding must produce the same schema and values whether or not the input
/// rows already carry a tsid column.
#[test]
fn test_encode_sparse_ignores_input_tsid_column() {
    let name_to_column_id = test_name_to_column_id();
    let encoder = RowModifier::default();
    let table_id = 1025;

    // Baseline input: only the regular test columns.
    let rows_without_tsid = Rows {
        schema: test_schema(),
        rows: vec![test_row("greptimedb", "127.0.0.1")],
    };

    // Same labels, plus a caller-supplied tsid column and value.
    let tsid_column = ColumnSchema {
        column_name: DATA_SCHEMA_TSID_COLUMN_NAME.to_string(),
        datatype: ColumnDataType::Uint64 as i32,
        semantic_type: SemanticType::Tag as _,
        datatype_extension: None,
        options: None,
    };
    let mut schema_with_tsid = test_schema();
    schema_with_tsid.push(tsid_column);
    let values_with_tsid = vec![
        ValueData::StringValue("greptimedb".to_string()).into(),
        ValueData::StringValue("127.0.0.1".to_string()).into(),
        ValueData::U64Value(123).into(),
    ];
    let rows_with_tsid = Rows {
        schema: schema_with_tsid,
        rows: vec![Row {
            values: values_with_tsid,
        }],
    };

    let encode_sparse = |rows: Rows| {
        encoder
            .modify_rows(
                RowsIter::new(rows, &name_to_column_id),
                TableIdInput::Single(table_id),
                PrimaryKeyEncoding::Sparse,
            )
            .unwrap()
    };
    let result_without_tsid = encode_sparse(rows_without_tsid);
    let result_with_tsid = encode_sparse(rows_with_tsid);

    assert_eq!(result_without_tsid.schema, expected_sparse_schema());
    assert_eq!(result_with_tsid.schema, expected_sparse_schema());
    assert_eq!(
        result_without_tsid.rows[0].values,
        result_with_tsid.rows[0].values
    );
}
|
||||
|
||||
fn expected_sparse_schema() -> Vec<ColumnSchema> {
|
||||
vec![ColumnSchema {
|
||||
column_name: PRIMARY_KEY_COLUMN_NAME.to_string(),
|
||||
|
||||
@@ -27,6 +27,7 @@ use futures_util::future;
|
||||
use partition::manager::PartitionRuleManagerRef;
|
||||
use session::context::QueryContextRef;
|
||||
use snafu::{OptionExt, ResultExt, ensure};
|
||||
use store_api::metric_engine_consts::is_metric_engine_internal_column;
|
||||
use table::TableRef;
|
||||
use table::requests::DeleteRequest as TableDeleteRequest;
|
||||
|
||||
@@ -99,9 +100,12 @@ impl Deleter {
|
||||
|
||||
pub async fn handle_table_delete(
|
||||
&self,
|
||||
request: TableDeleteRequest,
|
||||
mut request: TableDeleteRequest,
|
||||
ctx: QueryContextRef,
|
||||
) -> Result<AffectedRows> {
|
||||
request
|
||||
.key_column_values
|
||||
.retain(|col, _| !is_metric_engine_internal_column(col));
|
||||
let catalog = request.catalog_name.as_str();
|
||||
let schema = request.schema_name.as_str();
|
||||
let table = request.table_name.as_str();
|
||||
@@ -227,6 +231,7 @@ impl Deleter {
|
||||
.table_info()
|
||||
.meta
|
||||
.row_key_column_names()
|
||||
.filter(|name| !is_metric_engine_internal_column(name))
|
||||
.cloned()
|
||||
.chain(iter::once(time_index))
|
||||
.collect();
|
||||
|
||||
@@ -38,7 +38,16 @@ impl<'a> TableToRegion<'a> {
|
||||
pub async fn convert(&self, request: TableDeleteRequest) -> Result<RegionDeleteRequests> {
|
||||
let row_count = row_count(&request.key_column_values)?;
|
||||
let schema = column_schema(self.table_info, &request.key_column_values)?;
|
||||
let rows = api::helper::vectors_to_rows(request.key_column_values.values(), row_count);
|
||||
let vectors = schema
|
||||
.iter()
|
||||
.map(|col| {
|
||||
request
|
||||
.key_column_values
|
||||
.get(&col.column_name)
|
||||
.expect("schema column must exist in delete request")
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
let rows = api::helper::vectors_to_rows(vectors.into_iter(), row_count);
|
||||
let rows = Rows { schema, rows };
|
||||
|
||||
let requests = Partitioner::new(self.partition_manager)
|
||||
|
||||
@@ -36,6 +36,7 @@ use snafu::{OptionExt, ResultExt, ensure};
|
||||
use sql::ast::ObjectNamePartExt;
|
||||
use sql::statements::insert::Insert;
|
||||
use sqlparser::ast::{ObjectName, Value as SqlValue};
|
||||
use store_api::metric_engine_consts::is_metric_engine_internal_column;
|
||||
use table::TableRef;
|
||||
use table::metadata::TableInfoRef;
|
||||
|
||||
@@ -383,6 +384,7 @@ fn column_names<'a>(stmt: &'a Insert, table_schema: &'a SchemaRef) -> Vec<&'a St
|
||||
table_schema
|
||||
.column_schemas()
|
||||
.iter()
|
||||
.filter(|column| !is_metric_engine_internal_column(&column.name))
|
||||
.map(|column| &column.name)
|
||||
.collect()
|
||||
}
|
||||
|
||||
@@ -46,6 +46,7 @@ use futures_util::StreamExt;
|
||||
use session::context::QueryContextRef;
|
||||
use snafu::{OptionExt, ResultExt, ensure};
|
||||
use sqlparser::ast::AnalyzeFormat;
|
||||
use store_api::metric_engine_consts::is_metric_engine_internal_column;
|
||||
use table::TableRef;
|
||||
use table::requests::{DeleteRequest, InsertRequest};
|
||||
use tracing::Span;
|
||||
@@ -200,6 +201,7 @@ impl DatafusionQueryEngine {
|
||||
let rowkey_columns = table_info
|
||||
.meta
|
||||
.row_key_column_names()
|
||||
.filter(|name| !is_metric_engine_internal_column(name))
|
||||
.collect::<Vec<&String>>();
|
||||
let column_vectors = column_vectors
|
||||
.into_iter()
|
||||
|
||||
@@ -40,6 +40,7 @@ use sql::statements::explain::ExplainStatement;
|
||||
use sql::statements::query::Query;
|
||||
use sql::statements::statement::Statement;
|
||||
use sql::statements::tql::Tql;
|
||||
use store_api::metric_engine_consts::is_metric_engine_internal_column;
|
||||
|
||||
use crate::error::{
|
||||
CteColumnSchemaMismatchSnafu, PlanSqlSnafu, QueryPlanSnafu, Result, SqlSnafu,
|
||||
@@ -232,7 +233,35 @@ impl DfLogicalPlanner {
|
||||
.optimize_by_extension_rules(plan, &context)?;
|
||||
common_telemetry::debug!("Logical planner, optimize result: {plan}");
|
||||
|
||||
Ok(plan)
|
||||
Self::strip_metric_engine_internal_columns(plan)
|
||||
}
|
||||
|
||||
fn strip_metric_engine_internal_columns(plan: LogicalPlan) -> Result<LogicalPlan> {
|
||||
let schema = plan.schema();
|
||||
if !schema
|
||||
.fields()
|
||||
.iter()
|
||||
.any(|field| is_metric_engine_internal_column(field.name()))
|
||||
{
|
||||
return Ok(plan);
|
||||
}
|
||||
|
||||
let project_exprs = schema
|
||||
.fields()
|
||||
.iter()
|
||||
.filter(|field| !is_metric_engine_internal_column(field.name()))
|
||||
.map(|field| col(field.name()))
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
if project_exprs.is_empty() {
|
||||
return Ok(plan);
|
||||
}
|
||||
|
||||
LogicalPlanBuilder::from(plan)
|
||||
.project(project_exprs)
|
||||
.context(PlanSqlSnafu)?
|
||||
.build()
|
||||
.context(PlanSqlSnafu)
|
||||
}
|
||||
|
||||
/// Generate a relational expression from a SQL expression
|
||||
|
||||
@@ -138,6 +138,13 @@ struct PromPlannerContext {
|
||||
time_index_column: Option<String>,
|
||||
field_columns: Vec<String>,
|
||||
tag_columns: Vec<String>,
|
||||
/// Metric engine internal series identifier column (`__tsid`).
|
||||
///
|
||||
/// This column is optional: it is present only when the underlying table schema contains
|
||||
/// [`DATA_SCHEMA_TSID_COLUMN_NAME`] with `UInt64` type. The planner uses it internally as the
|
||||
/// series key for plans like [`SeriesDivide`] when available, and strips it from the final
|
||||
/// output.
|
||||
tsid_column: Option<String>,
|
||||
/// The matcher for field columns `__field__`.
|
||||
field_column_matcher: Option<Vec<Matcher>>,
|
||||
/// The matcher for selectors (normal matchers).
|
||||
@@ -164,6 +171,7 @@ impl PromPlannerContext {
|
||||
self.time_index_column = None;
|
||||
self.field_columns = vec![];
|
||||
self.tag_columns = vec![];
|
||||
self.tsid_column = None;
|
||||
self.field_column_matcher = None;
|
||||
self.selector_matcher.clear();
|
||||
self.schema_name = None;
|
||||
@@ -204,11 +212,14 @@ impl PromPlanner {
|
||||
.await?;
|
||||
|
||||
// Apply alias if provided
|
||||
if let Some(alias_name) = alias {
|
||||
planner.apply_alias_projection(plan, alias_name)
|
||||
let plan = if let Some(alias_name) = alias {
|
||||
planner.apply_alias_projection(plan, alias_name)?
|
||||
} else {
|
||||
Ok(plan)
|
||||
}
|
||||
plan
|
||||
};
|
||||
|
||||
// Never leak internal series identifier to output.
|
||||
planner.strip_tsid_column(plan)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
@@ -342,19 +353,42 @@ impl PromPlanner {
|
||||
} = aggr_expr;
|
||||
|
||||
let input = self.prom_expr_to_plan(expr, query_engine_state).await?;
|
||||
let input_has_tsid = input.schema().fields().iter().any(|field| {
|
||||
field.name() == DATA_SCHEMA_TSID_COLUMN_NAME
|
||||
&& field.data_type() == &ArrowDataType::UInt64
|
||||
});
|
||||
|
||||
match (*op).id() {
|
||||
token::T_TOPK | token::T_BOTTOMK => {
|
||||
self.prom_topk_bottomk_to_plan(aggr_expr, input).await
|
||||
}
|
||||
_ => {
|
||||
let input_tag_columns = self.ctx.tag_columns.clone();
|
||||
// calculate columns to group by
|
||||
// Need to append time index column into group by columns
|
||||
let mut group_exprs = self.agg_modifier_to_col(input.schema(), modifier, true)?;
|
||||
// convert op and value columns to aggregate exprs
|
||||
let (aggr_exprs, prev_field_exprs) =
|
||||
let (mut aggr_exprs, prev_field_exprs) =
|
||||
self.create_aggregate_exprs(*op, param, &input)?;
|
||||
|
||||
let keep_tsid = op.id() != token::T_COUNT_VALUES
|
||||
&& input_has_tsid
|
||||
&& input_tag_columns.iter().collect::<HashSet<_>>()
|
||||
== self.ctx.tag_columns.iter().collect::<HashSet<_>>();
|
||||
|
||||
if keep_tsid {
|
||||
aggr_exprs.push(
|
||||
first_value(
|
||||
DfExpr::Column(Column::from_name(DATA_SCHEMA_TSID_COLUMN_NAME)),
|
||||
vec![],
|
||||
)
|
||||
.alias(DATA_SCHEMA_TSID_COLUMN_NAME),
|
||||
);
|
||||
self.ctx.tsid_column = Some(DATA_SCHEMA_TSID_COLUMN_NAME.to_string());
|
||||
} else {
|
||||
self.ctx.tsid_column = None;
|
||||
}
|
||||
|
||||
// create plan
|
||||
let builder = LogicalPlanBuilder::from(input);
|
||||
let builder = if op.id() == token::T_COUNT_VALUES {
|
||||
@@ -404,6 +438,12 @@ impl PromPlanner {
|
||||
..
|
||||
} = aggr_expr;
|
||||
|
||||
let input_has_tsid = input.schema().fields().iter().any(|field| {
|
||||
field.name() == DATA_SCHEMA_TSID_COLUMN_NAME
|
||||
&& field.data_type() == &ArrowDataType::UInt64
|
||||
});
|
||||
self.ctx.tsid_column = input_has_tsid.then_some(DATA_SCHEMA_TSID_COLUMN_NAME.to_string());
|
||||
|
||||
let group_exprs = self.agg_modifier_to_col(input.schema(), modifier, false)?;
|
||||
|
||||
let val = Self::get_param_as_literal_expr(param, Some(*op), Some(ArrowDataType::Float64))?;
|
||||
@@ -452,6 +492,13 @@ impl PromPlanner {
|
||||
.create_field_column_exprs()?
|
||||
.into_iter()
|
||||
.chain(self.create_tag_column_exprs()?)
|
||||
.chain(
|
||||
self.ctx
|
||||
.tsid_column
|
||||
.as_ref()
|
||||
.map(|_| DfExpr::Column(Column::from_name(DATA_SCHEMA_TSID_COLUMN_NAME)))
|
||||
.into_iter(),
|
||||
)
|
||||
.chain(Some(self.create_time_index_column_expr()?));
|
||||
|
||||
LogicalPlanBuilder::from(input)
|
||||
@@ -1147,6 +1194,15 @@ impl PromPlanner {
|
||||
.into_iter()
|
||||
.map(|col| DfExpr::Column(Column::new_unqualified(col)))
|
||||
.chain(self.create_tag_column_exprs()?)
|
||||
.chain(
|
||||
self.ctx
|
||||
.tsid_column
|
||||
.as_ref()
|
||||
.map(|_| {
|
||||
DfExpr::Column(Column::new_unqualified(DATA_SCHEMA_TSID_COLUMN_NAME))
|
||||
})
|
||||
.into_iter(),
|
||||
)
|
||||
.chain(Some(self.create_time_index_column_expr()?))
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
@@ -1159,8 +1215,23 @@ impl PromPlanner {
|
||||
}
|
||||
|
||||
// make sort plan
|
||||
let series_key_columns = if self.ctx.tsid_column.is_some() {
|
||||
vec![DATA_SCHEMA_TSID_COLUMN_NAME.to_string()]
|
||||
} else {
|
||||
self.ctx.tag_columns.clone()
|
||||
};
|
||||
|
||||
let sort_exprs = if self.ctx.tsid_column.is_some() {
|
||||
vec![
|
||||
DfExpr::Column(Column::from_name(DATA_SCHEMA_TSID_COLUMN_NAME)).sort(true, true),
|
||||
self.create_time_index_column_expr()?.sort(true, true),
|
||||
]
|
||||
} else {
|
||||
self.create_tag_and_time_index_column_sort_exprs()?
|
||||
};
|
||||
|
||||
let sort_plan = LogicalPlanBuilder::from(table_scan)
|
||||
.sort(self.create_tag_and_time_index_column_sort_exprs()?)
|
||||
.sort(sort_exprs)
|
||||
.context(DataFusionPlanningSnafu)?
|
||||
.build()
|
||||
.context(DataFusionPlanningSnafu)?;
|
||||
@@ -1175,7 +1246,7 @@ impl PromPlanner {
|
||||
})?;
|
||||
let divide_plan = LogicalPlan::Extension(Extension {
|
||||
node: Arc::new(SeriesDivide::new(
|
||||
self.ctx.tag_columns.clone(),
|
||||
series_key_columns.clone(),
|
||||
time_index_column,
|
||||
sort_plan,
|
||||
)),
|
||||
@@ -1194,7 +1265,7 @@ impl PromPlanner {
|
||||
table: table_ref.to_quoted_string(),
|
||||
})?,
|
||||
is_range_selector,
|
||||
self.ctx.tag_columns.clone(),
|
||||
series_key_columns,
|
||||
divide_plan,
|
||||
);
|
||||
let logical_plan = LogicalPlan::Extension(Extension {
|
||||
@@ -1486,6 +1557,18 @@ impl PromPlanner {
|
||||
.iter()
|
||||
.map(|col| DfExpr::Column(Column::new(Some(table_ref.clone()), col.clone())))
|
||||
.chain(self.create_tag_column_exprs()?)
|
||||
.chain(
|
||||
self.ctx
|
||||
.tsid_column
|
||||
.as_ref()
|
||||
.map(|_| {
|
||||
DfExpr::Column(Column::new(
|
||||
Some(table_ref.clone()),
|
||||
DATA_SCHEMA_TSID_COLUMN_NAME.to_string(),
|
||||
))
|
||||
})
|
||||
.into_iter(),
|
||||
)
|
||||
.chain(Some(DfExpr::Alias(Alias {
|
||||
expr: Box::new(DfExpr::Cast(Cast {
|
||||
expr: Box::new(self.create_time_index_column_expr()?),
|
||||
@@ -1571,6 +1654,15 @@ impl PromPlanner {
|
||||
.collect();
|
||||
self.ctx.tag_columns = tags;
|
||||
|
||||
// Set internal tsid column if available from underlying storage engine.
|
||||
self.ctx.tsid_column = table
|
||||
.schema()
|
||||
.column_schema_by_name(DATA_SCHEMA_TSID_COLUMN_NAME)
|
||||
.and_then(|col| {
|
||||
matches!(col.data_type, ConcreteDataType::UInt64(_))
|
||||
.then_some(DATA_SCHEMA_TSID_COLUMN_NAME.to_string())
|
||||
});
|
||||
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
@@ -1581,6 +1673,7 @@ impl PromPlanner {
|
||||
self.ctx.reset_table_name_and_schema();
|
||||
self.ctx.tag_columns = vec![];
|
||||
self.ctx.field_columns = vec![DEFAULT_FIELD_COLUMN.to_string()];
|
||||
self.ctx.tsid_column = None;
|
||||
|
||||
// The table doesn't have any data, so we set start to 0 and end to -1.
|
||||
let plan = LogicalPlan::Extension(Extension {
|
||||
@@ -2988,6 +3081,7 @@ impl PromPlanner {
|
||||
.chain([left_time_index.clone()])
|
||||
.collect::<Vec<_>>();
|
||||
self.ctx.time_index_column = Some(left_time_index.clone());
|
||||
self.ctx.tsid_column = left_context.tsid_column.clone();
|
||||
|
||||
// alias right time index column if necessary
|
||||
if left_context.time_index_column != right_context.time_index_column {
|
||||
@@ -3127,6 +3221,16 @@ impl PromPlanner {
|
||||
// Take the name of first field column. The length is checked above.
|
||||
let left_field_col = left_context.field_columns.first().unwrap();
|
||||
let right_field_col = right_context.field_columns.first().unwrap();
|
||||
let left_has_tsid = left
|
||||
.schema()
|
||||
.fields()
|
||||
.iter()
|
||||
.any(|field| field.name() == DATA_SCHEMA_TSID_COLUMN_NAME);
|
||||
let right_has_tsid = right
|
||||
.schema()
|
||||
.fields()
|
||||
.iter()
|
||||
.any(|field| field.name() == DATA_SCHEMA_TSID_COLUMN_NAME);
|
||||
|
||||
// step 0: fill all columns in output schema
|
||||
let mut all_columns_set = left
|
||||
@@ -3136,6 +3240,11 @@ impl PromPlanner {
|
||||
.chain(right.schema().fields().iter())
|
||||
.map(|field| field.name().clone())
|
||||
.collect::<HashSet<_>>();
|
||||
// Keep `__tsid` only when both sides contain it, otherwise it may break schema alignment
|
||||
// (e.g. `unknown_metric or some_metric`).
|
||||
if !(left_has_tsid && right_has_tsid) {
|
||||
all_columns_set.remove(DATA_SCHEMA_TSID_COLUMN_NAME);
|
||||
}
|
||||
// remove time index column
|
||||
all_columns_set.remove(&left_time_index_column);
|
||||
all_columns_set.remove(&right_time_index_column);
|
||||
@@ -3243,6 +3352,8 @@ impl PromPlanner {
|
||||
self.ctx.time_index_column = Some(left_time_index_column);
|
||||
self.ctx.tag_columns = all_tags.into_iter().collect();
|
||||
self.ctx.field_columns = vec![left_field_col.clone()];
|
||||
self.ctx.tsid_column =
|
||||
(left_has_tsid && right_has_tsid).then_some(DATA_SCHEMA_TSID_COLUMN_NAME.to_string());
|
||||
|
||||
Ok(result)
|
||||
}
|
||||
@@ -3349,6 +3460,30 @@ impl PromPlanner {
|
||||
Ok(fn_expr)
|
||||
}
|
||||
|
||||
fn strip_tsid_column(&self, plan: LogicalPlan) -> Result<LogicalPlan> {
|
||||
let schema = plan.schema();
|
||||
if !schema
|
||||
.fields()
|
||||
.iter()
|
||||
.any(|field| field.name() == DATA_SCHEMA_TSID_COLUMN_NAME)
|
||||
{
|
||||
return Ok(plan);
|
||||
}
|
||||
|
||||
let project_exprs = schema
|
||||
.fields()
|
||||
.iter()
|
||||
.filter(|field| field.name() != DATA_SCHEMA_TSID_COLUMN_NAME)
|
||||
.map(|field| Ok(DfExpr::Column(Column::from_name(field.name().clone()))))
|
||||
.collect::<Result<Vec<_>>>()?;
|
||||
|
||||
LogicalPlanBuilder::from(plan)
|
||||
.project(project_exprs)
|
||||
.context(DataFusionPlanningSnafu)?
|
||||
.build()
|
||||
.context(DataFusionPlanningSnafu)
|
||||
}
|
||||
|
||||
/// Apply an alias to the query result by adding a projection with the alias name
|
||||
fn apply_alias_projection(
|
||||
&mut self,
|
||||
@@ -3517,6 +3652,84 @@ mod test {
|
||||
)
|
||||
}
|
||||
|
||||
async fn build_test_table_provider_with_tsid(
|
||||
table_name_tuples: &[(String, String)],
|
||||
num_tag: usize,
|
||||
num_field: usize,
|
||||
) -> DfTableSourceProvider {
|
||||
let catalog_list = MemoryCatalogManager::with_default_setup();
|
||||
for (schema_name, table_name) in table_name_tuples {
|
||||
let mut columns = vec![];
|
||||
for i in 0..num_tag {
|
||||
columns.push(ColumnSchema::new(
|
||||
format!("tag_{i}"),
|
||||
ConcreteDataType::string_datatype(),
|
||||
false,
|
||||
));
|
||||
}
|
||||
columns.push(
|
||||
ColumnSchema::new(
|
||||
"timestamp".to_string(),
|
||||
ConcreteDataType::timestamp_millisecond_datatype(),
|
||||
false,
|
||||
)
|
||||
.with_time_index(true),
|
||||
);
|
||||
for i in 0..num_field {
|
||||
columns.push(ColumnSchema::new(
|
||||
format!("field_{i}"),
|
||||
ConcreteDataType::float64_datatype(),
|
||||
true,
|
||||
));
|
||||
}
|
||||
columns.push(ColumnSchema::new(
|
||||
DATA_SCHEMA_TSID_COLUMN_NAME.to_string(),
|
||||
ConcreteDataType::uint64_datatype(),
|
||||
false,
|
||||
));
|
||||
|
||||
let schema = Arc::new(Schema::new(columns));
|
||||
|
||||
let tsid_idx = num_tag + 1 + num_field;
|
||||
let mut primary_key_indices = (0..num_tag).collect::<Vec<_>>();
|
||||
primary_key_indices.push(tsid_idx);
|
||||
|
||||
let table_meta = TableMetaBuilder::empty()
|
||||
.schema(schema)
|
||||
.primary_key_indices(primary_key_indices)
|
||||
.value_indices((num_tag + 1..num_tag + 1 + num_field).collect())
|
||||
.next_column_id(1024)
|
||||
.build()
|
||||
.unwrap();
|
||||
let table_info = TableInfoBuilder::default()
|
||||
.name(table_name.clone())
|
||||
.meta(table_meta)
|
||||
.build()
|
||||
.unwrap();
|
||||
let table = EmptyTable::from_table_info(&table_info);
|
||||
|
||||
assert!(
|
||||
catalog_list
|
||||
.register_table_sync(RegisterTableRequest {
|
||||
catalog: DEFAULT_CATALOG_NAME.to_string(),
|
||||
schema: schema_name.clone(),
|
||||
table_name: table_name.clone(),
|
||||
table_id: 1024,
|
||||
table,
|
||||
})
|
||||
.is_ok()
|
||||
);
|
||||
}
|
||||
|
||||
DfTableSourceProvider::new(
|
||||
catalog_list,
|
||||
false,
|
||||
QueryContext::arc(),
|
||||
DummyDecoder::arc(),
|
||||
false,
|
||||
)
|
||||
}
|
||||
|
||||
async fn build_test_table_provider_with_fields(
|
||||
table_name_tuples: &[(String, String)],
|
||||
tags: &[&str],
|
||||
@@ -3876,6 +4089,135 @@ mod test {
|
||||
do_aggregate_expr_plan("sum", "sum").await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn tsid_is_used_for_series_divide_when_available() {
|
||||
let prom_expr = parser::parse("some_metric").unwrap();
|
||||
let eval_stmt = EvalStmt {
|
||||
expr: prom_expr,
|
||||
start: UNIX_EPOCH,
|
||||
end: UNIX_EPOCH
|
||||
.checked_add(Duration::from_secs(100_000))
|
||||
.unwrap(),
|
||||
interval: Duration::from_secs(5),
|
||||
lookback_delta: Duration::from_secs(1),
|
||||
};
|
||||
|
||||
let table_provider = build_test_table_provider_with_tsid(
|
||||
&[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
|
||||
1,
|
||||
1,
|
||||
)
|
||||
.await;
|
||||
let plan =
|
||||
PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let plan_str = plan.display_indent_schema().to_string();
|
||||
assert!(plan_str.contains("PromSeriesDivide: tags=[\"__tsid\"]"));
|
||||
assert!(plan_str.contains("__tsid ASC NULLS FIRST"));
|
||||
assert!(
|
||||
!plan
|
||||
.schema()
|
||||
.fields()
|
||||
.iter()
|
||||
.any(|field| field.name() == DATA_SCHEMA_TSID_COLUMN_NAME)
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn tsid_is_carried_only_when_aggregate_preserves_label_set() {
|
||||
let prom_expr = parser::parse("sum by (tag_0) (some_metric)").unwrap();
|
||||
let eval_stmt = EvalStmt {
|
||||
expr: prom_expr,
|
||||
start: UNIX_EPOCH,
|
||||
end: UNIX_EPOCH
|
||||
.checked_add(Duration::from_secs(100_000))
|
||||
.unwrap(),
|
||||
interval: Duration::from_secs(5),
|
||||
lookback_delta: Duration::from_secs(1),
|
||||
};
|
||||
|
||||
let table_provider = build_test_table_provider_with_tsid(
|
||||
&[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
|
||||
1,
|
||||
1,
|
||||
)
|
||||
.await;
|
||||
let plan =
|
||||
PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let plan_str = plan.display_indent_schema().to_string();
|
||||
assert!(plan_str.contains("first_value") && plan_str.contains("__tsid"));
|
||||
assert!(
|
||||
!plan
|
||||
.schema()
|
||||
.fields()
|
||||
.iter()
|
||||
.any(|field| field.name() == DATA_SCHEMA_TSID_COLUMN_NAME)
|
||||
);
|
||||
|
||||
// Merging aggregate: label set is reduced, tsid should not be carried.
|
||||
let prom_expr = parser::parse("sum(some_metric)").unwrap();
|
||||
let eval_stmt = EvalStmt {
|
||||
expr: prom_expr,
|
||||
start: UNIX_EPOCH,
|
||||
end: UNIX_EPOCH
|
||||
.checked_add(Duration::from_secs(100_000))
|
||||
.unwrap(),
|
||||
interval: Duration::from_secs(5),
|
||||
lookback_delta: Duration::from_secs(1),
|
||||
};
|
||||
let table_provider = build_test_table_provider_with_tsid(
|
||||
&[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
|
||||
1,
|
||||
1,
|
||||
)
|
||||
.await;
|
||||
let plan =
|
||||
PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
|
||||
.await
|
||||
.unwrap();
|
||||
let plan_str = plan.display_indent_schema().to_string();
|
||||
assert!(!plan_str.contains("first_value"));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn or_operator_with_unknown_metric_does_not_require_tsid() {
|
||||
let prom_expr = parser::parse("unknown_metric or some_metric").unwrap();
|
||||
let eval_stmt = EvalStmt {
|
||||
expr: prom_expr,
|
||||
start: UNIX_EPOCH,
|
||||
end: UNIX_EPOCH
|
||||
.checked_add(Duration::from_secs(100_000))
|
||||
.unwrap(),
|
||||
interval: Duration::from_secs(5),
|
||||
lookback_delta: Duration::from_secs(1),
|
||||
};
|
||||
|
||||
let table_provider = build_test_table_provider_with_tsid(
|
||||
&[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
|
||||
1,
|
||||
1,
|
||||
)
|
||||
.await;
|
||||
|
||||
let plan =
|
||||
PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert!(
|
||||
!plan
|
||||
.schema()
|
||||
.fields()
|
||||
.iter()
|
||||
.any(|field| field.name() == DATA_SCHEMA_TSID_COLUMN_NAME)
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn aggregate_avg() {
|
||||
do_aggregate_expr_plan("avg", "avg").await;
|
||||
|
||||
@@ -250,8 +250,9 @@ fn create_table_constraints(
|
||||
column: Ident::with_quote(quote_style, column_name),
|
||||
});
|
||||
}
|
||||
if !table_meta.primary_key_indices.is_empty() {
|
||||
let columns = primary_key_columns_for_show_create(table_meta, engine)
|
||||
let primary_key_columns = primary_key_columns_for_show_create(table_meta, engine);
|
||||
if !primary_key_columns.is_empty() {
|
||||
let columns = primary_key_columns
|
||||
.into_iter()
|
||||
.map(|name| Ident::with_quote(quote_style, name))
|
||||
.collect();
|
||||
@@ -314,6 +315,7 @@ mod tests {
|
||||
use datatypes::schema::{
|
||||
FulltextOptions, Schema, SchemaRef, SkippingIndexOptions, VectorIndexOptions,
|
||||
};
|
||||
use store_api::metric_engine_consts::DATA_SCHEMA_TSID_COLUMN_NAME;
|
||||
use table::metadata::*;
|
||||
use table::requests::{
|
||||
FILE_TABLE_FORMAT_KEY, FILE_TABLE_LOCATION_KEY, FILE_TABLE_META_KEY, TableOptions,
|
||||
@@ -482,4 +484,51 @@ WITH(
|
||||
sql
|
||||
);
|
||||
}
|
||||
|
||||
/// For a `metric` engine table whose only primary key column is the tsid column, the
/// generated `CREATE TABLE` statement must not contain a `PRIMARY KEY` clause.
#[test]
fn test_show_create_metric_table_empty_primary_key_is_omitted() {
    let time_index_column = ColumnSchema::new(
        "greptime_timestamp",
        ConcreteDataType::timestamp_millisecond_datatype(),
        false,
    )
    .with_time_index(true);
    let value_column =
        ColumnSchema::new("greptime_value", ConcreteDataType::float64_datatype(), true);
    let tsid_column = ColumnSchema::new(
        DATA_SCHEMA_TSID_COLUMN_NAME,
        ConcreteDataType::uint64_datatype(),
        false,
    );
    let table_schema = SchemaRef::new(Schema::new(vec![
        time_index_column,
        value_column,
        tsid_column,
    ]));

    // Index 2 is the tsid column: it is the sole member of the primary key.
    let meta = TableMetaBuilder::empty()
        .schema(table_schema)
        .primary_key_indices(vec![2])
        .value_indices(vec![0, 1])
        .engine("metric".to_string())
        .next_column_id(0)
        .options(Default::default())
        .created_on(Default::default())
        .build()
        .unwrap();

    let table_info = TableInfoBuilder::default()
        .table_id(1024)
        .table_version(0 as TableVersion)
        .name("test_metric_table")
        .schema_name("public".to_string())
        .catalog_name("greptime".to_string())
        .desc(None)
        .table_type(TableType::Base)
        .meta(meta)
        .build()
        .unwrap();
    let info = Arc::new(table_info);

    let stmt = create_table_stmt(&info, None, '"').unwrap();
    let sql = format!("\n{}", stmt);
    assert!(!sql.contains("PRIMARY KEY"));
}
|
||||
}
|
||||
|
||||
@@ -34,6 +34,7 @@ use datafusion_expr::LogicalPlan;
|
||||
use openmetrics_parser::{MetricsExposition, PrometheusType, PrometheusValue};
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
use snap::raw::{Decoder, Encoder};
|
||||
use store_api::metric_engine_consts::is_metric_engine_internal_column;
|
||||
|
||||
use crate::error::{self, Result};
|
||||
use crate::row_writer::{self, MultiTableData};
|
||||
@@ -269,7 +270,9 @@ fn collect_timeseries_ids(table_name: &str, recordbatch: &RecordBatch) -> Vec<Ti
|
||||
let columns = column_names
|
||||
.enumerate()
|
||||
.filter(|(_, column_name)| {
|
||||
*column_name != greptime_timestamp() && *column_name != greptime_value()
|
||||
*column_name != greptime_timestamp()
|
||||
&& *column_name != greptime_value()
|
||||
&& !is_metric_engine_internal_column(column_name.as_str())
|
||||
})
|
||||
.map(|(i, column_name)| {
|
||||
(
|
||||
|
||||
@@ -33,6 +33,7 @@ DESC TABLE t1;
|
||||
+--------+----------------------+-----+------+---------+---------------+
|
||||
| Column | Type | Key | Null | Default | Semantic Type |
|
||||
+--------+----------------------+-----+------+---------+---------------+
|
||||
| __tsid | UInt64 | PRI | NO | | TAG |
|
||||
| host | String | PRI | YES | | TAG |
|
||||
| ts | TimestampMillisecond | PRI | NO | | TIMESTAMP |
|
||||
| val | Float64 | | YES | | FIELD |
|
||||
@@ -43,6 +44,7 @@ DESC TABLE t2;
|
||||
+--------+----------------------+-----+------+---------+---------------+
|
||||
| Column | Type | Key | Null | Default | Semantic Type |
|
||||
+--------+----------------------+-----+------+---------+---------------+
|
||||
| __tsid | UInt64 | PRI | NO | | TAG |
|
||||
| job | String | PRI | YES | | TAG |
|
||||
| ts | TimestampMillisecond | PRI | NO | | TIMESTAMP |
|
||||
| val | Float64 | | YES | | FIELD |
|
||||
@@ -74,6 +76,7 @@ DESC TABLE t1;
|
||||
+--------+----------------------+-----+------+---------+---------------+
|
||||
| Column | Type | Key | Null | Default | Semantic Type |
|
||||
+--------+----------------------+-----+------+---------+---------------+
|
||||
| __tsid | UInt64 | PRI | NO | | TAG |
|
||||
| host | String | PRI | YES | | TAG |
|
||||
| k | String | PRI | YES | | TAG |
|
||||
| ts | TimestampMillisecond | PRI | NO | | TIMESTAMP |
|
||||
@@ -85,6 +88,7 @@ DESC TABLE t2;
|
||||
+--------+----------------------+-----+------+---------+---------------+
|
||||
| Column | Type | Key | Null | Default | Semantic Type |
|
||||
+--------+----------------------+-----+------+---------+---------------+
|
||||
| __tsid | UInt64 | PRI | NO | | TAG |
|
||||
| job | String | PRI | YES | | TAG |
|
||||
| k | String | PRI | YES | | TAG |
|
||||
| ts | TimestampMillisecond | PRI | NO | | TIMESTAMP |
|
||||
|
||||
@@ -1,3 +1,7 @@
|
||||
USE public;
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
CREATE TABLE system_metrics (
|
||||
host STRING,
|
||||
idc STRING,
|
||||
|
||||
@@ -1,3 +1,5 @@
|
||||
USE public;
|
||||
|
||||
CREATE TABLE system_metrics (
|
||||
host STRING,
|
||||
idc STRING,
|
||||
|
||||
@@ -101,6 +101,7 @@ DESC TABLE t1;
|
||||
+--------+----------------------+-----+------+---------+---------------+
|
||||
| Column | Type | Key | Null | Default | Semantic Type |
|
||||
+--------+----------------------+-----+------+---------+---------------+
|
||||
| __tsid | UInt64 | PRI | NO | | TAG |
|
||||
| host | String | PRI | YES | | TAG |
|
||||
| ts | TimestampMillisecond | PRI | NO | | TIMESTAMP |
|
||||
| val | Float64 | | YES | | FIELD |
|
||||
@@ -111,6 +112,7 @@ DESC TABLE t2;
|
||||
+--------+----------------------+-----+------+---------+---------------+
|
||||
| Column | Type | Key | Null | Default | Semantic Type |
|
||||
+--------+----------------------+-----+------+---------+---------------+
|
||||
| __tsid | UInt64 | PRI | NO | | TAG |
|
||||
| job | String | PRI | YES | | TAG |
|
||||
| ts | TimestampMillisecond | PRI | NO | | TIMESTAMP |
|
||||
| val | Float64 | | YES | | FIELD |
|
||||
|
||||
@@ -432,7 +432,8 @@ EXPLAIN select * from logical_table_4;
|
||||
| plan_type_| plan_|
|
||||
+-+-+
|
||||
| logical_plan_| MergeScan [is_placeholder=false, remote_input=[_|
|
||||
|_| Projection: logical_table_4.another_partition_key, logical_table_4.cpu, logical_table_4.host, logical_table_4.one_partition_key, logical_table_4.ts |
|
||||
|_| Projection: logical_table_4.another_partition_key, logical_table_4.cpu, logical_table_4.host, logical_table_4.one_partition_key, logical_table_4.ts_|
|
||||
|_|_Projection: logical_table_4.__tsid, logical_table_4.another_partition_key, logical_table_4.cpu, logical_table_4.host, logical_table_4.one_partition_key, logical_table_4.ts |
|
||||
|_|_TableScan: logical_table_4_|
|
||||
|_| ]]_|
|
||||
| physical_plan | CooperativeExec_|
|
||||
|
||||
@@ -64,14 +64,14 @@ DESC TABLE phy;
|
||||
|
||||
SELECT ts, val, __tsid, host, job FROM phy;
|
||||
|
||||
+-------------------------+-----+----------------------+-------+------+
|
||||
| ts | val | __tsid | host | job |
|
||||
+-------------------------+-----+----------------------+-------+------+
|
||||
| 1970-01-01T00:00:00.001 | 1.0 | 7947983149541006936 | host2 | |
|
||||
| 1970-01-01T00:00:00 | 0.0 | 13882403126406556045 | host1 | |
|
||||
| 1970-01-01T00:00:00 | 0.0 | 6248409809737953425 | | job1 |
|
||||
| 1970-01-01T00:00:00.001 | 1.0 | 12867770218286207316 | | job2 |
|
||||
+-------------------------+-----+----------------------+-------+------+
|
||||
+-------------------------+-----+-------+------+
|
||||
| ts | val | host | job |
|
||||
+-------------------------+-----+-------+------+
|
||||
| 1970-01-01T00:00:00.001 | 1.0 | host2 | |
|
||||
| 1970-01-01T00:00:00 | 0.0 | host1 | |
|
||||
| 1970-01-01T00:00:00 | 0.0 | | job1 |
|
||||
| 1970-01-01T00:00:00.001 | 1.0 | | job2 |
|
||||
+-------------------------+-----+-------+------+
|
||||
|
||||
DROP TABLE phy;
|
||||
|
||||
|
||||
105
tests/cases/standalone/tql-explain-analyze/tsid_column.result
Normal file
105
tests/cases/standalone/tql-explain-analyze/tsid_column.result
Normal file
@@ -0,0 +1,105 @@
|
||||
CREATE TABLE tsid_physical (
|
||||
ts TIMESTAMP(3) TIME INDEX,
|
||||
val DOUBLE,
|
||||
) ENGINE = metric WITH ("physical_metric_table" = "");
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
CREATE TABLE tsid_metric (
|
||||
job STRING NULL,
|
||||
instance STRING NULL,
|
||||
ts TIMESTAMP(3) NOT NULL,
|
||||
val DOUBLE NULL,
|
||||
TIME INDEX (ts),
|
||||
PRIMARY KEY(job, instance),
|
||||
)
|
||||
ENGINE = metric
|
||||
WITH(
|
||||
on_physical_table = 'tsid_physical'
|
||||
);
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
INSERT INTO tsid_metric VALUES
|
||||
('job1', 'instance1', 0, 1),
|
||||
('job1', 'instance2', 0, 2),
|
||||
('job1', 'instance1', 5000, 3),
|
||||
('job1', 'instance2', 5000, 4),
|
||||
('job1', 'instance1', 10000, 5),
|
||||
('job1', 'instance2', 10000, 6);
|
||||
|
||||
Affected Rows: 6
|
||||
|
||||
-- SQLNESS REPLACE (metrics.*) REDACTED
|
||||
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
|
||||
-- SQLNESS REPLACE (-+) -
|
||||
-- SQLNESS REPLACE (\s\s+) _
|
||||
-- SQLNESS REPLACE (peers.*) REDACTED
|
||||
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
|
||||
-- SQLNESS REPLACE (Hash.*) REDACTED
|
||||
TQL ANALYZE (0, 10, '5s') sum(tsid_metric);
|
||||
|
||||
+-+-+-+
|
||||
| stage | node | plan_|
|
||||
+-+-+-+
|
||||
| 0_| 0_|_CooperativeExec REDACTED
|
||||
|_|_|_MergeScanExec: REDACTED
|
||||
|_|_|_|
|
||||
| 1_| 0_|_SortPreservingMergeExec: [ts@0 ASC NULLS LAST] REDACTED
|
||||
|_|_|_SortExec: expr=[ts@0 ASC NULLS LAST], preserve_partitioning=[true] REDACTED
|
||||
|_|_|_AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[sum(tsid_metric.val)] REDACTED
|
||||
|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|
||||
|_|_|_RepartitionExec: partitioning=REDACTED
|
||||
|_|_|_AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[sum(tsid_metric.val)] REDACTED
|
||||
|_|_|_ProjectionExec: expr=[ts@1 as ts, val@2 as val] REDACTED
|
||||
|_|_|_PromInstantManipulateExec: range=[0..10000], lookback=[300000], interval=[5000], time index=[ts] REDACTED
|
||||
|_|_|_PromSeriesDivideExec: tags=["__tsid"] REDACTED
|
||||
|_|_|_SortExec: expr=[__tsid@0 ASC, ts@1 ASC], preserve_partitioning=[true] REDACTED
|
||||
|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|
||||
|_|_|_RepartitionExec: partitioning=REDACTED
|
||||
|_|_|_CooperativeExec REDACTED
|
||||
|_|_|_SeqScan: region=REDACTED, "partition_count":{"count":1, "mem_ranges":1, "files":0, "file_ranges":0} REDACTED
|
||||
|_|_|_|
|
||||
|_|_| Total rows: 3_|
|
||||
+-+-+-+
|
||||
|
||||
-- SQLNESS REPLACE (metrics.*) REDACTED
|
||||
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
|
||||
-- SQLNESS REPLACE (-+) -
|
||||
-- SQLNESS REPLACE (\s\s+) _
|
||||
-- SQLNESS REPLACE (peers.*) REDACTED
|
||||
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
|
||||
-- SQLNESS REPLACE (Hash.*) REDACTED
|
||||
TQL ANALYZE (0, 10, '5s') sum by (job, instance) (tsid_metric);
|
||||
|
||||
+-+-+-+
|
||||
| stage | node | plan_|
|
||||
+-+-+-+
|
||||
| 0_| 0_|_CooperativeExec REDACTED
|
||||
|_|_|_MergeScanExec: REDACTED
|
||||
|_|_|_|
|
||||
| 1_| 0_|_SortPreservingMergeExec: [job@0 ASC NULLS LAST, instance@1 ASC NULLS LAST, ts@2 ASC NULLS LAST] REDACTED
|
||||
|_|_|_SortExec: expr=[job@0 ASC NULLS LAST, instance@1 ASC NULLS LAST, ts@2 ASC NULLS LAST], preserve_partitioning=[true] REDACTED
|
||||
|_|_|_AggregateExec: mode=FinalPartitioned, gby=[job@0 as job, instance@1 as instance, ts@2 as ts], aggr=[sum(tsid_metric.val), __tsid] REDACTED
|
||||
|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|
||||
|_|_|_RepartitionExec: partitioning=REDACTED
|
||||
|_|_|_AggregateExec: mode=Partial, gby=[job@2 as job, instance@1 as instance, ts@3 as ts], aggr=[sum(tsid_metric.val), __tsid] REDACTED
|
||||
|_|_|_PromInstantManipulateExec: range=[0..10000], lookback=[300000], interval=[5000], time index=[ts] REDACTED
|
||||
|_|_|_PromSeriesDivideExec: tags=["__tsid"] REDACTED
|
||||
|_|_|_SortExec: expr=[__tsid@0 ASC, ts@3 ASC], preserve_partitioning=[true] REDACTED
|
||||
|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|
||||
|_|_|_RepartitionExec: partitioning=REDACTED
|
||||
|_|_|_CooperativeExec REDACTED
|
||||
|_|_|_SeqScan: region=REDACTED, "partition_count":{"count":1, "mem_ranges":1, "files":0, "file_ranges":0} REDACTED
|
||||
|_|_|_|
|
||||
|_|_| Total rows: 6_|
|
||||
+-+-+-+
|
||||
|
||||
DROP TABLE tsid_metric;
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
DROP TABLE tsid_physical;
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
47
tests/cases/standalone/tql-explain-analyze/tsid_column.sql
Normal file
47
tests/cases/standalone/tql-explain-analyze/tsid_column.sql
Normal file
@@ -0,0 +1,47 @@
|
||||
CREATE TABLE tsid_physical (
|
||||
ts TIMESTAMP(3) TIME INDEX,
|
||||
val DOUBLE,
|
||||
) ENGINE = metric WITH ("physical_metric_table" = "");
|
||||
|
||||
CREATE TABLE tsid_metric (
|
||||
job STRING NULL,
|
||||
instance STRING NULL,
|
||||
ts TIMESTAMP(3) NOT NULL,
|
||||
val DOUBLE NULL,
|
||||
TIME INDEX (ts),
|
||||
PRIMARY KEY(job, instance),
|
||||
)
|
||||
ENGINE = metric
|
||||
WITH(
|
||||
on_physical_table = 'tsid_physical'
|
||||
);
|
||||
|
||||
INSERT INTO tsid_metric VALUES
|
||||
('job1', 'instance1', 0, 1),
|
||||
('job1', 'instance2', 0, 2),
|
||||
('job1', 'instance1', 5000, 3),
|
||||
('job1', 'instance2', 5000, 4),
|
||||
('job1', 'instance1', 10000, 5),
|
||||
('job1', 'instance2', 10000, 6);
|
||||
|
||||
-- SQLNESS REPLACE (metrics.*) REDACTED
|
||||
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
|
||||
-- SQLNESS REPLACE (-+) -
|
||||
-- SQLNESS REPLACE (\s\s+) _
|
||||
-- SQLNESS REPLACE (peers.*) REDACTED
|
||||
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
|
||||
-- SQLNESS REPLACE (Hash.*) REDACTED
|
||||
TQL ANALYZE (0, 10, '5s') sum(tsid_metric);
|
||||
|
||||
-- SQLNESS REPLACE (metrics.*) REDACTED
|
||||
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
|
||||
-- SQLNESS REPLACE (-+) -
|
||||
-- SQLNESS REPLACE (\s\s+) _
|
||||
-- SQLNESS REPLACE (peers.*) REDACTED
|
||||
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
|
||||
-- SQLNESS REPLACE (Hash.*) REDACTED
|
||||
TQL ANALYZE (0, 10, '5s') sum by (job, instance) (tsid_metric);
|
||||
|
||||
DROP TABLE tsid_metric;
|
||||
DROP TABLE tsid_physical;
|
||||
|
||||
Reference in New Issue
Block a user