feat: some optimistic paths for instant manipulate (#7812)

* feat: some optimistic paths for instant manipulate

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* use tsid in manipulate plan, resolve_tag_columns walks whole plan

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix tsid reuse

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* update test assertions

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* cap max points

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ruihang Xia
2026-05-14 10:36:25 +08:00
committed by GitHub
parent a04fa52486
commit aa0ff3cce7
2 changed files with 395 additions and 15 deletions

View File

@@ -19,13 +19,15 @@ use std::sync::Arc;
use std::task::{Context, Poll};
use datafusion::arrow::array::{Array, Float64Array, TimestampMillisecondArray, UInt64Array};
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::arrow::datatypes::{DataType, SchemaRef};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::common::stats::Precision;
use datafusion::common::{DFSchema, DFSchemaRef};
use datafusion::common::{DFSchema, DFSchemaRef, ScalarValue};
use datafusion::error::{DataFusionError, Result as DataFusionResult};
use datafusion::execution::context::TaskContext;
use datafusion::logical_expr::{EmptyRelation, Expr, LogicalPlan, UserDefinedLogicalNodeCore};
use datafusion::logical_expr::{
EmptyRelation, Expr, Extension, LogicalPlan, UserDefinedLogicalNodeCore,
};
use datafusion::physical_plan::metrics::{
BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, MetricValue, MetricsSet,
};
@@ -35,18 +37,20 @@ use datafusion::physical_plan::{
};
use datafusion_expr::col;
use datatypes::arrow::compute;
use datatypes::arrow::error::Result as ArrowResult;
use futures::{Stream, StreamExt, ready};
use greptime_proto::substrait_extension as pb;
use prost::Message;
use snafu::ResultExt;
use crate::error::{DeserializeSnafu, Result};
use crate::extension_plan::series_divide::SeriesDivide;
use crate::extension_plan::{
METRIC_NUM_SERIES, Millisecond, resolve_column_name, serialize_column_index,
};
use crate::metrics::PROMQL_SERIES_COUNT;
const MAX_INSTANT_MANIPULATE_OUTPUT_POINTS: usize = 1_000_000;
/// Manipulate the input record batch to make it suitable for Instant Operator.
///
/// This plan will try to align the input time series, for every timestamp between
@@ -59,6 +63,8 @@ pub struct InstantManipulate {
lookback_delta: Millisecond,
interval: Millisecond,
time_index_column: String,
// Planner-provided tag-column hint for execution fast paths.
tag_columns: Vec<String>,
/// An optional column for validating staleness
field_column: Option<String>,
input: LogicalPlan,
@@ -166,6 +172,7 @@ impl UserDefinedLogicalNodeCore for InstantManipulate {
lookback_delta: self.lookback_delta,
interval: self.interval,
time_index_column,
tag_columns: Self::resolve_tag_columns(&input, &self.tag_columns),
field_column,
input,
unfix: None,
@@ -177,6 +184,7 @@ impl UserDefinedLogicalNodeCore for InstantManipulate {
lookback_delta: self.lookback_delta,
interval: self.interval,
time_index_column: self.time_index_column.clone(),
tag_columns: Self::resolve_tag_columns(&input, &self.tag_columns),
field_column: self.field_column.clone(),
input,
unfix: None,
@@ -186,12 +194,14 @@ impl UserDefinedLogicalNodeCore for InstantManipulate {
}
impl InstantManipulate {
#[allow(clippy::too_many_arguments)]
pub fn new(
start: Millisecond,
end: Millisecond,
lookback_delta: Millisecond,
interval: Millisecond,
time_index_column: String,
tag_columns: Vec<String>,
field_column: Option<String>,
input: LogicalPlan,
) -> Self {
@@ -201,6 +211,7 @@ impl InstantManipulate {
lookback_delta,
interval,
time_index_column,
tag_columns,
field_column,
input,
unfix: None,
@@ -211,7 +222,29 @@ impl InstantManipulate {
"InstantManipulate"
}
/// Chooses the tag columns to store on a rebuilt `InstantManipulate` node.
///
/// A non-empty `tag_columns` hint is returned as an owned copy. When the hint
/// is empty, the tags of the first `SeriesDivide` found under `input` are used
/// instead; if no such node is found the result is an empty list.
fn resolve_tag_columns(input: &LogicalPlan, tag_columns: &[String]) -> Vec<String> {
    if tag_columns.is_empty() {
        // No hint available: try to recover it from the input plan.
        Self::find_series_divide_tags(input).unwrap_or_default()
    } else {
        tag_columns.to_vec()
    }
}
/// Depth-first search for a `SeriesDivide` extension node in `plan`.
///
/// Returns a copy of the tags of the first `SeriesDivide` encountered, checking
/// `plan` itself before descending into its inputs in order. Returns `None`
/// when neither `plan` nor any of its descendants is a `SeriesDivide`.
fn find_series_divide_tags(plan: &LogicalPlan) -> Option<Vec<String>> {
    let own_tags = match plan {
        LogicalPlan::Extension(Extension { node }) => node
            .as_any()
            .downcast_ref::<SeriesDivide>()
            .map(|divide| divide.tags().to_vec()),
        _ => None,
    };
    if own_tags.is_some() {
        return own_tags;
    }

    // Not a `SeriesDivide` itself: the first child subtree with a match wins.
    for child in plan.inputs() {
        if let Some(tags) = Self::find_series_divide_tags(child) {
            return Some(tags);
        }
    }
    None
}
pub fn to_execution_plan(&self, exec_input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
let reuse_tsid_column = matches!(self.tag_columns.as_slice(), [tag] if tag == "__tsid");
Arc::new(InstantManipulateExec {
start: self.start,
end: self.end,
@@ -219,6 +252,7 @@ impl InstantManipulate {
interval: self.interval,
time_index_column: self.time_index_column.clone(),
field_column: self.field_column.clone(),
reuse_tsid_column,
input: exec_input,
metric: ExecutionPlanMetricsSet::new(),
})
@@ -264,6 +298,7 @@ impl InstantManipulate {
lookback_delta: pb_instant_manipulate.lookback_delta,
interval: pb_instant_manipulate.interval,
time_index_column: String::new(),
tag_columns: Vec::new(),
field_column: None,
input: placeholder_plan,
unfix: Some(unfix),
@@ -279,6 +314,7 @@ pub struct InstantManipulateExec {
interval: Millisecond,
time_index_column: String,
field_column: Option<String>,
reuse_tsid_column: bool,
input: Arc<dyn ExecutionPlan>,
metric: ExecutionPlanMetricsSet,
@@ -322,6 +358,7 @@ impl ExecutionPlan for InstantManipulateExec {
interval: self.interval,
time_index_column: self.time_index_column.clone(),
field_column: self.field_column.clone(),
reuse_tsid_column: self.reuse_tsid_column,
input: children[0].clone(),
metric: self.metric.clone(),
}))
@@ -333,9 +370,8 @@ impl ExecutionPlan for InstantManipulateExec {
context: Arc<TaskContext>,
) -> DataFusionResult<SendableRecordBatchStream> {
let baseline_metric = BaselineMetrics::new(&self.metric, partition);
let metrics_builder = MetricBuilder::new(&self.metric);
let num_series = Count::new();
metrics_builder
MetricBuilder::new(&self.metric)
.with_partition(partition)
.build(MetricValue::Count {
name: METRIC_NUM_SERIES.into(),
@@ -353,6 +389,10 @@ impl ExecutionPlan for InstantManipulateExec {
.as_ref()
.and_then(|name| schema.column_with_name(name))
.map(|x| x.0);
let tsid_index = schema
.column_with_name("__tsid")
.filter(|(_, field)| field.data_type() == &DataType::UInt64)
.map(|(index, _)| index);
Ok(Box::pin(InstantManipulateStream {
start: self.start,
end: self.end,
@@ -360,6 +400,8 @@ impl ExecutionPlan for InstantManipulateExec {
interval: self.interval,
time_index,
field_index,
tsid_index,
reuse_tsid_column: self.reuse_tsid_column && tsid_index.is_some(),
schema,
input,
metric: baseline_metric,
@@ -425,6 +467,8 @@ pub struct InstantManipulateStream {
// Column index of TIME INDEX column's position in schema
time_index: usize,
field_index: Option<usize>,
tsid_index: Option<usize>,
reuse_tsid_column: bool,
schema: SchemaRef,
input: SendableRecordBatchStream,
@@ -510,12 +554,22 @@ impl InstantManipulateStream {
let aligned_start = self.start + (max_start - self.start) / self.interval * self.interval;
let aligned_end = self.end - (self.end - min_end) / self.interval * self.interval;
let mut take_indices = vec![];
let estimated_points = if aligned_end >= aligned_start {
((aligned_end - aligned_start) / self.interval).saturating_add(1) as usize
} else {
0
};
if estimated_points > MAX_INSTANT_MANIPULATE_OUTPUT_POINTS {
return Err(DataFusionError::Execution(format!(
"InstantManipulate output points exceed limit: {estimated_points} > {MAX_INSTANT_MANIPULATE_OUTPUT_POINTS}"
)));
}
let mut take_indices = Vec::with_capacity(estimated_points);
let mut cursor = 0;
let aligned_ts_iter = (aligned_start..=aligned_end).step_by(self.interval as usize);
let mut aligned_ts = vec![];
let mut aligned_ts = Vec::with_capacity(estimated_points);
// calculate the offsets to take
'next: for expected_ts in aligned_ts_iter {
@@ -593,13 +647,26 @@ impl InstantManipulateStream {
) -> DataFusionResult<RecordBatch> {
assert_eq!(take_indices.len(), aligned_ts.len());
let indices_array = UInt64Array::from(take_indices);
let mut arrays = record_batch
.columns()
.iter()
.map(|array| compute::take(array, &indices_array, None))
.collect::<ArrowResult<Vec<_>>>()?;
arrays[self.time_index] = Arc::new(TimestampMillisecondArray::from(aligned_ts));
let output_len = aligned_ts.len();
let mut indices_array = None;
let mut arrays = Vec::with_capacity(record_batch.num_columns());
let aligned_ts = Arc::new(TimestampMillisecondArray::from(aligned_ts)) as Arc<dyn Array>;
for (index, array) in record_batch.columns().iter().enumerate() {
if index == self.time_index {
arrays.push(aligned_ts.clone());
continue;
}
if self.reuse_tsid_column && self.tsid_index == Some(index) {
arrays.push(reuse_constant_column(array, output_len)?);
continue;
}
let indices_array =
indices_array.get_or_insert_with(|| UInt64Array::from(take_indices.clone()));
arrays.push(compute::take(array, indices_array, None)?);
}
let result = RecordBatch::try_new(record_batch.schema(), arrays)
.map_err(|e| DataFusionError::ArrowError(Box::new(e), None))?;
@@ -607,9 +674,24 @@ impl InstantManipulateStream {
}
}
/// Produces a column for an output of `len` rows from `array` without going
/// through `take`.
///
/// * If `array` already has at least `len` rows, its first `len` rows are
///   returned as a slice.
/// * If `array` is empty, an empty slice is returned.
/// * Otherwise the value at row 0 is repeated `len` times.
///
/// NOTE(review): as the name suggests, this presumably relies on every row of
/// `array` holding the same value; nothing here verifies that.
///
/// # Errors
///
/// Propagates any error from reading the first value as a `ScalarValue` or
/// from expanding it to an array of `len` rows.
fn reuse_constant_column(array: &Arc<dyn Array>, len: usize) -> DataFusionResult<Arc<dyn Array>> {
    match array.len() {
        available if available >= len => Ok(array.slice(0, len)),
        0 => Ok(array.slice(0, 0)),
        _ => ScalarValue::try_from_array(array.as_ref(), 0)?.to_array_of_size(len),
    }
}
#[cfg(test)]
mod test {
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::common::ToDFSchema;
use datafusion::datasource::memory::MemorySourceConfig;
use datafusion::datasource::source::DataSourceExec;
use datafusion::logical_expr::{EmptyRelation, LogicalPlan};
use datafusion::prelude::SessionContext;
@@ -638,6 +720,7 @@ mod test {
interval,
time_index_column: TIME_INDEX_COLUMN.to_string(),
field_column: Some("value".to_string()),
reuse_tsid_column: false,
input: memory_exec,
metric: ExecutionPlanMetricsSet::new(),
});
@@ -665,6 +748,7 @@ mod test {
0,
0,
TIME_INDEX_COLUMN.to_string(),
Vec::new(),
Some("value".to_string()),
input,
);
@@ -676,6 +760,277 @@ mod test {
assert_eq!(required.as_slice(), &[0, 1, 2]);
}
/// Serializes an `InstantManipulate`, deserializes it, re-attaches a
/// `SeriesDivide` input keyed on `__tsid`, and checks that the rebuilt node
/// carries `__tsid` as its tag columns.
#[test]
fn rebuild_should_recover_tag_columns_from_series_divide_input() {
    let tsid_tags = vec!["__tsid".to_string()];

    let leaf = LogicalPlan::EmptyRelation(EmptyRelation {
        produce_one_row: false,
        schema: prepare_test_data().schema().to_dfschema_ref().unwrap(),
    });
    let divided = LogicalPlan::Extension(Extension {
        node: Arc::new(SeriesDivide::new(
            tsid_tags.clone(),
            TIME_INDEX_COLUMN.to_string(),
            leaf,
        )),
    });

    let original = InstantManipulate::new(
        0,
        0,
        0,
        0,
        TIME_INDEX_COLUMN.to_string(),
        tsid_tags.clone(),
        Some("value".to_string()),
        divided.clone(),
    );
    let encoded = original.serialize();

    let rebuilt = InstantManipulate::deserialize(&encoded)
        .unwrap()
        .with_exprs_and_inputs(vec![], vec![divided])
        .unwrap();

    assert_eq!(rebuilt.tag_columns, tsid_tags);
}
/// Same round trip as the `SeriesDivide` case, but the `SeriesDivide` sits one
/// level deeper, underneath a `SeriesNormalize` node that becomes the direct
/// input of the rebuilt `InstantManipulate`.
#[test]
fn rebuild_should_recover_tag_columns_from_series_normalize_input() {
    let tsid_tags = vec!["__tsid".to_string()];

    let leaf = LogicalPlan::EmptyRelation(EmptyRelation {
        produce_one_row: false,
        schema: prepare_test_data().schema().to_dfschema_ref().unwrap(),
    });
    let divided = LogicalPlan::Extension(Extension {
        node: Arc::new(SeriesDivide::new(
            tsid_tags.clone(),
            TIME_INDEX_COLUMN.to_string(),
            leaf,
        )),
    });
    let normalized = LogicalPlan::Extension(Extension {
        node: Arc::new(crate::extension_plan::SeriesNormalize::new(
            0,
            TIME_INDEX_COLUMN,
            false,
            tsid_tags.clone(),
            divided,
        )),
    });

    let original = InstantManipulate::new(
        0,
        0,
        0,
        0,
        TIME_INDEX_COLUMN.to_string(),
        tsid_tags.clone(),
        Some("value".to_string()),
        normalized.clone(),
    );
    let encoded = original.serialize();

    let rebuilt = InstantManipulate::deserialize(&encoded)
        .unwrap()
        .with_exprs_and_inputs(vec![], vec![normalized])
        .unwrap();

    assert_eq!(rebuilt.tag_columns, tsid_tags);
}
/// A logical node whose tag columns are exactly `["__tsid"]` must yield a
/// physical node with `reuse_tsid_column` set, observed via its `Debug` output.
#[test]
fn to_execution_plan_enables_tsid_fast_path() {
    let fields = vec![
        Field::new(
            TIME_INDEX_COLUMN,
            DataType::Timestamp(datafusion::arrow::datatypes::TimeUnit::Millisecond, None),
            false,
        ),
        Field::new("value", DataType::Float64, true),
    ];
    let schema = Arc::new(Schema::new(fields));
    let source = MemorySourceConfig::try_new(&[], schema, None).unwrap();
    let exec_input: Arc<dyn ExecutionPlan> = Arc::new(DataSourceExec::new(Arc::new(source)));

    let logical_input = LogicalPlan::EmptyRelation(EmptyRelation {
        produce_one_row: false,
        schema: Arc::new(datafusion::common::DFSchema::empty()),
    });
    let logical = InstantManipulate::new(
        0,
        0,
        0,
        0,
        TIME_INDEX_COLUMN.to_string(),
        vec!["__tsid".to_string()],
        Some("value".to_string()),
        logical_input,
    );

    let exec = logical.to_execution_plan(exec_input);
    let rendered = format!("{exec:?}");
    assert!(rendered.contains("reuse_tsid_column: true"));
}
// Runs `InstantManipulateExec` with `reuse_tsid_column: true` over a two-row
// batch and expects four output rows, i.e. more rows than the input holds.
// Every output row must still carry the input's `__tsid` value (42).
#[tokio::test]
async fn tsid_fast_path_reuses_tsid_column_when_output_grows() {
// Input schema: time index, one field, one string tag, and a UInt64 `__tsid`.
let schema = Arc::new(Schema::new(vec![
Field::new(
TIME_INDEX_COLUMN,
DataType::Timestamp(datafusion::arrow::datatypes::TimeUnit::Millisecond, None),
false,
),
Field::new("value", DataType::Float64, true),
Field::new("host", DataType::Utf8, true),
Field::new("__tsid", DataType::UInt64, false),
]));
// Two samples (0 ms and 1000 ms) sharing the same host and tsid.
let batch = RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(TimestampMillisecondArray::from(vec![0, 1_000])),
Arc::new(Float64Array::from(vec![1.0, 2.0])),
Arc::new(datafusion::arrow::array::StringArray::from(vec![
"foo", "foo",
])),
Arc::new(UInt64Array::from(vec![42, 42])),
],
)
.unwrap();
let input = Arc::new(DataSourceExec::new(Arc::new(
MemorySourceConfig::try_new(&[vec![batch]], schema, None).unwrap(),
)));
// Query range 0..=1500 ms at a 500 ms interval: four aligned timestamps.
let normalize_exec = Arc::new(InstantManipulateExec {
start: 0,
end: 1_500,
lookback_delta: 1_000,
interval: 500,
time_index_column: TIME_INDEX_COLUMN.to_string(),
field_column: Some("value".to_string()),
reuse_tsid_column: true,
input,
metric: ExecutionPlanMetricsSet::new(),
});
let session_context = SessionContext::default();
let result = datafusion::physical_plan::collect(normalize_exec, session_context.task_ctx())
.await
.unwrap();
let result_literal = datatypes::arrow::util::pretty::pretty_format_batches(&result)
.unwrap()
.to_string();
// Compare against the pretty-printed table of the expected output.
assert_eq!(
result_literal,
"+-------------------------+-------+------+--------+\
\n| timestamp | value | host | __tsid |\
\n+-------------------------+-------+------+--------+\
\n| 1970-01-01T00:00:00 | 1.0 | foo | 42 |\
\n| 1970-01-01T00:00:00.500 | 1.0 | foo | 42 |\
\n| 1970-01-01T00:00:01 | 2.0 | foo | 42 |\
\n| 1970-01-01T00:00:01.500 | 2.0 | foo | 42 |\
\n+-------------------------+-------+------+--------+"
);
}
// Runs `InstantManipulateExec` with `reuse_tsid_column: true` over a batch
// that has a second field column (`value_2`) besides the configured
// `field_column`. The expected output shows `value_2` following the same row
// selection as `value`, while `__tsid` stays 42 on every row.
#[tokio::test]
async fn tsid_fast_path_still_takes_additional_field_columns() {
// Input schema: time index, two float fields, one string tag, UInt64 `__tsid`.
let schema = Arc::new(Schema::new(vec![
Field::new(
TIME_INDEX_COLUMN,
DataType::Timestamp(datafusion::arrow::datatypes::TimeUnit::Millisecond, None),
false,
),
Field::new("value", DataType::Float64, true),
Field::new("value_2", DataType::Float64, true),
Field::new("host", DataType::Utf8, true),
Field::new("__tsid", DataType::UInt64, false),
]));
// Two samples (0 ms and 1000 ms) sharing the same host and tsid.
let batch = RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(TimestampMillisecondArray::from(vec![0, 1_000])),
Arc::new(Float64Array::from(vec![1.0, 2.0])),
Arc::new(Float64Array::from(vec![10.0, 20.0])),
Arc::new(datafusion::arrow::array::StringArray::from(vec![
"foo", "foo",
])),
Arc::new(UInt64Array::from(vec![42, 42])),
],
)
.unwrap();
let input = Arc::new(DataSourceExec::new(Arc::new(
MemorySourceConfig::try_new(&[vec![batch]], schema, None).unwrap(),
)));
// Query range 0..=1500 ms at a 500 ms interval: four aligned timestamps.
let normalize_exec = Arc::new(InstantManipulateExec {
start: 0,
end: 1_500,
lookback_delta: 1_000,
interval: 500,
time_index_column: TIME_INDEX_COLUMN.to_string(),
field_column: Some("value".to_string()),
reuse_tsid_column: true,
input,
metric: ExecutionPlanMetricsSet::new(),
});
let session_context = SessionContext::default();
let result = datafusion::physical_plan::collect(normalize_exec, session_context.task_ctx())
.await
.unwrap();
let result_literal = datatypes::arrow::util::pretty::pretty_format_batches(&result)
.unwrap()
.to_string();
// Compare against the pretty-printed table of the expected output.
assert_eq!(
result_literal,
"+-------------------------+-------+---------+------+--------+\
\n| timestamp | value | value_2 | host | __tsid |\
\n+-------------------------+-------+---------+------+--------+\
\n| 1970-01-01T00:00:00 | 1.0 | 10.0 | foo | 42 |\
\n| 1970-01-01T00:00:00.500 | 1.0 | 10.0 | foo | 42 |\
\n| 1970-01-01T00:00:01 | 2.0 | 20.0 | foo | 42 |\
\n| 1970-01-01T00:00:01.500 | 2.0 | 20.0 | foo | 42 |\
\n+-------------------------+-------+---------+------+--------+"
);
}
/// Executing over a range that spans more than
/// `MAX_INSTANT_MANIPULATE_OUTPUT_POINTS` steps must fail with the
/// "output points exceed limit" error rather than produce output.
#[tokio::test]
async fn manipulate_should_reject_too_many_output_points() {
    let schema = Arc::new(Schema::new(vec![
        Field::new(
            TIME_INDEX_COLUMN,
            DataType::Timestamp(datafusion::arrow::datatypes::TimeUnit::Millisecond, None),
            false,
        ),
        Field::new("value", DataType::Float64, true),
    ]));
    let single_sample = RecordBatch::try_new(
        schema.clone(),
        vec![
            Arc::new(TimestampMillisecondArray::from(vec![0])),
            Arc::new(Float64Array::from(vec![1.0])),
        ],
    )
    .unwrap();
    let source = MemorySourceConfig::try_new(&[vec![single_sample]], schema, None).unwrap();
    let input = Arc::new(DataSourceExec::new(Arc::new(source)));

    // With a 1 ms interval, the range end is one step past the cap.
    let range_end = MAX_INSTANT_MANIPULATE_OUTPUT_POINTS as Millisecond + 1;
    let exec = Arc::new(InstantManipulateExec {
        start: 0,
        end: range_end,
        lookback_delta: range_end + 1,
        interval: 1,
        time_index_column: TIME_INDEX_COLUMN.to_string(),
        field_column: Some("value".to_string()),
        reuse_tsid_column: false,
        input,
        metric: ExecutionPlanMetricsSet::new(),
    });

    let task_ctx = SessionContext::default().task_ctx();
    let failure = datafusion::physical_plan::collect(exec, task_ctx)
        .await
        .unwrap_err();
    let message = failure.to_string();
    assert!(message.contains("InstantManipulate output points exceed limit"));
}
#[tokio::test]
async fn lookback_10s_interval_30s() {
let expected = String::from(

View File

@@ -919,6 +919,11 @@ impl PromPlanner {
.time_index_column
.clone()
.expect("time index should be set in `setup_context`"),
if self.ctx.use_tsid {
vec![DATA_SCHEMA_TSID_COLUMN_NAME.to_string()]
} else {
self.ctx.tag_columns.clone()
},
self.ctx.field_columns.first().cloned(),
normalize,
);
@@ -4104,6 +4109,10 @@ mod test {
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_query::prelude::greptime_timestamp;
use common_query::test_util::DummyDecoder;
use datafusion::arrow::datatypes::Schema as ArrowSchema;
use datafusion::datasource::memory::MemorySourceConfig;
use datafusion::datasource::source::DataSourceExec;
use datafusion::logical_expr::Extension;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{ColumnSchema, Schema};
use promql_parser::label::Labels;
@@ -4117,6 +4126,16 @@ mod test {
use crate::options::QueryOptions;
use crate::parser::QueryLanguageParser;
/// Depth-first search for the first `InstantManipulate` extension node in
/// `plan`, checking `plan` itself before its inputs. Returns `None` when the
/// plan contains no such node.
fn find_instant_manipulate(plan: &LogicalPlan) -> Option<&InstantManipulate> {
    let here = match plan {
        LogicalPlan::Extension(Extension { node }) => {
            node.as_any().downcast_ref::<InstantManipulate>()
        }
        _ => None,
    };
    if here.is_some() {
        return here;
    }

    for child in plan.inputs() {
        if let Some(found) = find_instant_manipulate(child) {
            return Some(found);
        }
    }
    None
}
fn build_query_engine_state() -> QueryEngineState {
QueryEngineState::new(
new_memory_catalog_manager().unwrap(),
@@ -4827,6 +4846,12 @@ mod test {
.iter()
.any(|field| field.name() == DATA_SCHEMA_TSID_COLUMN_NAME)
);
let manipulate = find_instant_manipulate(&plan).unwrap();
let exec = manipulate.to_execution_plan(Arc::new(DataSourceExec::new(Arc::new(
MemorySourceConfig::try_new(&[], Arc::new(ArrowSchema::empty()), None).unwrap(),
))));
assert!(format!("{exec:?}").contains("reuse_tsid_column: true"));
}
#[tokio::test]