From 5e6d2b221eb7337da1a5ac23708ff6d197cd8bf7 Mon Sep 17 00:00:00 2001 From: discord9 <55937128+discord9@users.noreply.github.com> Date: Fri, 6 Mar 2026 15:17:20 +0800 Subject: [PATCH] feat: gc batch delete files (#7733) * feat: batch delete Signed-off-by: discord9 * chore: explict error msg Signed-off-by: discord9 * chore: per review Signed-off-by: discord9 * refactor: not fallback when batch failure Signed-off-by: discord9 * chore: per review Signed-off-by: discord9 * pcr Signed-off-by: discord9 * chore: per review Signed-off-by: discord9 * refactor: batch delete in access layer Signed-off-by: discord9 --------- Signed-off-by: discord9 --- src/mito2/src/access_layer.rs | 109 ++++++++++++++++++++++++++-------- src/mito2/src/error.rs | 24 ++++++-- src/mito2/src/gc.rs | 22 +++---- src/mito2/src/sst/file.rs | 92 +++++++++++++++++++++------- 4 files changed, 185 insertions(+), 62 deletions(-) diff --git a/src/mito2/src/access_layer.rs b/src/mito2/src/access_layer.rs index 049f8e1180..316347f229 100644 --- a/src/mito2/src/access_layer.rs +++ b/src/mito2/src/access_layer.rs @@ -33,7 +33,9 @@ use crate::cache::CacheManagerRef; use crate::cache::file_cache::{FileCacheRef, FileType, IndexKey}; use crate::cache::write_cache::SstUploadRequest; use crate::config::{BloomFilterConfig, FulltextIndexConfig, IndexConfig, InvertedIndexConfig}; -use crate::error::{CleanDirSnafu, DeleteIndexSnafu, DeleteSstSnafu, OpenDalSnafu, Result}; +use crate::error::{ + CleanDirSnafu, DeleteIndexSnafu, DeleteIndexesSnafu, DeleteSstsSnafu, OpenDalSnafu, Result, +}; use crate::metrics::{COMPACTION_STAGE_ELAPSED, FLUSH_ELAPSED}; use crate::read::{FlatSource, Source}; use crate::region::options::IndexOptions; @@ -212,29 +214,6 @@ impl AccessLayer { self.puffin_manager_factory.build(store, path_provider) } - /// Deletes a SST file (and its index file if it has one) with given file id. - pub(crate) async fn delete_sst( - &self, - region_file_id: &RegionFileId, - index_file_id: &RegionIndexId, - ) -> Result<()> { - let path = location::sst_file_path(&self.table_dir, *region_file_id, self.path_type); - self.object_store - .delete(&path) - .await - .context(DeleteSstSnafu { - file_id: region_file_id.file_id(), - })?; - - // Delete all versions of the index file. - for version in 0..=index_file_id.version { - let index_id = RegionIndexId::new(*region_file_id, version); - self.delete_index(index_id).await?; - } - - Ok(()) - } - pub(crate) async fn delete_index( &self, index_file_id: RegionIndexId, @@ -253,6 +232,88 @@ impl AccessLayer { Ok(()) } + pub(crate) async fn delete_ssts( + &self, + region_id: RegionId, + file_ids: &[FileId], + ) -> Result<(), crate::error::Error> { + if file_ids.is_empty() { + return Ok(()); + } + + let attempted_files = file_ids.to_vec(); + let paths: Vec<_> = file_ids + .iter() + .map(|file_id| { + location::sst_file_path( + &self.table_dir, + RegionFileId::new(region_id, *file_id), + self.path_type, + ) + }) + .collect(); + + let mut deleter = self + .object_store + .deleter() + .await + .with_context(|_| DeleteSstsSnafu { + region_id, + file_ids: attempted_files.clone(), + })?; + deleter + .delete_iter(paths.iter().map(String::as_str)) + .await + .with_context(|_| DeleteSstsSnafu { + region_id, + file_ids: attempted_files.clone(), + })?; + deleter.close().await.with_context(|_| DeleteSstsSnafu { + region_id, + file_ids: attempted_files, + })?; + + Ok(()) + } + + pub(crate) async fn delete_indexes( + &self, + index_ids: &[RegionIndexId], + ) -> Result<(), crate::error::Error> { + if index_ids.is_empty() { + return Ok(()); + } + + let file_ids: Vec<_> = index_ids + .iter() + .map(|index_id| index_id.file_id()) + .collect(); + let paths: Vec<_> = index_ids + .iter() + .map(|index_id| location::index_file_path(&self.table_dir, *index_id, self.path_type)) + .collect(); + + let mut deleter = self + .object_store + .deleter() + .await + .context(DeleteIndexesSnafu { + file_ids: file_ids.clone(), + })?; + deleter + .delete_iter(paths.iter().map(String::as_str)) + .await + .context(DeleteIndexesSnafu { + file_ids: file_ids.clone(), + })?; + deleter + .close() + .await + .context(DeleteIndexesSnafu { file_ids })?; + + Ok(()) + } + /// Returns the directory of the region in the table. pub fn build_region_dir(&self, region_id: RegionId) -> String { region_dir_from_table_dir(&self.table_dir, region_id, self.path_type) diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs index 23f8185cad..923d8a2713 100644 --- a/src/mito2/src/error.rs +++ b/src/mito2/src/error.rs @@ -456,9 +456,14 @@ pub enum Error { location: Location, }, - #[snafu(display("Failed to delete SST file, file id: {}", file_id))] - DeleteSst { - file_id: FileId, + #[snafu(display( + "Failed to batch delete SST files, region id: {}, file ids: {:?}", + region_id, + file_ids + ))] + DeleteSsts { + region_id: RegionId, + file_ids: Vec, #[snafu(source)] error: object_store::Error, #[snafu(implicit)] @@ -474,6 +479,15 @@ pub enum Error { location: Location, }, + #[snafu(display("Failed to batch delete index files, file ids: {:?}", file_ids))] + DeleteIndexes { + file_ids: Vec, + #[snafu(source)] + error: object_store::Error, + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("Failed to flush region {}", region_id))] FlushRegion { region_id: RegionId, @@ -1321,7 +1335,9 @@ impl ErrorExt for Error { PrimaryKeyLengthMismatch { .. } => StatusCode::InvalidArguments, InvalidSender { .. } => StatusCode::InvalidArguments, InvalidSchedulerState { .. } => StatusCode::InvalidArguments, - DeleteSst { .. } | DeleteIndex { .. } => StatusCode::StorageUnavailable, + DeleteSsts { .. } | DeleteIndex { .. } | DeleteIndexes { .. } => { + StatusCode::StorageUnavailable + } FlushRegion { source, .. } | BuildIndexAsync { source, .. } => source.status_code(), RegionDropped { .. } => StatusCode::Cancelled, RegionClosed { .. } => StatusCode::Cancelled, diff --git a/src/mito2/src/gc.rs b/src/mito2/src/gc.rs index cae67637be..b1468c0055 100644 --- a/src/mito2/src/gc.rs +++ b/src/mito2/src/gc.rs @@ -51,7 +51,7 @@ use crate::metrics::{ GC_ORPHANED_INDEX_FILES, GC_RUNS_TOTAL, GC_SKIPPED_UNPARSABLE_FILES, }; use crate::region::{MitoRegionRef, RegionRoleState}; -use crate::sst::file::{RegionFileId, RegionIndexId, delete_files, delete_index}; +use crate::sst::file::{RegionFileId, RegionIndexId, delete_files, delete_indexes}; use crate::sst::location::{self}; #[cfg(test)] @@ -547,17 +547,17 @@ impl LocalGcWorker { GC_DELETE_FILE_CNT.inc_by(deleted_count); } - for index_id in index_ids { - match delete_index(index_id, &self.access_layer, &self.cache_manager).await { - Ok(()) => { - GC_FILES_DELETED_TOTAL.with_label_values(&["index"]).inc(); - GC_DELETE_FILE_CNT.inc(); - } - Err(err) => { + if !index_ids.is_empty() { + let deleted_count = index_ids.len() as u64; + delete_indexes(&index_ids, &self.access_layer, &self.cache_manager) + .await + .inspect_err(|_| { GC_ERRORS_TOTAL.with_label_values(&["delete_failed"]).inc(); - return Err(err); - } - } + })?; + GC_FILES_DELETED_TOTAL + .with_label_values(&["index"]) + .inc_by(deleted_count); + GC_DELETE_FILE_CNT.inc_by(deleted_count); } Ok(()) diff --git a/src/mito2/src/sst/file.rs b/src/mito2/src/sst/file.rs index 814c38f8eb..74176831b6 100644 --- a/src/mito2/src/sst/file.rs +++ b/src/mito2/src/sst/file.rs @@ -554,31 +554,27 @@ pub async fn delete_files( cache.remove_parquet_meta_data(RegionFileId::new(region_id, *file_id)); } } - let mut deleted_files = Vec::with_capacity(file_ids.len()); + let mut attempted_files = Vec::with_capacity(file_ids.len()); + let mut index_ids = Vec::new(); for (file_id, index_version) in file_ids { let region_file_id = RegionFileId::new(region_id, *file_id); - match access_layer - .delete_sst( - ®ion_file_id, - &RegionIndexId::new(region_file_id, *index_version), - ) - .await - { - Ok(_) => { - deleted_files.push(*file_id); - } - Err(e) => { - error!(e; "Failed to delete sst and index file for {}", region_file_id); - } - } + attempted_files.push(*file_id); + index_ids.extend( + (0..=*index_version).map(|version| RegionIndexId::new(region_file_id, version)), + ); } + access_layer + .delete_ssts(region_id, &attempted_files) + .await?; + access_layer.delete_indexes(&index_ids).await?; + debug!( - "Deleted {} files for region {}: {:?}", - deleted_files.len(), + "Attempted to delete {} files for region {}: {:?}", + attempted_files.len(), region_id, - deleted_files + attempted_files ); for (file_id, index_version) in file_ids { @@ -600,21 +596,71 @@ pub async fn delete_index( access_layer: &AccessLayerRef, cache_manager: &Option, ) -> crate::error::Result<()> { - access_layer.delete_index(region_index_id).await?; + delete_index_and_purge(region_index_id, access_layer, cache_manager).await?; + Ok(()) +} + +pub async fn delete_indexes( + index_ids: &[RegionIndexId], + access_layer: &AccessLayerRef, + cache_manager: &Option, +) -> crate::error::Result<()> { + if index_ids.is_empty() { + return Ok(()); + } + + if let Err(e) = access_layer.delete_indexes(index_ids).await { + error!(e; "Failed to batch delete index files"); + + for index_id in index_ids { + delete_index_and_purge(*index_id, access_layer, cache_manager).await?; + } + + return Ok(()); + } + + purge_indexes(index_ids, access_layer, cache_manager).await; + + Ok(()) +} + +async fn delete_index_and_purge( + index_id: RegionIndexId, + access_layer: &AccessLayerRef, + cache_manager: &Option, +) -> crate::error::Result<()> { + access_layer.delete_index(index_id).await?; purge_index_cache_stager( - region_index_id.region_id(), + index_id.region_id(), true, access_layer, cache_manager, - region_index_id.file_id(), - region_index_id.version, + index_id.file_id(), + index_id.version, ) .await; - Ok(()) } +async fn purge_indexes( + index_ids: &[RegionIndexId], + access_layer: &AccessLayerRef, + cache_manager: &Option, +) { + for index_id in index_ids { + purge_index_cache_stager( + index_id.region_id(), + true, + access_layer, + cache_manager, + index_id.file_id(), + index_id.version, + ) + .await; + } +} + async fn purge_index_cache_stager( region_id: RegionId, delete_index: bool,