use tsid in manipulate plan, resolve_tag_columns walks whole plan

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ruihang Xia
2026-03-20 07:06:11 +08:00
parent 456288df87
commit fe2a9f91d4
2 changed files with 111 additions and 8 deletions

View File

@@ -225,14 +225,19 @@ impl InstantManipulate {
return tag_columns.to_vec();
}
let LogicalPlan::Extension(Extension { node }) = input else {
return Vec::new();
};
Self::find_series_divide_tags(input).unwrap_or_default()
}
node.as_any()
.downcast_ref::<SeriesDivide>()
.map(|series_divide| series_divide.tags().to_vec())
.unwrap_or_default()
fn find_series_divide_tags(plan: &LogicalPlan) -> Option<Vec<String>> {
if let LogicalPlan::Extension(Extension { node }) = plan
&& let Some(series_divide) = node.as_any().downcast_ref::<SeriesDivide>()
{
return Some(series_divide.tags().to_vec());
}
plan.inputs()
.into_iter()
.find_map(Self::find_series_divide_tags)
}
pub fn to_execution_plan(&self, exec_input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
@@ -780,6 +785,80 @@ mod test {
assert_eq!(plan.tag_columns, vec!["__tsid".to_string()]);
}
#[test]
fn rebuild_should_recover_tag_columns_from_series_normalize_input() {
let df_schema = prepare_test_data().schema().to_dfschema_ref().unwrap();
let input = LogicalPlan::EmptyRelation(EmptyRelation {
produce_one_row: false,
schema: df_schema,
});
let series_divide = LogicalPlan::Extension(Extension {
node: Arc::new(SeriesDivide::new(
vec!["__tsid".to_string()],
TIME_INDEX_COLUMN.to_string(),
input,
)),
});
let series_normalize = LogicalPlan::Extension(Extension {
node: Arc::new(crate::extension_plan::SeriesNormalize::new(
0,
TIME_INDEX_COLUMN,
false,
vec!["__tsid".to_string()],
series_divide,
)),
});
let bytes = InstantManipulate::new(
0,
0,
0,
0,
TIME_INDEX_COLUMN.to_string(),
vec!["__tsid".to_string()],
Some("value".to_string()),
series_normalize.clone(),
)
.serialize();
let plan = InstantManipulate::deserialize(&bytes)
.unwrap()
.with_exprs_and_inputs(vec![], vec![series_normalize])
.unwrap();
assert_eq!(plan.tag_columns, vec!["__tsid".to_string()]);
}
#[test]
fn to_execution_plan_enables_tsid_fast_path() {
let schema = Arc::new(Schema::new(vec![
Field::new(
TIME_INDEX_COLUMN,
DataType::Timestamp(datafusion::arrow::datatypes::TimeUnit::Millisecond, None),
false,
),
Field::new("value", DataType::Float64, true),
]));
let exec_input: Arc<dyn ExecutionPlan> = Arc::new(DataSourceExec::new(Arc::new(
MemorySourceConfig::try_new(&[], schema, None).unwrap(),
)));
let exec = InstantManipulate::new(
0,
0,
0,
0,
TIME_INDEX_COLUMN.to_string(),
vec!["__tsid".to_string()],
Some("value".to_string()),
LogicalPlan::EmptyRelation(EmptyRelation {
produce_one_row: false,
schema: Arc::new(datafusion::common::DFSchema::empty()),
}),
)
.to_execution_plan(exec_input);
assert!(format!("{exec:?}").contains("reuse_all_non_sample_columns: true"));
}
#[tokio::test]
async fn tsid_fast_path_reuses_non_sample_columns_when_output_grows() {
let schema = Arc::new(Schema::new(vec![

View File

@@ -915,7 +915,11 @@ impl PromPlanner {
.time_index_column
.clone()
.expect("time index should be set in `setup_context`"),
self.ctx.tag_columns.clone(),
if self.ctx.use_tsid {
vec![DATA_SCHEMA_TSID_COLUMN_NAME.to_string()]
} else {
self.ctx.tag_columns.clone()
},
self.ctx.field_columns.first().cloned(),
normalize,
);
@@ -4021,6 +4025,10 @@ mod test {
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_query::prelude::greptime_timestamp;
use common_query::test_util::DummyDecoder;
use datafusion::arrow::datatypes::Schema as ArrowSchema;
use datafusion::datasource::memory::MemorySourceConfig;
use datafusion::datasource::source::DataSourceExec;
use datafusion::logical_expr::Extension;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{ColumnSchema, Schema};
use promql_parser::label::Labels;
@@ -4033,6 +4041,16 @@ mod test {
use crate::options::QueryOptions;
use crate::parser::QueryLanguageParser;
fn find_instant_manipulate(plan: &LogicalPlan) -> Option<&InstantManipulate> {
if let LogicalPlan::Extension(Extension { node }) = plan
&& let Some(instant_manipulate) = node.as_any().downcast_ref::<InstantManipulate>()
{
return Some(instant_manipulate);
}
plan.inputs().into_iter().find_map(find_instant_manipulate)
}
fn build_query_engine_state() -> QueryEngineState {
QueryEngineState::new(
new_memory_catalog_manager().unwrap(),
@@ -4657,6 +4675,12 @@ mod test {
.iter()
.any(|field| field.name() == DATA_SCHEMA_TSID_COLUMN_NAME)
);
let manipulate = find_instant_manipulate(&plan).unwrap();
let exec = manipulate.to_execution_plan(Arc::new(DataSourceExec::new(Arc::new(
MemorySourceConfig::try_new(&[], Arc::new(ArrowSchema::empty()), None).unwrap(),
))));
assert!(format!("{exec:?}").contains("reuse_all_non_sample_columns: true"));
}
#[tokio::test]