diff --git a/src/promql/src/extension_plan/instant_manipulate.rs b/src/promql/src/extension_plan/instant_manipulate.rs index 1e01166689..67f05961ff 100644 --- a/src/promql/src/extension_plan/instant_manipulate.rs +++ b/src/promql/src/extension_plan/instant_manipulate.rs @@ -241,8 +241,7 @@ impl InstantManipulate { } pub fn to_execution_plan(&self, exec_input: Arc) -> Arc { - let reuse_all_non_sample_columns = - matches!(self.tag_columns.as_slice(), [tag] if tag == "__tsid"); + let reuse_tsid_column = matches!(self.tag_columns.as_slice(), [tag] if tag == "__tsid"); Arc::new(InstantManipulateExec { start: self.start, @@ -251,7 +250,7 @@ impl InstantManipulate { interval: self.interval, time_index_column: self.time_index_column.clone(), field_column: self.field_column.clone(), - reuse_all_non_sample_columns, + reuse_tsid_column, input: exec_input, metric: ExecutionPlanMetricsSet::new(), }) @@ -313,7 +312,7 @@ pub struct InstantManipulateExec { interval: Millisecond, time_index_column: String, field_column: Option, - reuse_all_non_sample_columns: bool, + reuse_tsid_column: bool, input: Arc, metric: ExecutionPlanMetricsSet, @@ -357,7 +356,7 @@ impl ExecutionPlan for InstantManipulateExec { interval: self.interval, time_index_column: self.time_index_column.clone(), field_column: self.field_column.clone(), - reuse_all_non_sample_columns: self.reuse_all_non_sample_columns, + reuse_tsid_column: self.reuse_tsid_column, input: children[0].clone(), metric: self.metric.clone(), })) @@ -388,10 +387,10 @@ impl ExecutionPlan for InstantManipulateExec { .as_ref() .and_then(|name| schema.column_with_name(name)) .map(|x| x.0); - let reuse_all_non_sample_columns = self.reuse_all_non_sample_columns - && schema - .column_with_name("__tsid") - .is_some_and(|(_, field)| field.data_type() == &DataType::UInt64); + let tsid_index = schema + .column_with_name("__tsid") + .filter(|(_, field)| field.data_type() == &DataType::UInt64) + .map(|(index, _)| index); Ok(Box::pin(InstantManipulateStream { start: self.start, end: self.end, @@ -399,7 +398,8 @@ impl ExecutionPlan for InstantManipulateExec { interval: self.interval, time_index, field_index, - reuse_all_non_sample_columns, + tsid_index, + reuse_tsid_column: self.reuse_tsid_column && tsid_index.is_some(), schema, input, metric: baseline_metric, @@ -465,7 +465,8 @@ pub struct InstantManipulateStream { // Column index of TIME INDEX column's position in schema time_index: usize, field_index: Option, - reuse_all_non_sample_columns: bool, + tsid_index: Option, + reuse_tsid_column: bool, schema: SchemaRef, input: SendableRecordBatchStream, @@ -650,7 +651,7 @@ impl InstantManipulateStream { continue; } - if self.reuse_all_non_sample_columns && Some(index) != self.field_index { + if self.reuse_tsid_column && self.tsid_index == Some(index) { arrays.push(reuse_constant_column(array, output_len)?); continue; } @@ -712,7 +713,7 @@ mod test { interval, time_index_column: TIME_INDEX_COLUMN.to_string(), field_column: Some("value".to_string()), - reuse_all_non_sample_columns: false, + reuse_tsid_column: false, input: memory_exec, metric: ExecutionPlanMetricsSet::new(), }); @@ -860,7 +861,7 @@ mod test { } #[tokio::test] - async fn tsid_fast_path_reuses_non_sample_columns_when_output_grows() { + async fn tsid_fast_path_reuses_tsid_column_when_output_grows() { let schema = Arc::new(Schema::new(vec![ Field::new( TIME_INDEX_COLUMN, @@ -893,7 +894,7 @@ mod test { interval: 500, time_index_column: TIME_INDEX_COLUMN.to_string(), field_column: Some("value".to_string()), - reuse_all_non_sample_columns: true, + reuse_tsid_column: true, input, metric: ExecutionPlanMetricsSet::new(), }); @@ -918,6 +919,67 @@ mod test { ); } + #[tokio::test] + async fn tsid_fast_path_still_takes_additional_field_columns() { + let schema = Arc::new(Schema::new(vec![ + Field::new( + TIME_INDEX_COLUMN, + DataType::Timestamp(datafusion::arrow::datatypes::TimeUnit::Millisecond, None), + false, + ), + Field::new("value", DataType::Float64, true), + Field::new("value_2", DataType::Float64, true), + Field::new("host", DataType::Utf8, true), + Field::new("__tsid", DataType::UInt64, false), + ])); + let batch = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(TimestampMillisecondArray::from(vec![0, 1_000])), + Arc::new(Float64Array::from(vec![1.0, 2.0])), + Arc::new(Float64Array::from(vec![10.0, 20.0])), + Arc::new(datafusion::arrow::array::StringArray::from(vec![ + "foo", "foo", + ])), + Arc::new(UInt64Array::from(vec![42, 42])), + ], + ) + .unwrap(); + let input = Arc::new(DataSourceExec::new(Arc::new( + MemorySourceConfig::try_new(&[vec![batch]], schema, None).unwrap(), + ))); + let normalize_exec = Arc::new(InstantManipulateExec { + start: 0, + end: 1_500, + lookback_delta: 1_000, + interval: 500, + time_index_column: TIME_INDEX_COLUMN.to_string(), + field_column: Some("value".to_string()), + reuse_tsid_column: true, + input, + metric: ExecutionPlanMetricsSet::new(), + }); + let session_context = SessionContext::default(); + let result = datafusion::physical_plan::collect(normalize_exec, session_context.task_ctx()) + .await + .unwrap(); + let result_literal = datatypes::arrow::util::pretty::pretty_format_batches(&result) + .unwrap() + .to_string(); + + assert_eq!( + result_literal, + "+-------------------------+-------+---------+------+--------+\ + \n| timestamp | value | value_2 | host | __tsid |\ + \n+-------------------------+-------+---------+------+--------+\ + \n| 1970-01-01T00:00:00 | 1.0 | 10.0 | foo | 42 |\ + \n| 1970-01-01T00:00:00.500 | 1.0 | 10.0 | foo | 42 |\ + \n| 1970-01-01T00:00:01 | 2.0 | 20.0 | foo | 42 |\ + \n| 1970-01-01T00:00:01.500 | 2.0 | 20.0 | foo | 42 |\ + \n+-------------------------+-------+---------+------+--------+" + ); + } + #[tokio::test] async fn lookback_10s_interval_30s() { let expected = String::from(