avoid double read

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ruihang Xia
2026-03-20 12:55:59 +08:00
parent b0edcefb17
commit 8bee922367
2 changed files with 270 additions and 67 deletions

View File

@@ -57,7 +57,7 @@ use crate::sst::parquet::format::{PrimaryKeyArray, ReadFormat};
use crate::sst::parquet::reader::{
FlatRowGroupReader, MaybeFilter, RowGroupReader, RowGroupReaderBuilder, SimpleFilterContext,
};
use crate::sst::parquet::row_group::ParquetFetchMetrics;
use crate::sst::parquet::row_group::{InMemoryRowGroup, ParquetFetchMetrics};
use crate::sst::parquet::row_selection::row_selection_from_row_ranges;
use crate::sst::parquet::stats::RowGroupPruningStats;
@@ -265,22 +265,18 @@ impl FileRange {
false
};
let row_selection = if use_last_row_reader {
self.row_selection.clone()
} else {
self.prefiltered_flat_row_selection(fetch_metrics).await?
};
if row_selection
.as_ref()
.is_some_and(|selection| selection.row_count() == 0)
{
return Ok(None);
}
// Compute skip_fields once for this row group
let skip_fields = self.context.should_skip_fields(self.row_group_idx);
let flat_prune_reader = if use_last_row_reader {
let row_selection = self.row_selection.clone();
if row_selection
.as_ref()
.is_some_and(|selection| selection.row_count() == 0)
{
return Ok(None);
}
let flat_row_group_reader = FlatRowGroupReader::new(
self.context.clone(),
self.context
@@ -297,33 +293,66 @@ impl FileRange {
);
FlatPruneReader::new_with_last_row_reader(self.context.clone(), reader, skip_fields)
} else {
let flat_row_group_reader = FlatRowGroupReader::new(
self.context.clone(),
self.context
.reader_builder
.build(self.row_group_idx, row_selection, fetch_metrics)
.await?,
);
FlatPruneReader::new_with_row_group_reader(
self.context.clone(),
flat_row_group_reader,
skip_fields,
)
match self.prefiltered_flat_reader_input(fetch_metrics).await? {
PrefilteredFlatReaderInput::Selection(row_selection) => {
if row_selection
.as_ref()
.is_some_and(|selection| selection.row_count() == 0)
{
return Ok(None);
}
let flat_row_group_reader = FlatRowGroupReader::new(
self.context.clone(),
self.context
.reader_builder
.build(self.row_group_idx, row_selection, fetch_metrics)
.await?,
);
FlatPruneReader::new_with_row_group_reader(
self.context.clone(),
flat_row_group_reader,
skip_fields,
)
}
PrefilteredFlatReaderInput::Prefetched(mut row_group) => {
let flat_row_group_reader = FlatRowGroupReader::new(
self.context.clone(),
self.context
.reader_builder
.build_on_row_group(
&mut row_group,
self.row_selection.clone(),
fetch_metrics,
)
.await?,
);
FlatPruneReader::new_with_row_group_reader(
self.context.clone(),
flat_row_group_reader,
skip_fields,
)
}
}
};
Ok(Some(flat_prune_reader))
}
async fn prefiltered_flat_row_selection(
&self,
async fn prefiltered_flat_reader_input<'a>(
&'a self,
fetch_metrics: Option<&ParquetFetchMetrics>,
) -> Result<Option<RowSelection>> {
) -> Result<PrefilteredFlatReaderInput<'a>> {
if !self.select_all() {
return Ok(self.row_selection.clone());
return Ok(PrefilteredFlatReaderInput::Selection(
self.row_selection.clone(),
));
}
let Some(mut primary_key_filter) = self.context.new_primary_key_filter() else {
return Ok(self.row_selection.clone());
return Ok(PrefilteredFlatReaderInput::Selection(
self.row_selection.clone(),
));
};
let read_format = ReadFormat::new_flat(
@@ -340,11 +369,15 @@ impl FileRange {
self.context.file_path(),
false,
)?;
let mut row_group = self
.context
.reader_builder
.new_in_memory_row_group(self.row_group_idx);
let reader = self
.context
.reader_builder
.build_with_read_format(
self.row_group_idx,
.build_on_row_group_with_read_format(
&mut row_group,
self.row_selection.clone(),
fetch_metrics,
&read_format,
@@ -375,19 +408,20 @@ impl FileRange {
}
if matched_row_ranges.is_empty() {
return Ok(Some(RowSelection::from(vec![])));
return Ok(PrefilteredFlatReaderInput::Selection(Some(
RowSelection::from(vec![]),
)));
}
if matched_row_ranges.len() == 1
&& matched_row_ranges[0].start == 0
&& matched_row_ranges[0].end == rows_in_group
{
return Ok(self.row_selection.clone());
return Ok(PrefilteredFlatReaderInput::Prefetched(row_group));
}
Ok(Some(row_selection_from_row_ranges(
matched_row_ranges.into_iter(),
rows_in_group,
Ok(PrefilteredFlatReaderInput::Selection(Some(
row_selection_from_row_ranges(matched_row_ranges.into_iter(), rows_in_group),
)))
}
@@ -407,6 +441,11 @@ impl FileRange {
}
}
enum PrefilteredFlatReaderInput<'a> {
Selection(Option<RowSelection>),
Prefetched(InMemoryRowGroup<'a>),
}
/// Context shared by ranges of the same parquet SST.
pub(crate) struct FileRangeContext {
/// Row group reader builder for the file.
@@ -1266,8 +1305,10 @@ impl RangeBase {
#[cfg(test)]
mod tests {
use std::fmt::{Debug, Formatter};
use std::sync::Arc;
use async_trait::async_trait;
use datafusion_expr::{Expr, col, lit};
use datatypes::arrow::array::{
Array, ArrayRef, BinaryArray, DictionaryArray, TimestampMillisecondArray, UInt8Array,
@@ -1276,16 +1317,69 @@ mod tests {
use datatypes::arrow::datatypes::{Schema, UInt32Type};
use datatypes::schema::ColumnSchema;
use mito_codec::row_converter::build_primary_key_codec;
use object_store::services::Memory;
use store_api::codec::PrimaryKeyEncoding;
use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder};
use store_api::region_request::PathType;
use table::predicate::Predicate;
use super::*;
use crate::sst::internal_fields;
use crate::access_layer::{FilePathProvider, Metrics, WriteType};
use crate::cache::CacheStrategy;
use crate::config::IndexConfig;
use crate::sst::file::{RegionFileId, RegionIndexId};
use crate::sst::index::{Indexer, IndexerBuilder};
use crate::sst::parquet::WriteOptions;
use crate::sst::parquet::reader::{ParquetReaderBuilder, ReaderMetrics};
use crate::sst::parquet::row_group::ParquetFetchMetricsData;
use crate::sst::parquet::writer::ParquetWriter;
use crate::sst::{internal_fields, location};
use crate::test_util::sst_util::{
new_primary_key, new_sparse_primary_key, sst_region_metadata,
new_flat_source_from_record_batches, new_primary_key, new_record_batch_by_range,
new_sparse_primary_key, sst_file_handle, sst_region_metadata,
sst_region_metadata_with_encoding,
};
const FILE_DIR: &str = "/";
#[derive(Clone)]
struct FixedPathProvider {
region_file_id: RegionFileId,
}
impl FilePathProvider for FixedPathProvider {
fn build_index_file_path(&self, _file_id: RegionFileId) -> String {
location::index_file_path_legacy(FILE_DIR, self.region_file_id, PathType::Bare)
}
fn build_index_file_path_with_version(&self, index_id: RegionIndexId) -> String {
location::index_file_path(FILE_DIR, index_id, PathType::Bare)
}
fn build_sst_file_path(&self, _file_id: RegionFileId) -> String {
location::sst_file_path(FILE_DIR, self.region_file_id, PathType::Bare)
}
}
struct NoopIndexBuilder;
impl Debug for NoopIndexBuilder {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.write_str("NoopIndexBuilder")
}
}
#[async_trait]
impl IndexerBuilder for NoopIndexBuilder {
async fn build(
&self,
_file_id: store_api::storage::FileId,
_index_version: u64,
) -> Indexer {
Indexer::default()
}
}
fn flat_file_num_columns(metadata: &RegionMetadata) -> usize {
metadata.column_metadatas.len() + 3
}
@@ -1663,4 +1757,86 @@ mod tests {
assert_eq!(filtered.num_rows(), 2);
assert_eq!(field_values(&filtered), vec![12, 13]);
}
async fn fetch_metrics_for_predicate(predicate: Option<Predicate>) -> ParquetFetchMetricsData {
let object_store = object_store::ObjectStore::new(Memory::default())
.unwrap()
.finish();
let handle = sst_file_handle(0, 1000);
let metadata = Arc::new(sst_region_metadata());
let source = new_flat_source_from_record_batches(vec![new_record_batch_by_range(
&["a", "d"],
0,
60,
)]);
let write_opts = WriteOptions {
row_group_size: 128,
..Default::default()
};
let mut write_metrics = Metrics::new(WriteType::Flush);
let mut writer = ParquetWriter::new_with_object_store(
object_store.clone(),
metadata,
IndexConfig::default(),
NoopIndexBuilder,
FixedPathProvider {
region_file_id: handle.file_id(),
},
&mut write_metrics,
)
.await;
writer
.write_all_flat_as_primary_key(source, None, &write_opts)
.await
.unwrap();
let builder =
ParquetReaderBuilder::new(FILE_DIR.to_string(), PathType::Bare, handle, object_store)
.cache(CacheStrategy::Disabled)
.flat_format(true)
.predicate(predicate);
let mut reader_metrics = ReaderMetrics::default();
let (context, mut selection) = builder
.build_reader_input(&mut reader_metrics)
.await
.unwrap()
.unwrap();
let (row_group_idx, row_selection) = selection.pop_first().unwrap();
assert!(selection.pop_first().is_none());
let fetch_metrics = ParquetFetchMetrics::default();
let mut reader = FileRange::new(Arc::new(context), row_group_idx, Some(row_selection))
.flat_reader(None, Some(&fetch_metrics))
.await
.unwrap()
.unwrap();
let mut row_count = 0;
while let Some(batch) = reader.next_batch().unwrap() {
row_count += batch.num_rows();
}
assert_eq!(row_count, 60);
fetch_metrics.data.lock().unwrap().clone()
}
#[tokio::test(flavor = "current_thread")]
async fn test_flat_reader_full_match_primary_key_filter_does_not_refetch_row_group() {
let baseline = fetch_metrics_for_predicate(None).await;
let full_match =
fetch_metrics_for_predicate(Some(Predicate::new(vec![col("tag_0").eq(lit("a"))])))
.await;
assert_eq!(
full_match.pages_to_fetch_store,
baseline.pages_to_fetch_store
);
assert_eq!(
full_match.page_size_to_fetch_store,
baseline.page_size_to_fetch_store
);
assert_eq!(full_match.page_size_needed, baseline.page_size_needed);
}
}

View File

@@ -1693,14 +1693,22 @@ impl RowGroupReaderBuilder {
&self.cache_strategy
}
/// Builds a [ParquetRecordBatchReader] for `row_group_idx` using a custom read format.
pub(crate) async fn build_with_read_format(
pub(crate) fn new_in_memory_row_group(&self, row_group_idx: usize) -> InMemoryRowGroup<'_> {
InMemoryRowGroup::create(
self.file_handle.region_id(),
self.file_handle.file_id().file_id(),
&self.parquet_meta,
row_group_idx,
self.cache_strategy.clone(),
&self.file_path,
self.object_store.clone(),
)
}
fn projection_and_field_levels(
&self,
row_group_idx: usize,
row_selection: Option<RowSelection>,
fetch_metrics: Option<&ParquetFetchMetrics>,
read_format: &ReadFormat,
) -> Result<ParquetRecordBatchReader> {
) -> Result<(ProjectionMask, FieldLevels)> {
let parquet_schema_desc = self.parquet_meta.file_metadata().schema_descr();
let projection_mask = ProjectionMask::roots(
parquet_schema_desc,
@@ -1711,14 +1719,7 @@ impl RowGroupReaderBuilder {
parquet_to_arrow_field_levels(parquet_schema_desc, projection_mask.clone(), hint)
.context(ReadDataPartSnafu)?;
self.build_with_projection(
row_group_idx,
row_selection,
fetch_metrics,
projection_mask,
field_levels,
)
.await
Ok((projection_mask, field_levels))
}
/// Builds a [ParquetRecordBatchReader] to read the row group at `row_group_idx`.
@@ -1728,8 +1729,9 @@ impl RowGroupReaderBuilder {
row_selection: Option<RowSelection>,
fetch_metrics: Option<&ParquetFetchMetrics>,
) -> Result<ParquetRecordBatchReader> {
self.build_with_projection(
row_group_idx,
let mut row_group = self.new_in_memory_row_group(row_group_idx);
self.build_on_row_group_with_projection(
&mut row_group,
row_selection,
fetch_metrics,
self.projection.clone(),
@@ -1738,9 +1740,43 @@ impl RowGroupReaderBuilder {
.await
}
async fn build_with_projection(
pub(crate) async fn build_on_row_group_with_read_format(
&self,
row_group_idx: usize,
row_group: &mut InMemoryRowGroup<'_>,
row_selection: Option<RowSelection>,
fetch_metrics: Option<&ParquetFetchMetrics>,
read_format: &ReadFormat,
) -> Result<ParquetRecordBatchReader> {
let (projection, field_levels) = self.projection_and_field_levels(read_format)?;
self.build_on_row_group_with_projection(
row_group,
row_selection,
fetch_metrics,
projection,
field_levels,
)
.await
}
pub(crate) async fn build_on_row_group(
&self,
row_group: &mut InMemoryRowGroup<'_>,
row_selection: Option<RowSelection>,
fetch_metrics: Option<&ParquetFetchMetrics>,
) -> Result<ParquetRecordBatchReader> {
self.build_on_row_group_with_projection(
row_group,
row_selection,
fetch_metrics,
self.projection.clone(),
self.field_levels.clone(),
)
.await
}
async fn build_on_row_group_with_projection(
&self,
row_group: &mut InMemoryRowGroup<'_>,
row_selection: Option<RowSelection>,
fetch_metrics: Option<&ParquetFetchMetrics>,
projection: ProjectionMask,
@@ -1748,15 +1784,6 @@ impl RowGroupReaderBuilder {
) -> Result<ParquetRecordBatchReader> {
let fetch_start = Instant::now();
let mut row_group = InMemoryRowGroup::create(
self.file_handle.region_id(),
self.file_handle.file_id().file_id(),
&self.parquet_meta,
row_group_idx,
self.cache_strategy.clone(),
&self.file_path,
self.object_store.clone(),
);
// Fetches data into memory.
row_group
.fetch(&projection, row_selection.as_ref(), fetch_metrics)
@@ -1774,7 +1801,7 @@ impl RowGroupReaderBuilder {
// Now the row selection is None.
ParquetRecordBatchReader::try_new_with_row_groups(
&field_levels,
&row_group,
row_group,
DEFAULT_READ_BATCH_SIZE,
row_selection,
)