mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-06-03 13:50:40 +00:00
detect table type on planner
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
@@ -138,7 +138,6 @@ impl CreateLogicalTablesProcedure {
|
||||
/// Abort(not-retry):
|
||||
/// - Failed to create table metadata.
|
||||
pub async fn on_create_metadata(&mut self) -> Result<Status> {
|
||||
self.add_tsid_column_to_logical_tables();
|
||||
self.update_physical_table_metadata().await?;
|
||||
let table_ids = self.create_logical_tables_metadata().await?;
|
||||
|
||||
|
||||
@@ -17,9 +17,7 @@ use std::ops::Deref;
|
||||
use common_telemetry::{info, warn};
|
||||
use itertools::Itertools;
|
||||
use snafu::OptionExt;
|
||||
use store_api::metric_engine_consts::DATA_SCHEMA_TSID_COLUMN_NAME;
|
||||
use store_api::storage::consts::ReservedColumnId;
|
||||
use table::metadata::{RawTableInfo, TableId};
|
||||
use table::metadata::TableId;
|
||||
use table::table_name::TableName;
|
||||
|
||||
use crate::cache_invalidator::Context;
|
||||
@@ -29,20 +27,6 @@ use crate::error::{Result, TableInfoNotFoundSnafu};
|
||||
use crate::instruction::CacheIdent;
|
||||
|
||||
impl CreateLogicalTablesProcedure {
|
||||
pub(crate) fn add_tsid_column_to_logical_tables(&mut self) {
|
||||
for (task, table_id_already_exists) in self
|
||||
.data
|
||||
.tasks
|
||||
.iter_mut()
|
||||
.zip(self.data.table_ids_already_exists.iter())
|
||||
{
|
||||
if table_id_already_exists.is_some() {
|
||||
continue;
|
||||
}
|
||||
add_tsid_column_to_raw_table_info(&mut task.table_info);
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn update_physical_table_metadata(&mut self) -> Result<()> {
|
||||
if self.data.physical_columns.is_empty() {
|
||||
warn!(
|
||||
@@ -144,58 +128,3 @@ impl CreateLogicalTablesProcedure {
|
||||
Ok(table_ids)
|
||||
}
|
||||
}
|
||||
|
||||
fn add_tsid_column_to_raw_table_info(table_info: &mut RawTableInfo) {
|
||||
if table_info
|
||||
.meta
|
||||
.schema
|
||||
.column_schemas
|
||||
.iter()
|
||||
.any(|col| col.name == DATA_SCHEMA_TSID_COLUMN_NAME)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
let should_update_column_ids =
|
||||
table_info.meta.column_ids.len() == table_info.meta.schema.column_schemas.len();
|
||||
let column_index = table_info.meta.schema.column_schemas.len();
|
||||
table_info
|
||||
.meta
|
||||
.schema
|
||||
.column_schemas
|
||||
.push(datatypes::schema::ColumnSchema::new(
|
||||
DATA_SCHEMA_TSID_COLUMN_NAME,
|
||||
datatypes::prelude::ConcreteDataType::uint64_datatype(),
|
||||
false,
|
||||
));
|
||||
table_info.meta.primary_key_indices.push(column_index);
|
||||
if should_update_column_ids {
|
||||
table_info.meta.column_ids.push(ReservedColumnId::tsid());
|
||||
}
|
||||
table_info.sort_columns();
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::ddl::test_util::test_create_logical_table_task;
|
||||
|
||||
#[test]
|
||||
fn add_tsid_preserves_column_ids_when_present() {
|
||||
let mut task = test_create_logical_table_task("foo");
|
||||
let schema_len = task.table_info.meta.schema.column_schemas.len();
|
||||
task.table_info.meta.column_ids = (0..schema_len as u32).collect();
|
||||
|
||||
add_tsid_column_to_raw_table_info(&mut task.table_info);
|
||||
|
||||
assert_eq!(
|
||||
task.table_info.meta.column_ids.len(),
|
||||
task.table_info.meta.schema.column_schemas.len()
|
||||
);
|
||||
let name_to_ids = task.table_info.name_to_ids().unwrap();
|
||||
assert_eq!(
|
||||
name_to_ids.get(DATA_SCHEMA_TSID_COLUMN_NAME),
|
||||
Some(&ReservedColumnId::tsid())
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -553,7 +553,6 @@ async fn test_on_part_duplicate_alter_request() {
|
||||
assert_eq!(
|
||||
table1_cols,
|
||||
vec![
|
||||
"__tsid".to_string(),
|
||||
"col_0".to_string(),
|
||||
"cpu".to_string(),
|
||||
"host".to_string(),
|
||||
@@ -573,7 +572,6 @@ async fn test_on_part_duplicate_alter_request() {
|
||||
assert_eq!(
|
||||
table2_cols,
|
||||
vec![
|
||||
"__tsid".to_string(),
|
||||
"col_0".to_string(),
|
||||
"cpu".to_string(),
|
||||
"host".to_string(),
|
||||
|
||||
@@ -343,7 +343,7 @@ mod test {
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(scan_req.projection.unwrap(), vec![11, 10, 9, 8, 3, 0, 1]);
|
||||
assert_eq!(scan_req.projection.unwrap(), vec![11, 10, 9, 8, 0, 1, 4]);
|
||||
assert_eq!(scan_req.filters.len(), 1);
|
||||
assert_eq!(
|
||||
scan_req.filters[0],
|
||||
|
||||
@@ -16,30 +16,13 @@
|
||||
|
||||
use std::collections::HashMap;
|
||||
|
||||
use api::v1::SemanticType;
|
||||
use datatypes::prelude::ConcreteDataType;
|
||||
use datatypes::schema::ColumnSchema;
|
||||
use store_api::metadata::ColumnMetadata;
|
||||
use store_api::metric_engine_consts::DATA_SCHEMA_TSID_COLUMN_NAME;
|
||||
use store_api::storage::RegionId;
|
||||
use store_api::storage::consts::ReservedColumnId;
|
||||
|
||||
use crate::engine::MetricEngineInner;
|
||||
use crate::error::Result;
|
||||
|
||||
impl MetricEngineInner {
|
||||
fn tsid_column_metadata() -> ColumnMetadata {
|
||||
ColumnMetadata {
|
||||
column_schema: ColumnSchema::new(
|
||||
DATA_SCHEMA_TSID_COLUMN_NAME,
|
||||
ConcreteDataType::uint64_datatype(),
|
||||
false,
|
||||
),
|
||||
semantic_type: SemanticType::Tag,
|
||||
column_id: ReservedColumnId::tsid(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Load column metadata of a logical region.
|
||||
///
|
||||
/// The return value is ordered on column name.
|
||||
@@ -71,7 +54,6 @@ impl MetricEngineInner {
|
||||
.await?
|
||||
.into_iter()
|
||||
.map(|(_, column_metadata)| column_metadata)
|
||||
.chain(std::iter::once(Self::tsid_column_metadata()))
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
// Update cache
|
||||
|
||||
@@ -73,7 +73,8 @@ use promql_parser::parser::{
|
||||
use regex::{self, Regex};
|
||||
use snafu::{OptionExt, ResultExt, ensure};
|
||||
use store_api::metric_engine_consts::{
|
||||
DATA_SCHEMA_TABLE_ID_COLUMN_NAME, DATA_SCHEMA_TSID_COLUMN_NAME,
|
||||
DATA_SCHEMA_TABLE_ID_COLUMN_NAME, DATA_SCHEMA_TSID_COLUMN_NAME, LOGICAL_TABLE_METADATA_KEY,
|
||||
METRIC_ENGINE_NAME,
|
||||
};
|
||||
use table::table::adapter::DfTableProviderAdapter;
|
||||
|
||||
@@ -1527,7 +1528,7 @@ impl PromPlanner {
|
||||
.await
|
||||
.context(CatalogSnafu)?;
|
||||
|
||||
let is_time_index_ms = provider
|
||||
let logical_table = provider
|
||||
.as_any()
|
||||
.downcast_ref::<DefaultTableSource>()
|
||||
.context(UnknownTableSnafu)?
|
||||
@@ -1535,19 +1536,145 @@ impl PromPlanner {
|
||||
.as_any()
|
||||
.downcast_ref::<DfTableProviderAdapter>()
|
||||
.context(UnknownTableSnafu)?
|
||||
.table()
|
||||
.table();
|
||||
|
||||
let mut scan_table_ref = table_ref.clone();
|
||||
let mut scan_provider = provider;
|
||||
let mut table_id_filter: Option<u32> = None;
|
||||
|
||||
// If it's a metric engine logical table, scan its physical table directly and filter by
|
||||
// `__table_id = logical_table_id` to get access to internal columns like `__tsid`.
|
||||
if logical_table.table_info().meta.engine == METRIC_ENGINE_NAME
|
||||
&& let Some(physical_table_name) = logical_table
|
||||
.table_info()
|
||||
.meta
|
||||
.options
|
||||
.extra_options
|
||||
.get(LOGICAL_TABLE_METADATA_KEY)
|
||||
{
|
||||
let physical_table_ref = if let Some(schema_name) = &self.ctx.schema_name {
|
||||
TableReference::partial(schema_name.as_str(), physical_table_name.as_str())
|
||||
} else {
|
||||
TableReference::bare(physical_table_name.as_str())
|
||||
};
|
||||
|
||||
let physical_provider = match self
|
||||
.table_provider
|
||||
.resolve_table(physical_table_ref.clone())
|
||||
.await
|
||||
{
|
||||
Ok(provider) => provider,
|
||||
Err(e) if e.status_code() == StatusCode::TableNotFound => {
|
||||
// Fall back to scanning the logical table. It still works, but without
|
||||
// `__tsid` optimization.
|
||||
scan_provider.clone()
|
||||
}
|
||||
Err(e) => return Err(e).context(CatalogSnafu),
|
||||
};
|
||||
|
||||
if !Arc::ptr_eq(&physical_provider, &scan_provider) {
|
||||
// Only rewrite when internal columns exist in physical schema.
|
||||
let physical_table = physical_provider
|
||||
.as_any()
|
||||
.downcast_ref::<DefaultTableSource>()
|
||||
.context(UnknownTableSnafu)?
|
||||
.table_provider
|
||||
.as_any()
|
||||
.downcast_ref::<DfTableProviderAdapter>()
|
||||
.context(UnknownTableSnafu)?
|
||||
.table();
|
||||
|
||||
let has_table_id = physical_table
|
||||
.schema()
|
||||
.column_schema_by_name(DATA_SCHEMA_TABLE_ID_COLUMN_NAME)
|
||||
.is_some();
|
||||
let has_tsid = physical_table
|
||||
.schema()
|
||||
.column_schema_by_name(DATA_SCHEMA_TSID_COLUMN_NAME)
|
||||
.is_some_and(|col| matches!(col.data_type, ConcreteDataType::UInt64(_)));
|
||||
|
||||
if has_table_id && has_tsid {
|
||||
scan_table_ref = physical_table_ref;
|
||||
scan_provider = physical_provider;
|
||||
table_id_filter = Some(logical_table.table_info().ident.table_id);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let scan_table = scan_provider
|
||||
.as_any()
|
||||
.downcast_ref::<DefaultTableSource>()
|
||||
.context(UnknownTableSnafu)?
|
||||
.table_provider
|
||||
.as_any()
|
||||
.downcast_ref::<DfTableProviderAdapter>()
|
||||
.context(UnknownTableSnafu)?
|
||||
.table();
|
||||
|
||||
let use_tsid = table_id_filter.is_some()
|
||||
&& scan_table
|
||||
.schema()
|
||||
.column_schema_by_name(DATA_SCHEMA_TSID_COLUMN_NAME)
|
||||
.is_some_and(|col| matches!(col.data_type, ConcreteDataType::UInt64(_)));
|
||||
self.ctx.tsid_column = use_tsid.then_some(DATA_SCHEMA_TSID_COLUMN_NAME.to_string());
|
||||
|
||||
let is_time_index_ms = scan_table
|
||||
.schema()
|
||||
.timestamp_column()
|
||||
.with_context(|| TimeIndexNotFoundSnafu {
|
||||
table: table_ref.to_quoted_string(),
|
||||
table: scan_table_ref.to_quoted_string(),
|
||||
})?
|
||||
.data_type
|
||||
== ConcreteDataType::timestamp_millisecond_datatype();
|
||||
|
||||
let mut scan_plan = LogicalPlanBuilder::scan(table_ref.clone(), provider, None)
|
||||
.context(DataFusionPlanningSnafu)?
|
||||
.build()
|
||||
.context(DataFusionPlanningSnafu)?;
|
||||
let scan_projection = if table_id_filter.is_some() {
|
||||
let mut required_columns = HashSet::new();
|
||||
required_columns.insert(DATA_SCHEMA_TABLE_ID_COLUMN_NAME.to_string());
|
||||
required_columns.insert(self.ctx.time_index_column.clone().with_context(|| {
|
||||
TimeIndexNotFoundSnafu {
|
||||
table: scan_table_ref.to_quoted_string(),
|
||||
}
|
||||
})?);
|
||||
for col in &self.ctx.tag_columns {
|
||||
required_columns.insert(col.clone());
|
||||
}
|
||||
for col in &self.ctx.field_columns {
|
||||
required_columns.insert(col.clone());
|
||||
}
|
||||
if use_tsid {
|
||||
required_columns.insert(DATA_SCHEMA_TSID_COLUMN_NAME.to_string());
|
||||
}
|
||||
|
||||
let arrow_schema = scan_table.schema().arrow_schema().clone();
|
||||
Some(
|
||||
arrow_schema
|
||||
.fields()
|
||||
.iter()
|
||||
.enumerate()
|
||||
.filter(|(_, field)| required_columns.contains(field.name().as_str()))
|
||||
.map(|(idx, _)| idx)
|
||||
.collect::<Vec<_>>(),
|
||||
)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
let mut scan_plan =
|
||||
LogicalPlanBuilder::scan(scan_table_ref.clone(), scan_provider, scan_projection)
|
||||
.context(DataFusionPlanningSnafu)?
|
||||
.build()
|
||||
.context(DataFusionPlanningSnafu)?;
|
||||
|
||||
if let Some(table_id) = table_id_filter {
|
||||
scan_plan = LogicalPlanBuilder::from(scan_plan)
|
||||
.filter(
|
||||
DfExpr::Column(Column::from_name(DATA_SCHEMA_TABLE_ID_COLUMN_NAME))
|
||||
.eq(lit(table_id)),
|
||||
)
|
||||
.context(DataFusionPlanningSnafu)?
|
||||
.build()
|
||||
.context(DataFusionPlanningSnafu)?;
|
||||
}
|
||||
|
||||
if !is_time_index_ms {
|
||||
// cast to ms if time_index not in Millisecond precision
|
||||
@@ -1555,7 +1682,7 @@ impl PromPlanner {
|
||||
.ctx
|
||||
.field_columns
|
||||
.iter()
|
||||
.map(|col| DfExpr::Column(Column::new(Some(table_ref.clone()), col.clone())))
|
||||
.map(|col| DfExpr::Column(Column::new(Some(scan_table_ref.clone()), col.clone())))
|
||||
.chain(self.create_tag_column_exprs()?)
|
||||
.chain(
|
||||
self.ctx
|
||||
@@ -1563,7 +1690,7 @@ impl PromPlanner {
|
||||
.as_ref()
|
||||
.map(|_| {
|
||||
DfExpr::Column(Column::new(
|
||||
Some(table_ref.clone()),
|
||||
Some(scan_table_ref.clone()),
|
||||
DATA_SCHEMA_TSID_COLUMN_NAME.to_string(),
|
||||
))
|
||||
})
|
||||
@@ -1574,13 +1701,13 @@ impl PromPlanner {
|
||||
expr: Box::new(self.create_time_index_column_expr()?),
|
||||
data_type: ArrowDataType::Timestamp(ArrowTimeUnit::Millisecond, None),
|
||||
})),
|
||||
relation: Some(table_ref.clone()),
|
||||
relation: Some(scan_table_ref.clone()),
|
||||
name: self
|
||||
.ctx
|
||||
.time_index_column
|
||||
.as_ref()
|
||||
.with_context(|| TimeIndexNotFoundSnafu {
|
||||
table: table_ref.to_quoted_string(),
|
||||
table: scan_table_ref.to_quoted_string(),
|
||||
})?
|
||||
.clone(),
|
||||
metadata: None,
|
||||
@@ -1591,6 +1718,27 @@ impl PromPlanner {
|
||||
.context(DataFusionPlanningSnafu)?
|
||||
.build()
|
||||
.context(DataFusionPlanningSnafu)?;
|
||||
} else if table_id_filter.is_some() {
|
||||
// Drop the internal `__table_id` column after filtering.
|
||||
let project_exprs = self
|
||||
.create_field_column_exprs()?
|
||||
.into_iter()
|
||||
.chain(self.create_tag_column_exprs()?)
|
||||
.chain(
|
||||
self.ctx
|
||||
.tsid_column
|
||||
.as_ref()
|
||||
.map(|_| DfExpr::Column(Column::from_name(DATA_SCHEMA_TSID_COLUMN_NAME)))
|
||||
.into_iter(),
|
||||
)
|
||||
.chain(Some(self.create_time_index_column_expr()?))
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
scan_plan = LogicalPlanBuilder::from(scan_plan)
|
||||
.project(project_exprs)
|
||||
.context(DataFusionPlanningSnafu)?
|
||||
.build()
|
||||
.context(DataFusionPlanningSnafu)?;
|
||||
}
|
||||
|
||||
let result = LogicalPlanBuilder::from(scan_plan)
|
||||
@@ -1654,14 +1802,7 @@ impl PromPlanner {
|
||||
.collect();
|
||||
self.ctx.tag_columns = tags;
|
||||
|
||||
// Set internal tsid column if available from underlying storage engine.
|
||||
self.ctx.tsid_column = table
|
||||
.schema()
|
||||
.column_schema_by_name(DATA_SCHEMA_TSID_COLUMN_NAME)
|
||||
.and_then(|col| {
|
||||
matches!(col.data_type, ConcreteDataType::UInt64(_))
|
||||
.then_some(DATA_SCHEMA_TSID_COLUMN_NAME.to_string())
|
||||
});
|
||||
self.ctx.tsid_column = None;
|
||||
|
||||
Ok(None)
|
||||
}
|
||||
@@ -3658,7 +3799,80 @@ mod test {
|
||||
num_field: usize,
|
||||
) -> DfTableSourceProvider {
|
||||
let catalog_list = MemoryCatalogManager::with_default_setup();
|
||||
for (schema_name, table_name) in table_name_tuples {
|
||||
|
||||
let physical_table_name = "phy";
|
||||
let physical_table_id = 999u32;
|
||||
|
||||
// Register a metric engine physical table with internal columns.
|
||||
{
|
||||
let mut columns = vec![
|
||||
ColumnSchema::new(
|
||||
DATA_SCHEMA_TABLE_ID_COLUMN_NAME.to_string(),
|
||||
ConcreteDataType::uint32_datatype(),
|
||||
false,
|
||||
),
|
||||
ColumnSchema::new(
|
||||
DATA_SCHEMA_TSID_COLUMN_NAME.to_string(),
|
||||
ConcreteDataType::uint64_datatype(),
|
||||
false,
|
||||
),
|
||||
];
|
||||
for i in 0..num_tag {
|
||||
columns.push(ColumnSchema::new(
|
||||
format!("tag_{i}"),
|
||||
ConcreteDataType::string_datatype(),
|
||||
false,
|
||||
));
|
||||
}
|
||||
columns.push(
|
||||
ColumnSchema::new(
|
||||
"timestamp".to_string(),
|
||||
ConcreteDataType::timestamp_millisecond_datatype(),
|
||||
false,
|
||||
)
|
||||
.with_time_index(true),
|
||||
);
|
||||
for i in 0..num_field {
|
||||
columns.push(ColumnSchema::new(
|
||||
format!("field_{i}"),
|
||||
ConcreteDataType::float64_datatype(),
|
||||
true,
|
||||
));
|
||||
}
|
||||
|
||||
let schema = Arc::new(Schema::new(columns));
|
||||
let primary_key_indices = (0..(2 + num_tag)).collect::<Vec<_>>();
|
||||
let table_meta = TableMetaBuilder::empty()
|
||||
.schema(schema)
|
||||
.primary_key_indices(primary_key_indices)
|
||||
.value_indices((2 + num_tag..2 + num_tag + 1 + num_field).collect())
|
||||
.engine(METRIC_ENGINE_NAME.to_string())
|
||||
.next_column_id(1024)
|
||||
.build()
|
||||
.unwrap();
|
||||
let table_info = TableInfoBuilder::default()
|
||||
.table_id(physical_table_id)
|
||||
.name(physical_table_name)
|
||||
.meta(table_meta)
|
||||
.build()
|
||||
.unwrap();
|
||||
let table = EmptyTable::from_table_info(&table_info);
|
||||
|
||||
assert!(
|
||||
catalog_list
|
||||
.register_table_sync(RegisterTableRequest {
|
||||
catalog: DEFAULT_CATALOG_NAME.to_string(),
|
||||
schema: DEFAULT_SCHEMA_NAME.to_string(),
|
||||
table_name: physical_table_name.to_string(),
|
||||
table_id: physical_table_id,
|
||||
table,
|
||||
})
|
||||
.is_ok()
|
||||
);
|
||||
}
|
||||
|
||||
// Register metric engine logical tables without `__tsid`, referencing the physical table.
|
||||
for (idx, (schema_name, table_name)) in table_name_tuples.iter().enumerate() {
|
||||
let mut columns = vec![];
|
||||
for i in 0..num_tag {
|
||||
columns.push(ColumnSchema::new(
|
||||
@@ -3682,26 +3896,25 @@ mod test {
|
||||
true,
|
||||
));
|
||||
}
|
||||
columns.push(ColumnSchema::new(
|
||||
DATA_SCHEMA_TSID_COLUMN_NAME.to_string(),
|
||||
ConcreteDataType::uint64_datatype(),
|
||||
false,
|
||||
));
|
||||
|
||||
let schema = Arc::new(Schema::new(columns));
|
||||
|
||||
let tsid_idx = num_tag + 1 + num_field;
|
||||
let mut primary_key_indices = (0..num_tag).collect::<Vec<_>>();
|
||||
primary_key_indices.push(tsid_idx);
|
||||
|
||||
let mut options = table::requests::TableOptions::default();
|
||||
options.extra_options.insert(
|
||||
LOGICAL_TABLE_METADATA_KEY.to_string(),
|
||||
physical_table_name.to_string(),
|
||||
);
|
||||
let table_id = 1024u32 + idx as u32;
|
||||
let table_meta = TableMetaBuilder::empty()
|
||||
.schema(schema)
|
||||
.primary_key_indices(primary_key_indices)
|
||||
.primary_key_indices((0..num_tag).collect())
|
||||
.value_indices((num_tag + 1..num_tag + 1 + num_field).collect())
|
||||
.engine(METRIC_ENGINE_NAME.to_string())
|
||||
.options(options)
|
||||
.next_column_id(1024)
|
||||
.build()
|
||||
.unwrap();
|
||||
let table_info = TableInfoBuilder::default()
|
||||
.table_id(table_id)
|
||||
.name(table_name.clone())
|
||||
.meta(table_meta)
|
||||
.build()
|
||||
@@ -3714,7 +3927,7 @@ mod test {
|
||||
catalog: DEFAULT_CATALOG_NAME.to_string(),
|
||||
schema: schema_name.clone(),
|
||||
table_name: table_name.clone(),
|
||||
table_id: 1024,
|
||||
table_id,
|
||||
table,
|
||||
})
|
||||
.is_ok()
|
||||
@@ -4125,6 +4338,89 @@ mod test {
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn tsid_is_not_used_when_physical_table_is_missing() {
|
||||
let prom_expr = parser::parse("some_metric").unwrap();
|
||||
let eval_stmt = EvalStmt {
|
||||
expr: prom_expr,
|
||||
start: UNIX_EPOCH,
|
||||
end: UNIX_EPOCH
|
||||
.checked_add(Duration::from_secs(100_000))
|
||||
.unwrap(),
|
||||
interval: Duration::from_secs(5),
|
||||
lookback_delta: Duration::from_secs(1),
|
||||
};
|
||||
|
||||
let catalog_list = MemoryCatalogManager::with_default_setup();
|
||||
|
||||
// Register a metric engine logical table referencing a missing physical table.
|
||||
let mut columns = vec![ColumnSchema::new(
|
||||
"tag_0".to_string(),
|
||||
ConcreteDataType::string_datatype(),
|
||||
false,
|
||||
)];
|
||||
columns.push(
|
||||
ColumnSchema::new(
|
||||
"timestamp".to_string(),
|
||||
ConcreteDataType::timestamp_millisecond_datatype(),
|
||||
false,
|
||||
)
|
||||
.with_time_index(true),
|
||||
);
|
||||
columns.push(ColumnSchema::new(
|
||||
"field_0".to_string(),
|
||||
ConcreteDataType::float64_datatype(),
|
||||
true,
|
||||
));
|
||||
let schema = Arc::new(Schema::new(columns));
|
||||
let mut options = table::requests::TableOptions::default();
|
||||
options
|
||||
.extra_options
|
||||
.insert(LOGICAL_TABLE_METADATA_KEY.to_string(), "phy".to_string());
|
||||
let table_meta = TableMetaBuilder::empty()
|
||||
.schema(schema)
|
||||
.primary_key_indices(vec![0])
|
||||
.value_indices(vec![2])
|
||||
.engine(METRIC_ENGINE_NAME.to_string())
|
||||
.options(options)
|
||||
.next_column_id(1024)
|
||||
.build()
|
||||
.unwrap();
|
||||
let table_info = TableInfoBuilder::default()
|
||||
.table_id(1024)
|
||||
.name("some_metric")
|
||||
.meta(table_meta)
|
||||
.build()
|
||||
.unwrap();
|
||||
let table = EmptyTable::from_table_info(&table_info);
|
||||
catalog_list
|
||||
.register_table_sync(RegisterTableRequest {
|
||||
catalog: DEFAULT_CATALOG_NAME.to_string(),
|
||||
schema: DEFAULT_SCHEMA_NAME.to_string(),
|
||||
table_name: "some_metric".to_string(),
|
||||
table_id: 1024,
|
||||
table,
|
||||
})
|
||||
.unwrap();
|
||||
|
||||
let table_provider = DfTableSourceProvider::new(
|
||||
catalog_list,
|
||||
false,
|
||||
QueryContext::arc(),
|
||||
DummyDecoder::arc(),
|
||||
false,
|
||||
);
|
||||
|
||||
let plan =
|
||||
PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let plan_str = plan.display_indent_schema().to_string();
|
||||
assert!(plan_str.contains("PromSeriesDivide: tags=[\"tag_0\"]"));
|
||||
assert!(!plan_str.contains("PromSeriesDivide: tags=[\"__tsid\"]"));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn tsid_is_carried_only_when_aggregate_preserves_label_set() {
|
||||
let prom_expr = parser::parse("sum by (tag_0) (some_metric)").unwrap();
|
||||
|
||||
@@ -33,7 +33,6 @@ DESC TABLE t1;
|
||||
+--------+----------------------+-----+------+---------+---------------+
|
||||
| Column | Type | Key | Null | Default | Semantic Type |
|
||||
+--------+----------------------+-----+------+---------+---------------+
|
||||
| __tsid | UInt64 | PRI | NO | | TAG |
|
||||
| host | String | PRI | YES | | TAG |
|
||||
| ts | TimestampMillisecond | PRI | NO | | TIMESTAMP |
|
||||
| val | Float64 | | YES | | FIELD |
|
||||
@@ -44,7 +43,6 @@ DESC TABLE t2;
|
||||
+--------+----------------------+-----+------+---------+---------------+
|
||||
| Column | Type | Key | Null | Default | Semantic Type |
|
||||
+--------+----------------------+-----+------+---------+---------------+
|
||||
| __tsid | UInt64 | PRI | NO | | TAG |
|
||||
| job | String | PRI | YES | | TAG |
|
||||
| ts | TimestampMillisecond | PRI | NO | | TIMESTAMP |
|
||||
| val | Float64 | | YES | | FIELD |
|
||||
@@ -76,7 +74,6 @@ DESC TABLE t1;
|
||||
+--------+----------------------+-----+------+---------+---------------+
|
||||
| Column | Type | Key | Null | Default | Semantic Type |
|
||||
+--------+----------------------+-----+------+---------+---------------+
|
||||
| __tsid | UInt64 | PRI | NO | | TAG |
|
||||
| host | String | PRI | YES | | TAG |
|
||||
| k | String | PRI | YES | | TAG |
|
||||
| ts | TimestampMillisecond | PRI | NO | | TIMESTAMP |
|
||||
@@ -88,7 +85,6 @@ DESC TABLE t2;
|
||||
+--------+----------------------+-----+------+---------+---------------+
|
||||
| Column | Type | Key | Null | Default | Semantic Type |
|
||||
+--------+----------------------+-----+------+---------+---------------+
|
||||
| __tsid | UInt64 | PRI | NO | | TAG |
|
||||
| job | String | PRI | YES | | TAG |
|
||||
| k | String | PRI | YES | | TAG |
|
||||
| ts | TimestampMillisecond | PRI | NO | | TIMESTAMP |
|
||||
|
||||
@@ -101,7 +101,6 @@ DESC TABLE t1;
|
||||
+--------+----------------------+-----+------+---------+---------------+
|
||||
| Column | Type | Key | Null | Default | Semantic Type |
|
||||
+--------+----------------------+-----+------+---------+---------------+
|
||||
| __tsid | UInt64 | PRI | NO | | TAG |
|
||||
| host | String | PRI | YES | | TAG |
|
||||
| ts | TimestampMillisecond | PRI | NO | | TIMESTAMP |
|
||||
| val | Float64 | | YES | | FIELD |
|
||||
@@ -112,7 +111,6 @@ DESC TABLE t2;
|
||||
+--------+----------------------+-----+------+---------+---------------+
|
||||
| Column | Type | Key | Null | Default | Semantic Type |
|
||||
+--------+----------------------+-----+------+---------+---------------+
|
||||
| __tsid | UInt64 | PRI | NO | | TAG |
|
||||
| job | String | PRI | YES | | TAG |
|
||||
| ts | TimestampMillisecond | PRI | NO | | TIMESTAMP |
|
||||
| val | Float64 | | YES | | FIELD |
|
||||
|
||||
@@ -432,8 +432,7 @@ EXPLAIN select * from logical_table_4;
|
||||
| plan_type_| plan_|
|
||||
+-+-+
|
||||
| logical_plan_| MergeScan [is_placeholder=false, remote_input=[_|
|
||||
|_| Projection: logical_table_4.another_partition_key, logical_table_4.cpu, logical_table_4.host, logical_table_4.one_partition_key, logical_table_4.ts_|
|
||||
|_|_Projection: logical_table_4.__tsid, logical_table_4.another_partition_key, logical_table_4.cpu, logical_table_4.host, logical_table_4.one_partition_key, logical_table_4.ts |
|
||||
|_| Projection: logical_table_4.another_partition_key, logical_table_4.cpu, logical_table_4.host, logical_table_4.one_partition_key, logical_table_4.ts |
|
||||
|_|_TableScan: logical_table_4_|
|
||||
|_| ]]_|
|
||||
| physical_plan | CooperativeExec_|
|
||||
|
||||
@@ -780,14 +780,14 @@ tql eval(1000, 2000, '300s') unknown_metric or node_network_transmit_bytes_total
|
||||
-- SQLNESS SORT_RESULT 3 1
|
||||
tql eval(1000, 2000, '300s') sum by (cloud, tag0, tag1) (node_network_transmit_bytes_total) or unknown_metric;
|
||||
|
||||
+---------------------+---------+-------------------------------------------------------+
|
||||
| greptime_timestamp | cloud | sum(node_network_transmit_bytes_total.greptime_value) |
|
||||
+---------------------+---------+-------------------------------------------------------+
|
||||
| 1970-01-01T00:16:40 | cloud-1 | 2500.0 |
|
||||
| 1970-01-01T00:16:40 | cloud-2 | 800.0 |
|
||||
| 1970-01-01T00:21:40 | cloud-1 | 4500.0 |
|
||||
| 1970-01-01T00:21:40 | cloud-2 | 1800.0 |
|
||||
+---------------------+---------+-------------------------------------------------------+
|
||||
+---------------------+---------+-----------------------------------+
|
||||
| greptime_timestamp | cloud | sum(test_physical.greptime_value) |
|
||||
+---------------------+---------+-----------------------------------+
|
||||
| 1970-01-01T00:16:40 | cloud-1 | 2500.0 |
|
||||
| 1970-01-01T00:16:40 | cloud-2 | 800.0 |
|
||||
| 1970-01-01T00:21:40 | cloud-1 | 4500.0 |
|
||||
| 1970-01-01T00:21:40 | cloud-2 | 1800.0 |
|
||||
+---------------------+---------+-----------------------------------+
|
||||
|
||||
-- Or with unknown label and metric.
|
||||
-- SQLNESS SORT_RESULT 3 1
|
||||
@@ -805,14 +805,14 @@ tql eval(1000, 2000, '300s') unknown_metric or unknown_metric1 or sum by (cloud,
|
||||
-- SQLNESS SORT_RESULT 3 1
|
||||
tql eval(1000, 2000, '300s') sum by (cloud, tag0, tag1) (node_network_transmit_bytes_total) or sum by (cloud, tag0, tag1) (unknown_metric);
|
||||
|
||||
+---------------------+---------+-------------------------------------------------------+
|
||||
| greptime_timestamp | cloud | sum(node_network_transmit_bytes_total.greptime_value) |
|
||||
+---------------------+---------+-------------------------------------------------------+
|
||||
| 1970-01-01T00:16:40 | cloud-1 | 2500.0 |
|
||||
| 1970-01-01T00:16:40 | cloud-2 | 800.0 |
|
||||
| 1970-01-01T00:21:40 | cloud-1 | 4500.0 |
|
||||
| 1970-01-01T00:21:40 | cloud-2 | 1800.0 |
|
||||
+---------------------+---------+-------------------------------------------------------+
|
||||
+---------------------+---------+-----------------------------------+
|
||||
| greptime_timestamp | cloud | sum(test_physical.greptime_value) |
|
||||
+---------------------+---------+-----------------------------------+
|
||||
| 1970-01-01T00:16:40 | cloud-1 | 2500.0 |
|
||||
| 1970-01-01T00:16:40 | cloud-2 | 800.0 |
|
||||
| 1970-01-01T00:21:40 | cloud-1 | 4500.0 |
|
||||
| 1970-01-01T00:21:40 | cloud-2 | 1800.0 |
|
||||
+---------------------+---------+-----------------------------------+
|
||||
|
||||
-- Or with unknown label dst_namespace.
|
||||
-- SQLNESS SORT_RESULT 3 1
|
||||
|
||||
@@ -47,16 +47,17 @@ TQL ANALYZE (0, 10, '5s') sum(tsid_metric);
|
||||
|_|_|_|
|
||||
| 1_| 0_|_SortPreservingMergeExec: [ts@0 ASC NULLS LAST] REDACTED
|
||||
|_|_|_SortExec: expr=[ts@0 ASC NULLS LAST], preserve_partitioning=[true] REDACTED
|
||||
|_|_|_AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[sum(tsid_metric.val)] REDACTED
|
||||
|_|_|_AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[sum(tsid_physical.val)] REDACTED
|
||||
|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|
||||
|_|_|_RepartitionExec: partitioning=REDACTED
|
||||
|_|_|_AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[sum(tsid_metric.val)] REDACTED
|
||||
|_|_|_ProjectionExec: expr=[ts@1 as ts, val@2 as val] REDACTED
|
||||
|_|_|_AggregateExec: mode=Partial, gby=[ts@1 as ts], aggr=[sum(tsid_physical.val)] REDACTED
|
||||
|_|_|_ProjectionExec: expr=[val@0 as val, ts@2 as ts] REDACTED
|
||||
|_|_|_PromInstantManipulateExec: range=[0..10000], lookback=[300000], interval=[5000], time index=[ts] REDACTED
|
||||
|_|_|_PromSeriesDivideExec: tags=["__tsid"] REDACTED
|
||||
|_|_|_SortExec: expr=[__tsid@0 ASC, ts@1 ASC], preserve_partitioning=[true] REDACTED
|
||||
|_|_|_SortExec: expr=[__tsid@1 ASC, ts@2 ASC], preserve_partitioning=[true] REDACTED
|
||||
|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|
||||
|_|_|_RepartitionExec: partitioning=REDACTED
|
||||
|_|_|_ProjectionExec: expr=[val@1 as val, __tsid@2 as __tsid, ts@0 as ts] REDACTED
|
||||
|_|_|_CooperativeExec REDACTED
|
||||
|_|_|_SeqScan: region=REDACTED, "partition_count":{"count":1, "mem_ranges":1, "files":0, "file_ranges":0} REDACTED
|
||||
|_|_|_|
|
||||
@@ -80,15 +81,16 @@ TQL ANALYZE (0, 10, '5s') sum by (job, instance) (tsid_metric);
|
||||
|_|_|_|
|
||||
| 1_| 0_|_SortPreservingMergeExec: [job@0 ASC NULLS LAST, instance@1 ASC NULLS LAST, ts@2 ASC NULLS LAST] REDACTED
|
||||
|_|_|_SortExec: expr=[job@0 ASC NULLS LAST, instance@1 ASC NULLS LAST, ts@2 ASC NULLS LAST], preserve_partitioning=[true] REDACTED
|
||||
|_|_|_AggregateExec: mode=FinalPartitioned, gby=[job@0 as job, instance@1 as instance, ts@2 as ts], aggr=[sum(tsid_metric.val), __tsid] REDACTED
|
||||
|_|_|_AggregateExec: mode=FinalPartitioned, gby=[job@0 as job, instance@1 as instance, ts@2 as ts], aggr=[sum(tsid_physical.val), __tsid] REDACTED
|
||||
|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|
||||
|_|_|_RepartitionExec: partitioning=REDACTED
|
||||
|_|_|_AggregateExec: mode=Partial, gby=[job@2 as job, instance@1 as instance, ts@3 as ts], aggr=[sum(tsid_metric.val), __tsid] REDACTED
|
||||
|_|_|_AggregateExec: mode=Partial, gby=[job@2 as job, instance@1 as instance, ts@4 as ts], aggr=[sum(tsid_physical.val), __tsid] REDACTED
|
||||
|_|_|_PromInstantManipulateExec: range=[0..10000], lookback=[300000], interval=[5000], time index=[ts] REDACTED
|
||||
|_|_|_PromSeriesDivideExec: tags=["__tsid"] REDACTED
|
||||
|_|_|_SortExec: expr=[__tsid@0 ASC, ts@3 ASC], preserve_partitioning=[true] REDACTED
|
||||
|_|_|_SortExec: expr=[__tsid@3 ASC, ts@4 ASC], preserve_partitioning=[true] REDACTED
|
||||
|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|
||||
|_|_|_RepartitionExec: partitioning=REDACTED
|
||||
|_|_|_ProjectionExec: expr=[val@1 as val, instance@3 as instance, job@4 as job, __tsid@2 as __tsid, ts@0 as ts] REDACTED
|
||||
|_|_|_CooperativeExec REDACTED
|
||||
|_|_|_SeqScan: region=REDACTED, "partition_count":{"count":1, "mem_ranges":1, "files":0, "file_ranges":0} REDACTED
|
||||
|_|_|_|
|
||||
|
||||
Reference in New Issue
Block a user