diff --git a/src/promql/src/extension_plan/range_manipulate.rs b/src/promql/src/extension_plan/range_manipulate.rs index 27ba8f4f3d..1aef1e8343 100644 --- a/src/promql/src/extension_plan/range_manipulate.rs +++ b/src/promql/src/extension_plan/range_manipulate.rs @@ -345,6 +345,8 @@ impl ExecutionPlan for RangeManipulateExec { .0 }) .collect(); + let aligned_ts_array = + RangeManipulateStream::build_aligned_ts_array(self.start, self.end, self.interval); Ok(Box::pin(RangeManipulateStream { start: self.start, end: self.end, @@ -352,6 +354,7 @@ impl ExecutionPlan for RangeManipulateExec { range: self.range, time_index, field_columns, + aligned_ts_array, output_schema: self.output_schema.clone(), input, metric: baseline_metric, @@ -405,6 +408,7 @@ pub struct RangeManipulateStream { range: Millisecond, time_index: usize, field_columns: Vec, + aligned_ts_array: ArrayRef, output_schema: SchemaRef, input: SendableRecordBatchStream, @@ -447,7 +451,7 @@ impl RangeManipulateStream { pub fn manipulate(&self, input: RecordBatch) -> DataFusionResult> { let mut other_columns = (0..input.columns().len()).collect::>(); // calculate the range - let (aligned_ts, ranges) = self.calculate_range(&input)?; + let ranges = self.calculate_range(&input)?; // ignore this if all ranges are empty if ranges.iter().all(|(_, len)| *len == 0) { return Ok(None); @@ -479,17 +483,20 @@ impl RangeManipulateStream { new_columns[index] = compute::take(&input.column(index), &take_indices, None)?; } // replace timestamp with the aligned one - new_columns[self.time_index] = aligned_ts; + new_columns[self.time_index] = self.aligned_ts_array.clone(); RecordBatch::try_new(self.output_schema.clone(), new_columns) .map(Some) .map_err(|e| DataFusionError::ArrowError(e, None)) } - fn calculate_range( - &self, - input: &RecordBatch, - ) -> DataFusionResult<(ArrayRef, Vec<(u32, u32)>)> { + fn build_aligned_ts_array(start: i64, end: i64, interval: i64) -> ArrayRef { + Arc::new(TimestampMillisecondArray::from_iter_values( + (start..=end).step_by(interval as _), + )) + } + + fn calculate_range(&self, input: &RecordBatch) -> DataFusionResult> { let ts_column = input .column(self.time_index) .as_any() @@ -500,12 +507,10 @@ impl RangeManipulateStream { ) })?; - let mut aligned_ts = vec![]; let mut ranges = vec![]; // calculate for every aligned timestamp (`curr_ts`), assume the ts column is ordered. for curr_ts in (self.start..=self.end).step_by(self.interval as _) { - aligned_ts.push(curr_ts); let mut range_start = ts_column.len(); let mut range_end = 0; for (index, ts) in ts_column.values().iter().enumerate() { @@ -525,9 +530,7 @@ impl RangeManipulateStream { } } - let aligned_ts_array = Arc::new(TimestampMillisecondArray::from(aligned_ts)) as _; - - Ok((aligned_ts_array, ranges)) + Ok(ranges) } }