feat: add flat last row reader to the final stream (#7818)

Signed-off-by: evenyag <realevenyag@gmail.com>
This commit is contained in:
Yingwen
2026-03-17 15:55:48 +08:00
committed by GitHub
parent 5a37e58b4f
commit e0aadffb91
3 changed files with 64 additions and 8 deletions

View File

@@ -24,7 +24,7 @@ use crate::test_util::{
CreateRequestBuilder, TestEnv, build_rows_for_key, flush_region, put_rows, rows_schema,
};
async fn test_last_row(append_mode: bool) {
async fn test_last_row(append_mode: bool, flat_format: bool) {
let mut env = TestEnv::new().await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
@@ -39,9 +39,12 @@ async fn test_last_row(append_mode: bool) {
env.get_kv_backend(),
)
.await;
let request = CreateRequestBuilder::new()
.insert_option("append_mode", &append_mode.to_string())
.build();
let mut request_builder =
CreateRequestBuilder::new().insert_option("append_mode", &append_mode.to_string());
if flat_format {
request_builder = request_builder.insert_option("sst_format", "flat");
}
let request = request_builder.build();
let column_schemas = rows_schema(&request);
engine
.handle_request(region_id, RegionRequest::Create(request))
@@ -106,10 +109,20 @@ async fn test_last_row(append_mode: bool) {
#[tokio::test]
async fn test_last_row_append_mode_disabled() {
test_last_row(false).await;
test_last_row(false, false).await;
}
#[tokio::test]
async fn test_last_row_append_mode_enabled() {
test_last_row(true).await;
test_last_row(true, false).await;
}
/// Runs the last-row scenario on a region created with `sst_format = "flat"`
/// and append mode turned off.
#[tokio::test]
async fn test_last_row_flat_format_append_mode_disabled() {
    // Name the flags so the call site does not depend on positional booleans.
    let (append_mode, flat_format) = (false, true);
    test_last_row(append_mode, flat_format).await;
}
/// Runs the last-row scenario on a region created with `sst_format = "flat"`
/// and append mode turned on.
#[tokio::test]
async fn test_last_row_flat_format_append_mode_enabled() {
    // Name the flags so the call site does not depend on positional booleans.
    let (append_mode, flat_format) = (true, true);
    test_last_row(append_mode, flat_format).await;
}

View File

@@ -21,6 +21,7 @@ use datatypes::arrow::array::{Array, BinaryArray};
use datatypes::arrow::compute::concat_batches;
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::vectors::UInt32Vector;
use futures::{Stream, TryStreamExt};
use snafu::ResultExt;
use store_api::storage::{FileId, TimeSeriesRowSelector};
@@ -30,7 +31,7 @@ use crate::cache::{
};
use crate::error::{ComputeArrowSnafu, Result};
use crate::memtable::partition_tree::data::timestamp_array_to_i64_slice;
use crate::read::{Batch, BatchReader, BoxedBatchReader};
use crate::read::{Batch, BatchReader, BoxedBatchReader, BoxedRecordBatchStream};
use crate::sst::parquet::DEFAULT_READ_BATCH_SIZE;
use crate::sst::parquet::flat_format::{primary_key_column_index, time_index_column_index};
use crate::sst::parquet::format::{PrimaryKeyArray, primary_key_offsets};
@@ -610,6 +611,41 @@ impl FlatLastTimestampSelector {
}
}
/// Reader that keeps only the last row of each time series from a flat RecordBatch stream.
/// Assumes input is sorted, deduped, and contains no delete operations.
///
/// The reader is consumed by `into_stream`, which pulls every batch from `stream`,
/// passes it to `selector`, and yields whatever has accumulated in `pending`.
pub(crate) struct FlatLastRowReader {
/// Upstream source of flat record batches; read until it is exhausted or returns an error.
stream: BoxedRecordBatchStream,
/// Selector that receives each input batch together with a mutable handle to `pending`.
selector: FlatLastTimestampSelector,
/// Output buffer; its contents are concatenated into one RecordBatch each time it is yielded.
pending: BatchBuffer,
}
impl FlatLastRowReader {
    /// Builds a reader over `stream`, starting with a default selector and an
    /// empty output buffer.
    pub(crate) fn new(stream: BoxedRecordBatchStream) -> Self {
        let selector = FlatLastTimestampSelector::default();
        let pending = BatchBuffer::new();
        Self {
            stream,
            selector,
            pending,
        }
    }

    /// Consumes the reader and exposes its output as a stream of RecordBatches.
    ///
    /// Every input batch is handed to the selector along with the output buffer.
    /// The buffer is emitted as one concatenated batch whenever it reports being
    /// full, and once more after the input ends if it is not empty. Any error
    /// from the input stream, the selector, or the concatenation ends the stream
    /// with that error.
    pub(crate) fn into_stream(self) -> impl Stream<Item = Result<RecordBatch>> {
        // Split the reader into its parts so the generator owns each one directly.
        let Self {
            stream: mut input,
            mut selector,
            pending: mut buffer,
        } = self;

        async_stream::try_stream! {
            loop {
                match input.try_next().await? {
                    Some(batch) => {
                        selector.on_next(batch, &mut buffer)?;
                        if buffer.is_full() {
                            yield buffer.concat()?;
                        }
                    }
                    // Input exhausted: leave the loop and flush what remains.
                    None => break,
                }
            }

            // Let the selector emit whatever it is still holding before the final flush.
            selector.finish(&mut buffer)?;
            if !buffer.is_empty() {
                yield buffer.concat()?;
            }
        }
    }
}
/// Gets the primary key bytes at `index` from the primary key dictionary column.
fn primary_key_bytes_at(batch: &RecordBatch, pk_col_idx: usize, index: usize) -> &[u8] {
let pk_dict = batch

View File

@@ -39,7 +39,7 @@ use crate::error::{PartitionOutOfRangeSnafu, Result, TooManyFilesToReadSnafu, Un
use crate::read::dedup::{DedupReader, LastNonNull, LastRow};
use crate::read::flat_dedup::{FlatDedupReader, FlatLastNonNull, FlatLastRow};
use crate::read::flat_merge::FlatMergeReader;
use crate::read::last_row::LastRowReader;
use crate::read::last_row::{FlatLastRowReader, LastRowReader};
use crate::read::merge::MergeReaderBuilder;
use crate::read::pruner::{PartitionPruner, Pruner};
use crate::read::range::RangeMeta;
@@ -289,6 +289,13 @@ impl SeqScan {
Box::pin(reader.into_stream()) as _
};
let reader = match &stream_ctx.input.series_row_selector {
Some(TimeSeriesRowSelector::LastRow) => {
Box::pin(FlatLastRowReader::new(reader).into_stream()) as _
}
None => reader,
};
Ok(reader)
}