refactor: LastRowReader to use LastRowSelector (#4374)

Refactor LastRowReader to use LastRowSelector
 - Replaced `last_batch` in `LastRowReader` with `LastRowSelector`.
This commit is contained in:
Lei, HUANG
2024-07-16 11:47:41 +08:00
committed by GitHub
parent b2c5f8eefa
commit 7b28da277d

View File

@@ -30,7 +30,7 @@ pub(crate) struct LastRowReader {
/// Inner reader.
reader: BoxedBatchReader,
/// The last batch pending to return.
last_batch: Option<Batch>,
selector: LastRowSelector,
}
impl LastRowReader {
@@ -38,37 +38,18 @@ impl LastRowReader {
pub(crate) fn new(reader: BoxedBatchReader) -> Self {
Self {
reader,
last_batch: None,
selector: LastRowSelector::default(),
}
}
/// Returns the last row of the next key.
pub(crate) async fn next_last_row(&mut self) -> Result<Option<Batch>> {
while let Some(batch) = self.reader.next_batch().await? {
if let Some(last) = &self.last_batch {
if last.primary_key() == batch.primary_key() {
// Same key, update last batch.
self.last_batch = Some(batch);
} else {
// Different key, return the last row in `last` and update `last_batch` by
// current batch.
debug_assert!(!last.is_empty());
let last_row = last.slice(last.num_rows() - 1, 1);
self.last_batch = Some(batch);
return Ok(Some(last_row));
}
} else {
self.last_batch = Some(batch);
if let Some(yielded) = self.selector.on_next(batch) {
return Ok(Some(yielded));
}
}
if let Some(last) = self.last_batch.take() {
// This is the last key.
let last_row = last.slice(last.num_rows() - 1, 1);
return Ok(Some(last_row));
}
Ok(None)
Ok(self.selector.finish())
}
}
@@ -79,6 +60,45 @@ impl BatchReader for LastRowReader {
}
}
/// Common struct that selects only the last row of each time series.
#[derive(Default)]
pub struct LastRowSelector {
last_batch: Option<Batch>,
}
impl LastRowSelector {
/// Handles next batch. Return the yielding batch if present.
pub fn on_next(&mut self, batch: Batch) -> Option<Batch> {
if let Some(last) = &self.last_batch {
if last.primary_key() == batch.primary_key() {
// Same key, update last batch.
self.last_batch = Some(batch);
None
} else {
// Different key, return the last row in `last` and update `last_batch` by
// current batch.
debug_assert!(!last.is_empty());
let last_row = last.slice(last.num_rows() - 1, 1);
self.last_batch = Some(batch);
Some(last_row)
}
} else {
self.last_batch = Some(batch);
None
}
}
/// Finishes the selector and returns the pending batch if any.
pub fn finish(&mut self) -> Option<Batch> {
if let Some(last) = self.last_batch.take() {
// This is the last key.
let last_row = last.slice(last.num_rows() - 1, 1);
return Some(last_row);
}
None
}
}
#[cfg(test)]
mod tests {
use api::v1::OpType;