cap max points

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ruihang Xia
2026-05-13 18:20:49 +08:00
parent ad41b85f0d
commit a90f955f96

View File

@@ -49,6 +49,8 @@ use crate::extension_plan::{
};
use crate::metrics::PROMQL_SERIES_COUNT;
const MAX_INSTANT_MANIPULATE_OUTPUT_POINTS: usize = 1_000_000;
/// Manipulate the input record batch to make it suitable for Instant Operator.
///
/// This plan will try to align the input time series, for every timestamp between
@@ -557,6 +559,11 @@ impl InstantManipulateStream {
} else {
0
};
if estimated_points > MAX_INSTANT_MANIPULATE_OUTPUT_POINTS {
return Err(DataFusionError::Execution(format!(
"InstantManipulate output points exceed limit: {estimated_points} > {MAX_INSTANT_MANIPULATE_OUTPUT_POINTS}"
)));
}
let mut take_indices = Vec::with_capacity(estimated_points);
let mut cursor = 0;
@@ -980,6 +987,50 @@ mod test {
);
}
#[tokio::test]
async fn manipulate_should_reject_too_many_output_points() {
let schema = Arc::new(Schema::new(vec![
Field::new(
TIME_INDEX_COLUMN,
DataType::Timestamp(datafusion::arrow::datatypes::TimeUnit::Millisecond, None),
false,
),
Field::new("value", DataType::Float64, true),
]));
let batch = RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(TimestampMillisecondArray::from(vec![0])),
Arc::new(Float64Array::from(vec![1.0])),
],
)
.unwrap();
let input = Arc::new(DataSourceExec::new(Arc::new(
MemorySourceConfig::try_new(&[vec![batch]], schema, None).unwrap(),
)));
let too_many_points = MAX_INSTANT_MANIPULATE_OUTPUT_POINTS as Millisecond + 1;
let normalize_exec = Arc::new(InstantManipulateExec {
start: 0,
end: too_many_points,
lookback_delta: too_many_points + 1,
interval: 1,
time_index_column: TIME_INDEX_COLUMN.to_string(),
field_column: Some("value".to_string()),
reuse_tsid_column: false,
input,
metric: ExecutionPlanMetricsSet::new(),
});
let session_context = SessionContext::default();
let err = datafusion::physical_plan::collect(normalize_exec, session_context.task_ctx())
.await
.unwrap_err();
assert!(
err.to_string()
.contains("InstantManipulate output points exceed limit")
);
}
#[tokio::test]
async fn lookback_10s_interval_30s() {
let expected = String::from(