diff --git a/src/mito2/src/engine/row_selector_test.rs b/src/mito2/src/engine/row_selector_test.rs index 317ede5a97..d79152e57f 100644 --- a/src/mito2/src/engine/row_selector_test.rs +++ b/src/mito2/src/engine/row_selector_test.rs @@ -24,7 +24,7 @@ use crate::test_util::{ CreateRequestBuilder, TestEnv, build_rows_for_key, flush_region, put_rows, rows_schema, }; -async fn test_last_row(append_mode: bool) { +async fn test_last_row(append_mode: bool, flat_format: bool) { let mut env = TestEnv::new().await; let engine = env.create_engine(MitoConfig::default()).await; let region_id = RegionId::new(1, 1); @@ -39,9 +39,12 @@ async fn test_last_row(append_mode: bool) { env.get_kv_backend(), ) .await; - let request = CreateRequestBuilder::new() - .insert_option("append_mode", &append_mode.to_string()) - .build(); + let mut request_builder = + CreateRequestBuilder::new().insert_option("append_mode", &append_mode.to_string()); + if flat_format { + request_builder = request_builder.insert_option("sst_format", "flat"); + } + let request = request_builder.build(); let column_schemas = rows_schema(&request); engine .handle_request(region_id, RegionRequest::Create(request)) @@ -106,10 +109,20 @@ async fn test_last_row(append_mode: bool) { #[tokio::test] async fn test_last_row_append_mode_disabled() { - test_last_row(false).await; + test_last_row(false, false).await; } #[tokio::test] async fn test_last_row_append_mode_enabled() { - test_last_row(true).await; + test_last_row(true, false).await; +} + +#[tokio::test] +async fn test_last_row_flat_format_append_mode_disabled() { + test_last_row(false, true).await; +} + +#[tokio::test] +async fn test_last_row_flat_format_append_mode_enabled() { + test_last_row(true, true).await; } diff --git a/src/mito2/src/read/last_row.rs b/src/mito2/src/read/last_row.rs index c2336f218d..0c13c120a0 100644 --- a/src/mito2/src/read/last_row.rs +++ b/src/mito2/src/read/last_row.rs @@ -21,6 +21,7 @@ use datatypes::arrow::array::{Array, BinaryArray}; use datatypes::arrow::compute::concat_batches; use datatypes::arrow::record_batch::RecordBatch; use datatypes::vectors::UInt32Vector; +use futures::{Stream, TryStreamExt}; use snafu::ResultExt; use store_api::storage::{FileId, TimeSeriesRowSelector}; @@ -30,7 +31,7 @@ use crate::cache::{ }; use crate::error::{ComputeArrowSnafu, Result}; use crate::memtable::partition_tree::data::timestamp_array_to_i64_slice; -use crate::read::{Batch, BatchReader, BoxedBatchReader}; +use crate::read::{Batch, BatchReader, BoxedBatchReader, BoxedRecordBatchStream}; use crate::sst::parquet::DEFAULT_READ_BATCH_SIZE; use crate::sst::parquet::flat_format::{primary_key_column_index, time_index_column_index}; use crate::sst::parquet::format::{PrimaryKeyArray, primary_key_offsets}; @@ -610,6 +611,41 @@ impl FlatLastTimestampSelector { } } +/// Reader that keeps only the last row of each time series from a flat RecordBatch stream. +/// Assumes input is sorted, deduped, and contains no delete operations. +pub(crate) struct FlatLastRowReader { + stream: BoxedRecordBatchStream, + selector: FlatLastTimestampSelector, + pending: BatchBuffer, +} + +impl FlatLastRowReader { + /// Creates a new `FlatLastRowReader`. + pub(crate) fn new(stream: BoxedRecordBatchStream) -> Self { + Self { + stream, + selector: FlatLastTimestampSelector::default(), + pending: BatchBuffer::new(), + } + } + + /// Converts the reader into a stream of RecordBatches. + pub(crate) fn into_stream(mut self) -> impl Stream> { + async_stream::try_stream! { + while let Some(batch) = self.stream.try_next().await? { + self.selector.on_next(batch, &mut self.pending)?; + if self.pending.is_full() { + yield self.pending.concat()?; + } + } + self.selector.finish(&mut self.pending)?; + if !self.pending.is_empty() { + yield self.pending.concat()?; + } + } + } +} + /// Gets the primary key bytes at `index` from the primary key dictionary column. fn primary_key_bytes_at(batch: &RecordBatch, pk_col_idx: usize, index: usize) -> &[u8] { let pk_dict = batch diff --git a/src/mito2/src/read/seq_scan.rs b/src/mito2/src/read/seq_scan.rs index d2be17cc83..a1b3b8f350 100644 --- a/src/mito2/src/read/seq_scan.rs +++ b/src/mito2/src/read/seq_scan.rs @@ -39,7 +39,7 @@ use crate::error::{PartitionOutOfRangeSnafu, Result, TooManyFilesToReadSnafu, Un use crate::read::dedup::{DedupReader, LastNonNull, LastRow}; use crate::read::flat_dedup::{FlatDedupReader, FlatLastNonNull, FlatLastRow}; use crate::read::flat_merge::FlatMergeReader; -use crate::read::last_row::LastRowReader; +use crate::read::last_row::{FlatLastRowReader, LastRowReader}; use crate::read::merge::MergeReaderBuilder; use crate::read::pruner::{PartitionPruner, Pruner}; use crate::read::range::RangeMeta; @@ -289,6 +289,13 @@ impl SeqScan { Box::pin(reader.into_stream()) as _ }; + let reader = match &stream_ctx.input.series_row_selector { + Some(TimeSeriesRowSelector::LastRow) => { + Box::pin(FlatLastRowReader::new(reader).into_stream()) as _ + } + None => reader, + }; + Ok(reader) }