feat: unify puffin name passed to stager (#5564)

* feat: purge a given puffin file in staging area

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* polish log

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* ttl set to 2d

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* feat: expose staging_ttl to index config

* feat: unify puffin name passed to stager

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* fix test

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* address comments

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* fallback to remote index

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* fix

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* refactor

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

---------

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
Co-authored-by: evenyag <realevenyag@gmail.com>
This commit is contained in:
Zhenchi
2025-02-21 17:27:03 +08:00
committed by GitHub
parent d7b6718be0
commit 8d05fb3503
27 changed files with 501 additions and 307 deletions

View File

@@ -25,7 +25,7 @@ use crate::fulltext_index::create::{FulltextIndexCreator, TantivyFulltextIndexCr
use crate::fulltext_index::search::{FulltextIndexSearcher, RowId, TantivyFulltextIndexSearcher};
use crate::fulltext_index::{Analyzer, Config};
async fn new_bounded_stager(prefix: &str) -> (TempDir, Arc<BoundedStager>) {
async fn new_bounded_stager(prefix: &str) -> (TempDir, Arc<BoundedStager<String>>) {
let staging_dir = create_temp_dir(prefix);
let path = staging_dir.path().to_path_buf();
(
@@ -68,13 +68,13 @@ async fn test_search(
let file_accessor = Arc::new(MockFileAccessor::new(prefix));
let puffin_manager = FsPuffinManager::new(stager, file_accessor);
let file_name = "fulltext_index";
let blob_key = "fulltext_index";
let mut writer = puffin_manager.writer(file_name).await.unwrap();
create_index(prefix, &mut writer, blob_key, texts, config).await;
let file_name = "fulltext_index".to_string();
let blob_key = "fulltext_index".to_string();
let mut writer = puffin_manager.writer(&file_name).await.unwrap();
create_index(prefix, &mut writer, &blob_key, texts, config).await;
let reader = puffin_manager.reader(file_name).await.unwrap();
let index_dir = reader.dir(blob_key).await.unwrap();
let reader = puffin_manager.reader(&file_name).await.unwrap();
let index_dir = reader.dir(&blob_key).await.unwrap();
let searcher = TantivyFulltextIndexSearcher::new(index_dir.path()).unwrap();
let results = searcher.search(query).await.unwrap();

View File

@@ -146,11 +146,14 @@ impl AccessLayer {
} else {
// Write cache is disabled.
let store = self.object_store.clone();
let path_provider = RegionFilePathFactory::new(self.region_dir.clone());
let indexer_builder = IndexerBuilderImpl {
op_type: request.op_type,
metadata: request.metadata.clone(),
row_group_size: write_opts.row_group_size,
puffin_manager: self.puffin_manager_factory.build(store),
puffin_manager: self
.puffin_manager_factory
.build(store, path_provider.clone()),
intermediate_manager: self.intermediate_manager.clone(),
index_options: request.index_options,
inverted_index_config: request.inverted_index_config,
@@ -161,9 +164,7 @@ impl AccessLayer {
self.object_store.clone(),
request.metadata,
indexer_builder,
RegionFilePathFactory {
region_dir: self.region_dir.clone(),
},
path_provider,
)
.await;
writer
@@ -248,8 +249,18 @@ pub trait FilePathProvider: Send + Sync {
/// Path provider that builds paths in local write cache.
#[derive(Clone)]
pub(crate) struct WriteCachePathProvider {
pub(crate) region_id: RegionId,
pub(crate) file_cache: FileCacheRef,
region_id: RegionId,
file_cache: FileCacheRef,
}
impl WriteCachePathProvider {
/// Creates a new `WriteCachePathProvider` instance.
pub fn new(region_id: RegionId, file_cache: FileCacheRef) -> Self {
Self {
region_id,
file_cache,
}
}
}
impl FilePathProvider for WriteCachePathProvider {
@@ -267,7 +278,14 @@ impl FilePathProvider for WriteCachePathProvider {
/// Path provider that builds paths in region storage path.
#[derive(Clone, Debug)]
pub(crate) struct RegionFilePathFactory {
pub(crate) region_dir: String,
region_dir: String,
}
impl RegionFilePathFactory {
/// Creates a new `RegionFilePathFactory` instance.
pub fn new(region_dir: String) -> Self {
Self { region_dir }
}
}
impl FilePathProvider for RegionFilePathFactory {

View File

@@ -114,15 +114,14 @@ impl WriteCache {
let region_id = write_request.metadata.region_id;
let store = self.file_cache.local_store();
let path_provider = WriteCachePathProvider {
file_cache: self.file_cache.clone(),
region_id,
};
let path_provider = WriteCachePathProvider::new(region_id, self.file_cache.clone());
let indexer = IndexerBuilderImpl {
op_type: write_request.op_type,
metadata: write_request.metadata.clone(),
row_group_size: write_opts.row_group_size,
puffin_manager: self.puffin_manager_factory.build(store),
puffin_manager: self
.puffin_manager_factory
.build(store, path_provider.clone()),
intermediate_manager: self.intermediate_manager.clone(),
index_options: write_request.index_options,
inverted_index_config: write_request.inverted_index_config,
@@ -355,9 +354,7 @@ mod tests {
// and now just use local file system to mock.
let mut env = TestEnv::new();
let mock_store = env.init_object_store_manager();
let path_provider = RegionFilePathFactory {
region_dir: "test".to_string(),
};
let path_provider = RegionFilePathFactory::new("test".to_string());
let local_dir = create_temp_dir("");
let local_store = new_fs_store(local_dir.path().to_str().unwrap());
@@ -488,9 +485,7 @@ mod tests {
..Default::default()
};
let upload_request = SstUploadRequest {
dest_path_provider: RegionFilePathFactory {
region_dir: data_home.clone(),
},
dest_path_provider: RegionFilePathFactory::new(data_home.clone()),
remote_store: mock_store.clone(),
};

View File

@@ -32,7 +32,6 @@ use tokio::sync::{mpsc, Semaphore};
use tokio_stream::wrappers::ReceiverStream;
use crate::access_layer::AccessLayerRef;
use crate::cache::file_cache::FileCacheRef;
use crate::cache::CacheStrategy;
use crate::config::DEFAULT_SCAN_CHANNEL_SIZE;
use crate::error::Result;
@@ -427,12 +426,7 @@ impl ScanRegion {
return None;
}
let file_cache = || -> Option<FileCacheRef> {
let write_cache = self.cache_strategy.write_cache()?;
let file_cache = write_cache.file_cache();
Some(file_cache)
}();
let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
let inverted_index_cache = self.cache_strategy.inverted_index_cache().cloned();
let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();
@@ -467,14 +461,8 @@ impl ScanRegion {
return None;
}
let file_cache = || -> Option<FileCacheRef> {
let write_cache = self.cache_strategy.write_cache()?;
let file_cache = write_cache.file_cache();
Some(file_cache)
}();
let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
let bloom_filter_index_cache = self.cache_strategy.bloom_filter_index_cache().cloned();
let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();
BloomFilterIndexApplierBuilder::new(
@@ -499,12 +487,18 @@ impl ScanRegion {
return None;
}
let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();
FulltextIndexApplierBuilder::new(
self.access_layer.region_dir().to_string(),
self.version.metadata.region_id,
self.access_layer.object_store().clone(),
self.access_layer.puffin_manager_factory().clone(),
self.version.metadata.as_ref(),
)
.with_file_cache(file_cache)
.with_puffin_metadata_cache(puffin_metadata_cache)
.build(&self.request.filters)
.inspect_err(|err| warn!(err; "Failed to build fulltext index applier"))
.ok()

View File

@@ -174,31 +174,8 @@ impl FileMeta {
.contains(&IndexType::BloomFilterIndex)
}
/// Returns the size of the inverted index file
pub fn inverted_index_size(&self) -> Option<u64> {
if self.available_indexes.len() == 1 && self.inverted_index_available() {
Some(self.index_file_size)
} else {
None
}
}
/// Returns the size of the fulltext index file
pub fn fulltext_index_size(&self) -> Option<u64> {
if self.available_indexes.len() == 1 && self.fulltext_index_available() {
Some(self.index_file_size)
} else {
None
}
}
/// Returns the size of the bloom filter index file
pub fn bloom_filter_index_size(&self) -> Option<u64> {
if self.available_indexes.len() == 1 && self.bloom_filter_index_available() {
Some(self.index_file_size)
} else {
None
}
pub fn index_file_size(&self) -> u64 {
self.index_file_size
}
}

View File

@@ -113,11 +113,9 @@ impl FilePurger for LocalFilePurger {
}
// Purges index content in the stager.
let puffin_file_name =
crate::sst::location::index_file_path(sst_layer.region_dir(), file_meta.file_id);
if let Err(e) = sst_layer
.puffin_manager_factory()
.purge_stager(&puffin_file_name)
.purge_stager(file_meta.file_id)
.await
{
error!(e; "Failed to purge stager with index file, file_id: {}, region: {}",

View File

@@ -103,7 +103,6 @@ pub type BloomFilterOutput = IndexBaseOutput;
#[derive(Default)]
pub struct Indexer {
file_id: FileId,
file_path: String,
region_id: RegionId,
puffin_manager: Option<SstPuffinManager>,
inverted_indexer: Option<InvertedIndexer>,
@@ -170,7 +169,7 @@ impl Indexer {
#[async_trait::async_trait]
pub trait IndexerBuilder {
/// Builds indexer of given file id to [index_file_path].
async fn build(&self, file_id: FileId, index_file_path: String) -> Indexer;
async fn build(&self, file_id: FileId) -> Indexer;
}
pub(crate) struct IndexerBuilderImpl {
@@ -188,10 +187,9 @@ pub(crate) struct IndexerBuilderImpl {
#[async_trait::async_trait]
impl IndexerBuilder for IndexerBuilderImpl {
/// Sanity check for arguments and create a new [Indexer] if arguments are valid.
async fn build(&self, file_id: FileId, index_file_path: String) -> Indexer {
async fn build(&self, file_id: FileId) -> Indexer {
let mut indexer = Indexer {
file_id,
file_path: index_file_path,
region_id: self.metadata.region_id,
..Default::default()
};
@@ -392,6 +390,7 @@ mod tests {
use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
use super::*;
use crate::access_layer::FilePathProvider;
use crate::config::{FulltextIndexConfig, Mode};
struct MetaConfig {
@@ -484,6 +483,18 @@ mod tests {
IntermediateManager::init_fs(path).await.unwrap()
}
struct NoopPathProvider;
impl FilePathProvider for NoopPathProvider {
fn build_index_file_path(&self, _file_id: FileId) -> String {
unreachable!()
}
fn build_sst_file_path(&self, _file_id: FileId) -> String {
unreachable!()
}
}
#[tokio::test]
async fn test_build_indexer_basic() {
let (dir, factory) =
@@ -499,14 +510,14 @@ mod tests {
op_type: OperationType::Flush,
metadata,
row_group_size: 1024,
puffin_manager: factory.build(mock_object_store()),
puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
intermediate_manager: intm_manager,
index_options: IndexOptions::default(),
inverted_index_config: InvertedIndexConfig::default(),
fulltext_index_config: FulltextIndexConfig::default(),
bloom_filter_index_config: BloomFilterConfig::default(),
}
.build(FileId::random(), "test".to_string())
.build(FileId::random())
.await;
assert!(indexer.inverted_indexer.is_some());
@@ -529,7 +540,7 @@ mod tests {
op_type: OperationType::Flush,
metadata: metadata.clone(),
row_group_size: 1024,
puffin_manager: factory.build(mock_object_store()),
puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
intermediate_manager: intm_manager.clone(),
index_options: IndexOptions::default(),
inverted_index_config: InvertedIndexConfig {
@@ -539,7 +550,7 @@ mod tests {
fulltext_index_config: FulltextIndexConfig::default(),
bloom_filter_index_config: BloomFilterConfig::default(),
}
.build(FileId::random(), "test".to_string())
.build(FileId::random())
.await;
assert!(indexer.inverted_indexer.is_none());
@@ -550,7 +561,7 @@ mod tests {
op_type: OperationType::Compact,
metadata: metadata.clone(),
row_group_size: 1024,
puffin_manager: factory.build(mock_object_store()),
puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
intermediate_manager: intm_manager.clone(),
index_options: IndexOptions::default(),
inverted_index_config: InvertedIndexConfig::default(),
@@ -560,7 +571,7 @@ mod tests {
},
bloom_filter_index_config: BloomFilterConfig::default(),
}
.build(FileId::random(), "test".to_string())
.build(FileId::random())
.await;
assert!(indexer.inverted_indexer.is_some());
@@ -571,7 +582,7 @@ mod tests {
op_type: OperationType::Compact,
metadata,
row_group_size: 1024,
puffin_manager: factory.build(mock_object_store()),
puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
intermediate_manager: intm_manager,
index_options: IndexOptions::default(),
inverted_index_config: InvertedIndexConfig::default(),
@@ -581,7 +592,7 @@ mod tests {
..Default::default()
},
}
.build(FileId::random(), "test".to_string())
.build(FileId::random())
.await;
assert!(indexer.inverted_indexer.is_some());
@@ -604,14 +615,14 @@ mod tests {
op_type: OperationType::Flush,
metadata: metadata.clone(),
row_group_size: 1024,
puffin_manager: factory.build(mock_object_store()),
puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
intermediate_manager: intm_manager.clone(),
index_options: IndexOptions::default(),
inverted_index_config: InvertedIndexConfig::default(),
fulltext_index_config: FulltextIndexConfig::default(),
bloom_filter_index_config: BloomFilterConfig::default(),
}
.build(FileId::random(), "test".to_string())
.build(FileId::random())
.await;
assert!(indexer.inverted_indexer.is_none());
@@ -627,14 +638,14 @@ mod tests {
op_type: OperationType::Flush,
metadata: metadata.clone(),
row_group_size: 1024,
puffin_manager: factory.build(mock_object_store()),
puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
intermediate_manager: intm_manager.clone(),
index_options: IndexOptions::default(),
inverted_index_config: InvertedIndexConfig::default(),
fulltext_index_config: FulltextIndexConfig::default(),
bloom_filter_index_config: BloomFilterConfig::default(),
}
.build(FileId::random(), "test".to_string())
.build(FileId::random())
.await;
assert!(indexer.inverted_indexer.is_some());
@@ -650,14 +661,14 @@ mod tests {
op_type: OperationType::Flush,
metadata: metadata.clone(),
row_group_size: 1024,
puffin_manager: factory.build(mock_object_store()),
puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
intermediate_manager: intm_manager,
index_options: IndexOptions::default(),
inverted_index_config: InvertedIndexConfig::default(),
fulltext_index_config: FulltextIndexConfig::default(),
bloom_filter_index_config: BloomFilterConfig::default(),
}
.build(FileId::random(), "test".to_string())
.build(FileId::random())
.await;
assert!(indexer.inverted_indexer.is_some());
@@ -680,14 +691,14 @@ mod tests {
op_type: OperationType::Flush,
metadata,
row_group_size: 0,
puffin_manager: factory.build(mock_object_store()),
puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
intermediate_manager: intm_manager,
index_options: IndexOptions::default(),
inverted_index_config: InvertedIndexConfig::default(),
fulltext_index_config: FulltextIndexConfig::default(),
bloom_filter_index_config: BloomFilterConfig::default(),
}
.build(FileId::random(), "test".to_string())
.build(FileId::random())
.await;
assert!(indexer.inverted_indexer.is_none());

View File

@@ -28,6 +28,7 @@ use puffin::puffin_manager::{BlobGuard, PuffinManager, PuffinReader};
use snafu::ResultExt;
use store_api::storage::{ColumnId, RegionId};
use crate::access_layer::{RegionFilePathFactory, WriteCachePathProvider};
use crate::cache::file_cache::{FileCacheRef, FileType, IndexKey};
use crate::cache::index::bloom_filter_index::{
BloomFilterIndexCacheRef, CachedBloomFilterIndexBlobReader,
@@ -43,7 +44,6 @@ use crate::sst::index::bloom_filter::applier::builder::Predicate;
use crate::sst::index::bloom_filter::INDEX_BLOB_TYPE;
use crate::sst::index::puffin_manager::{BlobReader, PuffinManagerFactory};
use crate::sst::index::TYPE_BLOOM_FILTER_INDEX;
use crate::sst::location;
pub(crate) type BloomFilterIndexApplierRef = Arc<BloomFilterIndexApplier>;
@@ -247,11 +247,12 @@ impl BloomFilterIndexApplier {
return Ok(None);
};
let puffin_manager = self.puffin_manager_factory.build(file_cache.local_store());
let puffin_file_name = file_cache.cache_file_path(index_key);
let puffin_manager = self.puffin_manager_factory.build(
file_cache.local_store(),
WriteCachePathProvider::new(self.region_id, file_cache.clone()),
);
let reader = puffin_manager
.reader(&puffin_file_name)
.reader(&file_id)
.await
.context(PuffinBuildReaderSnafu)?
.with_file_size_hint(file_size_hint)
@@ -278,12 +279,14 @@ impl BloomFilterIndexApplier {
) -> Result<BlobReader> {
let puffin_manager = self
.puffin_manager_factory
.build(self.object_store.clone())
.build(
self.object_store.clone(),
RegionFilePathFactory::new(self.region_dir.clone()),
)
.with_puffin_metadata_cache(self.puffin_metadata_cache.clone());
let file_path = location::index_file_path(&self.region_dir, file_id);
puffin_manager
.reader(&file_path)
.reader(&file_id)
.await
.context(PuffinBuildReaderSnafu)?
.with_file_size_hint(file_size_hint)
@@ -447,7 +450,6 @@ mod tests {
let memory_usage_threshold = Some(1024);
let file_id = FileId::random();
let region_dir = "region_dir".to_string();
let path = location::index_file_path(&region_dir, file_id);
let mut indexer =
BloomFilterIndexer::new(file_id, &region_metadata, intm_mgr, memory_usage_threshold)
@@ -460,9 +462,12 @@ mod tests {
let mut batch = new_batch("tag2", 10..20);
indexer.update(&mut batch).await.unwrap();
let puffin_manager = factory.build(object_store.clone());
let puffin_manager = factory.build(
object_store.clone(),
RegionFilePathFactory::new(region_dir.clone()),
);
let mut puffin_writer = puffin_manager.writer(&path).await.unwrap();
let mut puffin_writer = puffin_manager.writer(&file_id).await.unwrap();
indexer.finish(&mut puffin_writer).await.unwrap();
puffin_writer.finish().await.unwrap();

View File

@@ -356,6 +356,7 @@ pub(crate) mod tests {
use store_api::storage::RegionId;
use super::*;
use crate::access_layer::FilePathProvider;
use crate::read::BatchColumn;
use crate::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt};
use crate::sst::index::puffin_manager::PuffinManagerFactory;
@@ -368,6 +369,18 @@ pub(crate) mod tests {
IntermediateManager::init_fs(path).await.unwrap()
}
pub struct TestPathProvider;
impl FilePathProvider for TestPathProvider {
fn build_index_file_path(&self, file_id: FileId) -> String {
file_id.to_string()
}
fn build_sst_file_path(&self, file_id: FileId) -> String {
file_id.to_string()
}
}
/// tag_str:
/// - type: string
/// - index: bloom filter
@@ -483,16 +496,16 @@ pub(crate) mod tests {
indexer.update(&mut batch).await.unwrap();
let (_d, factory) = PuffinManagerFactory::new_for_test_async(prefix).await;
let puffin_manager = factory.build(object_store);
let puffin_manager = factory.build(object_store, TestPathProvider);
let index_file_name = "index_file";
let mut puffin_writer = puffin_manager.writer(index_file_name).await.unwrap();
let file_id = FileId::random();
let mut puffin_writer = puffin_manager.writer(&file_id).await.unwrap();
let (row_count, byte_count) = indexer.finish(&mut puffin_writer).await.unwrap();
assert_eq!(row_count, 20);
assert!(byte_count > 0);
puffin_writer.finish().await.unwrap();
let puffin_reader = puffin_manager.reader(index_file_name).await.unwrap();
let puffin_reader = puffin_manager.reader(&file_id).await.unwrap();
// tag_str
{

View File

@@ -15,19 +15,22 @@
use std::collections::BTreeSet;
use std::sync::Arc;
use common_telemetry::warn;
use index::fulltext_index::search::{FulltextIndexSearcher, RowId, TantivyFulltextIndexSearcher};
use object_store::ObjectStore;
use puffin::puffin_manager::cache::PuffinMetadataCacheRef;
use puffin::puffin_manager::{DirGuard, PuffinManager, PuffinReader};
use snafu::ResultExt;
use store_api::storage::ColumnId;
use store_api::storage::{ColumnId, RegionId};
use crate::access_layer::{RegionFilePathFactory, WriteCachePathProvider};
use crate::cache::file_cache::{FileCacheRef, FileType, IndexKey};
use crate::error::{ApplyFulltextIndexSnafu, PuffinBuildReaderSnafu, PuffinReadBlobSnafu, Result};
use crate::metrics::INDEX_APPLY_ELAPSED;
use crate::sst::file::FileId;
use crate::sst::index::fulltext_index::INDEX_BLOB_TYPE_TANTIVY;
use crate::sst::index::puffin_manager::{PuffinManagerFactory, SstPuffinDir};
use crate::sst::index::TYPE_FULLTEXT_INDEX;
use crate::sst::location;
pub mod builder;
@@ -36,6 +39,9 @@ pub struct FulltextIndexApplier {
/// The root directory of the region.
region_dir: String,
/// The region ID.
region_id: RegionId,
/// Queries to apply to the index.
queries: Vec<(ColumnId, String)>,
@@ -44,6 +50,12 @@ pub struct FulltextIndexApplier {
/// Store responsible for accessing index files.
store: ObjectStore,
/// File cache to be used by the `FulltextIndexApplier`.
file_cache: Option<FileCacheRef>,
/// The puffin metadata cache.
puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
}
pub type FulltextIndexApplierRef = Arc<FulltextIndexApplier>;
@@ -52,20 +64,43 @@ impl FulltextIndexApplier {
/// Creates a new `FulltextIndexApplier`.
pub fn new(
region_dir: String,
region_id: RegionId,
store: ObjectStore,
queries: Vec<(ColumnId, String)>,
puffin_manager_factory: PuffinManagerFactory,
) -> Self {
Self {
region_dir,
region_id,
store,
queries,
puffin_manager_factory,
file_cache: None,
puffin_metadata_cache: None,
}
}
/// Sets the file cache.
pub fn with_file_cache(mut self, file_cache: Option<FileCacheRef>) -> Self {
self.file_cache = file_cache;
self
}
/// Sets the puffin metadata cache.
pub fn with_puffin_metadata_cache(
mut self,
puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
) -> Self {
self.puffin_metadata_cache = puffin_metadata_cache;
self
}
/// Applies the queries to the fulltext index of the specified SST file.
pub async fn apply(&self, file_id: FileId) -> Result<BTreeSet<RowId>> {
pub async fn apply(
&self,
file_id: FileId,
file_size_hint: Option<u64>,
) -> Result<BTreeSet<RowId>> {
let _timer = INDEX_APPLY_ELAPSED
.with_label_values(&[TYPE_FULLTEXT_INDEX])
.start_timer();
@@ -74,7 +109,9 @@ impl FulltextIndexApplier {
let mut row_ids = BTreeSet::new();
for (column_id, query) in &self.queries {
let dir = self.index_dir_path(file_id, *column_id).await?;
let dir = self
.index_dir_path(file_id, *column_id, file_size_hint)
.await?;
let path = match &dir {
Some(dir) => dir.path(),
None => {
@@ -110,15 +147,74 @@ impl FulltextIndexApplier {
&self,
file_id: FileId,
column_id: ColumnId,
file_size_hint: Option<u64>,
) -> Result<Option<SstPuffinDir>> {
let puffin_manager = self.puffin_manager_factory.build(self.store.clone());
let file_path = location::index_file_path(&self.region_dir, file_id);
let blob_key = format!("{INDEX_BLOB_TYPE_TANTIVY}-{column_id}");
match puffin_manager
.reader(&file_path)
// FAST PATH: Try to read the index from the file cache.
if let Some(file_cache) = &self.file_cache {
let index_key = IndexKey::new(self.region_id, file_id, FileType::Puffin);
if file_cache.get(index_key).await.is_some() {
match self
.get_index_from_file_cache(file_cache, file_id, file_size_hint, &blob_key)
.await
{
Ok(dir) => return Ok(dir),
Err(err) => {
warn!(err; "An unexpected error occurred while reading the cached index file. Fallback to remote index file.")
}
}
}
}
// SLOW PATH: Try to read the index from the remote file.
self.get_index_from_remote_file(file_id, file_size_hint, &blob_key)
.await
}
async fn get_index_from_file_cache(
&self,
file_cache: &FileCacheRef,
file_id: FileId,
file_size_hint: Option<u64>,
blob_key: &str,
) -> Result<Option<SstPuffinDir>> {
match self
.puffin_manager_factory
.build(
file_cache.local_store(),
WriteCachePathProvider::new(self.region_id, file_cache.clone()),
)
.reader(&file_id)
.await
.context(PuffinBuildReaderSnafu)?
.dir(&format!("{INDEX_BLOB_TYPE_TANTIVY}-{column_id}"))
.with_file_size_hint(file_size_hint)
.dir(blob_key)
.await
{
Ok(dir) => Ok(Some(dir)),
Err(puffin::error::Error::BlobNotFound { .. }) => Ok(None),
Err(err) => Err(err).context(PuffinReadBlobSnafu),
}
}
async fn get_index_from_remote_file(
&self,
file_id: FileId,
file_size_hint: Option<u64>,
blob_key: &str,
) -> Result<Option<SstPuffinDir>> {
match self
.puffin_manager_factory
.build(
self.store.clone(),
RegionFilePathFactory::new(self.region_dir.clone()),
)
.reader(&file_id)
.await
.context(PuffinBuildReaderSnafu)?
.with_file_size_hint(file_size_hint)
.dir(blob_key)
.await
{
Ok(dir) => Ok(Some(dir)),

View File

@@ -15,9 +15,11 @@
use datafusion_common::ScalarValue;
use datafusion_expr::Expr;
use object_store::ObjectStore;
use puffin::puffin_manager::cache::PuffinMetadataCacheRef;
use store_api::metadata::RegionMetadata;
use store_api::storage::{ColumnId, ConcreteDataType};
use store_api::storage::{ColumnId, ConcreteDataType, RegionId};
use crate::cache::file_cache::FileCacheRef;
use crate::error::Result;
use crate::sst::index::fulltext_index::applier::FulltextIndexApplier;
use crate::sst::index::puffin_manager::PuffinManagerFactory;
@@ -25,27 +27,49 @@ use crate::sst::index::puffin_manager::PuffinManagerFactory;
/// `FulltextIndexApplierBuilder` is a builder for `FulltextIndexApplier`.
pub struct FulltextIndexApplierBuilder<'a> {
region_dir: String,
region_id: RegionId,
store: ObjectStore,
puffin_manager_factory: PuffinManagerFactory,
metadata: &'a RegionMetadata,
file_cache: Option<FileCacheRef>,
puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
}
impl<'a> FulltextIndexApplierBuilder<'a> {
/// Creates a new `FulltextIndexApplierBuilder`.
pub fn new(
region_dir: String,
region_id: RegionId,
store: ObjectStore,
puffin_manager_factory: PuffinManagerFactory,
metadata: &'a RegionMetadata,
) -> Self {
Self {
region_dir,
region_id,
store,
puffin_manager_factory,
metadata,
file_cache: None,
puffin_metadata_cache: None,
}
}
/// Sets the file cache to be used by the `FulltextIndexApplier`.
pub fn with_file_cache(mut self, file_cache: Option<FileCacheRef>) -> Self {
self.file_cache = file_cache;
self
}
/// Sets the puffin metadata cache to be used by the `FulltextIndexApplier`.
pub fn with_puffin_metadata_cache(
mut self,
puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
) -> Self {
self.puffin_metadata_cache = puffin_metadata_cache;
self
}
/// Builds `SstIndexApplier` from the given expressions.
pub fn build(self, exprs: &[Expr]) -> Result<Option<FulltextIndexApplier>> {
let mut queries = Vec::with_capacity(exprs.len());
@@ -58,10 +82,13 @@ impl<'a> FulltextIndexApplierBuilder<'a> {
Ok((!queries.is_empty()).then(|| {
FulltextIndexApplier::new(
self.region_dir,
self.region_id,
self.store,
queries,
self.puffin_manager_factory,
)
.with_file_cache(self.file_cache)
.with_puffin_metadata_cache(self.puffin_metadata_cache)
}))
}

View File

@@ -350,11 +350,11 @@ mod tests {
use store_api::storage::{ConcreteDataType, RegionId};
use super::*;
use crate::access_layer::RegionFilePathFactory;
use crate::read::{Batch, BatchColumn};
use crate::sst::file::FileId;
use crate::sst::index::fulltext_index::applier::FulltextIndexApplier;
use crate::sst::index::puffin_manager::PuffinManagerFactory;
use crate::sst::location;
fn mock_object_store() -> ObjectStore {
ObjectStore::new(Memory::default()).unwrap().finish()
@@ -494,7 +494,6 @@ mod tests {
let (d, factory) = PuffinManagerFactory::new_for_test_async(prefix).await;
let region_dir = "region0".to_string();
let sst_file_id = FileId::random();
let file_path = location::index_file_path(&region_dir, sst_file_id);
let object_store = mock_object_store();
let region_metadata = mock_region_metadata();
let intm_mgr = new_intm_mgr(d.path().to_string_lossy()).await;
@@ -514,8 +513,11 @@ mod tests {
let mut batch = new_batch(rows);
indexer.update(&mut batch).await.unwrap();
let puffin_manager = factory.build(object_store.clone());
let mut writer = puffin_manager.writer(&file_path).await.unwrap();
let puffin_manager = factory.build(
object_store.clone(),
RegionFilePathFactory::new(region_dir.clone()),
);
let mut writer = puffin_manager.writer(&sst_file_id).await.unwrap();
let _ = indexer.finish(&mut writer).await.unwrap();
writer.finish().await.unwrap();
@@ -523,6 +525,7 @@ mod tests {
let _d = &d;
let applier = FulltextIndexApplier::new(
region_dir.clone(),
region_metadata.region_id,
object_store.clone(),
queries
.into_iter()
@@ -531,7 +534,7 @@ mod tests {
factory.clone(),
);
async move { applier.apply(sst_file_id).await.unwrap() }.boxed()
async move { applier.apply(sst_file_id, None).await.unwrap() }.boxed()
}
}

View File

@@ -62,7 +62,7 @@ impl Indexer {
async fn build_puffin_writer(&mut self) -> Option<SstPuffinWriter> {
let puffin_manager = self.puffin_manager.take()?;
let err = match puffin_manager.writer(&self.file_path).await {
let err = match puffin_manager.writer(&self.file_id).await {
Ok(writer) => return Some(writer),
Err(err) => err,
};

View File

@@ -28,6 +28,7 @@ use puffin::puffin_manager::{BlobGuard, PuffinManager, PuffinReader};
use snafu::ResultExt;
use store_api::storage::RegionId;
use crate::access_layer::{RegionFilePathFactory, WriteCachePathProvider};
use crate::cache::file_cache::{FileCacheRef, FileType, IndexKey};
use crate::cache::index::inverted_index::{CachedInvertedIndexBlobReader, InvertedIndexCacheRef};
use crate::error::{
@@ -38,7 +39,6 @@ use crate::sst::file::FileId;
use crate::sst::index::inverted_index::INDEX_BLOB_TYPE;
use crate::sst::index::puffin_manager::{BlobReader, PuffinManagerFactory};
use crate::sst::index::TYPE_INVERTED_INDEX;
use crate::sst::location;
/// `InvertedIndexApplier` is responsible for applying predicates to the provided SST files
/// and returning the relevant row group ids for further scan.
@@ -172,12 +172,14 @@ impl InvertedIndexApplier {
return Ok(None);
};
let puffin_manager = self.puffin_manager_factory.build(file_cache.local_store());
let puffin_file_name = file_cache.cache_file_path(index_key);
let puffin_manager = self.puffin_manager_factory.build(
file_cache.local_store(),
WriteCachePathProvider::new(self.region_id, file_cache.clone()),
);
// Adds file size hint to the puffin reader to avoid extra metadata read.
let reader = puffin_manager
.reader(&puffin_file_name)
.reader(&file_id)
.await
.context(PuffinBuildReaderSnafu)?
.with_file_size_hint(file_size_hint)
@@ -198,12 +200,14 @@ impl InvertedIndexApplier {
) -> Result<BlobReader> {
let puffin_manager = self
.puffin_manager_factory
.build(self.store.clone())
.build(
self.store.clone(),
RegionFilePathFactory::new(self.region_dir.clone()),
)
.with_puffin_metadata_cache(self.puffin_metadata_cache.clone());
let file_path = location::index_file_path(&self.region_dir, file_id);
puffin_manager
.reader(&file_path)
.reader(&file_id)
.await
.context(PuffinBuildReaderSnafu)?
.with_file_size_hint(file_size_hint)
@@ -239,10 +243,12 @@ mod tests {
let object_store = ObjectStore::new(Memory::default()).unwrap().finish();
let file_id = FileId::random();
let region_dir = "region_dir".to_string();
let path = location::index_file_path(&region_dir, file_id);
let puffin_manager = puffin_manager_factory.build(object_store.clone());
let mut writer = puffin_manager.writer(&path).await.unwrap();
let puffin_manager = puffin_manager_factory.build(
object_store.clone(),
RegionFilePathFactory::new(region_dir.clone()),
);
let mut writer = puffin_manager.writer(&file_id).await.unwrap();
writer
.put_blob(INDEX_BLOB_TYPE, Cursor::new(vec![]), Default::default())
.await
@@ -285,10 +291,12 @@ mod tests {
let object_store = ObjectStore::new(Memory::default()).unwrap().finish();
let file_id = FileId::random();
let region_dir = "region_dir".to_string();
let path = location::index_file_path(&region_dir, file_id);
let puffin_manager = puffin_manager_factory.build(object_store.clone());
let mut writer = puffin_manager.writer(&path).await.unwrap();
let puffin_manager = puffin_manager_factory.build(
object_store.clone(),
RegionFilePathFactory::new(region_dir.clone()),
);
let mut writer = puffin_manager.writer(&file_id).await.unwrap();
writer
.put_blob("invalid_blob_type", Cursor::new(vec![]), Default::default())
.await

View File

@@ -336,13 +336,13 @@ mod tests {
use store_api::storage::RegionId;
use super::*;
use crate::access_layer::RegionFilePathFactory;
use crate::cache::index::inverted_index::InvertedIndexCache;
use crate::metrics::CACHE_BYTES;
use crate::read::BatchColumn;
use crate::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt};
use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder;
use crate::sst::index::puffin_manager::PuffinManagerFactory;
use crate::sst::location;
fn mock_object_store() -> ObjectStore {
ObjectStore::new(Memory::default()).unwrap().finish()
@@ -438,7 +438,6 @@ mod tests {
let (d, factory) = PuffinManagerFactory::new_for_test_async(prefix).await;
let region_dir = "region0".to_string();
let sst_file_id = FileId::random();
let file_path = location::index_file_path(&region_dir, sst_file_id);
let object_store = mock_object_store();
let region_metadata = mock_region_metadata();
let intm_mgr = new_intm_mgr(d.path().to_string_lossy()).await;
@@ -460,8 +459,11 @@ mod tests {
creator.update(&mut batch).await.unwrap();
}
let puffin_manager = factory.build(object_store.clone());
let mut writer = puffin_manager.writer(&file_path).await.unwrap();
let puffin_manager = factory.build(
object_store.clone(),
RegionFilePathFactory::new(region_dir.clone()),
);
let mut writer = puffin_manager.writer(&sst_file_id).await.unwrap();
let (row_count, _) = creator.finish(&mut writer).await.unwrap();
assert_eq!(row_count, rows.len() * segment_row_count);
writer.finish().await.unwrap();

View File

@@ -26,18 +26,20 @@ use puffin::puffin_manager::stager::{BoundedStager, Stager};
use puffin::puffin_manager::{BlobGuard, PuffinManager, PuffinReader};
use snafu::ResultExt;
use crate::access_layer::FilePathProvider;
use crate::error::{PuffinInitStagerSnafu, PuffinPurgeStagerSnafu, Result};
use crate::metrics::{
StagerMetrics, INDEX_PUFFIN_FLUSH_OP_TOTAL, INDEX_PUFFIN_READ_BYTES_TOTAL,
INDEX_PUFFIN_READ_OP_TOTAL, INDEX_PUFFIN_WRITE_BYTES_TOTAL, INDEX_PUFFIN_WRITE_OP_TOTAL,
};
use crate::sst::file::FileId;
use crate::sst::index::store::{self, InstrumentedStore};
type InstrumentedRangeReader = store::InstrumentedRangeReader<'static>;
type InstrumentedAsyncWrite = store::InstrumentedAsyncWrite<'static, FuturesAsyncWriter>;
pub(crate) type SstPuffinManager =
FsPuffinManager<Arc<BoundedStager>, ObjectStorePuffinFileAccessor>;
FsPuffinManager<Arc<BoundedStager<FileId>>, ObjectStorePuffinFileAccessor>;
pub(crate) type SstPuffinReader = <SstPuffinManager as PuffinManager>::Reader;
pub(crate) type SstPuffinWriter = <SstPuffinManager as PuffinManager>::Writer;
pub(crate) type SstPuffinBlob = <SstPuffinReader as PuffinReader>::Blob;
@@ -50,7 +52,7 @@ const STAGING_DIR: &str = "staging";
#[derive(Clone)]
pub struct PuffinManagerFactory {
/// The stager used by the puffin manager.
stager: Arc<BoundedStager>,
stager: Arc<BoundedStager<FileId>>,
/// The size of the write buffer used to create object store.
write_buffer_size: Option<usize>,
@@ -79,15 +81,20 @@ impl PuffinManagerFactory {
})
}
pub(crate) fn build(&self, store: ObjectStore) -> SstPuffinManager {
pub(crate) fn build(
&self,
store: ObjectStore,
path_provider: impl FilePathProvider + 'static,
) -> SstPuffinManager {
let store = InstrumentedStore::new(store).with_write_buffer_size(self.write_buffer_size);
let puffin_file_accessor = ObjectStorePuffinFileAccessor::new(store);
let puffin_file_accessor =
ObjectStorePuffinFileAccessor::new(store, Arc::new(path_provider));
SstPuffinManager::new(self.stager.clone(), puffin_file_accessor)
}
pub(crate) async fn purge_stager(&self, puffin_file_name: &str) -> Result<()> {
pub(crate) async fn purge_stager(&self, file_id: FileId) -> Result<()> {
self.stager
.purge(puffin_file_name)
.purge(&file_id)
.await
.context(PuffinPurgeStagerSnafu)
}
@@ -119,11 +126,15 @@ impl PuffinManagerFactory {
#[derive(Clone)]
pub(crate) struct ObjectStorePuffinFileAccessor {
object_store: InstrumentedStore,
path_provider: Arc<dyn FilePathProvider>,
}
impl ObjectStorePuffinFileAccessor {
pub fn new(object_store: InstrumentedStore) -> Self {
Self { object_store }
pub fn new(object_store: InstrumentedStore, path_provider: Arc<dyn FilePathProvider>) -> Self {
Self {
object_store,
path_provider,
}
}
}
@@ -131,11 +142,13 @@ impl ObjectStorePuffinFileAccessor {
impl PuffinFileAccessor for ObjectStorePuffinFileAccessor {
type Reader = InstrumentedRangeReader;
type Writer = InstrumentedAsyncWrite;
type FileHandle = FileId;
async fn reader(&self, puffin_file_name: &str) -> PuffinResult<Self::Reader> {
async fn reader(&self, handle: &FileId) -> PuffinResult<Self::Reader> {
let file_path = self.path_provider.build_index_file_path(*handle);
self.object_store
.range_reader(
puffin_file_name,
&file_path,
&INDEX_PUFFIN_READ_BYTES_TOTAL,
&INDEX_PUFFIN_READ_OP_TOTAL,
)
@@ -144,10 +157,11 @@ impl PuffinFileAccessor for ObjectStorePuffinFileAccessor {
.context(puffin_error::ExternalSnafu)
}
async fn writer(&self, puffin_file_name: &str) -> PuffinResult<Self::Writer> {
async fn writer(&self, handle: &FileId) -> PuffinResult<Self::Writer> {
let file_path = self.path_provider.build_index_file_path(*handle);
self.object_store
.writer(
puffin_file_name,
&file_path,
&INDEX_PUFFIN_WRITE_BYTES_TOTAL,
&INDEX_PUFFIN_WRITE_OP_TOTAL,
&INDEX_PUFFIN_FLUSH_OP_TOTAL,
@@ -169,20 +183,32 @@ mod tests {
use super::*;
struct TestFilePathProvider;
impl FilePathProvider for TestFilePathProvider {
fn build_index_file_path(&self, file_id: FileId) -> String {
file_id.to_string()
}
fn build_sst_file_path(&self, file_id: FileId) -> String {
file_id.to_string()
}
}
#[tokio::test]
async fn test_puffin_manager_factory() {
let (_dir, factory) =
PuffinManagerFactory::new_for_test_async("test_puffin_manager_factory_").await;
let object_store = ObjectStore::new(Memory::default()).unwrap().finish();
let manager = factory.build(object_store);
let manager = factory.build(object_store, TestFilePathProvider);
let file_name = "my-puffin-file";
let file_id = FileId::random();
let blob_key = "blob-key";
let dir_key = "dir-key";
let raw_data = b"hello world!";
let mut writer = manager.writer(file_name).await.unwrap();
let mut writer = manager.writer(&file_id).await.unwrap();
writer
.put_blob(blob_key, Cursor::new(raw_data), PutOptions::default())
.await
@@ -203,7 +229,7 @@ mod tests {
.unwrap();
writer.finish().await.unwrap();
let reader = manager.reader(file_name).await.unwrap();
let reader = manager.reader(&file_id).await.unwrap();
let blob_guard = reader.blob(blob_key).await.unwrap();
let blob_reader = blob_guard.reader().await.unwrap();
let meta = blob_reader.metadata().await.unwrap();

View File

@@ -131,7 +131,7 @@ mod tests {
#[async_trait::async_trait]
impl IndexerBuilder for NoopIndexBuilder {
async fn build(&self, _file_id: FileId, _path: String) -> Indexer {
async fn build(&self, _file_id: FileId) -> Indexer {
Indexer::default()
}
}

View File

@@ -387,7 +387,11 @@ impl ParquetReaderBuilder {
return false;
}
let apply_res = match index_applier.apply(self.file_handle.file_id()).await {
let file_size_hint = self.file_handle.meta_ref().index_file_size();
let apply_res = match index_applier
.apply(self.file_handle.file_id(), Some(file_size_hint))
.await
{
Ok(res) => res,
Err(err) => {
if cfg!(any(test, feature = "test")) {
@@ -467,9 +471,9 @@ impl ParquetReaderBuilder {
if !self.file_handle.meta_ref().inverted_index_available() {
return false;
}
let file_size_hint = self.file_handle.meta_ref().inverted_index_size();
let file_size_hint = self.file_handle.meta_ref().index_file_size();
let apply_output = match index_applier
.apply(self.file_handle.file_id(), file_size_hint)
.apply(self.file_handle.file_id(), Some(file_size_hint))
.await
{
Ok(output) => output,
@@ -578,11 +582,11 @@ impl ParquetReaderBuilder {
return false;
}
let file_size_hint = self.file_handle.meta_ref().bloom_filter_index_size();
let file_size_hint = self.file_handle.meta_ref().index_file_size();
let apply_output = match index_applier
.apply(
self.file_handle.file_id(),
file_size_hint,
Some(file_size_hint),
parquet_meta
.row_groups()
.iter()

View File

@@ -121,8 +121,7 @@ where
path_provider: P,
) -> ParquetWriter<F, I, P> {
let init_file = FileId::random();
let index_file_path = path_provider.build_index_file_path(init_file);
let indexer = indexer_builder.build(init_file, index_file_path).await;
let indexer = indexer_builder.build(init_file).await;
ParquetWriter {
path_provider,
@@ -140,11 +139,7 @@ where
match self.current_indexer {
None => {
self.current_file = FileId::random();
let index_file_path = self.path_provider.build_index_file_path(self.current_file);
let indexer = self
.indexer_builder
.build(self.current_file, index_file_path)
.await;
let indexer = self.indexer_builder.build(self.current_file).await;
self.current_indexer = Some(indexer);
// safety: self.current_indexer already set above.
self.current_indexer.as_mut().unwrap()

View File

@@ -36,12 +36,13 @@ use crate::file_metadata::FileMetadata;
pub trait PuffinManager {
type Reader: PuffinReader;
type Writer: PuffinWriter;
type FileHandle: ToString + Clone + Send + Sync;
/// Creates a `PuffinReader` for the specified `puffin_file_name`.
async fn reader(&self, puffin_file_name: &str) -> Result<Self::Reader>;
/// Creates a `PuffinReader` for the specified `handle`.
async fn reader(&self, handle: &Self::FileHandle) -> Result<Self::Reader>;
/// Creates a `PuffinWriter` for the specified `puffin_file_name`.
async fn writer(&self, puffin_file_name: &str) -> Result<Self::Writer>;
/// Creates a `PuffinWriter` for the specified `handle`.
async fn writer(&self, handle: &Self::FileHandle) -> Result<Self::Writer>;
}
/// The `PuffinWriter` trait provides methods for writing blobs and directories to a Puffin file.

View File

@@ -27,12 +27,13 @@ use crate::error::Result;
pub trait PuffinFileAccessor: Send + Sync + 'static {
type Reader: SizeAwareRangeReader + Sync;
type Writer: AsyncWrite + Unpin + Send;
type FileHandle: ToString + Clone + Send + Sync;
/// Opens a reader for the given puffin file.
async fn reader(&self, puffin_file_name: &str) -> Result<Self::Reader>;
/// Opens a reader for the given puffin file handle.
async fn reader(&self, handle: &Self::FileHandle) -> Result<Self::Reader>;
/// Creates a writer for the given puffin file.
async fn writer(&self, puffin_file_name: &str) -> Result<Self::Writer>;
/// Creates a writer for the given puffin file handle.
async fn writer(&self, handle: &Self::FileHandle) -> Result<Self::Writer>;
}
pub struct MockFileAccessor {
@@ -50,15 +51,16 @@ impl MockFileAccessor {
impl PuffinFileAccessor for MockFileAccessor {
type Reader = FileReader;
type Writer = Compat<File>;
type FileHandle = String;
async fn reader(&self, puffin_file_name: &str) -> Result<Self::Reader> {
Ok(FileReader::new(self.tempdir.path().join(puffin_file_name))
async fn reader(&self, handle: &String) -> Result<Self::Reader> {
Ok(FileReader::new(self.tempdir.path().join(handle))
.await
.unwrap())
}
async fn writer(&self, puffin_file_name: &str) -> Result<Self::Writer> {
let p = self.tempdir.path().join(puffin_file_name);
async fn writer(&self, handle: &String) -> Result<Self::Writer> {
let p = self.tempdir.path().join(handle);
if let Some(p) = p.parent() {
if !tokio::fs::try_exists(p).await.unwrap() {
tokio::fs::create_dir_all(p).await.unwrap();

View File

@@ -61,25 +61,26 @@ impl<S, F> FsPuffinManager<S, F> {
#[async_trait]
impl<S, F> PuffinManager for FsPuffinManager<S, F>
where
S: Stager + Clone + 'static,
F: PuffinFileAccessor + Clone,
S: Stager<FileHandle = F::FileHandle> + Clone + 'static,
{
type Reader = FsPuffinReader<S, F>;
type Writer = FsPuffinWriter<S, F::Writer>;
type FileHandle = F::FileHandle;
async fn reader(&self, puffin_file_name: &str) -> Result<Self::Reader> {
async fn reader(&self, handle: &Self::FileHandle) -> Result<Self::Reader> {
Ok(FsPuffinReader::new(
puffin_file_name.to_string(),
handle.clone(),
self.stager.clone(),
self.puffin_file_accessor.clone(),
self.puffin_metadata_cache.clone(),
))
}
async fn writer(&self, puffin_file_name: &str) -> Result<Self::Writer> {
let writer = self.puffin_file_accessor.writer(puffin_file_name).await?;
async fn writer(&self, handle: &Self::FileHandle) -> Result<Self::Writer> {
let writer = self.puffin_file_accessor.writer(handle).await?;
Ok(FsPuffinWriter::new(
puffin_file_name.to_string(),
handle.clone(),
self.stager.clone(),
writer,
))

View File

@@ -39,9 +39,13 @@ use crate::puffin_manager::stager::{BoxWriter, DirWriterProviderRef, Stager};
use crate::puffin_manager::{BlobGuard, PuffinReader};
/// `FsPuffinReader` is a `PuffinReader` that provides fs readers for puffin files.
pub struct FsPuffinReader<S, F> {
/// The name of the puffin file.
puffin_file_name: String,
pub struct FsPuffinReader<S, F>
where
S: Stager + 'static,
F: PuffinFileAccessor + Clone,
{
/// The handle of the puffin file.
handle: F::FileHandle,
/// The file size hint.
file_size_hint: Option<u64>,
@@ -56,15 +60,19 @@ pub struct FsPuffinReader<S, F> {
puffin_file_metadata_cache: Option<PuffinMetadataCacheRef>,
}
impl<S, F> FsPuffinReader<S, F> {
impl<S, F> FsPuffinReader<S, F>
where
S: Stager + 'static,
F: PuffinFileAccessor + Clone,
{
pub(crate) fn new(
puffin_file_name: String,
handle: F::FileHandle,
stager: S,
puffin_file_accessor: F,
puffin_file_metadata_cache: Option<PuffinMetadataCacheRef>,
) -> Self {
Self {
puffin_file_name,
handle,
file_size_hint: None,
stager,
puffin_file_accessor,
@@ -76,8 +84,8 @@ impl<S, F> FsPuffinReader<S, F> {
#[async_trait]
impl<S, F> PuffinReader for FsPuffinReader<S, F>
where
S: Stager + 'static,
F: PuffinFileAccessor + Clone,
S: Stager<FileHandle = F::FileHandle> + 'static,
{
type Blob = Either<RandomReadBlob<F>, S::Blob>;
type Dir = S::Dir;
@@ -88,19 +96,13 @@ where
}
async fn metadata(&self) -> Result<Arc<FileMetadata>> {
let reader = self
.puffin_file_accessor
.reader(&self.puffin_file_name)
.await?;
let reader = self.puffin_file_accessor.reader(&self.handle).await?;
let mut file = PuffinFileReader::new(reader);
self.get_puffin_file_metadata(&mut file).await
}
async fn blob(&self, key: &str) -> Result<Self::Blob> {
let mut reader = self
.puffin_file_accessor
.reader(&self.puffin_file_name)
.await?;
let mut reader = self.puffin_file_accessor.reader(&self.handle).await?;
if let Some(file_size_hint) = self.file_size_hint {
reader.with_file_size_hint(file_size_hint);
}
@@ -117,7 +119,7 @@ where
let blob = if blob_metadata.compression_codec.is_none() {
// If the blob is not compressed, we can directly read it from the puffin file.
Either::L(RandomReadBlob {
file_name: self.puffin_file_name.clone(),
handle: self.handle.clone(),
accessor: self.puffin_file_accessor.clone(),
blob_metadata,
})
@@ -126,7 +128,7 @@ where
let staged_blob = self
.stager
.get_blob(
self.puffin_file_name.as_str(),
&self.handle,
key,
Box::new(|writer| {
Box::pin(Self::init_blob_to_stager(file, blob_metadata, writer))
@@ -143,17 +145,18 @@ where
async fn dir(&self, key: &str) -> Result<Self::Dir> {
self.stager
.get_dir(
self.puffin_file_name.as_str(),
&self.handle,
key,
Box::new(|writer_provider| {
let accessor = self.puffin_file_accessor.clone();
let puffin_file_name = self.puffin_file_name.clone();
let handle = self.handle.clone();
let key = key.to_string();
Box::pin(Self::init_dir_to_stager(
puffin_file_name,
handle,
key,
writer_provider,
accessor,
self.file_size_hint,
))
}),
)
@@ -170,15 +173,16 @@ where
&self,
reader: &mut PuffinFileReader<F::Reader>,
) -> Result<Arc<FileMetadata>> {
let id = self.handle.to_string();
if let Some(cache) = self.puffin_file_metadata_cache.as_ref() {
if let Some(metadata) = cache.get_metadata(&self.puffin_file_name) {
if let Some(metadata) = cache.get_metadata(&id) {
return Ok(metadata);
}
}
let metadata = Arc::new(reader.metadata().await?);
if let Some(cache) = self.puffin_file_metadata_cache.as_ref() {
cache.put_metadata(self.puffin_file_name.to_string(), metadata.clone());
cache.put_metadata(id, metadata.clone());
}
Ok(metadata)
}
@@ -196,12 +200,16 @@ where
}
async fn init_dir_to_stager(
puffin_file_name: String,
handle: F::FileHandle,
key: String,
writer_provider: DirWriterProviderRef,
accessor: F,
file_size_hint: Option<u64>,
) -> Result<u64> {
let reader = accessor.reader(&puffin_file_name).await?;
let mut reader = accessor.reader(&handle).await?;
if let Some(file_size_hint) = file_size_hint {
reader.with_file_size_hint(file_size_hint);
}
let mut file = PuffinFileReader::new(reader);
let puffin_metadata = file.metadata().await?;
@@ -237,7 +245,7 @@ where
}
);
let reader = accessor.reader(&puffin_file_name).await?;
let reader = accessor.reader(&handle).await?;
let writer = writer_provider.writer(&file_meta.relative_path).await?;
let task = common_runtime::spawn_global(async move {
let reader = PuffinFileReader::new(reader).into_blob_reader(&blob_meta);
@@ -284,8 +292,8 @@ where
}
/// `RandomReadBlob` is a `BlobGuard` that directly reads the blob from the puffin file.
pub struct RandomReadBlob<F> {
file_name: String,
pub struct RandomReadBlob<F: PuffinFileAccessor> {
handle: F::FileHandle,
accessor: F,
blob_metadata: BlobMetadata,
}
@@ -302,7 +310,7 @@ impl<F: PuffinFileAccessor + Clone> BlobGuard for RandomReadBlob<F> {
}
);
let reader = self.accessor.reader(&self.file_name).await?;
let reader = self.accessor.reader(&self.handle).await?;
let blob_reader = PuffinFileReader::new(reader).into_blob_reader(&self.blob_metadata);
Ok(blob_reader)
}

View File

@@ -34,9 +34,9 @@ use crate::puffin_manager::stager::Stager;
use crate::puffin_manager::{PuffinWriter, PutOptions};
/// `FsPuffinWriter` is a `PuffinWriter` that writes blobs and directories to a puffin file.
pub struct FsPuffinWriter<S, W> {
pub struct FsPuffinWriter<S: Stager, W> {
/// The name of the puffin file.
puffin_file_name: String,
handle: S::FileHandle,
/// The stager.
stager: S,
@@ -48,10 +48,10 @@ pub struct FsPuffinWriter<S, W> {
blob_keys: HashSet<String>,
}
impl<S, W> FsPuffinWriter<S, W> {
pub(crate) fn new(puffin_file_name: String, stager: S, writer: W) -> Self {
impl<S: Stager, W> FsPuffinWriter<S, W> {
pub(crate) fn new(handle: S::FileHandle, stager: S, writer: W) -> Self {
Self {
puffin_file_name,
handle,
stager,
puffin_file_writer: PuffinFileWriter::new(writer),
blob_keys: HashSet::new(),
@@ -147,7 +147,7 @@ where
// Move the directory into the stager.
self.stager
.put_dir(&self.puffin_file_name, key, dir_path, dir_size)
.put_dir(&self.handle, key, dir_path, dir_size)
.await?;
Ok(written_bytes)
}

View File

@@ -57,6 +57,7 @@ pub trait InitDirFn = FnOnce(DirWriterProviderRef) -> WriteResult;
pub trait Stager: Send + Sync {
type Blob: BlobGuard + Sync;
type Dir: DirGuard;
type FileHandle: ToString + Clone + Send + Sync;
/// Retrieves a blob, initializing it if necessary using the provided `init_fn`.
///
@@ -64,7 +65,7 @@ pub trait Stager: Send + Sync {
/// The caller is responsible for holding the `BlobGuard` until they are done with the blob.
async fn get_blob<'a>(
&self,
puffin_file_name: &str,
handle: &Self::FileHandle,
key: &str,
init_factory: Box<dyn InitBlobFn + Send + Sync + 'a>,
) -> Result<Self::Blob>;
@@ -75,7 +76,7 @@ pub trait Stager: Send + Sync {
/// The caller is responsible for holding the `DirGuard` until they are done with the directory.
async fn get_dir<'a>(
&self,
puffin_file_name: &str,
handle: &Self::FileHandle,
key: &str,
init_fn: Box<dyn InitDirFn + Send + Sync + 'a>,
) -> Result<Self::Dir>;
@@ -83,14 +84,14 @@ pub trait Stager: Send + Sync {
/// Stores a directory in the staging area.
async fn put_dir(
&self,
puffin_file_name: &str,
handle: &Self::FileHandle,
key: &str,
dir_path: PathBuf,
dir_size: u64,
) -> Result<()>;
/// Purges all content for the given puffin file from the staging area.
async fn purge(&self, puffin_file_name: &str) -> Result<()>;
async fn purge(&self, handle: &Self::FileHandle) -> Result<()>;
}
/// `StagerNotifier` provides a way to notify the caller of the staging events.

View File

@@ -48,7 +48,7 @@ const DELETED_EXTENSION: &str = "deleted";
const RECYCLE_BIN_TTL: Duration = Duration::from_secs(60);
/// `BoundedStager` is a `Stager` that uses `moka` to manage staging area.
pub struct BoundedStager {
pub struct BoundedStager<H> {
/// The base directory of the staging area.
base_dir: PathBuf,
@@ -71,9 +71,11 @@ pub struct BoundedStager {
/// Notifier for the stager.
notifier: Option<Arc<dyn StagerNotifier>>,
_phantom: std::marker::PhantomData<H>,
}
impl BoundedStager {
impl<H: 'static> BoundedStager<H> {
pub async fn new(
base_dir: PathBuf,
capacity: u64,
@@ -124,6 +126,7 @@ impl BoundedStager {
delete_queue,
recycle_bin,
notifier,
_phantom: std::marker::PhantomData,
};
stager.recover().await?;
@@ -133,17 +136,19 @@ impl BoundedStager {
}
#[async_trait]
impl Stager for BoundedStager {
impl<H: ToString + Clone + Send + Sync> Stager for BoundedStager<H> {
type Blob = Arc<FsBlobGuard>;
type Dir = Arc<FsDirGuard>;
type FileHandle = H;
async fn get_blob<'a>(
&self,
puffin_file_name: &str,
handle: &Self::FileHandle,
key: &str,
init_fn: Box<dyn InitBlobFn + Send + Sync + 'a>,
) -> Result<Self::Blob> {
let cache_key = Self::encode_cache_key(puffin_file_name, key);
let handle_str = handle.to_string();
let cache_key = Self::encode_cache_key(&handle_str, key);
let mut miss = false;
let v = self
@@ -169,7 +174,7 @@ impl Stager for BoundedStager {
notifier.on_load_blob(timer.elapsed());
}
let guard = Arc::new(FsBlobGuard {
puffin_file_name: puffin_file_name.to_string(),
handle: handle_str,
path,
delete_queue: self.delete_queue.clone(),
size,
@@ -194,11 +199,13 @@ impl Stager for BoundedStager {
async fn get_dir<'a>(
&self,
puffin_file_name: &str,
handle: &Self::FileHandle,
key: &str,
init_fn: Box<dyn InitDirFn + Send + Sync + 'a>,
) -> Result<Self::Dir> {
let cache_key = Self::encode_cache_key(puffin_file_name, key);
let handle_str = handle.to_string();
let cache_key = Self::encode_cache_key(&handle_str, key);
let mut miss = false;
let v = self
@@ -224,7 +231,7 @@ impl Stager for BoundedStager {
notifier.on_load_dir(timer.elapsed());
}
let guard = Arc::new(FsDirGuard {
puffin_file_name: puffin_file_name.to_string(),
handle: handle_str,
path,
size,
delete_queue: self.delete_queue.clone(),
@@ -249,12 +256,13 @@ impl Stager for BoundedStager {
async fn put_dir(
&self,
puffin_file_name: &str,
handle: &Self::FileHandle,
key: &str,
dir_path: PathBuf,
size: u64,
) -> Result<()> {
let cache_key = Self::encode_cache_key(puffin_file_name, key);
let handle_str = handle.to_string();
let cache_key = Self::encode_cache_key(&handle_str, key);
self.cache
.try_get_with(cache_key.clone(), async move {
@@ -275,7 +283,7 @@ impl Stager for BoundedStager {
notifier.on_cache_insert(size);
}
let guard = Arc::new(FsDirGuard {
puffin_file_name: puffin_file_name.to_string(),
handle: handle_str,
path,
size,
delete_queue: self.delete_queue.clone(),
@@ -295,17 +303,17 @@ impl Stager for BoundedStager {
Ok(())
}
async fn purge(&self, puffin_file_name: &str) -> Result<()> {
let file_name = puffin_file_name.to_string();
async fn purge(&self, handle: &Self::FileHandle) -> Result<()> {
let handle_str = handle.to_string();
self.cache
.invalidate_entries_if(move |_k, v| v.puffin_file_name() == file_name)
.invalidate_entries_if(move |_k, v| v.handle() == handle_str)
.unwrap(); // SAFETY: `support_invalidation_closures` is enabled
self.cache.run_pending_tasks().await;
Ok(())
}
}
impl BoundedStager {
impl<H> BoundedStager<H> {
fn encode_cache_key(puffin_file_name: &str, key: &str) -> String {
let mut hasher = Sha256::new();
hasher.update(puffin_file_name);
@@ -400,7 +408,7 @@ impl BoundedStager {
delete_queue: self.delete_queue.clone(),
// placeholder
puffin_file_name: String::new(),
handle: String::new(),
}));
// A duplicate dir will be moved to the delete queue.
let _dup_dir = elems.insert(key, v);
@@ -412,7 +420,7 @@ impl BoundedStager {
delete_queue: self.delete_queue.clone(),
// placeholder
puffin_file_name: String::new(),
handle: String::new(),
}));
// A duplicate file will be moved to the delete queue.
let _dup_file = elems.insert(key, v);
@@ -511,7 +519,7 @@ impl BoundedStager {
}
}
impl Drop for BoundedStager {
impl<H> Drop for BoundedStager<H> {
fn drop(&mut self) {
let _ = self.delete_queue.try_send(DeleteTask::Terminate);
}
@@ -535,10 +543,10 @@ impl CacheValue {
self.size().try_into().unwrap_or(u32::MAX)
}
fn puffin_file_name(&self) -> &str {
fn handle(&self) -> &str {
match self {
CacheValue::File(guard) => &guard.puffin_file_name,
CacheValue::Dir(guard) => &guard.puffin_file_name,
CacheValue::File(guard) => &guard.handle,
CacheValue::Dir(guard) => &guard.handle,
}
}
}
@@ -553,7 +561,7 @@ enum DeleteTask {
/// automatically deleting the file on drop.
#[derive(Debug)]
pub struct FsBlobGuard {
puffin_file_name: String,
handle: String,
path: PathBuf,
size: u64,
delete_queue: Sender<DeleteTask>,
@@ -586,7 +594,7 @@ impl Drop for FsBlobGuard {
/// automatically deleting the directory on drop.
#[derive(Debug)]
pub struct FsDirGuard {
puffin_file_name: String,
handle: String,
path: PathBuf,
size: u64,
delete_queue: Sender<DeleteTask>,
@@ -636,7 +644,7 @@ impl DirWriterProvider for MokaDirWriterProvider {
}
#[cfg(test)]
impl BoundedStager {
impl<H> BoundedStager<H> {
pub async fn must_get_file(&self, puffin_file_name: &str, key: &str) -> fs::File {
let cache_key = Self::encode_cache_key(puffin_file_name, key);
let value = self.cache.get(&cache_key).await.unwrap();
@@ -796,11 +804,11 @@ mod tests {
.await
.unwrap();
let puffin_file_name = "test_get_blob";
let puffin_file_name = "test_get_blob".to_string();
let key = "key";
let reader = stager
.get_blob(
puffin_file_name,
&puffin_file_name,
key,
Box::new(|mut writer| {
Box::pin(async move {
@@ -819,7 +827,7 @@ mod tests {
let buf = reader.read(0..m.content_length).await.unwrap();
assert_eq!(&*buf, b"hello world");
let mut file = stager.must_get_file(puffin_file_name, key).await;
let mut file = stager.must_get_file(&puffin_file_name, key).await;
let mut buf = Vec::new();
file.read_to_end(&mut buf).await.unwrap();
assert_eq!(buf, b"hello world");
@@ -861,11 +869,11 @@ mod tests {
("subdir/subsubdir/file_e", "¡Hola mundo!".as_bytes()),
];
let puffin_file_name = "test_get_dir";
let puffin_file_name = "test_get_dir".to_string();
let key = "key";
let dir_path = stager
.get_dir(
puffin_file_name,
&puffin_file_name,
key,
Box::new(|writer_provider| {
Box::pin(async move {
@@ -890,7 +898,7 @@ mod tests {
assert_eq!(buf, *content);
}
let dir_path = stager.must_get_dir(puffin_file_name, key).await;
let dir_path = stager.must_get_dir(&puffin_file_name, key).await;
for (rel_path, content) in &files_in_dir {
let file_path = dir_path.join(rel_path);
let mut file = tokio::fs::File::open(&file_path).await.unwrap();
@@ -929,11 +937,11 @@ mod tests {
.unwrap();
// initialize stager
let puffin_file_name = "test_recover";
let puffin_file_name = "test_recover".to_string();
let blob_key = "blob_key";
let guard = stager
.get_blob(
puffin_file_name,
&puffin_file_name,
blob_key,
Box::new(|mut writer| {
Box::pin(async move {
@@ -957,7 +965,7 @@ mod tests {
let dir_key = "dir_key";
let guard = stager
.get_dir(
puffin_file_name,
&puffin_file_name,
dir_key,
Box::new(|writer_provider| {
Box::pin(async move {
@@ -983,7 +991,7 @@ mod tests {
let reader = stager
.get_blob(
puffin_file_name,
&puffin_file_name,
blob_key,
Box::new(|_| Box::pin(async { Ok(0) })),
)
@@ -999,7 +1007,7 @@ mod tests {
let dir_path = stager
.get_dir(
puffin_file_name,
&puffin_file_name,
dir_key,
Box::new(|_| Box::pin(async { Ok(0) })),
)
@@ -1042,13 +1050,13 @@ mod tests {
.await
.unwrap();
let puffin_file_name = "test_eviction";
let puffin_file_name = "test_eviction".to_string();
let blob_key = "blob_key";
// First time to get the blob
let reader = stager
.get_blob(
puffin_file_name,
&puffin_file_name,
blob_key,
Box::new(|mut writer| {
Box::pin(async move {
@@ -1065,7 +1073,7 @@ mod tests {
// The blob should be evicted
stager.cache.run_pending_tasks().await;
assert!(!stager.in_cache(puffin_file_name, blob_key));
assert!(!stager.in_cache(&puffin_file_name, blob_key));
let stats = notifier.stats();
assert_eq!(
@@ -1089,7 +1097,7 @@ mod tests {
// Second time to get the blob, get from recycle bin
let reader = stager
.get_blob(
puffin_file_name,
&puffin_file_name,
blob_key,
Box::new(|_| async { Ok(0) }.boxed()),
)
@@ -1101,7 +1109,7 @@ mod tests {
// The blob should be evicted
stager.cache.run_pending_tasks().await;
assert!(!stager.in_cache(puffin_file_name, blob_key));
assert!(!stager.in_cache(&puffin_file_name, blob_key));
let stats = notifier.stats();
assert_eq!(
@@ -1134,7 +1142,7 @@ mod tests {
// First time to get the directory
let guard_0 = stager
.get_dir(
puffin_file_name,
&puffin_file_name,
dir_key,
Box::new(|writer_provider| {
Box::pin(async move {
@@ -1161,7 +1169,7 @@ mod tests {
// The directory should be evicted
stager.cache.run_pending_tasks().await;
assert!(!stager.in_cache(puffin_file_name, dir_key));
assert!(!stager.in_cache(&puffin_file_name, dir_key));
let stats = notifier.stats();
assert_eq!(
@@ -1181,7 +1189,7 @@ mod tests {
// Second time to get the directory
let guard_1 = stager
.get_dir(
puffin_file_name,
&puffin_file_name,
dir_key,
Box::new(|_| async { Ok(0) }.boxed()),
)
@@ -1198,7 +1206,7 @@ mod tests {
// Still hold the guard
stager.cache.run_pending_tasks().await;
assert!(!stager.in_cache(puffin_file_name, dir_key));
assert!(!stager.in_cache(&puffin_file_name, dir_key));
let stats = notifier.stats();
assert_eq!(
@@ -1220,7 +1228,7 @@ mod tests {
drop(guard_1);
let guard_2 = stager
.get_dir(
puffin_file_name,
&puffin_file_name,
dir_key,
Box::new(|_| Box::pin(async move { Ok(0) })),
)
@@ -1229,7 +1237,7 @@ mod tests {
// Still hold the guard, so the directory should not be removed even if it's evicted
stager.cache.run_pending_tasks().await;
assert!(!stager.in_cache(puffin_file_name, blob_key));
assert!(!stager.in_cache(&puffin_file_name, blob_key));
for (rel_path, content) in &files_in_dir {
let file_path = guard_2.path().join(rel_path);
@@ -1262,13 +1270,14 @@ mod tests {
.await
.unwrap();
let puffin_file_name = "test_get_blob_concurrency_on_fail";
let puffin_file_name = "test_get_blob_concurrency_on_fail".to_string();
let key = "key";
let stager = Arc::new(stager);
let handles = (0..10)
.map(|_| {
let stager = stager.clone();
let puffin_file_name = puffin_file_name.clone();
let task = async move {
let failed_init = Box::new(|_| {
async {
@@ -1277,7 +1286,7 @@ mod tests {
}
.boxed()
});
stager.get_blob(puffin_file_name, key, failed_init).await
stager.get_blob(&puffin_file_name, key, failed_init).await
};
tokio::spawn(task)
@@ -1289,7 +1298,7 @@ mod tests {
assert!(r.is_err());
}
assert!(!stager.in_cache(puffin_file_name, key));
assert!(!stager.in_cache(&puffin_file_name, key));
}
#[tokio::test]
@@ -1299,13 +1308,14 @@ mod tests {
.await
.unwrap();
let puffin_file_name = "test_get_dir_concurrency_on_fail";
let puffin_file_name = "test_get_dir_concurrency_on_fail".to_string();
let key = "key";
let stager = Arc::new(stager);
let handles = (0..10)
.map(|_| {
let stager = stager.clone();
let puffin_file_name = puffin_file_name.clone();
let task = async move {
let failed_init = Box::new(|_| {
async {
@@ -1314,7 +1324,7 @@ mod tests {
}
.boxed()
});
stager.get_dir(puffin_file_name, key, failed_init).await
stager.get_dir(&puffin_file_name, key, failed_init).await
};
tokio::spawn(task)
@@ -1326,7 +1336,7 @@ mod tests {
assert!(r.is_err());
}
assert!(!stager.in_cache(puffin_file_name, key));
assert!(!stager.in_cache(&puffin_file_name, key));
}
#[tokio::test]
@@ -1343,11 +1353,11 @@ mod tests {
.unwrap();
// initialize stager
let puffin_file_name = "test_purge";
let puffin_file_name = "test_purge".to_string();
let blob_key = "blob_key";
let guard = stager
.get_blob(
puffin_file_name,
&puffin_file_name,
blob_key,
Box::new(|mut writer| {
Box::pin(async move {
@@ -1371,7 +1381,7 @@ mod tests {
let dir_key = "dir_key";
let guard = stager
.get_dir(
puffin_file_name,
&puffin_file_name,
dir_key,
Box::new(|writer_provider| {
Box::pin(async move {
@@ -1390,8 +1400,7 @@ mod tests {
drop(guard);
// purge the stager
stager.purge(puffin_file_name).await.unwrap();
stager.cache.run_pending_tasks().await;
stager.purge(&puffin_file_name).await.unwrap();
let stats = notifier.stats();
assert_eq!(

View File

@@ -27,7 +27,7 @@ use crate::puffin_manager::{
BlobGuard, DirGuard, PuffinManager, PuffinReader, PuffinWriter, PutOptions,
};
async fn new_bounded_stager(prefix: &str, capacity: u64) -> (TempDir, Arc<BoundedStager>) {
async fn new_bounded_stager(prefix: &str, capacity: u64) -> (TempDir, Arc<BoundedStager<String>>) {
let staging_dir = create_temp_dir(prefix);
let path = staging_dir.path().to_path_buf();
(
@@ -52,8 +52,8 @@ async fn test_put_get_file() {
let puffin_manager = FsPuffinManager::new(stager.clone(), file_accessor.clone());
let puffin_file_name = "puffin_file";
let mut writer = puffin_manager.writer(puffin_file_name).await.unwrap();
let puffin_file_name = "puffin_file".to_string();
let mut writer = puffin_manager.writer(&puffin_file_name).await.unwrap();
let key = "blob_a";
let raw_data = "Hello, world!".as_bytes();
@@ -61,9 +61,9 @@ async fn test_put_get_file() {
writer.finish().await.unwrap();
let reader = puffin_manager.reader(puffin_file_name).await.unwrap();
let reader = puffin_manager.reader(&puffin_file_name).await.unwrap();
check_blob(
puffin_file_name,
&puffin_file_name,
key,
raw_data,
&stager,
@@ -76,9 +76,9 @@ async fn test_put_get_file() {
let (_staging_dir, stager) = new_bounded_stager("test_put_get_file_", capacity).await;
let puffin_manager = FsPuffinManager::new(stager.clone(), file_accessor);
let reader = puffin_manager.reader(puffin_file_name).await.unwrap();
let reader = puffin_manager.reader(&puffin_file_name).await.unwrap();
check_blob(
puffin_file_name,
&puffin_file_name,
key,
raw_data,
&stager,
@@ -102,8 +102,8 @@ async fn test_put_get_files() {
let puffin_manager = FsPuffinManager::new(stager.clone(), file_accessor.clone());
let puffin_file_name = "puffin_file";
let mut writer = puffin_manager.writer(puffin_file_name).await.unwrap();
let puffin_file_name = "puffin_file".to_string();
let mut writer = puffin_manager.writer(&puffin_file_name).await.unwrap();
let blobs = [
("blob_a", "Hello, world!".as_bytes()),
@@ -119,10 +119,10 @@ async fn test_put_get_files() {
writer.finish().await.unwrap();
let reader = puffin_manager.reader(puffin_file_name).await.unwrap();
let reader = puffin_manager.reader(&puffin_file_name).await.unwrap();
for (key, raw_data) in &blobs {
check_blob(
puffin_file_name,
&puffin_file_name,
key,
raw_data,
&stager,
@@ -135,10 +135,10 @@ async fn test_put_get_files() {
// renew cache manager
let (_staging_dir, stager) = new_bounded_stager("test_put_get_files_", capacity).await;
let puffin_manager = FsPuffinManager::new(stager.clone(), file_accessor);
let reader = puffin_manager.reader(puffin_file_name).await.unwrap();
let reader = puffin_manager.reader(&puffin_file_name).await.unwrap();
for (key, raw_data) in &blobs {
check_blob(
puffin_file_name,
&puffin_file_name,
key,
raw_data,
&stager,
@@ -164,8 +164,8 @@ async fn test_put_get_dir() {
let puffin_manager = FsPuffinManager::new(stager.clone(), file_accessor.clone());
let puffin_file_name = "puffin_file";
let mut writer = puffin_manager.writer(puffin_file_name).await.unwrap();
let puffin_file_name = "puffin_file".to_string();
let mut writer = puffin_manager.writer(&puffin_file_name).await.unwrap();
let key = "dir_a";
@@ -181,15 +181,15 @@ async fn test_put_get_dir() {
writer.finish().await.unwrap();
let reader = puffin_manager.reader(puffin_file_name).await.unwrap();
check_dir(puffin_file_name, key, &files_in_dir, &stager, &reader).await;
let reader = puffin_manager.reader(&puffin_file_name).await.unwrap();
check_dir(&puffin_file_name, key, &files_in_dir, &stager, &reader).await;
// renew cache manager
let (_staging_dir, stager) = new_bounded_stager("test_put_get_dir_", capacity).await;
let puffin_manager = FsPuffinManager::new(stager.clone(), file_accessor);
let reader = puffin_manager.reader(puffin_file_name).await.unwrap();
check_dir(puffin_file_name, key, &files_in_dir, &stager, &reader).await;
let reader = puffin_manager.reader(&puffin_file_name).await.unwrap();
check_dir(&puffin_file_name, key, &files_in_dir, &stager, &reader).await;
}
}
}
@@ -207,8 +207,8 @@ async fn test_put_get_mix_file_dir() {
let puffin_manager = FsPuffinManager::new(stager.clone(), file_accessor.clone());
let puffin_file_name = "puffin_file";
let mut writer = puffin_manager.writer(puffin_file_name).await.unwrap();
let puffin_file_name = "puffin_file".to_string();
let mut writer = puffin_manager.writer(&puffin_file_name).await.unwrap();
let blobs = [
("blob_a", "Hello, world!".as_bytes()),
@@ -234,10 +234,10 @@ async fn test_put_get_mix_file_dir() {
writer.finish().await.unwrap();
let reader = puffin_manager.reader(puffin_file_name).await.unwrap();
let reader = puffin_manager.reader(&puffin_file_name).await.unwrap();
for (key, raw_data) in &blobs {
check_blob(
puffin_file_name,
&puffin_file_name,
key,
raw_data,
&stager,
@@ -246,17 +246,17 @@ async fn test_put_get_mix_file_dir() {
)
.await;
}
check_dir(puffin_file_name, dir_key, &files_in_dir, &stager, &reader).await;
check_dir(&puffin_file_name, dir_key, &files_in_dir, &stager, &reader).await;
// renew cache manager
let (_staging_dir, stager) =
new_bounded_stager("test_put_get_mix_file_dir_", capacity).await;
let puffin_manager = FsPuffinManager::new(stager.clone(), file_accessor);
let reader = puffin_manager.reader(puffin_file_name).await.unwrap();
let reader = puffin_manager.reader(&puffin_file_name).await.unwrap();
for (key, raw_data) in &blobs {
check_blob(
puffin_file_name,
&puffin_file_name,
key,
raw_data,
&stager,
@@ -265,7 +265,7 @@ async fn test_put_get_mix_file_dir() {
)
.await;
}
check_dir(puffin_file_name, dir_key, &files_in_dir, &stager, &reader).await;
check_dir(&puffin_file_name, dir_key, &files_in_dir, &stager, &reader).await;
}
}
}
@@ -292,7 +292,7 @@ async fn check_blob(
puffin_file_name: &str,
key: &str,
raw_data: &[u8],
stager: &BoundedStager,
stager: &BoundedStager<String>,
puffin_reader: &impl PuffinReader,
compressed: bool,
) {
@@ -346,7 +346,7 @@ async fn check_dir(
puffin_file_name: &str,
key: &str,
files_in_dir: &[(&str, &[u8])],
stager: &BoundedStager,
stager: &BoundedStager<String>,
puffin_reader: &impl PuffinReader,
) {
let res_dir = puffin_reader.dir(key).await.unwrap();