diff --git a/src/mito2/src/read/dedup.rs b/src/mito2/src/read/dedup.rs index a29781b947..678c03fbf6 100644 --- a/src/mito2/src/read/dedup.rs +++ b/src/mito2/src/read/dedup.rs @@ -502,6 +502,9 @@ pub(crate) struct LastNonNullIter { /// fetch a new batch. /// The batch is always not empty. current_batch: Option, + /// The index of the current row in the current batch. + /// more to check issue #5229. + current_index: usize, } impl LastNonNullIter { @@ -513,20 +516,9 @@ impl LastNonNullIter { strategy: LastNonNull::new(false), metrics: DedupMetrics::default(), current_batch: None, + current_index: 0, } } - - /// Finds the index of the first row that has the same timestamp with the next row. - /// If no duplicate rows, returns None. - fn find_split_index(batch: &Batch) -> Option { - if batch.num_rows() < 2 { - return None; - } - - // Safety: The batch is not empty. - let timestamps = batch.timestamps_native().unwrap(); - timestamps.windows(2).position(|t| t[0] == t[1]) - } } impl>> LastNonNullIter { @@ -541,6 +533,7 @@ impl>> LastNonNullIter { }; self.current_batch = iter.next().transpose()?; + self.current_index = 0; if self.current_batch.is_none() { // The iterator is exhausted. self.iter = None; @@ -549,17 +542,21 @@ impl>> LastNonNullIter { } if let Some(batch) = &self.current_batch { - let Some(index) = Self::find_split_index(batch) else { - // No duplicate rows in the current batch. - return Ok(self.current_batch.take()); - }; - - let first = batch.slice(0, index + 1); - let batch = batch.slice(index + 1, batch.num_rows() - index - 1); - // `index` is Some indicates that the batch has at least one row remaining. - debug_assert!(!batch.is_empty()); - self.current_batch = Some(batch); - return Ok(Some(first)); + let n = batch.num_rows(); + // Safety: The batch is not empty when accessed. + let timestamps = batch.timestamps_native().unwrap(); + let mut pos = self.current_index; + while pos + 1 < n && timestamps[pos] != timestamps[pos + 1] { + pos += 1; + } + let segment = batch.slice(self.current_index, pos - self.current_index + 1); + if pos + 1 < n && timestamps[pos] == timestamps[pos + 1] { + self.current_index = pos + 1; + } else { + self.current_batch = None; + self.current_index = 0; + } + return Ok(Some(segment)); } Ok(None)