From 456288df873da5e69661f328812bc6dc26276b3c Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Sat, 14 Mar 2026 12:32:36 +0800 Subject: [PATCH] feat: some optimistic paths for instant manipulate Signed-off-by: Ruihang Xia --- .../src/extension_plan/instant_manipulate.rs | 193 ++++++++++++++++-- src/query/src/promql/planner.rs | 1 + 2 files changed, 179 insertions(+), 15 deletions(-) diff --git a/src/promql/src/extension_plan/instant_manipulate.rs b/src/promql/src/extension_plan/instant_manipulate.rs index fe0a6eb329..14cf629c39 100644 --- a/src/promql/src/extension_plan/instant_manipulate.rs +++ b/src/promql/src/extension_plan/instant_manipulate.rs @@ -19,13 +19,15 @@ use std::sync::Arc; use std::task::{Context, Poll}; use datafusion::arrow::array::{Array, Float64Array, TimestampMillisecondArray, UInt64Array}; -use datafusion::arrow::datatypes::SchemaRef; +use datafusion::arrow::datatypes::{DataType, SchemaRef}; use datafusion::arrow::record_batch::RecordBatch; use datafusion::common::stats::Precision; -use datafusion::common::{DFSchema, DFSchemaRef}; +use datafusion::common::{DFSchema, DFSchemaRef, ScalarValue}; use datafusion::error::{DataFusionError, Result as DataFusionResult}; use datafusion::execution::context::TaskContext; -use datafusion::logical_expr::{EmptyRelation, Expr, LogicalPlan, UserDefinedLogicalNodeCore}; +use datafusion::logical_expr::{ + EmptyRelation, Expr, Extension, LogicalPlan, UserDefinedLogicalNodeCore, +}; use datafusion::physical_plan::metrics::{ BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, MetricValue, MetricsSet, }; @@ -35,13 +37,13 @@ use datafusion::physical_plan::{ }; use datafusion_expr::col; use datatypes::arrow::compute; -use datatypes::arrow::error::Result as ArrowResult; use futures::{Stream, StreamExt, ready}; use greptime_proto::substrait_extension as pb; use prost::Message; use snafu::ResultExt; use crate::error::{DeserializeSnafu, Result}; +use crate::extension_plan::series_divide::SeriesDivide; use crate::extension_plan::{ METRIC_NUM_SERIES, Millisecond, resolve_column_name, serialize_column_index, }; @@ -59,6 +61,8 @@ pub struct InstantManipulate { lookback_delta: Millisecond, interval: Millisecond, time_index_column: String, + // Planner-provided tag-column hint for execution fast paths. + tag_columns: Vec, /// A optional column for validating staleness field_column: Option, input: LogicalPlan, @@ -166,6 +170,7 @@ impl UserDefinedLogicalNodeCore for InstantManipulate { lookback_delta: self.lookback_delta, interval: self.interval, time_index_column, + tag_columns: Self::resolve_tag_columns(&input, &self.tag_columns), field_column, input, unfix: None, @@ -177,6 +182,7 @@ impl UserDefinedLogicalNodeCore for InstantManipulate { lookback_delta: self.lookback_delta, interval: self.interval, time_index_column: self.time_index_column.clone(), + tag_columns: Self::resolve_tag_columns(&input, &self.tag_columns), field_column: self.field_column.clone(), input, unfix: None, @@ -186,12 +192,14 @@ impl UserDefinedLogicalNodeCore for InstantManipulate { } impl InstantManipulate { + #[allow(clippy::too_many_arguments)] pub fn new( start: Millisecond, end: Millisecond, lookback_delta: Millisecond, interval: Millisecond, time_index_column: String, + tag_columns: Vec, field_column: Option, input: LogicalPlan, ) -> Self { @@ -201,6 +209,7 @@ impl InstantManipulate { lookback_delta, interval, time_index_column, + tag_columns, field_column, input, unfix: None, @@ -211,7 +220,25 @@ impl InstantManipulate { "InstantManipulate" } + fn resolve_tag_columns(input: &LogicalPlan, tag_columns: &[String]) -> Vec { + if !tag_columns.is_empty() { + return tag_columns.to_vec(); + } + + let LogicalPlan::Extension(Extension { node }) = input else { + return Vec::new(); + }; + + node.as_any() + .downcast_ref::() + .map(|series_divide| series_divide.tags().to_vec()) + .unwrap_or_default() + } + pub fn to_execution_plan(&self, exec_input: Arc) -> Arc { + let reuse_all_non_sample_columns = + matches!(self.tag_columns.as_slice(), [tag] if tag == "__tsid"); + Arc::new(InstantManipulateExec { start: self.start, end: self.end, @@ -219,6 +246,7 @@ impl InstantManipulate { interval: self.interval, time_index_column: self.time_index_column.clone(), field_column: self.field_column.clone(), + reuse_all_non_sample_columns, input: exec_input, metric: ExecutionPlanMetricsSet::new(), }) @@ -264,6 +292,7 @@ impl InstantManipulate { lookback_delta: pb_instant_manipulate.lookback_delta, interval: pb_instant_manipulate.interval, time_index_column: String::new(), + tag_columns: Vec::new(), field_column: None, input: placeholder_plan, unfix: Some(unfix), @@ -279,6 +308,7 @@ pub struct InstantManipulateExec { interval: Millisecond, time_index_column: String, field_column: Option, + reuse_all_non_sample_columns: bool, input: Arc, metric: ExecutionPlanMetricsSet, @@ -322,6 +352,7 @@ impl ExecutionPlan for InstantManipulateExec { interval: self.interval, time_index_column: self.time_index_column.clone(), field_column: self.field_column.clone(), + reuse_all_non_sample_columns: self.reuse_all_non_sample_columns, input: children[0].clone(), metric: self.metric.clone(), })) @@ -333,9 +364,8 @@ impl ExecutionPlan for InstantManipulateExec { context: Arc, ) -> DataFusionResult { let baseline_metric = BaselineMetrics::new(&self.metric, partition); - let metrics_builder = MetricBuilder::new(&self.metric); let num_series = Count::new(); - metrics_builder + MetricBuilder::new(&self.metric) .with_partition(partition) .build(MetricValue::Count { name: METRIC_NUM_SERIES.into(), @@ -353,6 +383,10 @@ impl ExecutionPlan for InstantManipulateExec { .as_ref() .and_then(|name| schema.column_with_name(name)) .map(|x| x.0); + let reuse_all_non_sample_columns = self.reuse_all_non_sample_columns + && schema + .column_with_name("__tsid") + .is_some_and(|(_, field)| field.data_type() == &DataType::UInt64); Ok(Box::pin(InstantManipulateStream { start: self.start, end: self.end, @@ -360,6 +394,7 @@ impl ExecutionPlan for InstantManipulateExec { interval: self.interval, time_index, field_index, + reuse_all_non_sample_columns, schema, input, metric: baseline_metric, @@ -425,6 +460,7 @@ pub struct InstantManipulateStream { // Column index of TIME INDEX column's position in schema time_index: usize, field_index: Option, + reuse_all_non_sample_columns: bool, schema: SchemaRef, input: SendableRecordBatchStream, @@ -510,12 +546,17 @@ impl InstantManipulateStream { let aligned_start = self.start + (max_start - self.start) / self.interval * self.interval; let aligned_end = self.end - (self.end - min_end) / self.interval * self.interval; - let mut take_indices = vec![]; + let estimated_points = if aligned_end >= aligned_start { + ((aligned_end - aligned_start) / self.interval).saturating_add(1) as usize + } else { + 0 + }; + let mut take_indices = Vec::with_capacity(estimated_points); let mut cursor = 0; let aligned_ts_iter = (aligned_start..=aligned_end).step_by(self.interval as usize); - let mut aligned_ts = vec![]; + let mut aligned_ts = Vec::with_capacity(estimated_points); // calculate the offsets to take 'next: for expected_ts in aligned_ts_iter { @@ -593,13 +634,26 @@ impl InstantManipulateStream { ) -> DataFusionResult { assert_eq!(take_indices.len(), aligned_ts.len()); - let indices_array = UInt64Array::from(take_indices); - let mut arrays = record_batch - .columns() - .iter() - .map(|array| compute::take(array, &indices_array, None)) - .collect::>>()?; - arrays[self.time_index] = Arc::new(TimestampMillisecondArray::from(aligned_ts)); + let output_len = aligned_ts.len(); + let mut indices_array = None; + let mut arrays = Vec::with_capacity(record_batch.num_columns()); + let aligned_ts = Arc::new(TimestampMillisecondArray::from(aligned_ts)) as Arc; + + for (index, array) in record_batch.columns().iter().enumerate() { + if index == self.time_index { + arrays.push(aligned_ts.clone()); + continue; + } + + if self.reuse_all_non_sample_columns && Some(index) != self.field_index { + arrays.push(reuse_constant_column(array, output_len)?); + continue; + } + + let indices_array = + indices_array.get_or_insert_with(|| UInt64Array::from(take_indices.clone())); + arrays.push(compute::take(array, indices_array, None)?); + } let result = RecordBatch::try_new(record_batch.schema(), arrays) .map_err(|e| DataFusionError::ArrowError(Box::new(e), None))?; @@ -607,9 +661,24 @@ impl InstantManipulateStream { } } +fn reuse_constant_column(array: &Arc, len: usize) -> DataFusionResult> { + if len <= array.len() { + return Ok(array.slice(0, len)); + } + + if array.is_empty() { + return Ok(array.slice(0, 0)); + } + + ScalarValue::try_from_array(array.as_ref(), 0)?.to_array_of_size(len) +} + #[cfg(test)] mod test { + use datafusion::arrow::datatypes::{DataType, Field, Schema}; use datafusion::common::ToDFSchema; + use datafusion::datasource::memory::MemorySourceConfig; + use datafusion::datasource::source::DataSourceExec; use datafusion::logical_expr::{EmptyRelation, LogicalPlan}; use datafusion::prelude::SessionContext; @@ -638,6 +707,7 @@ mod test { interval, time_index_column: TIME_INDEX_COLUMN.to_string(), field_column: Some("value".to_string()), + reuse_all_non_sample_columns: false, input: memory_exec, metric: ExecutionPlanMetricsSet::new(), }); @@ -665,6 +735,7 @@ mod test { 0, 0, TIME_INDEX_COLUMN.to_string(), + Vec::new(), Some("value".to_string()), input, ); @@ -676,6 +747,98 @@ mod test { assert_eq!(required.as_slice(), &[0, 1, 2]); } + #[test] + fn rebuild_should_recover_tag_columns_from_series_divide_input() { + let df_schema = prepare_test_data().schema().to_dfschema_ref().unwrap(); + let input = LogicalPlan::EmptyRelation(EmptyRelation { + produce_one_row: false, + schema: df_schema, + }); + let series_divide = LogicalPlan::Extension(Extension { + node: Arc::new(SeriesDivide::new( + vec!["__tsid".to_string()], + TIME_INDEX_COLUMN.to_string(), + input, + )), + }); + let bytes = InstantManipulate::new( + 0, + 0, + 0, + 0, + TIME_INDEX_COLUMN.to_string(), + vec!["__tsid".to_string()], + Some("value".to_string()), + series_divide.clone(), + ) + .serialize(); + let plan = InstantManipulate::deserialize(&bytes) + .unwrap() + .with_exprs_and_inputs(vec![], vec![series_divide]) + .unwrap(); + + assert_eq!(plan.tag_columns, vec!["__tsid".to_string()]); + } + + #[tokio::test] + async fn tsid_fast_path_reuses_non_sample_columns_when_output_grows() { + let schema = Arc::new(Schema::new(vec![ + Field::new( + TIME_INDEX_COLUMN, + DataType::Timestamp(datafusion::arrow::datatypes::TimeUnit::Millisecond, None), + false, + ), + Field::new("value", DataType::Float64, true), + Field::new("host", DataType::Utf8, true), + Field::new("__tsid", DataType::UInt64, false), + ])); + let batch = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(TimestampMillisecondArray::from(vec![0, 1_000])), + Arc::new(Float64Array::from(vec![1.0, 2.0])), + Arc::new(datafusion::arrow::array::StringArray::from(vec![ + "foo", "foo", + ])), + Arc::new(UInt64Array::from(vec![42, 42])), + ], + ) + .unwrap(); + let input = Arc::new(DataSourceExec::new(Arc::new( + MemorySourceConfig::try_new(&[vec![batch]], schema, None).unwrap(), + ))); + let normalize_exec = Arc::new(InstantManipulateExec { + start: 0, + end: 1_500, + lookback_delta: 1_000, + interval: 500, + time_index_column: TIME_INDEX_COLUMN.to_string(), + field_column: Some("value".to_string()), + reuse_all_non_sample_columns: true, + input, + metric: ExecutionPlanMetricsSet::new(), + }); + let session_context = SessionContext::default(); + let result = datafusion::physical_plan::collect(normalize_exec, session_context.task_ctx()) + .await + .unwrap(); + let result_literal = datatypes::arrow::util::pretty::pretty_format_batches(&result) + .unwrap() + .to_string(); + + assert_eq!( + result_literal, + "+-------------------------+-------+------+--------+\ + \n| timestamp | value | host | __tsid |\ + \n+-------------------------+-------+------+--------+\ + \n| 1970-01-01T00:00:00 | 1.0 | foo | 42 |\ + \n| 1970-01-01T00:00:00.500 | 1.0 | foo | 42 |\ + \n| 1970-01-01T00:00:01 | 2.0 | foo | 42 |\ + \n| 1970-01-01T00:00:01.500 | 2.0 | foo | 42 |\ + \n+-------------------------+-------+------+--------+" + ); + } + #[tokio::test] async fn lookback_10s_interval_30s() { let expected = String::from( diff --git a/src/query/src/promql/planner.rs b/src/query/src/promql/planner.rs index 427644e26a..0eff87952c 100644 --- a/src/query/src/promql/planner.rs +++ b/src/query/src/promql/planner.rs @@ -915,6 +915,7 @@ impl PromPlanner { .time_index_column .clone() .expect("time index should be set in `setup_context`"), + self.ctx.tag_columns.clone(), self.ctx.field_columns.first().cloned(), normalize, );