diff --git a/config/config.md b/config/config.md index a594e73680..32f34304c6 100644 --- a/config/config.md +++ b/config/config.md @@ -118,12 +118,15 @@ | `region_engine.mito.scan_parallelism` | Integer | `0` | Parallelism to scan a region (default: 1/4 of cpu cores).
- `0`: using the default value (1/4 of cpu cores).
- `1`: scan in current thread.
- `n`: scan in parallelism n. | | `region_engine.mito.parallel_scan_channel_size` | Integer | `32` | Capacity of the channel to send data from parallel scan tasks to the main task. | | `region_engine.mito.allow_stale_entries` | Bool | `false` | Whether to allow stale WAL entries read during replay. | +| `region_engine.mito.index` | -- | -- | The options for index in Mito engine. | +| `region_engine.mito.index.aux_path` | String | `""` | Auxiliary directory path for the index in filesystem, used to store intermediate files for
creating the index and staging files for searching the index, defaults to `{data_home}/index_intermediate`.
The default name for this directory is `index_intermediate` for backward compatibility.

This path contains two subdirectories:
- `__intm`: for storing intermediate files used during creating index.
- `staging`: for storing staging files used during searching index. | +| `region_engine.mito.index.staging_size` | String | `2GB` | The max capacity of the staging directory. | | `region_engine.mito.inverted_index` | -- | -- | The options for inverted index in Mito engine. | | `region_engine.mito.inverted_index.create_on_flush` | String | `auto` | Whether to create the index on flush.
- `auto`: automatically
- `disable`: never | | `region_engine.mito.inverted_index.create_on_compaction` | String | `auto` | Whether to create the index on compaction.
- `auto`: automatically
- `disable`: never | | `region_engine.mito.inverted_index.apply_on_query` | String | `auto` | Whether to apply the index on query
- `auto`: automatically
- `disable`: never | | `region_engine.mito.inverted_index.mem_threshold_on_create` | String | `64M` | Memory threshold for performing an external sort during index creation.
Setting to empty will disable external sorting, forcing all sorting operations to happen in memory. | -| `region_engine.mito.inverted_index.intermediate_path` | String | `""` | File system path to store intermediate files for external sorting (default `{data_home}/index_intermediate`). | +| `region_engine.mito.inverted_index.intermediate_path` | String | `""` | Deprecated, use `region_engine.mito.index.aux_path` instead. | | `region_engine.mito.memtable` | -- | -- | -- | | `region_engine.mito.memtable.type` | String | `time_series` | Memtable type.
- `time_series`: time-series memtable
- `partition_tree`: partition tree memtable (experimental) | | `region_engine.mito.memtable.index_max_keys_per_shard` | Integer | `8192` | The max number of keys in one shard.
Only available for `partition_tree` memtable. | @@ -399,12 +402,15 @@ | `region_engine.mito.scan_parallelism` | Integer | `0` | Parallelism to scan a region (default: 1/4 of cpu cores).
- `0`: using the default value (1/4 of cpu cores).
- `1`: scan in current thread.
- `n`: scan in parallelism n. | | `region_engine.mito.parallel_scan_channel_size` | Integer | `32` | Capacity of the channel to send data from parallel scan tasks to the main task. | | `region_engine.mito.allow_stale_entries` | Bool | `false` | Whether to allow stale WAL entries read during replay. | +| `region_engine.mito.index` | -- | -- | The options for index in Mito engine. | +| `region_engine.mito.index.aux_path` | String | `""` | Auxiliary directory path for the index in filesystem, used to store intermediate files for
creating the index and staging files for searching the index, defaults to `{data_home}/index_intermediate`.
The default name for this directory is `index_intermediate` for backward compatibility.

This path contains two subdirectories:
- `__intm`: for storing intermediate files used during creating index.
- `staging`: for storing staging files used during searching index. | +| `region_engine.mito.index.staging_size` | String | `2GB` | The max capacity of the staging directory. | | `region_engine.mito.inverted_index` | -- | -- | The options for inverted index in Mito engine. | | `region_engine.mito.inverted_index.create_on_flush` | String | `auto` | Whether to create the index on flush.
- `auto`: automatically
- `disable`: never | | `region_engine.mito.inverted_index.create_on_compaction` | String | `auto` | Whether to create the index on compaction.
- `auto`: automatically
- `disable`: never | | `region_engine.mito.inverted_index.apply_on_query` | String | `auto` | Whether to apply the index on query
- `auto`: automatically
- `disable`: never | | `region_engine.mito.inverted_index.mem_threshold_on_create` | String | `64M` | Memory threshold for performing an external sort during index creation.
Setting to empty will disable external sorting, forcing all sorting operations to happen in memory. | -| `region_engine.mito.inverted_index.intermediate_path` | String | `""` | File system path to store intermediate files for external sorting (default `{data_home}/index_intermediate`). | +| `region_engine.mito.inverted_index.intermediate_path` | String | `""` | Deprecated, use `region_engine.mito.index.aux_path` instead. | | `region_engine.mito.memtable` | -- | -- | -- | | `region_engine.mito.memtable.type` | String | `time_series` | Memtable type.
- `time_series`: time-series memtable
- `partition_tree`: partition tree memtable (experimental) | | `region_engine.mito.memtable.index_max_keys_per_shard` | Integer | `8192` | The max number of keys in one shard.
Only available for `partition_tree` memtable. | diff --git a/config/datanode.example.toml b/config/datanode.example.toml index b3be8b5836..c12606110f 100644 --- a/config/datanode.example.toml +++ b/config/datanode.example.toml @@ -394,6 +394,21 @@ parallel_scan_channel_size = 32 ## Whether to allow stale WAL entries read during replay. allow_stale_entries = false +## The options for index in Mito engine. +[region_engine.mito.index] + +## Auxiliary directory path for the index in filesystem, used to store intermediate files for +## creating the index and staging files for searching the index, defaults to `{data_home}/index_intermediate`. +## The default name for this directory is `index_intermediate` for backward compatibility. +## +## This path contains two subdirectories: +## - `__intm`: for storing intermediate files used during creating index. +## - `staging`: for storing staging files used during searching index. +aux_path = "" + +## The max capacity of the staging directory. +staging_size = "2GB" + ## The options for inverted index in Mito engine. [region_engine.mito.inverted_index] @@ -416,7 +431,7 @@ apply_on_query = "auto" ## Setting to empty will disable external sorting, forcing all sorting operations to happen in memory. mem_threshold_on_create = "64M" -## File system path to store intermediate files for external sorting (default `{data_home}/index_intermediate`). +## Deprecated, use `region_engine.mito.index.aux_path` instead. intermediate_path = "" [region_engine.mito.memtable] diff --git a/config/standalone.example.toml b/config/standalone.example.toml index 0a2544a772..32c1840eea 100644 --- a/config/standalone.example.toml +++ b/config/standalone.example.toml @@ -417,6 +417,21 @@ parallel_scan_channel_size = 32 ## Whether to allow stale WAL entries read during replay. allow_stale_entries = false +## The options for index in Mito engine. +[region_engine.mito.index] + +## Auxiliary directory path for the index in filesystem, used to store intermediate files for +## creating the index and staging files for searching the index, defaults to `{data_home}/index_intermediate`. +## The default name for this directory is `index_intermediate` for backward compatibility. +## +## This path contains two subdirectories: +## - `__intm`: for storing intermediate files used during creating index. +## - `staging`: for storing staging files used during searching index. +aux_path = "" + +## The max capacity of the staging directory. +staging_size = "2GB" + ## The options for inverted index in Mito engine. [region_engine.mito.inverted_index] @@ -439,7 +454,7 @@ apply_on_query = "auto" ## Setting to empty will disable external sorting, forcing all sorting operations to happen in memory. mem_threshold_on_create = "64M" -## File system path to store intermediate files for external sorting (default `{data_home}/index_intermediate`). +## Deprecated, use `region_engine.mito.index.aux_path` instead. intermediate_path = "" [region_engine.mito.memtable] diff --git a/src/mito2/src/access_layer.rs b/src/mito2/src/access_layer.rs index 40308124f5..98d9396bf7 100644 --- a/src/mito2/src/access_layer.rs +++ b/src/mito2/src/access_layer.rs @@ -27,6 +27,7 @@ use crate::read::Source; use crate::region::options::IndexOptions; use crate::sst::file::{FileHandle, FileId, FileMeta}; use crate::sst::index::intermediate::IntermediateManager; +use crate::sst::index::puffin_manager::PuffinManagerFactory; use crate::sst::index::IndexerBuilder; use crate::sst::location; use crate::sst::parquet::reader::ParquetReaderBuilder; @@ -40,6 +41,8 @@ pub struct AccessLayer { region_dir: String, /// Target object store. object_store: ObjectStore, + /// Puffin manager factory for index. + puffin_manager_factory: PuffinManagerFactory, /// Intermediate manager for inverted index. intermediate_manager: IntermediateManager, } @@ -57,11 +60,13 @@ impl AccessLayer { pub fn new( region_dir: impl Into, object_store: ObjectStore, + puffin_manager_factory: PuffinManagerFactory, intermediate_manager: IntermediateManager, ) -> AccessLayer { AccessLayer { region_dir: region_dir.into(), object_store, + puffin_manager_factory, intermediate_manager, } } @@ -76,6 +81,11 @@ impl AccessLayer { &self.object_store } + /// Returns the puffin manager factory. + pub fn puffin_manager_factory(&self) -> &PuffinManagerFactory { + &self.puffin_manager_factory + } + /// Deletes a SST file (and its index file if it has one) with given file id. pub(crate) async fn delete_sst(&self, file_meta: &FileMeta) -> Result<()> { let path = location::sst_file_path(&self.region_dir, file_meta.file_id); @@ -86,15 +96,13 @@ impl AccessLayer { file_id: file_meta.file_id, })?; - if file_meta.inverted_index_available() { - let path = location::index_file_path(&self.region_dir, file_meta.file_id); - self.object_store - .delete(&path) - .await - .context(DeleteIndexSnafu { - file_id: file_meta.file_id, - })?; - } + let path = location::index_file_path(&self.region_dir, file_meta.file_id); + self.object_store + .delete(&path) + .await + .context(DeleteIndexSnafu { + file_id: file_meta.file_id, + })?; Ok(()) } diff --git a/src/mito2/src/cache/file_cache.rs b/src/mito2/src/cache/file_cache.rs index 931e506269..008a717593 100644 --- a/src/mito2/src/cache/file_cache.rs +++ b/src/mito2/src/cache/file_cache.rs @@ -117,6 +117,7 @@ impl FileCache { } /// Reads a file from the cache. + #[allow(unused)] pub(crate) async fn reader(&self, key: IndexKey) -> Option { // We must use `get()` to update the estimator of the cache. // See https://docs.rs/moka/latest/moka/future/struct.Cache.html#method.contains_key diff --git a/src/mito2/src/compaction/compactor.rs b/src/mito2/src/compaction/compactor.rs index 062e5423c4..a303367a34 100644 --- a/src/mito2/src/compaction/compactor.rs +++ b/src/mito2/src/compaction/compactor.rs @@ -45,6 +45,7 @@ use crate::schedule::scheduler::LocalScheduler; use crate::sst::file::{FileMeta, IndexType}; use crate::sst::file_purger::LocalFilePurger; use crate::sst::index::intermediate::IntermediateManager; +use crate::sst::index::puffin_manager::PuffinManagerFactory; use crate::sst::parquet::WriteOptions; /// CompactionRegion represents a region that needs to be compacted. @@ -93,13 +94,19 @@ pub async fn open_compaction_region( }; let access_layer = { + let puffin_manager_factory = PuffinManagerFactory::new( + &mito_config.index.aux_path, + mito_config.index.staging_size.as_bytes(), + Some(mito_config.index.write_buffer_size.as_bytes() as _), + ) + .await?; let intermediate_manager = - IntermediateManager::init_fs(mito_config.inverted_index.intermediate_path.clone()) - .await?; + IntermediateManager::init_fs(mito_config.index.aux_path.clone()).await?; Arc::new(AccessLayer::new( req.region_dir.as_str(), object_store.clone(), + puffin_manager_factory, intermediate_manager, )) }; @@ -266,7 +273,7 @@ impl Compactor for DefaultCompactor { let index_write_buffer_size = Some( compaction_region .engine_config - .inverted_index + .index .write_buffer_size .as_bytes() as usize, ); diff --git a/src/mito2/src/config.rs b/src/mito2/src/config.rs index 5f5799ec2f..04d085dda8 100644 --- a/src/mito2/src/config.rs +++ b/src/mito2/src/config.rs @@ -15,6 +15,7 @@ //! Configurations. use std::cmp; +use std::path::Path; use std::time::Duration; use common_base::readable_size::ReadableSize; @@ -104,6 +105,8 @@ pub struct MitoConfig { /// Whether to allow stale entries read during replay. pub allow_stale_entries: bool, + /// Index configs. + pub index: IndexConfig, /// Inverted index configs. pub inverted_index: InvertedIndexConfig, @@ -134,6 +137,7 @@ impl Default for MitoConfig { scan_parallelism: divide_num_cpus(4), parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE, allow_stale_entries: false, + index: IndexConfig::default(), inverted_index: InvertedIndexConfig::default(), memtable: MemtableConfig::default(), }; @@ -202,7 +206,7 @@ impl MitoConfig { self.experimental_write_cache_path = join_dir(data_home, "write_cache"); } - self.inverted_index.sanitize(data_home)?; + self.index.sanitize(data_home, &self.inverted_index)?; Ok(()) } @@ -246,6 +250,70 @@ impl MitoConfig { } } +#[serde_as] +#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)] +#[serde(default)] +pub struct IndexConfig { + /// Auxiliary directory path for the index in filesystem, used to + /// store intermediate files for creating the index and staging files + /// for searching the index, defaults to `{data_home}/index_intermediate`. + /// + /// This path contains two subdirectories: + /// - `__intm`: for storing intermediate files used during creating index. + /// - `staging`: for storing staging files used during searching index. + /// + /// The default name for this directory is `index_intermediate` for backward compatibility. + pub aux_path: String, + + /// The max capacity of the staging directory. + pub staging_size: ReadableSize, + + /// Write buffer size for creating the index. + pub write_buffer_size: ReadableSize, +} + +impl Default for IndexConfig { + fn default() -> Self { + Self { + aux_path: String::new(), + staging_size: ReadableSize::gb(2), + write_buffer_size: ReadableSize::mb(8), + } + } +} + +impl IndexConfig { + pub fn sanitize( + &mut self, + data_home: &str, + inverted_index: &InvertedIndexConfig, + ) -> Result<()> { + #[allow(deprecated)] + if self.aux_path.is_empty() && !inverted_index.intermediate_path.is_empty() { + self.aux_path.clone_from(&inverted_index.intermediate_path); + warn!( + "`inverted_index.intermediate_path` is deprecated, use + `index.aux_path` instead. Set `index.aux_path` to {}", + &inverted_index.intermediate_path + ) + } + if self.aux_path.is_empty() { + let path = Path::new(data_home).join("index_intermediate"); + self.aux_path = path.as_os_str().to_string_lossy().to_string(); + } + + if self.write_buffer_size < MULTIPART_UPLOAD_MINIMUM_SIZE { + self.write_buffer_size = MULTIPART_UPLOAD_MINIMUM_SIZE; + warn!( + "Sanitize index write buffer size to {}", + self.write_buffer_size + ); + } + + Ok(()) + } +} + /// Operational mode for certain actions. #[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq, Default)] #[serde(rename_all = "snake_case")] @@ -280,17 +348,23 @@ pub struct InvertedIndexConfig { pub create_on_compaction: Mode, /// Whether to apply the index on query: automatically or never. pub apply_on_query: Mode, - /// Write buffer size for creating the index. - pub write_buffer_size: ReadableSize, + /// Memory threshold for performing an external sort during index creation. /// `None` means all sorting will happen in memory. #[serde_as(as = "NoneAsEmptyString")] pub mem_threshold_on_create: Option, - /// File system path to store intermediate files for external sort, defaults to `{data_home}/index_intermediate`. + + #[deprecated = "use [IndexConfig::aux_path] instead"] + #[serde(skip_serializing)] pub intermediate_path: String, + + #[deprecated = "use [IndexConfig::write_buffer_size] instead"] + #[serde(skip_serializing)] + pub write_buffer_size: ReadableSize, } impl Default for InvertedIndexConfig { + #[allow(deprecated)] fn default() -> Self { Self { create_on_flush: Mode::Auto, @@ -303,24 +377,6 @@ impl Default for InvertedIndexConfig { } } -impl InvertedIndexConfig { - pub fn sanitize(&mut self, data_home: &str) -> Result<()> { - if self.intermediate_path.is_empty() { - self.intermediate_path = join_dir(data_home, "index_intermediate"); - } - - if self.write_buffer_size < MULTIPART_UPLOAD_MINIMUM_SIZE { - self.write_buffer_size = MULTIPART_UPLOAD_MINIMUM_SIZE; - warn!( - "Sanitize index write buffer size to {}", - self.write_buffer_size - ); - } - - Ok(()) - } -} - /// Divide cpu num by a non-zero `divisor` and returns at least 1. fn divide_num_cpus(divisor: usize) -> usize { debug_assert!(divisor > 0); diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs index 1306edf09d..ed665e445c 100644 --- a/src/mito2/src/error.rs +++ b/src/mito2/src/error.rs @@ -597,13 +597,6 @@ pub enum Error { location: Location, }, - #[snafu(display("Blob type not found, blob_type: {blob_type}"))] - PuffinBlobTypeNotFound { - blob_type: String, - #[snafu(implicit)] - location: Location, - }, - #[snafu(display("Failed to write puffin completely"))] PuffinFinish { source: puffin::error::Error, @@ -783,6 +776,20 @@ pub enum Error { #[snafu(implicit)] location: Location, }, + + #[snafu(display("Failed to initialize puffin stager"))] + PuffinInitStager { + source: puffin::error::Error, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("Failed to build puffin reader"))] + PuffinBuildReader { + source: puffin::error::Error, + #[snafu(implicit)] + location: Location, + }, } pub type Result = std::result::Result; @@ -821,7 +828,6 @@ impl ErrorExt for Error { | CreateDefault { .. } | InvalidParquet { .. } | OperateAbortedIndex { .. } - | PuffinBlobTypeNotFound { .. } | UnexpectedReplay { .. } | IndexEncodeNull { .. } => StatusCode::Unexpected, RegionNotFound { .. } => StatusCode::RegionNotFound, @@ -886,7 +892,9 @@ impl ErrorExt for Error { PuffinReadMetadata { source, .. } | PuffinReadBlob { source, .. } | PuffinFinish { source, .. } - | PuffinAddBlob { source, .. } => source.status_code(), + | PuffinAddBlob { source, .. } + | PuffinInitStager { source, .. } + | PuffinBuildReader { source, .. } => source.status_code(), CleanDir { .. } => StatusCode::Unexpected, InvalidConfig { .. } => StatusCode::InvalidArguments, StaleLogEntry { .. } => StatusCode::Unexpected, diff --git a/src/mito2/src/flush.rs b/src/mito2/src/flush.rs index 971295e08d..2d573b423b 100644 --- a/src/mito2/src/flush.rs +++ b/src/mito2/src/flush.rs @@ -327,12 +327,8 @@ impl RegionFlushTask { .inverted_index .mem_threshold_on_create .map(|m| m.as_bytes() as _); - let index_write_buffer_size = Some( - self.engine_config - .inverted_index - .write_buffer_size - .as_bytes() as usize, - ); + let index_write_buffer_size = + Some(self.engine_config.index.write_buffer_size.as_bytes() as usize); // Flush to level 0. let write_request = SstWriteRequest { diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs index e29b1611a2..c25a040295 100644 --- a/src/mito2/src/read/scan_region.rs +++ b/src/mito2/src/read/scan_region.rs @@ -343,6 +343,7 @@ impl ScanRegion { .iter() .copied() .collect(), + self.access_layer.puffin_manager_factory().clone(), ) .build(&self.request.filters) .inspect_err(|err| warn!(err; "Failed to build index applier")) diff --git a/src/mito2/src/region/opener.rs b/src/mito2/src/region/opener.rs index 50aa7c68cd..65429478f5 100644 --- a/src/mito2/src/region/opener.rs +++ b/src/mito2/src/region/opener.rs @@ -48,6 +48,7 @@ use crate::request::OptionOutputTx; use crate::schedule::scheduler::SchedulerRef; use crate::sst::file_purger::LocalFilePurger; use crate::sst::index::intermediate::IntermediateManager; +use crate::sst::index::puffin_manager::PuffinManagerFactory; use crate::time_provider::{StdTimeProvider, TimeProviderRef}; use crate::wal::entry_reader::WalEntryReader; use crate::wal::{EntryId, Wal}; @@ -63,6 +64,7 @@ pub(crate) struct RegionOpener { options: Option, cache_manager: Option, skip_wal_replay: bool, + puffin_manager_factory: PuffinManagerFactory, intermediate_manager: IntermediateManager, time_provider: Option, stats: ManifestStats, @@ -77,6 +79,7 @@ impl RegionOpener { memtable_builder_provider: MemtableBuilderProvider, object_store_manager: ObjectStoreManagerRef, purge_scheduler: SchedulerRef, + puffin_manager_factory: PuffinManagerFactory, intermediate_manager: IntermediateManager, ) -> RegionOpener { RegionOpener { @@ -89,6 +92,7 @@ impl RegionOpener { options: None, cache_manager: None, skip_wal_replay: false, + puffin_manager_factory, intermediate_manager, time_provider: None, stats: Default::default(), @@ -216,6 +220,7 @@ impl RegionOpener { let access_layer = Arc::new(AccessLayer::new( self.region_dir, object_store, + self.puffin_manager_factory, self.intermediate_manager, )); let time_provider = self @@ -317,6 +322,7 @@ impl RegionOpener { let access_layer = Arc::new(AccessLayer::new( self.region_dir.clone(), object_store, + self.puffin_manager_factory.clone(), self.intermediate_manager.clone(), )); let file_purger = Arc::new(LocalFilePurger::new( diff --git a/src/mito2/src/sst/file_purger.rs b/src/mito2/src/sst/file_purger.rs index 4f81170933..0753b1a3eb 100644 --- a/src/mito2/src/sst/file_purger.rs +++ b/src/mito2/src/sst/file_purger.rs @@ -97,7 +97,6 @@ impl FilePurger for LocalFilePurger { mod tests { use common_test_util::temp_dir::create_temp_dir; use object_store::services::Fs; - use object_store::util::join_dir; use object_store::ObjectStore; use smallvec::SmallVec; @@ -106,6 +105,7 @@ mod tests { use crate::schedule::scheduler::{LocalScheduler, Scheduler}; use crate::sst::file::{FileHandle, FileId, FileMeta, FileTimeRange, IndexType}; use crate::sst::index::intermediate::IntermediateManager; + use crate::sst::index::puffin_manager::PuffinManagerFactory; use crate::sst::location; #[tokio::test] @@ -119,7 +119,12 @@ mod tests { let sst_file_id = FileId::random(); let sst_dir = "table1"; let path = location::sst_file_path(sst_dir, sst_file_id); - let intm_mgr = IntermediateManager::init_fs(join_dir(&dir_path, "intm")) + + let index_aux_path = dir.path().join("index_aux"); + let puffin_mgr = PuffinManagerFactory::new(&index_aux_path, 4096, None) + .await + .unwrap(); + let intm_mgr = IntermediateManager::init_fs(index_aux_path.to_str().unwrap()) .await .unwrap(); @@ -127,7 +132,12 @@ mod tests { object_store.write(&path, vec![0; 4096]).await.unwrap(); let scheduler = Arc::new(LocalScheduler::new(3)); - let layer = Arc::new(AccessLayer::new(sst_dir, object_store.clone(), intm_mgr)); + let layer = Arc::new(AccessLayer::new( + sst_dir, + object_store.clone(), + puffin_mgr, + intm_mgr, + )); let file_purger = Arc::new(LocalFilePurger::new(scheduler.clone(), layer, None)); @@ -165,11 +175,16 @@ mod tests { builder.root(&dir_path); let sst_file_id = FileId::random(); let sst_dir = "table1"; - let intm_mgr = IntermediateManager::init_fs(join_dir(&dir_path, "intm")) + + let index_aux_path = dir.path().join("index_aux"); + let puffin_mgr = PuffinManagerFactory::new(&index_aux_path, 4096, None) + .await + .unwrap(); + let intm_mgr = IntermediateManager::init_fs(index_aux_path.to_str().unwrap()) .await .unwrap(); - let path = location::sst_file_path(sst_dir, sst_file_id); + let path = location::sst_file_path(sst_dir, sst_file_id); let object_store = ObjectStore::new(builder).unwrap().finish(); object_store.write(&path, vec![0; 4096]).await.unwrap(); @@ -180,7 +195,12 @@ mod tests { .unwrap(); let scheduler = Arc::new(LocalScheduler::new(3)); - let layer = Arc::new(AccessLayer::new(sst_dir, object_store.clone(), intm_mgr)); + let layer = Arc::new(AccessLayer::new( + sst_dir, + object_store.clone(), + puffin_mgr, + intm_mgr, + )); let file_purger = Arc::new(LocalFilePurger::new(scheduler.clone(), layer, None)); diff --git a/src/mito2/src/sst/index.rs b/src/mito2/src/sst/index.rs index ebc561c829..5bfee47ef7 100644 --- a/src/mito2/src/sst/index.rs +++ b/src/mito2/src/sst/index.rs @@ -16,6 +16,7 @@ pub(crate) mod applier; mod codec; pub(crate) mod creator; pub(crate) mod intermediate; +pub(crate) mod puffin_manager; mod store; use std::num::NonZeroUsize; diff --git a/src/mito2/src/sst/index/applier.rs b/src/mito2/src/sst/index/applier.rs index a823de56c8..d99d5ea8cd 100644 --- a/src/mito2/src/sst/index/applier.rs +++ b/src/mito2/src/sst/index/applier.rs @@ -16,27 +16,21 @@ pub mod builder; use std::sync::Arc; -use futures::{AsyncRead, AsyncSeek}; +use common_telemetry::warn; use index::inverted_index::format::reader::InvertedIndexBlobReader; use index::inverted_index::search::index_apply::{ ApplyOutput, IndexApplier, IndexNotFoundStrategy, SearchContext, }; use object_store::ObjectStore; -use puffin::file_format::reader::{AsyncReader, PuffinFileReader}; -use snafu::{OptionExt, ResultExt}; +use puffin::puffin_manager::{BlobGuard, PuffinManager, PuffinReader}; +use snafu::ResultExt; use store_api::storage::RegionId; use crate::cache::file_cache::{FileCacheRef, FileType, IndexKey}; -use crate::error::{ - ApplyIndexSnafu, OpenDalSnafu, PuffinBlobTypeNotFoundSnafu, PuffinReadBlobSnafu, - PuffinReadMetadataSnafu, Result, -}; -use crate::metrics::{ - INDEX_APPLY_ELAPSED, INDEX_APPLY_MEMORY_USAGE, INDEX_PUFFIN_READ_BYTES_TOTAL, - INDEX_PUFFIN_READ_OP_TOTAL, INDEX_PUFFIN_SEEK_OP_TOTAL, -}; +use crate::error::{ApplyIndexSnafu, PuffinBuildReaderSnafu, PuffinReadBlobSnafu, Result}; +use crate::metrics::{INDEX_APPLY_ELAPSED, INDEX_APPLY_MEMORY_USAGE}; use crate::sst::file::FileId; -use crate::sst::index::store::InstrumentedStore; +use crate::sst::index::puffin_manager::{BlobReader, PuffinManagerFactory}; use crate::sst::index::INDEX_BLOB_TYPE; use crate::sst::location; @@ -50,7 +44,7 @@ pub(crate) struct SstIndexApplier { region_id: RegionId, /// Store responsible for accessing remote index files. - store: InstrumentedStore, + store: ObjectStore, /// The cache of index files. file_cache: Option, @@ -58,6 +52,9 @@ pub(crate) struct SstIndexApplier { /// Predefined index applier used to apply predicates to index files /// and return the relevant row group ids for further scan. index_applier: Box, + + /// The puffin manager factory. + puffin_manager_factory: PuffinManagerFactory, } pub(crate) type SstIndexApplierRef = Arc; @@ -67,18 +64,20 @@ impl SstIndexApplier { pub fn new( region_dir: String, region_id: RegionId, - object_store: ObjectStore, + store: ObjectStore, file_cache: Option, index_applier: Box, + puffin_manager_factory: PuffinManagerFactory, ) -> Self { INDEX_APPLY_MEMORY_USAGE.add(index_applier.memory_usage() as i64); Self { region_dir, region_id, - store: InstrumentedStore::new(object_store), + store, file_cache, index_applier, + puffin_manager_factory, } } @@ -91,94 +90,65 @@ impl SstIndexApplier { index_not_found_strategy: IndexNotFoundStrategy::ReturnEmpty, }; - match self.cached_puffin_reader(file_id).await? { - Some(mut puffin_reader) => { - let blob_reader = Self::index_blob_reader(&mut puffin_reader).await?; - let mut index_reader = InvertedIndexBlobReader::new(blob_reader); - self.index_applier - .apply(context, &mut index_reader) - .await - .context(ApplyIndexSnafu) + let blob = match self.cached_blob_reader(file_id).await { + Ok(Some(puffin_reader)) => puffin_reader, + other => { + if let Err(err) = other { + warn!(err; "An unexpected error occurred while reading the cached index file. Fallback to remote index file.") + } + self.remote_blob_reader(file_id).await? } - None => { - let mut puffin_reader = self.remote_puffin_reader(file_id).await?; - let blob_reader = Self::index_blob_reader(&mut puffin_reader).await?; - let mut index_reader = InvertedIndexBlobReader::new(blob_reader); - self.index_applier - .apply(context, &mut index_reader) - .await - .context(ApplyIndexSnafu) - } - } + }; + let mut blob_reader = InvertedIndexBlobReader::new(blob); + let output = self + .index_applier + .apply(context, &mut blob_reader) + .await + .context(ApplyIndexSnafu)?; + Ok(output) } - /// Helper function to create a [`PuffinFileReader`] from the cached index file. - async fn cached_puffin_reader( - &self, - file_id: FileId, - ) -> Result>> { + /// Creates a blob reader from the cached index file. + async fn cached_blob_reader(&self, file_id: FileId) -> Result> { let Some(file_cache) = &self.file_cache else { return Ok(None); }; - let Some(indexed_value) = file_cache - .get(IndexKey::new(self.region_id, file_id, FileType::Puffin)) - .await - else { + let index_key = IndexKey::new(self.region_id, file_id, FileType::Puffin); + if file_cache.get(index_key).await.is_none() { return Ok(None); }; - let Some(reader) = file_cache - .reader(IndexKey::new(self.region_id, file_id, FileType::Puffin)) - .await - else { - return Ok(None); - }; + let puffin_manager = self.puffin_manager_factory.build(file_cache.local_store()); + let puffin_file_name = file_cache.cache_file_path(index_key); - let reader = reader - .into_futures_async_read(0..indexed_value.file_size as u64) + let reader = puffin_manager + .reader(&puffin_file_name) .await - .context(OpenDalSnafu)?; - - Ok(Some(PuffinFileReader::new(reader))) + .context(PuffinBuildReaderSnafu)? + .blob(INDEX_BLOB_TYPE) + .await + .context(PuffinReadBlobSnafu)? + .reader() + .await + .context(PuffinBuildReaderSnafu)?; + Ok(Some(reader)) } - /// Helper function to create a [`PuffinFileReader`] from the remote index file. - async fn remote_puffin_reader( - &self, - file_id: FileId, - ) -> Result> { + /// Creates a blob reader from the remote index file. + async fn remote_blob_reader(&self, file_id: FileId) -> Result { + let puffin_manager = self.puffin_manager_factory.build(self.store.clone()); let file_path = location::index_file_path(&self.region_dir, file_id); - let file_reader = self - .store - .reader( - &file_path, - &INDEX_PUFFIN_READ_BYTES_TOTAL, - &INDEX_PUFFIN_READ_OP_TOTAL, - &INDEX_PUFFIN_SEEK_OP_TOTAL, - ) - .await?; - Ok(PuffinFileReader::new(file_reader)) - } - - /// Helper function to create a [`PuffinBlobReader`] for the index blob of the provided index file reader. - async fn index_blob_reader( - puffin_reader: &mut PuffinFileReader, - ) -> Result { - let file_meta = puffin_reader - .metadata() + puffin_manager + .reader(&file_path) .await - .context(PuffinReadMetadataSnafu)?; - let blob_meta = file_meta - .blobs - .iter() - .find(|blob| blob.blob_type == INDEX_BLOB_TYPE) - .context(PuffinBlobTypeNotFoundSnafu { - blob_type: INDEX_BLOB_TYPE, - })?; - puffin_reader - .blob_reader(blob_meta) - .context(PuffinReadBlobSnafu) + .context(PuffinBuildReaderSnafu)? + .blob(INDEX_BLOB_TYPE) + .await + .context(PuffinReadBlobSnafu)? + .reader() + .await + .context(PuffinBuildReaderSnafu) } } @@ -194,35 +164,26 @@ mod tests { use futures::io::Cursor; use index::inverted_index::search::index_apply::MockIndexApplier; use object_store::services::Memory; - use puffin::file_format::writer::{AsyncWriter, Blob, PuffinFileWriter}; + use puffin::puffin_manager::PuffinWriter; use super::*; - use crate::error::Error; #[tokio::test] async fn test_index_applier_apply_basic() { + let (_d, puffin_manager_factory) = + PuffinManagerFactory::new_for_test_async("test_index_applier_apply_basic_").await; let object_store = ObjectStore::new(Memory::default()).unwrap().finish(); let file_id = FileId::random(); let region_dir = "region_dir".to_string(); let path = location::index_file_path(®ion_dir, file_id); - let mut puffin_writer = PuffinFileWriter::new( - object_store - .writer(&path) - .await - .unwrap() - .into_futures_async_write(), - ); - puffin_writer - .add_blob(Blob { - blob_type: INDEX_BLOB_TYPE.to_string(), - compressed_data: Cursor::new(vec![]), - properties: Default::default(), - compression_codec: None, - }) + let puffin_manager = puffin_manager_factory.build(object_store.clone()); + let mut writer = puffin_manager.writer(&path).await.unwrap(); + writer + .put_blob(INDEX_BLOB_TYPE, Cursor::new(vec![]), Default::default()) .await .unwrap(); - puffin_writer.finish().await.unwrap(); + writer.finish().await.unwrap(); let mut mock_index_applier = MockIndexApplier::new(); mock_index_applier.expect_memory_usage().returning(|| 100); @@ -240,6 +201,7 @@ mod tests { object_store, None, Box::new(mock_index_applier), + puffin_manager_factory, ); let output = sst_index_applier.apply(file_id).await.unwrap(); assert_eq!( @@ -254,28 +216,21 @@ mod tests { #[tokio::test] async fn test_index_applier_apply_invalid_blob_type() { + let (_d, puffin_manager_factory) = + PuffinManagerFactory::new_for_test_async("test_index_applier_apply_invalid_blob_type_") + .await; let object_store = ObjectStore::new(Memory::default()).unwrap().finish(); let file_id = FileId::random(); let region_dir = "region_dir".to_string(); let path = location::index_file_path(®ion_dir, file_id); - let mut puffin_writer = PuffinFileWriter::new( - object_store - .writer(&path) - .await - .unwrap() - .into_futures_async_write(), - ); - puffin_writer - .add_blob(Blob { - blob_type: "invalid_blob_type".to_string(), - compressed_data: Cursor::new(vec![]), - properties: Default::default(), - compression_codec: None, - }) + let puffin_manager = puffin_manager_factory.build(object_store.clone()); + let mut writer = puffin_manager.writer(&path).await.unwrap(); + writer + .put_blob("invalid_blob_type", Cursor::new(vec![]), Default::default()) .await .unwrap(); - puffin_writer.finish().await.unwrap(); + writer.finish().await.unwrap(); let mut mock_index_applier = MockIndexApplier::new(); mock_index_applier.expect_memory_usage().returning(|| 100); @@ -287,8 +242,9 @@ mod tests { object_store, None, Box::new(mock_index_applier), + puffin_manager_factory, ); let res = sst_index_applier.apply(file_id).await; - assert!(matches!(res, Err(Error::PuffinBlobTypeNotFound { .. }))); + assert!(format!("{:?}", res.unwrap_err()).contains("Blob not found")); } } diff --git a/src/mito2/src/sst/index/applier/builder.rs b/src/mito2/src/sst/index/applier/builder.rs index c414e91deb..1a4c1735ab 100644 --- a/src/mito2/src/sst/index/applier/builder.rs +++ b/src/mito2/src/sst/index/applier/builder.rs @@ -38,6 +38,7 @@ use crate::error::{BuildIndexApplierSnafu, ColumnNotFoundSnafu, ConvertValueSnaf use crate::row_converter::SortField; use crate::sst::index::applier::SstIndexApplier; use crate::sst::index::codec::IndexValueCodec; +use crate::sst::index::puffin_manager::PuffinManagerFactory; /// Constructs an [`SstIndexApplier`] which applies predicates to SST files during scan. pub(crate) struct SstIndexApplierBuilder<'a> { @@ -58,6 +59,9 @@ pub(crate) struct SstIndexApplierBuilder<'a> { /// Stores predicates during traversal on the Expr tree. output: HashMap>, + + /// The puffin manager factory. + puffin_manager_factory: PuffinManagerFactory, } impl<'a> SstIndexApplierBuilder<'a> { @@ -68,6 +72,7 @@ impl<'a> SstIndexApplierBuilder<'a> { file_cache: Option, metadata: &'a RegionMetadata, ignore_column_ids: HashSet, + puffin_manager_factory: PuffinManagerFactory, ) -> Self { Self { region_dir, @@ -76,6 +81,7 @@ impl<'a> SstIndexApplierBuilder<'a> { metadata, ignore_column_ids, output: HashMap::default(), + puffin_manager_factory, } } @@ -102,6 +108,7 @@ impl<'a> SstIndexApplierBuilder<'a> { self.object_store, self.file_cache, Box::new(applier.context(BuildIndexApplierSnafu)?), + self.puffin_manager_factory, ))) } @@ -306,6 +313,8 @@ mod tests { #[test] fn test_collect_and_basic() { + let (_d, facotry) = PuffinManagerFactory::new_for_test_block("test_collect_and_basic_"); + let metadata = test_region_metadata(); let mut builder = SstIndexApplierBuilder::new( "test".to_string(), @@ -313,6 +322,7 @@ mod tests { None, &metadata, HashSet::default(), + facotry, ); let expr = Expr::BinaryExpr(BinaryExpr { diff --git a/src/mito2/src/sst/index/applier/builder/between.rs b/src/mito2/src/sst/index/applier/builder/between.rs index 9f761328f3..00740c8521 100644 --- a/src/mito2/src/sst/index/applier/builder/between.rs +++ b/src/mito2/src/sst/index/applier/builder/between.rs @@ -66,9 +66,11 @@ mod tests { encoded_string, field_column, int64_lit, nonexistent_column, string_lit, tag_column, test_object_store, test_region_metadata, }; + use crate::sst::index::puffin_manager::PuffinManagerFactory; #[test] fn test_collect_between_basic() { + let (_d, facotry) = PuffinManagerFactory::new_for_test_block("test_collect_between_basic_"); let metadata = test_region_metadata(); let mut builder = SstIndexApplierBuilder::new( "test".to_string(), @@ -76,6 +78,7 @@ mod tests { None, &metadata, HashSet::default(), + facotry, ); let between = Between { @@ -108,6 +111,8 @@ mod tests { #[test] fn test_collect_between_negated() { + let (_d, facotry) = + PuffinManagerFactory::new_for_test_block("test_collect_between_negated_"); let metadata = test_region_metadata(); let mut builder = SstIndexApplierBuilder::new( "test".to_string(), @@ -115,6 +120,7 @@ mod tests { None, &metadata, HashSet::default(), + facotry, ); let between = Between { @@ -130,6 +136,8 @@ mod tests { #[test] fn test_collect_between_field_column() { + let (_d, facotry) = + PuffinManagerFactory::new_for_test_block("test_collect_between_field_column_"); let metadata = test_region_metadata(); let mut builder = SstIndexApplierBuilder::new( "test".to_string(), @@ -137,6 +145,7 @@ mod tests { None, &metadata, HashSet::default(), + facotry, ); let between = Between { @@ -152,6 +161,8 @@ mod tests { #[test] fn test_collect_between_type_mismatch() { + let (_d, facotry) = + PuffinManagerFactory::new_for_test_block("test_collect_between_type_mismatch_"); let metadata = test_region_metadata(); let mut builder = SstIndexApplierBuilder::new( "test".to_string(), @@ -159,6 +170,7 @@ mod tests { None, &metadata, HashSet::default(), + facotry, ); let between = Between { @@ -175,6 +187,8 @@ mod tests { #[test] fn test_collect_between_nonexistent_column() { + let (_d, facotry) = + PuffinManagerFactory::new_for_test_block("test_collect_between_nonexistent_column_"); let metadata = test_region_metadata(); let mut builder = SstIndexApplierBuilder::new( "test".to_string(), @@ -182,6 +196,7 @@ mod tests { None, &metadata, HashSet::default(), + facotry, ); let between = Between { diff --git a/src/mito2/src/sst/index/applier/builder/comparison.rs b/src/mito2/src/sst/index/applier/builder/comparison.rs index 4914a7578c..74a67aac6f 100644 --- a/src/mito2/src/sst/index/applier/builder/comparison.rs +++ b/src/mito2/src/sst/index/applier/builder/comparison.rs @@ -138,6 +138,7 @@ mod tests { encoded_string, field_column, int64_lit, nonexistent_column, string_lit, tag_column, test_object_store, test_region_metadata, }; + use crate::sst::index::puffin_manager::PuffinManagerFactory; #[test] fn test_collect_comparison_basic() { @@ -224,6 +225,8 @@ mod tests { ), ]; + let (_d, facotry) = + PuffinManagerFactory::new_for_test_block("test_collect_comparison_basic_"); let metadata = test_region_metadata(); let mut builder = SstIndexApplierBuilder::new( "test".to_string(), @@ -231,6 +234,7 @@ mod tests { None, &metadata, HashSet::default(), + facotry, ); for ((left, op, right), _) in &cases { @@ -249,6 +253,8 @@ mod tests { #[test] fn test_collect_comparison_type_mismatch() { + let (_d, facotry) = + PuffinManagerFactory::new_for_test_block("test_collect_comparison_type_mismatch_"); let metadata = test_region_metadata(); let mut builder = SstIndexApplierBuilder::new( "test".to_string(), @@ -256,6 +262,7 @@ mod tests { None, &metadata, HashSet::default(), + facotry, ); let res = builder.collect_comparison_expr(&tag_column(), &Operator::Lt, &int64_lit(10)); @@ -265,6 +272,8 @@ mod tests { #[test] fn test_collect_comparison_field_column() { + let (_d, facotry) = + PuffinManagerFactory::new_for_test_block("test_collect_comparison_field_column_"); let metadata = test_region_metadata(); let mut builder = SstIndexApplierBuilder::new( "test".to_string(), @@ -272,6 +281,7 @@ mod tests { None, &metadata, HashSet::default(), + facotry, ); builder @@ -282,6 +292,8 @@ mod tests { #[test] fn test_collect_comparison_nonexistent_column() { + let (_d, facotry) = + PuffinManagerFactory::new_for_test_block("test_collect_comparison_nonexistent_column_"); let metadata = test_region_metadata(); let mut builder = SstIndexApplierBuilder::new( "test".to_string(), @@ -289,6 +301,7 @@ mod tests { None, &metadata, HashSet::default(), + facotry, ); let res = builder.collect_comparison_expr( diff --git a/src/mito2/src/sst/index/applier/builder/eq_list.rs b/src/mito2/src/sst/index/applier/builder/eq_list.rs index 23a4d7516d..a01f77d413 100644 --- a/src/mito2/src/sst/index/applier/builder/eq_list.rs +++ b/src/mito2/src/sst/index/applier/builder/eq_list.rs @@ -128,9 +128,11 @@ mod tests { encoded_string, field_column, int64_lit, nonexistent_column, string_lit, tag_column, tag_column2, test_object_store, test_region_metadata, }; + use crate::sst::index::puffin_manager::PuffinManagerFactory; #[test] fn test_collect_eq_basic() { + let (_d, facotry) = PuffinManagerFactory::new_for_test_block("test_collect_eq_basic_"); let metadata = test_region_metadata(); let mut builder = SstIndexApplierBuilder::new( "test".to_string(), @@ -138,6 +140,7 @@ mod tests { None, &metadata, HashSet::default(), + facotry, ); builder @@ -165,6 +168,8 @@ mod tests { #[test] fn test_collect_eq_field_column() { + let (_d, facotry) = + PuffinManagerFactory::new_for_test_block("test_collect_eq_field_column_"); let metadata = test_region_metadata(); let mut builder = SstIndexApplierBuilder::new( "test".to_string(), @@ -172,6 +177,7 @@ mod tests { None, &metadata, HashSet::default(), + facotry, ); builder @@ -182,6 +188,8 @@ mod tests { #[test] fn test_collect_eq_nonexistent_column() { + let (_d, facotry) = + PuffinManagerFactory::new_for_test_block("test_collect_eq_nonexistent_column_"); let metadata = test_region_metadata(); let mut builder = SstIndexApplierBuilder::new( "test".to_string(), @@ -189,6 +197,7 @@ mod tests { None, &metadata, HashSet::default(), + facotry, ); let res = builder.collect_eq(&nonexistent_column(), &string_lit("abc")); @@ -198,6 +207,8 @@ mod tests { #[test] fn test_collect_eq_type_mismatch() { + let (_d, facotry) = + PuffinManagerFactory::new_for_test_block("test_collect_eq_type_mismatch_"); let metadata = test_region_metadata(); let mut builder = SstIndexApplierBuilder::new( "test".to_string(), @@ -205,6 +216,7 @@ mod tests { None, &metadata, HashSet::default(), + facotry, ); let res = builder.collect_eq(&tag_column(), &int64_lit(1)); @@ -214,6 +226,8 @@ mod tests { #[test] fn test_collect_or_eq_list_basic() { + let (_d, facotry) = + PuffinManagerFactory::new_for_test_block("test_collect_or_eq_list_basic_"); let metadata = test_region_metadata(); let mut builder = SstIndexApplierBuilder::new( "test".to_string(), @@ -221,6 +235,7 @@ mod tests { None, &metadata, HashSet::default(), + facotry, ); let eq_expr = DfExpr::BinaryExpr(BinaryExpr { @@ -269,6 +284,8 @@ mod tests { #[test] fn test_collect_or_eq_list_invalid_op() { + let (_d, facotry) = + PuffinManagerFactory::new_for_test_block("test_collect_or_eq_list_invalid_op_"); let metadata = test_region_metadata(); let mut builder = SstIndexApplierBuilder::new( "test".to_string(), @@ -276,6 +293,7 @@ mod tests { None, &metadata, HashSet::default(), + facotry, ); let eq_expr = DfExpr::BinaryExpr(BinaryExpr { @@ -303,6 +321,8 @@ mod tests { #[test] fn test_collect_or_eq_list_multiple_columns() { + let (_d, facotry) = + PuffinManagerFactory::new_for_test_block("test_collect_or_eq_list_multiple_columns_"); let metadata = test_region_metadata(); let mut builder = SstIndexApplierBuilder::new( "test".to_string(), @@ -310,6 +330,7 @@ mod tests { None, &metadata, HashSet::default(), + facotry, ); let eq_expr = DfExpr::BinaryExpr(BinaryExpr { diff --git a/src/mito2/src/sst/index/applier/builder/in_list.rs b/src/mito2/src/sst/index/applier/builder/in_list.rs index ead08943fa..c9e0068530 100644 --- a/src/mito2/src/sst/index/applier/builder/in_list.rs +++ b/src/mito2/src/sst/index/applier/builder/in_list.rs @@ -59,9 +59,11 @@ mod tests { encoded_string, field_column, int64_lit, nonexistent_column, string_lit, tag_column, test_object_store, test_region_metadata, }; + use crate::sst::index::puffin_manager::PuffinManagerFactory; #[test] fn test_collect_in_list_basic() { + let (_d, facotry) = PuffinManagerFactory::new_for_test_block("test_collect_in_list_basic_"); let metadata = test_region_metadata(); let mut builder = SstIndexApplierBuilder::new( "test".to_string(), @@ -69,6 +71,7 @@ mod tests { None, &metadata, HashSet::default(), + facotry, ); let in_list = InList { @@ -91,6 +94,8 @@ mod tests { #[test] fn test_collect_in_list_negated() { + let (_d, facotry) = + PuffinManagerFactory::new_for_test_block("test_collect_in_list_negated_"); let metadata = test_region_metadata(); let mut builder = SstIndexApplierBuilder::new( "test".to_string(), @@ -98,6 +103,7 @@ mod tests { None, &metadata, HashSet::default(), + facotry, ); let in_list = InList { @@ -112,6 +118,8 @@ mod tests { #[test] fn test_collect_in_list_field_column() { + let (_d, facotry) = + PuffinManagerFactory::new_for_test_block("test_collect_in_list_field_column_"); let metadata = test_region_metadata(); let mut builder = SstIndexApplierBuilder::new( "test".to_string(), @@ -119,6 +127,7 @@ mod tests { None, &metadata, HashSet::default(), + facotry, ); let in_list = InList { @@ -133,6 +142,8 @@ mod tests { #[test] fn test_collect_in_list_type_mismatch() { + let (_d, facotry) = + PuffinManagerFactory::new_for_test_block("test_collect_in_list_type_mismatch_"); let metadata = test_region_metadata(); let mut builder = SstIndexApplierBuilder::new( "test".to_string(), @@ -140,6 +151,7 @@ mod tests { None, &metadata, HashSet::default(), + facotry, ); let in_list = InList { @@ -155,6 +167,9 @@ mod tests { #[test] fn test_collect_in_list_nonexistent_column() { + let (_d, facotry) = + PuffinManagerFactory::new_for_test_block("test_collect_in_list_nonexistent_column_"); + let metadata = test_region_metadata(); let mut builder = SstIndexApplierBuilder::new( "test".to_string(), @@ -162,6 +177,7 @@ mod tests { None, &metadata, HashSet::default(), + facotry, ); let in_list = InList { diff --git a/src/mito2/src/sst/index/applier/builder/regex_match.rs b/src/mito2/src/sst/index/applier/builder/regex_match.rs index b318fd6308..f341a03a69 100644 --- a/src/mito2/src/sst/index/applier/builder/regex_match.rs +++ b/src/mito2/src/sst/index/applier/builder/regex_match.rs @@ -53,9 +53,11 @@ mod tests { field_column, int64_lit, nonexistent_column, string_lit, tag_column, test_object_store, test_region_metadata, }; + use crate::sst::index::puffin_manager::PuffinManagerFactory; #[test] fn test_regex_match_basic() { + let (_d, facotry) = PuffinManagerFactory::new_for_test_block("test_regex_match_basic_"); let metadata = test_region_metadata(); let mut builder = SstIndexApplierBuilder::new( "test".to_string(), @@ -63,6 +65,7 @@ mod tests { None, &metadata, HashSet::default(), + facotry, ); builder @@ -81,6 +84,8 @@ mod tests { #[test] fn test_regex_match_field_column() { + let (_d, facotry) = + PuffinManagerFactory::new_for_test_block("test_regex_match_field_column_"); let metadata = test_region_metadata(); let mut builder = SstIndexApplierBuilder::new( "test".to_string(), @@ -88,6 +93,7 @@ mod tests { None, &metadata, HashSet::default(), + facotry, ); builder @@ -99,6 +105,8 @@ mod tests { #[test] fn test_regex_match_type_mismatch() { + let (_d, facotry) = + PuffinManagerFactory::new_for_test_block("test_regex_match_type_mismatch_"); let metadata = test_region_metadata(); let mut builder = SstIndexApplierBuilder::new( "test".to_string(), @@ -106,6 +114,7 @@ mod tests { None, &metadata, HashSet::default(), + facotry, ); builder @@ -117,6 +126,8 @@ mod tests { #[test] fn test_regex_match_type_nonexist_column() { + let (_d, facotry) = + PuffinManagerFactory::new_for_test_block("test_regex_match_type_nonexist_column_"); let metadata = test_region_metadata(); let mut builder = SstIndexApplierBuilder::new( "test".to_string(), @@ -124,6 +135,7 @@ mod tests { None, &metadata, HashSet::default(), + facotry, ); let res = builder.collect_regex_match(&nonexistent_column(), &string_lit("abc")); diff --git a/src/mito2/src/sst/index/creator.rs b/src/mito2/src/sst/index/creator.rs index 548f1f9349..a2553baa23 100644 --- a/src/mito2/src/sst/index/creator.rs +++ b/src/mito2/src/sst/index/creator.rs @@ -332,6 +332,7 @@ mod tests { use super::*; use crate::row_converter::{McmpRowCodec, RowCodec, SortField}; use crate::sst::index::applier::builder::SstIndexApplierBuilder; + use crate::sst::index::puffin_manager::PuffinManagerFactory; use crate::sst::location; fn mock_object_store() -> ObjectStore { @@ -403,8 +404,10 @@ mod tests { } async fn build_applier_factory( + prefix: &str, tags: BTreeSet<(&'static str, i32)>, ) -> impl Fn(DfExpr) -> BoxFuture<'static, Vec> { + let (d, factory) = PuffinManagerFactory::new_for_test_async(prefix).await; let region_dir = "region0".to_string(); let sst_file_id = FileId::random(); let file_path = location::index_file_path(®ion_dir, sst_file_id); @@ -433,12 +436,14 @@ mod tests { assert_eq!(row_count, tags.len() * segment_row_count); move |expr| { + let _d = &d; let applier = SstIndexApplierBuilder::new( region_dir.clone(), object_store.clone(), None, ®ion_metadata, Default::default(), + factory.clone(), ) .build(&[expr]) .unwrap() @@ -469,7 +474,7 @@ mod tests { ("abc", 3), ]); - let applier_factory = build_applier_factory(tags).await; + let applier_factory = build_applier_factory("test_create_and_query_get_key_", tags).await; let expr = col("tag_str").eq(lit("aaa")); let res = applier_factory(expr).await; @@ -508,7 +513,7 @@ mod tests { ("abc", 3), ]); - let applier_factory = build_applier_factory(tags).await; + let applier_factory = build_applier_factory("test_create_and_query_range_", tags).await; let expr = col("tag_str").between(lit("aaa"), lit("aab")); let res = applier_factory(expr).await; @@ -541,7 +546,8 @@ mod tests { ("abc", 3), ]); - let applier_factory = build_applier_factory(tags).await; + let applier_factory = + build_applier_factory("test_create_and_query_comparison_", tags).await; let expr = col("tag_str").lt(lit("aab")); let res = applier_factory(expr).await; @@ -600,7 +606,7 @@ mod tests { ("abc", 3), ]); - let applier_factory = build_applier_factory(tags).await; + let applier_factory = build_applier_factory("test_create_and_query_regex_", tags).await; let expr = binary_expr(col("tag_str"), Operator::RegexMatch, lit(".*")); let res = applier_factory(expr).await; diff --git a/src/mito2/src/sst/index/creator/statistics.rs b/src/mito2/src/sst/index/creator/statistics.rs index 60cabe44e8..bcf6569d48 100644 --- a/src/mito2/src/sst/index/creator/statistics.rs +++ b/src/mito2/src/sst/index/creator/statistics.rs @@ -16,6 +16,9 @@ use std::time::{Duration, Instant}; use crate::metrics::{INDEX_CREATE_BYTES_TOTAL, INDEX_CREATE_ELAPSED, INDEX_CREATE_ROWS_TOTAL}; +pub(crate) type ByteCount = u64; +pub(crate) type RowCount = usize; + /// Stage of the index creation process. enum Stage { Update, @@ -33,9 +36,9 @@ pub(crate) struct Statistics { /// Accumulated elapsed time for the cleanup stage. cleanup_eplased: Duration, /// Number of rows in the index. - row_count: usize, + row_count: RowCount, /// Number of bytes in the index. - byte_count: u64, + byte_count: ByteCount, } impl Statistics { @@ -58,12 +61,12 @@ impl Statistics { } /// Returns row count. - pub fn row_count(&self) -> usize { + pub fn row_count(&self) -> RowCount { self.row_count } /// Returns byte count. - pub fn byte_count(&self) -> u64 { + pub fn byte_count(&self) -> ByteCount { self.byte_count } } diff --git a/src/mito2/src/sst/index/intermediate.rs b/src/mito2/src/sst/index/intermediate.rs index cf48c9e6eb..18e63e827c 100644 --- a/src/mito2/src/sst/index/intermediate.rs +++ b/src/mito2/src/sst/index/intermediate.rs @@ -33,8 +33,8 @@ pub struct IntermediateManager { impl IntermediateManager { /// Create a new `IntermediateManager` with the given root path. /// It will clean up all garbage intermediate files from previous runs. - pub async fn init_fs(root_path: impl AsRef) -> Result { - let store = new_fs_object_store(&normalize_dir(root_path.as_ref())).await?; + pub async fn init_fs(aux_path: impl AsRef) -> Result { + let store = new_fs_object_store(&normalize_dir(aux_path.as_ref())).await?; let store = InstrumentedStore::new(store); // Remove all garbage intermediate files from previous runs. diff --git a/src/mito2/src/sst/index/puffin_manager.rs b/src/mito2/src/sst/index/puffin_manager.rs new file mode 100644 index 0000000000..85cfbfd6b7 --- /dev/null +++ b/src/mito2/src/sst/index/puffin_manager.rs @@ -0,0 +1,207 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::path::Path; +use std::sync::Arc; + +use async_trait::async_trait; +use common_error::ext::BoxedError; +use object_store::{FuturesAsyncReader, FuturesAsyncWriter, ObjectStore}; +use puffin::error::{self as puffin_error, Result as PuffinResult}; +use puffin::puffin_manager::file_accessor::PuffinFileAccessor; +use puffin::puffin_manager::fs_puffin_manager::FsPuffinManager; +use puffin::puffin_manager::stager::{BoundedStager, FsBlobGuard, FsDirGuard}; +use puffin::puffin_manager::BlobGuard; +use snafu::ResultExt; + +use crate::error::{PuffinInitStagerSnafu, Result}; +use crate::metrics::{ + INDEX_PUFFIN_FLUSH_OP_TOTAL, INDEX_PUFFIN_READ_BYTES_TOTAL, INDEX_PUFFIN_READ_OP_TOTAL, + INDEX_PUFFIN_SEEK_OP_TOTAL, INDEX_PUFFIN_WRITE_BYTES_TOTAL, INDEX_PUFFIN_WRITE_OP_TOTAL, +}; +use crate::sst::index::store::{self, InstrumentedStore}; + +type InstrumentedAsyncRead = store::InstrumentedAsyncRead<'static, FuturesAsyncReader>; +type InstrumentedAsyncWrite = store::InstrumentedAsyncWrite<'static, FuturesAsyncWriter>; + +pub(crate) type BlobReader = as BlobGuard>::Reader; +pub(crate) type SstPuffinManager = FsPuffinManager< + Arc, + Arc, + InstrumentedAsyncRead, + InstrumentedAsyncWrite, +>; + +const STAGING_DIR: &str = "staging"; + +/// A factory for creating `SstPuffinManager` instances. +#[derive(Clone)] +pub struct PuffinManagerFactory { + /// The stager used by the puffin manager. + stager: Arc, + + /// The size of the write buffer used to create object store. + write_buffer_size: Option, +} + +impl PuffinManagerFactory { + /// Creates a new `PuffinManagerFactory` instance. + pub async fn new( + aux_path: impl AsRef, + staging_capacity: u64, + write_buffer_size: Option, + ) -> Result { + let staging_dir = aux_path.as_ref().join(STAGING_DIR); + let stager = BoundedStager::new(staging_dir, staging_capacity) + .await + .context(PuffinInitStagerSnafu)?; + Ok(Self { + stager: Arc::new(stager), + write_buffer_size, + }) + } + + pub(crate) fn build(&self, store: ObjectStore) -> SstPuffinManager { + let store = InstrumentedStore::new(store).with_write_buffer_size(self.write_buffer_size); + let puffin_file_accessor = ObjectStorePuffinFileAccessor::new(store); + SstPuffinManager::new(self.stager.clone(), Arc::new(puffin_file_accessor)) + } +} + +impl PuffinManagerFactory { + #[cfg(test)] + pub(crate) async fn new_for_test_async( + prefix: &str, + ) -> (common_test_util::temp_dir::TempDir, Self) { + let tempdir = common_test_util::temp_dir::create_temp_dir(prefix); + let factory = Self::new(tempdir.path().to_path_buf(), 1024, None) + .await + .unwrap(); + (tempdir, factory) + } + + #[cfg(test)] + pub(crate) fn new_for_test_block(prefix: &str) -> (common_test_util::temp_dir::TempDir, Self) { + let tempdir = common_test_util::temp_dir::create_temp_dir(prefix); + + let f = Self::new(tempdir.path().to_path_buf(), 1024, None); + let factory = common_runtime::block_on_bg(f).unwrap(); + + (tempdir, factory) + } +} + +/// A `PuffinFileAccessor` implementation that uses an object store as the underlying storage. +pub(crate) struct ObjectStorePuffinFileAccessor { + object_store: InstrumentedStore, +} + +impl ObjectStorePuffinFileAccessor { + pub fn new(object_store: InstrumentedStore) -> Self { + Self { object_store } + } +} + +#[async_trait] +impl PuffinFileAccessor for ObjectStorePuffinFileAccessor { + type Reader = InstrumentedAsyncRead; + type Writer = InstrumentedAsyncWrite; + + async fn reader(&self, puffin_file_name: &str) -> PuffinResult { + self.object_store + .reader( + puffin_file_name, + &INDEX_PUFFIN_READ_BYTES_TOTAL, + &INDEX_PUFFIN_READ_OP_TOTAL, + &INDEX_PUFFIN_SEEK_OP_TOTAL, + ) + .await + .map_err(BoxedError::new) + .context(puffin_error::ExternalSnafu) + } + + async fn writer(&self, puffin_file_name: &str) -> PuffinResult { + self.object_store + .writer( + puffin_file_name, + &INDEX_PUFFIN_WRITE_BYTES_TOTAL, + &INDEX_PUFFIN_WRITE_OP_TOTAL, + &INDEX_PUFFIN_FLUSH_OP_TOTAL, + ) + .await + .map_err(BoxedError::new) + .context(puffin_error::ExternalSnafu) + } +} + +#[cfg(test)] +mod tests { + use common_test_util::temp_dir::create_temp_dir; + use futures::io::Cursor; + use futures::AsyncReadExt; + use object_store::services::Memory; + use puffin::blob_metadata::CompressionCodec; + use puffin::puffin_manager::{ + BlobGuard, DirGuard, PuffinManager, PuffinReader, PuffinWriter, PutOptions, + }; + + use super::*; + + #[tokio::test] + async fn test_puffin_manager_factory() { + let (_dir, factory) = + PuffinManagerFactory::new_for_test_async("test_puffin_manager_factory_").await; + + let object_store = ObjectStore::new(Memory::default()).unwrap().finish(); + let manager = factory.build(object_store); + + let file_name = "my-puffin-file"; + let blob_key = "blob-key"; + let dir_key = "dir-key"; + let raw_data = b"hello world!"; + + let mut writer = manager.writer(file_name).await.unwrap(); + writer + .put_blob(blob_key, Cursor::new(raw_data), PutOptions::default()) + .await + .unwrap(); + let dir_data = create_temp_dir("test_puffin_manager_factory_dir_data_"); + tokio::fs::write(dir_data.path().join("hello"), raw_data) + .await + .unwrap(); + writer + .put_dir( + dir_key, + dir_data.path().into(), + PutOptions { + compression: Some(CompressionCodec::Zstd), + }, + ) + .await + .unwrap(); + writer.finish().await.unwrap(); + + let reader = manager.reader(file_name).await.unwrap(); + let blob_guard = reader.blob(blob_key).await.unwrap(); + let mut blob_reader = blob_guard.reader().await.unwrap(); + let mut buf = Vec::new(); + blob_reader.read_to_end(&mut buf).await.unwrap(); + assert_eq!(buf, raw_data); + + let dir_guard = reader.dir(dir_key).await.unwrap(); + let file = dir_guard.path().join("hello"); + let data = tokio::fs::read(file).await.unwrap(); + assert_eq!(data, raw_data); + } +} diff --git a/src/mito2/src/test_util/scheduler_util.rs b/src/mito2/src/test_util/scheduler_util.rs index 590c66e08c..a6ffe0b2bf 100644 --- a/src/mito2/src/test_util/scheduler_util.rs +++ b/src/mito2/src/test_util/scheduler_util.rs @@ -20,7 +20,6 @@ use common_base::Plugins; use common_datasource::compression::CompressionType; use common_test_util::temp_dir::{create_temp_dir, TempDir}; use object_store::services::Fs; -use object_store::util::join_dir; use object_store::ObjectStore; use store_api::metadata::RegionMetadataRef; use tokio::sync::mpsc::Sender; @@ -36,6 +35,7 @@ use crate::region::{ManifestContext, ManifestContextRef, RegionState}; use crate::request::WorkerRequest; use crate::schedule::scheduler::{Job, LocalScheduler, Scheduler, SchedulerRef}; use crate::sst::index::intermediate::IntermediateManager; +use crate::sst::index::puffin_manager::PuffinManagerFactory; use crate::worker::WorkerListener; /// Scheduler mocker. @@ -55,11 +55,20 @@ impl SchedulerEnv { let mut builder = Fs::default(); builder.root(&path_str); - let intm_mgr = IntermediateManager::init_fs(join_dir(&path_str, "intm")) + let index_aux_path = path.path().join("index_aux"); + let puffin_mgr = PuffinManagerFactory::new(&index_aux_path, 4096, None) + .await + .unwrap(); + let intm_mgr = IntermediateManager::init_fs(index_aux_path.to_str().unwrap()) .await .unwrap(); let object_store = ObjectStore::new(builder).unwrap().finish(); - let access_layer = Arc::new(AccessLayer::new("", object_store.clone(), intm_mgr)); + let access_layer = Arc::new(AccessLayer::new( + "", + object_store.clone(), + puffin_mgr, + intm_mgr, + )); SchedulerEnv { path, diff --git a/src/mito2/src/worker.rs b/src/mito2/src/worker.rs index 2aa251fc10..2a9edf15f4 100644 --- a/src/mito2/src/worker.rs +++ b/src/mito2/src/worker.rs @@ -58,6 +58,7 @@ use crate::request::{ }; use crate::schedule::scheduler::{LocalScheduler, SchedulerRef}; use crate::sst::index::intermediate::IntermediateManager; +use crate::sst::index::puffin_manager::PuffinManagerFactory; use crate::time_provider::{StdTimeProvider, TimeProviderRef}; use crate::wal::Wal; @@ -132,10 +133,15 @@ impl WorkerGroup { let write_buffer_manager = Arc::new(WriteBufferManagerImpl::new( config.global_write_buffer_size.as_bytes() as usize, )); - let intermediate_manager = - IntermediateManager::init_fs(&config.inverted_index.intermediate_path) - .await? - .with_buffer_size(Some(config.inverted_index.write_buffer_size.as_bytes() as _)); + let puffin_manager_factory = PuffinManagerFactory::new( + &config.index.aux_path, + config.index.staging_size.as_bytes(), + Some(config.index.write_buffer_size.as_bytes() as _), + ) + .await?; + let intermediate_manager = IntermediateManager::init_fs(&config.index.aux_path) + .await? + .with_buffer_size(Some(config.index.write_buffer_size.as_bytes() as _)); let scheduler = Arc::new(LocalScheduler::new(config.max_background_jobs)); // We use another scheduler to avoid purge jobs blocking other jobs. // A purge job is cheaper than other background jobs so they share the same job limit. @@ -169,6 +175,7 @@ impl WorkerGroup { purge_scheduler: purge_scheduler.clone(), listener: WorkerListener::default(), cache_manager: cache_manager.clone(), + puffin_manager_factory: puffin_manager_factory.clone(), intermediate_manager: intermediate_manager.clone(), time_provider: time_provider.clone(), flush_sender: flush_sender.clone(), @@ -261,10 +268,15 @@ impl WorkerGroup { }); let scheduler = Arc::new(LocalScheduler::new(config.max_background_jobs)); let purge_scheduler = Arc::new(LocalScheduler::new(config.max_background_jobs)); - let intermediate_manager = - IntermediateManager::init_fs(&config.inverted_index.intermediate_path) - .await? - .with_buffer_size(Some(config.inverted_index.write_buffer_size.as_bytes() as _)); + let puffin_manager_factory = PuffinManagerFactory::new( + &config.index.aux_path, + config.index.staging_size.as_bytes(), + Some(config.index.write_buffer_size.as_bytes() as _), + ) + .await?; + let intermediate_manager = IntermediateManager::init_fs(&config.index.aux_path) + .await? + .with_buffer_size(Some(config.index.write_buffer_size.as_bytes() as _)); let write_cache = write_cache_from_config( &config, object_store_manager.clone(), @@ -292,6 +304,7 @@ impl WorkerGroup { purge_scheduler: purge_scheduler.clone(), listener: WorkerListener::new(listener.clone()), cache_manager: cache_manager.clone(), + puffin_manager_factory: puffin_manager_factory.clone(), intermediate_manager: intermediate_manager.clone(), time_provider: time_provider.clone(), flush_sender: flush_sender.clone(), @@ -361,6 +374,7 @@ struct WorkerStarter { purge_scheduler: SchedulerRef, listener: WorkerListener, cache_manager: CacheManagerRef, + puffin_manager_factory: PuffinManagerFactory, intermediate_manager: IntermediateManager, time_provider: TimeProviderRef, /// Watch channel sender to notify workers to handle stalled requests. @@ -408,6 +422,7 @@ impl WorkerStarter { stalled_requests: StalledRequests::default(), listener: self.listener, cache_manager: self.cache_manager, + puffin_manager_factory: self.puffin_manager_factory, intermediate_manager: self.intermediate_manager, time_provider: self.time_provider, last_periodical_check_millis: now, @@ -586,6 +601,8 @@ struct RegionWorkerLoop { listener: WorkerListener, /// Cache. cache_manager: CacheManagerRef, + /// Puffin manager factory for index. + puffin_manager_factory: PuffinManagerFactory, /// Intermediate manager for inverted index. intermediate_manager: IntermediateManager, /// Provider to get current time. diff --git a/src/mito2/src/worker/handle_catchup.rs b/src/mito2/src/worker/handle_catchup.rs index a4353fe529..e01680ab17 100644 --- a/src/mito2/src/worker/handle_catchup.rs +++ b/src/mito2/src/worker/handle_catchup.rs @@ -54,6 +54,7 @@ impl RegionWorkerLoop { self.memtable_builder_provider.clone(), self.object_store_manager.clone(), self.purge_scheduler.clone(), + self.puffin_manager_factory.clone(), self.intermediate_manager.clone(), ) .cache(Some(self.cache_manager.clone())) diff --git a/src/mito2/src/worker/handle_create.rs b/src/mito2/src/worker/handle_create.rs index f07a1f38a1..e99c0a8102 100644 --- a/src/mito2/src/worker/handle_create.rs +++ b/src/mito2/src/worker/handle_create.rs @@ -61,6 +61,7 @@ impl RegionWorkerLoop { self.memtable_builder_provider.clone(), self.object_store_manager.clone(), self.purge_scheduler.clone(), + self.puffin_manager_factory.clone(), self.intermediate_manager.clone(), ) .metadata(metadata) diff --git a/src/mito2/src/worker/handle_open.rs b/src/mito2/src/worker/handle_open.rs index 840e19583c..d87f531a72 100644 --- a/src/mito2/src/worker/handle_open.rs +++ b/src/mito2/src/worker/handle_open.rs @@ -93,6 +93,7 @@ impl RegionWorkerLoop { self.memtable_builder_provider.clone(), self.object_store_manager.clone(), self.purge_scheduler.clone(), + self.puffin_manager_factory.clone(), self.intermediate_manager.clone(), ) .skip_wal_replay(request.skip_wal_replay) diff --git a/src/puffin/src/error.rs b/src/puffin/src/error.rs index 8a28dffdcb..b30c542f4e 100644 --- a/src/puffin/src/error.rs +++ b/src/puffin/src/error.rs @@ -16,7 +16,7 @@ use std::any::Any; use std::io::Error as IoError; use std::sync::Arc; -use common_error::ext::ErrorExt; +use common_error::ext::{BoxedError, ErrorExt}; use common_error::status_code::StatusCode; use common_macro::stack_trace_debug; use snafu::{Location, Snafu}; @@ -248,6 +248,14 @@ pub enum Error { #[snafu(display("Get value from cache"))] CacheGet { source: Arc }, + + #[snafu(display("External error"))] + External { + #[snafu(source)] + error: BoxedError, + #[snafu(implicit)] + location: Location, + }, } impl ErrorExt for Error { @@ -287,6 +295,8 @@ impl ErrorExt for Error { DuplicateBlob { .. } => StatusCode::InvalidArguments, CacheGet { source } => source.status_code(), + + External { error, .. } => error.status_code(), } } diff --git a/src/puffin/src/puffin_manager/stager.rs b/src/puffin/src/puffin_manager/stager.rs index c390e9910a..47d2eb8eb0 100644 --- a/src/puffin/src/puffin_manager/stager.rs +++ b/src/puffin/src/puffin_manager/stager.rs @@ -18,7 +18,7 @@ use std::path::PathBuf; use std::sync::Arc; use async_trait::async_trait; -pub use bounded_stager::BoundedStager; +pub use bounded_stager::{BoundedStager, FsBlobGuard, FsDirGuard}; use futures::future::BoxFuture; use futures::AsyncWrite; diff --git a/src/puffin/src/puffin_manager/stager/bounded_stager.rs b/src/puffin/src/puffin_manager/stager/bounded_stager.rs index 2b732450ca..e63d4f5524 100644 --- a/src/puffin/src/puffin_manager/stager/bounded_stager.rs +++ b/src/puffin/src/puffin_manager/stager/bounded_stager.rs @@ -68,6 +68,10 @@ pub struct BoundedStager { impl BoundedStager { pub async fn new(base_dir: PathBuf, capacity: u64) -> Result { + tokio::fs::create_dir_all(&base_dir) + .await + .context(CreateSnafu)?; + let recycle_bin = Cache::builder() .time_to_live(Duration::from_secs(60)) .build(); diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index 1771377ee5..c9c8468078 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -826,13 +826,16 @@ sst_write_buffer_size = "8MiB" parallel_scan_channel_size = 32 allow_stale_entries = false +[region_engine.mito.index] +aux_path = "" +staging_size = "2GiB" +write_buffer_size = "8MiB" + [region_engine.mito.inverted_index] create_on_flush = "auto" create_on_compaction = "auto" apply_on_query = "auto" -write_buffer_size = "8MiB" mem_threshold_on_create = "64.0MiB" -intermediate_path = "" [region_engine.mito.memtable] type = "time_series"