perf: optimize series divide algo (#4603)

* perf: optimize series divide algo

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* remove dead code

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ruihang Xia
2024-08-22 17:16:36 +08:00
committed by GitHub
parent 883c5bc5b0
commit b9cedf2c1a

View File

@@ -205,11 +205,12 @@ impl ExecutionPlan for SeriesDivideExec {
.collect();
Ok(Box::pin(SeriesDivideStream {
tag_indices,
buffer: None,
buffer: vec![],
schema,
input,
metric: baseline_metric,
num_series: 0,
inspect_start: 0,
}))
}
@@ -231,11 +232,13 @@ impl DisplayAs for SeriesDivideExec {
/// Assume the input stream is ordered on the tag columns.
pub struct SeriesDivideStream {
tag_indices: Vec<usize>,
buffer: Option<RecordBatch>,
buffer: Vec<RecordBatch>,
schema: SchemaRef,
input: SendableRecordBatchStream,
metric: BaselineMetrics,
num_series: usize,
/// Index of buffered batches to start inspect next time.
inspect_start: usize,
}
impl RecordBatchStream for SeriesDivideStream {
@@ -248,30 +251,45 @@ impl Stream for SeriesDivideStream {
type Item = DataFusionResult<RecordBatch>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let timer = std::time::Instant::now();
loop {
if let Some(batch) = self.buffer.as_ref() {
let same_length = self.find_first_diff_row(batch) + 1;
if same_length >= batch.num_rows() {
if !self.buffer.is_empty() {
let cut_at = self.find_first_diff_row();
if let Some((batch_index, row_index)) = cut_at {
// slice out the first time series and return it.
let half_batch_of_first_series =
self.buffer[batch_index].slice(0, row_index + 1);
let half_batch_of_second_series = self.buffer[batch_index].slice(
row_index + 1,
self.buffer[batch_index].num_rows() - row_index - 1,
);
let result_batches = self
.buffer
.drain(0..batch_index)
.chain([half_batch_of_first_series])
.collect::<Vec<_>>();
self.buffer[0] = half_batch_of_second_series;
let result_batch = compute::concat_batches(&self.schema, &result_batches)?;
self.inspect_start = 0;
self.num_series += 1;
self.metric.elapsed_compute().add_elapsed(timer);
return Poll::Ready(Some(Ok(result_batch)));
} else {
// continue to fetch next batch as the current buffer only contains one time series.
let next_batch = ready!(self.as_mut().fetch_next_batch(cx)).transpose()?;
// SAFETY: if-let guards the buffer is not None;
// and we cannot change the buffer at this point.
let batch = self.buffer.take().expect("this batch must exist");
if let Some(next_batch) = next_batch {
self.buffer = Some(compute::concat_batches(
&batch.schema(),
&[batch, next_batch],
)?);
self.buffer.push(next_batch);
continue;
} else {
// input stream is ended
let result = compute::concat_batches(&self.schema, &self.buffer)?;
self.buffer.clear();
self.inspect_start = 0;
self.num_series += 1;
return Poll::Ready(Some(Ok(batch)));
self.metric.elapsed_compute().add_elapsed(timer);
return Poll::Ready(Some(Ok(result)));
}
} else {
let result_batch = batch.slice(0, same_length);
let remaining_batch = batch.slice(same_length, batch.num_rows() - same_length);
self.buffer = Some(remaining_batch);
self.num_series += 1;
return Poll::Ready(Some(Ok(result_batch)));
}
} else {
let batch = match ready!(self.as_mut().fetch_next_batch(cx)) {
@@ -282,7 +300,7 @@ impl Stream for SeriesDivideStream {
}
error => return Poll::Ready(error),
};
self.buffer = Some(batch);
self.buffer.push(batch);
continue;
}
}
@@ -294,40 +312,72 @@ impl SeriesDivideStream {
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<DataFusionResult<RecordBatch>>> {
let poll = match self.input.poll_next_unpin(cx) {
Poll::Ready(batch) => {
let _timer = self.metric.elapsed_compute().timer();
Poll::Ready(batch)
}
Poll::Pending => Poll::Pending,
};
let poll = self.input.poll_next_unpin(cx);
self.metric.record_poll(poll)
}
fn find_first_diff_row(&self, batch: &RecordBatch) -> usize {
/// Return the position to cut buffer.
/// None implies the current buffer only contains one time series.
fn find_first_diff_row(&mut self) -> Option<(usize, usize)> {
// fast path: no tag columns means all data belongs to the same series.
if self.tag_indices.is_empty() {
return batch.num_rows();
return None;
}
let num_rows = batch.num_rows();
let mut result = num_rows;
let mut resumed_batch_index = self.inspect_start;
for index in &self.tag_indices {
let array = batch.column(*index);
let string_array = array.as_any().downcast_ref::<StringArray>().unwrap();
// the first row number that not equal to the next row.
let mut same_until = 0;
while same_until < num_rows - 1 {
if string_array.value(same_until) != string_array.value(same_until + 1) {
break;
for batch in &self.buffer[resumed_batch_index..] {
let num_rows = batch.num_rows();
let mut result_index = num_rows;
// check if the first row is the same with last batch's last row
if resumed_batch_index > self.inspect_start {
let last_batch = &self.buffer[resumed_batch_index - 1];
let last_row = last_batch.num_rows() - 1;
for index in &self.tag_indices {
let current_array = batch.column(*index);
let last_array = last_batch.column(*index);
let current_value = current_array
.as_any()
.downcast_ref::<StringArray>()
.unwrap()
.value(0);
let last_value = last_array
.as_any()
.downcast_ref::<StringArray>()
.unwrap()
.value(last_row);
if current_value != last_value {
return Some((resumed_batch_index, 0));
}
}
same_until += 1;
}
result = result.min(same_until);
// check column by column
for index in &self.tag_indices {
let array = batch.column(*index);
let string_array = array.as_any().downcast_ref::<StringArray>().unwrap();
// the first row number that not equal to the next row.
let mut same_until = 0;
while same_until < num_rows - 1 {
if string_array.value(same_until) != string_array.value(same_until + 1) {
break;
}
same_until += 1;
}
result_index = result_index.min(same_until);
}
if result_index + 1 >= num_rows {
// all rows are the same, inspect next batch
resumed_batch_index += 1;
} else {
return Some((resumed_batch_index, result_index));
}
}
result
self.inspect_start = resumed_batch_index;
None
}
}