From ddb34fec2e7d0d89347941854c6e7728ff235842 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Fri, 13 Mar 2026 14:36:39 +0800 Subject: [PATCH] cache decoded region metadata Signed-off-by: Ruihang Xia --- src/mito2/src/cache.rs | 260 ++++++++++++++++++++++------ src/mito2/src/cache/file_cache.rs | 30 +++- src/mito2/src/cache/test_util.rs | 30 +++- src/mito2/src/cache/write_cache.rs | 8 +- src/mito2/src/region/opener.rs | 32 +++- src/mito2/src/sst/parquet.rs | 6 +- src/mito2/src/sst/parquet/reader.rs | 55 ++---- 7 files changed, 315 insertions(+), 106 deletions(-) diff --git a/src/mito2/src/cache.rs b/src/mito2/src/cache.rs index 3ad71d2a61..57fb11a8d1 100644 --- a/src/mito2/src/cache.rs +++ b/src/mito2/src/cache.rs @@ -28,6 +28,7 @@ use std::ops::Range; use std::sync::Arc; use bytes::Bytes; +use common_telemetry::warn; use datatypes::arrow::record_batch::RecordBatch; use datatypes::value::Value; use datatypes::vectors::VectorRef; @@ -36,8 +37,10 @@ use index::result_cache::IndexResultCache; use moka::notification::RemovalCause; use moka::sync::Cache; use object_store::ObjectStore; -use parquet::file::metadata::{PageIndexPolicy, ParquetMetaData}; +use parquet::file::metadata::{FileMetaData, PageIndexPolicy, ParquetMetaData}; use puffin::puffin_manager::cache::{PuffinMetadataCache, PuffinMetadataCacheRef}; +use snafu::{OptionExt, ResultExt}; +use store_api::metadata::RegionMetadataRef; use store_api::storage::{ConcreteDataType, FileId, RegionId, TimeSeriesRowSelector}; use crate::cache::cache_size::parquet_meta_size; @@ -46,10 +49,12 @@ use crate::cache::index::inverted_index::{InvertedIndexCache, InvertedIndexCache #[cfg(feature = "vector_index")] use crate::cache::index::vector_index::{VectorIndexCache, VectorIndexCacheRef}; use crate::cache::write_cache::WriteCacheRef; +use crate::error::{InvalidMetadataSnafu, InvalidParquetSnafu, Result}; use crate::memtable::record_batch_estimated_size; use crate::metrics::{CACHE_BYTES, CACHE_EVICTION, CACHE_HIT, CACHE_MISS}; 
use crate::read::Batch; use crate::sst::file::{RegionFileId, RegionIndexId}; +use crate::sst::parquet::PARQUET_METADATA_KEY; use crate::sst::parquet::reader::MetadataCacheMetrics; /// Metrics type key for sst meta. @@ -65,6 +70,96 @@ const INDEX_TYPE: &str = "index"; /// Metrics type key for selector result cache. const SELECTOR_RESULT_TYPE: &str = "selector_result"; +/// Cached SST metadata combines the parquet footer with the decoded region metadata. +/// +/// The cached parquet footer strips the `greptime:metadata` JSON payload and stores the decoded +/// [RegionMetadata] separately so readers can skip repeated deserialization work. +#[derive(Debug)] +pub(crate) struct CachedSstMeta { + parquet_metadata: Arc, + region_metadata: RegionMetadataRef, + region_metadata_size_hint: usize, +} + +impl CachedSstMeta { + pub(crate) fn try_new(file_path: &str, parquet_metadata: ParquetMetaData) -> Result { + let (region_metadata, region_metadata_size_hint) = { + let file_metadata = parquet_metadata.file_metadata(); + let key_values = file_metadata + .key_value_metadata() + .context(InvalidParquetSnafu { + file: file_path, + reason: "missing key value meta", + })?; + let meta_value = key_values + .iter() + .find(|kv| kv.key == PARQUET_METADATA_KEY) + .with_context(|| InvalidParquetSnafu { + file: file_path, + reason: format!("key {} not found", PARQUET_METADATA_KEY), + })?; + let json = meta_value + .value + .as_ref() + .with_context(|| InvalidParquetSnafu { + file: file_path, + reason: format!("No value for key {}", PARQUET_METADATA_KEY), + })?; + let region_metadata = Arc::new( + store_api::metadata::RegionMetadata::from_json(json) + .context(InvalidMetadataSnafu)?, + ); + (region_metadata, json.len()) + }; + let parquet_metadata = Arc::new(strip_region_metadata_from_parquet(parquet_metadata)); + + Ok(Self { + parquet_metadata, + region_metadata, + region_metadata_size_hint, + }) + } + + pub(crate) fn parquet_metadata(&self) -> Arc { + self.parquet_metadata.clone() + } + + 
pub(crate) fn region_metadata(&self) -> RegionMetadataRef { + self.region_metadata.clone() + } +} + +fn strip_region_metadata_from_parquet(parquet_metadata: ParquetMetaData) -> ParquetMetaData { + let file_metadata = parquet_metadata.file_metadata(); + let filtered_key_values = file_metadata.key_value_metadata().and_then(|key_values| { + let filtered = key_values + .iter() + .filter(|kv| kv.key != PARQUET_METADATA_KEY) + .cloned() + .collect::>(); + (!filtered.is_empty()).then_some(filtered) + }); + let stripped_file_metadata = FileMetaData::new( + file_metadata.version(), + file_metadata.num_rows(), + file_metadata.created_by().map(ToString::to_string), + filtered_key_values, + file_metadata.schema_descr_ptr(), + file_metadata.column_orders().cloned(), + ); + + let mut builder = parquet_metadata.into_builder(); + let row_groups = builder.take_row_groups(); + let column_index = builder.take_column_index(); + let offset_index = builder.take_offset_index(); + + parquet::file::metadata::ParquetMetaDataBuilder::new(stripped_file_metadata) + .set_row_groups(row_groups) + .set_column_index(column_index) + .set_offset_index(offset_index) + .build() +} + /// Cache strategies that may only enable a subset of caches. #[derive(Clone)] pub enum CacheStrategy { @@ -81,18 +176,17 @@ pub enum CacheStrategy { } impl CacheStrategy { - /// Gets parquet metadata with cache metrics tracking. - /// Returns the metadata and updates the provided metrics. - pub(crate) async fn get_parquet_meta_data( + /// Gets fused SST metadata with cache metrics tracking. 
+ pub(crate) async fn get_sst_meta_data( &self, file_id: RegionFileId, metrics: &mut MetadataCacheMetrics, page_index_policy: PageIndexPolicy, - ) -> Option> { + ) -> Option> { match self { CacheStrategy::EnableAll(cache_manager) | CacheStrategy::Compaction(cache_manager) => { cache_manager - .get_parquet_meta_data(file_id, metrics, page_index_policy) + .get_sst_meta_data(file_id, metrics, page_index_policy) .await } CacheStrategy::Disabled => { @@ -102,19 +196,35 @@ impl CacheStrategy { } } + /// Calls [CacheManager::get_sst_meta_data_from_mem_cache()]. + pub(crate) fn get_sst_meta_data_from_mem_cache( + &self, + file_id: RegionFileId, + ) -> Option> { + match self { + CacheStrategy::EnableAll(cache_manager) | CacheStrategy::Compaction(cache_manager) => { + cache_manager.get_sst_meta_data_from_mem_cache(file_id) + } + CacheStrategy::Disabled => None, + } + } + /// Calls [CacheManager::get_parquet_meta_data_from_mem_cache()]. pub fn get_parquet_meta_data_from_mem_cache( &self, file_id: RegionFileId, ) -> Option> { + self.get_sst_meta_data_from_mem_cache(file_id) + .map(|metadata| metadata.parquet_metadata()) + } + + /// Calls [CacheManager::put_sst_meta_data()]. + pub(crate) fn put_sst_meta_data(&self, file_id: RegionFileId, metadata: Arc) { match self { - CacheStrategy::EnableAll(cache_manager) => { - cache_manager.get_parquet_meta_data_from_mem_cache(file_id) + CacheStrategy::EnableAll(cache_manager) | CacheStrategy::Compaction(cache_manager) => { + cache_manager.put_sst_meta_data(file_id, metadata); } - CacheStrategy::Compaction(cache_manager) => { - cache_manager.get_parquet_meta_data_from_mem_cache(file_id) - } - CacheStrategy::Disabled => None, + CacheStrategy::Disabled => {} } } @@ -336,6 +446,35 @@ impl CacheManager { CacheManagerBuilder::default() } + /// Gets fused SST metadata with metrics tracking. + /// Tries in-memory cache first, then file cache, updating metrics accordingly. 
+ pub(crate) async fn get_sst_meta_data( + &self, + file_id: RegionFileId, + metrics: &mut MetadataCacheMetrics, + page_index_policy: PageIndexPolicy, + ) -> Option> { + if let Some(metadata) = self.get_sst_meta_data_from_mem_cache(file_id) { + metrics.mem_cache_hit += 1; + return Some(metadata); + } + + let key = IndexKey::new(file_id.region_id(), file_id.file_id(), FileType::Parquet); + if let Some(write_cache) = &self.write_cache + && let Some(metadata) = write_cache + .file_cache() + .get_sst_meta_data(key, metrics, page_index_policy) + .await + { + metrics.file_cache_hit += 1; + self.put_sst_meta_data(file_id, metadata.clone()); + return Some(metadata); + } + + metrics.cache_miss += 1; + None + } + /// Gets cached [ParquetMetaData] with metrics tracking. /// Tries in-memory cache first, then file cache, updating metrics accordingly. pub(crate) async fn get_parquet_meta_data( @@ -344,29 +483,21 @@ impl CacheManager { metrics: &mut MetadataCacheMetrics, page_index_policy: PageIndexPolicy, ) -> Option> { - // Try to get metadata from sst meta cache - if let Some(metadata) = self.get_parquet_meta_data_from_mem_cache(file_id) { - metrics.mem_cache_hit += 1; - return Some(metadata); - } + self.get_sst_meta_data(file_id, metrics, page_index_policy) + .await + .map(|metadata| metadata.parquet_metadata()) + } - // Try to get metadata from write cache - let key = IndexKey::new(file_id.region_id(), file_id.file_id(), FileType::Parquet); - if let Some(write_cache) = &self.write_cache - && let Some(metadata) = write_cache - .file_cache() - .get_parquet_meta_data(key, metrics, page_index_policy) - .await - { - metrics.file_cache_hit += 1; - let metadata = Arc::new(metadata); - // Put metadata into sst meta cache - self.put_parquet_meta_data(file_id, metadata.clone()); - return Some(metadata); - }; - metrics.cache_miss += 1; - - None + /// Gets cached fused SST metadata from in-memory cache. + /// This method does not perform I/O. 
+ pub(crate) fn get_sst_meta_data_from_mem_cache( + &self, + file_id: RegionFileId, + ) -> Option> { + self.sst_meta_cache.as_ref().and_then(|sst_meta_cache| { + let value = sst_meta_cache.get(&SstMetaKey(file_id.region_id(), file_id.file_id())); + update_hit_miss(value, SST_META_TYPE) + }) } /// Gets cached [ParquetMetaData] from in-memory cache. @@ -375,15 +506,12 @@ impl CacheManager { &self, file_id: RegionFileId, ) -> Option> { - // Try to get metadata from sst meta cache - self.sst_meta_cache.as_ref().and_then(|sst_meta_cache| { - let value = sst_meta_cache.get(&SstMetaKey(file_id.region_id(), file_id.file_id())); - update_hit_miss(value, SST_META_TYPE) - }) + self.get_sst_meta_data_from_mem_cache(file_id) + .map(|metadata| metadata.parquet_metadata()) } - /// Puts [ParquetMetaData] into the cache. - pub fn put_parquet_meta_data(&self, file_id: RegionFileId, metadata: Arc) { + /// Puts fused SST metadata into the cache. + pub(crate) fn put_sst_meta_data(&self, file_id: RegionFileId, metadata: Arc) { if let Some(cache) = &self.sst_meta_cache { let key = SstMetaKey(file_id.region_id(), file_id.file_id()); CACHE_BYTES @@ -393,6 +521,25 @@ impl CacheManager { } } + /// Puts [ParquetMetaData] into the cache. + pub fn put_parquet_meta_data(&self, file_id: RegionFileId, metadata: Arc) { + if self.sst_meta_cache.is_some() { + let file_path = format!( + "region_id={}, file_id={}", + file_id.region_id(), + file_id.file_id() + ); + match CachedSstMeta::try_new(&file_path, Arc::unwrap_or_clone(metadata)) { + Ok(metadata) => self.put_sst_meta_data(file_id, Arc::new(metadata)), + Err(err) => warn!( + err; "Failed to decode region metadata while caching parquet metadata, region_id: {}, file_id: {}", + file_id.region_id(), + file_id.file_id() + ), + } + } + } + /// Removes [ParquetMetaData] from the cache. 
pub fn remove_parquet_meta_data(&self, file_id: RegionFileId) { if let Some(cache) = &self.sst_meta_cache { @@ -728,9 +875,10 @@ impl CacheManagerBuilder { } } -fn meta_cache_weight(k: &SstMetaKey, v: &Arc) -> u32 { +fn meta_cache_weight(k: &SstMetaKey, v: &Arc) -> u32 { // We ignore the size of `Arc`. - (k.estimated_size() + parquet_meta_size(v)) as u32 + (k.estimated_size() + parquet_meta_size(&v.parquet_metadata) + v.region_metadata_size_hint) + as u32 } fn vector_cache_weight(_k: &(ConcreteDataType, Value), v: &VectorRef) -> u32 { @@ -892,8 +1040,8 @@ impl SelectorResultValue { } } -/// Maps (region id, file id) to [ParquetMetaData]. -type SstMetaCache = Cache>; +/// Maps (region id, file id) to fused SST metadata. +type SstMetaCache = Cache>; /// Maps [Value] to a vector that holds this value repeatedly. /// /// e.g. `"hello" => ["hello", "hello", "hello"]` @@ -915,7 +1063,7 @@ mod tests { use super::*; use crate::cache::index::bloom_filter_index::Tag; use crate::cache::index::result_cache::PredicateKey; - use crate::cache::test_util::parquet_meta; + use crate::cache::test_util::{parquet_meta, sst_parquet_meta}; use crate::sst::parquet::row_selection::RowGroupSelection; #[tokio::test] @@ -966,13 +1114,23 @@ mod tests { .await .is_none() ); - let metadata = parquet_meta(); + let (metadata, region_metadata) = sst_parquet_meta(); cache.put_parquet_meta_data(file_id, metadata); + let cached = cache + .get_sst_meta_data(file_id, &mut metrics, Default::default()) + .await + .unwrap(); + assert_eq!(region_metadata, cached.region_metadata()); assert!( - cache - .get_parquet_meta_data(file_id, &mut metrics, Default::default()) - .await - .is_some() + cached + .parquet_metadata() + .file_metadata() + .key_value_metadata() + .is_none_or(|key_values| { + key_values + .iter() + .all(|key_value| key_value.key != PARQUET_METADATA_KEY) + }) ); cache.remove_parquet_meta_data(file_id); assert!( diff --git a/src/mito2/src/cache/file_cache.rs b/src/mito2/src/cache/file_cache.rs 
index 32a276d0e4..278838b369 100644 --- a/src/mito2/src/cache/file_cache.rs +++ b/src/mito2/src/cache/file_cache.rs @@ -34,7 +34,7 @@ use store_api::storage::{FileId, RegionId}; use tokio::sync::mpsc::{Sender, UnboundedReceiver}; use crate::access_layer::TempFileCleaner; -use crate::cache::{FILE_TYPE, INDEX_TYPE}; +use crate::cache::{CachedSstMeta, FILE_TYPE, INDEX_TYPE}; use crate::error::{self, OpenDalSnafu, Result}; use crate::metrics::{ CACHE_BYTES, CACHE_HIT, CACHE_MISS, WRITE_CACHE_DOWNLOAD_BYTES_TOTAL, @@ -612,6 +612,34 @@ impl FileCache { } } + /// Get fused SST metadata from the file cache. + /// If the file is not in the cache, or metadata loading/decoding fails, return None. + pub(crate) async fn get_sst_meta_data( + &self, + key: IndexKey, + cache_metrics: &mut MetadataCacheMetrics, + page_index_policy: PageIndexPolicy, + ) -> Option> { + let file_path = self.inner.cache_file_path(key); + self.get_parquet_meta_data(key, cache_metrics, page_index_policy) + .await + .and_then( + |metadata| match CachedSstMeta::try_new(&file_path, metadata) { + Ok(metadata) => Some(Arc::new(metadata)), + Err(err) => { + CACHE_MISS + .with_label_values(&[key.file_type.metric_label()]) + .inc(); + warn!( + err; "Failed to decode cached parquet metadata for key {:?}", + key + ); + None + } + }, + ) + } + async fn get_reader(&self, file_path: &str) -> object_store::Result> { if self.inner.local_store.exists(file_path).await? 
{ Ok(Some(self.inner.local_store.reader(file_path).await?)) diff --git a/src/mito2/src/cache/test_util.rs b/src/mito2/src/cache/test_util.rs index 65ad9d87eb..e3de745d5d 100644 --- a/src/mito2/src/cache/test_util.rs +++ b/src/mito2/src/cache/test_util.rs @@ -23,8 +23,13 @@ use object_store::ObjectStore; use object_store::services::Fs; use parquet::arrow::ArrowWriter; use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder; -use parquet::file::metadata::ParquetMetaData; +use parquet::file::metadata::{KeyValue, ParquetMetaData}; +use parquet::file::properties::WriterProperties; use parquet::file::statistics::Statistics; +use store_api::metadata::RegionMetadataRef; + +use crate::sst::parquet::PARQUET_METADATA_KEY; +use crate::test_util::sst_util::sst_region_metadata; /// Returns a parquet meta data. pub(crate) fn parquet_meta() -> Arc { @@ -33,13 +38,34 @@ pub(crate) fn parquet_meta() -> Arc { builder.metadata().clone() } +/// Returns parquet metadata for an SST parquet file and its decoded region metadata. 
+pub(crate) fn sst_parquet_meta() -> (Arc<ParquetMetaData>, RegionMetadataRef) { + let region_metadata = Arc::new(sst_region_metadata()); + let file_data = parquet_file_data_with_region_metadata(&region_metadata); + let builder = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(file_data)).unwrap(); + (builder.metadata().clone(), region_metadata) +} + /// Write a test parquet file to a buffer fn parquet_file_data() -> Vec<u8> { + parquet_file_data_inner(None) +} + +fn parquet_file_data_with_region_metadata(region_metadata: &RegionMetadataRef) -> Vec<u8> { + let json = region_metadata.to_json().unwrap(); + let key_value = KeyValue::new(PARQUET_METADATA_KEY.to_string(), json); + parquet_file_data_inner(Some(vec![key_value])) +} + +fn parquet_file_data_inner(key_value_metadata: Option<Vec<KeyValue>>) -> Vec<u8> { let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef; let to_write = RecordBatch::try_from_iter([("col", col)]).unwrap(); let mut buffer = Vec::new(); - let mut writer = ArrowWriter::try_new(&mut buffer, to_write.schema(), None).unwrap(); + let props = WriterProperties::builder() + .set_key_value_metadata(key_value_metadata) + .build(); + let mut writer = ArrowWriter::try_new(&mut buffer, to_write.schema(), Some(props)).unwrap(); writer.write(&to_write).unwrap(); writer.close().unwrap(); diff --git a/src/mito2/src/cache/write_cache.rs b/src/mito2/src/cache/write_cache.rs index a28df3f54c..9c86d896cf 100644 --- a/src/mito2/src/cache/write_cache.rs +++ b/src/mito2/src/cache/write_cache.rs @@ -686,9 +686,15 @@ mod tests { .cache(CacheStrategy::EnableAll(cache_manager.clone())) .page_index_policy(PageIndexPolicy::Optional); let reader = builder.build().await.unwrap().unwrap(); + let cached_write_parquet_metadata = crate::cache::CachedSstMeta::try_new( + "test.sst", + Arc::unwrap_or_clone(write_parquet_metadata), + ) + .unwrap() + .parquet_metadata(); // Check parquet metadata - assert_parquet_metadata_equal(write_parquet_metadata, reader.parquet_metadata()); + 
assert_parquet_metadata_equal(cached_write_parquet_metadata, reader.parquet_metadata()); } #[tokio::test] diff --git a/src/mito2/src/region/opener.rs b/src/mito2/src/region/opener.rs index 014c50820f..bb2bc23337 100644 --- a/src/mito2/src/region/opener.rs +++ b/src/mito2/src/region/opener.rs @@ -1153,6 +1153,8 @@ mod tests { use object_store::ObjectStore; use object_store::services::{Fs, Memory}; use parquet::arrow::ArrowWriter; + use parquet::file::metadata::KeyValue; + use parquet::file::properties::WriterProperties; use store_api::region_request::PathType; use store_api::storage::{FileId, RegionId}; @@ -1161,7 +1163,27 @@ mod tests { use crate::cache::file_cache::{FileType, IndexKey}; use crate::sst::file::{FileHandle, FileMeta}; use crate::sst::file_purger::NoopFilePurger; + use crate::sst::parquet::PARQUET_METADATA_KEY; use crate::test_util::TestEnv; + use crate::test_util::sst_util::sst_region_metadata; + + fn sst_parquet_bytes(batch: &RecordBatch) -> Vec { + let key_value_meta = KeyValue::new( + PARQUET_METADATA_KEY.to_string(), + sst_region_metadata().to_json().unwrap(), + ); + let props = WriterProperties::builder() + .set_key_value_metadata(Some(vec![key_value_meta])) + .build(); + + let mut parquet_bytes = Vec::new(); + let mut writer = + ArrowWriter::try_new(&mut parquet_bytes, batch.schema(), Some(props)).unwrap(); + writer.write(batch).unwrap(); + writer.close().unwrap(); + + parquet_bytes + } #[tokio::test] async fn test_preload_parquet_meta_cache_uses_file_cache() { @@ -1183,10 +1205,7 @@ mod tests { let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef; let batch = RecordBatch::try_from_iter([("col", col)]).unwrap(); - let mut parquet_bytes = Vec::new(); - let mut writer = ArrowWriter::try_new(&mut parquet_bytes, batch.schema(), None).unwrap(); - writer.write(&batch).unwrap(); - writer.close().unwrap(); + let parquet_bytes = sst_parquet_bytes(&batch); let file_size = parquet_bytes.len() as u64; let file_meta = FileMeta { @@ 
-1334,10 +1353,7 @@ mod tests { let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef; let batch = RecordBatch::try_from_iter([("col", col)]).unwrap(); - let mut parquet_bytes = Vec::new(); - let mut writer = ArrowWriter::try_new(&mut parquet_bytes, batch.schema(), None).unwrap(); - writer.write(&batch).unwrap(); - writer.close().unwrap(); + let parquet_bytes = sst_parquet_bytes(&batch); // file_size is 0 when it's missing/defaulted in manifests; MetadataLoader::load will stat // the local filesystem to retrieve it. diff --git a/src/mito2/src/sst/parquet.rs b/src/mito2/src/sst/parquet.rs index aa98b69176..2ffc474ea2 100644 --- a/src/mito2/src/sst/parquet.rs +++ b/src/mito2/src/sst/parquet.rs @@ -382,8 +382,12 @@ mod tests { .page_index_policy(PageIndexPolicy::Optional); let reader = builder.build().await.unwrap().unwrap(); let reader_metadata = reader.parquet_metadata(); + let cached_writer_metadata = + crate::cache::CachedSstMeta::try_new("test.sst", Arc::unwrap_or_clone(writer_metadata)) + .unwrap() + .parquet_metadata(); - assert_parquet_metadata_equal(writer_metadata, reader_metadata); + assert_parquet_metadata_equal(cached_writer_metadata, reader_metadata); } #[tokio::test] diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs index 500f32ae91..d25481f5b5 100644 --- a/src/mito2/src/sst/parquet/reader.rs +++ b/src/mito2/src/sst/parquet/reader.rs @@ -35,22 +35,21 @@ use mito_codec::row_converter::build_primary_key_codec; use object_store::ObjectStore; use parquet::arrow::arrow_reader::{ParquetRecordBatchReader, RowSelection}; use parquet::arrow::{FieldLevels, ProjectionMask, parquet_to_arrow_field_levels}; -use parquet::file::metadata::{KeyValue, PageIndexPolicy, ParquetMetaData}; +use parquet::file::metadata::{PageIndexPolicy, ParquetMetaData}; use partition::expr::PartitionExpr; -use snafu::{OptionExt, ResultExt}; +use snafu::ResultExt; use store_api::codec::PrimaryKeyEncoding; use 
store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataRef}; use store_api::region_request::PathType; use store_api::storage::{ColumnId, FileId}; use table::predicate::Predicate; -use crate::cache::CacheStrategy; use crate::cache::index::result_cache::PredicateKey; +use crate::cache::{CacheStrategy, CachedSstMeta}; #[cfg(feature = "vector_index")] use crate::error::ApplyVectorIndexSnafu; use crate::error::{ - ArrowReaderSnafu, InvalidMetadataSnafu, InvalidParquetSnafu, ReadDataPartSnafu, - ReadParquetSnafu, Result, SerializePartitionExprSnafu, + ArrowReaderSnafu, ReadDataPartSnafu, ReadParquetSnafu, Result, SerializePartitionExprSnafu, }; use crate::metrics::{ PRECISE_FILTER_ROWS_TOTAL, READ_ROW_GROUPS_TOTAL, READ_ROWS_IN_ROW_GROUP_TOTAL, @@ -71,6 +70,7 @@ use crate::sst::index::inverted_index::applier::{ }; #[cfg(feature = "vector_index")] use crate::sst::index::vector_index::applier::VectorIndexApplierRef; +use crate::sst::parquet::DEFAULT_READ_BATCH_SIZE; use crate::sst::parquet::file_range::{ FileRangeContext, FileRangeContextRef, PartitionFilterContext, PreFilterMode, RangeBase, row_group_contains_delete, @@ -80,7 +80,6 @@ use crate::sst::parquet::metadata::MetadataLoader; use crate::sst::parquet::row_group::{InMemoryRowGroup, ParquetFetchMetrics}; use crate::sst::parquet::row_selection::RowGroupSelection; use crate::sst::parquet::stats::RowGroupPruningStats; -use crate::sst::parquet::{DEFAULT_READ_BATCH_SIZE, PARQUET_METADATA_KEY}; use crate::sst::tag_maybe_to_dictionary_field; const INDEX_TYPE_FULLTEXT: &str = "fulltext"; @@ -338,7 +337,7 @@ impl ParquetReaderBuilder { let file_size = self.file_handle.meta_ref().file_size; // Loads parquet metadata of the file. - let (parquet_meta, cache_miss) = self + let (sst_meta, cache_miss) = self .read_parquet_metadata( &file_path, file_size, @@ -346,9 +345,8 @@ impl ParquetReaderBuilder { self.page_index_policy, ) .await?; - // Decodes region metadata. 
- let key_value_meta = parquet_meta.file_metadata().key_value_metadata(); - let region_meta = Arc::new(Self::get_region_metadata(&file_path, key_value_meta)?); + let parquet_meta = sst_meta.parquet_metadata(); + let region_meta = sst_meta.region_metadata(); let region_partition_expr_str = self .expected_metadata .as_ref() @@ -599,42 +597,15 @@ impl ParquetReaderBuilder { })) } - /// Decodes region metadata from key value. - fn get_region_metadata( - file_path: &str, - key_value_meta: Option<&Vec>, - ) -> Result { - let key_values = key_value_meta.context(InvalidParquetSnafu { - file: file_path, - reason: "missing key value meta", - })?; - let meta_value = key_values - .iter() - .find(|kv| kv.key == PARQUET_METADATA_KEY) - .with_context(|| InvalidParquetSnafu { - file: file_path, - reason: format!("key {} not found", PARQUET_METADATA_KEY), - })?; - let json = meta_value - .value - .as_ref() - .with_context(|| InvalidParquetSnafu { - file: file_path, - reason: format!("No value for key {}", PARQUET_METADATA_KEY), - })?; - - RegionMetadata::from_json(json).context(InvalidMetadataSnafu) - } - /// Reads parquet metadata of specific file. - /// Returns (metadata, cache_miss_flag). + /// Returns (fused metadata, cache_miss_flag). async fn read_parquet_metadata( &self, file_path: &str, file_size: u64, cache_metrics: &mut MetadataCacheMetrics, page_index_policy: PageIndexPolicy, - ) -> Result<(Arc, bool)> { + ) -> Result<(Arc, bool)> { let start = Instant::now(); let _t = READ_STAGE_ELAPSED .with_label_values(&["read_parquet_metadata"]) @@ -644,7 +615,7 @@ impl ParquetReaderBuilder { // Tries to get from cache with metrics tracking. 
if let Some(metadata) = self .cache_strategy - .get_parquet_meta_data(file_id, cache_metrics, page_index_policy) + .get_sst_meta_data(file_id, cache_metrics, page_index_policy) .await { cache_metrics.metadata_load_cost += start.elapsed(); @@ -657,10 +628,10 @@ impl ParquetReaderBuilder { metadata_loader.with_page_index_policy(page_index_policy); let metadata = metadata_loader.load(cache_metrics).await?; - let metadata = Arc::new(metadata); + let metadata = Arc::new(CachedSstMeta::try_new(file_path, metadata)?); // Cache the metadata. self.cache_strategy - .put_parquet_meta_data(file_id, metadata.clone()); + .put_sst_meta_data(file_id, metadata.clone()); cache_metrics.metadata_load_cost += start.elapsed(); Ok((metadata, true))