feat: introduce PuffinMetadataCache (#5148)

* feat: introduce `PuffinMetadataCache`

* refactor: remove too_many_arguments

* chore: fmt toml
This commit is contained in:
Weny Xu
2024-12-12 12:09:36 +08:00
committed by GitHub
parent 8c1959c580
commit d53fbcb936
22 changed files with 258 additions and 84 deletions

1
Cargo.lock generated
View File

@@ -8883,6 +8883,7 @@ dependencies = [
"lz4_flex 0.11.3",
"moka",
"pin-project",
"prometheus",
"serde",
"serde_json",
"sha2",

View File

@@ -32,6 +32,7 @@ use moka::notification::RemovalCause;
use moka::sync::Cache;
use parquet::column::page::Page;
use parquet::file::metadata::ParquetMetaData;
use puffin::puffin_manager::cache::{PuffinMetadataCache, PuffinMetadataCacheRef};
use store_api::storage::{ConcreteDataType, RegionId, TimeSeriesRowSelector};
use crate::cache::cache_size::parquet_meta_size;
@@ -68,6 +69,8 @@ pub struct CacheManager {
write_cache: Option<WriteCacheRef>,
/// Cache for inverted index.
index_cache: Option<InvertedIndexCacheRef>,
/// Puffin metadata cache.
puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
/// Cache for time series selectors.
selector_result_cache: Option<SelectorResultCache>,
}
@@ -217,6 +220,10 @@ impl CacheManager {
pub(crate) fn index_cache(&self) -> Option<&InvertedIndexCacheRef> {
self.index_cache.as_ref()
}
pub(crate) fn puffin_metadata_cache(&self) -> Option<&PuffinMetadataCacheRef> {
self.puffin_metadata_cache.as_ref()
}
}
/// Increases selector cache miss metrics.
@@ -237,6 +244,7 @@ pub struct CacheManagerBuilder {
page_cache_size: u64,
index_metadata_size: u64,
index_content_size: u64,
puffin_metadata_size: u64,
write_cache: Option<WriteCacheRef>,
selector_result_cache_size: u64,
}
@@ -278,6 +286,12 @@ impl CacheManagerBuilder {
self
}
/// Sets cache size for puffin metadata.
pub fn puffin_metadata_size(mut self, bytes: u64) -> Self {
self.puffin_metadata_size = bytes;
self
}
/// Sets selector result cache size.
pub fn selector_result_cache_size(mut self, bytes: u64) -> Self {
self.selector_result_cache_size = bytes;
@@ -340,6 +354,8 @@ impl CacheManagerBuilder {
});
let inverted_index_cache =
InvertedIndexCache::new(self.index_metadata_size, self.index_content_size);
let puffin_metadata_cache =
PuffinMetadataCache::new(self.puffin_metadata_size, &CACHE_BYTES);
let selector_result_cache = (self.selector_result_cache_size != 0).then(|| {
Cache::builder()
.max_capacity(self.selector_result_cache_size)
@@ -361,6 +377,7 @@ impl CacheManagerBuilder {
page_cache,
write_cache: self.write_cache,
index_cache: Some(Arc::new(inverted_index_cache)),
puffin_metadata_cache: Some(Arc::new(puffin_metadata_cache)),
selector_result_cache,
}
}

View File

@@ -304,6 +304,9 @@ pub struct IndexConfig {
/// Write buffer size for creating the index.
pub write_buffer_size: ReadableSize,
/// Cache size for metadata of puffin files. Setting it to 0 to disable the cache.
pub metadata_cache_size: ReadableSize,
}
impl Default for IndexConfig {
@@ -312,6 +315,7 @@ impl Default for IndexConfig {
aux_path: String::new(),
staging_size: ReadableSize::gb(2),
write_buffer_size: ReadableSize::mb(8),
metadata_cache_size: ReadableSize::mb(64),
}
}
}

View File

@@ -413,11 +413,15 @@ impl ScanRegion {
.and_then(|c| c.index_cache())
.cloned();
let puffin_metadata_cache = self
.cache_manager
.as_ref()
.and_then(|c| c.puffin_metadata_cache())
.cloned();
InvertedIndexApplierBuilder::new(
self.access_layer.region_dir().to_string(),
self.access_layer.object_store().clone(),
file_cache,
index_cache,
self.version.metadata.as_ref(),
self.version.metadata.inverted_indexed_column_ids(
self.version
@@ -429,6 +433,9 @@ impl ScanRegion {
),
self.access_layer.puffin_manager_factory().clone(),
)
.with_file_cache(file_cache)
.with_index_cache(index_cache)
.with_puffin_metadata_cache(puffin_metadata_cache)
.build(&self.request.filters)
.inspect_err(|err| warn!(err; "Failed to build invereted index applier"))
.ok()

View File

@@ -149,6 +149,7 @@ impl FileMeta {
pub fn inverted_index_available(&self) -> bool {
self.available_indexes.contains(&IndexType::InvertedIndex)
}
pub fn fulltext_index_available(&self) -> bool {
self.available_indexes.contains(&IndexType::FulltextIndex)
}

View File

@@ -22,6 +22,7 @@ use index::inverted_index::search::index_apply::{
ApplyOutput, IndexApplier, IndexNotFoundStrategy, SearchContext,
};
use object_store::ObjectStore;
use puffin::puffin_manager::cache::PuffinMetadataCacheRef;
use puffin::puffin_manager::{BlobGuard, PuffinManager, PuffinReader};
use snafu::ResultExt;
use store_api::storage::RegionId;
@@ -60,6 +61,9 @@ pub(crate) struct InvertedIndexApplier {
/// In-memory cache for inverted index.
inverted_index_cache: Option<InvertedIndexCacheRef>,
/// Puffin metadata cache.
puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
}
pub(crate) type InvertedIndexApplierRef = Arc<InvertedIndexApplier>;
@@ -70,8 +74,6 @@ impl InvertedIndexApplier {
region_dir: String,
region_id: RegionId,
store: ObjectStore,
file_cache: Option<FileCacheRef>,
index_cache: Option<InvertedIndexCacheRef>,
index_applier: Box<dyn IndexApplier>,
puffin_manager_factory: PuffinManagerFactory,
) -> Self {
@@ -81,13 +83,35 @@ impl InvertedIndexApplier {
region_dir,
region_id,
store,
file_cache,
file_cache: None,
index_applier,
puffin_manager_factory,
inverted_index_cache: index_cache,
inverted_index_cache: None,
puffin_metadata_cache: None,
}
}
/// Sets the file cache.
pub fn with_file_cache(mut self, file_cache: Option<FileCacheRef>) -> Self {
self.file_cache = file_cache;
self
}
/// Sets the index cache.
pub fn with_index_cache(mut self, index_cache: Option<InvertedIndexCacheRef>) -> Self {
self.inverted_index_cache = index_cache;
self
}
/// Sets the puffin metadata cache.
pub fn with_puffin_metadata_cache(
mut self,
puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
) -> Self {
self.puffin_metadata_cache = puffin_metadata_cache;
self
}
/// Applies predicates to the provided SST file id and returns the relevant row group ids
pub async fn apply(&self, file_id: FileId) -> Result<ApplyOutput> {
let _timer = INDEX_APPLY_ELAPSED
@@ -105,6 +129,7 @@ impl InvertedIndexApplier {
if let Err(err) = other {
warn!(err; "An unexpected error occurred while reading the cached index file. Fallback to remote index file.")
}
self.remote_blob_reader(file_id).await?
}
};
@@ -157,7 +182,10 @@ impl InvertedIndexApplier {
/// Creates a blob reader from the remote index file.
async fn remote_blob_reader(&self, file_id: FileId) -> Result<BlobReader> {
let puffin_manager = self.puffin_manager_factory.build(self.store.clone());
let puffin_manager = self
.puffin_manager_factory
.build(self.store.clone())
.with_puffin_metadata_cache(self.puffin_metadata_cache.clone());
let file_path = location::index_file_path(&self.region_dir, file_id);
puffin_manager
.reader(&file_path)
@@ -219,8 +247,6 @@ mod tests {
region_dir.clone(),
RegionId::new(0, 0),
object_store,
None,
None,
Box::new(mock_index_applier),
puffin_manager_factory,
);
@@ -261,8 +287,6 @@ mod tests {
region_dir.clone(),
RegionId::new(0, 0),
object_store,
None,
None,
Box::new(mock_index_applier),
puffin_manager_factory,
);

View File

@@ -28,6 +28,7 @@ use datatypes::value::Value;
use index::inverted_index::search::index_apply::PredicatesIndexApplier;
use index::inverted_index::search::predicate::Predicate;
use object_store::ObjectStore;
use puffin::puffin_manager::cache::PuffinMetadataCacheRef;
use snafu::{OptionExt, ResultExt};
use store_api::metadata::RegionMetadata;
use store_api::storage::ColumnId;
@@ -65,6 +66,9 @@ pub(crate) struct InvertedIndexApplierBuilder<'a> {
/// Cache for inverted index.
index_cache: Option<InvertedIndexCacheRef>,
/// Cache for puffin metadata.
puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
}
impl<'a> InvertedIndexApplierBuilder<'a> {
@@ -72,8 +76,6 @@ impl<'a> InvertedIndexApplierBuilder<'a> {
pub fn new(
region_dir: String,
object_store: ObjectStore,
file_cache: Option<FileCacheRef>,
index_cache: Option<InvertedIndexCacheRef>,
metadata: &'a RegionMetadata,
indexed_column_ids: HashSet<ColumnId>,
puffin_manager_factory: PuffinManagerFactory,
@@ -81,15 +83,37 @@ impl<'a> InvertedIndexApplierBuilder<'a> {
Self {
region_dir,
object_store,
file_cache,
metadata,
indexed_column_ids,
output: HashMap::default(),
index_cache,
puffin_manager_factory,
file_cache: None,
index_cache: None,
puffin_metadata_cache: None,
}
}
/// Sets the file cache.
pub fn with_file_cache(mut self, file_cache: Option<FileCacheRef>) -> Self {
self.file_cache = file_cache;
self
}
/// Sets the puffin metadata cache.
pub fn with_puffin_metadata_cache(
mut self,
puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
) -> Self {
self.puffin_metadata_cache = puffin_metadata_cache;
self
}
/// Sets the index cache.
pub fn with_index_cache(mut self, index_cache: Option<InvertedIndexCacheRef>) -> Self {
self.index_cache = index_cache;
self
}
/// Consumes the builder to construct an [`InvertedIndexApplier`], optionally returned based on
/// the expressions provided. If no predicates match, returns `None`.
pub fn build(mut self, exprs: &[Expr]) -> Result<Option<InvertedIndexApplier>> {
@@ -108,15 +132,18 @@ impl<'a> InvertedIndexApplierBuilder<'a> {
.collect();
let applier = PredicatesIndexApplier::try_from(predicates);
Ok(Some(InvertedIndexApplier::new(
self.region_dir,
self.metadata.region_id,
self.object_store,
self.file_cache,
self.index_cache,
Box::new(applier.context(BuildIndexApplierSnafu)?),
self.puffin_manager_factory,
)))
Ok(Some(
InvertedIndexApplier::new(
self.region_dir,
self.metadata.region_id,
self.object_store,
Box::new(applier.context(BuildIndexApplierSnafu)?),
self.puffin_manager_factory,
)
.with_file_cache(self.file_cache)
.with_puffin_metadata_cache(self.puffin_metadata_cache)
.with_index_cache(self.index_cache),
))
}
/// Recursively traverses expressions to collect predicates.
@@ -322,8 +349,6 @@ mod tests {
let mut builder = InvertedIndexApplierBuilder::new(
"test".to_string(),
test_object_store(),
None,
None,
&metadata,
HashSet::from_iter([1, 2, 3]),
facotry,

View File

@@ -75,8 +75,6 @@ mod tests {
let mut builder = InvertedIndexApplierBuilder::new(
"test".to_string(),
test_object_store(),
None,
None,
&metadata,
HashSet::from_iter([1, 2, 3]),
facotry,
@@ -118,8 +116,6 @@ mod tests {
let mut builder = InvertedIndexApplierBuilder::new(
"test".to_string(),
test_object_store(),
None,
None,
&metadata,
HashSet::from_iter([1, 2, 3]),
facotry,
@@ -144,8 +140,6 @@ mod tests {
let mut builder = InvertedIndexApplierBuilder::new(
"test".to_string(),
test_object_store(),
None,
None,
&metadata,
HashSet::from_iter([1, 2, 3]),
facotry,
@@ -187,8 +181,6 @@ mod tests {
let mut builder = InvertedIndexApplierBuilder::new(
"test".to_string(),
test_object_store(),
None,
None,
&metadata,
HashSet::from_iter([1, 2, 3]),
facotry,
@@ -214,8 +206,6 @@ mod tests {
let mut builder = InvertedIndexApplierBuilder::new(
"test".to_string(),
test_object_store(),
None,
None,
&metadata,
HashSet::from_iter([1, 2, 3]),
facotry,

View File

@@ -231,8 +231,6 @@ mod tests {
let mut builder = InvertedIndexApplierBuilder::new(
"test".to_string(),
test_object_store(),
None,
None,
&metadata,
HashSet::from_iter([1, 2, 3]),
facotry,
@@ -260,8 +258,6 @@ mod tests {
let mut builder = InvertedIndexApplierBuilder::new(
"test".to_string(),
test_object_store(),
None,
None,
&metadata,
HashSet::from_iter([1, 2, 3]),
facotry,
@@ -280,8 +276,6 @@ mod tests {
let mut builder = InvertedIndexApplierBuilder::new(
"test".to_string(),
test_object_store(),
None,
None,
&metadata,
HashSet::from_iter([1, 2, 3]),
facotry,
@@ -315,8 +309,6 @@ mod tests {
let mut builder = InvertedIndexApplierBuilder::new(
"test".to_string(),
test_object_store(),
None,
None,
&metadata,
HashSet::from_iter([1, 2, 3]),
facotry,

View File

@@ -137,8 +137,6 @@ mod tests {
let mut builder = InvertedIndexApplierBuilder::new(
"test".to_string(),
test_object_store(),
None,
None,
&metadata,
HashSet::from_iter([1, 2, 3]),
facotry,
@@ -175,8 +173,6 @@ mod tests {
let mut builder = InvertedIndexApplierBuilder::new(
"test".to_string(),
test_object_store(),
None,
None,
&metadata,
HashSet::from_iter([1, 2, 3]),
facotry,
@@ -204,8 +200,6 @@ mod tests {
let mut builder = InvertedIndexApplierBuilder::new(
"test".to_string(),
test_object_store(),
None,
None,
&metadata,
HashSet::from_iter([1, 2, 3]),
facotry,
@@ -224,8 +218,6 @@ mod tests {
let mut builder = InvertedIndexApplierBuilder::new(
"test".to_string(),
test_object_store(),
None,
None,
&metadata,
HashSet::from_iter([1, 2, 3]),
facotry,
@@ -244,8 +236,6 @@ mod tests {
let mut builder = InvertedIndexApplierBuilder::new(
"test".to_string(),
test_object_store(),
None,
None,
&metadata,
HashSet::from_iter([1, 2, 3]),
facotry,
@@ -303,8 +293,6 @@ mod tests {
let mut builder = InvertedIndexApplierBuilder::new(
"test".to_string(),
test_object_store(),
None,
None,
&metadata,
HashSet::from_iter([1, 2, 3]),
facotry,
@@ -341,8 +329,6 @@ mod tests {
let mut builder = InvertedIndexApplierBuilder::new(
"test".to_string(),
test_object_store(),
None,
None,
&metadata,
HashSet::from_iter([1, 2, 3]),
facotry,

View File

@@ -68,8 +68,6 @@ mod tests {
let mut builder = InvertedIndexApplierBuilder::new(
"test".to_string(),
test_object_store(),
None,
None,
&metadata,
HashSet::from_iter([1, 2, 3]),
facotry,
@@ -101,8 +99,6 @@ mod tests {
let mut builder = InvertedIndexApplierBuilder::new(
"test".to_string(),
test_object_store(),
None,
None,
&metadata,
HashSet::from_iter([1, 2, 3]),
facotry,
@@ -126,8 +122,6 @@ mod tests {
let mut builder = InvertedIndexApplierBuilder::new(
"test".to_string(),
test_object_store(),
None,
None,
&metadata,
HashSet::from_iter([1, 2, 3]),
facotry,
@@ -159,8 +153,6 @@ mod tests {
let mut builder = InvertedIndexApplierBuilder::new(
"test".to_string(),
test_object_store(),
None,
None,
&metadata,
HashSet::from_iter([1, 2, 3]),
facotry,
@@ -186,8 +178,6 @@ mod tests {
let mut builder = InvertedIndexApplierBuilder::new(
"test".to_string(),
test_object_store(),
None,
None,
&metadata,
HashSet::from_iter([1, 2, 3]),
facotry,

View File

@@ -62,8 +62,6 @@ mod tests {
let mut builder = InvertedIndexApplierBuilder::new(
"test".to_string(),
test_object_store(),
None,
None,
&metadata,
HashSet::from_iter([1, 2, 3]),
facotry,
@@ -91,8 +89,6 @@ mod tests {
let mut builder = InvertedIndexApplierBuilder::new(
"test".to_string(),
test_object_store(),
None,
None,
&metadata,
HashSet::from_iter([1, 2, 3]),
facotry,
@@ -120,8 +116,6 @@ mod tests {
let mut builder = InvertedIndexApplierBuilder::new(
"test".to_string(),
test_object_store(),
None,
None,
&metadata,
HashSet::from_iter([1, 2, 3]),
facotry,
@@ -142,8 +136,6 @@ mod tests {
let mut builder = InvertedIndexApplierBuilder::new(
"test".to_string(),
test_object_store(),
None,
None,
&metadata,
HashSet::from_iter([1, 2, 3]),
facotry,

View File

@@ -310,12 +310,14 @@ mod tests {
use futures::future::BoxFuture;
use object_store::services::Memory;
use object_store::ObjectStore;
use puffin::puffin_manager::cache::PuffinMetadataCache;
use puffin::puffin_manager::PuffinManager;
use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
use store_api::storage::RegionId;
use super::*;
use crate::cache::index::InvertedIndexCache;
use crate::metrics::CACHE_BYTES;
use crate::read::BatchColumn;
use crate::row_converter::{McmpRowCodec, RowCodec, SortField};
use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder;
@@ -447,15 +449,16 @@ mod tests {
move |expr| {
let _d = &d;
let cache = Arc::new(InvertedIndexCache::new(10, 10));
let puffin_metadata_cache = Arc::new(PuffinMetadataCache::new(10, &CACHE_BYTES));
let applier = InvertedIndexApplierBuilder::new(
region_dir.clone(),
object_store.clone(),
None,
Some(cache),
&region_metadata,
indexed_column_ids.clone(),
factory.clone(),
)
.with_index_cache(Some(cache))
.with_puffin_metadata_cache(Some(puffin_metadata_cache))
.build(&[expr])
.unwrap()
.unwrap();

View File

@@ -170,6 +170,7 @@ impl WorkerGroup {
.selector_result_cache_size(config.selector_result_cache_size.as_bytes())
.index_metadata_size(config.inverted_index.metadata_cache_size.as_bytes())
.index_content_size(config.inverted_index.content_cache_size.as_bytes())
.puffin_metadata_size(config.index.metadata_cache_size.as_bytes())
.write_cache(write_cache)
.build(),
);

View File

@@ -25,6 +25,7 @@ futures.workspace = true
lz4_flex = "0.11"
moka = { workspace = true, features = ["future", "sync"] }
pin-project.workspace = true
prometheus.workspace = true
serde.workspace = true
serde_json.workspace = true
sha2 = "0.10.8"

View File

@@ -68,6 +68,20 @@ pub struct BlobMetadata {
pub properties: HashMap<String, String>,
}
impl BlobMetadata {
/// Calculates the memory usage of the blob metadata in bytes.
pub fn memory_usage(&self) -> usize {
self.blob_type.len()
+ self.input_fields.len() * std::mem::size_of::<i32>()
+ self
.properties
.iter()
.map(|(k, v)| k.len() + v.len())
.sum::<usize>()
+ std::mem::size_of::<Self>()
}
}
/// Compression codec used to compress the blob
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]

View File

@@ -46,6 +46,11 @@ impl<R> PuffinFileReader<R> {
}
}
pub fn with_metadata(mut self, metadata: Option<FileMetadata>) -> Self {
self.metadata = metadata;
self
}
fn validate_file_size(file_size: u64) -> Result<()> {
ensure!(
file_size >= MIN_FILE_SIZE,

View File

@@ -33,6 +33,22 @@ pub struct FileMetadata {
pub properties: HashMap<String, String>,
}
impl FileMetadata {
/// Calculates the memory usage of the file metadata in bytes.
pub fn memory_usage(&self) -> usize {
self.blobs
.iter()
.map(|blob| blob.memory_usage())
.sum::<usize>()
+ self
.properties
.iter()
.map(|(k, v)| k.len() + v.len())
.sum::<usize>()
+ std::mem::size_of::<Self>()
}
}
#[cfg(test)]
mod tests {
use std::collections::HashMap;

View File

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
pub mod cache;
pub mod file_accessor;
pub mod fs_puffin_manager;
pub mod stager;

View File

@@ -0,0 +1,60 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use prometheus::IntGaugeVec;
use crate::file_metadata::FileMetadata;
/// Metrics for index metadata.
const PUFFIN_METADATA_TYPE: &str = "puffin_metadata";
pub type PuffinMetadataCacheRef = Arc<PuffinMetadataCache>;
/// A cache for storing the metadata of the index files.
pub struct PuffinMetadataCache {
cache: moka::sync::Cache<String, Arc<FileMetadata>>,
}
fn puffin_metadata_weight(k: &String, v: &Arc<FileMetadata>) -> u32 {
(k.as_bytes().len() + v.memory_usage()) as u32
}
impl PuffinMetadataCache {
pub fn new(capacity: u64, cache_bytes: &'static IntGaugeVec) -> Self {
common_telemetry::debug!("Building PuffinMetadataCache with capacity: {capacity}");
Self {
cache: moka::sync::CacheBuilder::new(capacity)
.name("puffin_metadata")
.weigher(puffin_metadata_weight)
.eviction_listener(|k, v, _cause| {
let size = puffin_metadata_weight(&k, &v);
cache_bytes
.with_label_values(&[PUFFIN_METADATA_TYPE])
.sub(size.into());
})
.build(),
}
}
/// Gets the metadata from the cache.
pub fn get_metadata(&self, file_id: &str) -> Option<Arc<FileMetadata>> {
self.cache.get(file_id)
}
/// Puts the metadata into the cache.
pub fn put_metadata(&self, file_id: String, metadata: Arc<FileMetadata>) {
self.cache.insert(file_id, metadata);
}
}

View File

@@ -21,6 +21,7 @@ pub use reader::FsPuffinReader;
pub use writer::FsPuffinWriter;
use crate::error::Result;
use crate::puffin_manager::cache::PuffinMetadataCacheRef;
use crate::puffin_manager::file_accessor::PuffinFileAccessor;
use crate::puffin_manager::stager::Stager;
use crate::puffin_manager::PuffinManager;
@@ -31,16 +32,29 @@ pub struct FsPuffinManager<S, F> {
stager: S,
/// The puffin file accessor.
puffin_file_accessor: F,
/// The puffin metadata cache.
puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
}
impl<S, F> FsPuffinManager<S, F> {
/// Creates a new `FsPuffinManager` with the specified `stager` and `puffin_file_accessor`.
/// Creates a new `FsPuffinManager` with the specified `stager` and `puffin_file_accessor`,
/// and optionally with a `puffin_metadata_cache`.
pub fn new(stager: S, puffin_file_accessor: F) -> Self {
Self {
stager,
puffin_file_accessor,
puffin_metadata_cache: None,
}
}
/// Sets the puffin metadata cache.
pub fn with_puffin_metadata_cache(
mut self,
puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
) -> Self {
self.puffin_metadata_cache = puffin_metadata_cache;
self
}
}
#[async_trait]
@@ -57,6 +71,7 @@ where
puffin_file_name.to_string(),
self.stager.clone(),
self.puffin_file_accessor.clone(),
self.puffin_metadata_cache.clone(),
))
}

View File

@@ -14,6 +14,7 @@
use std::io;
use std::ops::Range;
use std::sync::Arc;
use async_compression::futures::bufread::ZstdDecoder;
use async_trait::async_trait;
@@ -23,12 +24,14 @@ use futures::io::BufReader;
use futures::{AsyncRead, AsyncWrite};
use snafu::{ensure, OptionExt, ResultExt};
use super::PuffinMetadataCacheRef;
use crate::blob_metadata::{BlobMetadata, CompressionCodec};
use crate::error::{
BlobIndexOutOfBoundSnafu, BlobNotFoundSnafu, DeserializeJsonSnafu, FileKeyNotMatchSnafu,
MetadataSnafu, ReadSnafu, Result, UnsupportedDecompressionSnafu, WriteSnafu,
};
use crate::file_format::reader::{AsyncReader, PuffinFileReader};
use crate::file_metadata::FileMetadata;
use crate::partial_reader::PartialReader;
use crate::puffin_manager::file_accessor::PuffinFileAccessor;
use crate::puffin_manager::fs_puffin_manager::dir_meta::DirMetadata;
@@ -45,14 +48,23 @@ pub struct FsPuffinReader<S, F> {
/// The puffin file accessor.
puffin_file_accessor: F,
/// The puffin file metadata cache.
puffin_file_metadata_cache: Option<PuffinMetadataCacheRef>,
}
impl<S, F> FsPuffinReader<S, F> {
pub(crate) fn new(puffin_file_name: String, stager: S, puffin_file_accessor: F) -> Self {
pub(crate) fn new(
puffin_file_name: String,
stager: S,
puffin_file_accessor: F,
puffin_file_metadata_cache: Option<PuffinMetadataCacheRef>,
) -> Self {
Self {
puffin_file_name,
stager,
puffin_file_accessor,
puffin_file_metadata_cache,
}
}
}
@@ -73,13 +85,13 @@ where
.await?;
let mut file = PuffinFileReader::new(reader);
// TODO(zhongzc): cache the metadata.
let metadata = file.metadata().await?;
let metadata = self.get_puffin_file_metadata(&mut file).await?;
let blob_metadata = metadata
.blobs
.into_iter()
.iter()
.find(|m| m.blob_type == key)
.context(BlobNotFoundSnafu { blob: key })?;
.context(BlobNotFoundSnafu { blob: key })?
.clone();
let blob = if blob_metadata.compression_codec.is_none() {
// If the blob is not compressed, we can directly read it from the puffin file.
@@ -133,6 +145,23 @@ where
S: Stager,
F: PuffinFileAccessor + Clone,
{
async fn get_puffin_file_metadata(
&self,
reader: &mut PuffinFileReader<F::Reader>,
) -> Result<Arc<FileMetadata>> {
if let Some(cache) = self.puffin_file_metadata_cache.as_ref() {
if let Some(metadata) = cache.get_metadata(&self.puffin_file_name) {
return Ok(metadata);
}
}
let metadata = Arc::new(reader.metadata().await?);
if let Some(cache) = self.puffin_file_metadata_cache.as_ref() {
cache.put_metadata(self.puffin_file_name.to_string(), metadata.clone());
}
Ok(metadata)
}
async fn init_blob_to_stager(
reader: PuffinFileReader<F::Reader>,
blob_metadata: BlobMetadata,