diff --git a/src/promql/src/extension_plan/instant_manipulate.rs b/src/promql/src/extension_plan/instant_manipulate.rs index 65fe94bc9e..7318b9cba5 100644 --- a/src/promql/src/extension_plan/instant_manipulate.rs +++ b/src/promql/src/extension_plan/instant_manipulate.rs @@ -49,6 +49,8 @@ use crate::extension_plan::{ }; use crate::metrics::PROMQL_SERIES_COUNT; +const MAX_INSTANT_MANIPULATE_OUTPUT_POINTS: usize = 1_000_000; + /// Manipulate the input record batch to make it suitable for Instant Operator. /// /// This plan will try to align the input time series, for every timestamp between @@ -557,6 +559,11 @@ impl InstantManipulateStream { } else { 0 }; + if estimated_points > MAX_INSTANT_MANIPULATE_OUTPUT_POINTS { + return Err(DataFusionError::Execution(format!( + "InstantManipulate output points exceed limit: {estimated_points} > {MAX_INSTANT_MANIPULATE_OUTPUT_POINTS}" + ))); + } let mut take_indices = Vec::with_capacity(estimated_points); let mut cursor = 0; @@ -980,6 +987,50 @@ mod test { ); } + #[tokio::test] + async fn manipulate_should_reject_too_many_output_points() { + let schema = Arc::new(Schema::new(vec![ + Field::new( + TIME_INDEX_COLUMN, + DataType::Timestamp(datafusion::arrow::datatypes::TimeUnit::Millisecond, None), + false, + ), + Field::new("value", DataType::Float64, true), + ])); + let batch = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(TimestampMillisecondArray::from(vec![0])), + Arc::new(Float64Array::from(vec![1.0])), + ], + ) + .unwrap(); + let input = Arc::new(DataSourceExec::new(Arc::new( + MemorySourceConfig::try_new(&[vec![batch]], schema, None).unwrap(), + ))); + let too_many_points = MAX_INSTANT_MANIPULATE_OUTPUT_POINTS as Millisecond + 1; + let normalize_exec = Arc::new(InstantManipulateExec { + start: 0, + end: too_many_points, + lookback_delta: too_many_points + 1, + interval: 1, + time_index_column: TIME_INDEX_COLUMN.to_string(), + field_column: Some("value".to_string()), + reuse_tsid_column: false, + input, + metric: ExecutionPlanMetricsSet::new(), + }); + let session_context = SessionContext::default(); + let err = datafusion::physical_plan::collect(normalize_exec, session_context.task_ctx()) + .await + .unwrap_err(); + + assert!( + err.to_string() + .contains("InstantManipulate output points exceed limit") + ); + } + #[tokio::test] async fn lookback_10s_interval_30s() { let expected = String::from(