feat: gc batch delete files (#7733)

* feat: batch delete

Signed-off-by: discord9 <discord9@163.com>

* chore: explict error msg

Signed-off-by: discord9 <discord9@163.com>

* chore: per review

Signed-off-by: discord9 <discord9@163.com>

* refactor: not fallback when batch failure

Signed-off-by: discord9 <discord9@163.com>

* chore: per review

Signed-off-by: discord9 <discord9@163.com>

* pcr

Signed-off-by: discord9 <discord9@163.com>

* chore: per review

Signed-off-by: discord9 <discord9@163.com>

* refactor: batch delete in access layer

Signed-off-by: discord9 <discord9@163.com>

---------

Signed-off-by: discord9 <discord9@163.com>
This commit is contained in:
discord9
2026-03-06 15:17:20 +08:00
committed by GitHub
parent d1151b665b
commit 5e6d2b221e
4 changed files with 185 additions and 62 deletions

View File

@@ -33,7 +33,9 @@ use crate::cache::CacheManagerRef;
use crate::cache::file_cache::{FileCacheRef, FileType, IndexKey};
use crate::cache::write_cache::SstUploadRequest;
use crate::config::{BloomFilterConfig, FulltextIndexConfig, IndexConfig, InvertedIndexConfig};
use crate::error::{CleanDirSnafu, DeleteIndexSnafu, DeleteSstSnafu, OpenDalSnafu, Result};
use crate::error::{
CleanDirSnafu, DeleteIndexSnafu, DeleteIndexesSnafu, DeleteSstsSnafu, OpenDalSnafu, Result,
};
use crate::metrics::{COMPACTION_STAGE_ELAPSED, FLUSH_ELAPSED};
use crate::read::{FlatSource, Source};
use crate::region::options::IndexOptions;
@@ -212,29 +214,6 @@ impl AccessLayer {
self.puffin_manager_factory.build(store, path_provider)
}
/// Deletes a SST file (and its index file if it has one) with given file id.
pub(crate) async fn delete_sst(
&self,
region_file_id: &RegionFileId,
index_file_id: &RegionIndexId,
) -> Result<()> {
let path = location::sst_file_path(&self.table_dir, *region_file_id, self.path_type);
self.object_store
.delete(&path)
.await
.context(DeleteSstSnafu {
file_id: region_file_id.file_id(),
})?;
// Delete all versions of the index file.
for version in 0..=index_file_id.version {
let index_id = RegionIndexId::new(*region_file_id, version);
self.delete_index(index_id).await?;
}
Ok(())
}
pub(crate) async fn delete_index(
&self,
index_file_id: RegionIndexId,
@@ -253,6 +232,88 @@ impl AccessLayer {
Ok(())
}
pub(crate) async fn delete_ssts(
&self,
region_id: RegionId,
file_ids: &[FileId],
) -> Result<(), crate::error::Error> {
if file_ids.is_empty() {
return Ok(());
}
let attempted_files = file_ids.to_vec();
let paths: Vec<_> = file_ids
.iter()
.map(|file_id| {
location::sst_file_path(
&self.table_dir,
RegionFileId::new(region_id, *file_id),
self.path_type,
)
})
.collect();
let mut deleter = self
.object_store
.deleter()
.await
.with_context(|_| DeleteSstsSnafu {
region_id,
file_ids: attempted_files.clone(),
})?;
deleter
.delete_iter(paths.iter().map(String::as_str))
.await
.with_context(|_| DeleteSstsSnafu {
region_id,
file_ids: attempted_files.clone(),
})?;
deleter.close().await.with_context(|_| DeleteSstsSnafu {
region_id,
file_ids: attempted_files,
})?;
Ok(())
}
pub(crate) async fn delete_indexes(
&self,
index_ids: &[RegionIndexId],
) -> Result<(), crate::error::Error> {
if index_ids.is_empty() {
return Ok(());
}
let file_ids: Vec<_> = index_ids
.iter()
.map(|index_id| index_id.file_id())
.collect();
let paths: Vec<_> = index_ids
.iter()
.map(|index_id| location::index_file_path(&self.table_dir, *index_id, self.path_type))
.collect();
let mut deleter = self
.object_store
.deleter()
.await
.context(DeleteIndexesSnafu {
file_ids: file_ids.clone(),
})?;
deleter
.delete_iter(paths.iter().map(String::as_str))
.await
.context(DeleteIndexesSnafu {
file_ids: file_ids.clone(),
})?;
deleter
.close()
.await
.context(DeleteIndexesSnafu { file_ids })?;
Ok(())
}
/// Returns the directory of the region in the table.
pub fn build_region_dir(&self, region_id: RegionId) -> String {
region_dir_from_table_dir(&self.table_dir, region_id, self.path_type)

View File

@@ -456,9 +456,14 @@ pub enum Error {
location: Location,
},
#[snafu(display("Failed to delete SST file, file id: {}", file_id))]
DeleteSst {
file_id: FileId,
#[snafu(display(
"Failed to batch delete SST files, region id: {}, file ids: {:?}",
region_id,
file_ids
))]
DeleteSsts {
region_id: RegionId,
file_ids: Vec<FileId>,
#[snafu(source)]
error: object_store::Error,
#[snafu(implicit)]
@@ -474,6 +479,15 @@ pub enum Error {
location: Location,
},
#[snafu(display("Failed to batch delete index files, file ids: {:?}", file_ids))]
DeleteIndexes {
file_ids: Vec<FileId>,
#[snafu(source)]
error: object_store::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to flush region {}", region_id))]
FlushRegion {
region_id: RegionId,
@@ -1321,7 +1335,9 @@ impl ErrorExt for Error {
PrimaryKeyLengthMismatch { .. } => StatusCode::InvalidArguments,
InvalidSender { .. } => StatusCode::InvalidArguments,
InvalidSchedulerState { .. } => StatusCode::InvalidArguments,
DeleteSst { .. } | DeleteIndex { .. } => StatusCode::StorageUnavailable,
DeleteSsts { .. } | DeleteIndex { .. } | DeleteIndexes { .. } => {
StatusCode::StorageUnavailable
}
FlushRegion { source, .. } | BuildIndexAsync { source, .. } => source.status_code(),
RegionDropped { .. } => StatusCode::Cancelled,
RegionClosed { .. } => StatusCode::Cancelled,

View File

@@ -51,7 +51,7 @@ use crate::metrics::{
GC_ORPHANED_INDEX_FILES, GC_RUNS_TOTAL, GC_SKIPPED_UNPARSABLE_FILES,
};
use crate::region::{MitoRegionRef, RegionRoleState};
use crate::sst::file::{RegionFileId, RegionIndexId, delete_files, delete_index};
use crate::sst::file::{RegionFileId, RegionIndexId, delete_files, delete_indexes};
use crate::sst::location::{self};
#[cfg(test)]
@@ -547,17 +547,17 @@ impl LocalGcWorker {
GC_DELETE_FILE_CNT.inc_by(deleted_count);
}
for index_id in index_ids {
match delete_index(index_id, &self.access_layer, &self.cache_manager).await {
Ok(()) => {
GC_FILES_DELETED_TOTAL.with_label_values(&["index"]).inc();
GC_DELETE_FILE_CNT.inc();
}
Err(err) => {
if !index_ids.is_empty() {
let deleted_count = index_ids.len() as u64;
delete_indexes(&index_ids, &self.access_layer, &self.cache_manager)
.await
.inspect_err(|_| {
GC_ERRORS_TOTAL.with_label_values(&["delete_failed"]).inc();
return Err(err);
}
}
})?;
GC_FILES_DELETED_TOTAL
.with_label_values(&["index"])
.inc_by(deleted_count);
GC_DELETE_FILE_CNT.inc_by(deleted_count);
}
Ok(())

View File

@@ -554,31 +554,27 @@ pub async fn delete_files(
cache.remove_parquet_meta_data(RegionFileId::new(region_id, *file_id));
}
}
let mut deleted_files = Vec::with_capacity(file_ids.len());
let mut attempted_files = Vec::with_capacity(file_ids.len());
let mut index_ids = Vec::new();
for (file_id, index_version) in file_ids {
let region_file_id = RegionFileId::new(region_id, *file_id);
match access_layer
.delete_sst(
&region_file_id,
&RegionIndexId::new(region_file_id, *index_version),
)
.await
{
Ok(_) => {
deleted_files.push(*file_id);
}
Err(e) => {
error!(e; "Failed to delete sst and index file for {}", region_file_id);
}
}
attempted_files.push(*file_id);
index_ids.extend(
(0..=*index_version).map(|version| RegionIndexId::new(region_file_id, version)),
);
}
access_layer
.delete_ssts(region_id, &attempted_files)
.await?;
access_layer.delete_indexes(&index_ids).await?;
debug!(
"Deleted {} files for region {}: {:?}",
deleted_files.len(),
"Attempted to delete {} files for region {}: {:?}",
attempted_files.len(),
region_id,
deleted_files
attempted_files
);
for (file_id, index_version) in file_ids {
@@ -600,21 +596,71 @@ pub async fn delete_index(
access_layer: &AccessLayerRef,
cache_manager: &Option<CacheManagerRef>,
) -> crate::error::Result<()> {
access_layer.delete_index(region_index_id).await?;
delete_index_and_purge(region_index_id, access_layer, cache_manager).await?;
Ok(())
}
pub async fn delete_indexes(
index_ids: &[RegionIndexId],
access_layer: &AccessLayerRef,
cache_manager: &Option<CacheManagerRef>,
) -> crate::error::Result<()> {
if index_ids.is_empty() {
return Ok(());
}
if let Err(e) = access_layer.delete_indexes(index_ids).await {
error!(e; "Failed to batch delete index files");
for index_id in index_ids {
delete_index_and_purge(*index_id, access_layer, cache_manager).await?;
}
return Ok(());
}
purge_indexes(index_ids, access_layer, cache_manager).await;
Ok(())
}
async fn delete_index_and_purge(
index_id: RegionIndexId,
access_layer: &AccessLayerRef,
cache_manager: &Option<CacheManagerRef>,
) -> crate::error::Result<()> {
access_layer.delete_index(index_id).await?;
purge_index_cache_stager(
region_index_id.region_id(),
index_id.region_id(),
true,
access_layer,
cache_manager,
region_index_id.file_id(),
region_index_id.version,
index_id.file_id(),
index_id.version,
)
.await;
Ok(())
}
async fn purge_indexes(
index_ids: &[RegionIndexId],
access_layer: &AccessLayerRef,
cache_manager: &Option<CacheManagerRef>,
) {
for index_id in index_ids {
purge_index_cache_stager(
index_id.region_id(),
true,
access_layer,
cache_manager,
index_id.file_id(),
index_id.version,
)
.await;
}
}
async fn purge_index_cache_stager(
region_id: RegionId,
delete_index: bool,