fix: prune intermediate dirs on index finish and region purge (#6878)

* fix: prune intermediate dirs on index finish and region purge

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* address comments

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

---------

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
Signed-off-by: WenyXu <wenymedia@gmail.com>
This commit is contained in:
Zhenchi
2025-09-02 21:57:16 +08:00
committed by Weny Xu
parent 9dc16772fe
commit 4c9fcb7dee
7 changed files with 119 additions and 5 deletions

View File

@@ -189,6 +189,11 @@ impl AccessLayer {
&self.puffin_manager_factory
}
/// Returns the intermediate manager.
///
/// The returned value is a shared borrow of the manager stored in this access layer.
pub fn intermediate_manager(&self) -> &IntermediateManager {
&self.intermediate_manager
}
/// Deletes a SST file (and its index file if it has one) with given file id.
pub(crate) async fn delete_sst(&self, file_meta: &FileMeta) -> Result<()> {
let path = location::sst_file_path(&self.table_dir, file_meta.file_id(), self.path_type);

View File

@@ -137,6 +137,13 @@ impl FilePurger for LocalFilePurger {
error!(e; "Failed to purge stager with index file, file_id: {}, region: {}",
file_meta.file_id(), file_meta.region_id);
}
if let Err(e) = sst_layer
.intermediate_manager()
.prune_region_dir(&file_meta.region_id)
.await
{
error!(e; "Failed to prune intermediate region directory, region_id: {}", file_meta.region_id);
}
})) {
error!(e; "Failed to schedule the file purge request");
}

View File

@@ -110,6 +110,7 @@ pub struct Indexer {
last_mem_fulltext_index: usize,
bloom_filter_indexer: Option<BloomFilterIndexer>,
last_mem_bloom_filter: usize,
intermediate_manager: Option<IntermediateManager>,
}
impl Indexer {
@@ -196,6 +197,7 @@ impl IndexerBuilder for IndexerBuilderImpl {
indexer.inverted_indexer = self.build_inverted_indexer(file_id);
indexer.fulltext_indexer = self.build_fulltext_indexer(file_id).await;
indexer.bloom_filter_indexer = self.build_bloom_filter_indexer(file_id);
indexer.intermediate_manager = Some(self.intermediate_manager.clone());
if indexer.inverted_indexer.is_none()
&& indexer.fulltext_indexer.is_none()
&& indexer.bloom_filter_indexer.is_none()

View File

@@ -21,6 +21,7 @@ impl Indexer {
self.do_abort_inverted_index().await;
self.do_abort_fulltext_index().await;
self.do_abort_bloom_filter().await;
self.do_prune_intm_sst_dir().await;
self.puffin_manager = None;
}

View File

@@ -54,6 +54,7 @@ impl Indexer {
return IndexOutput::default();
}
self.do_prune_intm_sst_dir().await;
output.file_size = self.do_finish_puffin_writer(writer).await;
output
}
@@ -270,4 +271,12 @@ impl Indexer {
output.row_count = row_count;
output.columns = column_ids;
}
/// Prunes this indexer's intermediate SST directory, at most once.
///
/// The intermediate manager is taken out of the indexer, so any later call finds `None`
/// and does nothing. A pruning failure is only logged as a warning; it is not propagated.
pub(crate) async fn do_prune_intm_sst_dir(&mut self) {
    // Nothing to prune when the manager is absent or was already taken.
    let manager = match self.intermediate_manager.take() {
        Some(manager) => manager,
        None => return,
    };

    let pruned = manager.prune_sst_dir(&self.region_id, &self.file_id).await;
    if let Err(e) = pruned {
        warn!(e; "Failed to prune intermediate SST directory, region_id: {}, file_id: {}", self.region_id, self.file_id);
    }
}
}

View File

@@ -54,14 +54,22 @@ impl IntermediateManager {
aux_path.as_ref()
);
// Remove the intermediate directory in the background
let aux_pb = PathBuf::from(aux_path.as_ref());
let intm_dir = aux_pb.join(INTERMEDIATE_DIR);
let deleted_dir = intm_dir.with_extension(format!("deleted-{}", Uuid::new_v4()));
if let Err(err) = tokio::fs::rename(&intm_dir, &deleted_dir).await {
warn!(err; "Failed to rename intermediate directory");
}
tokio::spawn(async move {
if let Err(err) = tokio::fs::remove_dir_all(deleted_dir).await {
warn!(err; "Failed to remove intermediate directory");
}
});
let store = new_fs_cache_store(&normalize_dir(aux_path.as_ref())).await?;
let store = InstrumentedStore::new(store);
// Remove all garbage intermediate files from previous runs.
if let Err(err) = store.remove_all(INTERMEDIATE_DIR).await {
warn!(err; "Failed to remove garbage intermediate files");
}
Ok(Self {
base_dir: PathBuf::from(aux_path.as_ref()),
store,
@@ -94,6 +102,24 @@ impl IntermediateManager {
.join(sst_file_id.to_string())
.join(format!("fulltext-{column_id}-{uuid}"))
}
/// Removes the intermediate directory that belongs to a single SST file.
///
/// The target is `{INTERMEDIATE_DIR}/{region_id}/{sst_file_id}/`, with the region id
/// rendered as its `u64` value. The outcome of the store's `remove_all` is returned as is.
pub(crate) async fn prune_sst_dir(
    &self,
    region_id: &RegionId,
    sst_file_id: &FileId,
) -> Result<()> {
    let target = format!(
        "{}/{}/{}/",
        INTERMEDIATE_DIR,
        region_id.as_u64(),
        sst_file_id
    );
    self.store.remove_all(&target).await
}
/// Removes the intermediate directory that belongs to a whole region.
///
/// The target is `{INTERMEDIATE_DIR}/{region_id}/`, with the region id rendered as its
/// `u64` value. The outcome of the store's `remove_all` is returned as is.
pub(crate) async fn prune_region_dir(&self, region_id: &RegionId) -> Result<()> {
    let target = format!("{}/{}/", INTERMEDIATE_DIR, region_id.as_u64());
    self.store.remove_all(&target).await
}
}
/// `IntermediateLocation` produces paths for intermediate files
@@ -268,6 +294,60 @@ mod tests {
.unwrap());
}
/// Checks that `prune_sst_dir` removes only the SST directory and that
/// `prune_region_dir` removes the region directory as well.
#[tokio::test]
async fn test_cleanup_dir() {
    let root = temp_dir::create_temp_dir("test_cleanup_dir_");

    let region_id = RegionId::new(0, 0);
    let sst_file_id = FileId::random();

    // Paths the assertions below inspect:
    // <root>/<INTERMEDIATE_DIR>/<region id>/<sst file id>
    let region_dir = root
        .path()
        .join(INTERMEDIATE_DIR)
        .join(region_id.as_u64().to_string());
    let sst_dir = region_dir.join(sst_file_id.to_string());

    let base = root.path().to_str().unwrap();
    let manager = IntermediateManager::init_fs(base).await.unwrap();

    let location = IntermediateLocation::new(&region_id, &sst_file_id);
    let provider = TempFileProvider::new(location, manager.clone());

    // Write two temp files in the same file group, one after the other.
    for (file_id, payload) in [
        ("000000000000", &b"hello"[..]),
        ("000000000001", &b"world"[..]),
    ] {
        let mut writer = provider.create("sky", file_id).await.unwrap();
        writer.write_all(payload).await.unwrap();
        writer.flush().await.unwrap();
        writer.close().await.unwrap();
    }

    provider.cleanup().await.unwrap();

    // After cleanup, both sst_dir and region_dir still exist.
    assert!(tokio::fs::try_exists(&sst_dir).await.unwrap());
    assert!(tokio::fs::try_exists(&region_dir).await.unwrap());

    // Pruning the SST directory deletes sst_dir but leaves region_dir in place.
    manager
        .prune_sst_dir(&region_id, &sst_file_id)
        .await
        .unwrap();
    assert!(tokio::fs::try_exists(&region_dir).await.unwrap());
    assert!(!tokio::fs::try_exists(&sst_dir).await.unwrap());

    // Pruning the region directory leaves neither sst_dir nor region_dir.
    manager.prune_region_dir(&region_id).await.unwrap();
    assert!(!tokio::fs::try_exists(&sst_dir).await.unwrap());
    assert!(!tokio::fs::try_exists(&region_dir).await.unwrap());
}
#[test]
fn test_intermediate_location() {
let sst_file_id = FileId::random();

View File

@@ -369,6 +369,9 @@ impl<H> BoundedStager<H> {
/// Note: It can't recover the mapping between puffin files and keys, so TTL
/// is configured to purge the dangling files and directories.
async fn recover(&self) -> Result<()> {
let timer = std::time::Instant::now();
common_telemetry::info!("Recovering the staging area, base_dir: {:?}", self.base_dir);
let mut read_dir = fs::read_dir(&self.base_dir).await.context(ReadSnafu)?;
let mut elems = HashMap::new();
@@ -430,6 +433,7 @@ impl<H> BoundedStager<H> {
}
let mut size = 0;
let num_elems = elems.len();
for (key, value) in elems {
size += value.size();
self.cache.insert(key, value).await;
@@ -440,6 +444,12 @@ impl<H> BoundedStager<H> {
self.cache.run_pending_tasks().await;
info!(
"Recovered the staging area, num_entries: {}, num_bytes: {}, cost: {:?}",
num_elems,
size,
timer.elapsed()
);
Ok(())
}