diff --git a/src/mito2/src/read/last_row.rs b/src/mito2/src/read/last_row.rs index 85f8276061..f91f20b4f8 100644 --- a/src/mito2/src/read/last_row.rs +++ b/src/mito2/src/read/last_row.rs @@ -30,7 +30,7 @@ pub(crate) struct LastRowReader { /// Inner reader. reader: BoxedBatchReader, /// The last batch pending to return. - last_batch: Option, + selector: LastRowSelector, } impl LastRowReader { @@ -38,37 +38,18 @@ impl LastRowReader { pub(crate) fn new(reader: BoxedBatchReader) -> Self { Self { reader, - last_batch: None, + selector: LastRowSelector::default(), } } /// Returns the last row of the next key. pub(crate) async fn next_last_row(&mut self) -> Result> { while let Some(batch) = self.reader.next_batch().await? { - if let Some(last) = &self.last_batch { - if last.primary_key() == batch.primary_key() { - // Same key, update last batch. - self.last_batch = Some(batch); - } else { - // Different key, return the last row in `last` and update `last_batch` by - // current batch. - debug_assert!(!last.is_empty()); - let last_row = last.slice(last.num_rows() - 1, 1); - self.last_batch = Some(batch); - return Ok(Some(last_row)); - } - } else { - self.last_batch = Some(batch); + if let Some(yielded) = self.selector.on_next(batch) { + return Ok(Some(yielded)); } } - - if let Some(last) = self.last_batch.take() { - // This is the last key. - let last_row = last.slice(last.num_rows() - 1, 1); - return Ok(Some(last_row)); - } - - Ok(None) + Ok(self.selector.finish()) } } @@ -79,6 +60,45 @@ impl BatchReader for LastRowReader { } } +/// Common struct that selects only the last row of each time series. +#[derive(Default)] +pub struct LastRowSelector { + last_batch: Option, +} + +impl LastRowSelector { + /// Handles next batch. Return the yielding batch if present. + pub fn on_next(&mut self, batch: Batch) -> Option { + if let Some(last) = &self.last_batch { + if last.primary_key() == batch.primary_key() { + // Same key, update last batch. + self.last_batch = Some(batch); + None + } else { + // Different key, return the last row in `last` and update `last_batch` by + // current batch. + debug_assert!(!last.is_empty()); + let last_row = last.slice(last.num_rows() - 1, 1); + self.last_batch = Some(batch); + Some(last_row) + } + } else { + self.last_batch = Some(batch); + None + } + } + + /// Finishes the selector and returns the pending batch if any. + pub fn finish(&mut self) -> Option { + if let Some(last) = self.last_batch.take() { + // This is the last key. + let last_row = last.slice(last.num_rows() - 1, 1); + return Some(last_row); + } + None + } +} + #[cfg(test)] mod tests { use api::v1::OpType;