diff --git a/src/mito2/src/access_layer.rs b/src/mito2/src/access_layer.rs index ae0091c7c2..f9200fc620 100644 --- a/src/mito2/src/access_layer.rs +++ b/src/mito2/src/access_layer.rs @@ -96,6 +96,11 @@ impl AccessLayer { &self.puffin_manager_factory } + /// Returns the intermediate manager. + pub fn intermediate_manager(&self) -> &IntermediateManager { + &self.intermediate_manager + } + /// Deletes a SST file (and its index file if it has one) with given file id. pub(crate) async fn delete_sst(&self, file_meta: &FileMeta) -> Result<()> { let path = location::sst_file_path(&self.region_dir, file_meta.file_id); diff --git a/src/mito2/src/sst/file_purger.rs b/src/mito2/src/sst/file_purger.rs index 0083372b2c..a389916cc2 100644 --- a/src/mito2/src/sst/file_purger.rs +++ b/src/mito2/src/sst/file_purger.rs @@ -131,6 +131,13 @@ impl FilePurger for LocalFilePurger { error!(e; "Failed to purge stager with index file, file_id: {}, region: {}", file_meta.file_id, file_meta.region_id); } + if let Err(e) = sst_layer + .intermediate_manager() + .prune_region_dir(&file_meta.region_id) + .await + { + error!(e; "Failed to prune intermediate region directory, region_id: {}", file_meta.region_id); + } })) { error!(e; "Failed to schedule the file purge request"); } diff --git a/src/mito2/src/sst/index.rs b/src/mito2/src/sst/index.rs index 3490107f9b..2b4b3f0485 100644 --- a/src/mito2/src/sst/index.rs +++ b/src/mito2/src/sst/index.rs @@ -110,6 +110,7 @@ pub struct Indexer { last_mem_fulltext_index: usize, bloom_filter_indexer: Option, last_mem_bloom_filter: usize, + intermediate_manager: Option, } impl Indexer { @@ -196,6 +197,7 @@ impl IndexerBuilder for IndexerBuilderImpl { indexer.inverted_indexer = self.build_inverted_indexer(file_id); indexer.fulltext_indexer = self.build_fulltext_indexer(file_id).await; indexer.bloom_filter_indexer = self.build_bloom_filter_indexer(file_id); + indexer.intermediate_manager = Some(self.intermediate_manager.clone()); if indexer.inverted_indexer.is_none() && indexer.fulltext_indexer.is_none() && indexer.bloom_filter_indexer.is_none() diff --git a/src/mito2/src/sst/index/indexer/abort.rs b/src/mito2/src/sst/index/indexer/abort.rs index 5b29009a03..b285b4891d 100644 --- a/src/mito2/src/sst/index/indexer/abort.rs +++ b/src/mito2/src/sst/index/indexer/abort.rs @@ -21,6 +21,7 @@ impl Indexer { self.do_abort_inverted_index().await; self.do_abort_fulltext_index().await; self.do_abort_bloom_filter().await; + self.do_prune_intm_sst_dir().await; self.puffin_manager = None; } diff --git a/src/mito2/src/sst/index/indexer/finish.rs b/src/mito2/src/sst/index/indexer/finish.rs index 56da128934..5c70009e55 100644 --- a/src/mito2/src/sst/index/indexer/finish.rs +++ b/src/mito2/src/sst/index/indexer/finish.rs @@ -53,6 +53,7 @@ impl Indexer { return IndexOutput::default(); } + self.do_prune_intm_sst_dir().await; output.file_size = self.do_finish_puffin_writer(writer).await; output } @@ -266,4 +267,12 @@ impl Indexer { output.row_count = row_count; output.columns = column_ids; } + + pub(crate) async fn do_prune_intm_sst_dir(&mut self) { + if let Some(manager) = self.intermediate_manager.take() { + if let Err(e) = manager.prune_sst_dir(&self.region_id, &self.file_id).await { + warn!(e; "Failed to prune intermediate SST directory, region_id: {}, file_id: {}", self.region_id, self.file_id); + } + } + } } diff --git a/src/mito2/src/sst/index/intermediate.rs b/src/mito2/src/sst/index/intermediate.rs index 38b1f81cbc..82a1cb987e 100644 --- a/src/mito2/src/sst/index/intermediate.rs +++ b/src/mito2/src/sst/index/intermediate.rs @@ -54,14 +54,22 @@ impl IntermediateManager { aux_path.as_ref() ); + // Remove the intermediate directory on bankground + let aux_pb = PathBuf::from(aux_path.as_ref()); + let intm_dir = aux_pb.join(INTERMEDIATE_DIR); + let deleted_dir = intm_dir.with_extension(format!("deleted-{}", Uuid::new_v4())); + if let Err(err) = tokio::fs::rename(&intm_dir, &deleted_dir).await { + warn!(err; "Failed to rename intermediate directory"); + } + tokio::spawn(async move { + if let Err(err) = tokio::fs::remove_dir_all(deleted_dir).await { + warn!(err; "Failed to remove intermediate directory"); + } + }); + let store = new_fs_cache_store(&normalize_dir(aux_path.as_ref())).await?; let store = InstrumentedStore::new(store); - // Remove all garbage intermediate files from previous runs. - if let Err(err) = store.remove_all(INTERMEDIATE_DIR).await { - warn!(err; "Failed to remove garbage intermediate files"); - } - Ok(Self { base_dir: PathBuf::from(aux_path.as_ref()), store, @@ -94,6 +102,24 @@ impl IntermediateManager { .join(sst_file_id.to_string()) .join(format!("fulltext-{column_id}-{uuid}")) } + + /// Prunes the intermediate directory for SST files. + pub(crate) async fn prune_sst_dir( + &self, + region_id: &RegionId, + sst_file_id: &FileId, + ) -> Result<()> { + let region_id = region_id.as_u64(); + let sst_dir = format!("{INTERMEDIATE_DIR}/{region_id}/{sst_file_id}/"); + self.store.remove_all(&sst_dir).await + } + + /// Prunes the intermediate directory for region files. + pub(crate) async fn prune_region_dir(&self, region_id: &RegionId) -> Result<()> { + let region_id = region_id.as_u64(); + let region_dir = format!("{INTERMEDIATE_DIR}/{region_id}/"); + self.store.remove_all(®ion_dir).await + } } /// `IntermediateLocation` produces paths for intermediate files @@ -268,6 +294,60 @@ mod tests { .unwrap()); } + #[tokio::test] + async fn test_cleanup_dir() { + let temp_dir = temp_dir::create_temp_dir("test_cleanup_dir_"); + + let region_id = RegionId::new(0, 0); + let sst_file_id = FileId::random(); + let region_dir = temp_dir + .path() + .join(INTERMEDIATE_DIR) + .join(region_id.as_u64().to_string()); + let sst_dir = region_dir.join(sst_file_id.to_string()); + + let path = temp_dir.path().to_str().unwrap(); + let manager = IntermediateManager::init_fs(path).await.unwrap(); + + let location = IntermediateLocation::new(®ion_id, &sst_file_id); + let temp_file_provider = TempFileProvider::new(location, manager.clone()); + + let mut f1 = temp_file_provider + .create("sky", "000000000000") + .await + .unwrap(); + f1.write_all(b"hello").await.unwrap(); + f1.flush().await.unwrap(); + f1.close().await.unwrap(); + + let mut f2 = temp_file_provider + .create("sky", "000000000001") + .await + .unwrap(); + f2.write_all(b"world").await.unwrap(); + f2.flush().await.unwrap(); + f2.close().await.unwrap(); + + temp_file_provider.cleanup().await.unwrap(); + + // sst_dir and region_dir still exists + assert!(tokio::fs::try_exists(&sst_dir).await.unwrap()); + assert!(tokio::fs::try_exists(®ion_dir).await.unwrap()); + + // sst_dir should be deleted, region_dir still exists + manager + .prune_sst_dir(®ion_id, &sst_file_id) + .await + .unwrap(); + assert!(tokio::fs::try_exists(®ion_dir).await.unwrap()); + assert!(!tokio::fs::try_exists(&sst_dir).await.unwrap()); + + // sst_dir, region_dir should be deleted + manager.prune_region_dir(®ion_id).await.unwrap(); + assert!(!tokio::fs::try_exists(&sst_dir).await.unwrap()); + assert!(!tokio::fs::try_exists(®ion_dir).await.unwrap()); + } + #[test] fn test_intermediate_location() { let sst_file_id = FileId::random(); diff --git a/src/puffin/src/puffin_manager/stager/bounded_stager.rs b/src/puffin/src/puffin_manager/stager/bounded_stager.rs index 939987217e..8ac5c81dcc 100644 --- a/src/puffin/src/puffin_manager/stager/bounded_stager.rs +++ b/src/puffin/src/puffin_manager/stager/bounded_stager.rs @@ -369,6 +369,9 @@ impl BoundedStager { /// Note: It can't recover the mapping between puffin files and keys, so TTL /// is configured to purge the dangling files and directories. async fn recover(&self) -> Result<()> { + let timer = std::time::Instant::now(); + common_telemetry::info!("Recovering the staging area, base_dir: {:?}", self.base_dir); + let mut read_dir = fs::read_dir(&self.base_dir).await.context(ReadSnafu)?; let mut elems = HashMap::new(); @@ -430,6 +433,7 @@ impl BoundedStager { } let mut size = 0; + let num_elems = elems.len(); for (key, value) in elems { size += value.size(); self.cache.insert(key, value).await; @@ -440,6 +444,12 @@ impl BoundedStager { self.cache.run_pending_tasks().await; + info!( + "Recovered the staging area, num_entries: {}, num_bytes: {}, cost: {:?}", + num_elems, + size, + timer.elapsed() + ); Ok(()) }