diff --git a/config/config.md b/config/config.md index 1831a2f644..aaa92c7f35 100644 --- a/config/config.md +++ b/config/config.md @@ -152,6 +152,7 @@ | `region_engine.mito.index` | -- | -- | The options for index in Mito engine. | | `region_engine.mito.index.aux_path` | String | `""` | Auxiliary directory path for the index in filesystem, used to store intermediate files for
creating the index and staging files for searching the index, defaults to `{data_home}/index_intermediate`.
The default name for this directory is `index_intermediate` for backward compatibility.

This path contains two subdirectories:
- `__intm`: for storing intermediate files used during creating index.
- `staging`: for storing staging files used during searching index. | | `region_engine.mito.index.staging_size` | String | `2GB` | The max capacity of the staging directory. | +| `region_engine.mito.index.staging_ttl` | String | `7d` | The TTL of the staging directory.
Defaults to 7 days.
Setting it to "0s" to disable TTL. | | `region_engine.mito.index.metadata_cache_size` | String | `64MiB` | Cache size for inverted index metadata. | | `region_engine.mito.index.content_cache_size` | String | `128MiB` | Cache size for inverted index content. | | `region_engine.mito.index.content_cache_page_size` | String | `64KiB` | Page size for inverted index content cache. | @@ -491,6 +492,7 @@ | `region_engine.mito.index` | -- | -- | The options for index in Mito engine. | | `region_engine.mito.index.aux_path` | String | `""` | Auxiliary directory path for the index in filesystem, used to store intermediate files for
creating the index and staging files for searching the index, defaults to `{data_home}/index_intermediate`.
The default name for this directory is `index_intermediate` for backward compatibility.

This path contains two subdirectories:
- `__intm`: for storing intermediate files used during creating index.
- `staging`: for storing staging files used during searching index. | | `region_engine.mito.index.staging_size` | String | `2GB` | The max capacity of the staging directory. | +| `region_engine.mito.index.staging_ttl` | String | `7d` | The TTL of the staging directory.
Defaults to 7 days.
Setting it to "0s" to disable TTL. | | `region_engine.mito.index.metadata_cache_size` | String | `64MiB` | Cache size for inverted index metadata. | | `region_engine.mito.index.content_cache_size` | String | `128MiB` | Cache size for inverted index content. | | `region_engine.mito.index.content_cache_page_size` | String | `64KiB` | Page size for inverted index content cache. | diff --git a/config/datanode.example.toml b/config/datanode.example.toml index a4acd1aa89..52eaea9190 100644 --- a/config/datanode.example.toml +++ b/config/datanode.example.toml @@ -497,6 +497,11 @@ aux_path = "" ## The max capacity of the staging directory. staging_size = "2GB" +## The TTL of the staging directory. +## Defaults to 7 days. +## Setting it to "0s" to disable TTL. +staging_ttl = "7d" + ## Cache size for inverted index metadata. metadata_cache_size = "64MiB" diff --git a/config/standalone.example.toml b/config/standalone.example.toml index bea6984a65..c42966e410 100644 --- a/config/standalone.example.toml +++ b/config/standalone.example.toml @@ -584,6 +584,11 @@ aux_path = "" ## The max capacity of the staging directory. staging_size = "2GB" +## The TTL of the staging directory. +## Defaults to 7 days. +## Setting it to "0s" to disable TTL. +staging_ttl = "7d" + ## Cache size for inverted index metadata. metadata_cache_size = "64MiB" diff --git a/src/index/src/fulltext_index/tests.rs b/src/index/src/fulltext_index/tests.rs index f0c0649575..3c10f0568d 100644 --- a/src/index/src/fulltext_index/tests.rs +++ b/src/index/src/fulltext_index/tests.rs @@ -30,7 +30,7 @@ async fn new_bounded_stager(prefix: &str) -> (TempDir, Arc) { let path = staging_dir.path().to_path_buf(); ( staging_dir, - Arc::new(BoundedStager::new(path, 102400, None).await.unwrap()), + Arc::new(BoundedStager::new(path, 102400, None, None).await.unwrap()), ) } diff --git a/src/mito2/src/compaction/compactor.rs b/src/mito2/src/compaction/compactor.rs index 6bda8c578f..fb6e7bd03f 100644 --- a/src/mito2/src/compaction/compactor.rs +++ b/src/mito2/src/compaction/compactor.rs @@ -135,6 +135,7 @@ pub async fn open_compaction_region( &mito_config.index.aux_path, mito_config.index.staging_size.as_bytes(), Some(mito_config.index.write_buffer_size.as_bytes() as _), + mito_config.index.staging_ttl, ) .await?; let intermediate_manager = diff --git a/src/mito2/src/config.rs b/src/mito2/src/config.rs index 8427b9a408..b1cfa8efab 100644 --- a/src/mito2/src/config.rs +++ b/src/mito2/src/config.rs @@ -299,6 +299,11 @@ pub struct IndexConfig { /// The max capacity of the staging directory. pub staging_size: ReadableSize, + /// The TTL of the staging directory. + /// Defaults to 7 days. + /// Setting it to "0s" to disable TTL. + #[serde(with = "humantime_serde")] + pub staging_ttl: Option, /// Write buffer size for creating the index. pub write_buffer_size: ReadableSize, @@ -316,6 +321,7 @@ impl Default for IndexConfig { Self { aux_path: String::new(), staging_size: ReadableSize::gb(2), + staging_ttl: Some(Duration::from_secs(7 * 24 * 60 * 60)), write_buffer_size: ReadableSize::mb(8), metadata_cache_size: ReadableSize::mb(64), content_cache_size: ReadableSize::mb(128), @@ -352,6 +358,10 @@ impl IndexConfig { ); } + if self.staging_ttl.map(|ttl| ttl.is_zero()).unwrap_or(false) { + self.staging_ttl = None; + } + Ok(()) } } diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs index 6860abc082..74ea47b653 100644 --- a/src/mito2/src/error.rs +++ b/src/mito2/src/error.rs @@ -823,6 +823,13 @@ pub enum Error { location: Location, }, + #[snafu(display("Failed to purge puffin stager"))] + PuffinPurgeStager { + source: puffin::error::Error, + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("Failed to build puffin reader"))] PuffinBuildReader { source: puffin::error::Error, @@ -1062,7 +1069,8 @@ impl ErrorExt for Error { PuffinReadBlob { source, .. } | PuffinAddBlob { source, .. } | PuffinInitStager { source, .. } - | PuffinBuildReader { source, .. } => source.status_code(), + | PuffinBuildReader { source, .. } + | PuffinPurgeStager { source, .. } => source.status_code(), CleanDir { .. } => StatusCode::Unexpected, InvalidConfig { .. } => StatusCode::InvalidArguments, StaleLogEntry { .. } => StatusCode::Unexpected, diff --git a/src/mito2/src/sst/file.rs b/src/mito2/src/sst/file.rs index e9959ae562..dc8829c330 100644 --- a/src/mito2/src/sst/file.rs +++ b/src/mito2/src/sst/file.rs @@ -154,6 +154,10 @@ pub enum IndexType { } impl FileMeta { + pub fn exists_index(&self) -> bool { + !self.available_indexes.is_empty() + } + /// Returns true if the file has an inverted index pub fn inverted_index_available(&self) -> bool { self.available_indexes.contains(&IndexType::InvertedIndex) diff --git a/src/mito2/src/sst/file_purger.rs b/src/mito2/src/sst/file_purger.rs index 7d81445c67..da59d3aec2 100644 --- a/src/mito2/src/sst/file_purger.rs +++ b/src/mito2/src/sst/file_purger.rs @@ -92,8 +92,8 @@ impl FilePurger for LocalFilePurger { if let Some(write_cache) = cache_manager.as_ref().and_then(|cache| cache.write_cache()) { - // Removes the inverted index from the cache. - if file_meta.inverted_index_available() { + // Removes index file from the cache. + if file_meta.exists_index() { write_cache .remove(IndexKey::new( file_meta.region_id, @@ -111,6 +111,18 @@ impl FilePurger for LocalFilePurger { )) .await; } + + // Purges index content in the stager. + let puffin_file_name = + crate::sst::location::index_file_path(sst_layer.region_dir(), file_meta.file_id); + if let Err(e) = sst_layer + .puffin_manager_factory() + .purge_stager(&puffin_file_name) + .await + { + error!(e; "Failed to purge stager with index file, file_id: {}, region: {}", + file_meta.file_id, file_meta.region_id); + } })) { error!(e; "Failed to schedule the file purge request"); } @@ -146,7 +158,7 @@ mod tests { let path = location::sst_file_path(sst_dir, sst_file_id); let index_aux_path = dir.path().join("index_aux"); - let puffin_mgr = PuffinManagerFactory::new(&index_aux_path, 4096, None) + let puffin_mgr = PuffinManagerFactory::new(&index_aux_path, 4096, None, None) .await .unwrap(); let intm_mgr = IntermediateManager::init_fs(index_aux_path.to_str().unwrap()) @@ -202,7 +214,7 @@ mod tests { let sst_dir = "table1"; let index_aux_path = dir.path().join("index_aux"); - let puffin_mgr = PuffinManagerFactory::new(&index_aux_path, 4096, None) + let puffin_mgr = PuffinManagerFactory::new(&index_aux_path, 4096, None, None) .await .unwrap(); let intm_mgr = IntermediateManager::init_fs(index_aux_path.to_str().unwrap()) diff --git a/src/mito2/src/sst/index/puffin_manager.rs b/src/mito2/src/sst/index/puffin_manager.rs index d8559d2e07..5d54da5ffb 100644 --- a/src/mito2/src/sst/index/puffin_manager.rs +++ b/src/mito2/src/sst/index/puffin_manager.rs @@ -14,6 +14,7 @@ use std::path::Path; use std::sync::Arc; +use std::time::Duration; use async_trait::async_trait; use common_error::ext::BoxedError; @@ -21,11 +22,11 @@ use object_store::{FuturesAsyncWriter, ObjectStore}; use puffin::error::{self as puffin_error, Result as PuffinResult}; use puffin::puffin_manager::file_accessor::PuffinFileAccessor; use puffin::puffin_manager::fs_puffin_manager::FsPuffinManager; -use puffin::puffin_manager::stager::BoundedStager; +use puffin::puffin_manager::stager::{BoundedStager, Stager}; use puffin::puffin_manager::{BlobGuard, PuffinManager, PuffinReader}; use snafu::ResultExt; -use crate::error::{PuffinInitStagerSnafu, Result}; +use crate::error::{PuffinInitStagerSnafu, PuffinPurgeStagerSnafu, Result}; use crate::metrics::{ StagerMetrics, INDEX_PUFFIN_FLUSH_OP_TOTAL, INDEX_PUFFIN_READ_BYTES_TOTAL, INDEX_PUFFIN_READ_OP_TOTAL, INDEX_PUFFIN_WRITE_BYTES_TOTAL, INDEX_PUFFIN_WRITE_OP_TOTAL, @@ -61,12 +62,14 @@ impl PuffinManagerFactory { aux_path: impl AsRef, staging_capacity: u64, write_buffer_size: Option, + staging_ttl: Option, ) -> Result { let staging_dir = aux_path.as_ref().join(STAGING_DIR); let stager = BoundedStager::new( staging_dir, staging_capacity, Some(Arc::new(StagerMetrics::default())), + staging_ttl, ) .await .context(PuffinInitStagerSnafu)?; @@ -81,6 +84,13 @@ impl PuffinManagerFactory { let puffin_file_accessor = ObjectStorePuffinFileAccessor::new(store); SstPuffinManager::new(self.stager.clone(), puffin_file_accessor) } + + pub(crate) async fn purge_stager(&self, puffin_file_name: &str) -> Result<()> { + self.stager + .purge(puffin_file_name) + .await + .context(PuffinPurgeStagerSnafu) + } } #[cfg(test)] @@ -89,7 +99,7 @@ impl PuffinManagerFactory { prefix: &str, ) -> (common_test_util::temp_dir::TempDir, Self) { let tempdir = common_test_util::temp_dir::create_temp_dir(prefix); - let factory = Self::new(tempdir.path().to_path_buf(), 1024, None) + let factory = Self::new(tempdir.path().to_path_buf(), 1024, None, None) .await .unwrap(); (tempdir, factory) @@ -98,7 +108,7 @@ impl PuffinManagerFactory { pub(crate) fn new_for_test_block(prefix: &str) -> (common_test_util::temp_dir::TempDir, Self) { let tempdir = common_test_util::temp_dir::create_temp_dir(prefix); - let f = Self::new(tempdir.path().to_path_buf(), 1024, None); + let f = Self::new(tempdir.path().to_path_buf(), 1024, None, None); let factory = common_runtime::block_on_global(f).unwrap(); (tempdir, factory) diff --git a/src/mito2/src/test_util.rs b/src/mito2/src/test_util.rs index 9232f478c2..b4b7be1184 100644 --- a/src/mito2/src/test_util.rs +++ b/src/mito2/src/test_util.rs @@ -640,7 +640,7 @@ impl TestEnv { capacity: ReadableSize, ) -> WriteCacheRef { let index_aux_path = self.data_home.path().join("index_aux"); - let puffin_mgr = PuffinManagerFactory::new(&index_aux_path, 4096, None) + let puffin_mgr = PuffinManagerFactory::new(&index_aux_path, 4096, None, None) .await .unwrap(); let intm_mgr = IntermediateManager::init_fs(index_aux_path.to_str().unwrap()) diff --git a/src/mito2/src/test_util/scheduler_util.rs b/src/mito2/src/test_util/scheduler_util.rs index ba777b157f..d55213369a 100644 --- a/src/mito2/src/test_util/scheduler_util.rs +++ b/src/mito2/src/test_util/scheduler_util.rs @@ -55,7 +55,7 @@ impl SchedulerEnv { let builder = Fs::default().root(&path_str); let index_aux_path = path.path().join("index_aux"); - let puffin_mgr = PuffinManagerFactory::new(&index_aux_path, 4096, None) + let puffin_mgr = PuffinManagerFactory::new(&index_aux_path, 4096, None, None) .await .unwrap(); let intm_mgr = IntermediateManager::init_fs(index_aux_path.to_str().unwrap()) diff --git a/src/mito2/src/worker.rs b/src/mito2/src/worker.rs index 2f089d9a97..bd09b3f4ee 100644 --- a/src/mito2/src/worker.rs +++ b/src/mito2/src/worker.rs @@ -146,6 +146,7 @@ impl WorkerGroup { &config.index.aux_path, config.index.staging_size.as_bytes(), Some(config.index.write_buffer_size.as_bytes() as _), + config.index.staging_ttl, ) .await?; let intermediate_manager = IntermediateManager::init_fs(&config.index.aux_path) @@ -295,6 +296,7 @@ impl WorkerGroup { &config.index.aux_path, config.index.staging_size.as_bytes(), Some(config.index.write_buffer_size.as_bytes() as _), + config.index.staging_ttl, ) .await?; let intermediate_manager = IntermediateManager::init_fs(&config.index.aux_path) diff --git a/src/puffin/src/puffin_manager/stager.rs b/src/puffin/src/puffin_manager/stager.rs index 5dc2cb31fc..ad21f88989 100644 --- a/src/puffin/src/puffin_manager/stager.rs +++ b/src/puffin/src/puffin_manager/stager.rs @@ -88,6 +88,9 @@ pub trait Stager: Send + Sync { dir_path: PathBuf, dir_size: u64, ) -> Result<()>; + + /// Purges all content for the given puffin file from the staging area. + async fn purge(&self, puffin_file_name: &str) -> Result<()>; } /// `StagerNotifier` provides a way to notify the caller of the staging events. diff --git a/src/puffin/src/puffin_manager/stager/bounded_stager.rs b/src/puffin/src/puffin_manager/stager/bounded_stager.rs index 46ea2548ad..508ba68a31 100644 --- a/src/puffin/src/puffin_manager/stager/bounded_stager.rs +++ b/src/puffin/src/puffin_manager/stager/bounded_stager.rs @@ -78,19 +78,21 @@ impl BoundedStager { base_dir: PathBuf, capacity: u64, notifier: Option>, + cache_ttl: Option, ) -> Result { tokio::fs::create_dir_all(&base_dir) .await .context(CreateSnafu)?; - let recycle_bin = Cache::builder().time_to_live(RECYCLE_BIN_TTL).build(); - + let recycle_bin = Cache::builder().time_to_idle(RECYCLE_BIN_TTL).build(); let recycle_bin_cloned = recycle_bin.clone(); let notifier_cloned = notifier.clone(); - let cache = Cache::builder() + + let mut cache_builder = Cache::builder() .max_capacity(capacity) .weigher(|_: &String, v: &CacheValue| v.weight()) .eviction_policy(EvictionPolicy::lru()) + .support_invalidation_closures() .async_eviction_listener(move |k, v, _| { let recycle_bin = recycle_bin_cloned.clone(); if let Some(notifier) = notifier_cloned.as_ref() { @@ -101,8 +103,13 @@ impl BoundedStager { recycle_bin.insert(k.as_str().to_string(), v).await; } .boxed() - }) - .build(); + }); + if let Some(ttl) = cache_ttl { + if !ttl.is_zero() { + cache_builder = cache_builder.time_to_live(ttl); + } + } + let cache = cache_builder.build(); let (delete_queue, rx) = tokio::sync::mpsc::channel(DELETE_QUEUE_SIZE); let notifier_cloned = notifier.clone(); @@ -162,6 +169,7 @@ impl Stager for BoundedStager { notifier.on_load_blob(timer.elapsed()); } let guard = Arc::new(FsBlobGuard { + puffin_file_name: puffin_file_name.to_string(), path, delete_queue: self.delete_queue.clone(), size, @@ -216,6 +224,7 @@ impl Stager for BoundedStager { notifier.on_load_dir(timer.elapsed()); } let guard = Arc::new(FsDirGuard { + puffin_file_name: puffin_file_name.to_string(), path, size, delete_queue: self.delete_queue.clone(), @@ -266,6 +275,7 @@ impl Stager for BoundedStager { notifier.on_cache_insert(size); } let guard = Arc::new(FsDirGuard { + puffin_file_name: puffin_file_name.to_string(), path, size, delete_queue: self.delete_queue.clone(), @@ -284,6 +294,15 @@ impl Stager for BoundedStager { Ok(()) } + + async fn purge(&self, puffin_file_name: &str) -> Result<()> { + let file_name = puffin_file_name.to_string(); + self.cache + .invalidate_entries_if(move |_k, v| v.puffin_file_name() == file_name) + .unwrap(); // SAFETY: `support_invalidation_closures` is enabled + self.cache.run_pending_tasks().await; + Ok(()) + } } impl BoundedStager { @@ -337,6 +356,9 @@ impl BoundedStager { } /// Recovers the staging area by iterating through the staging directory. + /// + /// Note: It can't recover the mapping between puffin files and keys, so TTL + /// is configured to purge the dangling files and directories. async fn recover(&self) -> Result<()> { let mut read_dir = fs::read_dir(&self.base_dir).await.context(ReadSnafu)?; @@ -376,6 +398,9 @@ impl BoundedStager { path, size, delete_queue: self.delete_queue.clone(), + + // placeholder + puffin_file_name: String::new(), })); // A duplicate dir will be moved to the delete queue. let _dup_dir = elems.insert(key, v); @@ -385,6 +410,9 @@ impl BoundedStager { path, size, delete_queue: self.delete_queue.clone(), + + // placeholder + puffin_file_name: String::new(), })); // A duplicate file will be moved to the delete queue. let _dup_file = elems.insert(key, v); @@ -506,6 +534,13 @@ impl CacheValue { fn weight(&self) -> u32 { self.size().try_into().unwrap_or(u32::MAX) } + + fn puffin_file_name(&self) -> &str { + match self { + CacheValue::File(guard) => &guard.puffin_file_name, + CacheValue::Dir(guard) => &guard.puffin_file_name, + } + } } enum DeleteTask { @@ -518,6 +553,7 @@ enum DeleteTask { /// automatically deleting the file on drop. #[derive(Debug)] pub struct FsBlobGuard { + puffin_file_name: String, path: PathBuf, size: u64, delete_queue: Sender, @@ -550,6 +586,7 @@ impl Drop for FsBlobGuard { /// automatically deleting the directory on drop. #[derive(Debug)] pub struct FsDirGuard { + puffin_file_name: String, path: PathBuf, size: u64, delete_queue: Sender, @@ -754,6 +791,7 @@ mod tests { tempdir.path().to_path_buf(), u64::MAX, Some(notifier.clone()), + None, ) .await .unwrap(); @@ -810,6 +848,7 @@ mod tests { tempdir.path().to_path_buf(), u64::MAX, Some(notifier.clone()), + None, ) .await .unwrap(); @@ -884,6 +923,7 @@ mod tests { tempdir.path().to_path_buf(), u64::MAX, Some(notifier.clone()), + None, ) .await .unwrap(); @@ -937,7 +977,7 @@ mod tests { // recover stager drop(stager); - let stager = BoundedStager::new(tempdir.path().to_path_buf(), u64::MAX, None) + let stager = BoundedStager::new(tempdir.path().to_path_buf(), u64::MAX, None, None) .await .unwrap(); @@ -997,6 +1037,7 @@ mod tests { tempdir.path().to_path_buf(), 1, /* extremely small size */ Some(notifier.clone()), + None, ) .await .unwrap(); @@ -1217,7 +1258,7 @@ mod tests { #[tokio::test] async fn test_get_blob_concurrency_on_fail() { let tempdir = create_temp_dir("test_get_blob_concurrency_on_fail_"); - let stager = BoundedStager::new(tempdir.path().to_path_buf(), u64::MAX, None) + let stager = BoundedStager::new(tempdir.path().to_path_buf(), u64::MAX, None, None) .await .unwrap(); @@ -1254,7 +1295,7 @@ mod tests { #[tokio::test] async fn test_get_dir_concurrency_on_fail() { let tempdir = create_temp_dir("test_get_dir_concurrency_on_fail_"); - let stager = BoundedStager::new(tempdir.path().to_path_buf(), u64::MAX, None) + let stager = BoundedStager::new(tempdir.path().to_path_buf(), u64::MAX, None, None) .await .unwrap(); @@ -1287,4 +1328,84 @@ mod tests { assert!(!stager.in_cache(puffin_file_name, key)); } + + #[tokio::test] + async fn test_purge() { + let tempdir = create_temp_dir("test_purge_"); + let notifier = MockNotifier::build(); + let stager = BoundedStager::new( + tempdir.path().to_path_buf(), + u64::MAX, + Some(notifier.clone()), + None, + ) + .await + .unwrap(); + + // initialize stager + let puffin_file_name = "test_purge"; + let blob_key = "blob_key"; + let guard = stager + .get_blob( + puffin_file_name, + blob_key, + Box::new(|mut writer| { + Box::pin(async move { + writer.write_all(b"hello world").await.unwrap(); + Ok(11) + }) + }), + ) + .await + .unwrap(); + drop(guard); + + let files_in_dir = [ + ("file_a", "Hello, world!".as_bytes()), + ("file_b", "Hello, Rust!".as_bytes()), + ("file_c", "你好,世界!".as_bytes()), + ("subdir/file_d", "Hello, Puffin!".as_bytes()), + ("subdir/subsubdir/file_e", "¡Hola mundo!".as_bytes()), + ]; + + let dir_key = "dir_key"; + let guard = stager + .get_dir( + puffin_file_name, + dir_key, + Box::new(|writer_provider| { + Box::pin(async move { + let mut size = 0; + for (rel_path, content) in &files_in_dir { + size += content.len(); + let mut writer = writer_provider.writer(rel_path).await.unwrap(); + writer.write_all(content).await.unwrap(); + } + Ok(size as _) + }) + }), + ) + .await + .unwrap(); + drop(guard); + + // purge the stager + stager.purge(puffin_file_name).await.unwrap(); + stager.cache.run_pending_tasks().await; + + let stats = notifier.stats(); + assert_eq!( + stats, + Stats { + cache_insert_size: 81, + cache_evict_size: 81, + cache_hit_count: 0, + cache_hit_size: 0, + cache_miss_count: 2, + cache_miss_size: 81, + recycle_insert_size: 81, + recycle_clear_size: 0 + } + ); + } } diff --git a/src/puffin/src/puffin_manager/tests.rs b/src/puffin/src/puffin_manager/tests.rs index b4d3450fd5..adfc44692e 100644 --- a/src/puffin/src/puffin_manager/tests.rs +++ b/src/puffin/src/puffin_manager/tests.rs @@ -32,7 +32,11 @@ async fn new_bounded_stager(prefix: &str, capacity: u64) -> (TempDir, Arc