diff --git a/src/index/src/fulltext_index/tests.rs b/src/index/src/fulltext_index/tests.rs
index 3c10f0568d..d3491a7e9d 100644
--- a/src/index/src/fulltext_index/tests.rs
+++ b/src/index/src/fulltext_index/tests.rs
@@ -25,7 +25,7 @@ use crate::fulltext_index::create::{FulltextIndexCreator, TantivyFulltextIndexCr
 use crate::fulltext_index::search::{FulltextIndexSearcher, RowId, TantivyFulltextIndexSearcher};
 use crate::fulltext_index::{Analyzer, Config};
 
-async fn new_bounded_stager(prefix: &str) -> (TempDir, Arc<BoundedStager>) {
+async fn new_bounded_stager(prefix: &str) -> (TempDir, Arc<BoundedStager<String>>) {
     let staging_dir = create_temp_dir(prefix);
     let path = staging_dir.path().to_path_buf();
     (
@@ -68,13 +68,13 @@ async fn test_search(
     let file_accessor = Arc::new(MockFileAccessor::new(prefix));
     let puffin_manager = FsPuffinManager::new(stager, file_accessor);
 
-    let file_name = "fulltext_index";
-    let blob_key = "fulltext_index";
-    let mut writer = puffin_manager.writer(file_name).await.unwrap();
-    create_index(prefix, &mut writer, blob_key, texts, config).await;
+    let file_name = "fulltext_index".to_string();
+    let blob_key = "fulltext_index".to_string();
+    let mut writer = puffin_manager.writer(&file_name).await.unwrap();
+    create_index(prefix, &mut writer, &blob_key, texts, config).await;
 
-    let reader = puffin_manager.reader(file_name).await.unwrap();
-    let index_dir = reader.dir(blob_key).await.unwrap();
+    let reader = puffin_manager.reader(&file_name).await.unwrap();
+    let index_dir = reader.dir(&blob_key).await.unwrap();
     let searcher = TantivyFulltextIndexSearcher::new(index_dir.path()).unwrap();
     let results = searcher.search(query).await.unwrap();
diff --git a/src/mito2/src/access_layer.rs b/src/mito2/src/access_layer.rs
index 51dd7a962a..f1a22cf54d 100644
--- a/src/mito2/src/access_layer.rs
+++ b/src/mito2/src/access_layer.rs
@@ -146,11 +146,14 @@ impl AccessLayer {
         } else {
             // Write cache is disabled.
             let store = self.object_store.clone();
+            let path_provider = RegionFilePathFactory::new(self.region_dir.clone());
             let indexer_builder = IndexerBuilderImpl {
                 op_type: request.op_type,
                 metadata: request.metadata.clone(),
                 row_group_size: write_opts.row_group_size,
-                puffin_manager: self.puffin_manager_factory.build(store),
+                puffin_manager: self
+                    .puffin_manager_factory
+                    .build(store, path_provider.clone()),
                 intermediate_manager: self.intermediate_manager.clone(),
                 index_options: request.index_options,
                 inverted_index_config: request.inverted_index_config,
@@ -161,9 +164,7 @@ impl AccessLayer {
                 self.object_store.clone(),
                 request.metadata,
                 indexer_builder,
-                RegionFilePathFactory {
-                    region_dir: self.region_dir.clone(),
-                },
+                path_provider,
             )
             .await;
             writer
@@ -248,8 +249,18 @@ pub trait FilePathProvider: Send + Sync {
 /// Path provider that builds paths in local write cache.
 #[derive(Clone)]
 pub(crate) struct WriteCachePathProvider {
-    pub(crate) region_id: RegionId,
-    pub(crate) file_cache: FileCacheRef,
+    region_id: RegionId,
+    file_cache: FileCacheRef,
+}
+
+impl WriteCachePathProvider {
+    /// Creates a new `WriteCachePathProvider` instance.
+    pub fn new(region_id: RegionId, file_cache: FileCacheRef) -> Self {
+        Self {
+            region_id,
+            file_cache,
+        }
+    }
 }
 
 impl FilePathProvider for WriteCachePathProvider {
@@ -267,7 +278,14 @@ impl FilePathProvider for WriteCachePathProvider {
 /// Path provider that builds paths in region storage path.
#[derive(Clone, Debug)] pub(crate) struct RegionFilePathFactory { - pub(crate) region_dir: String, + region_dir: String, +} + +impl RegionFilePathFactory { + /// Creates a new `RegionFilePathFactory` instance. + pub fn new(region_dir: String) -> Self { + Self { region_dir } + } } impl FilePathProvider for RegionFilePathFactory { diff --git a/src/mito2/src/cache/write_cache.rs b/src/mito2/src/cache/write_cache.rs index 0ae00b3c6c..257692c67b 100644 --- a/src/mito2/src/cache/write_cache.rs +++ b/src/mito2/src/cache/write_cache.rs @@ -114,15 +114,14 @@ impl WriteCache { let region_id = write_request.metadata.region_id; let store = self.file_cache.local_store(); - let path_provider = WriteCachePathProvider { - file_cache: self.file_cache.clone(), - region_id, - }; + let path_provider = WriteCachePathProvider::new(region_id, self.file_cache.clone()); let indexer = IndexerBuilderImpl { op_type: write_request.op_type, metadata: write_request.metadata.clone(), row_group_size: write_opts.row_group_size, - puffin_manager: self.puffin_manager_factory.build(store), + puffin_manager: self + .puffin_manager_factory + .build(store, path_provider.clone()), intermediate_manager: self.intermediate_manager.clone(), index_options: write_request.index_options, inverted_index_config: write_request.inverted_index_config, @@ -355,9 +354,7 @@ mod tests { // and now just use local file system to mock. let mut env = TestEnv::new(); let mock_store = env.init_object_store_manager(); - let path_provider = RegionFilePathFactory { - region_dir: "test".to_string(), - }; + let path_provider = RegionFilePathFactory::new("test".to_string()); let local_dir = create_temp_dir(""); let local_store = new_fs_store(local_dir.path().to_str().unwrap()); @@ -488,9 +485,7 @@ mod tests { ..Default::default() }; let upload_request = SstUploadRequest { - dest_path_provider: RegionFilePathFactory { - region_dir: data_home.clone(), - }, + dest_path_provider: RegionFilePathFactory::new(data_home.clone()), remote_store: mock_store.clone(), }; diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs index 193e3c3e17..4dd5baf5b1 100644 --- a/src/mito2/src/read/scan_region.rs +++ b/src/mito2/src/read/scan_region.rs @@ -32,7 +32,6 @@ use tokio::sync::{mpsc, Semaphore}; use tokio_stream::wrappers::ReceiverStream; use crate::access_layer::AccessLayerRef; -use crate::cache::file_cache::FileCacheRef; use crate::cache::CacheStrategy; use crate::config::DEFAULT_SCAN_CHANNEL_SIZE; use crate::error::Result; @@ -427,12 +426,7 @@ impl ScanRegion { return None; } - let file_cache = || -> Option { - let write_cache = self.cache_strategy.write_cache()?; - let file_cache = write_cache.file_cache(); - Some(file_cache) - }(); - + let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache()); let inverted_index_cache = self.cache_strategy.inverted_index_cache().cloned(); let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned(); @@ -467,14 +461,8 @@ impl ScanRegion { return None; } - let file_cache = || -> Option { - let write_cache = self.cache_strategy.write_cache()?; - let file_cache = write_cache.file_cache(); - Some(file_cache) - }(); - + let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache()); let bloom_filter_index_cache = self.cache_strategy.bloom_filter_index_cache().cloned(); - let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned(); BloomFilterIndexApplierBuilder::new( @@ -499,12 +487,18 @@ impl ScanRegion { return None; } + let file_cache = 
self.cache_strategy.write_cache().map(|w| w.file_cache()); + let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned(); + FulltextIndexApplierBuilder::new( self.access_layer.region_dir().to_string(), + self.version.metadata.region_id, self.access_layer.object_store().clone(), self.access_layer.puffin_manager_factory().clone(), self.version.metadata.as_ref(), ) + .with_file_cache(file_cache) + .with_puffin_metadata_cache(puffin_metadata_cache) .build(&self.request.filters) .inspect_err(|err| warn!(err; "Failed to build fulltext index applier")) .ok() diff --git a/src/mito2/src/sst/file.rs b/src/mito2/src/sst/file.rs index dc8829c330..68d2419b12 100644 --- a/src/mito2/src/sst/file.rs +++ b/src/mito2/src/sst/file.rs @@ -174,31 +174,8 @@ impl FileMeta { .contains(&IndexType::BloomFilterIndex) } - /// Returns the size of the inverted index file - pub fn inverted_index_size(&self) -> Option { - if self.available_indexes.len() == 1 && self.inverted_index_available() { - Some(self.index_file_size) - } else { - None - } - } - - /// Returns the size of the fulltext index file - pub fn fulltext_index_size(&self) -> Option { - if self.available_indexes.len() == 1 && self.fulltext_index_available() { - Some(self.index_file_size) - } else { - None - } - } - - /// Returns the size of the bloom filter index file - pub fn bloom_filter_index_size(&self) -> Option { - if self.available_indexes.len() == 1 && self.bloom_filter_index_available() { - Some(self.index_file_size) - } else { - None - } + pub fn index_file_size(&self) -> u64 { + self.index_file_size } } diff --git a/src/mito2/src/sst/file_purger.rs b/src/mito2/src/sst/file_purger.rs index da59d3aec2..10dcd7f51e 100644 --- a/src/mito2/src/sst/file_purger.rs +++ b/src/mito2/src/sst/file_purger.rs @@ -113,11 +113,9 @@ impl FilePurger for LocalFilePurger { } // Purges index content in the stager. - let puffin_file_name = - crate::sst::location::index_file_path(sst_layer.region_dir(), file_meta.file_id); if let Err(e) = sst_layer .puffin_manager_factory() - .purge_stager(&puffin_file_name) + .purge_stager(file_meta.file_id) .await { error!(e; "Failed to purge stager with index file, file_id: {}, region: {}", diff --git a/src/mito2/src/sst/index.rs b/src/mito2/src/sst/index.rs index da1f6c86a3..6a8338cff8 100644 --- a/src/mito2/src/sst/index.rs +++ b/src/mito2/src/sst/index.rs @@ -103,7 +103,6 @@ pub type BloomFilterOutput = IndexBaseOutput; #[derive(Default)] pub struct Indexer { file_id: FileId, - file_path: String, region_id: RegionId, puffin_manager: Option, inverted_indexer: Option, @@ -170,7 +169,7 @@ impl Indexer { #[async_trait::async_trait] pub trait IndexerBuilder { /// Builds indexer of given file id to [index_file_path]. - async fn build(&self, file_id: FileId, index_file_path: String) -> Indexer; + async fn build(&self, file_id: FileId) -> Indexer; } pub(crate) struct IndexerBuilderImpl { @@ -188,10 +187,9 @@ pub(crate) struct IndexerBuilderImpl { #[async_trait::async_trait] impl IndexerBuilder for IndexerBuilderImpl { /// Sanity check for arguments and create a new [Indexer] if arguments are valid. 
- async fn build(&self, file_id: FileId, index_file_path: String) -> Indexer { + async fn build(&self, file_id: FileId) -> Indexer { let mut indexer = Indexer { file_id, - file_path: index_file_path, region_id: self.metadata.region_id, ..Default::default() }; @@ -392,6 +390,7 @@ mod tests { use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder}; use super::*; + use crate::access_layer::FilePathProvider; use crate::config::{FulltextIndexConfig, Mode}; struct MetaConfig { @@ -484,6 +483,18 @@ mod tests { IntermediateManager::init_fs(path).await.unwrap() } + struct NoopPathProvider; + + impl FilePathProvider for NoopPathProvider { + fn build_index_file_path(&self, _file_id: FileId) -> String { + unreachable!() + } + + fn build_sst_file_path(&self, _file_id: FileId) -> String { + unreachable!() + } + } + #[tokio::test] async fn test_build_indexer_basic() { let (dir, factory) = @@ -499,14 +510,14 @@ mod tests { op_type: OperationType::Flush, metadata, row_group_size: 1024, - puffin_manager: factory.build(mock_object_store()), + puffin_manager: factory.build(mock_object_store(), NoopPathProvider), intermediate_manager: intm_manager, index_options: IndexOptions::default(), inverted_index_config: InvertedIndexConfig::default(), fulltext_index_config: FulltextIndexConfig::default(), bloom_filter_index_config: BloomFilterConfig::default(), } - .build(FileId::random(), "test".to_string()) + .build(FileId::random()) .await; assert!(indexer.inverted_indexer.is_some()); @@ -529,7 +540,7 @@ mod tests { op_type: OperationType::Flush, metadata: metadata.clone(), row_group_size: 1024, - puffin_manager: factory.build(mock_object_store()), + puffin_manager: factory.build(mock_object_store(), NoopPathProvider), intermediate_manager: intm_manager.clone(), index_options: IndexOptions::default(), inverted_index_config: InvertedIndexConfig { @@ -539,7 +550,7 @@ mod tests { fulltext_index_config: FulltextIndexConfig::default(), bloom_filter_index_config: BloomFilterConfig::default(), } - .build(FileId::random(), "test".to_string()) + .build(FileId::random()) .await; assert!(indexer.inverted_indexer.is_none()); @@ -550,7 +561,7 @@ mod tests { op_type: OperationType::Compact, metadata: metadata.clone(), row_group_size: 1024, - puffin_manager: factory.build(mock_object_store()), + puffin_manager: factory.build(mock_object_store(), NoopPathProvider), intermediate_manager: intm_manager.clone(), index_options: IndexOptions::default(), inverted_index_config: InvertedIndexConfig::default(), @@ -560,7 +571,7 @@ mod tests { }, bloom_filter_index_config: BloomFilterConfig::default(), } - .build(FileId::random(), "test".to_string()) + .build(FileId::random()) .await; assert!(indexer.inverted_indexer.is_some()); @@ -571,7 +582,7 @@ mod tests { op_type: OperationType::Compact, metadata, row_group_size: 1024, - puffin_manager: factory.build(mock_object_store()), + puffin_manager: factory.build(mock_object_store(), NoopPathProvider), intermediate_manager: intm_manager, index_options: IndexOptions::default(), inverted_index_config: InvertedIndexConfig::default(), @@ -581,7 +592,7 @@ mod tests { ..Default::default() }, } - .build(FileId::random(), "test".to_string()) + .build(FileId::random()) .await; assert!(indexer.inverted_indexer.is_some()); @@ -604,14 +615,14 @@ mod tests { op_type: OperationType::Flush, metadata: metadata.clone(), row_group_size: 1024, - puffin_manager: factory.build(mock_object_store()), + puffin_manager: factory.build(mock_object_store(), NoopPathProvider), intermediate_manager: 
intm_manager.clone(), index_options: IndexOptions::default(), inverted_index_config: InvertedIndexConfig::default(), fulltext_index_config: FulltextIndexConfig::default(), bloom_filter_index_config: BloomFilterConfig::default(), } - .build(FileId::random(), "test".to_string()) + .build(FileId::random()) .await; assert!(indexer.inverted_indexer.is_none()); @@ -627,14 +638,14 @@ mod tests { op_type: OperationType::Flush, metadata: metadata.clone(), row_group_size: 1024, - puffin_manager: factory.build(mock_object_store()), + puffin_manager: factory.build(mock_object_store(), NoopPathProvider), intermediate_manager: intm_manager.clone(), index_options: IndexOptions::default(), inverted_index_config: InvertedIndexConfig::default(), fulltext_index_config: FulltextIndexConfig::default(), bloom_filter_index_config: BloomFilterConfig::default(), } - .build(FileId::random(), "test".to_string()) + .build(FileId::random()) .await; assert!(indexer.inverted_indexer.is_some()); @@ -650,14 +661,14 @@ mod tests { op_type: OperationType::Flush, metadata: metadata.clone(), row_group_size: 1024, - puffin_manager: factory.build(mock_object_store()), + puffin_manager: factory.build(mock_object_store(), NoopPathProvider), intermediate_manager: intm_manager, index_options: IndexOptions::default(), inverted_index_config: InvertedIndexConfig::default(), fulltext_index_config: FulltextIndexConfig::default(), bloom_filter_index_config: BloomFilterConfig::default(), } - .build(FileId::random(), "test".to_string()) + .build(FileId::random()) .await; assert!(indexer.inverted_indexer.is_some()); @@ -680,14 +691,14 @@ mod tests { op_type: OperationType::Flush, metadata, row_group_size: 0, - puffin_manager: factory.build(mock_object_store()), + puffin_manager: factory.build(mock_object_store(), NoopPathProvider), intermediate_manager: intm_manager, index_options: IndexOptions::default(), inverted_index_config: InvertedIndexConfig::default(), fulltext_index_config: FulltextIndexConfig::default(), bloom_filter_index_config: BloomFilterConfig::default(), } - .build(FileId::random(), "test".to_string()) + .build(FileId::random()) .await; assert!(indexer.inverted_indexer.is_none()); diff --git a/src/mito2/src/sst/index/bloom_filter/applier.rs b/src/mito2/src/sst/index/bloom_filter/applier.rs index 7807434592..2008d7cbfb 100644 --- a/src/mito2/src/sst/index/bloom_filter/applier.rs +++ b/src/mito2/src/sst/index/bloom_filter/applier.rs @@ -28,6 +28,7 @@ use puffin::puffin_manager::{BlobGuard, PuffinManager, PuffinReader}; use snafu::ResultExt; use store_api::storage::{ColumnId, RegionId}; +use crate::access_layer::{RegionFilePathFactory, WriteCachePathProvider}; use crate::cache::file_cache::{FileCacheRef, FileType, IndexKey}; use crate::cache::index::bloom_filter_index::{ BloomFilterIndexCacheRef, CachedBloomFilterIndexBlobReader, @@ -43,7 +44,6 @@ use crate::sst::index::bloom_filter::applier::builder::Predicate; use crate::sst::index::bloom_filter::INDEX_BLOB_TYPE; use crate::sst::index::puffin_manager::{BlobReader, PuffinManagerFactory}; use crate::sst::index::TYPE_BLOOM_FILTER_INDEX; -use crate::sst::location; pub(crate) type BloomFilterIndexApplierRef = Arc; @@ -247,11 +247,12 @@ impl BloomFilterIndexApplier { return Ok(None); }; - let puffin_manager = self.puffin_manager_factory.build(file_cache.local_store()); - let puffin_file_name = file_cache.cache_file_path(index_key); - + let puffin_manager = self.puffin_manager_factory.build( + file_cache.local_store(), + WriteCachePathProvider::new(self.region_id, 
file_cache.clone()), + ); let reader = puffin_manager - .reader(&puffin_file_name) + .reader(&file_id) .await .context(PuffinBuildReaderSnafu)? .with_file_size_hint(file_size_hint) @@ -278,12 +279,14 @@ impl BloomFilterIndexApplier { ) -> Result { let puffin_manager = self .puffin_manager_factory - .build(self.object_store.clone()) + .build( + self.object_store.clone(), + RegionFilePathFactory::new(self.region_dir.clone()), + ) .with_puffin_metadata_cache(self.puffin_metadata_cache.clone()); - let file_path = location::index_file_path(&self.region_dir, file_id); puffin_manager - .reader(&file_path) + .reader(&file_id) .await .context(PuffinBuildReaderSnafu)? .with_file_size_hint(file_size_hint) @@ -447,7 +450,6 @@ mod tests { let memory_usage_threshold = Some(1024); let file_id = FileId::random(); let region_dir = "region_dir".to_string(); - let path = location::index_file_path(®ion_dir, file_id); let mut indexer = BloomFilterIndexer::new(file_id, ®ion_metadata, intm_mgr, memory_usage_threshold) @@ -460,9 +462,12 @@ mod tests { let mut batch = new_batch("tag2", 10..20); indexer.update(&mut batch).await.unwrap(); - let puffin_manager = factory.build(object_store.clone()); + let puffin_manager = factory.build( + object_store.clone(), + RegionFilePathFactory::new(region_dir.clone()), + ); - let mut puffin_writer = puffin_manager.writer(&path).await.unwrap(); + let mut puffin_writer = puffin_manager.writer(&file_id).await.unwrap(); indexer.finish(&mut puffin_writer).await.unwrap(); puffin_writer.finish().await.unwrap(); diff --git a/src/mito2/src/sst/index/bloom_filter/creator.rs b/src/mito2/src/sst/index/bloom_filter/creator.rs index da79677b31..59437961b5 100644 --- a/src/mito2/src/sst/index/bloom_filter/creator.rs +++ b/src/mito2/src/sst/index/bloom_filter/creator.rs @@ -356,6 +356,7 @@ pub(crate) mod tests { use store_api::storage::RegionId; use super::*; + use crate::access_layer::FilePathProvider; use crate::read::BatchColumn; use crate::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt}; use crate::sst::index::puffin_manager::PuffinManagerFactory; @@ -368,6 +369,18 @@ pub(crate) mod tests { IntermediateManager::init_fs(path).await.unwrap() } + pub struct TestPathProvider; + + impl FilePathProvider for TestPathProvider { + fn build_index_file_path(&self, file_id: FileId) -> String { + file_id.to_string() + } + + fn build_sst_file_path(&self, file_id: FileId) -> String { + file_id.to_string() + } + } + /// tag_str: /// - type: string /// - index: bloom filter @@ -483,16 +496,16 @@ pub(crate) mod tests { indexer.update(&mut batch).await.unwrap(); let (_d, factory) = PuffinManagerFactory::new_for_test_async(prefix).await; - let puffin_manager = factory.build(object_store); + let puffin_manager = factory.build(object_store, TestPathProvider); - let index_file_name = "index_file"; - let mut puffin_writer = puffin_manager.writer(index_file_name).await.unwrap(); + let file_id = FileId::random(); + let mut puffin_writer = puffin_manager.writer(&file_id).await.unwrap(); let (row_count, byte_count) = indexer.finish(&mut puffin_writer).await.unwrap(); assert_eq!(row_count, 20); assert!(byte_count > 0); puffin_writer.finish().await.unwrap(); - let puffin_reader = puffin_manager.reader(index_file_name).await.unwrap(); + let puffin_reader = puffin_manager.reader(&file_id).await.unwrap(); // tag_str { diff --git a/src/mito2/src/sst/index/fulltext_index/applier.rs b/src/mito2/src/sst/index/fulltext_index/applier.rs index 7d3230781e..c6b773eb47 100644 --- 
a/src/mito2/src/sst/index/fulltext_index/applier.rs +++ b/src/mito2/src/sst/index/fulltext_index/applier.rs @@ -15,19 +15,22 @@ use std::collections::BTreeSet; use std::sync::Arc; +use common_telemetry::warn; use index::fulltext_index::search::{FulltextIndexSearcher, RowId, TantivyFulltextIndexSearcher}; use object_store::ObjectStore; +use puffin::puffin_manager::cache::PuffinMetadataCacheRef; use puffin::puffin_manager::{DirGuard, PuffinManager, PuffinReader}; use snafu::ResultExt; -use store_api::storage::ColumnId; +use store_api::storage::{ColumnId, RegionId}; +use crate::access_layer::{RegionFilePathFactory, WriteCachePathProvider}; +use crate::cache::file_cache::{FileCacheRef, FileType, IndexKey}; use crate::error::{ApplyFulltextIndexSnafu, PuffinBuildReaderSnafu, PuffinReadBlobSnafu, Result}; use crate::metrics::INDEX_APPLY_ELAPSED; use crate::sst::file::FileId; use crate::sst::index::fulltext_index::INDEX_BLOB_TYPE_TANTIVY; use crate::sst::index::puffin_manager::{PuffinManagerFactory, SstPuffinDir}; use crate::sst::index::TYPE_FULLTEXT_INDEX; -use crate::sst::location; pub mod builder; @@ -36,6 +39,9 @@ pub struct FulltextIndexApplier { /// The root directory of the region. region_dir: String, + /// The region ID. + region_id: RegionId, + /// Queries to apply to the index. queries: Vec<(ColumnId, String)>, @@ -44,6 +50,12 @@ pub struct FulltextIndexApplier { /// Store responsible for accessing index files. store: ObjectStore, + + /// File cache to be used by the `FulltextIndexApplier`. + file_cache: Option, + + /// The puffin metadata cache. + puffin_metadata_cache: Option, } pub type FulltextIndexApplierRef = Arc; @@ -52,20 +64,43 @@ impl FulltextIndexApplier { /// Creates a new `FulltextIndexApplier`. pub fn new( region_dir: String, + region_id: RegionId, store: ObjectStore, queries: Vec<(ColumnId, String)>, puffin_manager_factory: PuffinManagerFactory, ) -> Self { Self { region_dir, + region_id, store, queries, puffin_manager_factory, + file_cache: None, + puffin_metadata_cache: None, } } + /// Sets the file cache. + pub fn with_file_cache(mut self, file_cache: Option) -> Self { + self.file_cache = file_cache; + self + } + + /// Sets the puffin metadata cache. + pub fn with_puffin_metadata_cache( + mut self, + puffin_metadata_cache: Option, + ) -> Self { + self.puffin_metadata_cache = puffin_metadata_cache; + self + } + /// Applies the queries to the fulltext index of the specified SST file. - pub async fn apply(&self, file_id: FileId) -> Result> { + pub async fn apply( + &self, + file_id: FileId, + file_size_hint: Option, + ) -> Result> { let _timer = INDEX_APPLY_ELAPSED .with_label_values(&[TYPE_FULLTEXT_INDEX]) .start_timer(); @@ -74,7 +109,9 @@ impl FulltextIndexApplier { let mut row_ids = BTreeSet::new(); for (column_id, query) in &self.queries { - let dir = self.index_dir_path(file_id, *column_id).await?; + let dir = self + .index_dir_path(file_id, *column_id, file_size_hint) + .await?; let path = match &dir { Some(dir) => dir.path(), None => { @@ -110,15 +147,74 @@ impl FulltextIndexApplier { &self, file_id: FileId, column_id: ColumnId, + file_size_hint: Option, ) -> Result> { - let puffin_manager = self.puffin_manager_factory.build(self.store.clone()); - let file_path = location::index_file_path(&self.region_dir, file_id); + let blob_key = format!("{INDEX_BLOB_TYPE_TANTIVY}-{column_id}"); - match puffin_manager - .reader(&file_path) + // FAST PATH: Try to read the index from the file cache. 
+ if let Some(file_cache) = &self.file_cache { + let index_key = IndexKey::new(self.region_id, file_id, FileType::Puffin); + if file_cache.get(index_key).await.is_some() { + match self + .get_index_from_file_cache(file_cache, file_id, file_size_hint, &blob_key) + .await + { + Ok(dir) => return Ok(dir), + Err(err) => { + warn!(err; "An unexpected error occurred while reading the cached index file. Fallback to remote index file.") + } + } + } + } + + // SLOW PATH: Try to read the index from the remote file. + self.get_index_from_remote_file(file_id, file_size_hint, &blob_key) + .await + } + + async fn get_index_from_file_cache( + &self, + file_cache: &FileCacheRef, + file_id: FileId, + file_size_hint: Option, + blob_key: &str, + ) -> Result> { + match self + .puffin_manager_factory + .build( + file_cache.local_store(), + WriteCachePathProvider::new(self.region_id, file_cache.clone()), + ) + .reader(&file_id) .await .context(PuffinBuildReaderSnafu)? - .dir(&format!("{INDEX_BLOB_TYPE_TANTIVY}-{column_id}")) + .with_file_size_hint(file_size_hint) + .dir(blob_key) + .await + { + Ok(dir) => Ok(Some(dir)), + Err(puffin::error::Error::BlobNotFound { .. }) => Ok(None), + Err(err) => Err(err).context(PuffinReadBlobSnafu), + } + } + + async fn get_index_from_remote_file( + &self, + file_id: FileId, + file_size_hint: Option, + blob_key: &str, + ) -> Result> { + match self + .puffin_manager_factory + .build( + self.store.clone(), + RegionFilePathFactory::new(self.region_dir.clone()), + ) + .reader(&file_id) + .await + .context(PuffinBuildReaderSnafu)? + .with_file_size_hint(file_size_hint) + .dir(blob_key) .await { Ok(dir) => Ok(Some(dir)), diff --git a/src/mito2/src/sst/index/fulltext_index/applier/builder.rs b/src/mito2/src/sst/index/fulltext_index/applier/builder.rs index 5a10ffd160..b76bdc2f1b 100644 --- a/src/mito2/src/sst/index/fulltext_index/applier/builder.rs +++ b/src/mito2/src/sst/index/fulltext_index/applier/builder.rs @@ -15,9 +15,11 @@ use datafusion_common::ScalarValue; use datafusion_expr::Expr; use object_store::ObjectStore; +use puffin::puffin_manager::cache::PuffinMetadataCacheRef; use store_api::metadata::RegionMetadata; -use store_api::storage::{ColumnId, ConcreteDataType}; +use store_api::storage::{ColumnId, ConcreteDataType, RegionId}; +use crate::cache::file_cache::FileCacheRef; use crate::error::Result; use crate::sst::index::fulltext_index::applier::FulltextIndexApplier; use crate::sst::index::puffin_manager::PuffinManagerFactory; @@ -25,27 +27,49 @@ use crate::sst::index::puffin_manager::PuffinManagerFactory; /// `FulltextIndexApplierBuilder` is a builder for `FulltextIndexApplier`. pub struct FulltextIndexApplierBuilder<'a> { region_dir: String, + region_id: RegionId, store: ObjectStore, puffin_manager_factory: PuffinManagerFactory, metadata: &'a RegionMetadata, + file_cache: Option, + puffin_metadata_cache: Option, } impl<'a> FulltextIndexApplierBuilder<'a> { /// Creates a new `FulltextIndexApplierBuilder`. pub fn new( region_dir: String, + region_id: RegionId, store: ObjectStore, puffin_manager_factory: PuffinManagerFactory, metadata: &'a RegionMetadata, ) -> Self { Self { region_dir, + region_id, store, puffin_manager_factory, metadata, + file_cache: None, + puffin_metadata_cache: None, } } + /// Sets the file cache to be used by the `FulltextIndexApplier`. + pub fn with_file_cache(mut self, file_cache: Option) -> Self { + self.file_cache = file_cache; + self + } + + /// Sets the puffin metadata cache to be used by the `FulltextIndexApplier`. 
+ pub fn with_puffin_metadata_cache( + mut self, + puffin_metadata_cache: Option, + ) -> Self { + self.puffin_metadata_cache = puffin_metadata_cache; + self + } + /// Builds `SstIndexApplier` from the given expressions. pub fn build(self, exprs: &[Expr]) -> Result> { let mut queries = Vec::with_capacity(exprs.len()); @@ -58,10 +82,13 @@ impl<'a> FulltextIndexApplierBuilder<'a> { Ok((!queries.is_empty()).then(|| { FulltextIndexApplier::new( self.region_dir, + self.region_id, self.store, queries, self.puffin_manager_factory, ) + .with_file_cache(self.file_cache) + .with_puffin_metadata_cache(self.puffin_metadata_cache) })) } diff --git a/src/mito2/src/sst/index/fulltext_index/creator.rs b/src/mito2/src/sst/index/fulltext_index/creator.rs index 28b77fdf44..1a88c1eafa 100644 --- a/src/mito2/src/sst/index/fulltext_index/creator.rs +++ b/src/mito2/src/sst/index/fulltext_index/creator.rs @@ -350,11 +350,11 @@ mod tests { use store_api::storage::{ConcreteDataType, RegionId}; use super::*; + use crate::access_layer::RegionFilePathFactory; use crate::read::{Batch, BatchColumn}; use crate::sst::file::FileId; use crate::sst::index::fulltext_index::applier::FulltextIndexApplier; use crate::sst::index::puffin_manager::PuffinManagerFactory; - use crate::sst::location; fn mock_object_store() -> ObjectStore { ObjectStore::new(Memory::default()).unwrap().finish() @@ -494,7 +494,6 @@ mod tests { let (d, factory) = PuffinManagerFactory::new_for_test_async(prefix).await; let region_dir = "region0".to_string(); let sst_file_id = FileId::random(); - let file_path = location::index_file_path(®ion_dir, sst_file_id); let object_store = mock_object_store(); let region_metadata = mock_region_metadata(); let intm_mgr = new_intm_mgr(d.path().to_string_lossy()).await; @@ -514,8 +513,11 @@ mod tests { let mut batch = new_batch(rows); indexer.update(&mut batch).await.unwrap(); - let puffin_manager = factory.build(object_store.clone()); - let mut writer = puffin_manager.writer(&file_path).await.unwrap(); + let puffin_manager = factory.build( + object_store.clone(), + RegionFilePathFactory::new(region_dir.clone()), + ); + let mut writer = puffin_manager.writer(&sst_file_id).await.unwrap(); let _ = indexer.finish(&mut writer).await.unwrap(); writer.finish().await.unwrap(); @@ -523,6 +525,7 @@ mod tests { let _d = &d; let applier = FulltextIndexApplier::new( region_dir.clone(), + region_metadata.region_id, object_store.clone(), queries .into_iter() @@ -531,7 +534,7 @@ mod tests { factory.clone(), ); - async move { applier.apply(sst_file_id).await.unwrap() }.boxed() + async move { applier.apply(sst_file_id, None).await.unwrap() }.boxed() } } diff --git a/src/mito2/src/sst/index/indexer/finish.rs b/src/mito2/src/sst/index/indexer/finish.rs index 025eead758..ce00be0ae0 100644 --- a/src/mito2/src/sst/index/indexer/finish.rs +++ b/src/mito2/src/sst/index/indexer/finish.rs @@ -62,7 +62,7 @@ impl Indexer { async fn build_puffin_writer(&mut self) -> Option { let puffin_manager = self.puffin_manager.take()?; - let err = match puffin_manager.writer(&self.file_path).await { + let err = match puffin_manager.writer(&self.file_id).await { Ok(writer) => return Some(writer), Err(err) => err, }; diff --git a/src/mito2/src/sst/index/inverted_index/applier.rs b/src/mito2/src/sst/index/inverted_index/applier.rs index 61865f76f4..5362c1dd1d 100644 --- a/src/mito2/src/sst/index/inverted_index/applier.rs +++ b/src/mito2/src/sst/index/inverted_index/applier.rs @@ -28,6 +28,7 @@ use puffin::puffin_manager::{BlobGuard, PuffinManager, PuffinReader}; 
use snafu::ResultExt; use store_api::storage::RegionId; +use crate::access_layer::{RegionFilePathFactory, WriteCachePathProvider}; use crate::cache::file_cache::{FileCacheRef, FileType, IndexKey}; use crate::cache::index::inverted_index::{CachedInvertedIndexBlobReader, InvertedIndexCacheRef}; use crate::error::{ @@ -38,7 +39,6 @@ use crate::sst::file::FileId; use crate::sst::index::inverted_index::INDEX_BLOB_TYPE; use crate::sst::index::puffin_manager::{BlobReader, PuffinManagerFactory}; use crate::sst::index::TYPE_INVERTED_INDEX; -use crate::sst::location; /// `InvertedIndexApplier` is responsible for applying predicates to the provided SST files /// and returning the relevant row group ids for further scan. @@ -172,12 +172,14 @@ impl InvertedIndexApplier { return Ok(None); }; - let puffin_manager = self.puffin_manager_factory.build(file_cache.local_store()); - let puffin_file_name = file_cache.cache_file_path(index_key); + let puffin_manager = self.puffin_manager_factory.build( + file_cache.local_store(), + WriteCachePathProvider::new(self.region_id, file_cache.clone()), + ); // Adds file size hint to the puffin reader to avoid extra metadata read. let reader = puffin_manager - .reader(&puffin_file_name) + .reader(&file_id) .await .context(PuffinBuildReaderSnafu)? .with_file_size_hint(file_size_hint) @@ -198,12 +200,14 @@ impl InvertedIndexApplier { ) -> Result { let puffin_manager = self .puffin_manager_factory - .build(self.store.clone()) + .build( + self.store.clone(), + RegionFilePathFactory::new(self.region_dir.clone()), + ) .with_puffin_metadata_cache(self.puffin_metadata_cache.clone()); - let file_path = location::index_file_path(&self.region_dir, file_id); puffin_manager - .reader(&file_path) + .reader(&file_id) .await .context(PuffinBuildReaderSnafu)? 
.with_file_size_hint(file_size_hint) @@ -239,10 +243,12 @@ mod tests { let object_store = ObjectStore::new(Memory::default()).unwrap().finish(); let file_id = FileId::random(); let region_dir = "region_dir".to_string(); - let path = location::index_file_path(®ion_dir, file_id); - let puffin_manager = puffin_manager_factory.build(object_store.clone()); - let mut writer = puffin_manager.writer(&path).await.unwrap(); + let puffin_manager = puffin_manager_factory.build( + object_store.clone(), + RegionFilePathFactory::new(region_dir.clone()), + ); + let mut writer = puffin_manager.writer(&file_id).await.unwrap(); writer .put_blob(INDEX_BLOB_TYPE, Cursor::new(vec![]), Default::default()) .await @@ -285,10 +291,12 @@ mod tests { let object_store = ObjectStore::new(Memory::default()).unwrap().finish(); let file_id = FileId::random(); let region_dir = "region_dir".to_string(); - let path = location::index_file_path(®ion_dir, file_id); - let puffin_manager = puffin_manager_factory.build(object_store.clone()); - let mut writer = puffin_manager.writer(&path).await.unwrap(); + let puffin_manager = puffin_manager_factory.build( + object_store.clone(), + RegionFilePathFactory::new(region_dir.clone()), + ); + let mut writer = puffin_manager.writer(&file_id).await.unwrap(); writer .put_blob("invalid_blob_type", Cursor::new(vec![]), Default::default()) .await diff --git a/src/mito2/src/sst/index/inverted_index/creator.rs b/src/mito2/src/sst/index/inverted_index/creator.rs index 8bb664405a..83510f49ca 100644 --- a/src/mito2/src/sst/index/inverted_index/creator.rs +++ b/src/mito2/src/sst/index/inverted_index/creator.rs @@ -336,13 +336,13 @@ mod tests { use store_api::storage::RegionId; use super::*; + use crate::access_layer::RegionFilePathFactory; use crate::cache::index::inverted_index::InvertedIndexCache; use crate::metrics::CACHE_BYTES; use crate::read::BatchColumn; use crate::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt}; use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder; use crate::sst::index::puffin_manager::PuffinManagerFactory; - use crate::sst::location; fn mock_object_store() -> ObjectStore { ObjectStore::new(Memory::default()).unwrap().finish() @@ -438,7 +438,6 @@ mod tests { let (d, factory) = PuffinManagerFactory::new_for_test_async(prefix).await; let region_dir = "region0".to_string(); let sst_file_id = FileId::random(); - let file_path = location::index_file_path(®ion_dir, sst_file_id); let object_store = mock_object_store(); let region_metadata = mock_region_metadata(); let intm_mgr = new_intm_mgr(d.path().to_string_lossy()).await; @@ -460,8 +459,11 @@ mod tests { creator.update(&mut batch).await.unwrap(); } - let puffin_manager = factory.build(object_store.clone()); - let mut writer = puffin_manager.writer(&file_path).await.unwrap(); + let puffin_manager = factory.build( + object_store.clone(), + RegionFilePathFactory::new(region_dir.clone()), + ); + let mut writer = puffin_manager.writer(&sst_file_id).await.unwrap(); let (row_count, _) = creator.finish(&mut writer).await.unwrap(); assert_eq!(row_count, rows.len() * segment_row_count); writer.finish().await.unwrap(); diff --git a/src/mito2/src/sst/index/puffin_manager.rs b/src/mito2/src/sst/index/puffin_manager.rs index 5d54da5ffb..161a791d32 100644 --- a/src/mito2/src/sst/index/puffin_manager.rs +++ b/src/mito2/src/sst/index/puffin_manager.rs @@ -26,18 +26,20 @@ use puffin::puffin_manager::stager::{BoundedStager, Stager}; use puffin::puffin_manager::{BlobGuard, PuffinManager, PuffinReader}; 
use snafu::ResultExt; +use crate::access_layer::FilePathProvider; use crate::error::{PuffinInitStagerSnafu, PuffinPurgeStagerSnafu, Result}; use crate::metrics::{ StagerMetrics, INDEX_PUFFIN_FLUSH_OP_TOTAL, INDEX_PUFFIN_READ_BYTES_TOTAL, INDEX_PUFFIN_READ_OP_TOTAL, INDEX_PUFFIN_WRITE_BYTES_TOTAL, INDEX_PUFFIN_WRITE_OP_TOTAL, }; +use crate::sst::file::FileId; use crate::sst::index::store::{self, InstrumentedStore}; type InstrumentedRangeReader = store::InstrumentedRangeReader<'static>; type InstrumentedAsyncWrite = store::InstrumentedAsyncWrite<'static, FuturesAsyncWriter>; pub(crate) type SstPuffinManager = - FsPuffinManager, ObjectStorePuffinFileAccessor>; + FsPuffinManager>, ObjectStorePuffinFileAccessor>; pub(crate) type SstPuffinReader = ::Reader; pub(crate) type SstPuffinWriter = ::Writer; pub(crate) type SstPuffinBlob = ::Blob; @@ -50,7 +52,7 @@ const STAGING_DIR: &str = "staging"; #[derive(Clone)] pub struct PuffinManagerFactory { /// The stager used by the puffin manager. - stager: Arc, + stager: Arc>, /// The size of the write buffer used to create object store. write_buffer_size: Option, @@ -79,15 +81,20 @@ impl PuffinManagerFactory { }) } - pub(crate) fn build(&self, store: ObjectStore) -> SstPuffinManager { + pub(crate) fn build( + &self, + store: ObjectStore, + path_provider: impl FilePathProvider + 'static, + ) -> SstPuffinManager { let store = InstrumentedStore::new(store).with_write_buffer_size(self.write_buffer_size); - let puffin_file_accessor = ObjectStorePuffinFileAccessor::new(store); + let puffin_file_accessor = + ObjectStorePuffinFileAccessor::new(store, Arc::new(path_provider)); SstPuffinManager::new(self.stager.clone(), puffin_file_accessor) } - pub(crate) async fn purge_stager(&self, puffin_file_name: &str) -> Result<()> { + pub(crate) async fn purge_stager(&self, file_id: FileId) -> Result<()> { self.stager - .purge(puffin_file_name) + .purge(&file_id) .await .context(PuffinPurgeStagerSnafu) } @@ -119,11 +126,15 @@ impl PuffinManagerFactory { #[derive(Clone)] pub(crate) struct ObjectStorePuffinFileAccessor { object_store: InstrumentedStore, + path_provider: Arc, } impl ObjectStorePuffinFileAccessor { - pub fn new(object_store: InstrumentedStore) -> Self { - Self { object_store } + pub fn new(object_store: InstrumentedStore, path_provider: Arc) -> Self { + Self { + object_store, + path_provider, + } } } @@ -131,11 +142,13 @@ impl ObjectStorePuffinFileAccessor { impl PuffinFileAccessor for ObjectStorePuffinFileAccessor { type Reader = InstrumentedRangeReader; type Writer = InstrumentedAsyncWrite; + type FileHandle = FileId; - async fn reader(&self, puffin_file_name: &str) -> PuffinResult { + async fn reader(&self, handle: &FileId) -> PuffinResult { + let file_path = self.path_provider.build_index_file_path(*handle); self.object_store .range_reader( - puffin_file_name, + &file_path, &INDEX_PUFFIN_READ_BYTES_TOTAL, &INDEX_PUFFIN_READ_OP_TOTAL, ) @@ -144,10 +157,11 @@ impl PuffinFileAccessor for ObjectStorePuffinFileAccessor { .context(puffin_error::ExternalSnafu) } - async fn writer(&self, puffin_file_name: &str) -> PuffinResult { + async fn writer(&self, handle: &FileId) -> PuffinResult { + let file_path = self.path_provider.build_index_file_path(*handle); self.object_store .writer( - puffin_file_name, + &file_path, &INDEX_PUFFIN_WRITE_BYTES_TOTAL, &INDEX_PUFFIN_WRITE_OP_TOTAL, &INDEX_PUFFIN_FLUSH_OP_TOTAL, @@ -169,20 +183,32 @@ mod tests { use super::*; + struct TestFilePathProvider; + + impl FilePathProvider for TestFilePathProvider { + fn 
build_index_file_path(&self, file_id: FileId) -> String { + file_id.to_string() + } + + fn build_sst_file_path(&self, file_id: FileId) -> String { + file_id.to_string() + } + } + #[tokio::test] async fn test_puffin_manager_factory() { let (_dir, factory) = PuffinManagerFactory::new_for_test_async("test_puffin_manager_factory_").await; let object_store = ObjectStore::new(Memory::default()).unwrap().finish(); - let manager = factory.build(object_store); + let manager = factory.build(object_store, TestFilePathProvider); - let file_name = "my-puffin-file"; + let file_id = FileId::random(); let blob_key = "blob-key"; let dir_key = "dir-key"; let raw_data = b"hello world!"; - let mut writer = manager.writer(file_name).await.unwrap(); + let mut writer = manager.writer(&file_id).await.unwrap(); writer .put_blob(blob_key, Cursor::new(raw_data), PutOptions::default()) .await @@ -203,7 +229,7 @@ mod tests { .unwrap(); writer.finish().await.unwrap(); - let reader = manager.reader(file_name).await.unwrap(); + let reader = manager.reader(&file_id).await.unwrap(); let blob_guard = reader.blob(blob_key).await.unwrap(); let blob_reader = blob_guard.reader().await.unwrap(); let meta = blob_reader.metadata().await.unwrap(); diff --git a/src/mito2/src/sst/parquet.rs b/src/mito2/src/sst/parquet.rs index 12d16b7cda..14496312e3 100644 --- a/src/mito2/src/sst/parquet.rs +++ b/src/mito2/src/sst/parquet.rs @@ -131,7 +131,7 @@ mod tests { #[async_trait::async_trait] impl IndexerBuilder for NoopIndexBuilder { - async fn build(&self, _file_id: FileId, _path: String) -> Indexer { + async fn build(&self, _file_id: FileId) -> Indexer { Indexer::default() } } diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs index 4aecf744d6..d34aaf2229 100644 --- a/src/mito2/src/sst/parquet/reader.rs +++ b/src/mito2/src/sst/parquet/reader.rs @@ -387,7 +387,11 @@ impl ParquetReaderBuilder { return false; } - let apply_res = match index_applier.apply(self.file_handle.file_id()).await { + let file_size_hint = self.file_handle.meta_ref().index_file_size(); + let apply_res = match index_applier + .apply(self.file_handle.file_id(), Some(file_size_hint)) + .await + { Ok(res) => res, Err(err) => { if cfg!(any(test, feature = "test")) { @@ -467,9 +471,9 @@ impl ParquetReaderBuilder { if !self.file_handle.meta_ref().inverted_index_available() { return false; } - let file_size_hint = self.file_handle.meta_ref().inverted_index_size(); + let file_size_hint = self.file_handle.meta_ref().index_file_size(); let apply_output = match index_applier - .apply(self.file_handle.file_id(), file_size_hint) + .apply(self.file_handle.file_id(), Some(file_size_hint)) .await { Ok(output) => output, @@ -578,11 +582,11 @@ impl ParquetReaderBuilder { return false; } - let file_size_hint = self.file_handle.meta_ref().bloom_filter_index_size(); + let file_size_hint = self.file_handle.meta_ref().index_file_size(); let apply_output = match index_applier .apply( self.file_handle.file_id(), - file_size_hint, + Some(file_size_hint), parquet_meta .row_groups() .iter() diff --git a/src/mito2/src/sst/parquet/writer.rs b/src/mito2/src/sst/parquet/writer.rs index 8d0fd38e28..3aad380eb5 100644 --- a/src/mito2/src/sst/parquet/writer.rs +++ b/src/mito2/src/sst/parquet/writer.rs @@ -121,8 +121,7 @@ where path_provider: P, ) -> ParquetWriter { let init_file = FileId::random(); - let index_file_path = path_provider.build_index_file_path(init_file); - let indexer = indexer_builder.build(init_file, index_file_path).await; + let indexer = 
indexer_builder.build(init_file).await; ParquetWriter { path_provider, @@ -140,11 +139,7 @@ where match self.current_indexer { None => { self.current_file = FileId::random(); - let index_file_path = self.path_provider.build_index_file_path(self.current_file); - let indexer = self - .indexer_builder - .build(self.current_file, index_file_path) - .await; + let indexer = self.indexer_builder.build(self.current_file).await; self.current_indexer = Some(indexer); // safety: self.current_indexer already set above. self.current_indexer.as_mut().unwrap() diff --git a/src/puffin/src/puffin_manager.rs b/src/puffin/src/puffin_manager.rs index 5217a3e6cc..2ceccf2ce1 100644 --- a/src/puffin/src/puffin_manager.rs +++ b/src/puffin/src/puffin_manager.rs @@ -36,12 +36,13 @@ use crate::file_metadata::FileMetadata; pub trait PuffinManager { type Reader: PuffinReader; type Writer: PuffinWriter; + type FileHandle: ToString + Clone + Send + Sync; - /// Creates a `PuffinReader` for the specified `puffin_file_name`. - async fn reader(&self, puffin_file_name: &str) -> Result; + /// Creates a `PuffinReader` for the specified `handle`. + async fn reader(&self, handle: &Self::FileHandle) -> Result; - /// Creates a `PuffinWriter` for the specified `puffin_file_name`. - async fn writer(&self, puffin_file_name: &str) -> Result; + /// Creates a `PuffinWriter` for the specified `handle`. + async fn writer(&self, handle: &Self::FileHandle) -> Result; } /// The `PuffinWriter` trait provides methods for writing blobs and directories to a Puffin file. diff --git a/src/puffin/src/puffin_manager/file_accessor.rs b/src/puffin/src/puffin_manager/file_accessor.rs index 193aa037f5..557f9c7914 100644 --- a/src/puffin/src/puffin_manager/file_accessor.rs +++ b/src/puffin/src/puffin_manager/file_accessor.rs @@ -27,12 +27,13 @@ use crate::error::Result; pub trait PuffinFileAccessor: Send + Sync + 'static { type Reader: SizeAwareRangeReader + Sync; type Writer: AsyncWrite + Unpin + Send; + type FileHandle: ToString + Clone + Send + Sync; - /// Opens a reader for the given puffin file. - async fn reader(&self, puffin_file_name: &str) -> Result; + /// Opens a reader for the given puffin file handle. + async fn reader(&self, handle: &Self::FileHandle) -> Result; - /// Creates a writer for the given puffin file. - async fn writer(&self, puffin_file_name: &str) -> Result; + /// Creates a writer for the given puffin file handle. 
+ async fn writer(&self, handle: &Self::FileHandle) -> Result; } pub struct MockFileAccessor { @@ -50,15 +51,16 @@ impl MockFileAccessor { impl PuffinFileAccessor for MockFileAccessor { type Reader = FileReader; type Writer = Compat; + type FileHandle = String; - async fn reader(&self, puffin_file_name: &str) -> Result { - Ok(FileReader::new(self.tempdir.path().join(puffin_file_name)) + async fn reader(&self, handle: &String) -> Result { + Ok(FileReader::new(self.tempdir.path().join(handle)) .await .unwrap()) } - async fn writer(&self, puffin_file_name: &str) -> Result { - let p = self.tempdir.path().join(puffin_file_name); + async fn writer(&self, handle: &String) -> Result { + let p = self.tempdir.path().join(handle); if let Some(p) = p.parent() { if !tokio::fs::try_exists(p).await.unwrap() { tokio::fs::create_dir_all(p).await.unwrap(); diff --git a/src/puffin/src/puffin_manager/fs_puffin_manager.rs b/src/puffin/src/puffin_manager/fs_puffin_manager.rs index c03a86aaf6..af57041e68 100644 --- a/src/puffin/src/puffin_manager/fs_puffin_manager.rs +++ b/src/puffin/src/puffin_manager/fs_puffin_manager.rs @@ -61,25 +61,26 @@ impl FsPuffinManager { #[async_trait] impl PuffinManager for FsPuffinManager where - S: Stager + Clone + 'static, F: PuffinFileAccessor + Clone, + S: Stager + Clone + 'static, { type Reader = FsPuffinReader; type Writer = FsPuffinWriter; + type FileHandle = F::FileHandle; - async fn reader(&self, puffin_file_name: &str) -> Result { + async fn reader(&self, handle: &Self::FileHandle) -> Result { Ok(FsPuffinReader::new( - puffin_file_name.to_string(), + handle.clone(), self.stager.clone(), self.puffin_file_accessor.clone(), self.puffin_metadata_cache.clone(), )) } - async fn writer(&self, puffin_file_name: &str) -> Result { - let writer = self.puffin_file_accessor.writer(puffin_file_name).await?; + async fn writer(&self, handle: &Self::FileHandle) -> Result { + let writer = self.puffin_file_accessor.writer(handle).await?; Ok(FsPuffinWriter::new( - puffin_file_name.to_string(), + handle.clone(), self.stager.clone(), writer, )) diff --git a/src/puffin/src/puffin_manager/fs_puffin_manager/reader.rs b/src/puffin/src/puffin_manager/fs_puffin_manager/reader.rs index 1202be3e08..2d08cd81a0 100644 --- a/src/puffin/src/puffin_manager/fs_puffin_manager/reader.rs +++ b/src/puffin/src/puffin_manager/fs_puffin_manager/reader.rs @@ -39,9 +39,13 @@ use crate::puffin_manager::stager::{BoxWriter, DirWriterProviderRef, Stager}; use crate::puffin_manager::{BlobGuard, PuffinReader}; /// `FsPuffinReader` is a `PuffinReader` that provides fs readers for puffin files. -pub struct FsPuffinReader { - /// The name of the puffin file. - puffin_file_name: String, +pub struct FsPuffinReader +where + S: Stager + 'static, + F: PuffinFileAccessor + Clone, +{ + /// The handle of the puffin file. + handle: F::FileHandle, /// The file size hint. 
file_size_hint: Option, @@ -56,15 +60,19 @@ pub struct FsPuffinReader { puffin_file_metadata_cache: Option, } -impl FsPuffinReader { +impl FsPuffinReader +where + S: Stager + 'static, + F: PuffinFileAccessor + Clone, +{ pub(crate) fn new( - puffin_file_name: String, + handle: F::FileHandle, stager: S, puffin_file_accessor: F, puffin_file_metadata_cache: Option, ) -> Self { Self { - puffin_file_name, + handle, file_size_hint: None, stager, puffin_file_accessor, @@ -76,8 +84,8 @@ impl FsPuffinReader { #[async_trait] impl PuffinReader for FsPuffinReader where - S: Stager + 'static, F: PuffinFileAccessor + Clone, + S: Stager + 'static, { type Blob = Either, S::Blob>; type Dir = S::Dir; @@ -88,19 +96,13 @@ where } async fn metadata(&self) -> Result> { - let reader = self - .puffin_file_accessor - .reader(&self.puffin_file_name) - .await?; + let reader = self.puffin_file_accessor.reader(&self.handle).await?; let mut file = PuffinFileReader::new(reader); self.get_puffin_file_metadata(&mut file).await } async fn blob(&self, key: &str) -> Result { - let mut reader = self - .puffin_file_accessor - .reader(&self.puffin_file_name) - .await?; + let mut reader = self.puffin_file_accessor.reader(&self.handle).await?; if let Some(file_size_hint) = self.file_size_hint { reader.with_file_size_hint(file_size_hint); } @@ -117,7 +119,7 @@ where let blob = if blob_metadata.compression_codec.is_none() { // If the blob is not compressed, we can directly read it from the puffin file. Either::L(RandomReadBlob { - file_name: self.puffin_file_name.clone(), + handle: self.handle.clone(), accessor: self.puffin_file_accessor.clone(), blob_metadata, }) @@ -126,7 +128,7 @@ where let staged_blob = self .stager .get_blob( - self.puffin_file_name.as_str(), + &self.handle, key, Box::new(|writer| { Box::pin(Self::init_blob_to_stager(file, blob_metadata, writer)) @@ -143,17 +145,18 @@ where async fn dir(&self, key: &str) -> Result { self.stager .get_dir( - self.puffin_file_name.as_str(), + &self.handle, key, Box::new(|writer_provider| { let accessor = self.puffin_file_accessor.clone(); - let puffin_file_name = self.puffin_file_name.clone(); + let handle = self.handle.clone(); let key = key.to_string(); Box::pin(Self::init_dir_to_stager( - puffin_file_name, + handle, key, writer_provider, accessor, + self.file_size_hint, )) }), ) @@ -170,15 +173,16 @@ where &self, reader: &mut PuffinFileReader, ) -> Result> { + let id = self.handle.to_string(); if let Some(cache) = self.puffin_file_metadata_cache.as_ref() { - if let Some(metadata) = cache.get_metadata(&self.puffin_file_name) { + if let Some(metadata) = cache.get_metadata(&id) { return Ok(metadata); } } let metadata = Arc::new(reader.metadata().await?); if let Some(cache) = self.puffin_file_metadata_cache.as_ref() { - cache.put_metadata(self.puffin_file_name.to_string(), metadata.clone()); + cache.put_metadata(id, metadata.clone()); } Ok(metadata) } @@ -196,12 +200,16 @@ where } async fn init_dir_to_stager( - puffin_file_name: String, + handle: F::FileHandle, key: String, writer_provider: DirWriterProviderRef, accessor: F, + file_size_hint: Option, ) -> Result { - let reader = accessor.reader(&puffin_file_name).await?; + let mut reader = accessor.reader(&handle).await?; + if let Some(file_size_hint) = file_size_hint { + reader.with_file_size_hint(file_size_hint); + } let mut file = PuffinFileReader::new(reader); let puffin_metadata = file.metadata().await?; @@ -237,7 +245,7 @@ where } ); - let reader = accessor.reader(&puffin_file_name).await?; + let reader = 
accessor.reader(&handle).await?; let writer = writer_provider.writer(&file_meta.relative_path).await?; let task = common_runtime::spawn_global(async move { let reader = PuffinFileReader::new(reader).into_blob_reader(&blob_meta); @@ -284,8 +292,8 @@ where } /// `RandomReadBlob` is a `BlobGuard` that directly reads the blob from the puffin file. -pub struct RandomReadBlob { - file_name: String, +pub struct RandomReadBlob { + handle: F::FileHandle, accessor: F, blob_metadata: BlobMetadata, } @@ -302,7 +310,7 @@ impl BlobGuard for RandomReadBlob { } ); - let reader = self.accessor.reader(&self.file_name).await?; + let reader = self.accessor.reader(&self.handle).await?; let blob_reader = PuffinFileReader::new(reader).into_blob_reader(&self.blob_metadata); Ok(blob_reader) } diff --git a/src/puffin/src/puffin_manager/fs_puffin_manager/writer.rs b/src/puffin/src/puffin_manager/fs_puffin_manager/writer.rs index ab7227606d..924ff5f990 100644 --- a/src/puffin/src/puffin_manager/fs_puffin_manager/writer.rs +++ b/src/puffin/src/puffin_manager/fs_puffin_manager/writer.rs @@ -34,9 +34,9 @@ use crate::puffin_manager::stager::Stager; use crate::puffin_manager::{PuffinWriter, PutOptions}; /// `FsPuffinWriter` is a `PuffinWriter` that writes blobs and directories to a puffin file. -pub struct FsPuffinWriter { +pub struct FsPuffinWriter { /// The name of the puffin file. - puffin_file_name: String, + handle: S::FileHandle, /// The stager. stager: S, @@ -48,10 +48,10 @@ pub struct FsPuffinWriter { blob_keys: HashSet, } -impl FsPuffinWriter { - pub(crate) fn new(puffin_file_name: String, stager: S, writer: W) -> Self { +impl FsPuffinWriter { + pub(crate) fn new(handle: S::FileHandle, stager: S, writer: W) -> Self { Self { - puffin_file_name, + handle, stager, puffin_file_writer: PuffinFileWriter::new(writer), blob_keys: HashSet::new(), @@ -147,7 +147,7 @@ where // Move the directory into the stager. self.stager - .put_dir(&self.puffin_file_name, key, dir_path, dir_size) + .put_dir(&self.handle, key, dir_path, dir_size) .await?; Ok(written_bytes) } diff --git a/src/puffin/src/puffin_manager/stager.rs b/src/puffin/src/puffin_manager/stager.rs index ad21f88989..98cc194b9c 100644 --- a/src/puffin/src/puffin_manager/stager.rs +++ b/src/puffin/src/puffin_manager/stager.rs @@ -57,6 +57,7 @@ pub trait InitDirFn = FnOnce(DirWriterProviderRef) -> WriteResult; pub trait Stager: Send + Sync { type Blob: BlobGuard + Sync; type Dir: DirGuard; + type FileHandle: ToString + Clone + Send + Sync; /// Retrieves a blob, initializing it if necessary using the provided `init_fn`. /// @@ -64,7 +65,7 @@ pub trait Stager: Send + Sync { /// The caller is responsible for holding the `BlobGuard` until they are done with the blob. async fn get_blob<'a>( &self, - puffin_file_name: &str, + handle: &Self::FileHandle, key: &str, init_factory: Box, ) -> Result; @@ -75,7 +76,7 @@ pub trait Stager: Send + Sync { /// The caller is responsible for holding the `DirGuard` until they are done with the directory. async fn get_dir<'a>( &self, - puffin_file_name: &str, + handle: &Self::FileHandle, key: &str, init_fn: Box, ) -> Result; @@ -83,14 +84,14 @@ pub trait Stager: Send + Sync { /// Stores a directory in the staging area. async fn put_dir( &self, - puffin_file_name: &str, + handle: &Self::FileHandle, key: &str, dir_path: PathBuf, dir_size: u64, ) -> Result<()>; /// Purges all content for the given puffin file from the staging area. 
- async fn purge(&self, puffin_file_name: &str) -> Result<()>; + async fn purge(&self, handle: &Self::FileHandle) -> Result<()>; } /// `StagerNotifier` provides a way to notify the caller of the staging events. diff --git a/src/puffin/src/puffin_manager/stager/bounded_stager.rs b/src/puffin/src/puffin_manager/stager/bounded_stager.rs index 508ba68a31..63f4c9d537 100644 --- a/src/puffin/src/puffin_manager/stager/bounded_stager.rs +++ b/src/puffin/src/puffin_manager/stager/bounded_stager.rs @@ -48,7 +48,7 @@ const DELETED_EXTENSION: &str = "deleted"; const RECYCLE_BIN_TTL: Duration = Duration::from_secs(60); /// `BoundedStager` is a `Stager` that uses `moka` to manage staging area. -pub struct BoundedStager { +pub struct BoundedStager { /// The base directory of the staging area. base_dir: PathBuf, @@ -71,9 +71,11 @@ pub struct BoundedStager { /// Notifier for the stager. notifier: Option>, + + _phantom: std::marker::PhantomData, } -impl BoundedStager { +impl BoundedStager { pub async fn new( base_dir: PathBuf, capacity: u64, @@ -124,6 +126,7 @@ impl BoundedStager { delete_queue, recycle_bin, notifier, + _phantom: std::marker::PhantomData, }; stager.recover().await?; @@ -133,17 +136,19 @@ impl BoundedStager { } #[async_trait] -impl Stager for BoundedStager { +impl Stager for BoundedStager { type Blob = Arc; type Dir = Arc; + type FileHandle = H; async fn get_blob<'a>( &self, - puffin_file_name: &str, + handle: &Self::FileHandle, key: &str, init_fn: Box, ) -> Result { - let cache_key = Self::encode_cache_key(puffin_file_name, key); + let handle_str = handle.to_string(); + let cache_key = Self::encode_cache_key(&handle_str, key); let mut miss = false; let v = self @@ -169,7 +174,7 @@ impl Stager for BoundedStager { notifier.on_load_blob(timer.elapsed()); } let guard = Arc::new(FsBlobGuard { - puffin_file_name: puffin_file_name.to_string(), + handle: handle_str, path, delete_queue: self.delete_queue.clone(), size, @@ -194,11 +199,13 @@ impl Stager for BoundedStager { async fn get_dir<'a>( &self, - puffin_file_name: &str, + handle: &Self::FileHandle, key: &str, init_fn: Box, ) -> Result { - let cache_key = Self::encode_cache_key(puffin_file_name, key); + let handle_str = handle.to_string(); + + let cache_key = Self::encode_cache_key(&handle_str, key); let mut miss = false; let v = self @@ -224,7 +231,7 @@ impl Stager for BoundedStager { notifier.on_load_dir(timer.elapsed()); } let guard = Arc::new(FsDirGuard { - puffin_file_name: puffin_file_name.to_string(), + handle: handle_str, path, size, delete_queue: self.delete_queue.clone(), @@ -249,12 +256,13 @@ impl Stager for BoundedStager { async fn put_dir( &self, - puffin_file_name: &str, + handle: &Self::FileHandle, key: &str, dir_path: PathBuf, size: u64, ) -> Result<()> { - let cache_key = Self::encode_cache_key(puffin_file_name, key); + let handle_str = handle.to_string(); + let cache_key = Self::encode_cache_key(&handle_str, key); self.cache .try_get_with(cache_key.clone(), async move { @@ -275,7 +283,7 @@ impl Stager for BoundedStager { notifier.on_cache_insert(size); } let guard = Arc::new(FsDirGuard { - puffin_file_name: puffin_file_name.to_string(), + handle: handle_str, path, size, delete_queue: self.delete_queue.clone(), @@ -295,17 +303,17 @@ impl Stager for BoundedStager { Ok(()) } - async fn purge(&self, puffin_file_name: &str) -> Result<()> { - let file_name = puffin_file_name.to_string(); + async fn purge(&self, handle: &Self::FileHandle) -> Result<()> { + let handle_str = handle.to_string(); self.cache - .invalidate_entries_if(move 
|_k, v| v.puffin_file_name() == file_name) + .invalidate_entries_if(move |_k, v| v.handle() == handle_str) .unwrap(); // SAFETY: `support_invalidation_closures` is enabled self.cache.run_pending_tasks().await; Ok(()) } } -impl BoundedStager { +impl BoundedStager { fn encode_cache_key(puffin_file_name: &str, key: &str) -> String { let mut hasher = Sha256::new(); hasher.update(puffin_file_name); @@ -400,7 +408,7 @@ impl BoundedStager { delete_queue: self.delete_queue.clone(), // placeholder - puffin_file_name: String::new(), + handle: String::new(), })); // A duplicate dir will be moved to the delete queue. let _dup_dir = elems.insert(key, v); @@ -412,7 +420,7 @@ impl BoundedStager { delete_queue: self.delete_queue.clone(), // placeholder - puffin_file_name: String::new(), + handle: String::new(), })); // A duplicate file will be moved to the delete queue. let _dup_file = elems.insert(key, v); @@ -511,7 +519,7 @@ impl BoundedStager { } } -impl Drop for BoundedStager { +impl Drop for BoundedStager { fn drop(&mut self) { let _ = self.delete_queue.try_send(DeleteTask::Terminate); } @@ -535,10 +543,10 @@ impl CacheValue { self.size().try_into().unwrap_or(u32::MAX) } - fn puffin_file_name(&self) -> &str { + fn handle(&self) -> &str { match self { - CacheValue::File(guard) => &guard.puffin_file_name, - CacheValue::Dir(guard) => &guard.puffin_file_name, + CacheValue::File(guard) => &guard.handle, + CacheValue::Dir(guard) => &guard.handle, } } } @@ -553,7 +561,7 @@ enum DeleteTask { /// automatically deleting the file on drop. #[derive(Debug)] pub struct FsBlobGuard { - puffin_file_name: String, + handle: String, path: PathBuf, size: u64, delete_queue: Sender, @@ -586,7 +594,7 @@ impl Drop for FsBlobGuard { /// automatically deleting the directory on drop. #[derive(Debug)] pub struct FsDirGuard { - puffin_file_name: String, + handle: String, path: PathBuf, size: u64, delete_queue: Sender, @@ -636,7 +644,7 @@ impl DirWriterProvider for MokaDirWriterProvider { } #[cfg(test)] -impl BoundedStager { +impl BoundedStager { pub async fn must_get_file(&self, puffin_file_name: &str, key: &str) -> fs::File { let cache_key = Self::encode_cache_key(puffin_file_name, key); let value = self.cache.get(&cache_key).await.unwrap(); @@ -796,11 +804,11 @@ mod tests { .await .unwrap(); - let puffin_file_name = "test_get_blob"; + let puffin_file_name = "test_get_blob".to_string(); let key = "key"; let reader = stager .get_blob( - puffin_file_name, + &puffin_file_name, key, Box::new(|mut writer| { Box::pin(async move { @@ -819,7 +827,7 @@ mod tests { let buf = reader.read(0..m.content_length).await.unwrap(); assert_eq!(&*buf, b"hello world"); - let mut file = stager.must_get_file(puffin_file_name, key).await; + let mut file = stager.must_get_file(&puffin_file_name, key).await; let mut buf = Vec::new(); file.read_to_end(&mut buf).await.unwrap(); assert_eq!(buf, b"hello world"); @@ -861,11 +869,11 @@ mod tests { ("subdir/subsubdir/file_e", "¡Hola mundo!".as_bytes()), ]; - let puffin_file_name = "test_get_dir"; + let puffin_file_name = "test_get_dir".to_string(); let key = "key"; let dir_path = stager .get_dir( - puffin_file_name, + &puffin_file_name, key, Box::new(|writer_provider| { Box::pin(async move { @@ -890,7 +898,7 @@ mod tests { assert_eq!(buf, *content); } - let dir_path = stager.must_get_dir(puffin_file_name, key).await; + let dir_path = stager.must_get_dir(&puffin_file_name, key).await; for (rel_path, content) in &files_in_dir { let file_path = dir_path.join(rel_path); let mut file = 
tokio::fs::File::open(&file_path).await.unwrap(); @@ -929,11 +937,11 @@ mod tests { .unwrap(); // initialize stager - let puffin_file_name = "test_recover"; + let puffin_file_name = "test_recover".to_string(); let blob_key = "blob_key"; let guard = stager .get_blob( - puffin_file_name, + &puffin_file_name, blob_key, Box::new(|mut writer| { Box::pin(async move { @@ -957,7 +965,7 @@ mod tests { let dir_key = "dir_key"; let guard = stager .get_dir( - puffin_file_name, + &puffin_file_name, dir_key, Box::new(|writer_provider| { Box::pin(async move { @@ -983,7 +991,7 @@ mod tests { let reader = stager .get_blob( - puffin_file_name, + &puffin_file_name, blob_key, Box::new(|_| Box::pin(async { Ok(0) })), ) @@ -999,7 +1007,7 @@ mod tests { let dir_path = stager .get_dir( - puffin_file_name, + &puffin_file_name, dir_key, Box::new(|_| Box::pin(async { Ok(0) })), ) @@ -1042,13 +1050,13 @@ mod tests { .await .unwrap(); - let puffin_file_name = "test_eviction"; + let puffin_file_name = "test_eviction".to_string(); let blob_key = "blob_key"; // First time to get the blob let reader = stager .get_blob( - puffin_file_name, + &puffin_file_name, blob_key, Box::new(|mut writer| { Box::pin(async move { @@ -1065,7 +1073,7 @@ mod tests { // The blob should be evicted stager.cache.run_pending_tasks().await; - assert!(!stager.in_cache(puffin_file_name, blob_key)); + assert!(!stager.in_cache(&puffin_file_name, blob_key)); let stats = notifier.stats(); assert_eq!( @@ -1089,7 +1097,7 @@ mod tests { // Second time to get the blob, get from recycle bin let reader = stager .get_blob( - puffin_file_name, + &puffin_file_name, blob_key, Box::new(|_| async { Ok(0) }.boxed()), ) @@ -1101,7 +1109,7 @@ mod tests { // The blob should be evicted stager.cache.run_pending_tasks().await; - assert!(!stager.in_cache(puffin_file_name, blob_key)); + assert!(!stager.in_cache(&puffin_file_name, blob_key)); let stats = notifier.stats(); assert_eq!( @@ -1134,7 +1142,7 @@ mod tests { // First time to get the directory let guard_0 = stager .get_dir( - puffin_file_name, + &puffin_file_name, dir_key, Box::new(|writer_provider| { Box::pin(async move { @@ -1161,7 +1169,7 @@ mod tests { // The directory should be evicted stager.cache.run_pending_tasks().await; - assert!(!stager.in_cache(puffin_file_name, dir_key)); + assert!(!stager.in_cache(&puffin_file_name, dir_key)); let stats = notifier.stats(); assert_eq!( @@ -1181,7 +1189,7 @@ mod tests { // Second time to get the directory let guard_1 = stager .get_dir( - puffin_file_name, + &puffin_file_name, dir_key, Box::new(|_| async { Ok(0) }.boxed()), ) @@ -1198,7 +1206,7 @@ mod tests { // Still hold the guard stager.cache.run_pending_tasks().await; - assert!(!stager.in_cache(puffin_file_name, dir_key)); + assert!(!stager.in_cache(&puffin_file_name, dir_key)); let stats = notifier.stats(); assert_eq!( @@ -1220,7 +1228,7 @@ mod tests { drop(guard_1); let guard_2 = stager .get_dir( - puffin_file_name, + &puffin_file_name, dir_key, Box::new(|_| Box::pin(async move { Ok(0) })), ) @@ -1229,7 +1237,7 @@ mod tests { // Still hold the guard, so the directory should not be removed even if it's evicted stager.cache.run_pending_tasks().await; - assert!(!stager.in_cache(puffin_file_name, blob_key)); + assert!(!stager.in_cache(&puffin_file_name, blob_key)); for (rel_path, content) in &files_in_dir { let file_path = guard_2.path().join(rel_path); @@ -1262,13 +1270,14 @@ mod tests { .await .unwrap(); - let puffin_file_name = "test_get_blob_concurrency_on_fail"; + let puffin_file_name = 
"test_get_blob_concurrency_on_fail".to_string(); let key = "key"; let stager = Arc::new(stager); let handles = (0..10) .map(|_| { let stager = stager.clone(); + let puffin_file_name = puffin_file_name.clone(); let task = async move { let failed_init = Box::new(|_| { async { @@ -1277,7 +1286,7 @@ mod tests { } .boxed() }); - stager.get_blob(puffin_file_name, key, failed_init).await + stager.get_blob(&puffin_file_name, key, failed_init).await }; tokio::spawn(task) @@ -1289,7 +1298,7 @@ mod tests { assert!(r.is_err()); } - assert!(!stager.in_cache(puffin_file_name, key)); + assert!(!stager.in_cache(&puffin_file_name, key)); } #[tokio::test] @@ -1299,13 +1308,14 @@ mod tests { .await .unwrap(); - let puffin_file_name = "test_get_dir_concurrency_on_fail"; + let puffin_file_name = "test_get_dir_concurrency_on_fail".to_string(); let key = "key"; let stager = Arc::new(stager); let handles = (0..10) .map(|_| { let stager = stager.clone(); + let puffin_file_name = puffin_file_name.clone(); let task = async move { let failed_init = Box::new(|_| { async { @@ -1314,7 +1324,7 @@ mod tests { } .boxed() }); - stager.get_dir(puffin_file_name, key, failed_init).await + stager.get_dir(&puffin_file_name, key, failed_init).await }; tokio::spawn(task) @@ -1326,7 +1336,7 @@ mod tests { assert!(r.is_err()); } - assert!(!stager.in_cache(puffin_file_name, key)); + assert!(!stager.in_cache(&puffin_file_name, key)); } #[tokio::test] @@ -1343,11 +1353,11 @@ mod tests { .unwrap(); // initialize stager - let puffin_file_name = "test_purge"; + let puffin_file_name = "test_purge".to_string(); let blob_key = "blob_key"; let guard = stager .get_blob( - puffin_file_name, + &puffin_file_name, blob_key, Box::new(|mut writer| { Box::pin(async move { @@ -1371,7 +1381,7 @@ mod tests { let dir_key = "dir_key"; let guard = stager .get_dir( - puffin_file_name, + &puffin_file_name, dir_key, Box::new(|writer_provider| { Box::pin(async move { @@ -1390,8 +1400,7 @@ mod tests { drop(guard); // purge the stager - stager.purge(puffin_file_name).await.unwrap(); - stager.cache.run_pending_tasks().await; + stager.purge(&puffin_file_name).await.unwrap(); let stats = notifier.stats(); assert_eq!( diff --git a/src/puffin/src/puffin_manager/tests.rs b/src/puffin/src/puffin_manager/tests.rs index adfc44692e..582e8864d8 100644 --- a/src/puffin/src/puffin_manager/tests.rs +++ b/src/puffin/src/puffin_manager/tests.rs @@ -27,7 +27,7 @@ use crate::puffin_manager::{ BlobGuard, DirGuard, PuffinManager, PuffinReader, PuffinWriter, PutOptions, }; -async fn new_bounded_stager(prefix: &str, capacity: u64) -> (TempDir, Arc) { +async fn new_bounded_stager(prefix: &str, capacity: u64) -> (TempDir, Arc>) { let staging_dir = create_temp_dir(prefix); let path = staging_dir.path().to_path_buf(); ( @@ -52,8 +52,8 @@ async fn test_put_get_file() { let puffin_manager = FsPuffinManager::new(stager.clone(), file_accessor.clone()); - let puffin_file_name = "puffin_file"; - let mut writer = puffin_manager.writer(puffin_file_name).await.unwrap(); + let puffin_file_name = "puffin_file".to_string(); + let mut writer = puffin_manager.writer(&puffin_file_name).await.unwrap(); let key = "blob_a"; let raw_data = "Hello, world!".as_bytes(); @@ -61,9 +61,9 @@ async fn test_put_get_file() { writer.finish().await.unwrap(); - let reader = puffin_manager.reader(puffin_file_name).await.unwrap(); + let reader = puffin_manager.reader(&puffin_file_name).await.unwrap(); check_blob( - puffin_file_name, + &puffin_file_name, key, raw_data, &stager, @@ -76,9 +76,9 @@ async fn 
test_put_get_file() { let (_staging_dir, stager) = new_bounded_stager("test_put_get_file_", capacity).await; let puffin_manager = FsPuffinManager::new(stager.clone(), file_accessor); - let reader = puffin_manager.reader(puffin_file_name).await.unwrap(); + let reader = puffin_manager.reader(&puffin_file_name).await.unwrap(); check_blob( - puffin_file_name, + &puffin_file_name, key, raw_data, &stager, @@ -102,8 +102,8 @@ async fn test_put_get_files() { let puffin_manager = FsPuffinManager::new(stager.clone(), file_accessor.clone()); - let puffin_file_name = "puffin_file"; - let mut writer = puffin_manager.writer(puffin_file_name).await.unwrap(); + let puffin_file_name = "puffin_file".to_string(); + let mut writer = puffin_manager.writer(&puffin_file_name).await.unwrap(); let blobs = [ ("blob_a", "Hello, world!".as_bytes()), @@ -119,10 +119,10 @@ async fn test_put_get_files() { writer.finish().await.unwrap(); - let reader = puffin_manager.reader(puffin_file_name).await.unwrap(); + let reader = puffin_manager.reader(&puffin_file_name).await.unwrap(); for (key, raw_data) in &blobs { check_blob( - puffin_file_name, + &puffin_file_name, key, raw_data, &stager, @@ -135,10 +135,10 @@ async fn test_put_get_files() { // renew cache manager let (_staging_dir, stager) = new_bounded_stager("test_put_get_files_", capacity).await; let puffin_manager = FsPuffinManager::new(stager.clone(), file_accessor); - let reader = puffin_manager.reader(puffin_file_name).await.unwrap(); + let reader = puffin_manager.reader(&puffin_file_name).await.unwrap(); for (key, raw_data) in &blobs { check_blob( - puffin_file_name, + &puffin_file_name, key, raw_data, &stager, @@ -164,8 +164,8 @@ async fn test_put_get_dir() { let puffin_manager = FsPuffinManager::new(stager.clone(), file_accessor.clone()); - let puffin_file_name = "puffin_file"; - let mut writer = puffin_manager.writer(puffin_file_name).await.unwrap(); + let puffin_file_name = "puffin_file".to_string(); + let mut writer = puffin_manager.writer(&puffin_file_name).await.unwrap(); let key = "dir_a"; @@ -181,15 +181,15 @@ async fn test_put_get_dir() { writer.finish().await.unwrap(); - let reader = puffin_manager.reader(puffin_file_name).await.unwrap(); - check_dir(puffin_file_name, key, &files_in_dir, &stager, &reader).await; + let reader = puffin_manager.reader(&puffin_file_name).await.unwrap(); + check_dir(&puffin_file_name, key, &files_in_dir, &stager, &reader).await; // renew cache manager let (_staging_dir, stager) = new_bounded_stager("test_put_get_dir_", capacity).await; let puffin_manager = FsPuffinManager::new(stager.clone(), file_accessor); - let reader = puffin_manager.reader(puffin_file_name).await.unwrap(); - check_dir(puffin_file_name, key, &files_in_dir, &stager, &reader).await; + let reader = puffin_manager.reader(&puffin_file_name).await.unwrap(); + check_dir(&puffin_file_name, key, &files_in_dir, &stager, &reader).await; } } } @@ -207,8 +207,8 @@ async fn test_put_get_mix_file_dir() { let puffin_manager = FsPuffinManager::new(stager.clone(), file_accessor.clone()); - let puffin_file_name = "puffin_file"; - let mut writer = puffin_manager.writer(puffin_file_name).await.unwrap(); + let puffin_file_name = "puffin_file".to_string(); + let mut writer = puffin_manager.writer(&puffin_file_name).await.unwrap(); let blobs = [ ("blob_a", "Hello, world!".as_bytes()), @@ -234,10 +234,10 @@ async fn test_put_get_mix_file_dir() { writer.finish().await.unwrap(); - let reader = puffin_manager.reader(puffin_file_name).await.unwrap(); + let reader = 
puffin_manager.reader(&puffin_file_name).await.unwrap(); for (key, raw_data) in &blobs { check_blob( - puffin_file_name, + &puffin_file_name, key, raw_data, &stager, @@ -246,17 +246,17 @@ async fn test_put_get_mix_file_dir() { ) .await; } - check_dir(puffin_file_name, dir_key, &files_in_dir, &stager, &reader).await; + check_dir(&puffin_file_name, dir_key, &files_in_dir, &stager, &reader).await; // renew cache manager let (_staging_dir, stager) = new_bounded_stager("test_put_get_mix_file_dir_", capacity).await; let puffin_manager = FsPuffinManager::new(stager.clone(), file_accessor); - let reader = puffin_manager.reader(puffin_file_name).await.unwrap(); + let reader = puffin_manager.reader(&puffin_file_name).await.unwrap(); for (key, raw_data) in &blobs { check_blob( - puffin_file_name, + &puffin_file_name, key, raw_data, &stager, @@ -265,7 +265,7 @@ async fn test_put_get_mix_file_dir() { ) .await; } - check_dir(puffin_file_name, dir_key, &files_in_dir, &stager, &reader).await; + check_dir(&puffin_file_name, dir_key, &files_in_dir, &stager, &reader).await; } } } @@ -292,7 +292,7 @@ async fn check_blob( puffin_file_name: &str, key: &str, raw_data: &[u8], - stager: &BoundedStager, + stager: &BoundedStager<String>, puffin_reader: &impl PuffinReader, compressed: bool, ) { @@ -346,7 +346,7 @@ async fn check_dir( puffin_file_name: &str, key: &str, files_in_dir: &[(&str, &[u8])], - stager: &BoundedStager, + stager: &BoundedStager<String>, puffin_reader: &impl PuffinReader, ) { let res_dir = puffin_reader.dir(key).await.unwrap();
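
The net effect of the stager changes above is that staged blobs and directories are keyed by handle.to_string() plus the blob key, instead of a raw puffin file name, which is what lets purge(&handle) invalidate every staged entry belonging to one puffin file. The following is a minimal, self-contained sketch of that keying idea only; SstFileHandle and ToyStager are illustrative names, not the crate's API, and the real BoundedStager hashes the key with SHA-256 and stages files on local disk rather than in a HashMap.

use std::collections::HashMap;

// Hypothetical handle type: any handle works as long as it is Clone + ToString
// (the trait change makes the concrete type a generic parameter of the stager).
#[derive(Clone)]
struct SstFileHandle {
    region_id: u64,
    file_id: String,
}

impl std::fmt::Display for SstFileHandle {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}/{}", self.region_id, self.file_id)
    }
}

// Simplified stand-in for the stager's cache: entries are keyed by
// (handle string, blob key), mirroring encode_cache_key(&handle_str, key).
#[derive(Default)]
struct ToyStager {
    cache: HashMap<(String, String), Vec<u8>>,
}

impl ToyStager {
    fn put_blob(&mut self, handle: &impl ToString, key: &str, data: Vec<u8>) {
        self.cache.insert((handle.to_string(), key.to_string()), data);
    }

    fn get_blob(&self, handle: &impl ToString, key: &str) -> Option<&Vec<u8>> {
        self.cache.get(&(handle.to_string(), key.to_string()))
    }

    // Mirrors Stager::purge: drop every entry whose handle matches.
    fn purge(&mut self, handle: &impl ToString) {
        let h = handle.to_string();
        self.cache.retain(|(entry_handle, _), _| *entry_handle != h);
    }
}

fn main() {
    let handle = SstFileHandle { region_id: 42, file_id: "sst-0001".to_string() };
    let mut stager = ToyStager::default();

    stager.put_blob(&handle, "fulltext_index", b"staged bytes".to_vec());
    assert!(stager.get_blob(&handle, "fulltext_index").is_some());

    // Purging by handle removes all staged entries for that puffin file.
    stager.purge(&handle);
    assert!(stager.get_blob(&handle, "fulltext_index").is_none());
}

In the tests above the handle type is simply String (the puffin file name), which is why the fixtures switch from &str literals to "...".to_string() and pass &puffin_file_name by reference.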