fix(mito2): fix parquet pruning and metadata cache

This commit is contained in:
Ruihang Xia
2026-02-12 01:43:37 +08:00
parent c40735e704
commit 33bf64bf45
4 changed files with 236 additions and 55 deletions

View File

@@ -137,8 +137,8 @@ impl PredicateKey {
}
/// Creates a new min-max pruning key.
pub fn new_minmax(exprs: Arc<Vec<String>>) -> Self {
Self::MinMax(MinMaxKey::new(exprs))
pub fn new_minmax(exprs: Arc<Vec<String>>, schema_version: u64, skip_fields: bool) -> Self {
Self::MinMax(MinMaxKey::new(exprs, schema_version, skip_fields))
}
/// Returns the memory usage of this key.
@@ -251,13 +251,21 @@ impl InvertedIndexKey {
#[derive(Debug, Clone, PartialEq, Eq, Hash, Default)]
pub struct MinMaxKey {
exprs: Arc<Vec<String>>,
schema_version: u64,
skip_fields: bool,
mem_usage: usize,
}
impl MinMaxKey {
pub fn new(exprs: Arc<Vec<String>>) -> Self {
let mem_usage = exprs.iter().map(|s| s.len()).sum::<usize>();
Self { exprs, mem_usage }
pub fn new(exprs: Arc<Vec<String>>, schema_version: u64, skip_fields: bool) -> Self {
let mem_usage =
exprs.iter().map(|s| s.len()).sum::<usize>() + size_of::<u64>() + size_of::<bool>();
Self {
exprs,
schema_version,
skip_fields,
mem_usage,
}
}
}
@@ -304,6 +312,18 @@ mod tests {
assert!(cache.get(&key, non_existent_file_id).is_none());
}
#[test]
fn test_minmax_key_should_distinguish_schema_version_and_skip_fields() {
let exprs = Arc::new(vec!["col > 1".to_string()]);
let key1 = PredicateKey::new_minmax(exprs.clone(), 1, false);
let key2 = PredicateKey::new_minmax(exprs.clone(), 2, false);
assert_ne!(key1, key2);
let key3 = PredicateKey::new_minmax(exprs, 1, true);
assert_ne!(key1, key3);
}
#[test]
fn test_cache_capacity_limit() {
// Create a cache with small capacity (100 bytes)

View File

@@ -63,7 +63,7 @@ use crate::region_write_ctx::RegionWriteCtx;
use crate::request::OptionOutputTx;
use crate::schedule::scheduler::SchedulerRef;
use crate::sst::FormatType;
use crate::sst::file::{RegionFileId, RegionIndexId};
use crate::sst::file::{FileHandle, RegionFileId, RegionIndexId};
use crate::sst::file_purger::{FilePurgerRef, create_file_purger};
use crate::sst::file_ref::FileReferenceManagerRef;
use crate::sst::index::intermediate::IntermediateManager;
@@ -950,6 +950,58 @@ fn maybe_load_cache(
///
/// This improves the latency of the first query after server start by avoiding large Parquet
/// metadata reads on demand.
async fn preload_parquet_meta_cache_for_files(
region_id: RegionId,
table_dir: String,
path_type: PathType,
object_store: object_store::ObjectStore,
cache_manager: CacheManagerRef,
mut files: Vec<FileHandle>,
) -> usize {
// Load older files first so the most recent files remain hot in the LRU cache.
files.sort_by(|a, b| a.meta_ref().time_range.1.cmp(&b.meta_ref().time_range.1));
let mut loaded = 0usize;
for file_handle in files {
let file_id = file_handle.file_id();
let mut cache_metrics = MetadataCacheMetrics::default();
if cache_manager
.get_parquet_meta_data(file_id, &mut cache_metrics, Default::default())
.await
.is_some()
{
// Metadata is either already in memory or loaded from file cache.
if cache_metrics.mem_cache_hit == 0 {
loaded += 1;
}
continue;
}
let file_size = file_handle.meta_ref().file_size;
if file_size == 0 {
continue;
}
let file_path = file_handle.file_path(&table_dir, path_type);
let loader = MetadataLoader::new(object_store.clone(), &file_path, file_size);
match loader.load(&mut cache_metrics).await {
Ok(metadata) => {
cache_manager.put_parquet_meta_data(file_id, Arc::new(metadata));
loaded += 1;
}
Err(err) => {
// Preloading is best-effort. Failure shouldn't affect region open.
warn!(
err; "Failed to preload parquet metadata, region: {}, file: {}",
region_id, file_path
);
}
}
}
loaded
}
fn maybe_preload_parquet_meta_cache(
region: &MitoRegionRef,
config: &MitoConfig,
@@ -969,9 +1021,9 @@ fn maybe_preload_parquet_meta_cache(
tokio::spawn(async move {
let region_id = region.region_id;
let table_dir = region.access_layer.table_dir();
let table_dir = region.access_layer.table_dir().to_string();
let path_type = region.access_layer.path_type();
let object_store = region.access_layer.object_store();
let object_store = region.access_layer.object_store().clone();
// Collect SST files. Do not hold the version longer than needed.
let mut files = Vec::new();
@@ -983,42 +1035,15 @@ fn maybe_preload_parquet_meta_cache(
}
}
}
// Load older files first so the most recent files remain hot in the LRU cache.
files.sort_by(|a, b| a.meta_ref().time_range.1.cmp(&b.meta_ref().time_range.1));
let mut loaded = 0usize;
for file_handle in files {
let file_id = file_handle.file_id();
if cache_manager
.get_parquet_meta_data_from_mem_cache(file_id)
.is_some()
{
continue;
}
let file_size = file_handle.meta_ref().file_size;
if file_size == 0 {
continue;
}
let file_path = file_handle.file_path(table_dir, path_type);
let mut cache_metrics = MetadataCacheMetrics::default();
let loader = MetadataLoader::new(object_store.clone(), &file_path, file_size);
match loader.load(&mut cache_metrics).await {
Ok(metadata) => {
cache_manager.put_parquet_meta_data(file_id, Arc::new(metadata));
loaded += 1;
}
Err(err) => {
// Preloading is best-effort. Failure shouldn't affect region open.
warn!(
err; "Failed to preload parquet metadata, region: {}, file: {}",
region_id, file_path
);
}
}
}
let loaded = preload_parquet_meta_cache_for_files(
region_id,
table_dir,
path_type,
object_store,
cache_manager,
files,
)
.await;
if loaded > 0 {
info!(
@@ -1043,3 +1068,116 @@ fn can_load_cache(state: RegionRoleState) -> bool {
| RegionRoleState::Leader(RegionLeaderState::Truncating) => false,
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use common_base::readable_size::ReadableSize;
use common_time::Timestamp;
use datatypes::arrow::array::{ArrayRef, Int64Array};
use datatypes::arrow::record_batch::RecordBatch;
use object_store::ObjectStore;
use object_store::services::Memory;
use parquet::arrow::ArrowWriter;
use store_api::region_request::PathType;
use store_api::storage::{FileId, RegionId};
use super::preload_parquet_meta_cache_for_files;
use crate::cache::CacheManager;
use crate::cache::file_cache::{FileType, IndexKey};
use crate::sst::file::{FileHandle, FileMeta};
use crate::sst::file_purger::NoopFilePurger;
use crate::test_util::TestEnv;
#[tokio::test]
async fn test_preload_parquet_meta_cache_uses_file_cache() {
let env = TestEnv::new().await;
let local_store = ObjectStore::new(Memory::default()).unwrap().finish();
let write_cache = env
.create_write_cache(local_store, ReadableSize::mb(1024))
.await;
let cache_manager = Arc::new(
CacheManager::builder()
.sst_meta_cache_size(1024 * 1024)
.write_cache(Some(write_cache.clone()))
.build(),
);
let region_id = RegionId::new(1, 1);
let file_id = FileId::random();
let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef;
let batch = RecordBatch::try_from_iter([("col", col)]).unwrap();
let mut parquet_bytes = Vec::new();
let mut writer = ArrowWriter::try_new(&mut parquet_bytes, batch.schema(), None).unwrap();
writer.write(&batch).unwrap();
writer.close().unwrap();
let file_size = parquet_bytes.len() as u64;
let file_meta = FileMeta {
region_id,
file_id,
time_range: (Timestamp::new_millisecond(0), Timestamp::new_millisecond(1)),
level: 0,
file_size,
max_row_group_uncompressed_size: 0,
available_indexes: Default::default(),
indexes: vec![],
index_file_size: 0,
index_version: 0,
num_rows: 3,
num_row_groups: 1,
sequence: None,
partition_expr: None,
num_series: 0,
};
let file_handle = FileHandle::new(file_meta, Arc::new(NoopFilePurger));
let table_dir = "test_table";
let path_type = PathType::Bare;
let remote_path = file_handle.file_path(table_dir, path_type);
let source_store = ObjectStore::new(Memory::default()).unwrap().finish();
source_store
.write(&remote_path, parquet_bytes)
.await
.unwrap();
// Put the parquet file into the write cache, so file cache contains metadata.
let index_key = IndexKey::new(region_id, file_id, FileType::Parquet);
write_cache
.file_cache()
.download(index_key, &remote_path, &source_store, file_size)
.await
.unwrap();
let region_file_id = file_handle.file_id();
assert!(
cache_manager
.get_parquet_meta_data_from_mem_cache(region_file_id)
.is_none()
);
// Provide an empty remote object store (so object-store load should fail if attempted).
let missing_store = ObjectStore::new(Memory::default()).unwrap().finish();
let loaded = preload_parquet_meta_cache_for_files(
region_id,
table_dir.to_string(),
path_type,
missing_store,
cache_manager.clone(),
vec![file_handle],
)
.await;
// Should warm the in-memory cache from file cache even if remote store misses.
assert_eq!(loaded, 1);
assert!(
cache_manager
.get_parquet_meta_data_from_mem_cache(region_file_id)
.is_some()
);
}
}

View File

@@ -1159,7 +1159,16 @@ impl ParquetReaderBuilder {
.map(|expr| format!("{expr:?}"))
.collect::<Vec<_>>();
exprs.sort();
Some(PredicateKey::new_minmax(Arc::new(exprs)))
let schema_version = self
.expected_metadata
.as_ref()
.map(|meta| meta.schema_version)
.unwrap_or_else(|| read_format.metadata().schema_version);
Some(PredicateKey::new_minmax(
Arc::new(exprs),
schema_version,
skip_fields,
))
} else {
None
};

View File

@@ -116,16 +116,20 @@ impl RowGroupSelection {
};
let selection = RowSelection::from(vec![RowSelector::select(rg_row_count)]);
selection_in_rg.insert(
rg_id,
RowSelectionWithCount {
selection,
row_count: rg_row_count,
selector_len: 1,
},
);
row_count += rg_row_count;
selector_len += 1;
if selection_in_rg
.insert(
rg_id,
RowSelectionWithCount {
selection,
row_count: rg_row_count,
selector_len: 1,
},
)
.is_none()
{
row_count += rg_row_count;
selector_len += 1;
}
}
Self {
@@ -806,6 +810,16 @@ mod tests {
assert_eq!(row_selection.row_count(), 1);
}
#[test]
fn test_from_full_row_group_ids_dedup_duplicates() {
let selection = RowGroupSelection::from_full_row_group_ids([0, 0, 2, 2], 10, 25);
assert_eq!(selection.row_group_count(), 2);
assert_eq!(selection.row_count(), 15);
assert_eq!(selection.get(0).unwrap().row_count(), 10);
assert_eq!(selection.get(2).unwrap().row_count(), 5);
}
#[test]
fn test_from_row_ids() {
let row_group_size = 100;