cache decoded region metadata

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ruihang Xia
2026-03-13 14:36:39 +08:00
parent 3beb538aa8
commit ddb34fec2e
7 changed files with 315 additions and 106 deletions

View File

@@ -28,6 +28,7 @@ use std::ops::Range;
use std::sync::Arc;
use bytes::Bytes;
use common_telemetry::warn;
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::value::Value;
use datatypes::vectors::VectorRef;
@@ -36,8 +37,10 @@ use index::result_cache::IndexResultCache;
use moka::notification::RemovalCause;
use moka::sync::Cache;
use object_store::ObjectStore;
use parquet::file::metadata::{PageIndexPolicy, ParquetMetaData};
use parquet::file::metadata::{FileMetaData, PageIndexPolicy, ParquetMetaData};
use puffin::puffin_manager::cache::{PuffinMetadataCache, PuffinMetadataCacheRef};
use snafu::{OptionExt, ResultExt};
use store_api::metadata::RegionMetadataRef;
use store_api::storage::{ConcreteDataType, FileId, RegionId, TimeSeriesRowSelector};
use crate::cache::cache_size::parquet_meta_size;
@@ -46,10 +49,12 @@ use crate::cache::index::inverted_index::{InvertedIndexCache, InvertedIndexCache
#[cfg(feature = "vector_index")]
use crate::cache::index::vector_index::{VectorIndexCache, VectorIndexCacheRef};
use crate::cache::write_cache::WriteCacheRef;
use crate::error::{InvalidMetadataSnafu, InvalidParquetSnafu, Result};
use crate::memtable::record_batch_estimated_size;
use crate::metrics::{CACHE_BYTES, CACHE_EVICTION, CACHE_HIT, CACHE_MISS};
use crate::read::Batch;
use crate::sst::file::{RegionFileId, RegionIndexId};
use crate::sst::parquet::PARQUET_METADATA_KEY;
use crate::sst::parquet::reader::MetadataCacheMetrics;
/// Metrics type key for sst meta.
@@ -65,6 +70,96 @@ const INDEX_TYPE: &str = "index";
/// Metrics type key for selector result cache.
const SELECTOR_RESULT_TYPE: &str = "selector_result";
/// Cached SST metadata combines the parquet footer with the decoded region metadata.
///
/// The cached parquet footer strips the `greptime:metadata` JSON payload and stores the decoded
/// [RegionMetadata] separately so readers can skip repeated deserialization work.
#[derive(Debug)]
pub(crate) struct CachedSstMeta {
parquet_metadata: Arc<ParquetMetaData>,
region_metadata: RegionMetadataRef,
region_metadata_size_hint: usize,
}
impl CachedSstMeta {
pub(crate) fn try_new(file_path: &str, parquet_metadata: ParquetMetaData) -> Result<Self> {
let (region_metadata, region_metadata_size_hint) = {
let file_metadata = parquet_metadata.file_metadata();
let key_values = file_metadata
.key_value_metadata()
.context(InvalidParquetSnafu {
file: file_path,
reason: "missing key value meta",
})?;
let meta_value = key_values
.iter()
.find(|kv| kv.key == PARQUET_METADATA_KEY)
.with_context(|| InvalidParquetSnafu {
file: file_path,
reason: format!("key {} not found", PARQUET_METADATA_KEY),
})?;
let json = meta_value
.value
.as_ref()
.with_context(|| InvalidParquetSnafu {
file: file_path,
reason: format!("No value for key {}", PARQUET_METADATA_KEY),
})?;
let region_metadata = Arc::new(
store_api::metadata::RegionMetadata::from_json(json)
.context(InvalidMetadataSnafu)?,
);
(region_metadata, json.len())
};
let parquet_metadata = Arc::new(strip_region_metadata_from_parquet(parquet_metadata));
Ok(Self {
parquet_metadata,
region_metadata,
region_metadata_size_hint,
})
}
pub(crate) fn parquet_metadata(&self) -> Arc<ParquetMetaData> {
self.parquet_metadata.clone()
}
pub(crate) fn region_metadata(&self) -> RegionMetadataRef {
self.region_metadata.clone()
}
}
fn strip_region_metadata_from_parquet(parquet_metadata: ParquetMetaData) -> ParquetMetaData {
let file_metadata = parquet_metadata.file_metadata();
let filtered_key_values = file_metadata.key_value_metadata().and_then(|key_values| {
let filtered = key_values
.iter()
.filter(|kv| kv.key != PARQUET_METADATA_KEY)
.cloned()
.collect::<Vec<_>>();
(!filtered.is_empty()).then_some(filtered)
});
let stripped_file_metadata = FileMetaData::new(
file_metadata.version(),
file_metadata.num_rows(),
file_metadata.created_by().map(ToString::to_string),
filtered_key_values,
file_metadata.schema_descr_ptr(),
file_metadata.column_orders().cloned(),
);
let mut builder = parquet_metadata.into_builder();
let row_groups = builder.take_row_groups();
let column_index = builder.take_column_index();
let offset_index = builder.take_offset_index();
parquet::file::metadata::ParquetMetaDataBuilder::new(stripped_file_metadata)
.set_row_groups(row_groups)
.set_column_index(column_index)
.set_offset_index(offset_index)
.build()
}
/// Cache strategies that may only enable a subset of caches.
#[derive(Clone)]
pub enum CacheStrategy {
@@ -81,18 +176,17 @@ pub enum CacheStrategy {
}
impl CacheStrategy {
/// Gets parquet metadata with cache metrics tracking.
/// Returns the metadata and updates the provided metrics.
pub(crate) async fn get_parquet_meta_data(
/// Gets fused SST metadata with cache metrics tracking.
pub(crate) async fn get_sst_meta_data(
&self,
file_id: RegionFileId,
metrics: &mut MetadataCacheMetrics,
page_index_policy: PageIndexPolicy,
) -> Option<Arc<ParquetMetaData>> {
) -> Option<Arc<CachedSstMeta>> {
match self {
CacheStrategy::EnableAll(cache_manager) | CacheStrategy::Compaction(cache_manager) => {
cache_manager
.get_parquet_meta_data(file_id, metrics, page_index_policy)
.get_sst_meta_data(file_id, metrics, page_index_policy)
.await
}
CacheStrategy::Disabled => {
@@ -102,19 +196,35 @@ impl CacheStrategy {
}
}
/// Calls [CacheManager::get_sst_meta_data_from_mem_cache()].
pub(crate) fn get_sst_meta_data_from_mem_cache(
&self,
file_id: RegionFileId,
) -> Option<Arc<CachedSstMeta>> {
match self {
CacheStrategy::EnableAll(cache_manager) | CacheStrategy::Compaction(cache_manager) => {
cache_manager.get_sst_meta_data_from_mem_cache(file_id)
}
CacheStrategy::Disabled => None,
}
}
/// Calls [CacheManager::get_parquet_meta_data_from_mem_cache()].
pub fn get_parquet_meta_data_from_mem_cache(
&self,
file_id: RegionFileId,
) -> Option<Arc<ParquetMetaData>> {
self.get_sst_meta_data_from_mem_cache(file_id)
.map(|metadata| metadata.parquet_metadata())
}
/// Calls [CacheManager::put_sst_meta_data()].
pub(crate) fn put_sst_meta_data(&self, file_id: RegionFileId, metadata: Arc<CachedSstMeta>) {
match self {
CacheStrategy::EnableAll(cache_manager) => {
cache_manager.get_parquet_meta_data_from_mem_cache(file_id)
CacheStrategy::EnableAll(cache_manager) | CacheStrategy::Compaction(cache_manager) => {
cache_manager.put_sst_meta_data(file_id, metadata);
}
CacheStrategy::Compaction(cache_manager) => {
cache_manager.get_parquet_meta_data_from_mem_cache(file_id)
}
CacheStrategy::Disabled => None,
CacheStrategy::Disabled => {}
}
}
@@ -336,6 +446,35 @@ impl CacheManager {
CacheManagerBuilder::default()
}
/// Gets fused SST metadata with metrics tracking.
/// Tries in-memory cache first, then file cache, updating metrics accordingly.
pub(crate) async fn get_sst_meta_data(
&self,
file_id: RegionFileId,
metrics: &mut MetadataCacheMetrics,
page_index_policy: PageIndexPolicy,
) -> Option<Arc<CachedSstMeta>> {
if let Some(metadata) = self.get_sst_meta_data_from_mem_cache(file_id) {
metrics.mem_cache_hit += 1;
return Some(metadata);
}
let key = IndexKey::new(file_id.region_id(), file_id.file_id(), FileType::Parquet);
if let Some(write_cache) = &self.write_cache
&& let Some(metadata) = write_cache
.file_cache()
.get_sst_meta_data(key, metrics, page_index_policy)
.await
{
metrics.file_cache_hit += 1;
self.put_sst_meta_data(file_id, metadata.clone());
return Some(metadata);
}
metrics.cache_miss += 1;
None
}
/// Gets cached [ParquetMetaData] with metrics tracking.
/// Tries in-memory cache first, then file cache, updating metrics accordingly.
pub(crate) async fn get_parquet_meta_data(
@@ -344,29 +483,21 @@ impl CacheManager {
metrics: &mut MetadataCacheMetrics,
page_index_policy: PageIndexPolicy,
) -> Option<Arc<ParquetMetaData>> {
// Try to get metadata from sst meta cache
if let Some(metadata) = self.get_parquet_meta_data_from_mem_cache(file_id) {
metrics.mem_cache_hit += 1;
return Some(metadata);
}
self.get_sst_meta_data(file_id, metrics, page_index_policy)
.await
.map(|metadata| metadata.parquet_metadata())
}
// Try to get metadata from write cache
let key = IndexKey::new(file_id.region_id(), file_id.file_id(), FileType::Parquet);
if let Some(write_cache) = &self.write_cache
&& let Some(metadata) = write_cache
.file_cache()
.get_parquet_meta_data(key, metrics, page_index_policy)
.await
{
metrics.file_cache_hit += 1;
let metadata = Arc::new(metadata);
// Put metadata into sst meta cache
self.put_parquet_meta_data(file_id, metadata.clone());
return Some(metadata);
};
metrics.cache_miss += 1;
None
/// Gets cached fused SST metadata from in-memory cache.
/// This method does not perform I/O.
pub(crate) fn get_sst_meta_data_from_mem_cache(
&self,
file_id: RegionFileId,
) -> Option<Arc<CachedSstMeta>> {
self.sst_meta_cache.as_ref().and_then(|sst_meta_cache| {
let value = sst_meta_cache.get(&SstMetaKey(file_id.region_id(), file_id.file_id()));
update_hit_miss(value, SST_META_TYPE)
})
}
/// Gets cached [ParquetMetaData] from in-memory cache.
@@ -375,15 +506,12 @@ impl CacheManager {
&self,
file_id: RegionFileId,
) -> Option<Arc<ParquetMetaData>> {
// Try to get metadata from sst meta cache
self.sst_meta_cache.as_ref().and_then(|sst_meta_cache| {
let value = sst_meta_cache.get(&SstMetaKey(file_id.region_id(), file_id.file_id()));
update_hit_miss(value, SST_META_TYPE)
})
self.get_sst_meta_data_from_mem_cache(file_id)
.map(|metadata| metadata.parquet_metadata())
}
/// Puts [ParquetMetaData] into the cache.
pub fn put_parquet_meta_data(&self, file_id: RegionFileId, metadata: Arc<ParquetMetaData>) {
/// Puts fused SST metadata into the cache.
pub(crate) fn put_sst_meta_data(&self, file_id: RegionFileId, metadata: Arc<CachedSstMeta>) {
if let Some(cache) = &self.sst_meta_cache {
let key = SstMetaKey(file_id.region_id(), file_id.file_id());
CACHE_BYTES
@@ -393,6 +521,25 @@ impl CacheManager {
}
}
/// Puts [ParquetMetaData] into the cache.
pub fn put_parquet_meta_data(&self, file_id: RegionFileId, metadata: Arc<ParquetMetaData>) {
if self.sst_meta_cache.is_some() {
let file_path = format!(
"region_id={}, file_id={}",
file_id.region_id(),
file_id.file_id()
);
match CachedSstMeta::try_new(&file_path, Arc::unwrap_or_clone(metadata)) {
Ok(metadata) => self.put_sst_meta_data(file_id, Arc::new(metadata)),
Err(err) => warn!(
err; "Failed to decode region metadata while caching parquet metadata, region_id: {}, file_id: {}",
file_id.region_id(),
file_id.file_id()
),
}
}
}
/// Removes [ParquetMetaData] from the cache.
pub fn remove_parquet_meta_data(&self, file_id: RegionFileId) {
if let Some(cache) = &self.sst_meta_cache {
@@ -728,9 +875,10 @@ impl CacheManagerBuilder {
}
}
fn meta_cache_weight(k: &SstMetaKey, v: &Arc<ParquetMetaData>) -> u32 {
fn meta_cache_weight(k: &SstMetaKey, v: &Arc<CachedSstMeta>) -> u32 {
// We ignore the size of `Arc`.
(k.estimated_size() + parquet_meta_size(v)) as u32
(k.estimated_size() + parquet_meta_size(&v.parquet_metadata) + v.region_metadata_size_hint)
as u32
}
fn vector_cache_weight(_k: &(ConcreteDataType, Value), v: &VectorRef) -> u32 {
@@ -892,8 +1040,8 @@ impl SelectorResultValue {
}
}
/// Maps (region id, file id) to [ParquetMetaData].
type SstMetaCache = Cache<SstMetaKey, Arc<ParquetMetaData>>;
/// Maps (region id, file id) to fused SST metadata.
type SstMetaCache = Cache<SstMetaKey, Arc<CachedSstMeta>>;
/// Maps [Value] to a vector that holds this value repeatedly.
///
/// e.g. `"hello" => ["hello", "hello", "hello"]`
@@ -915,7 +1063,7 @@ mod tests {
use super::*;
use crate::cache::index::bloom_filter_index::Tag;
use crate::cache::index::result_cache::PredicateKey;
use crate::cache::test_util::parquet_meta;
use crate::cache::test_util::{parquet_meta, sst_parquet_meta};
use crate::sst::parquet::row_selection::RowGroupSelection;
#[tokio::test]
@@ -966,13 +1114,23 @@ mod tests {
.await
.is_none()
);
let metadata = parquet_meta();
let (metadata, region_metadata) = sst_parquet_meta();
cache.put_parquet_meta_data(file_id, metadata);
let cached = cache
.get_sst_meta_data(file_id, &mut metrics, Default::default())
.await
.unwrap();
assert_eq!(region_metadata, cached.region_metadata());
assert!(
cache
.get_parquet_meta_data(file_id, &mut metrics, Default::default())
.await
.is_some()
cached
.parquet_metadata()
.file_metadata()
.key_value_metadata()
.is_none_or(|key_values| {
key_values
.iter()
.all(|key_value| key_value.key != PARQUET_METADATA_KEY)
})
);
cache.remove_parquet_meta_data(file_id);
assert!(

View File

@@ -34,7 +34,7 @@ use store_api::storage::{FileId, RegionId};
use tokio::sync::mpsc::{Sender, UnboundedReceiver};
use crate::access_layer::TempFileCleaner;
use crate::cache::{FILE_TYPE, INDEX_TYPE};
use crate::cache::{CachedSstMeta, FILE_TYPE, INDEX_TYPE};
use crate::error::{self, OpenDalSnafu, Result};
use crate::metrics::{
CACHE_BYTES, CACHE_HIT, CACHE_MISS, WRITE_CACHE_DOWNLOAD_BYTES_TOTAL,
@@ -612,6 +612,34 @@ impl FileCache {
}
}
/// Get fused SST metadata from the file cache.
/// If the file is not in the cache, or metadata loading/decoding fails, return None.
pub(crate) async fn get_sst_meta_data(
&self,
key: IndexKey,
cache_metrics: &mut MetadataCacheMetrics,
page_index_policy: PageIndexPolicy,
) -> Option<Arc<CachedSstMeta>> {
let file_path = self.inner.cache_file_path(key);
self.get_parquet_meta_data(key, cache_metrics, page_index_policy)
.await
.and_then(
|metadata| match CachedSstMeta::try_new(&file_path, metadata) {
Ok(metadata) => Some(Arc::new(metadata)),
Err(err) => {
CACHE_MISS
.with_label_values(&[key.file_type.metric_label()])
.inc();
warn!(
err; "Failed to decode cached parquet metadata for key {:?}",
key
);
None
}
},
)
}
async fn get_reader(&self, file_path: &str) -> object_store::Result<Option<Reader>> {
if self.inner.local_store.exists(file_path).await? {
Ok(Some(self.inner.local_store.reader(file_path).await?))

View File

@@ -23,8 +23,13 @@ use object_store::ObjectStore;
use object_store::services::Fs;
use parquet::arrow::ArrowWriter;
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
use parquet::file::metadata::ParquetMetaData;
use parquet::file::metadata::{KeyValue, ParquetMetaData};
use parquet::file::properties::WriterProperties;
use parquet::file::statistics::Statistics;
use store_api::metadata::RegionMetadataRef;
use crate::sst::parquet::PARQUET_METADATA_KEY;
use crate::test_util::sst_util::sst_region_metadata;
/// Returns a parquet meta data.
pub(crate) fn parquet_meta() -> Arc<ParquetMetaData> {
@@ -33,13 +38,34 @@ pub(crate) fn parquet_meta() -> Arc<ParquetMetaData> {
builder.metadata().clone()
}
/// Returns parquet metadata for an SST parquet file and its decoded region metadata.
pub(crate) fn sst_parquet_meta() -> (Arc<ParquetMetaData>, RegionMetadataRef) {
let region_metadata = Arc::new(sst_region_metadata());
let file_data = parquet_file_data_with_region_metadata(&region_metadata);
let builder = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(file_data)).unwrap();
(builder.metadata().clone(), region_metadata)
}
/// Write a test parquet file to a buffer
fn parquet_file_data() -> Vec<u8> {
parquet_file_data_inner(None)
}
fn parquet_file_data_with_region_metadata(region_metadata: &RegionMetadataRef) -> Vec<u8> {
let json = region_metadata.to_json().unwrap();
let key_value = KeyValue::new(PARQUET_METADATA_KEY.to_string(), json);
parquet_file_data_inner(Some(vec![key_value]))
}
fn parquet_file_data_inner(key_value_metadata: Option<Vec<KeyValue>>) -> Vec<u8> {
let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef;
let to_write = RecordBatch::try_from_iter([("col", col)]).unwrap();
let mut buffer = Vec::new();
let mut writer = ArrowWriter::try_new(&mut buffer, to_write.schema(), None).unwrap();
let props = WriterProperties::builder()
.set_key_value_metadata(key_value_metadata)
.build();
let mut writer = ArrowWriter::try_new(&mut buffer, to_write.schema(), Some(props)).unwrap();
writer.write(&to_write).unwrap();
writer.close().unwrap();

View File

@@ -686,9 +686,15 @@ mod tests {
.cache(CacheStrategy::EnableAll(cache_manager.clone()))
.page_index_policy(PageIndexPolicy::Optional);
let reader = builder.build().await.unwrap().unwrap();
let cached_write_parquet_metadata = crate::cache::CachedSstMeta::try_new(
"test.sst",
Arc::unwrap_or_clone(write_parquet_metadata),
)
.unwrap()
.parquet_metadata();
// Check parquet metadata
assert_parquet_metadata_equal(write_parquet_metadata, reader.parquet_metadata());
assert_parquet_metadata_equal(cached_write_parquet_metadata, reader.parquet_metadata());
}
#[tokio::test]

View File

@@ -1153,6 +1153,8 @@ mod tests {
use object_store::ObjectStore;
use object_store::services::{Fs, Memory};
use parquet::arrow::ArrowWriter;
use parquet::file::metadata::KeyValue;
use parquet::file::properties::WriterProperties;
use store_api::region_request::PathType;
use store_api::storage::{FileId, RegionId};
@@ -1161,7 +1163,27 @@ mod tests {
use crate::cache::file_cache::{FileType, IndexKey};
use crate::sst::file::{FileHandle, FileMeta};
use crate::sst::file_purger::NoopFilePurger;
use crate::sst::parquet::PARQUET_METADATA_KEY;
use crate::test_util::TestEnv;
use crate::test_util::sst_util::sst_region_metadata;
fn sst_parquet_bytes(batch: &RecordBatch) -> Vec<u8> {
let key_value_meta = KeyValue::new(
PARQUET_METADATA_KEY.to_string(),
sst_region_metadata().to_json().unwrap(),
);
let props = WriterProperties::builder()
.set_key_value_metadata(Some(vec![key_value_meta]))
.build();
let mut parquet_bytes = Vec::new();
let mut writer =
ArrowWriter::try_new(&mut parquet_bytes, batch.schema(), Some(props)).unwrap();
writer.write(batch).unwrap();
writer.close().unwrap();
parquet_bytes
}
#[tokio::test]
async fn test_preload_parquet_meta_cache_uses_file_cache() {
@@ -1183,10 +1205,7 @@ mod tests {
let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef;
let batch = RecordBatch::try_from_iter([("col", col)]).unwrap();
let mut parquet_bytes = Vec::new();
let mut writer = ArrowWriter::try_new(&mut parquet_bytes, batch.schema(), None).unwrap();
writer.write(&batch).unwrap();
writer.close().unwrap();
let parquet_bytes = sst_parquet_bytes(&batch);
let file_size = parquet_bytes.len() as u64;
let file_meta = FileMeta {
@@ -1334,10 +1353,7 @@ mod tests {
let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef;
let batch = RecordBatch::try_from_iter([("col", col)]).unwrap();
let mut parquet_bytes = Vec::new();
let mut writer = ArrowWriter::try_new(&mut parquet_bytes, batch.schema(), None).unwrap();
writer.write(&batch).unwrap();
writer.close().unwrap();
let parquet_bytes = sst_parquet_bytes(&batch);
// file_size is 0 when it's missing/defaulted in manifests; MetadataLoader::load will stat
// the local filesystem to retrieve it.

View File

@@ -382,8 +382,12 @@ mod tests {
.page_index_policy(PageIndexPolicy::Optional);
let reader = builder.build().await.unwrap().unwrap();
let reader_metadata = reader.parquet_metadata();
let cached_writer_metadata =
crate::cache::CachedSstMeta::try_new("test.sst", Arc::unwrap_or_clone(writer_metadata))
.unwrap()
.parquet_metadata();
assert_parquet_metadata_equal(writer_metadata, reader_metadata);
assert_parquet_metadata_equal(cached_writer_metadata, reader_metadata);
}
#[tokio::test]

View File

@@ -35,22 +35,21 @@ use mito_codec::row_converter::build_primary_key_codec;
use object_store::ObjectStore;
use parquet::arrow::arrow_reader::{ParquetRecordBatchReader, RowSelection};
use parquet::arrow::{FieldLevels, ProjectionMask, parquet_to_arrow_field_levels};
use parquet::file::metadata::{KeyValue, PageIndexPolicy, ParquetMetaData};
use parquet::file::metadata::{PageIndexPolicy, ParquetMetaData};
use partition::expr::PartitionExpr;
use snafu::{OptionExt, ResultExt};
use snafu::ResultExt;
use store_api::codec::PrimaryKeyEncoding;
use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataRef};
use store_api::region_request::PathType;
use store_api::storage::{ColumnId, FileId};
use table::predicate::Predicate;
use crate::cache::CacheStrategy;
use crate::cache::index::result_cache::PredicateKey;
use crate::cache::{CacheStrategy, CachedSstMeta};
#[cfg(feature = "vector_index")]
use crate::error::ApplyVectorIndexSnafu;
use crate::error::{
ArrowReaderSnafu, InvalidMetadataSnafu, InvalidParquetSnafu, ReadDataPartSnafu,
ReadParquetSnafu, Result, SerializePartitionExprSnafu,
ArrowReaderSnafu, ReadDataPartSnafu, ReadParquetSnafu, Result, SerializePartitionExprSnafu,
};
use crate::metrics::{
PRECISE_FILTER_ROWS_TOTAL, READ_ROW_GROUPS_TOTAL, READ_ROWS_IN_ROW_GROUP_TOTAL,
@@ -71,6 +70,7 @@ use crate::sst::index::inverted_index::applier::{
};
#[cfg(feature = "vector_index")]
use crate::sst::index::vector_index::applier::VectorIndexApplierRef;
use crate::sst::parquet::DEFAULT_READ_BATCH_SIZE;
use crate::sst::parquet::file_range::{
FileRangeContext, FileRangeContextRef, PartitionFilterContext, PreFilterMode, RangeBase,
row_group_contains_delete,
@@ -80,7 +80,6 @@ use crate::sst::parquet::metadata::MetadataLoader;
use crate::sst::parquet::row_group::{InMemoryRowGroup, ParquetFetchMetrics};
use crate::sst::parquet::row_selection::RowGroupSelection;
use crate::sst::parquet::stats::RowGroupPruningStats;
use crate::sst::parquet::{DEFAULT_READ_BATCH_SIZE, PARQUET_METADATA_KEY};
use crate::sst::tag_maybe_to_dictionary_field;
const INDEX_TYPE_FULLTEXT: &str = "fulltext";
@@ -338,7 +337,7 @@ impl ParquetReaderBuilder {
let file_size = self.file_handle.meta_ref().file_size;
// Loads parquet metadata of the file.
let (parquet_meta, cache_miss) = self
let (sst_meta, cache_miss) = self
.read_parquet_metadata(
&file_path,
file_size,
@@ -346,9 +345,8 @@ impl ParquetReaderBuilder {
self.page_index_policy,
)
.await?;
// Decodes region metadata.
let key_value_meta = parquet_meta.file_metadata().key_value_metadata();
let region_meta = Arc::new(Self::get_region_metadata(&file_path, key_value_meta)?);
let parquet_meta = sst_meta.parquet_metadata();
let region_meta = sst_meta.region_metadata();
let region_partition_expr_str = self
.expected_metadata
.as_ref()
@@ -599,42 +597,15 @@ impl ParquetReaderBuilder {
}))
}
/// Decodes region metadata from key value.
fn get_region_metadata(
file_path: &str,
key_value_meta: Option<&Vec<KeyValue>>,
) -> Result<RegionMetadata> {
let key_values = key_value_meta.context(InvalidParquetSnafu {
file: file_path,
reason: "missing key value meta",
})?;
let meta_value = key_values
.iter()
.find(|kv| kv.key == PARQUET_METADATA_KEY)
.with_context(|| InvalidParquetSnafu {
file: file_path,
reason: format!("key {} not found", PARQUET_METADATA_KEY),
})?;
let json = meta_value
.value
.as_ref()
.with_context(|| InvalidParquetSnafu {
file: file_path,
reason: format!("No value for key {}", PARQUET_METADATA_KEY),
})?;
RegionMetadata::from_json(json).context(InvalidMetadataSnafu)
}
/// Reads parquet metadata of specific file.
/// Returns (metadata, cache_miss_flag).
/// Returns (fused metadata, cache_miss_flag).
async fn read_parquet_metadata(
&self,
file_path: &str,
file_size: u64,
cache_metrics: &mut MetadataCacheMetrics,
page_index_policy: PageIndexPolicy,
) -> Result<(Arc<ParquetMetaData>, bool)> {
) -> Result<(Arc<CachedSstMeta>, bool)> {
let start = Instant::now();
let _t = READ_STAGE_ELAPSED
.with_label_values(&["read_parquet_metadata"])
@@ -644,7 +615,7 @@ impl ParquetReaderBuilder {
// Tries to get from cache with metrics tracking.
if let Some(metadata) = self
.cache_strategy
.get_parquet_meta_data(file_id, cache_metrics, page_index_policy)
.get_sst_meta_data(file_id, cache_metrics, page_index_policy)
.await
{
cache_metrics.metadata_load_cost += start.elapsed();
@@ -657,10 +628,10 @@ impl ParquetReaderBuilder {
metadata_loader.with_page_index_policy(page_index_policy);
let metadata = metadata_loader.load(cache_metrics).await?;
let metadata = Arc::new(metadata);
let metadata = Arc::new(CachedSstMeta::try_new(file_path, metadata)?);
// Cache the metadata.
self.cache_strategy
.put_parquet_meta_data(file_id, metadata.clone());
.put_sst_meta_data(file_id, metadata.clone());
cache_metrics.metadata_load_cost += start.elapsed();
Ok((metadata, true))