diff --git a/src/promql/src/extension_plan/absent.rs b/src/promql/src/extension_plan/absent.rs
index 7787d7cce6..91b3d088e9 100644
--- a/src/promql/src/extension_plan/absent.rs
+++ b/src/promql/src/extension_plan/absent.rs
@@ -406,6 +406,8 @@ impl ExecutionPlan for AbsentExec {
             metric: baseline_metric,
             // Buffer for streaming output timestamps
             output_timestamps: Vec::new(),
+            input_timestamps: Vec::new(),
+            input_timestamp_offset: 0,
             // Current timestamp in the output range
             output_ts_cursor: self.start,
             input_finished: false,
@@ -448,6 +450,9 @@ pub struct AbsentStream {
     metric: BaselineMetrics,
     // Buffer for streaming output timestamps
    output_timestamps: Vec<Millisecond>,
+    // Current input timestamps being processed incrementally.
+    input_timestamps: Vec<Millisecond>,
+    input_timestamp_offset: usize,
     // Current timestamp in the output range
     output_ts_cursor: Millisecond,
     input_finished: bool,
@@ -464,52 +469,53 @@ impl Stream for AbsentStream {
 
     fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
         loop {
-            if !self.input_finished {
-                match ready!(self.input.poll_next_unpin(cx)) {
-                    Some(Ok(batch)) => {
-                        let timer = std::time::Instant::now();
-                        if let Err(e) = self.process_input_batch(&batch) {
-                            return Poll::Ready(Some(Err(e)));
-                        }
-                        self.metric.elapsed_compute().add_elapsed(timer);
-
-                        // If we have enough data for a batch, output it
-                        if self.output_timestamps.len() >= self.batch_size {
-                            let timer = std::time::Instant::now();
-                            let result = self.flush_output_batch();
-                            self.metric.elapsed_compute().add_elapsed(timer);
-
-                            match result {
-                                Ok(Some(batch)) => return Poll::Ready(Some(Ok(batch))),
-                                Ok(None) => continue,
-                                Err(e) => return Poll::Ready(Some(Err(e))),
-                            }
-                        }
-                    }
-                    Some(Err(e)) => return Poll::Ready(Some(Err(e))),
-                    None => {
-                        self.input_finished = true;
-
-                        let timer = std::time::Instant::now();
-                        // Process any remaining absent timestamps
-                        if let Err(e) = self.process_remaining_absent_timestamps() {
-                            return Poll::Ready(Some(Err(e)));
-                        }
-                        let result = self.flush_output_batch();
-                        self.metric.elapsed_compute().add_elapsed(timer);
-                        return Poll::Ready(result.transpose());
-                    }
+            if self.has_pending_input_timestamps() {
+                let timer = std::time::Instant::now();
+                if let Err(e) = self.process_input_batch() {
+                    return Poll::Ready(Some(Err(e)));
+                }
+                self.metric.elapsed_compute().add_elapsed(timer);
+
+                match self.flush_output_batch() {
+                    Ok(Some(batch)) => return Poll::Ready(Some(Ok(batch))),
+                    Ok(None) => continue,
+                    Err(e) => return Poll::Ready(Some(Err(e))),
+                }
+            }
+
+            if self.input_finished {
+                let timer = std::time::Instant::now();
+                if let Err(e) = self.process_remaining_absent_timestamps() {
+                    return Poll::Ready(Some(Err(e)));
+                }
+                self.metric.elapsed_compute().add_elapsed(timer);
+
+                match self.flush_output_batch() {
+                    Ok(Some(batch)) => return Poll::Ready(Some(Ok(batch))),
+                    Ok(None) => return Poll::Ready(None),
+                    Err(e) => return Poll::Ready(Some(Err(e))),
+                }
+            }
+
+            match ready!(self.input.poll_next_unpin(cx)) {
+                Some(Ok(batch)) => {
+                    let timer = std::time::Instant::now();
+                    if let Err(e) = self.buffer_input_timestamps(&batch) {
+                        return Poll::Ready(Some(Err(e)));
+                    }
+                    self.metric.elapsed_compute().add_elapsed(timer);
+                }
+                Some(Err(e)) => return Poll::Ready(Some(Err(e))),
+                None => {
+                    self.input_finished = true;
                 }
-            } else {
-                return Poll::Ready(None);
             }
         }
     }
 }
 
 impl AbsentStream {
-    fn process_input_batch(&mut self, batch: &RecordBatch) -> DataFusionResult<()> {
-        // Extract timestamps from this batch
+    fn buffer_input_timestamps(&mut self, batch: &RecordBatch) -> DataFusionResult<()> {
         let timestamp_array = batch.column(self.time_index_column_index);
         let milli_ts_array = arrow::compute::cast(
             timestamp_array,
@@ -519,29 +525,52 @@ impl AbsentStream {
             .as_any()
            .downcast_ref::<TimestampMillisecondArray>()
             .unwrap();
+        self.input_timestamps.clear();
+        self.input_timestamps
+            .extend_from_slice(timestamp_array.values());
+        self.input_timestamp_offset = 0;
+        Ok(())
+    }
+
+    fn has_pending_input_timestamps(&self) -> bool {
+        self.input_timestamp_offset < self.input_timestamps.len()
+    }
+
+    fn process_input_batch(&mut self) -> DataFusionResult<()> {
+        while self.input_timestamp_offset < self.input_timestamps.len() {
+            let input_ts = self.input_timestamps[self.input_timestamp_offset];
 
-        // Process against current output cursor position
-        for &input_ts in timestamp_array.values() {
             // Generate absent timestamps up to this input timestamp
             while self.output_ts_cursor < input_ts && self.output_ts_cursor <= self.end {
                 self.output_timestamps.push(self.output_ts_cursor);
                 self.output_ts_cursor += self.step;
+
+                if self.output_timestamps.len() >= self.batch_size {
+                    return Ok(());
+                }
             }
 
             // Skip the input timestamp if it matches our cursor
             if self.output_ts_cursor == input_ts {
                 self.output_ts_cursor += self.step;
             }
+
+            self.input_timestamp_offset += 1;
         }
 
+        self.input_timestamps.clear();
+        self.input_timestamp_offset = 0;
         Ok(())
     }
 
     fn process_remaining_absent_timestamps(&mut self) -> DataFusionResult<()> {
-        // Generate all remaining absent timestamps (input is finished)
         while self.output_ts_cursor <= self.end {
             self.output_timestamps.push(self.output_ts_cursor);
             self.output_ts_cursor += self.step;
+
+            if self.output_timestamps.len() >= self.batch_size {
+                return Ok(());
+            }
         }
         Ok(())
     }
@@ -551,11 +580,16 @@ impl AbsentStream {
             return Ok(None);
         }
 
+        let timestamps = if self.output_timestamps.len() <= self.batch_size {
+            std::mem::take(&mut self.output_timestamps)
+        } else {
+            let remaining = self.output_timestamps.split_off(self.batch_size);
+            std::mem::replace(&mut self.output_timestamps, remaining)
+        };
+
        let mut columns: Vec<ArrayRef> = Vec::with_capacity(self.output_schema.fields().len());
-        let num_rows = self.output_timestamps.len();
-        columns.push(Arc::new(TimestampMillisecondArray::from(
-            self.output_timestamps.clone(),
-        )) as _);
+        let num_rows = timestamps.len();
+        columns.push(Arc::new(TimestampMillisecondArray::from(timestamps)) as _);
         columns.push(Arc::new(Float64Array::from(vec![1.0; num_rows])) as _);
 
         for (_, value) in self.fake_labels.iter() {
@@ -567,7 +601,6 @@ impl AbsentStream {
 
         let batch = RecordBatch::try_new(self.output_schema.clone(), columns)?;
 
-        self.output_timestamps.clear();
         Ok(Some(batch))
     }
 }
@@ -580,7 +613,7 @@ mod tests {
     use datafusion::arrow::record_batch::RecordBatch;
     use datafusion::catalog::memory::DataSourceExec;
     use datafusion::datasource::memory::MemorySourceConfig;
-    use datafusion::prelude::SessionContext;
+    use datafusion::prelude::{SessionConfig, SessionContext};
     use datatypes::arrow::array::{Float64Array, TimestampMillisecondArray};
 
     use super::*;
@@ -725,4 +758,77 @@
         // Should output all timestamps in range: 0, 1000, 2000
         assert_eq!(output_timestamps, vec![0, 1000, 2000]);
     }
+
+    #[tokio::test]
+    async fn test_absent_respects_session_batch_size_for_large_gap() {
+        let schema = Arc::new(Schema::new(vec![
+            Field::new(
+                "timestamp",
+                DataType::Timestamp(TimeUnit::Millisecond, None),
+                true,
+            ),
+            Field::new("value", DataType::Float64, true),
+        ]));
+
+        let timestamp_array = Arc::new(TimestampMillisecondArray::from(vec![9]));
+        let value_array = Arc::new(Float64Array::from(vec![1.0]));
+        let batch =
+            RecordBatch::try_new(schema.clone(), vec![timestamp_array, value_array]).unwrap();
+
+        let memory_exec = DataSourceExec::new(Arc::new(
+            MemorySourceConfig::try_new(&[vec![batch]], schema, None).unwrap(),
+        ));
+
+        let output_schema = Arc::new(Schema::new(vec![
+            Field::new(
+                "timestamp",
+                DataType::Timestamp(TimeUnit::Millisecond, None),
+                true,
+            ),
+            Field::new("value", DataType::Float64, true),
+        ]));
+
+        let absent_exec = AbsentExec {
+            start: 0,
+            end: 10,
+            step: 1,
+            time_index_column: "timestamp".to_string(),
+            value_column: "value".to_string(),
+            fake_labels: vec![],
+            output_schema: output_schema.clone(),
+            input: Arc::new(memory_exec),
+            properties: Arc::new(PlanProperties::new(
+                EquivalenceProperties::new(output_schema.clone()),
+                Partitioning::UnknownPartitioning(1),
+                EmissionType::Incremental,
+                Boundedness::Bounded,
+            )),
+            metric: ExecutionPlanMetricsSet::new(),
+        };
+
+        let session_ctx = SessionContext::new_with_config(SessionConfig::new().with_batch_size(3));
+        let task_ctx = session_ctx.task_ctx();
+        let mut stream = absent_exec.execute(0, task_ctx).unwrap();
+
+        let mut batch_sizes = Vec::new();
+        let mut output_timestamps = Vec::new();
+        while let Some(batch_result) = stream.next().await {
+            let batch = batch_result.unwrap();
+            batch_sizes.push(batch.num_rows());
+
+            let ts_array = batch
+                .column(0)
+                .as_any()
+                .downcast_ref::<TimestampMillisecondArray>()
+                .unwrap();
+            for i in 0..ts_array.len() {
+                if !ts_array.is_null(i) {
+                    output_timestamps.push(ts_array.value(i));
+                }
+            }
+        }
+
+        assert_eq!(batch_sizes, vec![3, 3, 3, 1]);
+        assert_eq!(output_timestamps, vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 10]);
+    }
 }