diff --git a/src/promql/src/extension_plan/series_divide.rs b/src/promql/src/extension_plan/series_divide.rs
index 6dc531a8cb..b0ce0219ea 100644
--- a/src/promql/src/extension_plan/series_divide.rs
+++ b/src/promql/src/extension_plan/series_divide.rs
@@ -205,11 +205,12 @@ impl ExecutionPlan for SeriesDivideExec {
             .collect();
         Ok(Box::pin(SeriesDivideStream {
             tag_indices,
-            buffer: None,
+            buffer: vec![],
             schema,
             input,
             metric: baseline_metric,
             num_series: 0,
+            inspect_start: 0,
         }))
     }
 
@@ -231,11 +232,13 @@ impl DisplayAs for SeriesDivideExec {
 /// Assume the input stream is ordered on the tag columns.
 pub struct SeriesDivideStream {
     tag_indices: Vec<usize>,
-    buffer: Option<RecordBatch>,
+    buffer: Vec<RecordBatch>,
     schema: SchemaRef,
     input: SendableRecordBatchStream,
     metric: BaselineMetrics,
     num_series: usize,
+    /// Index into `buffer` of the first batch that has not been inspected yet.
+    inspect_start: usize,
 }
 
 impl RecordBatchStream for SeriesDivideStream {
@@ -248,30 +251,54 @@ impl Stream for SeriesDivideStream {
     type Item = DataFusionResult<RecordBatch>;
 
     fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        let timer = std::time::Instant::now();
         loop {
-            if let Some(batch) = self.buffer.as_ref() {
-                let same_length = self.find_first_diff_row(batch) + 1;
-                if same_length >= batch.num_rows() {
+            if !self.buffer.is_empty() {
+                let cut_at = self.find_first_diff_row();
+                if let Some((batch_index, row_index)) = cut_at {
+                    // slice out the first time series and return it.
+                    let half_batch_of_first_series =
+                        self.buffer[batch_index].slice(0, row_index + 1);
+                    let half_batch_of_second_series = self.buffer[batch_index].slice(
+                        row_index + 1,
+                        self.buffer[batch_index].num_rows() - row_index - 1,
+                    );
+                    let result_batches = self
+                        .buffer
+                        .drain(0..batch_index)
+                        .chain([half_batch_of_first_series])
+                        .collect::<Vec<_>>();
+                    if half_batch_of_second_series.num_rows() > 0 {
+                        self.buffer[0] = half_batch_of_second_series;
+                    } else {
+                        // the cut fell exactly on a batch boundary, so nothing is left
+                        // of this batch; never keep an empty batch in the buffer.
+                        self.buffer.remove(0);
+                    }
+                    let result_batch = compute::concat_batches(&self.schema, &result_batches)?;
+
+                    self.inspect_start = 0;
+                    self.num_series += 1;
+                    self.metric.elapsed_compute().add_elapsed(timer);
+                    return Poll::Ready(Some(Ok(result_batch)));
+                } else {
+                    // continue to fetch next batch as the current buffer only contains one time series.
                     let next_batch = ready!(self.as_mut().fetch_next_batch(cx)).transpose()?;
-                    // SAFETY: if-let guards the buffer is not None;
-                    // and we cannot change the buffer at this point.
-                    let batch = self.buffer.take().expect("this batch must exist");
                     if let Some(next_batch) = next_batch {
-                        self.buffer = Some(compute::concat_batches(
-                            &batch.schema(),
-                            &[batch, next_batch],
-                        )?);
+                        // an empty batch carries no rows to compare; skip it.
+                        if next_batch.num_rows() > 0 {
+                            self.buffer.push(next_batch);
+                        }
                         continue;
                     } else {
+                        // input stream is ended
+                        let result = compute::concat_batches(&self.schema, &self.buffer)?;
+                        self.buffer.clear();
+                        self.inspect_start = 0;
                         self.num_series += 1;
-                        return Poll::Ready(Some(Ok(batch)));
+                        self.metric.elapsed_compute().add_elapsed(timer);
+                        return Poll::Ready(Some(Ok(result)));
                     }
-                } else {
-                    let result_batch = batch.slice(0, same_length);
-                    let remaining_batch = batch.slice(same_length, batch.num_rows() - same_length);
-                    self.buffer = Some(remaining_batch);
-                    self.num_series += 1;
-                    return Poll::Ready(Some(Ok(result_batch)));
                 }
             } else {
                 let batch = match ready!(self.as_mut().fetch_next_batch(cx)) {
@@ -282,7 +309,10 @@ impl Stream for SeriesDivideStream {
                     }
                     error => return Poll::Ready(error),
                 };
-                self.buffer = Some(batch);
+                // an empty batch carries no rows to compare; skip it.
+                if batch.num_rows() > 0 {
+                    self.buffer.push(batch);
+                }
                 continue;
             }
         }
@@ -294,40 +324,78 @@ impl SeriesDivideStream {
         mut self: Pin<&mut Self>,
         cx: &mut Context<'_>,
     ) -> Poll<Option<DataFusionResult<RecordBatch>>> {
-        let poll = match self.input.poll_next_unpin(cx) {
-            Poll::Ready(batch) => {
-                let _timer = self.metric.elapsed_compute().timer();
-                Poll::Ready(batch)
-            }
-            Poll::Pending => Poll::Pending,
-        };
+        let poll = self.input.poll_next_unpin(cx);
         self.metric.record_poll(poll)
     }
 
-    fn find_first_diff_row(&self, batch: &RecordBatch) -> usize {
+    /// Returns `(batch_index, row_index)` of the last row belonging to the first
+    /// time series in the buffer, or `None` if every buffered row inspected so far
+    /// belongs to the same series.
+    ///
+    /// Assumes no buffered batch is empty.
+    fn find_first_diff_row(&mut self) -> Option<(usize, usize)> {
         // fast path: no tag columns means all data belongs to the same series.
         if self.tag_indices.is_empty() {
-            return batch.num_rows();
+            return None;
         }
 
-        let num_rows = batch.num_rows();
-        let mut result = num_rows;
+        let mut resumed_batch_index = self.inspect_start;
 
-        for index in &self.tag_indices {
-            let array = batch.column(*index);
-            let string_array = array.as_any().downcast_ref::<StringArray>().unwrap();
-            // the first row number that not equal to the next row.
-            let mut same_until = 0;
-            while same_until < num_rows - 1 {
-                if string_array.value(same_until) != string_array.value(same_until + 1) {
-                    break;
+        for batch in &self.buffer[resumed_batch_index..] {
+            let num_rows = batch.num_rows();
+            let mut result_index = num_rows;
+
+            // check if the first row is the same as the previous batch's last row.
+            // This must also run for the first batch inspected in this call: the
+            // boundary with the previously inspected batch has not been compared yet.
+            if resumed_batch_index > 0 {
+                let last_batch = &self.buffer[resumed_batch_index - 1];
+                let last_row = last_batch.num_rows() - 1;
+                for index in &self.tag_indices {
+                    let current_array = batch.column(*index);
+                    let last_array = last_batch.column(*index);
+                    let current_value = current_array
+                        .as_any()
+                        .downcast_ref::<StringArray>()
+                        .unwrap()
+                        .value(0);
+                    let last_value = last_array
+                        .as_any()
+                        .downcast_ref::<StringArray>()
+                        .unwrap()
+                        .value(last_row);
+                    if current_value != last_value {
+                        // the first series ends with the previous batch.
+                        return Some((resumed_batch_index - 1, last_row));
+                    }
                 }
-                same_until += 1;
             }
-            result = result.min(same_until);
+
+            // check column by column
+            for index in &self.tag_indices {
+                let array = batch.column(*index);
+                let string_array = array.as_any().downcast_ref::<StringArray>().unwrap();
+                // index of the last row of the leading run of equal values.
+                let mut same_until = 0;
+                while same_until < num_rows - 1 {
+                    if string_array.value(same_until) != string_array.value(same_until + 1) {
+                        break;
+                    }
+                    same_until += 1;
+                }
+                result_index = result_index.min(same_until);
+            }
+
+            if result_index + 1 >= num_rows {
+                // all rows are the same, inspect next batch
+                resumed_batch_index += 1;
+            } else {
+                return Some((resumed_batch_index, result_index));
+            }
         }
 
-        result
+        self.inspect_start = resumed_batch_index;
+        None
     }
 }
 