fix: move prune_region_dir to region drop (#6891)

* fix: move prune_region_dir to region drop

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* address comments

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

---------

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
Signed-off-by: WenyXu <wenymedia@gmail.com>
This commit is contained in:
Zhenchi
2025-09-03 15:11:05 +08:00
committed by Weny Xu
parent a0a2b40cbe
commit eb52129a91
3 changed files with 8 additions and 3 deletions

View File

@@ -137,12 +137,13 @@ impl FilePurger for LocalFilePurger {
error!(e; "Failed to purge stager with index file, file_id: {}, region: {}",
file_meta.file_id(), file_meta.region_id);
}
let file_id = file_meta.file_id();
if let Err(e) = sst_layer
.intermediate_manager()
.prune_region_dir(&file_meta.region_id)
.prune_sst_dir(&file_id.region_id(), &file_id.file_id())
.await
{
error!(e; "Failed to prune intermediate region directory, region_id: {}", file_meta.region_id);
error!(e; "Failed to prune intermediate sst directory, region_id: {}, file_id: {}", file_id.region_id(), file_id.file_id());
}
})) {
error!(e; "Failed to schedule the file purge request");

View File

@@ -99,6 +99,7 @@ where
let object_store = region.access_layer.object_store().clone();
let dropping_regions = self.dropping_regions.clone();
let listener = self.listener.clone();
let intm_manager = self.intermediate_manager.clone();
common_runtime::spawn_global(async move {
let gc_duration = listener
.on_later_drop_begin(region_id)
@@ -111,6 +112,9 @@ where
gc_duration,
)
.await;
if let Err(err) = intm_manager.prune_region_dir(&region_id).await {
warn!(err; "Failed to prune intermediate region directory, region_id: {}", region_id);
}
listener.on_later_drop_end(region_id, removed);
});

View File

@@ -370,7 +370,7 @@ impl<H> BoundedStager<H> {
/// is configured to purge the dangling files and directories.
async fn recover(&self) -> Result<()> {
let timer = std::time::Instant::now();
common_telemetry::info!("Recovering the staging area, base_dir: {:?}", self.base_dir);
info!("Recovering the staging area, base_dir: {:?}", self.base_dir);
let mut read_dir = fs::read_dir(&self.base_dir).await.context(ReadSnafu)?;