From 93c48a078c8aa41b75ea24e6d1cd14e080a01c33 Mon Sep 17 00:00:00 2001 From: Yingwen Date: Fri, 6 Mar 2026 20:08:58 +0800 Subject: [PATCH] feat: implement last row cache reader for flat format (#7757) * feat: initial implementation Signed-off-by: evenyag * fix: handle multiple series Signed-off-by: evenyag * fix: reset state in finish() Signed-off-by: evenyag * fix: handle duplicated last timestamps across batches Signed-off-by: evenyag * perf: compact primary key array Signed-off-by: evenyag * fix(mito2): simplify flat last timestamp selector state Signed-off-by: evenyag * refactor(mito2): rebuild flat pk dictionary from selector state Signed-off-by: evenyag * test: reduce tests Signed-off-by: evenyag * chore: update comment Signed-off-by: evenyag * chore: more logs to debug Signed-off-by: evenyag * feat: concat batches in last row reader Signed-off-by: evenyag * refactor(mito2): simplify flat last row selector output buffer - Replace VecDeque with BatchBuffer struct for output buffering - Remove rebuild_pk_dictionary_for_key as batches go directly into buffer - Remove unused push method and make BatchBuffer pub(crate) - Remove debug logging in maybe_update_cache Signed-off-by: evenyag * chore: address comments Signed-off-by: evenyag --------- Signed-off-by: evenyag --- src/mito2/src/cache.rs | 35 +- src/mito2/src/read/last_row.rs | 609 +++++++++++++++++++++++- src/mito2/src/read/prune.rs | 20 +- src/mito2/src/read/scan_util.rs | 2 +- src/mito2/src/sst/parquet/file_range.rs | 47 +- src/mito2/src/sst/parquet/format.rs | 2 +- 6 files changed, 686 insertions(+), 29 deletions(-) diff --git a/src/mito2/src/cache.rs b/src/mito2/src/cache.rs index ced8f9e025..5f0184f528 100644 --- a/src/mito2/src/cache.rs +++ b/src/mito2/src/cache.rs @@ -28,6 +28,7 @@ use std::ops::Range; use std::sync::Arc; use bytes::Bytes; +use datatypes::arrow::record_batch::RecordBatch; use datatypes::value::Value; use datatypes::vectors::VectorRef; use index::bloom_filter_index::{BloomFilterIndexCache, BloomFilterIndexCacheRef}; @@ -45,6 +46,7 @@ use crate::cache::index::inverted_index::{InvertedIndexCache, InvertedIndexCache #[cfg(feature = "vector_index")] use crate::cache::index::vector_index::{VectorIndexCache, VectorIndexCacheRef}; use crate::cache::write_cache::WriteCacheRef; +use crate::memtable::record_batch_estimated_size; use crate::metrics::{CACHE_BYTES, CACHE_EVICTION, CACHE_HIT, CACHE_MISS}; use crate::read::Batch; use crate::sst::file::{RegionFileId, RegionIndexId}; @@ -833,24 +835,47 @@ pub struct SelectorResultKey { pub selector: TimeSeriesRowSelector, } +/// Result stored in the selector result cache. +pub enum SelectorResult { + /// Batches in the primary key format. + PrimaryKey(Vec), + /// Record batches in the flat format. + Flat(Vec), +} + /// Cached result for time series row selector. pub struct SelectorResultValue { /// Batches of rows selected by the selector. - pub result: Vec, + pub result: SelectorResult, /// Projection of rows. pub projection: Vec, } impl SelectorResultValue { - /// Creates a new selector result value. + /// Creates a new selector result value with primary key format. pub fn new(result: Vec, projection: Vec) -> SelectorResultValue { - SelectorResultValue { result, projection } + SelectorResultValue { + result: SelectorResult::PrimaryKey(result), + projection, + } + } + + /// Creates a new selector result value with flat format. + pub fn new_flat(result: Vec, projection: Vec) -> SelectorResultValue { + SelectorResultValue { + result: SelectorResult::Flat(result), + projection, + } } /// Returns memory used by the value (estimated). fn estimated_size(&self) -> usize { - // We only consider heap size of all batches. - self.result.iter().map(|batch| batch.memory_size()).sum() + match &self.result { + SelectorResult::PrimaryKey(batches) => { + batches.iter().map(|batch| batch.memory_size()).sum() + } + SelectorResult::Flat(batches) => batches.iter().map(record_batch_estimated_size).sum(), + } } } diff --git a/src/mito2/src/read/last_row.rs b/src/mito2/src/read/last_row.rs index a00c039cec..c2336f218d 100644 --- a/src/mito2/src/read/last_row.rs +++ b/src/mito2/src/read/last_row.rs @@ -17,16 +17,24 @@ use std::sync::Arc; use async_trait::async_trait; +use datatypes::arrow::array::{Array, BinaryArray}; +use datatypes::arrow::compute::concat_batches; +use datatypes::arrow::record_batch::RecordBatch; use datatypes::vectors::UInt32Vector; +use snafu::ResultExt; use store_api::storage::{FileId, TimeSeriesRowSelector}; use crate::cache::{ - CacheStrategy, SelectorResultKey, SelectorResultValue, selector_result_cache_hit, - selector_result_cache_miss, + CacheStrategy, SelectorResult, SelectorResultKey, SelectorResultValue, + selector_result_cache_hit, selector_result_cache_miss, }; -use crate::error::Result; +use crate::error::{ComputeArrowSnafu, Result}; +use crate::memtable::partition_tree::data::timestamp_array_to_i64_slice; use crate::read::{Batch, BatchReader, BoxedBatchReader}; -use crate::sst::parquet::reader::{ReaderMetrics, RowGroupReader}; +use crate::sst::parquet::DEFAULT_READ_BATCH_SIZE; +use crate::sst::parquet::flat_format::{primary_key_column_index, time_index_column_index}; +use crate::sst::parquet::format::{PrimaryKeyArray, primary_key_offsets}; +use crate::sst::parquet::reader::{FlatRowGroupReader, ReaderMetrics, RowGroupReader}; /// Reader to keep the last row for each time series. /// It assumes that batches from the input reader are @@ -95,10 +103,11 @@ impl RowGroupLastRowCachedReader { }; if let Some(value) = cache_strategy.get_selector_result(&key) { + let is_primary_key = matches!(&value.result, SelectorResult::PrimaryKey(_)); let schema_matches = value.projection == row_group_reader.read_format().projection_indices(); - if schema_matches { - // Schema matches, use cache batches. + if is_primary_key && schema_matches { + // Format and schema match, use cache batches. Self::new_hit(value) } else { Self::new_miss(key, row_group_reader, cache_strategy) @@ -156,8 +165,12 @@ pub(crate) struct LastRowCacheReader { impl LastRowCacheReader { /// Iterates cached last rows. async fn next_batch(&mut self) -> Result> { - if self.idx < self.value.result.len() { - let res = Ok(Some(self.value.result[self.idx].clone())); + let batches = match &self.value.result { + SelectorResult::PrimaryKey(batches) => batches, + SelectorResult::Flat(_) => unreachable!(), + }; + if self.idx < batches.len() { + let res = Ok(Some(batches[self.idx].clone())); self.idx += 1; res } else { @@ -217,10 +230,10 @@ impl RowGroupLastRowReader { // we always expect that row groups yields batches. return; } - let value = Arc::new(SelectorResultValue { - result: std::mem::take(&mut self.yielded_batches), - projection: self.reader.read_format().projection_indices().to_vec(), - }); + let value = Arc::new(SelectorResultValue::new( + std::mem::take(&mut self.yielded_batches), + self.reader.read_format().projection_indices().to_vec(), + )); self.cache_strategy.put_selector_result(self.key, value); } @@ -281,9 +294,362 @@ impl LastRowSelector { } } +/// Cached last row reader for flat format row group. +/// If the last rows are already cached (as flat `RecordBatch`), returns cached values. +/// Otherwise, reads from the row group, selects last rows, and updates the cache. +pub(crate) enum FlatRowGroupLastRowCachedReader { + /// Cache hit, reads last rows from cached value. + Hit(FlatLastRowCacheReader), + /// Cache miss, reads from row group reader and updates cache. + Miss(FlatRowGroupLastRowReader), +} + +impl FlatRowGroupLastRowCachedReader { + pub(crate) fn new( + file_id: FileId, + row_group_idx: usize, + cache_strategy: CacheStrategy, + projection: &[usize], + reader: FlatRowGroupReader, + ) -> Self { + let key = SelectorResultKey { + file_id, + row_group_idx, + selector: TimeSeriesRowSelector::LastRow, + }; + + if let Some(value) = cache_strategy.get_selector_result(&key) { + let is_flat = matches!(&value.result, SelectorResult::Flat(_)); + let schema_matches = value.projection == projection; + if is_flat && schema_matches { + Self::new_hit(value) + } else { + Self::new_miss(key, projection, reader, cache_strategy) + } + } else { + Self::new_miss(key, projection, reader, cache_strategy) + } + } + + /// Returns the next RecordBatch. + pub(crate) fn next_batch(&mut self) -> Result> { + match self { + FlatRowGroupLastRowCachedReader::Hit(r) => r.next_batch(), + FlatRowGroupLastRowCachedReader::Miss(r) => r.next_batch(), + } + } + + fn new_hit(value: Arc) -> Self { + selector_result_cache_hit(); + Self::Hit(FlatLastRowCacheReader { value, idx: 0 }) + } + + fn new_miss( + key: SelectorResultKey, + projection: &[usize], + reader: FlatRowGroupReader, + cache_strategy: CacheStrategy, + ) -> Self { + selector_result_cache_miss(); + Self::Miss(FlatRowGroupLastRowReader::new( + key, + projection.to_vec(), + reader, + cache_strategy, + )) + } +} + +/// Iterates over cached flat last rows. +pub(crate) struct FlatLastRowCacheReader { + value: Arc, + idx: usize, +} + +impl FlatLastRowCacheReader { + fn next_batch(&mut self) -> Result> { + let batches = match &self.value.result { + SelectorResult::Flat(batches) => batches, + SelectorResult::PrimaryKey(_) => unreachable!(), + }; + if self.idx < batches.len() { + let res = Ok(Some(batches[self.idx].clone())); + self.idx += 1; + res + } else { + Ok(None) + } + } +} + +/// Buffer that accumulates small `RecordBatch`es and tracks total row count. +pub(crate) struct BatchBuffer { + batches: Vec, + num_rows: usize, +} + +impl BatchBuffer { + fn new() -> Self { + Self { + batches: Vec::new(), + num_rows: 0, + } + } + + /// Returns true if total buffered rows reaches `DEFAULT_READ_BATCH_SIZE`. + fn is_full(&self) -> bool { + self.num_rows >= DEFAULT_READ_BATCH_SIZE + } + + /// Extends the buffer from a slice of batches. + fn extend_from_slice(&mut self, batches: &[RecordBatch]) { + for batch in batches { + self.num_rows += batch.num_rows(); + } + self.batches.extend_from_slice(batches); + } + + /// Returns true if the buffer has no batches. + fn is_empty(&self) -> bool { + self.batches.is_empty() + } + + /// Concatenates all buffered batches into one, resets the buffer, and returns the result. + fn concat(&mut self) -> Result { + debug_assert!(!self.batches.is_empty()); + let schema = self.batches[0].schema(); + let merged = concat_batches(&schema, &self.batches).context(ComputeArrowSnafu)?; + self.batches.clear(); + self.num_rows = 0; + Ok(merged) + } +} + +/// Reads last rows from a flat format row group and caches the results. +pub(crate) struct FlatRowGroupLastRowReader { + key: SelectorResultKey, + reader: FlatRowGroupReader, + selector: FlatLastTimestampSelector, + yielded_batches: Vec, + cache_strategy: CacheStrategy, + projection: Vec, + /// Accumulates small selector-output batches before concatenating. + pending: BatchBuffer, +} + +impl FlatRowGroupLastRowReader { + fn new( + key: SelectorResultKey, + projection: Vec, + reader: FlatRowGroupReader, + cache_strategy: CacheStrategy, + ) -> Self { + Self { + key, + reader, + selector: FlatLastTimestampSelector::default(), + yielded_batches: vec![], + cache_strategy, + projection, + pending: BatchBuffer::new(), + } + } + + /// Concatenates pending batches and records the result in `yielded_batches`. + fn flush_pending(&mut self) -> Result> { + if self.pending.is_empty() { + return Ok(None); + } + let merged = self.pending.concat()?; + self.yielded_batches.push(merged.clone()); + Ok(Some(merged)) + } + + fn next_batch(&mut self) -> Result> { + if self.pending.is_full() { + return self.flush_pending(); + } + + while let Some(batch) = self.reader.next_batch()? { + self.selector.on_next(batch, &mut self.pending)?; + if self.pending.is_full() { + return self.flush_pending(); + } + } + + // Reader exhausted — flush remaining selector state. + self.selector.finish(&mut self.pending)?; + if !self.pending.is_empty() { + let result = self.flush_pending(); + // All last rows in row group are yielded, update cache. + self.maybe_update_cache(); + return result; + } + + // All last rows in row group are yielded, update cache. + self.maybe_update_cache(); + Ok(None) + } + + fn maybe_update_cache(&mut self) { + if self.yielded_batches.is_empty() { + return; + } + let batches = std::mem::take(&mut self.yielded_batches); + let value = Arc::new(SelectorResultValue::new_flat( + batches, + self.projection.clone(), + )); + self.cache_strategy.put_selector_result(self.key, value); + } +} + +/// Selects the last-timestamp rows per primary key from flat `RecordBatch`. +/// +/// Assumes that input batches are sorted by primary key then by timestamp, +/// and contain only PUT operations (no DELETE). +#[derive(Default)] +pub(crate) struct FlatLastTimestampSelector { + /// State for the currently in-progress primary key. + current_key: Option, +} + +#[derive(Debug)] +struct LastKeyState { + key: Vec, + last_timestamp: i64, + slices: Vec, +} + +impl LastKeyState { + fn new(key: Vec, last_timestamp: i64, first_slice: RecordBatch) -> Self { + Self { + key, + last_timestamp, + slices: vec![first_slice], + } + } +} + +impl FlatLastTimestampSelector { + /// Processes the next batch and appends completed-key results into `output_buffer`. + pub(crate) fn on_next( + &mut self, + batch: RecordBatch, + output_buffer: &mut BatchBuffer, + ) -> Result<()> { + if batch.num_rows() == 0 { + return Ok(()); + } + + let num_columns = batch.num_columns(); + let pk_col_idx = primary_key_column_index(num_columns); + let ts_col_idx = time_index_column_index(num_columns); + + let pk_array = batch + .column(pk_col_idx) + .as_any() + .downcast_ref::() + .unwrap(); + let offsets = primary_key_offsets(pk_array)?; + if offsets.is_empty() { + return Ok(()); + } + + let ts_values = timestamp_array_to_i64_slice(batch.column(ts_col_idx)); + for i in 0..offsets.len() - 1 { + let range_start = offsets[i]; + let range_end = offsets[i + 1]; + let range_key = primary_key_bytes_at(&batch, pk_col_idx, range_start); + let range_last_ts = ts_values[range_end - 1]; + let range_last_ts_start = last_timestamp_start(ts_values, range_start, range_end); + let range_slice = batch.slice(range_last_ts_start, range_end - range_last_ts_start); + + match self.current_key.as_mut() { + Some(state) if state.key.as_slice() == range_key => { + if range_last_ts > state.last_timestamp { + state.last_timestamp = range_last_ts; + state.slices.clear(); + state.slices.push(range_slice); + } else if range_last_ts == state.last_timestamp { + state.slices.push(range_slice); + } + } + Some(_) => { + self.flush_current_key(output_buffer); + self.current_key = Some(LastKeyState::new( + range_key.to_vec(), + range_last_ts, + range_slice, + )); + } + None => { + self.current_key = Some(LastKeyState::new( + range_key.to_vec(), + range_last_ts, + range_slice, + )); + } + } + } + + Ok(()) + } + + /// Finishes the selector and appends remaining results into `output_buffer`. + pub(crate) fn finish(&mut self, output_buffer: &mut BatchBuffer) -> Result<()> { + self.flush_current_key(output_buffer); + Ok(()) + } + + fn flush_current_key(&mut self, output_buffer: &mut BatchBuffer) { + let Some(state) = self.current_key.take() else { + return; + }; + output_buffer.extend_from_slice(&state.slices); + } +} + +/// Gets the primary key bytes at `index` from the primary key dictionary column. +fn primary_key_bytes_at(batch: &RecordBatch, pk_col_idx: usize, index: usize) -> &[u8] { + let pk_dict = batch + .column(pk_col_idx) + .as_any() + .downcast_ref::() + .unwrap(); + let key = pk_dict.keys().value(index); + let binary_values = pk_dict + .values() + .as_any() + .downcast_ref::() + .unwrap(); + binary_values.value(key as usize) +} + +/// Finds the start index of rows sharing the last (maximum) timestamp +/// within the range `[range_start, range_end)`. +fn last_timestamp_start(ts_values: &[i64], range_start: usize, range_end: usize) -> usize { + debug_assert!(range_start < range_end); + + let last_ts = ts_values[range_end - 1]; + let mut start = range_end - 1; + while start > range_start && ts_values[start - 1] == last_ts { + start -= 1; + } + start +} + #[cfg(test)] mod tests { + use std::sync::Arc; + use api::v1::OpType; + use datatypes::arrow::array::{ + ArrayRef, BinaryDictionaryBuilder, Int64Array, TimestampMillisecondArray, UInt8Array, + UInt64Array, + }; + use datatypes::arrow::datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit, UInt32Type}; + use datatypes::arrow::record_batch::RecordBatch; use super::*; use crate::test_util::{VecBatchReader, check_reader_result, new_batch}; @@ -352,4 +718,223 @@ mod tests { ) .await; } + + /// Helper to build a flat format RecordBatch for testing. + fn new_flat_batch(primary_keys: &[&[u8]], timestamps: &[i64], fields: &[i64]) -> RecordBatch { + let num_rows = timestamps.len(); + assert_eq!(primary_keys.len(), num_rows); + assert_eq!(fields.len(), num_rows); + + let columns: Vec = vec![ + // field0 column + Arc::new(Int64Array::from_iter_values(fields.iter().copied())), + // ts column (time index) + Arc::new(TimestampMillisecondArray::from_iter_values( + timestamps.iter().copied(), + )), + // __primary_key column (dictionary(uint32, binary)) + { + let mut builder = BinaryDictionaryBuilder::::new(); + for &pk in primary_keys { + builder.append(pk).unwrap(); + } + Arc::new(builder.finish()) + }, + // __sequence column + Arc::new(UInt64Array::from_iter_values(vec![1u64; num_rows])), + // __op_type column + Arc::new(UInt8Array::from_iter_values(vec![1u8; num_rows])), + ]; + + RecordBatch::try_new(test_flat_schema(), columns).unwrap() + } + + fn test_flat_schema() -> SchemaRef { + let fields = vec![ + Field::new("field0", DataType::Int64, false), + Field::new( + "ts", + DataType::Timestamp(TimeUnit::Millisecond, None), + false, + ), + Field::new( + "__primary_key", + DataType::Dictionary(Box::new(DataType::UInt32), Box::new(DataType::Binary)), + false, + ), + Field::new("__sequence", DataType::UInt64, false), + Field::new("__op_type", DataType::UInt8, false), + ]; + Arc::new(Schema::new(fields)) + } + + /// Collects all rows from the selector across all result batches. + fn collect_flat_results( + selector: &mut FlatLastTimestampSelector, + batches: Vec, + ) -> Vec<(Vec, i64)> { + let mut output_buffer = BatchBuffer::new(); + let mut results = Vec::new(); + for batch in batches { + selector.on_next(batch, &mut output_buffer).unwrap(); + for r in output_buffer.batches.drain(..) { + extract_flat_rows(&r, &mut results); + } + output_buffer.num_rows = 0; + } + selector.finish(&mut output_buffer).unwrap(); + for r in output_buffer.batches.drain(..) { + extract_flat_rows(&r, &mut results); + } + results + } + + /// Extracts (primary_key, timestamp) pairs from a result batch. + fn extract_flat_rows(batch: &RecordBatch, out: &mut Vec<(Vec, i64)>) { + let ts_col = batch + .column(1) + .as_any() + .downcast_ref::() + .unwrap(); + let pk_col = batch + .column(2) + .as_any() + .downcast_ref::() + .unwrap(); + let binary_values = pk_col + .values() + .as_any() + .downcast_ref::() + .unwrap(); + + for i in 0..batch.num_rows() { + let key_idx = pk_col.keys().value(i); + let pk = binary_values.value(key_idx as usize).to_vec(); + let ts = ts_col.value(i); + out.push((pk, ts)); + } + } + + #[test] + fn test_flat_single_batch_one_key() { + let mut selector = FlatLastTimestampSelector::default(); + let batch = new_flat_batch(&[b"k1", b"k1", b"k1"], &[1, 2, 3], &[10, 20, 30]); + let results = collect_flat_results(&mut selector, vec![batch]); + assert_eq!(vec![(b"k1".to_vec(), 3)], results); + } + + #[test] + fn test_flat_single_batch_multiple_keys() { + let mut selector = FlatLastTimestampSelector::default(); + let batch = new_flat_batch( + &[b"k1", b"k1", b"k2", b"k2", b"k3"], + &[1, 2, 3, 4, 5], + &[10, 20, 30, 40, 50], + ); + let results = collect_flat_results(&mut selector, vec![batch]); + assert_eq!( + vec![ + (b"k1".to_vec(), 2), + (b"k2".to_vec(), 4), + (b"k3".to_vec(), 5), + ], + results + ); + } + + #[test] + fn test_flat_key_spans_batches() { + let mut selector = FlatLastTimestampSelector::default(); + let batches = vec![ + new_flat_batch(&[b"k1", b"k1"], &[1, 2], &[10, 20]), + new_flat_batch(&[b"k1", b"k2"], &[3, 4], &[30, 40]), + new_flat_batch(&[b"k2", b"k3"], &[5, 6], &[50, 60]), + ]; + let results = collect_flat_results(&mut selector, batches); + assert_eq!( + vec![ + (b"k1".to_vec(), 3), + (b"k2".to_vec(), 5), + (b"k3".to_vec(), 6), + ], + results + ); + } + + #[test] + fn test_flat_duplicate_last_timestamps() { + let mut selector = FlatLastTimestampSelector::default(); + // k1 has two rows with the same last timestamp (3). + let batch = new_flat_batch( + &[b"k1", b"k1", b"k1", b"k2"], + &[1, 3, 3, 5], + &[10, 20, 30, 40], + ); + let results = collect_flat_results(&mut selector, vec![batch]); + assert_eq!( + vec![ + (b"k1".to_vec(), 3), + (b"k1".to_vec(), 3), + (b"k2".to_vec(), 5), + ], + results + ); + } + + #[test] + fn test_flat_duplicate_last_timestamps_across_batches() { + let mut selector = FlatLastTimestampSelector::default(); + // k1's last timestamp (3) spans two batches. + let batches = vec![ + new_flat_batch(&[b"k1", b"k1"], &[1, 3], &[10, 20]), + new_flat_batch(&[b"k1", b"k2"], &[3, 5], &[30, 40]), + ]; + let results = collect_flat_results(&mut selector, batches); + assert_eq!( + vec![ + (b"k1".to_vec(), 3), + (b"k1".to_vec(), 3), + (b"k2".to_vec(), 5), + ], + results + ); + } + + #[test] + fn test_flat_pending_chain_dropped_by_higher_timestamp() { + let mut selector = FlatLastTimestampSelector::default(); + let batches = vec![ + new_flat_batch(&[b"k1", b"k1"], &[1, 3], &[10, 20]), + new_flat_batch(&[b"k1", b"k1"], &[3, 3], &[21, 22]), + new_flat_batch(&[b"k1", b"k1"], &[4, 4], &[23, 24]), + ]; + let results = collect_flat_results(&mut selector, batches); + assert_eq!(vec![(b"k1".to_vec(), 4), (b"k1".to_vec(), 4)], results); + } + + #[test] + fn test_flat_finish_is_one_shot() { + let mut selector = FlatLastTimestampSelector::default(); + let batch = new_flat_batch(&[b"k1", b"k1", b"k2"], &[1, 2, 3], &[10, 20, 30]); + let mut output_buffer = BatchBuffer::new(); + + // Feed one batch: completed keys can be emitted before EOF. + selector.on_next(batch, &mut output_buffer).unwrap(); + let mut pre_finish = Vec::new(); + for r in output_buffer.batches.drain(..) { + extract_flat_rows(&r, &mut pre_finish); + } + output_buffer.num_rows = 0; + assert_eq!(vec![(b"k1".to_vec(), 2)], pre_finish); + + // Simulate EOF by calling finish(). + selector.finish(&mut output_buffer).unwrap(); + assert!(!output_buffer.is_empty()); + output_buffer.batches.clear(); + output_buffer.num_rows = 0; + + // A second finish after EOF should not yield any more rows. + selector.finish(&mut output_buffer).unwrap(); + assert!(output_buffer.is_empty()); + } } diff --git a/src/mito2/src/read/prune.rs b/src/mito2/src/read/prune.rs index 82dc475d36..29ded3d49a 100644 --- a/src/mito2/src/read/prune.rs +++ b/src/mito2/src/read/prune.rs @@ -24,7 +24,7 @@ use snafu::ResultExt; use crate::error::{RecordBatchSnafu, Result}; use crate::memtable::BoxedBatchIterator; -use crate::read::last_row::RowGroupLastRowCachedReader; +use crate::read::last_row::{FlatRowGroupLastRowCachedReader, RowGroupLastRowCachedReader}; use crate::read::{Batch, BatchReader}; use crate::sst::file::FileTimeRange; use crate::sst::parquet::file_range::FileRangeContextRef; @@ -248,12 +248,14 @@ impl Iterator for PruneTimeIterator { pub enum FlatSource { RowGroup(FlatRowGroupReader), + LastRow(FlatRowGroupLastRowCachedReader), } impl FlatSource { fn next_batch(&mut self) -> Result> { match self { FlatSource::RowGroup(r) => r.next_batch(), + FlatSource::LastRow(r) => r.next_batch(), } } } @@ -282,9 +284,21 @@ impl FlatPruneReader { } } - /// Merge metrics with the inner reader and return the merged metrics. + pub(crate) fn new_with_last_row_reader( + ctx: FileRangeContextRef, + reader: FlatRowGroupLastRowCachedReader, + skip_fields: bool, + ) -> Self { + Self { + context: ctx, + source: FlatSource::LastRow(reader), + metrics: Default::default(), + skip_fields, + } + } + + /// Returns metrics. pub(crate) fn metrics(&self) -> ReaderMetrics { - // FlatRowGroupReader doesn't collect metrics, so just return our own self.metrics.clone() } diff --git a/src/mito2/src/read/scan_util.rs b/src/mito2/src/read/scan_util.rs index 0ce957a616..bda5b377b3 100644 --- a/src/mito2/src/read/scan_util.rs +++ b/src/mito2/src/read/scan_util.rs @@ -1462,7 +1462,7 @@ pub fn build_flat_file_range_scan_stream( }; for range in ranges { let build_reader_start = Instant::now(); - let Some(mut reader) = range.flat_reader(fetch_metrics.as_deref()).await? else{continue}; + let Some(mut reader) = range.flat_reader(_stream_ctx.input.series_row_selector, fetch_metrics.as_deref()).await? else{continue}; let build_cost = build_reader_start.elapsed(); part_metrics.inc_build_reader_cost(build_cost); diff --git a/src/mito2/src/sst/parquet/file_range.rs b/src/mito2/src/sst/parquet/file_range.rs index 6be6cf8c43..6445517bc8 100644 --- a/src/mito2/src/sst/parquet/file_range.rs +++ b/src/mito2/src/sst/parquet/file_range.rs @@ -45,7 +45,7 @@ use crate::error::{ use crate::read::Batch; use crate::read::compat::CompatBatch; use crate::read::flat_projection::CompactionProjectionMapper; -use crate::read::last_row::RowGroupLastRowCachedReader; +use crate::read::last_row::{FlatRowGroupLastRowCachedReader, RowGroupLastRowCachedReader}; use crate::read::prune::{FlatPruneReader, PruneReader}; use crate::sst::file::FileHandle; use crate::sst::parquet::flat_format::{ @@ -237,6 +237,7 @@ impl FileRange { /// Creates a flat reader that returns RecordBatch. pub(crate) async fn flat_reader( &self, + selector: Option, fetch_metrics: Option<&ParquetFetchMetrics>, ) -> Result> { if !self.in_dynamic_filter_range() { @@ -252,15 +253,47 @@ impl FileRange { ) .await?; + let use_last_row_reader = if selector + .map(|s| s == TimeSeriesRowSelector::LastRow) + .unwrap_or(false) + { + // Only use LastRowReader if row group does not contain DELETE + // and all rows are selected. + let put_only = !self + .context + .contains_delete(self.row_group_idx) + .inspect_err(|e| { + error!(e; "Failed to decode min value of op_type, fallback to FlatRowGroupReader"); + }) + .unwrap_or(true); + put_only && self.select_all() + } else { + false + }; + // Compute skip_fields once for this row group let skip_fields = self.context.should_skip_fields(self.row_group_idx); - let flat_row_group_reader = FlatRowGroupReader::new(self.context.clone(), parquet_reader); - let flat_prune_reader = FlatPruneReader::new_with_row_group_reader( - self.context.clone(), - flat_row_group_reader, - skip_fields, - ); + let flat_prune_reader = if use_last_row_reader { + let flat_row_group_reader = + FlatRowGroupReader::new(self.context.clone(), parquet_reader); + let reader = FlatRowGroupLastRowCachedReader::new( + self.file_handle().file_id().file_id(), + self.row_group_idx, + self.context.reader_builder.cache_strategy().clone(), + self.context.read_format().projection_indices(), + flat_row_group_reader, + ); + FlatPruneReader::new_with_last_row_reader(self.context.clone(), reader, skip_fields) + } else { + let flat_row_group_reader = + FlatRowGroupReader::new(self.context.clone(), parquet_reader); + FlatPruneReader::new_with_row_group_reader( + self.context.clone(), + flat_row_group_reader, + skip_fields, + ) + }; Ok(Some(flat_prune_reader)) } diff --git a/src/mito2/src/sst/parquet/format.rs b/src/mito2/src/sst/parquet/format.rs index 06461e9dbf..70d026e6db 100644 --- a/src/mito2/src/sst/parquet/format.rs +++ b/src/mito2/src/sst/parquet/format.rs @@ -904,7 +904,7 @@ impl PrimaryKeyReadFormat { } /// Compute offsets of different primary keys in the array. -fn primary_key_offsets(pk_dict_array: &PrimaryKeyArray) -> Result> { +pub(crate) fn primary_key_offsets(pk_dict_array: &PrimaryKeyArray) -> Result> { if pk_dict_array.is_empty() { return Ok(Vec::new()); }