fix: force recycle region dir after gc duration (#5485)

This commit is contained in:
Yingwen
2025-02-07 16:39:04 +08:00
committed by GitHub
parent f29a1c56e9
commit 6eccadbf73
2 changed files with 21 additions and 11 deletions

View File

@@ -32,7 +32,7 @@ use crate::region::{RegionLeaderState, RegionMapRef};
use crate::worker::{RegionWorkerLoop, DROPPING_MARKER_FILE}; use crate::worker::{RegionWorkerLoop, DROPPING_MARKER_FILE};
const GC_TASK_INTERVAL_SEC: u64 = 5 * 60; // 5 minutes const GC_TASK_INTERVAL_SEC: u64 = 5 * 60; // 5 minutes
const MAX_RETRY_TIMES: u64 = 288; // 24 hours (5m * 288) const MAX_RETRY_TIMES: u64 = 12; // 1 hour (5m * 12)
impl<S> RegionWorkerLoop<S> impl<S> RegionWorkerLoop<S>
where where
@@ -118,12 +118,16 @@ where
} }
} }
/// Background GC task to remove the entire region path once it find there is no /// Background GC task to remove the entire region path once one of the following
/// parquet file left. Returns whether the path is removed. /// conditions is true:
/// - It finds there is no parquet file left.
/// - After `gc_duration`.
/// ///
/// This task will keep running until finished. Any resource captured by it will /// Returns whether the path is removed.
/// not be released before then. Be sure to only pass weak reference if something ///
/// is depended on ref-count mechanism. /// This task will retry on failure and keep running until finished. Any resource
/// captured by it will not be released before then. Be sure to only pass a weak reference
/// if something depends on the ref-count mechanism.
async fn later_drop_task( async fn later_drop_task(
region_id: RegionId, region_id: RegionId,
region_path: String, region_path: String,
@@ -131,9 +135,9 @@ async fn later_drop_task(
dropping_regions: RegionMapRef, dropping_regions: RegionMapRef,
gc_duration: Duration, gc_duration: Duration,
) -> bool { ) -> bool {
let mut force = false;
for _ in 0..MAX_RETRY_TIMES { for _ in 0..MAX_RETRY_TIMES {
sleep(gc_duration).await; let result = remove_region_dir_once(&region_path, &object_store, force).await;
let result = remove_region_dir_once(&region_path, &object_store).await;
match result { match result {
Err(err) => { Err(err) => {
warn!( warn!(
@@ -143,11 +147,14 @@ async fn later_drop_task(
} }
Ok(true) => { Ok(true) => {
dropping_regions.remove_region(region_id); dropping_regions.remove_region(region_id);
info!("Region {} is dropped", region_path); info!("Region {} is dropped, force: {}", region_path, force);
return true; return true;
} }
Ok(false) => (), Ok(false) => (),
} }
sleep(gc_duration).await;
// Force recycle after gc duration.
force = true;
} }
warn!( warn!(
@@ -160,9 +167,11 @@ async fn later_drop_task(
// TODO(ruihang): place the marker in a separate dir // TODO(ruihang): place the marker in a separate dir
/// Removes region dir if there are no parquet files, returns whether the directory is removed. /// Removes region dir if there are no parquet files, returns whether the directory is removed.
/// If `force = true`, always removes the dir.
pub(crate) async fn remove_region_dir_once( pub(crate) async fn remove_region_dir_once(
region_path: &str, region_path: &str,
object_store: &ObjectStore, object_store: &ObjectStore,
force: bool,
) -> Result<bool> { ) -> Result<bool> {
// list all files under the given region path to check if there are un-deleted parquet files // list all files under the given region path to check if there are un-deleted parquet files
let mut has_parquet_file = false; let mut has_parquet_file = false;
@@ -173,7 +182,8 @@ pub(crate) async fn remove_region_dir_once(
.await .await
.context(OpenDalSnafu)?; .context(OpenDalSnafu)?;
while let Some(file) = files.try_next().await.context(OpenDalSnafu)? { while let Some(file) = files.try_next().await.context(OpenDalSnafu)? {
if file.path().ends_with(".parquet") { if !force && file.path().ends_with(".parquet") {
// If not in force mode, we only remove the region dir if there is no parquet file
has_parquet_file = true; has_parquet_file = true;
break; break;
} else if !file.path().ends_with(DROPPING_MARKER_FILE) { } else if !file.path().ends_with(DROPPING_MARKER_FILE) {

View File

@@ -55,7 +55,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
.await .await
.context(OpenDalSnafu)? .context(OpenDalSnafu)?
{ {
let result = remove_region_dir_once(&request.region_dir, object_store).await; let result = remove_region_dir_once(&request.region_dir, object_store, true).await;
info!( info!(
"Region {} is dropped, worker: {}, result: {:?}", "Region {} is dropped, worker: {}, result: {:?}",
region_id, self.id, result region_id, self.id, result