perf: better performance for LastNonNullIter, close #5229, about 10x faster (#5518)

* fix: better performance for LastNonNullIter close #5229

Signed-off-by: yihong0618 <zouzou0208@gmail.com>

* fix: address comments

Signed-off-by: yihong0618 <zouzou0208@gmail.com>

* fix: add Safety comments for the unwrap

Signed-off-by: yihong0618 <zouzou0208@gmail.com>

---------

Signed-off-by: yihong0618 <zouzou0208@gmail.com>
This commit is contained in:
yihong
2025-02-13 13:14:39 +08:00
committed by GitHub
parent 63d5a69a31
commit fbd5316fdb

View File

@@ -502,6 +502,9 @@ pub(crate) struct LastNonNullIter<I> {
/// fetch a new batch.
/// The batch is never empty.
current_batch: Option<Batch>,
/// The index of the current row in the current batch.
/// See issue #5229 for more details.
current_index: usize,
}
impl<I> LastNonNullIter<I> {
@@ -513,20 +516,9 @@ impl<I> LastNonNullIter<I> {
strategy: LastNonNull::new(false),
metrics: DedupMetrics::default(),
current_batch: None,
current_index: 0,
}
}
/// Locates the first row whose timestamp equals the timestamp of the row right after it.
///
/// Returns the index of that row, or `None` when no two adjacent rows share a timestamp
/// (which is always the case for batches with fewer than two rows).
fn find_split_index(batch: &Batch) -> Option<usize> {
    // With fewer than two rows there is no adjacent pair to compare.
    if batch.num_rows() < 2 {
        return None;
    }
    // Safety: The batch is not empty.
    let timestamps = batch.timestamps_native().unwrap();
    // Pair every timestamp with its successor and report the first equal pair.
    timestamps
        .iter()
        .zip(timestamps.iter().skip(1))
        .position(|(current, next)| current == next)
}
}
impl<I: Iterator<Item = Result<Batch>>> LastNonNullIter<I> {
@@ -541,6 +533,7 @@ impl<I: Iterator<Item = Result<Batch>>> LastNonNullIter<I> {
};
self.current_batch = iter.next().transpose()?;
self.current_index = 0;
if self.current_batch.is_none() {
// The iterator is exhausted.
self.iter = None;
@@ -549,17 +542,21 @@ impl<I: Iterator<Item = Result<Batch>>> LastNonNullIter<I> {
}
if let Some(batch) = &self.current_batch {
let Some(index) = Self::find_split_index(batch) else {
// No duplicate rows in the current batch.
return Ok(self.current_batch.take());
};
let first = batch.slice(0, index + 1);
let batch = batch.slice(index + 1, batch.num_rows() - index - 1);
// `index` being `Some` means the batch has at least one row remaining.
debug_assert!(!batch.is_empty());
self.current_batch = Some(batch);
return Ok(Some(first));
let n = batch.num_rows();
// Safety: The batch is not empty when accessed.
let timestamps = batch.timestamps_native().unwrap();
let mut pos = self.current_index;
while pos + 1 < n && timestamps[pos] != timestamps[pos + 1] {
pos += 1;
}
let segment = batch.slice(self.current_index, pos - self.current_index + 1);
if pos + 1 < n && timestamps[pos] == timestamps[pos + 1] {
self.current_index = pos + 1;
} else {
self.current_batch = None;
self.current_index = 0;
}
return Ok(Some(segment));
}
Ok(None)