feat: some optimistic paths for instant manipulate

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ruihang Xia
2026-03-14 12:32:36 +08:00
parent 3beb538aa8
commit 456288df87
2 changed files with 179 additions and 15 deletions

View File

@@ -19,13 +19,15 @@ use std::sync::Arc;
use std::task::{Context, Poll};
use datafusion::arrow::array::{Array, Float64Array, TimestampMillisecondArray, UInt64Array};
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::arrow::datatypes::{DataType, SchemaRef};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::common::stats::Precision;
use datafusion::common::{DFSchema, DFSchemaRef};
use datafusion::common::{DFSchema, DFSchemaRef, ScalarValue};
use datafusion::error::{DataFusionError, Result as DataFusionResult};
use datafusion::execution::context::TaskContext;
use datafusion::logical_expr::{EmptyRelation, Expr, LogicalPlan, UserDefinedLogicalNodeCore};
use datafusion::logical_expr::{
EmptyRelation, Expr, Extension, LogicalPlan, UserDefinedLogicalNodeCore,
};
use datafusion::physical_plan::metrics::{
BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, MetricValue, MetricsSet,
};
@@ -35,13 +37,13 @@ use datafusion::physical_plan::{
};
use datafusion_expr::col;
use datatypes::arrow::compute;
use datatypes::arrow::error::Result as ArrowResult;
use futures::{Stream, StreamExt, ready};
use greptime_proto::substrait_extension as pb;
use prost::Message;
use snafu::ResultExt;
use crate::error::{DeserializeSnafu, Result};
use crate::extension_plan::series_divide::SeriesDivide;
use crate::extension_plan::{
METRIC_NUM_SERIES, Millisecond, resolve_column_name, serialize_column_index,
};
@@ -59,6 +61,8 @@ pub struct InstantManipulate {
lookback_delta: Millisecond,
interval: Millisecond,
time_index_column: String,
// Planner-provided tag-column hint for execution fast paths.
tag_columns: Vec<String>,
/// An optional column for validating staleness
field_column: Option<String>,
input: LogicalPlan,
@@ -166,6 +170,7 @@ impl UserDefinedLogicalNodeCore for InstantManipulate {
lookback_delta: self.lookback_delta,
interval: self.interval,
time_index_column,
tag_columns: Self::resolve_tag_columns(&input, &self.tag_columns),
field_column,
input,
unfix: None,
@@ -177,6 +182,7 @@ impl UserDefinedLogicalNodeCore for InstantManipulate {
lookback_delta: self.lookback_delta,
interval: self.interval,
time_index_column: self.time_index_column.clone(),
tag_columns: Self::resolve_tag_columns(&input, &self.tag_columns),
field_column: self.field_column.clone(),
input,
unfix: None,
@@ -186,12 +192,14 @@ impl UserDefinedLogicalNodeCore for InstantManipulate {
}
impl InstantManipulate {
#[allow(clippy::too_many_arguments)]
pub fn new(
start: Millisecond,
end: Millisecond,
lookback_delta: Millisecond,
interval: Millisecond,
time_index_column: String,
tag_columns: Vec<String>,
field_column: Option<String>,
input: LogicalPlan,
) -> Self {
@@ -201,6 +209,7 @@ impl InstantManipulate {
lookback_delta,
interval,
time_index_column,
tag_columns,
field_column,
input,
unfix: None,
@@ -211,7 +220,25 @@ impl InstantManipulate {
"InstantManipulate"
}
/// Decides which tag columns a rebuilt node should carry.
///
/// A non-empty `tag_columns` hint is returned unchanged. Otherwise, when
/// `input` is an extension node wrapping a [`SeriesDivide`], that node's tags
/// are copied. In every other case the result is empty.
fn resolve_tag_columns(input: &LogicalPlan, tag_columns: &[String]) -> Vec<String> {
    // An explicit hint always wins over anything derived from the input plan.
    if !tag_columns.is_empty() {
        return tag_columns.to_vec();
    }

    match input {
        LogicalPlan::Extension(Extension { node }) => {
            match node.as_any().downcast_ref::<SeriesDivide>() {
                Some(divide) => divide.tags().to_vec(),
                // Some other extension node: nothing to recover from it.
                None => Vec::new(),
            }
        }
        _ => Vec::new(),
    }
}
pub fn to_execution_plan(&self, exec_input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
let reuse_all_non_sample_columns =
matches!(self.tag_columns.as_slice(), [tag] if tag == "__tsid");
Arc::new(InstantManipulateExec {
start: self.start,
end: self.end,
@@ -219,6 +246,7 @@ impl InstantManipulate {
interval: self.interval,
time_index_column: self.time_index_column.clone(),
field_column: self.field_column.clone(),
reuse_all_non_sample_columns,
input: exec_input,
metric: ExecutionPlanMetricsSet::new(),
})
@@ -264,6 +292,7 @@ impl InstantManipulate {
lookback_delta: pb_instant_manipulate.lookback_delta,
interval: pb_instant_manipulate.interval,
time_index_column: String::new(),
tag_columns: Vec::new(),
field_column: None,
input: placeholder_plan,
unfix: Some(unfix),
@@ -279,6 +308,7 @@ pub struct InstantManipulateExec {
interval: Millisecond,
time_index_column: String,
field_column: Option<String>,
reuse_all_non_sample_columns: bool,
input: Arc<dyn ExecutionPlan>,
metric: ExecutionPlanMetricsSet,
@@ -322,6 +352,7 @@ impl ExecutionPlan for InstantManipulateExec {
interval: self.interval,
time_index_column: self.time_index_column.clone(),
field_column: self.field_column.clone(),
reuse_all_non_sample_columns: self.reuse_all_non_sample_columns,
input: children[0].clone(),
metric: self.metric.clone(),
}))
@@ -333,9 +364,8 @@ impl ExecutionPlan for InstantManipulateExec {
context: Arc<TaskContext>,
) -> DataFusionResult<SendableRecordBatchStream> {
let baseline_metric = BaselineMetrics::new(&self.metric, partition);
let metrics_builder = MetricBuilder::new(&self.metric);
let num_series = Count::new();
metrics_builder
MetricBuilder::new(&self.metric)
.with_partition(partition)
.build(MetricValue::Count {
name: METRIC_NUM_SERIES.into(),
@@ -353,6 +383,10 @@ impl ExecutionPlan for InstantManipulateExec {
.as_ref()
.and_then(|name| schema.column_with_name(name))
.map(|x| x.0);
let reuse_all_non_sample_columns = self.reuse_all_non_sample_columns
&& schema
.column_with_name("__tsid")
.is_some_and(|(_, field)| field.data_type() == &DataType::UInt64);
Ok(Box::pin(InstantManipulateStream {
start: self.start,
end: self.end,
@@ -360,6 +394,7 @@ impl ExecutionPlan for InstantManipulateExec {
interval: self.interval,
time_index,
field_index,
reuse_all_non_sample_columns,
schema,
input,
metric: baseline_metric,
@@ -425,6 +460,7 @@ pub struct InstantManipulateStream {
// Column index of TIME INDEX column's position in schema
time_index: usize,
field_index: Option<usize>,
reuse_all_non_sample_columns: bool,
schema: SchemaRef,
input: SendableRecordBatchStream,
@@ -510,12 +546,17 @@ impl InstantManipulateStream {
let aligned_start = self.start + (max_start - self.start) / self.interval * self.interval;
let aligned_end = self.end - (self.end - min_end) / self.interval * self.interval;
let mut take_indices = vec![];
let estimated_points = if aligned_end >= aligned_start {
((aligned_end - aligned_start) / self.interval).saturating_add(1) as usize
} else {
0
};
let mut take_indices = Vec::with_capacity(estimated_points);
let mut cursor = 0;
let aligned_ts_iter = (aligned_start..=aligned_end).step_by(self.interval as usize);
let mut aligned_ts = vec![];
let mut aligned_ts = Vec::with_capacity(estimated_points);
// calculate the offsets to take
'next: for expected_ts in aligned_ts_iter {
@@ -593,13 +634,26 @@ impl InstantManipulateStream {
) -> DataFusionResult<RecordBatch> {
assert_eq!(take_indices.len(), aligned_ts.len());
let indices_array = UInt64Array::from(take_indices);
let mut arrays = record_batch
.columns()
.iter()
.map(|array| compute::take(array, &indices_array, None))
.collect::<ArrowResult<Vec<_>>>()?;
arrays[self.time_index] = Arc::new(TimestampMillisecondArray::from(aligned_ts));
let output_len = aligned_ts.len();
let mut indices_array = None;
let mut arrays = Vec::with_capacity(record_batch.num_columns());
let aligned_ts = Arc::new(TimestampMillisecondArray::from(aligned_ts)) as Arc<dyn Array>;
for (index, array) in record_batch.columns().iter().enumerate() {
if index == self.time_index {
arrays.push(aligned_ts.clone());
continue;
}
if self.reuse_all_non_sample_columns && Some(index) != self.field_index {
arrays.push(reuse_constant_column(array, output_len)?);
continue;
}
let indices_array =
indices_array.get_or_insert_with(|| UInt64Array::from(take_indices.clone()));
arrays.push(compute::take(array, indices_array, None)?);
}
let result = RecordBatch::try_new(record_batch.schema(), arrays)
.map_err(|e| DataFusionError::ArrowError(Box::new(e), None))?;
@@ -607,9 +661,24 @@ impl InstantManipulateStream {
}
}
/// Produces a `len`-row column from `array` without gathering by index.
///
/// * If `array` already has at least `len` rows, its first `len` rows are
///   returned as a slice.
/// * If `array` is shorter than `len` and non-empty, the value at row 0 is
///   repeated `len` times.
/// * If `array` is empty, an empty slice is returned.
///
/// NOTE(review): only row 0 (or the leading rows) is consulted, so this
/// presumably relies on the caller guaranteeing that every row of `array`
/// holds the same value — confirm at the call site. Also note the empty-input
/// branch yields zero rows even when `len > 0`.
///
/// # Errors
///
/// Propagates any error from [`ScalarValue::try_from_array`] or
/// [`ScalarValue::to_array_of_size`].
fn reuse_constant_column(array: &Arc<dyn Array>, len: usize) -> DataFusionResult<Arc<dyn Array>> {
    let available = array.len();
    if available >= len {
        // Enough rows already: a slice just re-views the existing array.
        Ok(array.slice(0, len))
    } else if available == 0 {
        Ok(array.slice(0, 0))
    } else {
        // Output is longer than the input: repeat the first value.
        let first_value = ScalarValue::try_from_array(array.as_ref(), 0)?;
        first_value.to_array_of_size(len)
    }
}
#[cfg(test)]
mod test {
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::common::ToDFSchema;
use datafusion::datasource::memory::MemorySourceConfig;
use datafusion::datasource::source::DataSourceExec;
use datafusion::logical_expr::{EmptyRelation, LogicalPlan};
use datafusion::prelude::SessionContext;
@@ -638,6 +707,7 @@ mod test {
interval,
time_index_column: TIME_INDEX_COLUMN.to_string(),
field_column: Some("value".to_string()),
reuse_all_non_sample_columns: false,
input: memory_exec,
metric: ExecutionPlanMetricsSet::new(),
});
@@ -665,6 +735,7 @@ mod test {
0,
0,
TIME_INDEX_COLUMN.to_string(),
Vec::new(),
Some("value".to_string()),
input,
);
@@ -676,6 +747,98 @@ mod test {
assert_eq!(required.as_slice(), &[0, 1, 2]);
}
/// Round-trips an `InstantManipulate` through `serialize`/`deserialize`, then
/// re-attaches a `SeriesDivide` input and checks that the rebuilt node's tag
/// columns equal the `SeriesDivide` tags.
#[test]
fn rebuild_should_recover_tag_columns_from_series_divide_input() {
    let expected_tags = vec!["__tsid".to_string()];

    // Leaf plan: an empty relation that only carries the test schema.
    let leaf_schema = prepare_test_data().schema().to_dfschema_ref().unwrap();
    let leaf = LogicalPlan::EmptyRelation(EmptyRelation {
        produce_one_row: false,
        schema: leaf_schema,
    });

    // Wrap the leaf in a SeriesDivide tagged by `__tsid`.
    let divide_node = SeriesDivide::new(
        expected_tags.clone(),
        TIME_INDEX_COLUMN.to_string(),
        leaf,
    );
    let series_divide = LogicalPlan::Extension(Extension {
        node: Arc::new(divide_node),
    });

    let original = InstantManipulate::new(
        0,
        0,
        0,
        0,
        TIME_INDEX_COLUMN.to_string(),
        expected_tags.clone(),
        Some("value".to_string()),
        series_divide.clone(),
    );
    let bytes = original.serialize();

    // Rebuild from bytes, then supply the real input again.
    let rebuilt = InstantManipulate::deserialize(&bytes)
        .unwrap()
        .with_exprs_and_inputs(vec![], vec![series_divide])
        .unwrap();

    assert_eq!(rebuilt.tag_columns, expected_tags);
}
// Exercises the `reuse_all_non_sample_columns` fast path when the output has
// more rows than the input batch: two input samples (at 0 ms and 1000 ms) are
// expanded to four aligned timestamps (0..=1500 ms, step 500 ms), so the
// non-sample columns (`host`, `__tsid`) have to be extended rather than sliced.
#[tokio::test]
async fn tsid_fast_path_reuses_non_sample_columns_when_output_grows() {
// Schema: time index, one sample column (`value`), and two columns that hold
// the same value on every row (`host`, `__tsid`). `__tsid` is UInt64.
let schema = Arc::new(Schema::new(vec![
Field::new(
TIME_INDEX_COLUMN,
DataType::Timestamp(datafusion::arrow::datatypes::TimeUnit::Millisecond, None),
false,
),
Field::new("value", DataType::Float64, true),
Field::new("host", DataType::Utf8, true),
Field::new("__tsid", DataType::UInt64, false),
]));
// A single two-row batch.
let batch = RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(TimestampMillisecondArray::from(vec![0, 1_000])),
Arc::new(Float64Array::from(vec![1.0, 2.0])),
Arc::new(datafusion::arrow::array::StringArray::from(vec![
"foo", "foo",
])),
Arc::new(UInt64Array::from(vec![42, 42])),
],
)
.unwrap();
let input = Arc::new(DataSourceExec::new(Arc::new(
MemorySourceConfig::try_new(&[vec![batch]], schema, None).unwrap(),
)));
// Build the exec node directly with the fast-path flag switched on.
let normalize_exec = Arc::new(InstantManipulateExec {
start: 0,
end: 1_500,
lookback_delta: 1_000,
interval: 500,
time_index_column: TIME_INDEX_COLUMN.to_string(),
field_column: Some("value".to_string()),
reuse_all_non_sample_columns: true,
input,
metric: ExecutionPlanMetricsSet::new(),
});
let session_context = SessionContext::default();
let result = datafusion::physical_plan::collect(normalize_exec, session_context.task_ctx())
.await
.unwrap();
let result_literal = datatypes::arrow::util::pretty::pretty_format_batches(&result)
.unwrap()
.to_string();
// Expected: each aligned timestamp shows the most recent sample at or before
// it, while `host` and `__tsid` repeat on all four rows.
assert_eq!(
result_literal,
"+-------------------------+-------+------+--------+\
\n| timestamp | value | host | __tsid |\
\n+-------------------------+-------+------+--------+\
\n| 1970-01-01T00:00:00 | 1.0 | foo | 42 |\
\n| 1970-01-01T00:00:00.500 | 1.0 | foo | 42 |\
\n| 1970-01-01T00:00:01 | 2.0 | foo | 42 |\
\n| 1970-01-01T00:00:01.500 | 2.0 | foo | 42 |\
\n+-------------------------+-------+------+--------+"
);
}
#[tokio::test]
async fn lookback_10s_interval_30s() {
let expected = String::from(

View File

@@ -915,6 +915,7 @@ impl PromPlanner {
.time_index_column
.clone()
.expect("time index should be set in `setup_context`"),
self.ctx.tag_columns.clone(),
self.ctx.field_columns.first().cloned(),
normalize,
);