perf(mito2): speed up parquet scan via minmax caches (#7708)

* perf(mito2): speed up parquet scan via meta caches

* fix(mito2): fix parquet pruning and metadata cache

* revert config changes

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* enhance cache file enter logic

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* resolve tiny cr comments

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* only preload from fs or cache

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix vector test

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ruihang Xia
2026-03-07 14:32:47 +08:00
committed by GitHub
parent 93c48a078c
commit a71df9477a
7 changed files with 757 additions and 44 deletions

View File

@@ -400,6 +400,19 @@ impl CacheManager {
}
}
/// Returns the total weighted size of the in-memory SST meta cache.
pub(crate) fn sst_meta_cache_weighted_size(&self) -> u64 {
self.sst_meta_cache
.as_ref()
.map(|cache| cache.weighted_size())
.unwrap_or(0)
}
/// Returns true if the in-memory SST meta cache is enabled.
pub(crate) fn sst_meta_cache_enabled(&self) -> bool {
self.sst_meta_cache.is_some()
}
/// Gets a vector with repeated value for specific `key`.
pub fn get_repeated_vector(
&self,

View File

@@ -116,6 +116,8 @@ pub enum PredicateKey {
Bloom(BloomFilterKey),
/// Inverted index predicate.
Inverted(InvertedIndexKey),
/// Min-max pruning predicate.
MinMax(MinMaxKey),
}
impl PredicateKey {
@@ -134,12 +136,18 @@ impl PredicateKey {
Self::Inverted(InvertedIndexKey::new(predicates))
}
/// Creates a new min-max pruning key.
pub fn new_minmax(exprs: Arc<Vec<String>>, schema_version: u64, skip_fields: bool) -> Self {
Self::MinMax(MinMaxKey::new(exprs, schema_version, skip_fields))
}
/// Returns the memory usage of this key.
pub fn mem_usage(&self) -> usize {
match self {
Self::Fulltext(key) => key.mem_usage,
Self::Bloom(key) => key.mem_usage,
Self::Inverted(key) => key.mem_usage,
Self::MinMax(key) => key.mem_usage,
}
}
}
@@ -239,6 +247,30 @@ impl InvertedIndexKey {
}
}
/// Key for min-max pruning.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Default)]
pub struct MinMaxKey {
exprs: Arc<Vec<String>>,
schema_version: u64,
skip_fields: bool,
mem_usage: usize,
}
impl MinMaxKey {
pub fn new(exprs: Arc<Vec<String>>, schema_version: u64, skip_fields: bool) -> Self {
let mem_usage = size_of::<Self>()
+ size_of::<Vec<String>>()
+ exprs.len() * size_of::<String>()
+ exprs.iter().map(|s| s.len()).sum::<usize>();
Self {
exprs,
schema_version,
skip_fields,
mem_usage,
}
}
}
#[cfg(test)]
#[allow(clippy::single_range_in_vec_init)]
mod tests {
@@ -282,6 +314,18 @@ mod tests {
assert!(cache.get(&key, non_existent_file_id).is_none());
}
#[test]
fn test_minmax_key_should_distinguish_schema_version_and_skip_fields() {
let exprs = Arc::new(vec!["col > 1".to_string()]);
let key1 = PredicateKey::new_minmax(exprs.clone(), 1, false);
let key2 = PredicateKey::new_minmax(exprs.clone(), 2, false);
assert_ne!(key1, key2);
let key3 = PredicateKey::new_minmax(exprs, 1, true);
assert_ne!(key1, key3);
}
#[test]
fn test_cache_capacity_limit() {
// Create a cache with small capacity (100 bytes)

View File

@@ -182,6 +182,10 @@ pub(crate) struct ScanMetricsSet {
bloom_filter_cache_hit: usize,
/// Number of index result cache misses for bloom filter index.
bloom_filter_cache_miss: usize,
/// Number of index result cache hits for minmax pruning.
minmax_cache_hit: usize,
/// Number of index result cache misses for minmax pruning.
minmax_cache_miss: usize,
/// Number of pruner builder cache hits.
pruner_cache_hit: usize,
/// Number of pruner builder cache misses.
@@ -308,6 +312,8 @@ impl fmt::Debug for ScanMetricsSet {
inverted_index_cache_miss,
bloom_filter_cache_hit,
bloom_filter_cache_miss,
minmax_cache_hit,
minmax_cache_miss,
pruner_cache_hit,
pruner_cache_miss,
pruner_prune_cost,
@@ -433,6 +439,12 @@ impl fmt::Debug for ScanMetricsSet {
if *bloom_filter_cache_miss > 0 {
write!(f, ", \"bloom_filter_cache_miss\":{bloom_filter_cache_miss}")?;
}
if *minmax_cache_hit > 0 {
write!(f, ", \"minmax_cache_hit\":{minmax_cache_hit}")?;
}
if *minmax_cache_miss > 0 {
write!(f, ", \"minmax_cache_miss\":{minmax_cache_miss}")?;
}
if *pruner_cache_hit > 0 {
write!(f, ", \"pruner_cache_hit\":{pruner_cache_hit}")?;
}
@@ -639,6 +651,8 @@ impl ScanMetricsSet {
inverted_index_cache_miss,
bloom_filter_cache_hit,
bloom_filter_cache_miss,
minmax_cache_hit,
minmax_cache_miss,
pruner_cache_hit,
pruner_cache_miss,
pruner_prune_cost,
@@ -680,6 +694,8 @@ impl ScanMetricsSet {
self.inverted_index_cache_miss += *inverted_index_cache_miss;
self.bloom_filter_cache_hit += *bloom_filter_cache_hit;
self.bloom_filter_cache_miss += *bloom_filter_cache_miss;
self.minmax_cache_hit += *minmax_cache_hit;
self.minmax_cache_miss += *minmax_cache_miss;
self.pruner_cache_hit += *pruner_cache_hit;
self.pruner_cache_miss += *pruner_cache_miss;
self.pruner_prune_cost += *pruner_prune_cost;

View File

@@ -12,15 +12,15 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::VecDeque;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Instant;
use common_error::ext::BoxedError;
use common_recordbatch::error::{ArrowComputeSnafu, ExternalSnafu};
use common_recordbatch::error::ExternalSnafu;
use common_recordbatch::{DfRecordBatch, RecordBatch};
use datatypes::compute;
use futures::stream::BoxStream;
use futures::{Stream, StreamExt};
use snafu::ResultExt;
@@ -47,7 +47,7 @@ pub(crate) struct ConvertBatchStream {
projection_mapper: Arc<ProjectionMapper>,
cache_strategy: CacheStrategy,
partition_metrics: PartitionMetrics,
buffer: Vec<DfRecordBatch>,
pending: VecDeque<RecordBatch>,
}
impl ConvertBatchStream {
@@ -62,7 +62,7 @@ impl ConvertBatchStream {
projection_mapper,
cache_strategy,
partition_metrics,
buffer: Vec::new(),
pending: VecDeque::new(),
}
}
@@ -79,40 +79,36 @@ impl ConvertBatchStream {
}
}
ScanBatch::Series(series) => {
self.buffer.clear();
debug_assert!(
self.pending.is_empty(),
"ConvertBatchStream should not convert a new SeriesBatch when pending batches exist"
);
match series {
SeriesBatch::PrimaryKey(primary_key_batch) => {
self.buffer.reserve(primary_key_batch.batches.len());
// Safety: Only primary key format returns this batch.
let mapper = self.projection_mapper.as_primary_key().unwrap();
for batch in primary_key_batch.batches {
let record_batch = mapper.convert(&batch, &self.cache_strategy)?;
self.buffer.push(record_batch.into_df_record_batch());
self.pending
.push_back(mapper.convert(&batch, &self.cache_strategy)?);
}
}
SeriesBatch::Flat(flat_batch) => {
self.buffer.reserve(flat_batch.batches.len());
// Safety: Only flat format returns this batch.
let mapper = self.projection_mapper.as_flat().unwrap();
for batch in flat_batch.batches {
let record_batch = mapper.convert(&batch)?;
self.buffer.push(record_batch.into_df_record_batch());
self.pending.push_back(mapper.convert(&batch)?);
}
}
}
let output_schema = self.projection_mapper.output_schema();
let record_batch =
compute::concat_batches(output_schema.arrow_schema(), &self.buffer)
.context(ArrowComputeSnafu)?;
Ok(RecordBatch::from_df_record_batch(
output_schema,
record_batch,
))
Ok(self
.pending
.pop_front()
.unwrap_or_else(|| RecordBatch::new_empty(output_schema)))
}
ScanBatch::RecordBatch(df_record_batch) => {
// Safety: Only flat format returns this batch.
@@ -128,6 +124,10 @@ impl Stream for ConvertBatchStream {
type Item = common_recordbatch::error::Result<RecordBatch>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
if let Some(batch) = self.pending.pop_front() {
return Poll::Ready(Some(Ok(batch)));
}
let batch = futures::ready!(self.inner.poll_next_unpin(cx));
let Some(batch) = batch else {
return Poll::Ready(None);

View File

@@ -17,7 +17,7 @@
use std::any::TypeId;
use std::collections::HashMap;
use std::sync::atomic::{AtomicI64, AtomicU64};
use std::sync::{Arc, Mutex};
use std::sync::{Arc, LazyLock, Mutex};
use std::time::Instant;
use common_telemetry::{debug, error, info, warn};
@@ -38,6 +38,7 @@ use store_api::metadata::{
use store_api::region_engine::RegionRole;
use store_api::region_request::PathType;
use store_api::storage::{ColumnId, RegionId};
use tokio::sync::Semaphore;
use crate::access_layer::AccessLayer;
use crate::cache::CacheManagerRef;
@@ -63,16 +64,23 @@ use crate::region_write_ctx::RegionWriteCtx;
use crate::request::OptionOutputTx;
use crate::schedule::scheduler::SchedulerRef;
use crate::sst::FormatType;
use crate::sst::file::{RegionFileId, RegionIndexId};
use crate::sst::file::{FileHandle, RegionFileId, RegionIndexId};
use crate::sst::file_purger::{FilePurgerRef, create_file_purger};
use crate::sst::file_ref::FileReferenceManagerRef;
use crate::sst::index::intermediate::IntermediateManager;
use crate::sst::index::puffin_manager::PuffinManagerFactory;
use crate::sst::location::{self, region_dir_from_table_dir};
use crate::sst::parquet::metadata::MetadataLoader;
use crate::sst::parquet::reader::MetadataCacheMetrics;
use crate::time_provider::TimeProviderRef;
use crate::wal::entry_reader::WalEntryReader;
use crate::wal::{EntryId, Wal};
const PARQUET_META_PRELOAD_CONCURRENCY: usize = 8;
static PARQUET_META_PRELOAD_SEMAPHORE: LazyLock<Semaphore> =
LazyLock::new(|| Semaphore::new(PARQUET_META_PRELOAD_CONCURRENCY));
/// A fetcher to retrieve partition expr for a region.
///
/// Compatibility: older regions didn't persist `partition_expr` in engine metadata,
@@ -585,6 +593,7 @@ impl RegionOpener {
let region = Arc::new(region);
maybe_load_cache(&region, config, &self.cache_manager);
maybe_preload_parquet_meta_cache(&region, config, &self.cache_manager);
Ok(Some(region))
}
@@ -973,6 +982,150 @@ fn maybe_load_cache(
write_cache.load_region_cache(task);
}
/// Preloads Parquet metadata into the in-memory SST meta cache on region open.
///
/// This improves the latency of the first query after server start by avoiding large Parquet
/// metadata reads on demand.
///
/// The preload is best-effort:
/// - Always tries to warm from the local write cache (file cache) first.
/// - If the region storage backend is local filesystem (`Scheme::Fs`), it may also load metadata
/// directly from the local store.
/// - It will not fetch metadata from remote object stores (S3/GCS/OSS/...).
async fn preload_parquet_meta_cache_for_files(
region_id: RegionId,
cache_manager: CacheManagerRef,
sst_meta_cache_capacity: u64,
table_dir: String,
path_type: PathType,
object_store: object_store::ObjectStore,
mut files: Vec<FileHandle>,
) -> usize {
if !cache_manager.sst_meta_cache_enabled()
|| sst_meta_cache_capacity == 0
|| cache_manager.sst_meta_cache_weighted_size() >= sst_meta_cache_capacity
{
return 0;
}
let allow_direct_load = matches!(object_store.info().scheme(), object_store::Scheme::Fs);
// Sort by time range so we can prefer preloading newer files first.
files.sort_by(|a, b| b.meta_ref().time_range.1.cmp(&a.meta_ref().time_range.1));
let mut loaded = 0usize;
for file_handle in files {
// Stop when the shared SST meta cache is full.
if cache_manager.sst_meta_cache_weighted_size() >= sst_meta_cache_capacity {
break;
}
let file_id = file_handle.file_id();
let mut cache_metrics = MetadataCacheMetrics::default();
if cache_manager
.get_parquet_meta_data(file_id, &mut cache_metrics, Default::default())
.await
.is_some()
{
// Metadata is either already in memory or loaded from file cache.
if cache_metrics.mem_cache_hit == 0 {
loaded += 1;
}
continue;
}
if !allow_direct_load {
continue;
}
let file_size = file_handle.meta_ref().file_size;
let file_path = file_handle.file_path(&table_dir, path_type);
let loader = MetadataLoader::new(object_store.clone(), &file_path, file_size);
match loader.load(&mut cache_metrics).await {
Ok(metadata) => {
cache_manager.put_parquet_meta_data(file_id, Arc::new(metadata));
loaded += 1;
}
Err(err) => {
// Preloading is best-effort. Failure shouldn't affect region open.
warn!(
err; "Failed to preload parquet metadata from local store, region: {}, file: {}",
region_id, file_path
);
}
}
}
loaded
}
fn maybe_preload_parquet_meta_cache(
region: &MitoRegionRef,
config: &MitoConfig,
cache_manager: &Option<CacheManagerRef>,
) {
let Some(cache_manager) = cache_manager else {
return;
};
if !cache_manager.sst_meta_cache_enabled() {
return;
}
// Skip if SST meta cache is disabled.
if config.sst_meta_cache_size.as_bytes() == 0 {
return;
}
if !config.preload_index_cache {
return;
}
let region = region.clone();
let cache_manager = cache_manager.clone();
let sst_meta_cache_capacity = config.sst_meta_cache_size.as_bytes();
tokio::spawn(async move {
// Safety: semaphore must exist.
let _permit = PARQUET_META_PRELOAD_SEMAPHORE.acquire().await.unwrap();
let region_id = region.region_id;
let table_dir = region.access_layer.table_dir().to_string();
let path_type = region.access_layer.path_type();
let object_store = region.access_layer.object_store().clone();
// Collect SST files. Do not hold the version longer than needed.
let mut files = Vec::new();
{
let version = region.version_control.current().version;
for level in version.ssts.levels() {
for file_handle in level.files.values() {
files.push(file_handle.clone());
}
}
}
let preloading_start = Instant::now();
let loaded = preload_parquet_meta_cache_for_files(
region_id,
cache_manager,
sst_meta_cache_capacity,
table_dir,
path_type,
object_store,
files,
)
.await;
let preloading_cost = preloading_start.elapsed();
if loaded > 0 {
info!(
"Preloaded parquet metadata for region {}, loaded_files: {}, elapsed_ms: {}",
region_id,
loaded,
preloading_cost.as_millis()
);
}
});
}
fn can_load_cache(state: RegionRoleState) -> bool {
match state {
RegionRoleState::Leader(RegionLeaderState::Writable)
@@ -987,3 +1140,259 @@ fn can_load_cache(state: RegionRoleState) -> bool {
| RegionRoleState::Leader(RegionLeaderState::Truncating) => false,
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use common_base::readable_size::ReadableSize;
use common_test_util::temp_dir::create_temp_dir;
use common_time::Timestamp;
use datatypes::arrow::array::{ArrayRef, Int64Array};
use datatypes::arrow::record_batch::RecordBatch;
use object_store::ObjectStore;
use object_store::services::{Fs, Memory};
use parquet::arrow::ArrowWriter;
use store_api::region_request::PathType;
use store_api::storage::{FileId, RegionId};
use super::preload_parquet_meta_cache_for_files;
use crate::cache::CacheManager;
use crate::cache::file_cache::{FileType, IndexKey};
use crate::sst::file::{FileHandle, FileMeta};
use crate::sst::file_purger::NoopFilePurger;
use crate::test_util::TestEnv;
#[tokio::test]
async fn test_preload_parquet_meta_cache_uses_file_cache() {
let env = TestEnv::new().await;
let local_store = ObjectStore::new(Memory::default()).unwrap().finish();
let write_cache = env
.create_write_cache(local_store, ReadableSize::mb(1024))
.await;
let cache_manager = Arc::new(
CacheManager::builder()
.sst_meta_cache_size(1024 * 1024)
.write_cache(Some(write_cache.clone()))
.build(),
);
let region_id = RegionId::new(1, 1);
let file_id = FileId::random();
let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef;
let batch = RecordBatch::try_from_iter([("col", col)]).unwrap();
let mut parquet_bytes = Vec::new();
let mut writer = ArrowWriter::try_new(&mut parquet_bytes, batch.schema(), None).unwrap();
writer.write(&batch).unwrap();
writer.close().unwrap();
let file_size = parquet_bytes.len() as u64;
let file_meta = FileMeta {
region_id,
file_id,
time_range: (Timestamp::new_millisecond(0), Timestamp::new_millisecond(1)),
level: 0,
file_size,
max_row_group_uncompressed_size: 0,
available_indexes: Default::default(),
indexes: vec![],
index_file_size: 0,
index_version: 0,
num_rows: 3,
num_row_groups: 1,
sequence: None,
partition_expr: None,
num_series: 0,
};
let file_handle = FileHandle::new(file_meta, Arc::new(NoopFilePurger));
let table_dir = "test_table";
let path_type = PathType::Bare;
let remote_path = file_handle.file_path(table_dir, path_type);
let source_store = ObjectStore::new(Memory::default()).unwrap().finish();
source_store
.write(&remote_path, parquet_bytes)
.await
.unwrap();
// Put the parquet file into the write cache, so file cache contains metadata.
let index_key = IndexKey::new(region_id, file_id, FileType::Parquet);
write_cache
.file_cache()
.download(index_key, &remote_path, &source_store, file_size)
.await
.unwrap();
let region_file_id = file_handle.file_id();
assert!(
cache_manager
.get_parquet_meta_data_from_mem_cache(region_file_id)
.is_none()
);
let loaded = preload_parquet_meta_cache_for_files(
region_id,
cache_manager.clone(),
1024 * 1024,
table_dir.to_string(),
path_type,
source_store.clone(),
vec![file_handle],
)
.await;
// Should warm the in-memory cache from the local file cache.
assert_eq!(loaded, 1);
assert!(
cache_manager
.get_parquet_meta_data_from_mem_cache(region_file_id)
.is_some()
);
}
#[tokio::test]
async fn test_preload_parquet_meta_cache_skips_files_not_in_file_cache() {
let cache_manager = Arc::new(
CacheManager::builder()
.sst_meta_cache_size(1024 * 1024)
.build(),
);
let region_id = RegionId::new(1, 1);
let file_id = FileId::random();
// Without a local file cache entry, preloading should skip the file.
let file_meta = FileMeta {
region_id,
file_id,
time_range: (Timestamp::new_millisecond(0), Timestamp::new_millisecond(1)),
level: 0,
file_size: 0,
max_row_group_uncompressed_size: 0,
available_indexes: Default::default(),
indexes: vec![],
index_file_size: 0,
index_version: 0,
num_rows: 3,
num_row_groups: 1,
sequence: None,
partition_expr: None,
num_series: 0,
};
let file_handle = FileHandle::new(file_meta, Arc::new(NoopFilePurger));
let table_dir = "test_table";
let path_type = PathType::Bare;
let remote_path = file_handle.file_path(table_dir, path_type);
// Even if the remote object store has the file, we should not preload from it.
let object_store = ObjectStore::new(Memory::default()).unwrap().finish();
object_store
.write(&remote_path, b"noop".as_slice())
.await
.unwrap();
let region_file_id = file_handle.file_id();
assert!(
cache_manager
.get_parquet_meta_data_from_mem_cache(region_file_id)
.is_none()
);
let loaded = preload_parquet_meta_cache_for_files(
region_id,
cache_manager.clone(),
1024 * 1024,
table_dir.to_string(),
path_type,
object_store,
vec![file_handle],
)
.await;
assert_eq!(loaded, 0);
assert!(
cache_manager
.get_parquet_meta_data_from_mem_cache(region_file_id)
.is_none()
);
}
#[tokio::test]
async fn test_preload_parquet_meta_cache_loads_from_local_fs() {
let cache_manager = Arc::new(
CacheManager::builder()
.sst_meta_cache_size(1024 * 1024)
.build(),
);
let region_id = RegionId::new(1, 1);
let file_id = FileId::random();
let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef;
let batch = RecordBatch::try_from_iter([("col", col)]).unwrap();
let mut parquet_bytes = Vec::new();
let mut writer = ArrowWriter::try_new(&mut parquet_bytes, batch.schema(), None).unwrap();
writer.write(&batch).unwrap();
writer.close().unwrap();
// file_size is 0 when it's missing/defaulted in manifests; MetadataLoader::load will stat
// the local filesystem to retrieve it.
let file_meta = FileMeta {
region_id,
file_id,
time_range: (Timestamp::new_millisecond(0), Timestamp::new_millisecond(1)),
level: 0,
file_size: 0,
max_row_group_uncompressed_size: 0,
available_indexes: Default::default(),
indexes: vec![],
index_file_size: 0,
index_version: 0,
num_rows: 3,
num_row_groups: 1,
sequence: None,
partition_expr: None,
num_series: 0,
};
let file_handle = FileHandle::new(file_meta, Arc::new(NoopFilePurger));
let table_dir = "test_table";
let path_type = PathType::Bare;
let file_path = file_handle.file_path(table_dir, path_type);
let root = create_temp_dir("parquet-meta-preload");
let object_store = ObjectStore::new(Fs::default().root(root.path().to_str().unwrap()))
.unwrap()
.finish();
object_store.write(&file_path, parquet_bytes).await.unwrap();
let region_file_id = file_handle.file_id();
assert!(
cache_manager
.get_parquet_meta_data_from_mem_cache(region_file_id)
.is_none()
);
let loaded = preload_parquet_meta_cache_for_files(
region_id,
cache_manager.clone(),
1024 * 1024,
table_dir.to_string(),
path_type,
object_store,
vec![file_handle],
)
.await;
assert_eq!(loaded, 1);
assert!(
cache_manager
.get_parquet_meta_data_from_mem_cache(region_file_id)
.is_some()
);
}
}

View File

@@ -679,15 +679,14 @@ impl ParquetReaderBuilder {
metrics.rg_total += num_row_groups;
metrics.rows_total += num_rows as usize;
let mut output = RowGroupSelection::new(row_group_size, num_rows as _);
// Compute skip_fields once for all pruning operations
let skip_fields = self.compute_skip_fields(parquet_meta);
self.prune_row_groups_by_minmax(
let mut output = self.row_groups_by_minmax(
read_format,
parquet_meta,
&mut output,
row_group_size,
num_rows as usize,
metrics,
skip_fields,
);
@@ -1136,20 +1135,59 @@ impl ParquetReaderBuilder {
}
}
/// Prunes row groups by min-max index.
fn prune_row_groups_by_minmax(
/// Computes row groups selection after min-max pruning.
fn row_groups_by_minmax(
&self,
read_format: &ReadFormat,
parquet_meta: &ParquetMetaData,
output: &mut RowGroupSelection,
row_group_size: usize,
total_row_count: usize,
metrics: &mut ReaderFilterMetrics,
skip_fields: bool,
) -> bool {
) -> RowGroupSelection {
let Some(predicate) = &self.predicate else {
return false;
return RowGroupSelection::new(row_group_size, total_row_count);
};
let row_groups_before = output.row_group_count();
let file_id = self.file_handle.file_id().file_id();
let index_result_cache = self.cache_strategy.index_result_cache();
let cached_minmax_key =
if index_result_cache.is_some() && predicate.dyn_filters().is_empty() {
// Cache min-max pruning results keyed by predicate expressions. This avoids repeatedly
// building row-group pruning stats for identical predicates across queries.
let mut exprs = predicate
.exprs()
.iter()
.map(|expr| format!("{expr:?}"))
.collect::<Vec<_>>();
exprs.sort();
let schema_version = self
.expected_metadata
.as_ref()
.map(|meta| meta.schema_version)
.unwrap_or_else(|| read_format.metadata().schema_version);
Some(PredicateKey::new_minmax(
Arc::new(exprs),
schema_version,
skip_fields,
))
} else {
None
};
if let Some(index_result_cache) = index_result_cache
&& let Some(predicate_key) = cached_minmax_key.as_ref()
{
if let Some(result) = index_result_cache.get(predicate_key, file_id) {
metrics.minmax_cache_hit += 1;
let num_row_groups = parquet_meta.num_row_groups();
metrics.rg_minmax_filtered +=
num_row_groups.saturating_sub(result.row_group_count());
return (*result).clone();
}
metrics.minmax_cache_miss += 1;
}
let region_meta = read_format.metadata();
let row_groups = parquet_meta.row_groups();
@@ -1168,20 +1206,26 @@ impl ParquetReaderBuilder {
// Here we use the schema of the SST to build the physical expression. If the column
// in the SST doesn't have the same column id as the column in the expected metadata,
// we will get a None statistics for that column.
predicate
.prune_with_stats(&stats, prune_schema)
.iter()
.zip(0..parquet_meta.num_row_groups())
.for_each(|(mask, row_group)| {
if !*mask {
output.remove_row_group(row_group);
}
});
let mask = predicate.prune_with_stats(&stats, prune_schema);
let output = RowGroupSelection::from_full_row_group_ids(
mask.iter()
.enumerate()
.filter_map(|(row_group, keep)| keep.then_some(row_group)),
row_group_size,
total_row_count,
);
let row_groups_after = output.row_group_count();
metrics.rg_minmax_filtered += row_groups_before - row_groups_after;
metrics.rg_minmax_filtered += parquet_meta
.num_row_groups()
.saturating_sub(output.row_group_count());
true
if let Some(index_result_cache) = index_result_cache
&& let Some(predicate_key) = cached_minmax_key
{
index_result_cache.put(predicate_key, file_id, Arc::new(output.clone()));
}
output
}
fn apply_index_result_and_update_cache(
@@ -1295,6 +1339,10 @@ pub(crate) struct ReaderFilterMetrics {
pub(crate) bloom_filter_cache_hit: usize,
/// Number of index result cache misses for bloom filter index.
pub(crate) bloom_filter_cache_miss: usize,
/// Number of index result cache hits for minmax pruning.
pub(crate) minmax_cache_hit: usize,
/// Number of index result cache misses for minmax pruning.
pub(crate) minmax_cache_miss: usize,
/// Optional metrics for inverted index applier.
pub(crate) inverted_index_apply_metrics: Option<InvertedIndexApplyMetrics>,
@@ -1335,6 +1383,8 @@ impl ReaderFilterMetrics {
self.inverted_index_cache_miss += other.inverted_index_cache_miss;
self.bloom_filter_cache_hit += other.bloom_filter_cache_hit;
self.bloom_filter_cache_miss += other.bloom_filter_cache_miss;
self.minmax_cache_hit += other.minmax_cache_hit;
self.minmax_cache_miss += other.minmax_cache_miss;
self.pruner_cache_hit += other.pruner_cache_hit;
self.pruner_cache_miss += other.pruner_cache_miss;
@@ -1423,7 +1473,7 @@ impl ReaderFilterMetrics {
}
#[cfg(all(test, feature = "vector_index"))]
mod tests {
mod vector_index_tests {
use super::*;
#[test]
@@ -2120,3 +2170,112 @@ impl FlatRowGroupReader {
}
}
}
#[cfg(test)]
mod tests {
use std::any::Any;
use std::fmt::{Debug, Formatter};
use std::sync::{Arc, LazyLock};
use datafusion::arrow::datatypes::DataType;
use datafusion_common::ScalarValue;
use datafusion_expr::expr::ScalarFunction;
use datafusion_expr::{
ColumnarValue, Expr, ScalarFunctionArgs, ScalarUDF, ScalarUDFImpl, Signature, Volatility,
};
use datatypes::arrow::array::{ArrayRef, Int64Array};
use datatypes::arrow::record_batch::RecordBatch;
use object_store::services::Memory;
use parquet::arrow::ArrowWriter;
use store_api::region_request::PathType;
use table::predicate::Predicate;
use super::*;
use crate::sst::parquet::metadata::MetadataLoader;
use crate::test_util::sst_util::{sst_file_handle, sst_region_metadata};
#[tokio::test(flavor = "current_thread")]
async fn test_minmax_predicate_key_not_built_when_index_result_cache_disabled() {
#[derive(Eq, PartialEq, Hash)]
struct PanicDebugUdf;
impl Debug for PanicDebugUdf {
fn fmt(&self, _f: &mut Formatter<'_>) -> std::fmt::Result {
panic!("minmax predicate key should not format exprs when cache is disabled");
}
}
impl ScalarUDFImpl for PanicDebugUdf {
fn as_any(&self) -> &dyn Any {
self
}
fn name(&self) -> &str {
"panic_debug_udf"
}
fn signature(&self) -> &Signature {
static SIGNATURE: LazyLock<Signature> =
LazyLock::new(|| Signature::variadic_any(Volatility::Immutable));
&SIGNATURE
}
fn return_type(&self, _arg_types: &[DataType]) -> datafusion_common::Result<DataType> {
Ok(DataType::Int64)
}
fn invoke_with_args(
&self,
_args: ScalarFunctionArgs,
) -> datafusion_common::Result<ColumnarValue> {
Ok(ColumnarValue::Scalar(ScalarValue::Int64(Some(1))))
}
}
let object_store = ObjectStore::new(Memory::default()).unwrap().finish();
let file_handle = sst_file_handle(0, 1);
let table_dir = "test_table".to_string();
let path_type = PathType::Bare;
let file_path = file_handle.file_path(&table_dir, path_type);
let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef;
let batch = RecordBatch::try_from_iter([("col", col)]).unwrap();
let mut parquet_bytes = Vec::new();
let mut writer = ArrowWriter::try_new(&mut parquet_bytes, batch.schema(), None).unwrap();
writer.write(&batch).unwrap();
writer.close().unwrap();
let file_size = parquet_bytes.len() as u64;
object_store.write(&file_path, parquet_bytes).await.unwrap();
let region_metadata: RegionMetadataRef = Arc::new(sst_region_metadata());
let read_format =
ReadFormat::new(region_metadata, None, false, None, &file_path, false).unwrap();
let mut cache_metrics = MetadataCacheMetrics::default();
let loader = MetadataLoader::new(object_store.clone(), &file_path, file_size);
let parquet_meta = loader.load(&mut cache_metrics).await.unwrap();
let udf = Arc::new(ScalarUDF::new_from_impl(PanicDebugUdf));
let predicate = Predicate::new(vec![Expr::ScalarFunction(ScalarFunction::new_udf(
udf,
vec![],
))]);
let builder = ParquetReaderBuilder::new(table_dir, path_type, file_handle, object_store)
.predicate(Some(predicate))
.cache(CacheStrategy::Disabled);
let row_group_size = parquet_meta.row_group(0).num_rows() as usize;
let total_row_count = parquet_meta.file_metadata().num_rows() as usize;
let mut metrics = ReaderFilterMetrics::default();
let selection = builder.row_groups_by_minmax(
&read_format,
&parquet_meta,
row_group_size,
total_row_count,
&mut metrics,
false,
);
assert!(!selection.is_empty());
}
}

View File

@@ -77,6 +77,68 @@ impl RowGroupSelection {
}
}
/// Creates a new `RowGroupSelection` that selects all rows in the specified row groups.
///
/// This is useful for fast construction after coarse pruning (e.g. min-max pruning),
/// avoiding building and then removing a full selection of all row groups.
pub fn from_full_row_group_ids<I>(
row_group_ids: I,
row_group_size: usize,
total_row_count: usize,
) -> Self
where
I: IntoIterator<Item = usize>,
{
if row_group_size == 0 || total_row_count == 0 {
return Self::default();
}
let row_group_count = total_row_count.div_ceil(row_group_size);
if row_group_count == 0 {
return Self::default();
}
let last_row_group_size = total_row_count - (row_group_count - 1) * row_group_size;
let mut selection_in_rg = BTreeMap::new();
let mut row_count = 0usize;
let mut selector_len = 0usize;
for rg_id in row_group_ids {
if rg_id >= row_group_count {
continue;
}
let rg_row_count = if rg_id == row_group_count - 1 {
last_row_group_size
} else {
row_group_size
};
let selection = RowSelection::from(vec![RowSelector::select(rg_row_count)]);
if selection_in_rg
.insert(
rg_id,
RowSelectionWithCount {
selection,
row_count: rg_row_count,
selector_len: 1,
},
)
.is_none()
{
row_count += rg_row_count;
selector_len += 1;
}
}
Self {
selection_in_rg,
row_count,
selector_len,
}
}
/// Returns the row selection for a given row group.
///
/// `None` indicates not selected.
@@ -748,6 +810,16 @@ mod tests {
assert_eq!(row_selection.row_count(), 1);
}
#[test]
fn test_from_full_row_group_ids_dedup_duplicates() {
let selection = RowGroupSelection::from_full_row_group_ids([0, 0, 2, 2], 10, 25);
assert_eq!(selection.row_group_count(), 2);
assert_eq!(selection.row_count(), 15);
assert_eq!(selection.get(0).unwrap().row_count(), 10);
assert_eq!(selection.get(2).unwrap().row_count(), 5);
}
#[test]
fn test_from_row_ids() {
let row_group_size = 100;