diff --git a/src/promql/src/extension_plan/instant_manipulate.rs b/src/promql/src/extension_plan/instant_manipulate.rs
index fe0a6eb329..7318b9cba5 100644
--- a/src/promql/src/extension_plan/instant_manipulate.rs
+++ b/src/promql/src/extension_plan/instant_manipulate.rs
@@ -19,13 +19,15 @@ use std::sync::Arc;
 use std::task::{Context, Poll};
 
 use datafusion::arrow::array::{Array, Float64Array, TimestampMillisecondArray, UInt64Array};
-use datafusion::arrow::datatypes::SchemaRef;
+use datafusion::arrow::datatypes::{DataType, SchemaRef};
 use datafusion::arrow::record_batch::RecordBatch;
 use datafusion::common::stats::Precision;
-use datafusion::common::{DFSchema, DFSchemaRef};
+use datafusion::common::{DFSchema, DFSchemaRef, ScalarValue};
 use datafusion::error::{DataFusionError, Result as DataFusionResult};
 use datafusion::execution::context::TaskContext;
-use datafusion::logical_expr::{EmptyRelation, Expr, LogicalPlan, UserDefinedLogicalNodeCore};
+use datafusion::logical_expr::{
+    EmptyRelation, Expr, Extension, LogicalPlan, UserDefinedLogicalNodeCore,
+};
 use datafusion::physical_plan::metrics::{
     BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, MetricValue, MetricsSet,
 };
@@ -35,18 +37,20 @@ use datafusion::physical_plan::{
 };
 use datafusion_expr::col;
 use datatypes::arrow::compute;
-use datatypes::arrow::error::Result as ArrowResult;
 use futures::{Stream, StreamExt, ready};
 use greptime_proto::substrait_extension as pb;
 use prost::Message;
 use snafu::ResultExt;
 
 use crate::error::{DeserializeSnafu, Result};
+use crate::extension_plan::series_divide::SeriesDivide;
 use crate::extension_plan::{
     METRIC_NUM_SERIES, Millisecond, resolve_column_name, serialize_column_index,
 };
 use crate::metrics::PROMQL_SERIES_COUNT;
 
+const MAX_INSTANT_MANIPULATE_OUTPUT_POINTS: usize = 1_000_000;
+
 /// Manipulate the input record batch to make it suitable for Instant Operator.
 ///
 /// This plan will try to align the input time series, for every timestamp between
@@ -59,6 +63,8 @@ pub struct InstantManipulate {
     lookback_delta: Millisecond,
     interval: Millisecond,
     time_index_column: String,
+    // Planner-provided tag-column hint for execution fast paths.
+    tag_columns: Vec<String>,
     /// A optional column for validating staleness
     field_column: Option<String>,
     input: LogicalPlan,
@@ -166,6 +172,7 @@ impl UserDefinedLogicalNodeCore for InstantManipulate {
             lookback_delta: self.lookback_delta,
             interval: self.interval,
             time_index_column,
+            tag_columns: Self::resolve_tag_columns(&input, &self.tag_columns),
             field_column,
             input,
             unfix: None,
@@ -177,6 +184,7 @@ impl UserDefinedLogicalNodeCore for InstantManipulate {
             lookback_delta: self.lookback_delta,
             interval: self.interval,
             time_index_column: self.time_index_column.clone(),
+            tag_columns: Self::resolve_tag_columns(&input, &self.tag_columns),
             field_column: self.field_column.clone(),
             input,
             unfix: None,
@@ -186,12 +194,14 @@ impl InstantManipulate {
 }
 
 impl InstantManipulate {
+    #[allow(clippy::too_many_arguments)]
     pub fn new(
         start: Millisecond,
         end: Millisecond,
         lookback_delta: Millisecond,
         interval: Millisecond,
         time_index_column: String,
+        tag_columns: Vec<String>,
         field_column: Option<String>,
         input: LogicalPlan,
     ) -> Self {
@@ -201,6 +211,7 @@ impl InstantManipulate {
             lookback_delta,
             interval,
             time_index_column,
+            tag_columns,
             field_column,
             input,
             unfix: None,
@@ -211,7 +222,29 @@ impl InstantManipulate {
         "InstantManipulate"
     }
 
+    fn resolve_tag_columns(input: &LogicalPlan, tag_columns: &[String]) -> Vec<String> {
+        if !tag_columns.is_empty() {
+            return tag_columns.to_vec();
+        }
+
+        Self::find_series_divide_tags(input).unwrap_or_default()
+    }
+
+    fn find_series_divide_tags(plan: &LogicalPlan) -> Option<Vec<String>> {
+        if let LogicalPlan::Extension(Extension { node }) = plan
+            && let Some(series_divide) = node.as_any().downcast_ref::<SeriesDivide>()
+        {
+            return Some(series_divide.tags().to_vec());
+        }
+
+        plan.inputs()
+            .into_iter()
+            .find_map(Self::find_series_divide_tags)
+    }
+
     pub fn to_execution_plan(&self, exec_input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
+        let reuse_tsid_column = matches!(self.tag_columns.as_slice(), [tag] if tag == "__tsid");
+
         Arc::new(InstantManipulateExec {
             start: self.start,
             end: self.end,
@@ -219,6 +252,7 @@ impl InstantManipulate {
             interval: self.interval,
             time_index_column: self.time_index_column.clone(),
             field_column: self.field_column.clone(),
+            reuse_tsid_column,
             input: exec_input,
             metric: ExecutionPlanMetricsSet::new(),
         })
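The fast path gated by `reuse_tsid_column` above only fires when the planner hint is exactly the single synthetic `__tsid` column. A minimal standalone sketch of that check, not part of the patch and assuming only the standard library (the helper name `is_tsid_fast_path` is illustrative):

// Sketch only: mirrors the `matches!` slice pattern used in to_execution_plan.
fn is_tsid_fast_path(tags: &[String]) -> bool {
    matches!(tags, [tag] if tag == "__tsid")
}

fn main() {
    // Exactly one tag named __tsid enables the fast path.
    assert!(is_tsid_fast_path(&["__tsid".to_string()]));
    // Any other tag set keeps the general per-row gather path.
    assert!(!is_tsid_fast_path(&["__tsid".to_string(), "host".to_string()]));
    assert!(!is_tsid_fast_path(&[]));
}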
@@ -264,6 +298,7 @@ impl InstantManipulate {
             lookback_delta: pb_instant_manipulate.lookback_delta,
             interval: pb_instant_manipulate.interval,
             time_index_column: String::new(),
+            tag_columns: Vec::new(),
             field_column: None,
             input: placeholder_plan,
             unfix: Some(unfix),
@@ -279,6 +314,7 @@ pub struct InstantManipulateExec {
     interval: Millisecond,
     time_index_column: String,
     field_column: Option<String>,
+    reuse_tsid_column: bool,
 
     input: Arc<dyn ExecutionPlan>,
     metric: ExecutionPlanMetricsSet,
@@ -322,6 +358,7 @@ impl ExecutionPlan for InstantManipulateExec {
             interval: self.interval,
             time_index_column: self.time_index_column.clone(),
             field_column: self.field_column.clone(),
+            reuse_tsid_column: self.reuse_tsid_column,
             input: children[0].clone(),
             metric: self.metric.clone(),
         }))
@@ -333,9 +370,8 @@ impl ExecutionPlan for InstantManipulateExec {
         context: Arc<TaskContext>,
     ) -> DataFusionResult<SendableRecordBatchStream> {
         let baseline_metric = BaselineMetrics::new(&self.metric, partition);
-        let metrics_builder = MetricBuilder::new(&self.metric);
         let num_series = Count::new();
-        metrics_builder
+        MetricBuilder::new(&self.metric)
             .with_partition(partition)
             .build(MetricValue::Count {
                 name: METRIC_NUM_SERIES.into(),
@@ -353,6 +389,10 @@ impl ExecutionPlan for InstantManipulateExec {
             .as_ref()
             .and_then(|name| schema.column_with_name(name))
             .map(|x| x.0);
+        let tsid_index = schema
+            .column_with_name("__tsid")
+            .filter(|(_, field)| field.data_type() == &DataType::UInt64)
+            .map(|(index, _)| index);
         Ok(Box::pin(InstantManipulateStream {
             start: self.start,
             end: self.end,
@@ -360,6 +400,8 @@ impl ExecutionPlan for InstantManipulateExec {
             interval: self.interval,
             time_index,
             field_index,
+            tsid_index,
+            reuse_tsid_column: self.reuse_tsid_column && tsid_index.is_some(),
             schema,
             input,
             metric: baseline_metric,
@@ -425,6 +467,8 @@ pub struct InstantManipulateStream {
     // Column index of TIME INDEX column's position in schema
     time_index: usize,
     field_index: Option<usize>,
+    tsid_index: Option<usize>,
+    reuse_tsid_column: bool,
 
     schema: SchemaRef,
     input: SendableRecordBatchStream,
@@ -510,12 +554,22 @@ impl InstantManipulateStream {
         let aligned_start = self.start + (max_start - self.start) / self.interval * self.interval;
         let aligned_end = self.end - (self.end - min_end) / self.interval * self.interval;
 
-        let mut take_indices = vec![];
+        let estimated_points = if aligned_end >= aligned_start {
+            ((aligned_end - aligned_start) / self.interval).saturating_add(1) as usize
+        } else {
+            0
+        };
+        if estimated_points > MAX_INSTANT_MANIPULATE_OUTPUT_POINTS {
+            return Err(DataFusionError::Execution(format!(
+                "InstantManipulate output points exceed limit: {estimated_points} > {MAX_INSTANT_MANIPULATE_OUTPUT_POINTS}"
+            )));
+        }
+        let mut take_indices = Vec::with_capacity(estimated_points);
 
         let mut cursor = 0;
 
         let aligned_ts_iter = (aligned_start..=aligned_end).step_by(self.interval as usize);
-        let mut aligned_ts = vec![];
+        let mut aligned_ts = Vec::with_capacity(estimated_points);
 
         // calculate the offsets to take
         'next: for expected_ts in aligned_ts_iter {
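The `MAX_INSTANT_MANIPULATE_OUTPUT_POINTS` guard above fails fast before any output is materialized. A minimal standalone sketch of the same size estimate, not part of the patch, evaluated for an illustrative 0..=1500 range at a 500 ms interval:

// Sketch only: the pre-flight estimate of aligned output rows, as in the guard above.
fn main() {
    let (aligned_start, aligned_end, interval): (i64, i64, i64) = (0, 1_500, 500);
    let estimated_points = if aligned_end >= aligned_start {
        ((aligned_end - aligned_start) / interval).saturating_add(1) as usize
    } else {
        0
    };
    // Timestamps 0, 500, 1000, 1500 -> four output rows.
    assert_eq!(estimated_points, 4);

    const LIMIT: usize = 1_000_000;
    // The stream returns an execution error instead of building an oversized batch.
    assert!(estimated_points <= LIMIT);
}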
@@ -593,13 +647,26 @@ impl InstantManipulateStream {
     ) -> DataFusionResult<RecordBatch> {
         assert_eq!(take_indices.len(), aligned_ts.len());
 
-        let indices_array = UInt64Array::from(take_indices);
-        let mut arrays = record_batch
-            .columns()
-            .iter()
-            .map(|array| compute::take(array, &indices_array, None))
-            .collect::<ArrowResult<Vec<_>>>()?;
-        arrays[self.time_index] = Arc::new(TimestampMillisecondArray::from(aligned_ts));
+        let output_len = aligned_ts.len();
+        let mut indices_array = None;
+        let mut arrays = Vec::with_capacity(record_batch.num_columns());
+        let aligned_ts = Arc::new(TimestampMillisecondArray::from(aligned_ts)) as Arc<dyn Array>;
+
+        for (index, array) in record_batch.columns().iter().enumerate() {
+            if index == self.time_index {
+                arrays.push(aligned_ts.clone());
+                continue;
+            }
+
+            if self.reuse_tsid_column && self.tsid_index == Some(index) {
+                arrays.push(reuse_constant_column(array, output_len)?);
+                continue;
+            }
+
+            let indices_array =
+                indices_array.get_or_insert_with(|| UInt64Array::from(take_indices.clone()));
+            arrays.push(compute::take(array, indices_array, None)?);
+        }
 
         let result = RecordBatch::try_new(record_batch.schema(), arrays)
             .map_err(|e| DataFusionError::ArrowError(Box::new(e), None))?;
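The rewritten loop above gathers most columns with `take`, but routes a per-series constant column such as `__tsid` through `reuse_constant_column`. A minimal standalone sketch contrasting the two paths, not part of the patch and assuming the `arrow` crate (the patch itself goes through the `datafusion::arrow`/`datatypes::arrow` re-exports):

// Sketch only: gather-by-index vs. zero-copy slice of a constant column.
use std::sync::Arc;

use arrow::array::{Array, ArrayRef, UInt64Array};
use arrow::compute::take;

fn main() {
    let tsid: ArrayRef = Arc::new(UInt64Array::from(vec![42_u64, 42]));

    // General path: gather one row per output timestamp.
    let indices = UInt64Array::from(vec![0_u64, 0, 1, 1]);
    let gathered = take(tsid.as_ref(), &indices, None).unwrap();
    assert_eq!(gathered.len(), 4);

    // Constant-column path: reuse the existing buffer without copying.
    let reused = tsid.slice(0, 2);
    assert_eq!(reused.len(), 2);
}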
@@ -607,9 +674,24 @@ impl InstantManipulateStream {
     }
 }
 
+fn reuse_constant_column(array: &Arc<dyn Array>, len: usize) -> DataFusionResult<Arc<dyn Array>> {
+    if len <= array.len() {
+        return Ok(array.slice(0, len));
+    }
+
+    if array.is_empty() {
+        return Ok(array.slice(0, 0));
+    }
+
+    ScalarValue::try_from_array(array.as_ref(), 0)?.to_array_of_size(len)
+}
+
 #[cfg(test)]
 mod test {
+    use datafusion::arrow::datatypes::{DataType, Field, Schema};
     use datafusion::common::ToDFSchema;
+    use datafusion::datasource::memory::MemorySourceConfig;
+    use datafusion::datasource::source::DataSourceExec;
     use datafusion::logical_expr::{EmptyRelation, LogicalPlan};
     use datafusion::prelude::SessionContext;
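`reuse_constant_column` above only broadcasts through `ScalarValue` when the aligned output is longer than the input batch; otherwise it slices. A minimal standalone sketch of the grow case, not part of the patch and assuming the `datafusion` crate:

// Sketch only: repeat the first value of a constant column to the output length.
use std::sync::Arc;

use datafusion::arrow::array::{Array, UInt64Array};
use datafusion::common::ScalarValue;
use datafusion::error::Result;

fn main() -> Result<()> {
    let tsid = Arc::new(UInt64Array::from(vec![42_u64, 42])) as Arc<dyn Array>;

    // Two input rows, four output rows: broadcast instead of gathering row by row.
    let grown = ScalarValue::try_from_array(tsid.as_ref(), 0)?.to_array_of_size(4)?;
    assert_eq!(grown.len(), 4);
    Ok(())
}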
@@ -638,6 +720,7 @@ mod test {
             interval,
             time_index_column: TIME_INDEX_COLUMN.to_string(),
             field_column: Some("value".to_string()),
+            reuse_tsid_column: false,
             input: memory_exec,
             metric: ExecutionPlanMetricsSet::new(),
         });
@@ -665,6 +748,7 @@ mod test {
             0,
             0,
             TIME_INDEX_COLUMN.to_string(),
+            Vec::new(),
             Some("value".to_string()),
             input,
         );
@@ -676,6 +760,277 @@ mod test {
         assert_eq!(required.as_slice(), &[0, 1, 2]);
     }
 
+    #[test]
+    fn rebuild_should_recover_tag_columns_from_series_divide_input() {
+        let df_schema = prepare_test_data().schema().to_dfschema_ref().unwrap();
+        let input = LogicalPlan::EmptyRelation(EmptyRelation {
+            produce_one_row: false,
+            schema: df_schema,
+        });
+        let series_divide = LogicalPlan::Extension(Extension {
+            node: Arc::new(SeriesDivide::new(
+                vec!["__tsid".to_string()],
+                TIME_INDEX_COLUMN.to_string(),
+                input,
+            )),
+        });
+        let bytes = InstantManipulate::new(
+            0,
+            0,
+            0,
+            0,
+            TIME_INDEX_COLUMN.to_string(),
+            vec!["__tsid".to_string()],
+            Some("value".to_string()),
+            series_divide.clone(),
+        )
+        .serialize();
+        let plan = InstantManipulate::deserialize(&bytes)
+            .unwrap()
+            .with_exprs_and_inputs(vec![], vec![series_divide])
+            .unwrap();
+
+        assert_eq!(plan.tag_columns, vec!["__tsid".to_string()]);
+    }
+
+    #[test]
+    fn rebuild_should_recover_tag_columns_from_series_normalize_input() {
+        let df_schema = prepare_test_data().schema().to_dfschema_ref().unwrap();
+        let input = LogicalPlan::EmptyRelation(EmptyRelation {
+            produce_one_row: false,
+            schema: df_schema,
+        });
+        let series_divide = LogicalPlan::Extension(Extension {
+            node: Arc::new(SeriesDivide::new(
+                vec!["__tsid".to_string()],
+                TIME_INDEX_COLUMN.to_string(),
+                input,
+            )),
+        });
+        let series_normalize = LogicalPlan::Extension(Extension {
+            node: Arc::new(crate::extension_plan::SeriesNormalize::new(
+                0,
+                TIME_INDEX_COLUMN,
+                false,
+                vec!["__tsid".to_string()],
+                series_divide,
+            )),
+        });
+        let bytes = InstantManipulate::new(
+            0,
+            0,
+            0,
+            0,
+            TIME_INDEX_COLUMN.to_string(),
+            vec!["__tsid".to_string()],
+            Some("value".to_string()),
+            series_normalize.clone(),
+        )
+        .serialize();
+        let plan = InstantManipulate::deserialize(&bytes)
+            .unwrap()
+            .with_exprs_and_inputs(vec![], vec![series_normalize])
+            .unwrap();
+
+        assert_eq!(plan.tag_columns, vec!["__tsid".to_string()]);
+    }
+
+    #[test]
+    fn to_execution_plan_enables_tsid_fast_path() {
+        let schema = Arc::new(Schema::new(vec![
+            Field::new(
+                TIME_INDEX_COLUMN,
+                DataType::Timestamp(datafusion::arrow::datatypes::TimeUnit::Millisecond, None),
+                false,
+            ),
+            Field::new("value", DataType::Float64, true),
+        ]));
+        let exec_input: Arc<dyn ExecutionPlan> = Arc::new(DataSourceExec::new(Arc::new(
+            MemorySourceConfig::try_new(&[], schema, None).unwrap(),
+        )));
+
+        let exec = InstantManipulate::new(
+            0,
+            0,
+            0,
+            0,
+            TIME_INDEX_COLUMN.to_string(),
+            vec!["__tsid".to_string()],
+            Some("value".to_string()),
+            LogicalPlan::EmptyRelation(EmptyRelation {
+                produce_one_row: false,
+                schema: Arc::new(datafusion::common::DFSchema::empty()),
+            }),
+        )
+        .to_execution_plan(exec_input);
+
+        assert!(format!("{exec:?}").contains("reuse_tsid_column: true"));
+    }
+
+    #[tokio::test]
+    async fn tsid_fast_path_reuses_tsid_column_when_output_grows() {
+        let schema = Arc::new(Schema::new(vec![
+            Field::new(
+                TIME_INDEX_COLUMN,
+                DataType::Timestamp(datafusion::arrow::datatypes::TimeUnit::Millisecond, None),
+                false,
+            ),
+            Field::new("value", DataType::Float64, true),
+            Field::new("host", DataType::Utf8, true),
+            Field::new("__tsid", DataType::UInt64, false),
+        ]));
+        let batch = RecordBatch::try_new(
+            schema.clone(),
+            vec![
+                Arc::new(TimestampMillisecondArray::from(vec![0, 1_000])),
+                Arc::new(Float64Array::from(vec![1.0, 2.0])),
+                Arc::new(datafusion::arrow::array::StringArray::from(vec![
+                    "foo", "foo",
+                ])),
+                Arc::new(UInt64Array::from(vec![42, 42])),
+            ],
+        )
+        .unwrap();
+        let input = Arc::new(DataSourceExec::new(Arc::new(
+            MemorySourceConfig::try_new(&[vec![batch]], schema, None).unwrap(),
+        )));
+        let normalize_exec = Arc::new(InstantManipulateExec {
+            start: 0,
+            end: 1_500,
+            lookback_delta: 1_000,
+            interval: 500,
+            time_index_column: TIME_INDEX_COLUMN.to_string(),
+            field_column: Some("value".to_string()),
+            reuse_tsid_column: true,
+            input,
+            metric: ExecutionPlanMetricsSet::new(),
+        });
+        let session_context = SessionContext::default();
+        let result = datafusion::physical_plan::collect(normalize_exec, session_context.task_ctx())
+            .await
+            .unwrap();
+        let result_literal = datatypes::arrow::util::pretty::pretty_format_batches(&result)
+            .unwrap()
+            .to_string();
+
+        assert_eq!(
+            result_literal,
+            "+-------------------------+-------+------+--------+\
+            \n| timestamp               | value | host | __tsid |\
+            \n+-------------------------+-------+------+--------+\
+            \n| 1970-01-01T00:00:00     | 1.0   | foo  | 42     |\
+            \n| 1970-01-01T00:00:00.500 | 1.0   | foo  | 42     |\
+            \n| 1970-01-01T00:00:01     | 2.0   | foo  | 42     |\
+            \n| 1970-01-01T00:00:01.500 | 2.0   | foo  | 42     |\
+            \n+-------------------------+-------+------+--------+"
+        );
+    }
+
+    #[tokio::test]
+    async fn tsid_fast_path_still_takes_additional_field_columns() {
+        let schema = Arc::new(Schema::new(vec![
+            Field::new(
+                TIME_INDEX_COLUMN,
+                DataType::Timestamp(datafusion::arrow::datatypes::TimeUnit::Millisecond, None),
+                false,
+            ),
+            Field::new("value", DataType::Float64, true),
+            Field::new("value_2", DataType::Float64, true),
+            Field::new("host", DataType::Utf8, true),
+            Field::new("__tsid", DataType::UInt64, false),
+        ]));
+        let batch = RecordBatch::try_new(
+            schema.clone(),
+            vec![
+                Arc::new(TimestampMillisecondArray::from(vec![0, 1_000])),
+                Arc::new(Float64Array::from(vec![1.0, 2.0])),
+                Arc::new(Float64Array::from(vec![10.0, 20.0])),
+                Arc::new(datafusion::arrow::array::StringArray::from(vec![
+                    "foo", "foo",
+                ])),
+                Arc::new(UInt64Array::from(vec![42, 42])),
+            ],
+        )
+        .unwrap();
+        let input = Arc::new(DataSourceExec::new(Arc::new(
+            MemorySourceConfig::try_new(&[vec![batch]], schema, None).unwrap(),
+        )));
+        let normalize_exec = Arc::new(InstantManipulateExec {
+            start: 0,
+            end: 1_500,
+            lookback_delta: 1_000,
+            interval: 500,
+            time_index_column: TIME_INDEX_COLUMN.to_string(),
+            field_column: Some("value".to_string()),
+            reuse_tsid_column: true,
+            input,
+            metric: ExecutionPlanMetricsSet::new(),
+        });
+        let session_context = SessionContext::default();
+        let result = datafusion::physical_plan::collect(normalize_exec, session_context.task_ctx())
+            .await
+            .unwrap();
+        let result_literal = datatypes::arrow::util::pretty::pretty_format_batches(&result)
+            .unwrap()
+            .to_string();
+
+        assert_eq!(
+            result_literal,
+            "+-------------------------+-------+---------+------+--------+\
+            \n| timestamp               | value | value_2 | host | __tsid |\
+            \n+-------------------------+-------+---------+------+--------+\
+            \n| 1970-01-01T00:00:00     | 1.0   | 10.0    | foo  | 42     |\
+            \n| 1970-01-01T00:00:00.500 | 1.0   | 10.0    | foo  | 42     |\
+            \n| 1970-01-01T00:00:01     | 2.0   | 20.0    | foo  | 42     |\
+            \n| 1970-01-01T00:00:01.500 | 2.0   | 20.0    | foo  | 42     |\
+            \n+-------------------------+-------+---------+------+--------+"
+        );
+    }
+
+    #[tokio::test]
+    async fn manipulate_should_reject_too_many_output_points() {
+        let schema = Arc::new(Schema::new(vec![
+            Field::new(
+                TIME_INDEX_COLUMN,
+                DataType::Timestamp(datafusion::arrow::datatypes::TimeUnit::Millisecond, None),
+                false,
+            ),
+            Field::new("value", DataType::Float64, true),
+        ]));
+        let batch = RecordBatch::try_new(
+            schema.clone(),
+            vec![
+                Arc::new(TimestampMillisecondArray::from(vec![0])),
+                Arc::new(Float64Array::from(vec![1.0])),
+            ],
+        )
+        .unwrap();
+        let input = Arc::new(DataSourceExec::new(Arc::new(
+            MemorySourceConfig::try_new(&[vec![batch]], schema, None).unwrap(),
+        )));
+        let too_many_points = MAX_INSTANT_MANIPULATE_OUTPUT_POINTS as Millisecond + 1;
+        let normalize_exec = Arc::new(InstantManipulateExec {
+            start: 0,
+            end: too_many_points,
+            lookback_delta: too_many_points + 1,
+            interval: 1,
+            time_index_column: TIME_INDEX_COLUMN.to_string(),
+            field_column: Some("value".to_string()),
+            reuse_tsid_column: false,
+            input,
+            metric: ExecutionPlanMetricsSet::new(),
+        });
+        let session_context = SessionContext::default();
+        let err = datafusion::physical_plan::collect(normalize_exec, session_context.task_ctx())
+            .await
+            .unwrap_err();
+
+        assert!(
+            err.to_string()
+                .contains("InstantManipulate output points exceed limit")
+        );
+    }
+
     #[tokio::test]
     async fn lookback_10s_interval_30s() {
         let expected = String::from(
diff --git a/src/query/src/promql/planner.rs b/src/query/src/promql/planner.rs
index 0c889529eb..6b17b7837a 100644
--- a/src/query/src/promql/planner.rs
+++ b/src/query/src/promql/planner.rs
@@ -919,6 +919,11 @@ impl PromPlanner {
                 .time_index_column
                 .clone()
                 .expect("time index should be set in `setup_context`"),
+            if self.ctx.use_tsid {
+                vec![DATA_SCHEMA_TSID_COLUMN_NAME.to_string()]
+            } else {
+                self.ctx.tag_columns.clone()
+            },
             self.ctx.field_columns.first().cloned(),
             normalize,
         );
@@ -4104,6 +4109,10 @@ mod test {
     use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
     use common_query::prelude::greptime_timestamp;
     use common_query::test_util::DummyDecoder;
+    use datafusion::arrow::datatypes::Schema as ArrowSchema;
+    use datafusion::datasource::memory::MemorySourceConfig;
+    use datafusion::datasource::source::DataSourceExec;
+    use datafusion::logical_expr::Extension;
     use datatypes::prelude::ConcreteDataType;
     use datatypes::schema::{ColumnSchema, Schema};
     use promql_parser::label::Labels;
@@ -4117,6 +4126,16 @@ mod test {
     use crate::options::QueryOptions;
     use crate::parser::QueryLanguageParser;
 
+    fn find_instant_manipulate(plan: &LogicalPlan) -> Option<&InstantManipulate> {
+        if let LogicalPlan::Extension(Extension { node }) = plan
+            && let Some(instant_manipulate) = node.as_any().downcast_ref::<InstantManipulate>()
+        {
+            return Some(instant_manipulate);
+        }
+
+        plan.inputs().into_iter().find_map(find_instant_manipulate)
+    }
+
     fn build_query_engine_state() -> QueryEngineState {
         QueryEngineState::new(
             new_memory_catalog_manager().unwrap(),
@@ -4827,6 +4846,12 @@ mod test {
             .iter()
             .any(|field| field.name() == DATA_SCHEMA_TSID_COLUMN_NAME)
         );
+
+        let manipulate = find_instant_manipulate(&plan).unwrap();
+        let exec = manipulate.to_execution_plan(Arc::new(DataSourceExec::new(Arc::new(
+            MemorySourceConfig::try_new(&[], Arc::new(ArrowSchema::empty()), None).unwrap(),
+        ))));
+        assert!(format!("{exec:?}").contains("reuse_tsid_column: true"));
     }
 
     #[tokio::test]
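The planner change above passes a narrower tag hint to InstantManipulate when the scan is keyed by the synthetic `__tsid` column. A minimal standalone sketch of that decision, not part of the patch; it assumes `DATA_SCHEMA_TSID_COLUMN_NAME` resolves to "__tsid" as the execution-side check expects, and `tag_hint` is an illustrative free function:

// Sketch only: which tag-column hint the planner forwards.
fn tag_hint(use_tsid: bool, tag_columns: &[String]) -> Vec<String> {
    if use_tsid {
        // Assumed equal to DATA_SCHEMA_TSID_COLUMN_NAME.
        vec!["__tsid".to_string()]
    } else {
        tag_columns.to_vec()
    }
}

fn main() {
    let tags = vec!["host".to_string(), "job".to_string()];
    assert_eq!(tag_hint(true, &tags), vec!["__tsid".to_string()]);
    assert_eq!(tag_hint(false, &tags), tags);
}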