mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-24 17:00:37 +00:00
feat: use and cache page index from sst meta (#8139)
* feat: cache sst meta with policy aware Signed-off-by: Ruihang Xia <waynestxia@gmail.com> * load page index when necessary Signed-off-by: Ruihang Xia <waynestxia@gmail.com> --------- Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
@@ -148,17 +148,35 @@ pub(crate) struct CachedSstMeta {
|
||||
parquet_metadata: Arc<ParquetMetaData>,
|
||||
region_metadata: RegionMetadataRef,
|
||||
region_metadata_weight: usize,
|
||||
page_index_policy: PageIndexPolicy,
|
||||
}
|
||||
|
||||
impl CachedSstMeta {
|
||||
#[cfg(test)]
|
||||
pub(crate) fn try_new(file_path: &str, parquet_metadata: ParquetMetaData) -> Result<Self> {
|
||||
Self::try_new_with_region_metadata(file_path, parquet_metadata, None)
|
||||
let page_index_policy = infer_loaded_page_index_policy(&parquet_metadata);
|
||||
Self::try_new_with_page_index_policy(file_path, parquet_metadata, None, page_index_policy)
|
||||
}
|
||||
|
||||
pub(crate) fn try_new_with_region_metadata(
|
||||
file_path: &str,
|
||||
parquet_metadata: ParquetMetaData,
|
||||
region_metadata: Option<RegionMetadataRef>,
|
||||
) -> Result<Self> {
|
||||
let page_index_policy = infer_loaded_page_index_policy(&parquet_metadata);
|
||||
Self::try_new_with_page_index_policy(
|
||||
file_path,
|
||||
parquet_metadata,
|
||||
region_metadata,
|
||||
page_index_policy,
|
||||
)
|
||||
}
|
||||
|
||||
pub(crate) fn try_new_with_page_index_policy(
|
||||
file_path: &str,
|
||||
parquet_metadata: ParquetMetaData,
|
||||
region_metadata: Option<RegionMetadataRef>,
|
||||
page_index_policy: PageIndexPolicy,
|
||||
) -> Result<Self> {
|
||||
let file_metadata = parquet_metadata.file_metadata();
|
||||
let key_values = file_metadata
|
||||
@@ -196,6 +214,7 @@ impl CachedSstMeta {
|
||||
parquet_metadata,
|
||||
region_metadata,
|
||||
region_metadata_weight,
|
||||
page_index_policy,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -206,6 +225,22 @@ impl CachedSstMeta {
|
||||
pub(crate) fn region_metadata(&self) -> RegionMetadataRef {
|
||||
self.region_metadata.clone()
|
||||
}
|
||||
|
||||
fn satisfies_page_index_policy(&self, requested: PageIndexPolicy) -> bool {
|
||||
match requested {
|
||||
PageIndexPolicy::Skip => true,
|
||||
PageIndexPolicy::Optional => self.page_index_policy != PageIndexPolicy::Skip,
|
||||
PageIndexPolicy::Required => self.page_index_policy == PageIndexPolicy::Required,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn infer_loaded_page_index_policy(parquet_metadata: &ParquetMetaData) -> PageIndexPolicy {
|
||||
if parquet_metadata.column_index().is_some() || parquet_metadata.offset_index().is_some() {
|
||||
PageIndexPolicy::Optional
|
||||
} else {
|
||||
PageIndexPolicy::Skip
|
||||
}
|
||||
}
|
||||
|
||||
fn strip_region_metadata_from_parquet(parquet_metadata: ParquetMetaData) -> ParquetMetaData {
|
||||
@@ -279,10 +314,11 @@ impl CacheStrategy {
|
||||
pub(crate) fn get_sst_meta_data_from_mem_cache(
|
||||
&self,
|
||||
file_id: RegionFileId,
|
||||
page_index_policy: PageIndexPolicy,
|
||||
) -> Option<Arc<CachedSstMeta>> {
|
||||
match self {
|
||||
CacheStrategy::EnableAll(cache_manager) | CacheStrategy::Compaction(cache_manager) => {
|
||||
cache_manager.get_sst_meta_data_from_mem_cache(file_id)
|
||||
cache_manager.get_sst_meta_data_from_mem_cache(file_id, page_index_policy)
|
||||
}
|
||||
CacheStrategy::Disabled => None,
|
||||
}
|
||||
@@ -293,7 +329,7 @@ impl CacheStrategy {
|
||||
&self,
|
||||
file_id: RegionFileId,
|
||||
) -> Option<Arc<ParquetMetaData>> {
|
||||
self.get_sst_meta_data_from_mem_cache(file_id)
|
||||
self.get_sst_meta_data_from_mem_cache(file_id, PageIndexPolicy::Skip)
|
||||
.map(|metadata| metadata.parquet_metadata())
|
||||
}
|
||||
|
||||
@@ -592,7 +628,7 @@ impl CacheManager {
|
||||
metrics: &mut MetadataCacheMetrics,
|
||||
page_index_policy: PageIndexPolicy,
|
||||
) -> Option<Arc<CachedSstMeta>> {
|
||||
if let Some(metadata) = self.get_sst_meta_data_from_mem_cache(file_id) {
|
||||
if let Some(metadata) = self.get_sst_meta_data_from_mem_cache(file_id, page_index_policy) {
|
||||
metrics.mem_cache_hit += 1;
|
||||
return Some(metadata);
|
||||
}
|
||||
@@ -631,9 +667,12 @@ impl CacheManager {
|
||||
pub(crate) fn get_sst_meta_data_from_mem_cache(
|
||||
&self,
|
||||
file_id: RegionFileId,
|
||||
page_index_policy: PageIndexPolicy,
|
||||
) -> Option<Arc<CachedSstMeta>> {
|
||||
self.sst_meta_cache.as_ref().and_then(|sst_meta_cache| {
|
||||
let value = sst_meta_cache.get(&SstMetaKey(file_id.region_id(), file_id.file_id()));
|
||||
let value =
|
||||
value.filter(|metadata| metadata.satisfies_page_index_policy(page_index_policy));
|
||||
update_hit_miss(value, SST_META_TYPE)
|
||||
})
|
||||
}
|
||||
@@ -644,7 +683,7 @@ impl CacheManager {
|
||||
&self,
|
||||
file_id: RegionFileId,
|
||||
) -> Option<Arc<ParquetMetaData>> {
|
||||
self.get_sst_meta_data_from_mem_cache(file_id)
|
||||
self.get_sst_meta_data_from_mem_cache(file_id, PageIndexPolicy::Skip)
|
||||
.map(|metadata| metadata.parquet_metadata())
|
||||
}
|
||||
|
||||
@@ -1388,6 +1427,63 @@ mod tests {
|
||||
assert!(Arc::ptr_eq(®ion_metadata, &cached.region_metadata()));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_parquet_meta_cache_respects_page_index_policy() {
|
||||
let cache = CacheManager::builder().sst_meta_cache_size(2000).build();
|
||||
let region_id = RegionId::new(1, 1);
|
||||
let file_id = RegionFileId::new(region_id, FileId::random());
|
||||
let (metadata, _) = sst_parquet_meta();
|
||||
|
||||
let skip_metadata = Arc::new(
|
||||
CachedSstMeta::try_new_with_page_index_policy(
|
||||
"test.parquet",
|
||||
Arc::unwrap_or_clone(metadata.clone()),
|
||||
None,
|
||||
PageIndexPolicy::Skip,
|
||||
)
|
||||
.unwrap(),
|
||||
);
|
||||
cache.put_sst_meta_data(file_id, skip_metadata);
|
||||
|
||||
let mut metrics = MetadataCacheMetrics::default();
|
||||
assert!(
|
||||
cache
|
||||
.get_sst_meta_data(file_id, &mut metrics, PageIndexPolicy::Optional)
|
||||
.await
|
||||
.is_none()
|
||||
);
|
||||
assert_eq!(1, metrics.cache_miss);
|
||||
|
||||
let optional_metadata = Arc::new(
|
||||
CachedSstMeta::try_new_with_page_index_policy(
|
||||
"test.parquet",
|
||||
Arc::unwrap_or_clone(metadata),
|
||||
None,
|
||||
PageIndexPolicy::Optional,
|
||||
)
|
||||
.unwrap(),
|
||||
);
|
||||
cache.put_sst_meta_data(file_id, optional_metadata);
|
||||
|
||||
let mut metrics = MetadataCacheMetrics::default();
|
||||
assert!(
|
||||
cache
|
||||
.get_sst_meta_data(file_id, &mut metrics, PageIndexPolicy::Optional)
|
||||
.await
|
||||
.is_some()
|
||||
);
|
||||
assert_eq!(1, metrics.mem_cache_hit);
|
||||
|
||||
let mut metrics = MetadataCacheMetrics::default();
|
||||
assert!(
|
||||
cache
|
||||
.get_sst_meta_data(file_id, &mut metrics, PageIndexPolicy::Skip)
|
||||
.await
|
||||
.is_some()
|
||||
);
|
||||
assert_eq!(1, metrics.mem_cache_hit);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_meta_cache_weight_accounts_for_decoded_region_metadata() {
|
||||
let region_metadata = Arc::new(wide_region_metadata(128));
|
||||
|
||||
13
src/mito2/src/cache/file_cache.rs
vendored
13
src/mito2/src/cache/file_cache.rs
vendored
@@ -628,8 +628,13 @@ impl FileCache {
|
||||
let file_path = self.inner.cache_file_path(key);
|
||||
self.get_parquet_meta_data(key, cache_metrics, page_index_policy)
|
||||
.await
|
||||
.and_then(
|
||||
|metadata| match CachedSstMeta::try_new(&file_path, metadata) {
|
||||
.and_then(|metadata| {
|
||||
match CachedSstMeta::try_new_with_page_index_policy(
|
||||
&file_path,
|
||||
metadata,
|
||||
None,
|
||||
page_index_policy,
|
||||
) {
|
||||
Ok(metadata) => Some(Arc::new(metadata)),
|
||||
Err(err) => {
|
||||
CACHE_MISS
|
||||
@@ -641,8 +646,8 @@ impl FileCache {
|
||||
);
|
||||
None
|
||||
}
|
||||
},
|
||||
)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
async fn get_reader(&self, file_path: &str) -> object_store::Result<Option<Reader>> {
|
||||
|
||||
@@ -1080,6 +1080,7 @@ impl ScanInput {
|
||||
reader_metrics: &mut ReaderMetrics,
|
||||
) -> Result<FileRangeBuilder> {
|
||||
let predicate = self.predicate_for_file(file);
|
||||
let may_build_selective_row_selection = predicate.is_some();
|
||||
let decode_pk_values = !self.compaction
|
||||
&& self
|
||||
.mapper
|
||||
@@ -1095,6 +1096,11 @@ impl ScanInput {
|
||||
.inverted_index_appliers(self.inverted_index_appliers.clone())
|
||||
.bloom_filter_index_appliers(self.bloom_filter_index_appliers.clone())
|
||||
.fulltext_index_appliers(self.fulltext_index_appliers.clone());
|
||||
let reader = if !self.compaction && may_build_selective_row_selection {
|
||||
reader.deferred_optional_page_index()
|
||||
} else {
|
||||
reader
|
||||
};
|
||||
#[cfg(feature = "vector_index")]
|
||||
let reader = {
|
||||
let mut reader = reader;
|
||||
|
||||
@@ -156,6 +156,7 @@ pub struct ParquetReaderBuilder {
|
||||
/// Whether to decode primary key values eagerly when reading primary key format SSTs.
|
||||
decode_primary_key_values: bool,
|
||||
page_index_policy: PageIndexPolicy,
|
||||
defer_optional_page_index: bool,
|
||||
}
|
||||
|
||||
impl ParquetReaderBuilder {
|
||||
@@ -186,6 +187,7 @@ impl ParquetReaderBuilder {
|
||||
pre_filter_mode: PreFilterMode::All,
|
||||
decode_primary_key_values: false,
|
||||
page_index_policy: Default::default(),
|
||||
defer_optional_page_index: false,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -289,6 +291,14 @@ impl ParquetReaderBuilder {
|
||||
self
|
||||
}
|
||||
|
||||
/// Defers loading optional page indexes until row-level selections can use them.
|
||||
#[must_use]
|
||||
pub(crate) fn deferred_optional_page_index(mut self) -> Self {
|
||||
self.page_index_policy = PageIndexPolicy::Optional;
|
||||
self.defer_optional_page_index = true;
|
||||
self
|
||||
}
|
||||
|
||||
/// Builds a [ParquetReader].
|
||||
///
|
||||
/// This needs to perform IO operation.
|
||||
@@ -337,15 +347,22 @@ impl ParquetReaderBuilder {
|
||||
let file_size = self.file_handle.meta_ref().file_size;
|
||||
|
||||
// Loads parquet metadata of the file.
|
||||
let (sst_meta, cache_miss) = self
|
||||
let initial_page_index_policy = if self.defer_optional_page_index
|
||||
&& self.page_index_policy == PageIndexPolicy::Optional
|
||||
{
|
||||
PageIndexPolicy::Skip
|
||||
} else {
|
||||
self.page_index_policy
|
||||
};
|
||||
let (sst_meta, mut cache_miss) = self
|
||||
.read_parquet_metadata(
|
||||
&file_path,
|
||||
file_size,
|
||||
&mut metrics.metadata_cache_metrics,
|
||||
self.page_index_policy,
|
||||
initial_page_index_policy,
|
||||
)
|
||||
.await?;
|
||||
let parquet_meta = sst_meta.parquet_metadata();
|
||||
let mut parquet_meta = sst_meta.parquet_metadata();
|
||||
let region_meta = sst_meta.region_metadata();
|
||||
let region_partition_expr_str = self
|
||||
.expected_metadata
|
||||
@@ -415,38 +432,12 @@ impl ParquetReaderBuilder {
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
// Trigger background download if metadata had a cache miss and selection is not empty
|
||||
if cache_miss && !selection.is_empty() {
|
||||
use crate::cache::file_cache::{FileType, IndexKey};
|
||||
let index_key = IndexKey::new(
|
||||
self.file_handle.region_id(),
|
||||
self.file_handle.file_id().file_id(),
|
||||
FileType::Parquet,
|
||||
);
|
||||
self.cache_strategy.maybe_download_background(
|
||||
index_key,
|
||||
file_path.clone(),
|
||||
self.object_store.clone(),
|
||||
file_size,
|
||||
);
|
||||
}
|
||||
|
||||
let prune_schema = self
|
||||
.expected_metadata
|
||||
.as_ref()
|
||||
.map(|meta| meta.schema.clone())
|
||||
.unwrap_or_else(|| region_meta.schema.clone());
|
||||
|
||||
// Create ArrowReaderMetadata for async stream building.
|
||||
let mut arrow_reader_options = ArrowReaderOptions::new();
|
||||
if !read_format.arrow_schema().has_json_extension_field() {
|
||||
arrow_reader_options =
|
||||
arrow_reader_options.with_schema(read_format.arrow_schema().clone());
|
||||
}
|
||||
let arrow_metadata =
|
||||
ArrowReaderMetadata::try_new(parquet_meta.clone(), arrow_reader_options)
|
||||
.context(ReadDataPartSnafu)?;
|
||||
|
||||
let dyn_filters = if let Some(predicate) = &self.predicate {
|
||||
predicate.dyn_filters().as_ref().clone()
|
||||
} else {
|
||||
@@ -464,6 +455,49 @@ impl ParquetReaderBuilder {
|
||||
&codec,
|
||||
);
|
||||
|
||||
if self.defer_optional_page_index
|
||||
&& self.page_index_policy == PageIndexPolicy::Optional
|
||||
&& (filter_plan.prefilter_builder.is_some()
|
||||
|| has_row_level_selection(&selection, &parquet_meta))
|
||||
{
|
||||
let (sst_meta, page_index_cache_miss) = self
|
||||
.read_parquet_metadata(
|
||||
&file_path,
|
||||
file_size,
|
||||
&mut metrics.metadata_cache_metrics,
|
||||
PageIndexPolicy::Optional,
|
||||
)
|
||||
.await?;
|
||||
parquet_meta = sst_meta.parquet_metadata();
|
||||
cache_miss |= page_index_cache_miss;
|
||||
}
|
||||
|
||||
// Trigger background download if metadata had a cache miss and selection is not empty
|
||||
if cache_miss && !selection.is_empty() {
|
||||
use crate::cache::file_cache::{FileType, IndexKey};
|
||||
let index_key = IndexKey::new(
|
||||
self.file_handle.region_id(),
|
||||
self.file_handle.file_id().file_id(),
|
||||
FileType::Parquet,
|
||||
);
|
||||
self.cache_strategy.maybe_download_background(
|
||||
index_key,
|
||||
file_path.clone(),
|
||||
self.object_store.clone(),
|
||||
file_size,
|
||||
);
|
||||
}
|
||||
|
||||
// Create ArrowReaderMetadata for async stream building.
|
||||
let mut arrow_reader_options = ArrowReaderOptions::new();
|
||||
if !read_format.arrow_schema().has_json_extension_field() {
|
||||
arrow_reader_options =
|
||||
arrow_reader_options.with_schema(read_format.arrow_schema().clone());
|
||||
}
|
||||
let arrow_metadata =
|
||||
ArrowReaderMetadata::try_new(parquet_meta.clone(), arrow_reader_options)
|
||||
.context(ReadDataPartSnafu)?;
|
||||
|
||||
let output_schema = read_format.output_arrow_schema()?;
|
||||
|
||||
let reader_builder = RowGroupReaderBuilder {
|
||||
@@ -614,7 +648,12 @@ impl ParquetReaderBuilder {
|
||||
metadata_loader.with_page_index_policy(page_index_policy);
|
||||
let metadata = metadata_loader.load(cache_metrics).await?;
|
||||
|
||||
let metadata = Arc::new(CachedSstMeta::try_new(file_path, metadata)?);
|
||||
let metadata = Arc::new(CachedSstMeta::try_new_with_page_index_policy(
|
||||
file_path,
|
||||
metadata,
|
||||
None,
|
||||
page_index_policy,
|
||||
)?);
|
||||
// Cache the metadata.
|
||||
self.cache_strategy
|
||||
.put_sst_meta_data(file_id, metadata.clone());
|
||||
@@ -1210,6 +1249,18 @@ impl ParquetReaderBuilder {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn has_row_level_selection(selection: &RowGroupSelection, parquet_meta: &ParquetMetaData) -> bool {
|
||||
selection.iter().any(|(row_group_idx, row_selection)| {
|
||||
let Some(row_group) = parquet_meta.row_groups().get(*row_group_idx) else {
|
||||
return false;
|
||||
};
|
||||
|
||||
row_selection.row_count() != row_group.num_rows() as usize
|
||||
|| row_selection.iter().any(|selector| selector.skip)
|
||||
})
|
||||
}
|
||||
|
||||
fn apply_selection_and_update_metrics(
|
||||
output: &mut RowGroupSelection,
|
||||
result: &RowGroupSelection,
|
||||
@@ -2171,6 +2222,7 @@ mod tests {
|
||||
use datatypes::schema::ColumnSchema;
|
||||
use object_store::services::Memory;
|
||||
use parquet::arrow::ArrowWriter;
|
||||
use parquet::file::properties::WriterProperties;
|
||||
use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder};
|
||||
use store_api::region_request::PathType;
|
||||
use store_api::storage::RegionId;
|
||||
@@ -2276,6 +2328,39 @@ mod tests {
|
||||
assert!(!selection.is_empty());
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "current_thread")]
|
||||
async fn test_has_row_level_selection() {
|
||||
let object_store = ObjectStore::new(Memory::default()).unwrap().finish();
|
||||
let file_path = "row_level_selection.parquet";
|
||||
|
||||
let col = Arc::new(Int64Array::from_iter_values([1, 2, 3, 4, 5])) as ArrayRef;
|
||||
let batch = RecordBatch::try_from_iter([("col", col)]).unwrap();
|
||||
let props = WriterProperties::builder()
|
||||
.set_max_row_group_row_count(Some(3))
|
||||
.build();
|
||||
let mut parquet_bytes = Vec::new();
|
||||
let mut writer =
|
||||
ArrowWriter::try_new(&mut parquet_bytes, batch.schema(), Some(props)).unwrap();
|
||||
writer.write(&batch).unwrap();
|
||||
writer.close().unwrap();
|
||||
let file_size = parquet_bytes.len() as u64;
|
||||
object_store.write(file_path, parquet_bytes).await.unwrap();
|
||||
|
||||
let mut cache_metrics = MetadataCacheMetrics::default();
|
||||
let loader = MetadataLoader::new(object_store, file_path, file_size);
|
||||
let parquet_meta = loader.load(&mut cache_metrics).await.unwrap();
|
||||
assert_eq!(2, parquet_meta.num_row_groups());
|
||||
|
||||
let full_row_groups = RowGroupSelection::from_full_row_group_ids([0, 1], 3, 5);
|
||||
assert!(!has_row_level_selection(&full_row_groups, &parquet_meta));
|
||||
|
||||
let prefix_selection = RowGroupSelection::from_row_ranges(vec![(0, vec![0..1, 1..2])], 3);
|
||||
assert!(has_row_level_selection(&prefix_selection, &parquet_meta));
|
||||
|
||||
let interior_selection = RowGroupSelection::from_row_ranges(vec![(0, vec![1..2, 2..3])], 3);
|
||||
assert!(has_row_level_selection(&interior_selection, &parquet_meta));
|
||||
}
|
||||
|
||||
fn expected_metadata_with_reused_tag_name(
|
||||
old_metadata: &RegionMetadata,
|
||||
) -> Arc<RegionMetadata> {
|
||||
|
||||
Reference in New Issue
Block a user