feat: cache sst meta with policy aware

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ruihang Xia
2026-05-19 15:13:04 +08:00
parent d4cfbd3400
commit db68aea7c2
4 changed files with 123 additions and 10 deletions

View File

@@ -148,17 +148,35 @@ pub(crate) struct CachedSstMeta {
parquet_metadata: Arc<ParquetMetaData>,
region_metadata: RegionMetadataRef,
region_metadata_weight: usize,
page_index_policy: PageIndexPolicy,
}
impl CachedSstMeta {
#[cfg(test)]
pub(crate) fn try_new(file_path: &str, parquet_metadata: ParquetMetaData) -> Result<Self> {
Self::try_new_with_region_metadata(file_path, parquet_metadata, None)
let page_index_policy = infer_loaded_page_index_policy(&parquet_metadata);
Self::try_new_with_page_index_policy(file_path, parquet_metadata, None, page_index_policy)
}
pub(crate) fn try_new_with_region_metadata(
file_path: &str,
parquet_metadata: ParquetMetaData,
region_metadata: Option<RegionMetadataRef>,
) -> Result<Self> {
let page_index_policy = infer_loaded_page_index_policy(&parquet_metadata);
Self::try_new_with_page_index_policy(
file_path,
parquet_metadata,
region_metadata,
page_index_policy,
)
}
pub(crate) fn try_new_with_page_index_policy(
file_path: &str,
parquet_metadata: ParquetMetaData,
region_metadata: Option<RegionMetadataRef>,
page_index_policy: PageIndexPolicy,
) -> Result<Self> {
let file_metadata = parquet_metadata.file_metadata();
let key_values = file_metadata
@@ -196,6 +214,7 @@ impl CachedSstMeta {
parquet_metadata,
region_metadata,
region_metadata_weight,
page_index_policy,
})
}
@@ -206,6 +225,22 @@ impl CachedSstMeta {
pub(crate) fn region_metadata(&self) -> RegionMetadataRef {
self.region_metadata.clone()
}
fn satisfies_page_index_policy(&self, requested: PageIndexPolicy) -> bool {
match requested {
PageIndexPolicy::Skip => true,
PageIndexPolicy::Optional => self.page_index_policy != PageIndexPolicy::Skip,
PageIndexPolicy::Required => self.page_index_policy == PageIndexPolicy::Required,
}
}
}
fn infer_loaded_page_index_policy(parquet_metadata: &ParquetMetaData) -> PageIndexPolicy {
if parquet_metadata.column_index().is_some() || parquet_metadata.offset_index().is_some() {
PageIndexPolicy::Optional
} else {
PageIndexPolicy::Skip
}
}
fn strip_region_metadata_from_parquet(parquet_metadata: ParquetMetaData) -> ParquetMetaData {
@@ -279,10 +314,11 @@ impl CacheStrategy {
pub(crate) fn get_sst_meta_data_from_mem_cache(
&self,
file_id: RegionFileId,
page_index_policy: PageIndexPolicy,
) -> Option<Arc<CachedSstMeta>> {
match self {
CacheStrategy::EnableAll(cache_manager) | CacheStrategy::Compaction(cache_manager) => {
cache_manager.get_sst_meta_data_from_mem_cache(file_id)
cache_manager.get_sst_meta_data_from_mem_cache(file_id, page_index_policy)
}
CacheStrategy::Disabled => None,
}
@@ -293,7 +329,7 @@ impl CacheStrategy {
&self,
file_id: RegionFileId,
) -> Option<Arc<ParquetMetaData>> {
self.get_sst_meta_data_from_mem_cache(file_id)
self.get_sst_meta_data_from_mem_cache(file_id, PageIndexPolicy::Skip)
.map(|metadata| metadata.parquet_metadata())
}
@@ -592,7 +628,7 @@ impl CacheManager {
metrics: &mut MetadataCacheMetrics,
page_index_policy: PageIndexPolicy,
) -> Option<Arc<CachedSstMeta>> {
if let Some(metadata) = self.get_sst_meta_data_from_mem_cache(file_id) {
if let Some(metadata) = self.get_sst_meta_data_from_mem_cache(file_id, page_index_policy) {
metrics.mem_cache_hit += 1;
return Some(metadata);
}
@@ -631,9 +667,12 @@ impl CacheManager {
pub(crate) fn get_sst_meta_data_from_mem_cache(
&self,
file_id: RegionFileId,
page_index_policy: PageIndexPolicy,
) -> Option<Arc<CachedSstMeta>> {
self.sst_meta_cache.as_ref().and_then(|sst_meta_cache| {
let value = sst_meta_cache.get(&SstMetaKey(file_id.region_id(), file_id.file_id()));
let value =
value.filter(|metadata| metadata.satisfies_page_index_policy(page_index_policy));
update_hit_miss(value, SST_META_TYPE)
})
}
@@ -644,7 +683,7 @@ impl CacheManager {
&self,
file_id: RegionFileId,
) -> Option<Arc<ParquetMetaData>> {
self.get_sst_meta_data_from_mem_cache(file_id)
self.get_sst_meta_data_from_mem_cache(file_id, PageIndexPolicy::Skip)
.map(|metadata| metadata.parquet_metadata())
}
@@ -1388,6 +1427,63 @@ mod tests {
assert!(Arc::ptr_eq(&region_metadata, &cached.region_metadata()));
}
#[tokio::test]
async fn test_parquet_meta_cache_respects_page_index_policy() {
let cache = CacheManager::builder().sst_meta_cache_size(2000).build();
let region_id = RegionId::new(1, 1);
let file_id = RegionFileId::new(region_id, FileId::random());
let (metadata, _) = sst_parquet_meta();
let skip_metadata = Arc::new(
CachedSstMeta::try_new_with_page_index_policy(
"test.parquet",
Arc::unwrap_or_clone(metadata.clone()),
None,
PageIndexPolicy::Skip,
)
.unwrap(),
);
cache.put_sst_meta_data(file_id, skip_metadata);
let mut metrics = MetadataCacheMetrics::default();
assert!(
cache
.get_sst_meta_data(file_id, &mut metrics, PageIndexPolicy::Optional)
.await
.is_none()
);
assert_eq!(1, metrics.cache_miss);
let optional_metadata = Arc::new(
CachedSstMeta::try_new_with_page_index_policy(
"test.parquet",
Arc::unwrap_or_clone(metadata),
None,
PageIndexPolicy::Optional,
)
.unwrap(),
);
cache.put_sst_meta_data(file_id, optional_metadata);
let mut metrics = MetadataCacheMetrics::default();
assert!(
cache
.get_sst_meta_data(file_id, &mut metrics, PageIndexPolicy::Optional)
.await
.is_some()
);
assert_eq!(1, metrics.mem_cache_hit);
let mut metrics = MetadataCacheMetrics::default();
assert!(
cache
.get_sst_meta_data(file_id, &mut metrics, PageIndexPolicy::Skip)
.await
.is_some()
);
assert_eq!(1, metrics.mem_cache_hit);
}
#[test]
fn test_meta_cache_weight_accounts_for_decoded_region_metadata() {
let region_metadata = Arc::new(wide_region_metadata(128));

View File

@@ -628,8 +628,13 @@ impl FileCache {
let file_path = self.inner.cache_file_path(key);
self.get_parquet_meta_data(key, cache_metrics, page_index_policy)
.await
.and_then(
|metadata| match CachedSstMeta::try_new(&file_path, metadata) {
.and_then(|metadata| {
match CachedSstMeta::try_new_with_page_index_policy(
&file_path,
metadata,
None,
page_index_policy,
) {
Ok(metadata) => Some(Arc::new(metadata)),
Err(err) => {
CACHE_MISS
@@ -641,8 +646,8 @@ impl FileCache {
);
None
}
},
)
}
})
}
async fn get_reader(&self, file_path: &str) -> object_store::Result<Option<Reader>> {

View File

@@ -37,6 +37,7 @@ use datatypes::schema::Schema;
use datatypes::schema::ext::ArrowSchemaExt;
use datatypes::types::json_type::JsonNativeType;
use futures::StreamExt;
use parquet::file::metadata::PageIndexPolicy;
use partition::expr::PartitionExpr;
use smallvec::SmallVec;
use snafu::ResultExt;
@@ -1080,6 +1081,7 @@ impl ScanInput {
reader_metrics: &mut ReaderMetrics,
) -> Result<FileRangeBuilder> {
let predicate = self.predicate_for_file(file);
let may_build_selective_row_selection = predicate.is_some();
let decode_pk_values = !self.compaction
&& self
.mapper
@@ -1095,6 +1097,11 @@ impl ScanInput {
.inverted_index_appliers(self.inverted_index_appliers.clone())
.bloom_filter_index_appliers(self.bloom_filter_index_appliers.clone())
.fulltext_index_appliers(self.fulltext_index_appliers.clone());
let reader = if !self.compaction && may_build_selective_row_selection {
reader.page_index_policy(PageIndexPolicy::Optional)
} else {
reader
};
#[cfg(feature = "vector_index")]
let reader = {
let mut reader = reader;

View File

@@ -614,7 +614,12 @@ impl ParquetReaderBuilder {
metadata_loader.with_page_index_policy(page_index_policy);
let metadata = metadata_loader.load(cache_metrics).await?;
let metadata = Arc::new(CachedSstMeta::try_new(file_path, metadata)?);
let metadata = Arc::new(CachedSstMeta::try_new_with_page_index_policy(
file_path,
metadata,
None,
page_index_policy,
)?);
// Cache the metadata.
self.cache_strategy
.put_sst_meta_data(file_id, metadata.clone());