take optional pre-exist metadata

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ruihang Xia
2026-03-18 02:56:29 +08:00
parent a337587433
commit e16f8230b0
3 changed files with 76 additions and 40 deletions

View File

@@ -339,6 +339,7 @@ impl AccessLayer {
metrics: &mut Metrics,
) -> Result<SstInfoArray> {
let region_id = request.metadata.region_id;
let region_metadata = request.metadata.clone();
let cache_manager = request.cache_manager.clone();
let sst_info = if let Some(write_cache) = cache_manager.write_cache() {
@@ -412,6 +413,7 @@ impl AccessLayer {
cache_manager.put_parquet_meta_data(
RegionFileId::new(region_id, sst.file_id),
parquet_metadata.clone(),
Some(region_metadata.clone()),
)
}
}

View File

@@ -83,38 +83,44 @@ pub(crate) struct CachedSstMeta {
impl CachedSstMeta {
pub(crate) fn try_new(file_path: &str, parquet_metadata: ParquetMetaData) -> Result<Self> {
let (region_metadata, region_metadata_weight) = {
let file_metadata = parquet_metadata.file_metadata();
let key_values = file_metadata
.key_value_metadata()
.context(InvalidParquetSnafu {
file: file_path,
reason: "missing key value meta",
})?;
let meta_value = key_values
.iter()
.find(|kv| kv.key == PARQUET_METADATA_KEY)
.with_context(|| InvalidParquetSnafu {
file: file_path,
reason: format!("key {} not found", PARQUET_METADATA_KEY),
})?;
let json = meta_value
.value
.as_ref()
.with_context(|| InvalidParquetSnafu {
file: file_path,
reason: format!("No value for key {}", PARQUET_METADATA_KEY),
})?;
let region_metadata = Arc::new(
Self::try_new_with_region_metadata(file_path, parquet_metadata, None)
}
pub(crate) fn try_new_with_region_metadata(
file_path: &str,
parquet_metadata: ParquetMetaData,
region_metadata: Option<RegionMetadataRef>,
) -> Result<Self> {
let file_metadata = parquet_metadata.file_metadata();
let key_values = file_metadata
.key_value_metadata()
.context(InvalidParquetSnafu {
file: file_path,
reason: "missing key value meta",
})?;
let meta_value = key_values
.iter()
.find(|kv| kv.key == PARQUET_METADATA_KEY)
.with_context(|| InvalidParquetSnafu {
file: file_path,
reason: format!("key {} not found", PARQUET_METADATA_KEY),
})?;
let json = meta_value
.value
.as_ref()
.with_context(|| InvalidParquetSnafu {
file: file_path,
reason: format!("No value for key {}", PARQUET_METADATA_KEY),
})?;
let region_metadata = match region_metadata {
Some(region_metadata) => region_metadata,
None => Arc::new(
store_api::metadata::RegionMetadata::from_json(json)
.context(InvalidMetadataSnafu)?,
);
// Keep the previous JSON-byte floor and charge the decoded structures as well.
(
region_metadata.clone(),
region_metadata.estimated_size().max(json.len()),
)
),
};
// Keep the previous JSON-byte floor and charge the decoded structures as well.
let region_metadata_weight = region_metadata.estimated_size().max(json.len());
let parquet_metadata = Arc::new(strip_region_metadata_from_parquet(parquet_metadata));
Ok(Self {
@@ -233,13 +239,15 @@ impl CacheStrategy {
}
/// Calls [CacheManager::put_parquet_meta_data()].
pub fn put_parquet_meta_data(&self, file_id: RegionFileId, metadata: Arc<ParquetMetaData>) {
pub fn put_parquet_meta_data(
&self,
file_id: RegionFileId,
metadata: Arc<ParquetMetaData>,
region_metadata: Option<RegionMetadataRef>,
) {
match self {
CacheStrategy::EnableAll(cache_manager) => {
cache_manager.put_parquet_meta_data(file_id, metadata);
}
CacheStrategy::Compaction(cache_manager) => {
cache_manager.put_parquet_meta_data(file_id, metadata);
CacheStrategy::EnableAll(cache_manager) | CacheStrategy::Compaction(cache_manager) => {
cache_manager.put_parquet_meta_data(file_id, metadata, region_metadata);
}
CacheStrategy::Disabled => {}
}
@@ -526,14 +534,23 @@ impl CacheManager {
}
/// Puts [ParquetMetaData] into the cache.
pub fn put_parquet_meta_data(&self, file_id: RegionFileId, metadata: Arc<ParquetMetaData>) {
pub fn put_parquet_meta_data(
&self,
file_id: RegionFileId,
metadata: Arc<ParquetMetaData>,
region_metadata: Option<RegionMetadataRef>,
) {
if self.sst_meta_cache.is_some() {
let file_path = format!(
"region_id={}, file_id={}",
file_id.region_id(),
file_id.file_id()
);
match CachedSstMeta::try_new(&file_path, Arc::unwrap_or_clone(metadata)) {
match CachedSstMeta::try_new_with_region_metadata(
&file_path,
Arc::unwrap_or_clone(metadata),
region_metadata,
) {
Ok(metadata) => self.put_sst_meta_data(file_id, Arc::new(metadata)),
Err(err) => warn!(
err; "Failed to decode region metadata while caching parquet metadata, region_id: {}, file_id: {}",
@@ -1085,7 +1102,7 @@ mod tests {
let file_id = RegionFileId::new(region_id, FileId::random());
let metadata = parquet_meta();
let mut metrics = MetadataCacheMetrics::default();
cache.put_parquet_meta_data(file_id, metadata);
cache.put_parquet_meta_data(file_id, metadata, None);
assert!(
cache
.get_parquet_meta_data(file_id, &mut metrics, Default::default())
@@ -1123,7 +1140,7 @@ mod tests {
.is_none()
);
let (metadata, region_metadata) = sst_parquet_meta();
cache.put_parquet_meta_data(file_id, metadata);
cache.put_parquet_meta_data(file_id, metadata, None);
let cached = cache
.get_sst_meta_data(file_id, &mut metrics, Default::default())
.await
@@ -1149,6 +1166,23 @@ mod tests {
);
}
#[tokio::test]
async fn test_parquet_meta_cache_with_provided_region_metadata() {
let cache = CacheManager::builder().sst_meta_cache_size(2000).build();
let mut metrics = MetadataCacheMetrics::default();
let region_id = RegionId::new(1, 1);
let file_id = RegionFileId::new(region_id, FileId::random());
let (metadata, region_metadata) = sst_parquet_meta();
cache.put_parquet_meta_data(file_id, metadata, Some(region_metadata.clone()));
let cached = cache
.get_sst_meta_data(file_id, &mut metrics, Default::default())
.await
.unwrap();
assert!(Arc::ptr_eq(&region_metadata, &cached.region_metadata()));
}
#[test]
fn test_meta_cache_weight_accounts_for_decoded_region_metadata() {
let region_metadata = Arc::new(wide_region_metadata(128));

View File

@@ -1043,7 +1043,7 @@ async fn preload_parquet_meta_cache_for_files(
let loader = MetadataLoader::new(object_store.clone(), &file_path, file_size);
match loader.load(&mut cache_metrics).await {
Ok(metadata) => {
cache_manager.put_parquet_meta_data(file_id, Arc::new(metadata));
cache_manager.put_parquet_meta_data(file_id, Arc::new(metadata), None);
loaded += 1;
}
Err(err) => {