mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-22 16:00:38 +00:00
perf: join metrics tables on the tsid key whenever possible (#7927)
* feat: prefilter flat parquet scans by primary key
* perf: skip redundant series divide repartitions
* perf: optimize tsid promql join planning
* perf: preserve tsid distribution through merge scans
* perf: remove redundant tsid join repartitions
* fix multi-field join case
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
* Revert "feat: prefilter flat parquet scans by primary key"
This reverts commit 767c3b44c8.
* simplification
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
* isolate rule into a dedicated mod
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
* remove rule
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
* more sqlness cases
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
* fix filter join case
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
* fix: normalize sqlness repartition input count
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
* fix: normalize sqlness partition count in promql regression
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
* fix: normalize sqlness hash partition fanout
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
* simplification
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
---------
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
@@ -433,14 +433,15 @@ impl MergeScanExec {
|
||||
.values()
|
||||
.flat_map(|aliases| aliases.iter().map(|c| c.name()))
|
||||
.collect();
|
||||
let mut overlaps = vec![];
|
||||
for expr in &hash_exprs {
|
||||
if let Some(col_expr) = expr.as_any().downcast_ref::<Column>()
|
||||
&& all_partition_col_aliases.contains(col_expr.name())
|
||||
{
|
||||
overlaps.push(expr.clone());
|
||||
}
|
||||
}
|
||||
let overlaps: Vec<_> = hash_exprs
|
||||
.iter()
|
||||
.filter(|expr| {
|
||||
expr.as_any()
|
||||
.downcast_ref::<Column>()
|
||||
.is_some_and(|col_expr| all_partition_col_aliases.contains(col_expr.name()))
|
||||
})
|
||||
.cloned()
|
||||
.collect();
|
||||
|
||||
if overlaps.is_empty() {
|
||||
return None;
|
||||
|
||||
@@ -17,8 +17,10 @@ use std::sync::Arc;
|
||||
use datafusion::config::ConfigOptions;
|
||||
use datafusion::physical_optimizer::PhysicalOptimizerRule;
|
||||
use datafusion::physical_plan::ExecutionPlan;
|
||||
use datafusion::physical_plan::projection::ProjectionExec;
|
||||
use datafusion_common::Result as DfResult;
|
||||
use datafusion_physical_expr::Distribution;
|
||||
use datafusion_physical_expr::utils::map_columns_before_projection;
|
||||
|
||||
use crate::dist_plan::MergeScanExec;
|
||||
|
||||
@@ -83,6 +85,9 @@ impl PassDistribution {
|
||||
let mut new_children = Vec::with_capacity(children.len());
|
||||
for (idx, child) in children.into_iter().enumerate() {
|
||||
let child_req = match required.get(idx) {
|
||||
Some(Distribution::UnspecifiedDistribution) if idx == 0 => {
|
||||
Self::map_hash_requirement_through_projection(plan.as_ref(), &current_req)
|
||||
}
|
||||
Some(Distribution::UnspecifiedDistribution) => None,
|
||||
None => current_req.clone(),
|
||||
Some(req) => Some(req.clone()),
|
||||
@@ -103,4 +108,200 @@ impl PassDistribution {
|
||||
plan.with_new_children(new_children)
|
||||
}
|
||||
}
|
||||
|
||||
fn map_hash_requirement_through_projection(
|
||||
plan: &dyn ExecutionPlan,
|
||||
current_req: &Option<Distribution>,
|
||||
) -> Option<Distribution> {
|
||||
let Some(Distribution::HashPartitioned(required_exprs)) = current_req else {
|
||||
return None;
|
||||
};
|
||||
|
||||
let projection = plan.as_any().downcast_ref::<ProjectionExec>()?;
|
||||
let proj_exprs = projection
|
||||
.expr()
|
||||
.iter()
|
||||
.map(|expr| (Arc::clone(&expr.expr), expr.alias.clone()))
|
||||
.collect::<Vec<_>>();
|
||||
let mapped = map_columns_before_projection(required_exprs, &proj_exprs);
|
||||
|
||||
(mapped.len() == required_exprs.len()).then_some(Distribution::HashPartitioned(mapped))
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
mod tests {
    use std::collections::{BTreeMap, BTreeSet};

    use arrow_schema::{DataType, Field, Schema, SchemaRef, TimeUnit};
    use async_trait::async_trait;
    use common_query::request::QueryRequest;
    use common_recordbatch::SendableRecordBatchStream;
    use datafusion::common::NullEquality;
    use datafusion::execution::SessionStateBuilder;
    use datafusion::physical_optimizer::PhysicalOptimizerRule;
    use datafusion::physical_plan::joins::{HashJoinExec, PartitionMode};
    use datafusion::physical_plan::projection::{ProjectionExec, ProjectionExpr};
    use datafusion::physical_plan::{ExecutionPlanProperties, Partitioning};
    use datafusion_expr::{JoinType, LogicalPlanBuilder};
    use datafusion_physical_expr::PhysicalExpr;
    use datafusion_physical_expr::expressions::Column as PhysicalColumn;
    use session::ReadPreference;
    use session::context::QueryContext;
    use store_api::metric_engine_consts::DATA_SCHEMA_TSID_COLUMN_NAME;
    use store_api::storage::RegionId;
    use table::table_name::TableName;

    use super::*;
    use crate::error::Result as QueryResult;
    use crate::region_query::RegionQueryHandler;

    /// Region query handler stub: these tests only optimize plans, so a call
    /// to `do_get` is treated as a bug.
    struct NoopRegionQueryHandler;

    #[async_trait]
    impl RegionQueryHandler for NoopRegionQueryHandler {
        async fn do_get(
            &self,
            _read_preference: ReadPreference,
            _request: QueryRequest,
        ) -> QueryResult<SendableRecordBatchStream> {
            unreachable!("pass distribution tests should not execute remote queries")
        }
    }

    /// Wraps a named column at `index` as a physical expression.
    fn partition_column(name: &str, index: usize) -> Arc<dyn PhysicalExpr> {
        Arc::new(PhysicalColumn::new(name, index))
    }

    /// Extracts column names from `exprs`; panics if any expression is not a
    /// plain physical column.
    fn column_names(exprs: &[Arc<dyn PhysicalExpr>]) -> Vec<&str> {
        let mut names = Vec::with_capacity(exprs.len());
        for expr in exprs {
            let column = expr.as_any().downcast_ref::<PhysicalColumn>().unwrap();
            names.push(column.name());
        }
        names
    }

    /// Schema shared by both join inputs: host, tsid, timestamp, value.
    fn test_schema() -> SchemaRef {
        let fields = vec![
            Field::new("host", DataType::Utf8, true),
            Field::new(DATA_SCHEMA_TSID_COLUMN_NAME, DataType::UInt64, false),
            Field::new(
                "greptime_timestamp",
                DataType::Timestamp(TimeUnit::Millisecond, None),
                false,
            ),
            Field::new("greptime_value", DataType::Float64, true),
        ];
        Arc::new(Schema::new(fields))
    }

    /// Builds a two-region `MergeScanExec` with target partition count 32 whose
    /// partition columns are the tsid column and `greptime_timestamp`.
    fn test_merge_scan_exec(schema: SchemaRef) -> MergeScanExec {
        let session_state = SessionStateBuilder::new().with_default_features().build();

        let mut partition_cols = BTreeMap::new();
        for name in [DATA_SCHEMA_TSID_COLUMN_NAME, "greptime_timestamp"] {
            partition_cols.insert(
                name.to_string(),
                BTreeSet::from([datafusion_common::Column::from_name(name)]),
            );
        }

        let plan = LogicalPlanBuilder::empty(false).build().unwrap();
        let regions = vec![RegionId::new(1, 0), RegionId::new(1, 1)];

        MergeScanExec::new(
            &session_state,
            TableName::new("greptime", "public", "test"),
            regions,
            plan,
            schema.as_ref(),
            Arc::new(NoopRegionQueryHandler),
            QueryContext::arc(),
            32,
            partition_cols,
        )
        .unwrap()
    }

    #[test]
    fn passes_hash_requirement_through_projection_to_merge_scan() {
        let schema = test_schema();
        let left_merge_scan = Arc::new(test_merge_scan_exec(schema.clone()));
        let right_merge_scan = Arc::new(test_merge_scan_exec(schema.clone()));

        // The left side reorders columns through a projection, so the hash
        // requirement has to be mapped back onto the merge scan's columns.
        let projected_columns = vec![
            ProjectionExpr::new(partition_column("greptime_value", 3), "greptime_value"),
            ProjectionExpr::new(
                partition_column(DATA_SCHEMA_TSID_COLUMN_NAME, 1),
                DATA_SCHEMA_TSID_COLUMN_NAME,
            ),
            ProjectionExpr::new(
                partition_column("greptime_timestamp", 2),
                "greptime_timestamp",
            ),
        ];
        let left_projection: Arc<dyn datafusion::physical_plan::ExecutionPlan> =
            Arc::new(ProjectionExec::try_new(projected_columns, left_merge_scan).unwrap());

        let join_key = |name: &str, index: usize| {
            (partition_column(name, index), partition_column(name, index))
        };
        let join_on = vec![
            join_key(DATA_SCHEMA_TSID_COLUMN_NAME, 1),
            join_key("greptime_timestamp", 2),
        ];
        let join: Arc<dyn datafusion::physical_plan::ExecutionPlan> = Arc::new(
            HashJoinExec::try_new(
                left_projection,
                right_merge_scan,
                join_on,
                None,
                &JoinType::Inner,
                None,
                PartitionMode::Partitioned,
                NullEquality::NullEqualsNull,
                false,
            )
            .unwrap(),
        );

        let optimized = PassDistribution
            .optimize(join, &ConfigOptions::default())
            .unwrap();

        let hash_join = optimized.as_any().downcast_ref::<HashJoinExec>().unwrap();
        let left_projection = hash_join
            .left()
            .as_any()
            .downcast_ref::<ProjectionExec>()
            .unwrap();
        let left_partitioning = left_projection.input().output_partitioning();
        let right_partitioning = hash_join.right().output_partitioning();

        let (left_exprs, left_count) = match left_partitioning {
            Partitioning::Hash(exprs, count) => (exprs, count),
            _ => panic!("expected left merge scan hash partitioning"),
        };
        let (right_exprs, right_count) = match right_partitioning {
            Partitioning::Hash(exprs, count) => (exprs, count),
            _ => panic!("expected right merge scan hash partitioning"),
        };

        assert_eq!(*left_count, 32);
        assert_eq!(*right_count, 32);
        assert_eq!(
            column_names(left_exprs),
            vec![DATA_SCHEMA_TSID_COLUMN_NAME, "greptime_timestamp"]
        );
        assert_eq!(
            column_names(right_exprs),
            vec![DATA_SCHEMA_TSID_COLUMN_NAME, "greptime_timestamp"]
        );
    }
}
|
||||
|
||||
@@ -709,7 +709,13 @@ impl PromPlanner {
|
||||
self.ctx.table_name = Some("rhs".to_string());
|
||||
}
|
||||
}
|
||||
let mut field_columns = left_field_columns.iter().zip(right_field_columns.iter());
|
||||
let (output_field_columns, field_columns) =
|
||||
Self::align_binary_field_columns(&left_field_columns, &right_field_columns);
|
||||
// PromQL binary arithmetic only combines the shared prefix of value columns.
|
||||
// Keep the output field count aligned with that zipped prefix so planning
|
||||
// remains stable even when the two sides have uneven multi-field schemas.
|
||||
self.ctx.field_columns = output_field_columns;
|
||||
let mut field_columns = field_columns.into_iter();
|
||||
|
||||
let join_plan = self.join_on_non_field_columns(
|
||||
left_input,
|
||||
@@ -810,11 +816,10 @@ impl PromPlanner {
|
||||
|
||||
// Preserve `__tsid` if present, so it can still be used internally downstream. It's
|
||||
// stripped from the final output anyway.
|
||||
if context.use_tsid
|
||||
&& let Ok(tsid_col) =
|
||||
schema.qualified_field_with_name(Some(table_ref), DATA_SCHEMA_TSID_COLUMN_NAME)
|
||||
if let Some(tsid_col) =
|
||||
Self::optional_tsid_projection(schema, Some(table_ref), context.use_tsid)
|
||||
{
|
||||
project_exprs.push(DfExpr::Column(tsid_col.into()));
|
||||
project_exprs.push(tsid_col);
|
||||
}
|
||||
|
||||
let plan = LogicalPlanBuilder::from(input)
|
||||
@@ -3393,6 +3398,97 @@ impl PromPlanner {
|
||||
)
|
||||
}
|
||||
|
||||
/// Pairs the value columns of both operands positionally, stopping at the
/// shorter side.
///
/// Returns the output field names (cloned from the left column of each pair)
/// together with the borrowed `(left, right)` pairs themselves.
fn align_binary_field_columns<'a>(
    left_field_columns: &'a [String],
    right_field_columns: &'a [String],
) -> (Vec<String>, Vec<(&'a String, &'a String)>) {
    let shared_len = left_field_columns.len().min(right_field_columns.len());
    let mut output_field_columns = Vec::with_capacity(shared_len);
    let mut field_pairs = Vec::with_capacity(shared_len);

    for (left_name, right_name) in left_field_columns.iter().zip(right_field_columns) {
        output_field_columns.push(left_name.clone());
        field_pairs.push((left_name, right_name));
    }

    (output_field_columns, field_pairs)
}
|
||||
|
||||
fn plan_has_tsid_column(plan: &LogicalPlan) -> bool {
|
||||
plan.schema()
|
||||
.fields()
|
||||
.iter()
|
||||
.any(|field| field.name() == DATA_SCHEMA_TSID_COLUMN_NAME)
|
||||
}
|
||||
|
||||
fn optional_tsid_projection(
|
||||
schema: &DFSchemaRef,
|
||||
table_ref: Option<&TableReference>,
|
||||
keep_tsid: bool,
|
||||
) -> Option<DfExpr> {
|
||||
keep_tsid.then_some(()).and_then(|_| {
|
||||
schema
|
||||
.qualified_field_with_name(table_ref, DATA_SCHEMA_TSID_COLUMN_NAME)
|
||||
.ok()
|
||||
.map(|field| DfExpr::Column(field.into()))
|
||||
})
|
||||
}
|
||||
|
||||
fn binary_join_key_columns(
|
||||
&self,
|
||||
left: &LogicalPlan,
|
||||
right: &LogicalPlan,
|
||||
only_join_time_index: bool,
|
||||
modifier: &Option<BinModifier>,
|
||||
) -> (BTreeSet<String>, BTreeSet<String>) {
|
||||
let use_tsid_join = !only_join_time_index
|
||||
&& modifier.as_ref().is_none_or(|modifier| {
|
||||
modifier.matching.is_none()
|
||||
&& matches!(modifier.card, VectorMatchCardinality::OneToOne)
|
||||
})
|
||||
&& Self::plan_has_tsid_column(left)
|
||||
&& Self::plan_has_tsid_column(right);
|
||||
|
||||
let (mut left_tag_columns, mut right_tag_columns) = if use_tsid_join {
|
||||
(
|
||||
BTreeSet::from([DATA_SCHEMA_TSID_COLUMN_NAME.to_string()]),
|
||||
BTreeSet::from([DATA_SCHEMA_TSID_COLUMN_NAME.to_string()]),
|
||||
)
|
||||
} else {
|
||||
let left_tag_columns = if only_join_time_index {
|
||||
BTreeSet::new()
|
||||
} else {
|
||||
self.ctx
|
||||
.tag_columns
|
||||
.iter()
|
||||
.cloned()
|
||||
.collect::<BTreeSet<_>>()
|
||||
};
|
||||
let right_tag_columns = left_tag_columns.clone();
|
||||
(left_tag_columns, right_tag_columns)
|
||||
};
|
||||
|
||||
if !use_tsid_join
|
||||
&& let Some(modifier) = modifier
|
||||
&& let Some(matching) = &modifier.matching
|
||||
{
|
||||
match matching {
|
||||
LabelModifier::Include(on) => {
|
||||
let mask = on.labels.iter().cloned().collect::<BTreeSet<_>>();
|
||||
left_tag_columns = left_tag_columns.intersection(&mask).cloned().collect();
|
||||
right_tag_columns = right_tag_columns.intersection(&mask).cloned().collect();
|
||||
}
|
||||
LabelModifier::Exclude(ignoring) => {
|
||||
for label in &ignoring.labels {
|
||||
let _ = left_tag_columns.remove(label);
|
||||
let _ = right_tag_columns.remove(label);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
(left_tag_columns, right_tag_columns)
|
||||
}
|
||||
|
||||
/// Build an inner join on time index column and tag columns to concat two logical plans.
|
||||
/// When `only_join_time_index == true` we only join on the time index, because these two plans may not have the same tag columns
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
@@ -3407,40 +3503,8 @@ impl PromPlanner {
|
||||
only_join_time_index: bool,
|
||||
modifier: &Option<BinModifier>,
|
||||
) -> Result<LogicalPlan> {
|
||||
let mut left_tag_columns = if only_join_time_index {
|
||||
BTreeSet::new()
|
||||
} else {
|
||||
self.ctx
|
||||
.tag_columns
|
||||
.iter()
|
||||
.cloned()
|
||||
.collect::<BTreeSet<_>>()
|
||||
};
|
||||
let mut right_tag_columns = left_tag_columns.clone();
|
||||
|
||||
// apply modifier
|
||||
if let Some(modifier) = modifier {
|
||||
// apply label modifier
|
||||
if let Some(matching) = &modifier.matching {
|
||||
match matching {
|
||||
// keeps columns mentioned in `on`
|
||||
LabelModifier::Include(on) => {
|
||||
let mask = on.labels.iter().cloned().collect::<BTreeSet<_>>();
|
||||
left_tag_columns = left_tag_columns.intersection(&mask).cloned().collect();
|
||||
right_tag_columns =
|
||||
right_tag_columns.intersection(&mask).cloned().collect();
|
||||
}
|
||||
// removes columns mentioned in `ignoring`
|
||||
LabelModifier::Exclude(ignoring) => {
|
||||
// doesn't check existence of label
|
||||
for label in &ignoring.labels {
|
||||
let _ = left_tag_columns.remove(label);
|
||||
let _ = right_tag_columns.remove(label);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
let (mut left_tag_columns, mut right_tag_columns) =
|
||||
self.binary_join_key_columns(&left, &right, only_join_time_index, modifier);
|
||||
|
||||
// push time index column if it exists
|
||||
if let (Some(left_time_index_column), Some(right_time_index_column)) =
|
||||
@@ -3856,17 +3920,17 @@ impl PromPlanner {
|
||||
where
|
||||
F: FnMut(&String) -> Result<DfExpr>,
|
||||
{
|
||||
let table_ref = self.ctx.table_name.clone().map(TableReference::bare);
|
||||
let non_field_columns_iter = self
|
||||
.ctx
|
||||
.tag_columns
|
||||
.iter()
|
||||
.chain(self.ctx.time_index_column.iter())
|
||||
.map(|col| {
|
||||
Ok(DfExpr::Column(Column::new(
|
||||
self.ctx.table_name.clone().map(TableReference::bare),
|
||||
col,
|
||||
)))
|
||||
});
|
||||
.map(|col| Ok(DfExpr::Column(Column::new(table_ref.clone(), col))));
|
||||
let tsid_iter =
|
||||
Self::optional_tsid_projection(input.schema(), table_ref.as_ref(), self.ctx.use_tsid)
|
||||
.into_iter()
|
||||
.map(Ok);
|
||||
|
||||
// build computation exprs
|
||||
let result_field_columns = self
|
||||
@@ -3888,6 +3952,7 @@ impl PromPlanner {
|
||||
|
||||
// chain non-field columns (unchanged) and field columns (applied computation then alias)
|
||||
let project_fields = non_field_columns_iter
|
||||
.chain(tsid_iter)
|
||||
.chain(field_columns_iter)
|
||||
.collect::<Result<Vec<_>>>()?;
|
||||
|
||||
@@ -4124,6 +4189,18 @@ mod test {
|
||||
assert!(!plan_str.contains("Distinct:"), "{plan_str}");
|
||||
}
|
||||
|
||||
fn build_eval_stmt(expr: &str) -> EvalStmt {
|
||||
EvalStmt {
|
||||
expr: parser::parse(expr).unwrap(),
|
||||
start: UNIX_EPOCH,
|
||||
end: UNIX_EPOCH
|
||||
.checked_add(Duration::from_secs(100_000))
|
||||
.unwrap(),
|
||||
interval: Duration::from_secs(5),
|
||||
lookback_delta: Duration::from_secs(1),
|
||||
}
|
||||
}
|
||||
|
||||
async fn build_test_table_provider(
|
||||
table_name_tuples: &[(String, String)],
|
||||
num_tag: usize,
|
||||
@@ -4195,11 +4272,27 @@ mod test {
|
||||
table_name_tuples: &[(String, String)],
|
||||
num_tag: usize,
|
||||
num_field: usize,
|
||||
) -> DfTableSourceProvider {
|
||||
let table_specs = table_name_tuples
|
||||
.iter()
|
||||
.map(|(schema_name, table_name)| ((schema_name.clone(), table_name.clone()), num_field))
|
||||
.collect::<Vec<_>>();
|
||||
build_test_table_provider_with_tsid_fields(&table_specs, num_tag).await
|
||||
}
|
||||
|
||||
async fn build_test_table_provider_with_tsid_fields(
|
||||
table_specs: &[((String, String), usize)],
|
||||
num_tag: usize,
|
||||
) -> DfTableSourceProvider {
|
||||
let catalog_list = MemoryCatalogManager::with_default_setup();
|
||||
|
||||
let physical_table_name = "phy";
|
||||
let physical_table_id = 999u32;
|
||||
let physical_num_field = table_specs
|
||||
.iter()
|
||||
.map(|(_, num_field)| *num_field)
|
||||
.max()
|
||||
.unwrap_or(0);
|
||||
|
||||
// Register a metric engine physical table with internal columns.
|
||||
{
|
||||
@@ -4230,7 +4323,7 @@ mod test {
|
||||
)
|
||||
.with_time_index(true),
|
||||
);
|
||||
for i in 0..num_field {
|
||||
for i in 0..physical_num_field {
|
||||
columns.push(ColumnSchema::new(
|
||||
format!("field_{i}"),
|
||||
ConcreteDataType::float64_datatype(),
|
||||
@@ -4243,7 +4336,7 @@ mod test {
|
||||
let table_meta = TableMetaBuilder::empty()
|
||||
.schema(schema)
|
||||
.primary_key_indices(primary_key_indices)
|
||||
.value_indices((2 + num_tag..2 + num_tag + 1 + num_field).collect())
|
||||
.value_indices((2 + num_tag..2 + num_tag + 1 + physical_num_field).collect())
|
||||
.engine(METRIC_ENGINE_NAME.to_string())
|
||||
.next_column_id(1024)
|
||||
.build()
|
||||
@@ -4270,7 +4363,7 @@ mod test {
|
||||
}
|
||||
|
||||
// Register metric engine logical tables without `__tsid`, referencing the physical table.
|
||||
for (idx, (schema_name, table_name)) in table_name_tuples.iter().enumerate() {
|
||||
for (idx, ((schema_name, table_name), num_field)) in table_specs.iter().enumerate() {
|
||||
let mut columns = vec![];
|
||||
for i in 0..num_tag {
|
||||
columns.push(ColumnSchema::new(
|
||||
@@ -4287,7 +4380,7 @@ mod test {
|
||||
)
|
||||
.with_time_index(true),
|
||||
);
|
||||
for i in 0..num_field {
|
||||
for i in 0..*num_field {
|
||||
columns.push(ColumnSchema::new(
|
||||
format!("field_{i}"),
|
||||
ConcreteDataType::float64_datatype(),
|
||||
@@ -4305,7 +4398,7 @@ mod test {
|
||||
let table_meta = TableMetaBuilder::empty()
|
||||
.schema(schema)
|
||||
.primary_key_indices((0..num_tag).collect())
|
||||
.value_indices((num_tag + 1..num_tag + 1 + num_field).collect())
|
||||
.value_indices((num_tag + 1..num_tag + 1 + *num_field).collect())
|
||||
.engine(METRIC_ENGINE_NAME.to_string())
|
||||
.options(options)
|
||||
.next_column_id(1024)
|
||||
@@ -4736,6 +4829,285 @@ mod test {
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn default_binary_join_uses_tsid_when_available() {
|
||||
let eval_stmt = build_eval_stmt("some_metric / some_alt_metric");
|
||||
|
||||
let table_provider = build_test_table_provider_with_tsid(
|
||||
&[
|
||||
(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string()),
|
||||
(
|
||||
DEFAULT_SCHEMA_NAME.to_string(),
|
||||
"some_alt_metric".to_string(),
|
||||
),
|
||||
],
|
||||
1,
|
||||
1,
|
||||
)
|
||||
.await;
|
||||
let plan =
|
||||
PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let plan_str = plan.display_indent_schema().to_string();
|
||||
assert!(
|
||||
plan_str.contains("some_metric.__tsid = some_alt_metric.__tsid"),
|
||||
"{plan_str}"
|
||||
);
|
||||
assert!(
|
||||
!plan_str.contains("some_metric.tag_0 = some_alt_metric.tag_0"),
|
||||
"{plan_str}"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn tsid_is_preserved_for_nested_default_binary_joins() {
|
||||
let eval_stmt = build_eval_stmt("(some_metric - some_alt_metric) / some_third_metric");
|
||||
|
||||
let table_provider = build_test_table_provider_with_tsid(
|
||||
&[
|
||||
(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string()),
|
||||
(
|
||||
DEFAULT_SCHEMA_NAME.to_string(),
|
||||
"some_alt_metric".to_string(),
|
||||
),
|
||||
(
|
||||
DEFAULT_SCHEMA_NAME.to_string(),
|
||||
"some_third_metric".to_string(),
|
||||
),
|
||||
],
|
||||
1,
|
||||
1,
|
||||
)
|
||||
.await;
|
||||
let plan =
|
||||
PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let plan_str = plan.display_indent_schema().to_string();
|
||||
assert_eq!(plan_str.matches("__tsid =").count(), 2, "{plan_str}");
|
||||
assert!(!plan_str.contains("tag_0 ="), "{plan_str}");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn repeated_tsid_binary_operand_keeps_tsid_join_keys() {
|
||||
let eval_stmt = build_eval_stmt("((some_metric - some_alt_metric) / some_metric) * 100");
|
||||
|
||||
let table_provider = build_test_table_provider_with_tsid(
|
||||
&[
|
||||
(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string()),
|
||||
(
|
||||
DEFAULT_SCHEMA_NAME.to_string(),
|
||||
"some_alt_metric".to_string(),
|
||||
),
|
||||
],
|
||||
1,
|
||||
1,
|
||||
)
|
||||
.await;
|
||||
let plan =
|
||||
PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let plan_str = plan.display_indent_schema().to_string();
|
||||
assert_eq!(plan_str.matches("__tsid =").count(), 2, "{plan_str}");
|
||||
assert!(!plan_str.contains("tag_0 ="), "{plan_str}");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn repeated_tsid_binary_operand_keeps_shorter_field_side() {
|
||||
let eval_stmt =
|
||||
build_eval_stmt("((two_field_metric - one_field_metric) / one_field_metric) * 100");
|
||||
|
||||
let table_provider = build_test_table_provider_with_tsid_fields(
|
||||
&[
|
||||
(
|
||||
(
|
||||
DEFAULT_SCHEMA_NAME.to_string(),
|
||||
"two_field_metric".to_string(),
|
||||
),
|
||||
2,
|
||||
),
|
||||
(
|
||||
(
|
||||
DEFAULT_SCHEMA_NAME.to_string(),
|
||||
"one_field_metric".to_string(),
|
||||
),
|
||||
1,
|
||||
),
|
||||
],
|
||||
1,
|
||||
)
|
||||
.await;
|
||||
let plan =
|
||||
PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let field_names = plan
|
||||
.schema()
|
||||
.fields()
|
||||
.iter()
|
||||
.map(|field| field.name().clone())
|
||||
.collect::<Vec<_>>();
|
||||
let value_columns = field_names
|
||||
.iter()
|
||||
.filter(|name| {
|
||||
*name != "tag_0" && *name != "timestamp" && *name != DATA_SCHEMA_TSID_COLUMN_NAME
|
||||
})
|
||||
.count();
|
||||
assert_eq!(value_columns, 1, "{field_names:?}");
|
||||
let plan_str = plan.display_indent_schema().to_string();
|
||||
assert_eq!(plan_str.matches("__tsid =").count(), 2, "{plan_str}");
|
||||
assert!(!plan_str.contains("tag_0 ="), "{plan_str}");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn tsid_binary_join_uses_shorter_field_side() {
|
||||
let eval_stmt = build_eval_stmt("one_field_metric / two_field_metric");
|
||||
|
||||
let table_provider = build_test_table_provider_with_tsid_fields(
|
||||
&[
|
||||
(
|
||||
(
|
||||
DEFAULT_SCHEMA_NAME.to_string(),
|
||||
"one_field_metric".to_string(),
|
||||
),
|
||||
1,
|
||||
),
|
||||
(
|
||||
(
|
||||
DEFAULT_SCHEMA_NAME.to_string(),
|
||||
"two_field_metric".to_string(),
|
||||
),
|
||||
2,
|
||||
),
|
||||
],
|
||||
1,
|
||||
)
|
||||
.await;
|
||||
let plan =
|
||||
PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let field_names = plan
|
||||
.schema()
|
||||
.fields()
|
||||
.iter()
|
||||
.map(|field| field.name().clone())
|
||||
.collect::<Vec<_>>();
|
||||
let value_columns = field_names
|
||||
.iter()
|
||||
.filter(|name| {
|
||||
*name != "tag_0" && *name != "timestamp" && *name != DATA_SCHEMA_TSID_COLUMN_NAME
|
||||
})
|
||||
.count();
|
||||
assert_eq!(value_columns, 1, "{field_names:?}");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn label_matching_modifier_disables_tsid_binary_join() {
|
||||
let eval_stmt = build_eval_stmt("some_metric / ignoring(tag_0) some_alt_metric");
|
||||
|
||||
let table_provider = build_test_table_provider_with_tsid(
|
||||
&[
|
||||
(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string()),
|
||||
(
|
||||
DEFAULT_SCHEMA_NAME.to_string(),
|
||||
"some_alt_metric".to_string(),
|
||||
),
|
||||
],
|
||||
2,
|
||||
1,
|
||||
)
|
||||
.await;
|
||||
let plan =
|
||||
PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let plan_str = plan.display_indent_schema().to_string();
|
||||
assert!(!plan_str.contains("__tsid ="), "{plan_str}");
|
||||
assert!(
|
||||
plan_str.contains("some_metric.tag_1 = some_alt_metric.tag_1"),
|
||||
"{plan_str}"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn comparison_binary_join_uses_tsid_and_keeps_it_in_filtered_result() {
|
||||
let eval_stmt = build_eval_stmt("some_metric > some_alt_metric");
|
||||
|
||||
let table_provider = build_test_table_provider_with_tsid(
|
||||
&[
|
||||
(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string()),
|
||||
(
|
||||
DEFAULT_SCHEMA_NAME.to_string(),
|
||||
"some_alt_metric".to_string(),
|
||||
),
|
||||
],
|
||||
2,
|
||||
1,
|
||||
)
|
||||
.await;
|
||||
let mut planner = PromPlanner {
|
||||
table_provider,
|
||||
ctx: PromPlannerContext::from_eval_stmt(&eval_stmt),
|
||||
};
|
||||
let plan = planner
|
||||
.prom_expr_to_plan(&eval_stmt.expr, &build_query_engine_state())
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let plan_str = plan.display_indent_schema().to_string();
|
||||
assert!(
|
||||
plan_str.contains("some_metric.__tsid = some_alt_metric.__tsid"),
|
||||
"{plan_str}"
|
||||
);
|
||||
assert!(
|
||||
plan.schema()
|
||||
.fields()
|
||||
.iter()
|
||||
.any(|field| field.name() == DATA_SCHEMA_TSID_COLUMN_NAME),
|
||||
"{plan_str}"
|
||||
);
|
||||
assert!(planner.ctx.use_tsid, "{plan_str}");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn comparison_bool_binary_join_uses_tsid_when_available() {
|
||||
let eval_stmt = build_eval_stmt("some_metric > bool some_alt_metric");
|
||||
|
||||
let table_provider = build_test_table_provider_with_tsid(
|
||||
&[
|
||||
(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string()),
|
||||
(
|
||||
DEFAULT_SCHEMA_NAME.to_string(),
|
||||
"some_alt_metric".to_string(),
|
||||
),
|
||||
],
|
||||
2,
|
||||
1,
|
||||
)
|
||||
.await;
|
||||
let plan =
|
||||
PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let plan_str = plan.display_indent_schema().to_string();
|
||||
assert!(
|
||||
plan_str.contains("some_metric.__tsid = some_alt_metric.__tsid"),
|
||||
"{plan_str}"
|
||||
);
|
||||
assert!(!plan_str.contains("tag_0 ="), "{plan_str}");
|
||||
assert!(!plan_str.contains("tag_1 ="), "{plan_str}");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn scalar_count_count_range_keeps_full_window() {
|
||||
let plan_str = build_optimized_tsid_plan(
|
||||
|
||||
@@ -0,0 +1,237 @@
|
||||
-- Regression test for TSID-backed PromQL binary joins on metric-engine tables.
|
||||
-- Default arithmetic and comparison joins should use `__tsid` when matching is the
|
||||
-- default one-to-one case. Label modifiers still have to stay label-based.
|
||||
CREATE TABLE tsid_binary_join_physical (
|
||||
ts TIMESTAMP(3) TIME INDEX,
|
||||
greptime_value DOUBLE,
|
||||
) ENGINE = metric WITH ("physical_metric_table" = "");
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
CREATE TABLE tsid_binary_join_left (
|
||||
host STRING NULL,
|
||||
job STRING NULL,
|
||||
ts TIMESTAMP(3) NOT NULL,
|
||||
greptime_value DOUBLE NULL,
|
||||
TIME INDEX (ts),
|
||||
PRIMARY KEY(host, job),
|
||||
)
|
||||
ENGINE = metric
|
||||
WITH(
|
||||
on_physical_table = 'tsid_binary_join_physical'
|
||||
);
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
CREATE TABLE tsid_binary_join_right (
|
||||
host STRING NULL,
|
||||
job STRING NULL,
|
||||
ts TIMESTAMP(3) NOT NULL,
|
||||
greptime_value DOUBLE NULL,
|
||||
TIME INDEX (ts),
|
||||
PRIMARY KEY(host, job),
|
||||
)
|
||||
ENGINE = metric
|
||||
WITH(
|
||||
on_physical_table = 'tsid_binary_join_physical'
|
||||
);
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
INSERT INTO tsid_binary_join_left (host, job, ts, greptime_value) VALUES
|
||||
('host1', 'job1', 0, 12),
|
||||
('host2', 'job2', 0, 18),
|
||||
('host1', 'job1', 5000, 15),
|
||||
('host2', 'job2', 5000, 21);
|
||||
|
||||
Affected Rows: 4
|
||||
|
||||
INSERT INTO tsid_binary_join_right (host, job, ts, greptime_value) VALUES
|
||||
('host1', 'job1', 0, 3),
|
||||
('host2', 'job2', 0, 6),
|
||||
('host1', 'job1', 5000, 5),
|
||||
('host2', 'job2', 5000, 7);
|
||||
|
||||
Affected Rows: 4
|
||||
|
||||
-- Default vector-vector arithmetic should join on `__tsid` and time index.
|
||||
-- SQLNESS REPLACE (metrics.*) REDACTED
|
||||
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
|
||||
-- SQLNESS REPLACE (-+) -
|
||||
-- SQLNESS REPLACE (\s\s+) _
|
||||
-- SQLNESS REPLACE (peers.*) REDACTED
|
||||
-- SQLNESS REPLACE Hash\(\[__tsid@1,\sts@2\],.* Hash([__tsid@1, ts@2],REDACTED
|
||||
-- SQLNESS REPLACE Hash\(\[__tsid@3,\sts@4\],.* Hash([__tsid@3, ts@4],REDACTED
|
||||
-- SQLNESS REPLACE input_partitions=\d+ input_partitions=REDACTED
|
||||
-- SQLNESS REPLACE "partition_count":\{(.*?)\} "partition_count":REDACTED
|
||||
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
|
||||
TQL ANALYZE (0, 5, '5s') tsid_binary_join_left / tsid_binary_join_right;
|
||||
|
||||
+-+-+-+
|
||||
| stage | node | plan_|
|
||||
+-+-+-+
|
||||
| 0_| 0_|_ProjectionExec: expr=[host@2 as host, job@3 as job, ts@5 as ts, __tsid@4 as __tsid, greptime_value@0 / greptime_value@1 as tsid_binary_join_left.greptime_value / tsid_binary_join_right.greptime_value] REDACTED
|
||||
|_|_|_HashJoinExec: mode=Partitioned, join_type=Inner, on=[(__tsid@1, __tsid@3), (ts@2, ts@4)], projection=[greptime_value@0, greptime_value@3, host@4, job@5, __tsid@6, ts@7], NullsEqual: true REDACTED
|
||||
|_|_|_RepartitionExec: partitioning=Hash([__tsid@1, ts@2],REDACTED
|
||||
|_|_|_ProjectionExec: expr=[greptime_value@0 as greptime_value, __tsid@3 as __tsid, ts@4 as ts] REDACTED
|
||||
|_|_|_MergeScanExec: REDACTED
|
||||
|_|_|_RepartitionExec: partitioning=Hash([__tsid@3, ts@4],REDACTED
|
||||
|_|_|_MergeScanExec: REDACTED
|
||||
|_|_|_|
|
||||
| 1_| 0_|_PromInstantManipulateExec: range=[0..5000], lookback=[300000], interval=[5000], time index=[ts] REDACTED
|
||||
|_|_|_PromSeriesDivideExec: tags=["__tsid"] REDACTED
|
||||
|_|_|_ProjectionExec: expr=[greptime_value@1 as greptime_value, host@3 as host, job@4 as job, __tsid@2 as __tsid, ts@0 as ts] REDACTED
|
||||
|_|_|_CooperativeExec REDACTED
|
||||
|_|_|_SeriesScan: region=REDACTED, "partition_count":REDACTED, "distribution":"PerSeries" REDACTED
|
||||
|_|_|_|
|
||||
| 1_| 0_|_PromInstantManipulateExec: range=[0..5000], lookback=[300000], interval=[5000], time index=[ts] REDACTED
|
||||
|_|_|_PromSeriesDivideExec: tags=["__tsid"] REDACTED
|
||||
|_|_|_ProjectionExec: expr=[greptime_value@1 as greptime_value, host@3 as host, job@4 as job, __tsid@2 as __tsid, ts@0 as ts] REDACTED
|
||||
|_|_|_CooperativeExec REDACTED
|
||||
|_|_|_SeriesScan: region=REDACTED, "partition_count":REDACTED, "distribution":"PerSeries" REDACTED
|
||||
|_|_|_|
|
||||
|_|_| Total rows: 4_|
|
||||
+-+-+-+
|
||||
|
||||
-- Label modifiers must disable the TSID shortcut and keep matching on the remaining labels.
|
||||
-- SQLNESS REPLACE (metrics.*) REDACTED
|
||||
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
|
||||
-- SQLNESS REPLACE (-+) -
|
||||
-- SQLNESS REPLACE (\s\s+) _
|
||||
-- SQLNESS REPLACE (peers.*) REDACTED
|
||||
-- SQLNESS REPLACE Hash\(\[job@1,\sts@2\],.* Hash([job@1, ts@2],REDACTED
|
||||
-- SQLNESS REPLACE Hash\(\[job@2,\sts@4\],.* Hash([job@2, ts@4],REDACTED
|
||||
-- SQLNESS REPLACE input_partitions=\d+ input_partitions=REDACTED
|
||||
-- SQLNESS REPLACE "partition_count":\{(.*?)\} "partition_count":REDACTED
|
||||
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
|
||||
TQL ANALYZE (0, 5, '5s') tsid_binary_join_left / ignoring(host) tsid_binary_join_right;
|
||||
|
||||
+-+-+-+
|
||||
| stage | node | plan_|
|
||||
+-+-+-+
|
||||
| 0_| 0_|_ProjectionExec: expr=[host@2 as host, job@3 as job, ts@5 as ts, __tsid@4 as __tsid, greptime_value@0 / greptime_value@1 as tsid_binary_join_left.greptime_value / tsid_binary_join_right.greptime_value] REDACTED
|
||||
|_|_|_HashJoinExec: mode=Partitioned, join_type=Inner, on=[(job@1, job@2), (ts@2, ts@4)], projection=[greptime_value@0, greptime_value@3, host@4, job@5, __tsid@6, ts@7], NullsEqual: true REDACTED
|
||||
|_|_|_RepartitionExec: partitioning=Hash([job@1, ts@2],REDACTED
|
||||
|_|_|_ProjectionExec: expr=[greptime_value@0 as greptime_value, job@2 as job, ts@4 as ts] REDACTED
|
||||
|_|_|_MergeScanExec: REDACTED
|
||||
|_|_|_RepartitionExec: partitioning=Hash([job@2, ts@4],REDACTED
|
||||
|_|_|_MergeScanExec: REDACTED
|
||||
|_|_|_|
|
||||
| 1_| 0_|_PromInstantManipulateExec: range=[0..5000], lookback=[300000], interval=[5000], time index=[ts] REDACTED
|
||||
|_|_|_PromSeriesDivideExec: tags=["__tsid"] REDACTED
|
||||
|_|_|_ProjectionExec: expr=[greptime_value@1 as greptime_value, host@3 as host, job@4 as job, __tsid@2 as __tsid, ts@0 as ts] REDACTED
|
||||
|_|_|_CooperativeExec REDACTED
|
||||
|_|_|_SeriesScan: region=REDACTED, "partition_count":REDACTED, "distribution":"PerSeries" REDACTED
|
||||
|_|_|_|
|
||||
| 1_| 0_|_PromInstantManipulateExec: range=[0..5000], lookback=[300000], interval=[5000], time index=[ts] REDACTED
|
||||
|_|_|_PromSeriesDivideExec: tags=["__tsid"] REDACTED
|
||||
|_|_|_ProjectionExec: expr=[greptime_value@1 as greptime_value, host@3 as host, job@4 as job, __tsid@2 as __tsid, ts@0 as ts] REDACTED
|
||||
|_|_|_CooperativeExec REDACTED
|
||||
|_|_|_SeriesScan: region=REDACTED, "partition_count":REDACTED, "distribution":"PerSeries" REDACTED
|
||||
|_|_|_|
|
||||
|_|_| Total rows: 4_|
|
||||
+-+-+-+
|
||||
|
||||
-- Comparison filters can join on `__tsid`, but the filtered result must still behave like
|
||||
-- a regular derived vector downstream.
|
||||
-- SQLNESS REPLACE (metrics.*) REDACTED
|
||||
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
|
||||
-- SQLNESS REPLACE (-+) -
|
||||
-- SQLNESS REPLACE (\s\s+) _
|
||||
-- SQLNESS REPLACE (peers.*) REDACTED
|
||||
-- SQLNESS REPLACE Hash\(\[__tsid@1,\sts@2\],.* Hash([__tsid@1, ts@2],REDACTED
|
||||
-- SQLNESS REPLACE Hash\(\[__tsid@3,\sts@4\],.* Hash([__tsid@3, ts@4],REDACTED
|
||||
-- SQLNESS REPLACE input_partitions=\d+ input_partitions=REDACTED
|
||||
-- SQLNESS REPLACE "partition_count":\{(.*?)\} "partition_count":REDACTED
|
||||
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
|
||||
TQL ANALYZE (0, 5, '5s') tsid_binary_join_left > tsid_binary_join_right;
|
||||
|
||||
+-+-+-+
|
||||
| stage | node | plan_|
|
||||
+-+-+-+
|
||||
| 0_| 0_|_ProjectionExec: expr=[ts@4 as ts, greptime_value@0 as greptime_value, host@1 as host, job@2 as job, __tsid@3 as __tsid] REDACTED
|
||||
|_|_|_HashJoinExec: mode=Partitioned, join_type=Inner, on=[(__tsid@3, __tsid@1), (ts@4, ts@2)], filter=greptime_value@1 < greptime_value@0, projection=[greptime_value@0, host@1, job@2, __tsid@3, ts@4], NullsEqual: true REDACTED
|
||||
|_|_|_RepartitionExec: partitioning=Hash([__tsid@3, ts@4],REDACTED
|
||||
|_|_|_MergeScanExec: REDACTED
|
||||
|_|_|_RepartitionExec: partitioning=Hash([__tsid@1, ts@2],REDACTED
|
||||
|_|_|_ProjectionExec: expr=[greptime_value@0 as greptime_value, __tsid@3 as __tsid, ts@4 as ts] REDACTED
|
||||
|_|_|_MergeScanExec: REDACTED
|
||||
|_|_|_|
|
||||
| 1_| 0_|_PromInstantManipulateExec: range=[0..5000], lookback=[300000], interval=[5000], time index=[ts] REDACTED
|
||||
|_|_|_PromSeriesDivideExec: tags=["__tsid"] REDACTED
|
||||
|_|_|_ProjectionExec: expr=[greptime_value@1 as greptime_value, host@3 as host, job@4 as job, __tsid@2 as __tsid, ts@0 as ts] REDACTED
|
||||
|_|_|_CooperativeExec REDACTED
|
||||
|_|_|_SeriesScan: region=REDACTED, "partition_count":REDACTED, "distribution":"PerSeries" REDACTED
|
||||
|_|_|_|
|
||||
| 1_| 0_|_PromInstantManipulateExec: range=[0..5000], lookback=[300000], interval=[5000], time index=[ts] REDACTED
|
||||
|_|_|_PromSeriesDivideExec: tags=["__tsid"] REDACTED
|
||||
|_|_|_ProjectionExec: expr=[greptime_value@1 as greptime_value, host@3 as host, job@4 as job, __tsid@2 as __tsid, ts@0 as ts] REDACTED
|
||||
|_|_|_CooperativeExec REDACTED
|
||||
|_|_|_SeriesScan: region=REDACTED, "partition_count":REDACTED, "distribution":"PerSeries" REDACTED
|
||||
|_|_|_|
|
||||
|_|_| Total rows: 4_|
|
||||
+-+-+-+
|
||||
|
||||
-- `bool` comparison should follow the same TSID-backed matching path.
|
||||
-- SQLNESS REPLACE (metrics.*) REDACTED
|
||||
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
|
||||
-- SQLNESS REPLACE (-+) -
|
||||
-- SQLNESS REPLACE (\s\s+) _
|
||||
-- SQLNESS REPLACE (peers.*) REDACTED
|
||||
-- SQLNESS REPLACE Hash\(\[__tsid@1,\sts@2\],.* Hash([__tsid@1, ts@2],REDACTED
|
||||
-- SQLNESS REPLACE Hash\(\[__tsid@3,\sts@4\],.* Hash([__tsid@3, ts@4],REDACTED
|
||||
-- SQLNESS REPLACE input_partitions=\d+ input_partitions=REDACTED
|
||||
-- SQLNESS REPLACE "partition_count":\{(.*?)\} "partition_count":REDACTED
|
||||
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
|
||||
TQL ANALYZE (0, 5, '5s') tsid_binary_join_left > bool tsid_binary_join_right;
|
||||
|
||||
+-+-+-+
|
||||
| stage | node | plan_|
|
||||
+-+-+-+
|
||||
| 0_| 0_|_ProjectionExec: expr=[host@2 as host, job@3 as job, ts@5 as ts, __tsid@4 as __tsid, CAST(greptime_value@1 < greptime_value@0 AS Float64) as tsid_binary_join_left.greptime_value > tsid_binary_join_right.greptime_value] REDACTED
|
||||
|_|_|_HashJoinExec: mode=Partitioned, join_type=Inner, on=[(__tsid@1, __tsid@3), (ts@2, ts@4)], projection=[greptime_value@0, greptime_value@3, host@4, job@5, __tsid@6, ts@7], NullsEqual: true REDACTED
|
||||
|_|_|_RepartitionExec: partitioning=Hash([__tsid@1, ts@2],REDACTED
|
||||
|_|_|_ProjectionExec: expr=[greptime_value@0 as greptime_value, __tsid@3 as __tsid, ts@4 as ts] REDACTED
|
||||
|_|_|_MergeScanExec: REDACTED
|
||||
|_|_|_RepartitionExec: partitioning=Hash([__tsid@3, ts@4],REDACTED
|
||||
|_|_|_MergeScanExec: REDACTED
|
||||
|_|_|_|
|
||||
| 1_| 0_|_PromInstantManipulateExec: range=[0..5000], lookback=[300000], interval=[5000], time index=[ts] REDACTED
|
||||
|_|_|_PromSeriesDivideExec: tags=["__tsid"] REDACTED
|
||||
|_|_|_ProjectionExec: expr=[greptime_value@1 as greptime_value, host@3 as host, job@4 as job, __tsid@2 as __tsid, ts@0 as ts] REDACTED
|
||||
|_|_|_CooperativeExec REDACTED
|
||||
|_|_|_SeriesScan: region=REDACTED, "partition_count":REDACTED, "distribution":"PerSeries" REDACTED
|
||||
|_|_|_|
|
||||
| 1_| 0_|_PromInstantManipulateExec: range=[0..5000], lookback=[300000], interval=[5000], time index=[ts] REDACTED
|
||||
|_|_|_PromSeriesDivideExec: tags=["__tsid"] REDACTED
|
||||
|_|_|_ProjectionExec: expr=[greptime_value@1 as greptime_value, host@3 as host, job@4 as job, __tsid@2 as __tsid, ts@0 as ts] REDACTED
|
||||
|_|_|_CooperativeExec REDACTED
|
||||
|_|_|_SeriesScan: region=REDACTED, "partition_count":REDACTED, "distribution":"PerSeries" REDACTED
|
||||
|_|_|_|
|
||||
|_|_| Total rows: 4_|
|
||||
+-+-+-+
|
||||
|
||||
-- SQLNESS SORT_RESULT 3 1
|
||||
TQL EVAL (0, 5, '5s') tsid_binary_join_left / tsid_binary_join_right;
|
||||
|
||||
+-------+------+---------------------+------------------------------------------------------------------------------+
|
||||
| host | job | ts | tsid_binary_join_left.greptime_value / tsid_binary_join_right.greptime_value |
|
||||
+-------+------+---------------------+------------------------------------------------------------------------------+
|
||||
| host1 | job1 | 1970-01-01T00:00:00 | 4.0 |
|
||||
| host1 | job1 | 1970-01-01T00:00:05 | 3.0 |
|
||||
| host2 | job2 | 1970-01-01T00:00:00 | 3.0 |
|
||||
| host2 | job2 | 1970-01-01T00:00:05 | 3.0 |
|
||||
+-------+------+---------------------+------------------------------------------------------------------------------+
|
||||
|
||||
DROP TABLE tsid_binary_join_right;
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
DROP TABLE tsid_binary_join_left;
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
DROP TABLE tsid_binary_join_physical;
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
@@ -0,0 +1,106 @@
|
||||
-- Regression test for TSID-backed PromQL binary joins on metric-engine tables.
|
||||
-- Default arithmetic and comparison joins should use `__tsid` when matching is the
|
||||
-- default one-to-one case. Label modifiers still have to stay label-based.
|
||||
|
||||
CREATE TABLE tsid_binary_join_physical (
|
||||
ts TIMESTAMP(3) TIME INDEX,
|
||||
greptime_value DOUBLE,
|
||||
) ENGINE = metric WITH ("physical_metric_table" = "");
|
||||
|
||||
CREATE TABLE tsid_binary_join_left (
|
||||
host STRING NULL,
|
||||
job STRING NULL,
|
||||
ts TIMESTAMP(3) NOT NULL,
|
||||
greptime_value DOUBLE NULL,
|
||||
TIME INDEX (ts),
|
||||
PRIMARY KEY(host, job),
|
||||
)
|
||||
ENGINE = metric
|
||||
WITH(
|
||||
on_physical_table = 'tsid_binary_join_physical'
|
||||
);
|
||||
|
||||
CREATE TABLE tsid_binary_join_right (
|
||||
host STRING NULL,
|
||||
job STRING NULL,
|
||||
ts TIMESTAMP(3) NOT NULL,
|
||||
greptime_value DOUBLE NULL,
|
||||
TIME INDEX (ts),
|
||||
PRIMARY KEY(host, job),
|
||||
)
|
||||
ENGINE = metric
|
||||
WITH(
|
||||
on_physical_table = 'tsid_binary_join_physical'
|
||||
);
|
||||
|
||||
INSERT INTO tsid_binary_join_left (host, job, ts, greptime_value) VALUES
|
||||
('host1', 'job1', 0, 12),
|
||||
('host2', 'job2', 0, 18),
|
||||
('host1', 'job1', 5000, 15),
|
||||
('host2', 'job2', 5000, 21);
|
||||
|
||||
INSERT INTO tsid_binary_join_right (host, job, ts, greptime_value) VALUES
|
||||
('host1', 'job1', 0, 3),
|
||||
('host2', 'job2', 0, 6),
|
||||
('host1', 'job1', 5000, 5),
|
||||
('host2', 'job2', 5000, 7);
|
||||
|
||||
-- Default vector-vector arithmetic should join on `__tsid` and time index.
|
||||
-- SQLNESS REPLACE (metrics.*) REDACTED
|
||||
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
|
||||
-- SQLNESS REPLACE (-+) -
|
||||
-- SQLNESS REPLACE (\s\s+) _
|
||||
-- SQLNESS REPLACE (peers.*) REDACTED
|
||||
-- SQLNESS REPLACE Hash\(\[__tsid@1,\sts@2\],.* Hash([__tsid@1, ts@2],REDACTED
|
||||
-- SQLNESS REPLACE Hash\(\[__tsid@3,\sts@4\],.* Hash([__tsid@3, ts@4],REDACTED
|
||||
-- SQLNESS REPLACE input_partitions=\d+ input_partitions=REDACTED
|
||||
-- SQLNESS REPLACE "partition_count":\{(.*?)\} "partition_count":REDACTED
|
||||
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
|
||||
TQL ANALYZE (0, 5, '5s') tsid_binary_join_left / tsid_binary_join_right;
|
||||
|
||||
-- Label modifiers must disable the TSID shortcut and keep matching on the remaining labels.
|
||||
-- SQLNESS REPLACE (metrics.*) REDACTED
|
||||
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
|
||||
-- SQLNESS REPLACE (-+) -
|
||||
-- SQLNESS REPLACE (\s\s+) _
|
||||
-- SQLNESS REPLACE (peers.*) REDACTED
|
||||
-- SQLNESS REPLACE Hash\(\[job@1,\sts@2\],.* Hash([job@1, ts@2],REDACTED
|
||||
-- SQLNESS REPLACE Hash\(\[job@2,\sts@4\],.* Hash([job@2, ts@4],REDACTED
|
||||
-- SQLNESS REPLACE input_partitions=\d+ input_partitions=REDACTED
|
||||
-- SQLNESS REPLACE "partition_count":\{(.*?)\} "partition_count":REDACTED
|
||||
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
|
||||
TQL ANALYZE (0, 5, '5s') tsid_binary_join_left / ignoring(host) tsid_binary_join_right;
|
||||
|
||||
-- Comparison filters can join on `__tsid`, but the filtered result must still behave like
|
||||
-- a regular derived vector downstream.
|
||||
-- SQLNESS REPLACE (metrics.*) REDACTED
|
||||
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
|
||||
-- SQLNESS REPLACE (-+) -
|
||||
-- SQLNESS REPLACE (\s\s+) _
|
||||
-- SQLNESS REPLACE (peers.*) REDACTED
|
||||
-- SQLNESS REPLACE Hash\(\[__tsid@1,\sts@2\],.* Hash([__tsid@1, ts@2],REDACTED
|
||||
-- SQLNESS REPLACE Hash\(\[__tsid@3,\sts@4\],.* Hash([__tsid@3, ts@4],REDACTED
|
||||
-- SQLNESS REPLACE input_partitions=\d+ input_partitions=REDACTED
|
||||
-- SQLNESS REPLACE "partition_count":\{(.*?)\} "partition_count":REDACTED
|
||||
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
|
||||
TQL ANALYZE (0, 5, '5s') tsid_binary_join_left > tsid_binary_join_right;
|
||||
|
||||
-- `bool` comparison should follow the same TSID-backed matching path.
|
||||
-- SQLNESS REPLACE (metrics.*) REDACTED
|
||||
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
|
||||
-- SQLNESS REPLACE (-+) -
|
||||
-- SQLNESS REPLACE (\s\s+) _
|
||||
-- SQLNESS REPLACE (peers.*) REDACTED
|
||||
-- SQLNESS REPLACE Hash\(\[__tsid@1,\sts@2\],.* Hash([__tsid@1, ts@2],REDACTED
|
||||
-- SQLNESS REPLACE Hash\(\[__tsid@3,\sts@4\],.* Hash([__tsid@3, ts@4],REDACTED
|
||||
-- SQLNESS REPLACE input_partitions=\d+ input_partitions=REDACTED
|
||||
-- SQLNESS REPLACE "partition_count":\{(.*?)\} "partition_count":REDACTED
|
||||
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
|
||||
TQL ANALYZE (0, 5, '5s') tsid_binary_join_left > bool tsid_binary_join_right;
|
||||
|
||||
-- SQLNESS SORT_RESULT 3 1
|
||||
TQL EVAL (0, 5, '5s') tsid_binary_join_left / tsid_binary_join_right;
|
||||
|
||||
DROP TABLE tsid_binary_join_right;
|
||||
DROP TABLE tsid_binary_join_left;
|
||||
DROP TABLE tsid_binary_join_physical;
|
||||
Reference in New Issue
Block a user