diff --git a/src/mito2/src/sst/parquet/file_range.rs b/src/mito2/src/sst/parquet/file_range.rs index 802d8f15b1..845f4f76c0 100644 --- a/src/mito2/src/sst/parquet/file_range.rs +++ b/src/mito2/src/sst/parquet/file_range.rs @@ -57,7 +57,7 @@ use crate::sst::parquet::format::{PrimaryKeyArray, ReadFormat}; use crate::sst::parquet::reader::{ FlatRowGroupReader, MaybeFilter, RowGroupReader, RowGroupReaderBuilder, SimpleFilterContext, }; -use crate::sst::parquet::row_group::ParquetFetchMetrics; +use crate::sst::parquet::row_group::{InMemoryRowGroup, ParquetFetchMetrics}; use crate::sst::parquet::row_selection::row_selection_from_row_ranges; use crate::sst::parquet::stats::RowGroupPruningStats; @@ -265,22 +265,18 @@ impl FileRange { false }; - let row_selection = if use_last_row_reader { - self.row_selection.clone() - } else { - self.prefiltered_flat_row_selection(fetch_metrics).await? - }; - if row_selection - .as_ref() - .is_some_and(|selection| selection.row_count() == 0) - { - return Ok(None); - } - // Compute skip_fields once for this row group let skip_fields = self.context.should_skip_fields(self.row_group_idx); let flat_prune_reader = if use_last_row_reader { + let row_selection = self.row_selection.clone(); + if row_selection + .as_ref() + .is_some_and(|selection| selection.row_count() == 0) + { + return Ok(None); + } + let flat_row_group_reader = FlatRowGroupReader::new( self.context.clone(), self.context @@ -297,33 +293,66 @@ impl FileRange { ); FlatPruneReader::new_with_last_row_reader(self.context.clone(), reader, skip_fields) } else { - let flat_row_group_reader = FlatRowGroupReader::new( - self.context.clone(), - self.context - .reader_builder - .build(self.row_group_idx, row_selection, fetch_metrics) - .await?, - ); - FlatPruneReader::new_with_row_group_reader( - self.context.clone(), - flat_row_group_reader, - skip_fields, - ) + match self.prefiltered_flat_reader_input(fetch_metrics).await? { + PrefilteredFlatReaderInput::Selection(row_selection) => { + if row_selection + .as_ref() + .is_some_and(|selection| selection.row_count() == 0) + { + return Ok(None); + } + + let flat_row_group_reader = FlatRowGroupReader::new( + self.context.clone(), + self.context + .reader_builder + .build(self.row_group_idx, row_selection, fetch_metrics) + .await?, + ); + FlatPruneReader::new_with_row_group_reader( + self.context.clone(), + flat_row_group_reader, + skip_fields, + ) + } + PrefilteredFlatReaderInput::Prefetched(mut row_group) => { + let flat_row_group_reader = FlatRowGroupReader::new( + self.context.clone(), + self.context + .reader_builder + .build_on_row_group( + &mut row_group, + self.row_selection.clone(), + fetch_metrics, + ) + .await?, + ); + FlatPruneReader::new_with_row_group_reader( + self.context.clone(), + flat_row_group_reader, + skip_fields, + ) + } + } }; Ok(Some(flat_prune_reader)) } - async fn prefiltered_flat_row_selection( - &self, + async fn prefiltered_flat_reader_input<'a>( + &'a self, fetch_metrics: Option<&ParquetFetchMetrics>, - ) -> Result> { + ) -> Result> { if !self.select_all() { - return Ok(self.row_selection.clone()); + return Ok(PrefilteredFlatReaderInput::Selection( + self.row_selection.clone(), + )); } let Some(mut primary_key_filter) = self.context.new_primary_key_filter() else { - return Ok(self.row_selection.clone()); + return Ok(PrefilteredFlatReaderInput::Selection( + self.row_selection.clone(), + )); }; let read_format = ReadFormat::new_flat( @@ -340,11 +369,15 @@ impl FileRange { self.context.file_path(), false, )?; + let mut row_group = self + .context + .reader_builder + .new_in_memory_row_group(self.row_group_idx); let reader = self .context .reader_builder - .build_with_read_format( - self.row_group_idx, + .build_on_row_group_with_read_format( + &mut row_group, self.row_selection.clone(), fetch_metrics, &read_format, @@ -375,19 +408,20 @@ impl FileRange { } if matched_row_ranges.is_empty() { - return Ok(Some(RowSelection::from(vec![]))); + return Ok(PrefilteredFlatReaderInput::Selection(Some( + RowSelection::from(vec![]), + ))); } if matched_row_ranges.len() == 1 && matched_row_ranges[0].start == 0 && matched_row_ranges[0].end == rows_in_group { - return Ok(self.row_selection.clone()); + return Ok(PrefilteredFlatReaderInput::Prefetched(row_group)); } - Ok(Some(row_selection_from_row_ranges( - matched_row_ranges.into_iter(), - rows_in_group, + Ok(PrefilteredFlatReaderInput::Selection(Some( + row_selection_from_row_ranges(matched_row_ranges.into_iter(), rows_in_group), ))) } @@ -407,6 +441,11 @@ impl FileRange { } } +enum PrefilteredFlatReaderInput<'a> { + Selection(Option), + Prefetched(InMemoryRowGroup<'a>), +} + /// Context shared by ranges of the same parquet SST. pub(crate) struct FileRangeContext { /// Row group reader builder for the file. @@ -1266,8 +1305,10 @@ impl RangeBase { #[cfg(test)] mod tests { + use std::fmt::{Debug, Formatter}; use std::sync::Arc; + use async_trait::async_trait; use datafusion_expr::{Expr, col, lit}; use datatypes::arrow::array::{ Array, ArrayRef, BinaryArray, DictionaryArray, TimestampMillisecondArray, UInt8Array, @@ -1276,16 +1317,69 @@ mod tests { use datatypes::arrow::datatypes::{Schema, UInt32Type}; use datatypes::schema::ColumnSchema; use mito_codec::row_converter::build_primary_key_codec; + use object_store::services::Memory; use store_api::codec::PrimaryKeyEncoding; use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder}; + use store_api::region_request::PathType; + use table::predicate::Predicate; use super::*; - use crate::sst::internal_fields; + use crate::access_layer::{FilePathProvider, Metrics, WriteType}; + use crate::cache::CacheStrategy; + use crate::config::IndexConfig; + use crate::sst::file::{RegionFileId, RegionIndexId}; + use crate::sst::index::{Indexer, IndexerBuilder}; + use crate::sst::parquet::WriteOptions; + use crate::sst::parquet::reader::{ParquetReaderBuilder, ReaderMetrics}; + use crate::sst::parquet::row_group::ParquetFetchMetricsData; + use crate::sst::parquet::writer::ParquetWriter; + use crate::sst::{internal_fields, location}; use crate::test_util::sst_util::{ - new_primary_key, new_sparse_primary_key, sst_region_metadata, + new_flat_source_from_record_batches, new_primary_key, new_record_batch_by_range, + new_sparse_primary_key, sst_file_handle, sst_region_metadata, sst_region_metadata_with_encoding, }; + const FILE_DIR: &str = "/"; + + #[derive(Clone)] + struct FixedPathProvider { + region_file_id: RegionFileId, + } + + impl FilePathProvider for FixedPathProvider { + fn build_index_file_path(&self, _file_id: RegionFileId) -> String { + location::index_file_path_legacy(FILE_DIR, self.region_file_id, PathType::Bare) + } + + fn build_index_file_path_with_version(&self, index_id: RegionIndexId) -> String { + location::index_file_path(FILE_DIR, index_id, PathType::Bare) + } + + fn build_sst_file_path(&self, _file_id: RegionFileId) -> String { + location::sst_file_path(FILE_DIR, self.region_file_id, PathType::Bare) + } + } + + struct NoopIndexBuilder; + + impl Debug for NoopIndexBuilder { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.write_str("NoopIndexBuilder") + } + } + + #[async_trait] + impl IndexerBuilder for NoopIndexBuilder { + async fn build( + &self, + _file_id: store_api::storage::FileId, + _index_version: u64, + ) -> Indexer { + Indexer::default() + } + } + fn flat_file_num_columns(metadata: &RegionMetadata) -> usize { metadata.column_metadatas.len() + 3 } @@ -1663,4 +1757,86 @@ mod tests { assert_eq!(filtered.num_rows(), 2); assert_eq!(field_values(&filtered), vec![12, 13]); } + + async fn fetch_metrics_for_predicate(predicate: Option) -> ParquetFetchMetricsData { + let object_store = object_store::ObjectStore::new(Memory::default()) + .unwrap() + .finish(); + let handle = sst_file_handle(0, 1000); + let metadata = Arc::new(sst_region_metadata()); + let source = new_flat_source_from_record_batches(vec![new_record_batch_by_range( + &["a", "d"], + 0, + 60, + )]); + let write_opts = WriteOptions { + row_group_size: 128, + ..Default::default() + }; + + let mut write_metrics = Metrics::new(WriteType::Flush); + let mut writer = ParquetWriter::new_with_object_store( + object_store.clone(), + metadata, + IndexConfig::default(), + NoopIndexBuilder, + FixedPathProvider { + region_file_id: handle.file_id(), + }, + &mut write_metrics, + ) + .await; + writer + .write_all_flat_as_primary_key(source, None, &write_opts) + .await + .unwrap(); + + let builder = + ParquetReaderBuilder::new(FILE_DIR.to_string(), PathType::Bare, handle, object_store) + .cache(CacheStrategy::Disabled) + .flat_format(true) + .predicate(predicate); + + let mut reader_metrics = ReaderMetrics::default(); + let (context, mut selection) = builder + .build_reader_input(&mut reader_metrics) + .await + .unwrap() + .unwrap(); + let (row_group_idx, row_selection) = selection.pop_first().unwrap(); + assert!(selection.pop_first().is_none()); + + let fetch_metrics = ParquetFetchMetrics::default(); + let mut reader = FileRange::new(Arc::new(context), row_group_idx, Some(row_selection)) + .flat_reader(None, Some(&fetch_metrics)) + .await + .unwrap() + .unwrap(); + + let mut row_count = 0; + while let Some(batch) = reader.next_batch().unwrap() { + row_count += batch.num_rows(); + } + assert_eq!(row_count, 60); + + fetch_metrics.data.lock().unwrap().clone() + } + + #[tokio::test(flavor = "current_thread")] + async fn test_flat_reader_full_match_primary_key_filter_does_not_refetch_row_group() { + let baseline = fetch_metrics_for_predicate(None).await; + let full_match = + fetch_metrics_for_predicate(Some(Predicate::new(vec![col("tag_0").eq(lit("a"))]))) + .await; + + assert_eq!( + full_match.pages_to_fetch_store, + baseline.pages_to_fetch_store + ); + assert_eq!( + full_match.page_size_to_fetch_store, + baseline.page_size_to_fetch_store + ); + assert_eq!(full_match.page_size_needed, baseline.page_size_needed); + } } diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs index 67564e6445..cd275d7769 100644 --- a/src/mito2/src/sst/parquet/reader.rs +++ b/src/mito2/src/sst/parquet/reader.rs @@ -1693,14 +1693,22 @@ impl RowGroupReaderBuilder { &self.cache_strategy } - /// Builds a [ParquetRecordBatchReader] for `row_group_idx` using a custom read format. - pub(crate) async fn build_with_read_format( + pub(crate) fn new_in_memory_row_group(&self, row_group_idx: usize) -> InMemoryRowGroup<'_> { + InMemoryRowGroup::create( + self.file_handle.region_id(), + self.file_handle.file_id().file_id(), + &self.parquet_meta, + row_group_idx, + self.cache_strategy.clone(), + &self.file_path, + self.object_store.clone(), + ) + } + + fn projection_and_field_levels( &self, - row_group_idx: usize, - row_selection: Option, - fetch_metrics: Option<&ParquetFetchMetrics>, read_format: &ReadFormat, - ) -> Result { + ) -> Result<(ProjectionMask, FieldLevels)> { let parquet_schema_desc = self.parquet_meta.file_metadata().schema_descr(); let projection_mask = ProjectionMask::roots( parquet_schema_desc, @@ -1711,14 +1719,7 @@ impl RowGroupReaderBuilder { parquet_to_arrow_field_levels(parquet_schema_desc, projection_mask.clone(), hint) .context(ReadDataPartSnafu)?; - self.build_with_projection( - row_group_idx, - row_selection, - fetch_metrics, - projection_mask, - field_levels, - ) - .await + Ok((projection_mask, field_levels)) } /// Builds a [ParquetRecordBatchReader] to read the row group at `row_group_idx`. @@ -1728,8 +1729,9 @@ impl RowGroupReaderBuilder { row_selection: Option, fetch_metrics: Option<&ParquetFetchMetrics>, ) -> Result { - self.build_with_projection( - row_group_idx, + let mut row_group = self.new_in_memory_row_group(row_group_idx); + self.build_on_row_group_with_projection( + &mut row_group, row_selection, fetch_metrics, self.projection.clone(), @@ -1738,9 +1740,43 @@ impl RowGroupReaderBuilder { .await } - async fn build_with_projection( + pub(crate) async fn build_on_row_group_with_read_format( &self, - row_group_idx: usize, + row_group: &mut InMemoryRowGroup<'_>, + row_selection: Option, + fetch_metrics: Option<&ParquetFetchMetrics>, + read_format: &ReadFormat, + ) -> Result { + let (projection, field_levels) = self.projection_and_field_levels(read_format)?; + self.build_on_row_group_with_projection( + row_group, + row_selection, + fetch_metrics, + projection, + field_levels, + ) + .await + } + + pub(crate) async fn build_on_row_group( + &self, + row_group: &mut InMemoryRowGroup<'_>, + row_selection: Option, + fetch_metrics: Option<&ParquetFetchMetrics>, + ) -> Result { + self.build_on_row_group_with_projection( + row_group, + row_selection, + fetch_metrics, + self.projection.clone(), + self.field_levels.clone(), + ) + .await + } + + async fn build_on_row_group_with_projection( + &self, + row_group: &mut InMemoryRowGroup<'_>, row_selection: Option, fetch_metrics: Option<&ParquetFetchMetrics>, projection: ProjectionMask, @@ -1748,15 +1784,6 @@ impl RowGroupReaderBuilder { ) -> Result { let fetch_start = Instant::now(); - let mut row_group = InMemoryRowGroup::create( - self.file_handle.region_id(), - self.file_handle.file_id().file_id(), - &self.parquet_meta, - row_group_idx, - self.cache_strategy.clone(), - &self.file_path, - self.object_store.clone(), - ); // Fetches data into memory. row_group .fetch(&projection, row_selection.as_ref(), fetch_metrics) @@ -1774,7 +1801,7 @@ impl RowGroupReaderBuilder { // Now the row selection is None. ParquetRecordBatchReader::try_new_with_row_groups( &field_levels, - &row_group, + row_group, DEFAULT_READ_BATCH_SIZE, row_selection, )