feat: implement last row cache reader for flat format (#7757)

* feat: initial implementation

Signed-off-by: evenyag <realevenyag@gmail.com>

* fix: handle multiple series

Signed-off-by: evenyag <realevenyag@gmail.com>

* fix: reset state in finish()

Signed-off-by: evenyag <realevenyag@gmail.com>

* fix: handle duplicated last timestamps across batches

Signed-off-by: evenyag <realevenyag@gmail.com>

* perf: compact primary key array

Signed-off-by: evenyag <realevenyag@gmail.com>

* fix(mito2): simplify flat last timestamp selector state

Signed-off-by: evenyag <realevenyag@gmail.com>

* refactor(mito2): rebuild flat pk dictionary from selector state

Signed-off-by: evenyag <realevenyag@gmail.com>

* test: reduce tests

Signed-off-by: evenyag <realevenyag@gmail.com>

* chore: update comment

Signed-off-by: evenyag <realevenyag@gmail.com>

* chore: add more logs for debugging

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: concat batches in last row reader

Signed-off-by: evenyag <realevenyag@gmail.com>

* refactor(mito2): simplify flat last row selector output buffer

- Replace VecDeque with a BatchBuffer struct for output buffering
- Remove rebuild_pk_dictionary_for_key as batches go directly into the buffer
- Remove unused push method and make BatchBuffer pub(crate)
- Remove debug logging in maybe_update_cache

Signed-off-by: evenyag <realevenyag@gmail.com>

* chore: address review comments

Signed-off-by: evenyag <realevenyag@gmail.com>

---------

Signed-off-by: evenyag <realevenyag@gmail.com>
Author: Yingwen
Date: 2026-03-06 20:08:58 +08:00
Committed by: GitHub
Parent: 2000611ea1
Commit: 93c48a078c
6 changed files with 686 additions and 29 deletions

View File

@@ -28,6 +28,7 @@ use std::ops::Range;
use std::sync::Arc;
use bytes::Bytes;
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::value::Value;
use datatypes::vectors::VectorRef;
use index::bloom_filter_index::{BloomFilterIndexCache, BloomFilterIndexCacheRef};
@@ -45,6 +46,7 @@ use crate::cache::index::inverted_index::{InvertedIndexCache, InvertedIndexCache
#[cfg(feature = "vector_index")]
use crate::cache::index::vector_index::{VectorIndexCache, VectorIndexCacheRef};
use crate::cache::write_cache::WriteCacheRef;
use crate::memtable::record_batch_estimated_size;
use crate::metrics::{CACHE_BYTES, CACHE_EVICTION, CACHE_HIT, CACHE_MISS};
use crate::read::Batch;
use crate::sst::file::{RegionFileId, RegionIndexId};
@@ -833,24 +835,47 @@ pub struct SelectorResultKey {
pub selector: TimeSeriesRowSelector,
}
/// Result stored in the selector result cache.
pub enum SelectorResult {
/// Batches in the primary key format.
PrimaryKey(Vec<Batch>),
/// Record batches in the flat format.
Flat(Vec<RecordBatch>),
}
/// Cached result for time series row selector.
pub struct SelectorResultValue {
/// Batches of rows selected by the selector.
pub result: Vec<Batch>,
pub result: SelectorResult,
/// Projection of rows.
pub projection: Vec<usize>,
}
impl SelectorResultValue {
/// Creates a new selector result value.
/// Creates a new selector result value with primary key format.
pub fn new(result: Vec<Batch>, projection: Vec<usize>) -> SelectorResultValue {
SelectorResultValue { result, projection }
SelectorResultValue {
result: SelectorResult::PrimaryKey(result),
projection,
}
}
/// Creates a new selector result value with flat format.
pub fn new_flat(result: Vec<RecordBatch>, projection: Vec<usize>) -> SelectorResultValue {
SelectorResultValue {
result: SelectorResult::Flat(result),
projection,
}
}
/// Returns memory used by the value (estimated).
fn estimated_size(&self) -> usize {
// We only consider heap size of all batches.
self.result.iter().map(|batch| batch.memory_size()).sum()
match &self.result {
SelectorResult::PrimaryKey(batches) => {
batches.iter().map(|batch| batch.memory_size()).sum()
}
SelectorResult::Flat(batches) => batches.iter().map(record_batch_estimated_size).sum(),
}
}
}

View File

@@ -17,16 +17,24 @@
use std::sync::Arc;
use async_trait::async_trait;
use datatypes::arrow::array::{Array, BinaryArray};
use datatypes::arrow::compute::concat_batches;
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::vectors::UInt32Vector;
use snafu::ResultExt;
use store_api::storage::{FileId, TimeSeriesRowSelector};
use crate::cache::{
CacheStrategy, SelectorResultKey, SelectorResultValue, selector_result_cache_hit,
selector_result_cache_miss,
CacheStrategy, SelectorResult, SelectorResultKey, SelectorResultValue,
selector_result_cache_hit, selector_result_cache_miss,
};
use crate::error::Result;
use crate::error::{ComputeArrowSnafu, Result};
use crate::memtable::partition_tree::data::timestamp_array_to_i64_slice;
use crate::read::{Batch, BatchReader, BoxedBatchReader};
use crate::sst::parquet::reader::{ReaderMetrics, RowGroupReader};
use crate::sst::parquet::DEFAULT_READ_BATCH_SIZE;
use crate::sst::parquet::flat_format::{primary_key_column_index, time_index_column_index};
use crate::sst::parquet::format::{PrimaryKeyArray, primary_key_offsets};
use crate::sst::parquet::reader::{FlatRowGroupReader, ReaderMetrics, RowGroupReader};
/// Reader to keep the last row for each time series.
/// It assumes that batches from the input reader are
@@ -95,10 +103,11 @@ impl RowGroupLastRowCachedReader {
};
if let Some(value) = cache_strategy.get_selector_result(&key) {
let is_primary_key = matches!(&value.result, SelectorResult::PrimaryKey(_));
let schema_matches =
value.projection == row_group_reader.read_format().projection_indices();
if schema_matches {
// Schema matches, use cache batches.
if is_primary_key && schema_matches {
// Format and schema match, use cache batches.
Self::new_hit(value)
} else {
Self::new_miss(key, row_group_reader, cache_strategy)
@@ -156,8 +165,12 @@ pub(crate) struct LastRowCacheReader {
impl LastRowCacheReader {
/// Iterates cached last rows.
async fn next_batch(&mut self) -> Result<Option<Batch>> {
if self.idx < self.value.result.len() {
let res = Ok(Some(self.value.result[self.idx].clone()));
let batches = match &self.value.result {
SelectorResult::PrimaryKey(batches) => batches,
SelectorResult::Flat(_) => unreachable!(),
};
if self.idx < batches.len() {
let res = Ok(Some(batches[self.idx].clone()));
self.idx += 1;
res
} else {
@@ -217,10 +230,10 @@ impl RowGroupLastRowReader {
// We always expect that row groups yield batches.
return;
}
let value = Arc::new(SelectorResultValue {
result: std::mem::take(&mut self.yielded_batches),
projection: self.reader.read_format().projection_indices().to_vec(),
});
let value = Arc::new(SelectorResultValue::new(
std::mem::take(&mut self.yielded_batches),
self.reader.read_format().projection_indices().to_vec(),
));
self.cache_strategy.put_selector_result(self.key, value);
}
@@ -281,9 +294,362 @@ impl LastRowSelector {
}
}
/// Cached last row reader for flat format row group.
/// If the last rows are already cached (as flat `RecordBatch`), returns cached values.
/// Otherwise, reads from the row group, selects last rows, and updates the cache.
pub(crate) enum FlatRowGroupLastRowCachedReader {
/// Cache hit, reads last rows from cached value.
Hit(FlatLastRowCacheReader),
/// Cache miss, reads from row group reader and updates cache.
Miss(FlatRowGroupLastRowReader),
}
impl FlatRowGroupLastRowCachedReader {
pub(crate) fn new(
file_id: FileId,
row_group_idx: usize,
cache_strategy: CacheStrategy,
projection: &[usize],
reader: FlatRowGroupReader,
) -> Self {
let key = SelectorResultKey {
file_id,
row_group_idx,
selector: TimeSeriesRowSelector::LastRow,
};
if let Some(value) = cache_strategy.get_selector_result(&key) {
let is_flat = matches!(&value.result, SelectorResult::Flat(_));
let schema_matches = value.projection == projection;
if is_flat && schema_matches {
Self::new_hit(value)
} else {
Self::new_miss(key, projection, reader, cache_strategy)
}
} else {
Self::new_miss(key, projection, reader, cache_strategy)
}
}
/// Returns the next RecordBatch.
pub(crate) fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
match self {
FlatRowGroupLastRowCachedReader::Hit(r) => r.next_batch(),
FlatRowGroupLastRowCachedReader::Miss(r) => r.next_batch(),
}
}
fn new_hit(value: Arc<SelectorResultValue>) -> Self {
selector_result_cache_hit();
Self::Hit(FlatLastRowCacheReader { value, idx: 0 })
}
fn new_miss(
key: SelectorResultKey,
projection: &[usize],
reader: FlatRowGroupReader,
cache_strategy: CacheStrategy,
) -> Self {
selector_result_cache_miss();
Self::Miss(FlatRowGroupLastRowReader::new(
key,
projection.to_vec(),
reader,
cache_strategy,
))
}
}
/// Iterates over cached flat last rows.
pub(crate) struct FlatLastRowCacheReader {
value: Arc<SelectorResultValue>,
idx: usize,
}
impl FlatLastRowCacheReader {
fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
let batches = match &self.value.result {
SelectorResult::Flat(batches) => batches,
SelectorResult::PrimaryKey(_) => unreachable!(),
};
if self.idx < batches.len() {
let res = Ok(Some(batches[self.idx].clone()));
self.idx += 1;
res
} else {
Ok(None)
}
}
}
/// Buffer that accumulates small `RecordBatch`es and tracks total row count.
pub(crate) struct BatchBuffer {
batches: Vec<RecordBatch>,
num_rows: usize,
}
impl BatchBuffer {
fn new() -> Self {
Self {
batches: Vec::new(),
num_rows: 0,
}
}
/// Returns true if the total number of buffered rows reaches `DEFAULT_READ_BATCH_SIZE`.
fn is_full(&self) -> bool {
self.num_rows >= DEFAULT_READ_BATCH_SIZE
}
/// Extends the buffer from a slice of batches.
fn extend_from_slice(&mut self, batches: &[RecordBatch]) {
for batch in batches {
self.num_rows += batch.num_rows();
}
self.batches.extend_from_slice(batches);
}
/// Returns true if the buffer has no batches.
fn is_empty(&self) -> bool {
self.batches.is_empty()
}
/// Concatenates all buffered batches into one, resets the buffer, and returns the result.
fn concat(&mut self) -> Result<RecordBatch> {
debug_assert!(!self.batches.is_empty());
let schema = self.batches[0].schema();
let merged = concat_batches(&schema, &self.batches).context(ComputeArrowSnafu)?;
self.batches.clear();
self.num_rows = 0;
Ok(merged)
}
}
/// Reads last rows from a flat format row group and caches the results.
pub(crate) struct FlatRowGroupLastRowReader {
key: SelectorResultKey,
reader: FlatRowGroupReader,
selector: FlatLastTimestampSelector,
yielded_batches: Vec<RecordBatch>,
cache_strategy: CacheStrategy,
projection: Vec<usize>,
/// Accumulates small selector-output batches before concatenating.
pending: BatchBuffer,
}
impl FlatRowGroupLastRowReader {
fn new(
key: SelectorResultKey,
projection: Vec<usize>,
reader: FlatRowGroupReader,
cache_strategy: CacheStrategy,
) -> Self {
Self {
key,
reader,
selector: FlatLastTimestampSelector::default(),
yielded_batches: vec![],
cache_strategy,
projection,
pending: BatchBuffer::new(),
}
}
/// Concatenates pending batches and records the result in `yielded_batches`.
fn flush_pending(&mut self) -> Result<Option<RecordBatch>> {
if self.pending.is_empty() {
return Ok(None);
}
let merged = self.pending.concat()?;
self.yielded_batches.push(merged.clone());
Ok(Some(merged))
}
fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
if self.pending.is_full() {
return self.flush_pending();
}
while let Some(batch) = self.reader.next_batch()? {
self.selector.on_next(batch, &mut self.pending)?;
if self.pending.is_full() {
return self.flush_pending();
}
}
// The reader is exhausted; flush the remaining selector state.
self.selector.finish(&mut self.pending)?;
if !self.pending.is_empty() {
let result = self.flush_pending();
// All last rows in row group are yielded, update cache.
self.maybe_update_cache();
return result;
}
// All last rows in row group are yielded, update cache.
self.maybe_update_cache();
Ok(None)
}
fn maybe_update_cache(&mut self) {
if self.yielded_batches.is_empty() {
return;
}
let batches = std::mem::take(&mut self.yielded_batches);
let value = Arc::new(SelectorResultValue::new_flat(
batches,
self.projection.clone(),
));
self.cache_strategy.put_selector_result(self.key, value);
}
}
/// Selects the last-timestamp rows per primary key from flat `RecordBatch`.
///
/// Assumes that input batches are sorted by primary key then by timestamp,
/// and contain only PUT operations (no DELETE).
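///
/// For example, input rows `(k1, 1), (k1, 3), (k1, 3), (k2, 5)` yield both
/// `(k1, 3)` rows followed by `(k2, 5)`.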
#[derive(Default)]
pub(crate) struct FlatLastTimestampSelector {
/// State for the currently in-progress primary key.
current_key: Option<LastKeyState>,
}
#[derive(Debug)]
struct LastKeyState {
key: Vec<u8>,
last_timestamp: i64,
slices: Vec<RecordBatch>,
}
impl LastKeyState {
fn new(key: Vec<u8>, last_timestamp: i64, first_slice: RecordBatch) -> Self {
Self {
key,
last_timestamp,
slices: vec![first_slice],
}
}
}
impl FlatLastTimestampSelector {
/// Processes the next batch and appends completed-key results into `output_buffer`.
pub(crate) fn on_next(
&mut self,
batch: RecordBatch,
output_buffer: &mut BatchBuffer,
) -> Result<()> {
if batch.num_rows() == 0 {
return Ok(());
}
let num_columns = batch.num_columns();
let pk_col_idx = primary_key_column_index(num_columns);
let ts_col_idx = time_index_column_index(num_columns);
let pk_array = batch
.column(pk_col_idx)
.as_any()
.downcast_ref::<PrimaryKeyArray>()
.unwrap();
let offsets = primary_key_offsets(pk_array)?;
if offsets.is_empty() {
return Ok(());
}
let ts_values = timestamp_array_to_i64_slice(batch.column(ts_col_idx));
for i in 0..offsets.len() - 1 {
let range_start = offsets[i];
let range_end = offsets[i + 1];
let range_key = primary_key_bytes_at(&batch, pk_col_idx, range_start);
let range_last_ts = ts_values[range_end - 1];
let range_last_ts_start = last_timestamp_start(ts_values, range_start, range_end);
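// The slice keeps every row in this key range that shares the last
// timestamp, so duplicated last timestamps are preserved.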
let range_slice = batch.slice(range_last_ts_start, range_end - range_last_ts_start);
match self.current_key.as_mut() {
Some(state) if state.key.as_slice() == range_key => {
if range_last_ts > state.last_timestamp {
state.last_timestamp = range_last_ts;
state.slices.clear();
state.slices.push(range_slice);
} else if range_last_ts == state.last_timestamp {
state.slices.push(range_slice);
}
}
Some(_) => {
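// The primary key changed: flush the finished key's rows and start
// tracking the new key.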
self.flush_current_key(output_buffer);
self.current_key = Some(LastKeyState::new(
range_key.to_vec(),
range_last_ts,
range_slice,
));
}
None => {
self.current_key = Some(LastKeyState::new(
range_key.to_vec(),
range_last_ts,
range_slice,
));
}
}
}
Ok(())
}
/// Finishes the selector and appends remaining results into `output_buffer`.
pub(crate) fn finish(&mut self, output_buffer: &mut BatchBuffer) -> Result<()> {
self.flush_current_key(output_buffer);
Ok(())
}
fn flush_current_key(&mut self, output_buffer: &mut BatchBuffer) {
let Some(state) = self.current_key.take() else {
return;
};
output_buffer.extend_from_slice(&state.slices);
}
}
/// Gets the primary key bytes at `index` from the primary key dictionary column.
fn primary_key_bytes_at(batch: &RecordBatch, pk_col_idx: usize, index: usize) -> &[u8] {
let pk_dict = batch
.column(pk_col_idx)
.as_any()
.downcast_ref::<PrimaryKeyArray>()
.unwrap();
let key = pk_dict.keys().value(index);
let binary_values = pk_dict
.values()
.as_any()
.downcast_ref::<BinaryArray>()
.unwrap();
binary_values.value(key as usize)
}
/// Finds the start index of rows sharing the last (maximum) timestamp
/// within the range `[range_start, range_end)`.
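///
/// For example, with timestamps `[1, 2, 3, 3]` and range `[0, 4)`, this returns `2`.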
fn last_timestamp_start(ts_values: &[i64], range_start: usize, range_end: usize) -> usize {
debug_assert!(range_start < range_end);
let last_ts = ts_values[range_end - 1];
let mut start = range_end - 1;
while start > range_start && ts_values[start - 1] == last_ts {
start -= 1;
}
start
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use api::v1::OpType;
use datatypes::arrow::array::{
ArrayRef, BinaryDictionaryBuilder, Int64Array, TimestampMillisecondArray, UInt8Array,
UInt64Array,
};
use datatypes::arrow::datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit, UInt32Type};
use datatypes::arrow::record_batch::RecordBatch;
use super::*;
use crate::test_util::{VecBatchReader, check_reader_result, new_batch};
@@ -352,4 +718,223 @@ mod tests {
)
.await;
}
/// Helper to build a flat format RecordBatch for testing.
fn new_flat_batch(primary_keys: &[&[u8]], timestamps: &[i64], fields: &[i64]) -> RecordBatch {
let num_rows = timestamps.len();
assert_eq!(primary_keys.len(), num_rows);
assert_eq!(fields.len(), num_rows);
let columns: Vec<ArrayRef> = vec![
// field0 column
Arc::new(Int64Array::from_iter_values(fields.iter().copied())),
// ts column (time index)
Arc::new(TimestampMillisecondArray::from_iter_values(
timestamps.iter().copied(),
)),
// __primary_key column (dictionary(uint32, binary))
{
let mut builder = BinaryDictionaryBuilder::<UInt32Type>::new();
for &pk in primary_keys {
builder.append(pk).unwrap();
}
Arc::new(builder.finish())
},
// __sequence column
Arc::new(UInt64Array::from_iter_values(vec![1u64; num_rows])),
// __op_type column
Arc::new(UInt8Array::from_iter_values(vec![1u8; num_rows])),
];
RecordBatch::try_new(test_flat_schema(), columns).unwrap()
}
fn test_flat_schema() -> SchemaRef {
let fields = vec![
Field::new("field0", DataType::Int64, false),
Field::new(
"ts",
DataType::Timestamp(TimeUnit::Millisecond, None),
false,
),
Field::new(
"__primary_key",
DataType::Dictionary(Box::new(DataType::UInt32), Box::new(DataType::Binary)),
false,
),
Field::new("__sequence", DataType::UInt64, false),
Field::new("__op_type", DataType::UInt8, false),
];
Arc::new(Schema::new(fields))
}
/// Collects all rows from the selector across all result batches.
fn collect_flat_results(
selector: &mut FlatLastTimestampSelector,
batches: Vec<RecordBatch>,
) -> Vec<(Vec<u8>, i64)> {
let mut output_buffer = BatchBuffer::new();
let mut results = Vec::new();
for batch in batches {
selector.on_next(batch, &mut output_buffer).unwrap();
for r in output_buffer.batches.drain(..) {
extract_flat_rows(&r, &mut results);
}
output_buffer.num_rows = 0;
}
selector.finish(&mut output_buffer).unwrap();
for r in output_buffer.batches.drain(..) {
extract_flat_rows(&r, &mut results);
}
results
}
/// Extracts (primary_key, timestamp) pairs from a result batch.
fn extract_flat_rows(batch: &RecordBatch, out: &mut Vec<(Vec<u8>, i64)>) {
let ts_col = batch
.column(1)
.as_any()
.downcast_ref::<TimestampMillisecondArray>()
.unwrap();
let pk_col = batch
.column(2)
.as_any()
.downcast_ref::<PrimaryKeyArray>()
.unwrap();
let binary_values = pk_col
.values()
.as_any()
.downcast_ref::<BinaryArray>()
.unwrap();
for i in 0..batch.num_rows() {
let key_idx = pk_col.keys().value(i);
let pk = binary_values.value(key_idx as usize).to_vec();
let ts = ts_col.value(i);
out.push((pk, ts));
}
}
#[test]
fn test_flat_single_batch_one_key() {
let mut selector = FlatLastTimestampSelector::default();
let batch = new_flat_batch(&[b"k1", b"k1", b"k1"], &[1, 2, 3], &[10, 20, 30]);
let results = collect_flat_results(&mut selector, vec![batch]);
assert_eq!(vec![(b"k1".to_vec(), 3)], results);
}
#[test]
fn test_flat_single_batch_multiple_keys() {
let mut selector = FlatLastTimestampSelector::default();
let batch = new_flat_batch(
&[b"k1", b"k1", b"k2", b"k2", b"k3"],
&[1, 2, 3, 4, 5],
&[10, 20, 30, 40, 50],
);
let results = collect_flat_results(&mut selector, vec![batch]);
assert_eq!(
vec![
(b"k1".to_vec(), 2),
(b"k2".to_vec(), 4),
(b"k3".to_vec(), 5),
],
results
);
}
#[test]
fn test_flat_key_spans_batches() {
let mut selector = FlatLastTimestampSelector::default();
let batches = vec![
new_flat_batch(&[b"k1", b"k1"], &[1, 2], &[10, 20]),
new_flat_batch(&[b"k1", b"k2"], &[3, 4], &[30, 40]),
new_flat_batch(&[b"k2", b"k3"], &[5, 6], &[50, 60]),
];
let results = collect_flat_results(&mut selector, batches);
assert_eq!(
vec![
(b"k1".to_vec(), 3),
(b"k2".to_vec(), 5),
(b"k3".to_vec(), 6),
],
results
);
}
#[test]
fn test_flat_duplicate_last_timestamps() {
let mut selector = FlatLastTimestampSelector::default();
// k1 has two rows with the same last timestamp (3).
let batch = new_flat_batch(
&[b"k1", b"k1", b"k1", b"k2"],
&[1, 3, 3, 5],
&[10, 20, 30, 40],
);
let results = collect_flat_results(&mut selector, vec![batch]);
assert_eq!(
vec![
(b"k1".to_vec(), 3),
(b"k1".to_vec(), 3),
(b"k2".to_vec(), 5),
],
results
);
}
#[test]
fn test_flat_duplicate_last_timestamps_across_batches() {
let mut selector = FlatLastTimestampSelector::default();
// k1's last timestamp (3) spans two batches.
let batches = vec![
new_flat_batch(&[b"k1", b"k1"], &[1, 3], &[10, 20]),
new_flat_batch(&[b"k1", b"k2"], &[3, 5], &[30, 40]),
];
let results = collect_flat_results(&mut selector, batches);
assert_eq!(
vec![
(b"k1".to_vec(), 3),
(b"k1".to_vec(), 3),
(b"k2".to_vec(), 5),
],
results
);
}
#[test]
fn test_flat_pending_chain_dropped_by_higher_timestamp() {
let mut selector = FlatLastTimestampSelector::default();
let batches = vec![
new_flat_batch(&[b"k1", b"k1"], &[1, 3], &[10, 20]),
new_flat_batch(&[b"k1", b"k1"], &[3, 3], &[21, 22]),
new_flat_batch(&[b"k1", b"k1"], &[4, 4], &[23, 24]),
];
let results = collect_flat_results(&mut selector, batches);
assert_eq!(vec![(b"k1".to_vec(), 4), (b"k1".to_vec(), 4)], results);
}
#[test]
fn test_flat_finish_is_one_shot() {
let mut selector = FlatLastTimestampSelector::default();
let batch = new_flat_batch(&[b"k1", b"k1", b"k2"], &[1, 2, 3], &[10, 20, 30]);
let mut output_buffer = BatchBuffer::new();
// Feed one batch: completed keys can be emitted before EOF.
selector.on_next(batch, &mut output_buffer).unwrap();
let mut pre_finish = Vec::new();
for r in output_buffer.batches.drain(..) {
extract_flat_rows(&r, &mut pre_finish);
}
output_buffer.num_rows = 0;
assert_eq!(vec![(b"k1".to_vec(), 2)], pre_finish);
// Simulate EOF by calling finish().
selector.finish(&mut output_buffer).unwrap();
assert!(!output_buffer.is_empty());
output_buffer.batches.clear();
output_buffer.num_rows = 0;
// A second finish after EOF should not yield any more rows.
selector.finish(&mut output_buffer).unwrap();
assert!(output_buffer.is_empty());
}
}
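Not part of the diff: a minimal usage sketch of the cached reader above, assuming only the `next_batch` API shown in this file; `drain_last_rows` is a hypothetical helper. On the miss path, the selector result is cached once the row group is exhausted.

fn drain_last_rows(
    mut reader: FlatRowGroupLastRowCachedReader,
) -> Result<Vec<RecordBatch>> {
    let mut out = Vec::new();
    // Each `Some(batch)` is a concatenated batch of last rows; `None` means the
    // row group is exhausted and, on the miss path, the cache has been updated.
    while let Some(batch) = reader.next_batch()? {
        out.push(batch);
    }
    Ok(out)
}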

View File

@@ -24,7 +24,7 @@ use snafu::ResultExt;
use crate::error::{RecordBatchSnafu, Result};
use crate::memtable::BoxedBatchIterator;
use crate::read::last_row::RowGroupLastRowCachedReader;
use crate::read::last_row::{FlatRowGroupLastRowCachedReader, RowGroupLastRowCachedReader};
use crate::read::{Batch, BatchReader};
use crate::sst::file::FileTimeRange;
use crate::sst::parquet::file_range::FileRangeContextRef;
@@ -248,12 +248,14 @@ impl Iterator for PruneTimeIterator {
pub enum FlatSource {
RowGroup(FlatRowGroupReader),
LastRow(FlatRowGroupLastRowCachedReader),
}
impl FlatSource {
fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
match self {
FlatSource::RowGroup(r) => r.next_batch(),
FlatSource::LastRow(r) => r.next_batch(),
}
}
}
@@ -282,9 +284,21 @@ impl FlatPruneReader {
}
}
/// Merge metrics with the inner reader and return the merged metrics.
pub(crate) fn new_with_last_row_reader(
ctx: FileRangeContextRef,
reader: FlatRowGroupLastRowCachedReader,
skip_fields: bool,
) -> Self {
Self {
context: ctx,
source: FlatSource::LastRow(reader),
metrics: Default::default(),
skip_fields,
}
}
/// Returns metrics.
pub(crate) fn metrics(&self) -> ReaderMetrics {
// FlatRowGroupReader doesn't collect metrics, so just return our own
self.metrics.clone()
}

View File

@@ -1462,7 +1462,7 @@ pub fn build_flat_file_range_scan_stream(
};
for range in ranges {
let build_reader_start = Instant::now();
let Some(mut reader) = range.flat_reader(fetch_metrics.as_deref()).await? else { continue; };
let Some(mut reader) = range.flat_reader(_stream_ctx.input.series_row_selector, fetch_metrics.as_deref()).await? else { continue; };
let build_cost = build_reader_start.elapsed();
part_metrics.inc_build_reader_cost(build_cost);

View File

@@ -45,7 +45,7 @@ use crate::error::{
use crate::read::Batch;
use crate::read::compat::CompatBatch;
use crate::read::flat_projection::CompactionProjectionMapper;
use crate::read::last_row::RowGroupLastRowCachedReader;
use crate::read::last_row::{FlatRowGroupLastRowCachedReader, RowGroupLastRowCachedReader};
use crate::read::prune::{FlatPruneReader, PruneReader};
use crate::sst::file::FileHandle;
use crate::sst::parquet::flat_format::{
@@ -237,6 +237,7 @@ impl FileRange {
/// Creates a flat reader that returns RecordBatch.
pub(crate) async fn flat_reader(
&self,
selector: Option<TimeSeriesRowSelector>,
fetch_metrics: Option<&ParquetFetchMetrics>,
) -> Result<Option<FlatPruneReader>> {
if !self.in_dynamic_filter_range() {
@@ -252,15 +253,47 @@ impl FileRange {
)
.await?;
let use_last_row_reader = if selector
.map(|s| s == TimeSeriesRowSelector::LastRow)
.unwrap_or(false)
{
// Only use LastRowReader if row group does not contain DELETE
// and all rows are selected.
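// If the op_type statistics cannot be decoded, conservatively assume the
// row group contains DELETEs and fall back to the plain reader.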
let put_only = !self
.context
.contains_delete(self.row_group_idx)
.inspect_err(|e| {
error!(e; "Failed to decode min value of op_type, fallback to FlatRowGroupReader");
})
.unwrap_or(true);
put_only && self.select_all()
} else {
false
};
// Compute skip_fields once for this row group
let skip_fields = self.context.should_skip_fields(self.row_group_idx);
let flat_row_group_reader = FlatRowGroupReader::new(self.context.clone(), parquet_reader);
let flat_prune_reader = FlatPruneReader::new_with_row_group_reader(
self.context.clone(),
flat_row_group_reader,
skip_fields,
);
let flat_prune_reader = if use_last_row_reader {
let flat_row_group_reader =
FlatRowGroupReader::new(self.context.clone(), parquet_reader);
let reader = FlatRowGroupLastRowCachedReader::new(
self.file_handle().file_id().file_id(),
self.row_group_idx,
self.context.reader_builder.cache_strategy().clone(),
self.context.read_format().projection_indices(),
flat_row_group_reader,
);
FlatPruneReader::new_with_last_row_reader(self.context.clone(), reader, skip_fields)
} else {
let flat_row_group_reader =
FlatRowGroupReader::new(self.context.clone(), parquet_reader);
FlatPruneReader::new_with_row_group_reader(
self.context.clone(),
flat_row_group_reader,
skip_fields,
)
};
Ok(Some(flat_prune_reader))
}

View File

@@ -904,7 +904,7 @@ impl PrimaryKeyReadFormat {
}
/// Compute offsets of different primary keys in the array.
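/// For example, dictionary keys `[0, 0, 1, 1, 2]` yield offsets `[0, 2, 4, 5]`.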
fn primary_key_offsets(pk_dict_array: &PrimaryKeyArray) -> Result<Vec<usize>> {
pub(crate) fn primary_key_offsets(pk_dict_array: &PrimaryKeyArray) -> Result<Vec<usize>> {
if pk_dict_array.is_empty() {
return Ok(Vec::new());
}