diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs index 23f8185cad..d3503334a4 100644 --- a/src/mito2/src/error.rs +++ b/src/mito2/src/error.rs @@ -465,6 +465,20 @@ pub enum Error { location: Location, }, + #[snafu(display( + "Failed to batch delete SST files, region id: {}, file ids: {:?}", + region_id, + file_ids + ))] + DeleteSsts { + region_id: RegionId, + file_ids: Vec<FileId>, + #[snafu(source)] + error: object_store::Error, + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("Failed to delete index file, file id: {}", file_id))] DeleteIndex { file_id: FileId, @@ -1321,7 +1335,9 @@ impl ErrorExt for Error { PrimaryKeyLengthMismatch { .. } => StatusCode::InvalidArguments, InvalidSender { .. } => StatusCode::InvalidArguments, InvalidSchedulerState { .. } => StatusCode::InvalidArguments, - DeleteSst { .. } | DeleteSsts { .. } | DeleteIndex { .. } => { + StatusCode::StorageUnavailable + } FlushRegion { source, .. } | BuildIndexAsync { source, .. } => source.status_code(), RegionDropped { .. } => StatusCode::Cancelled, RegionClosed { .. 
} => StatusCode::Cancelled, diff --git a/src/mito2/src/sst/file.rs b/src/mito2/src/sst/file.rs index 4dd0d878c9..7e576ad731 100644 --- a/src/mito2/src/sst/file.rs +++ b/src/mito2/src/sst/file.rs @@ -23,6 +23,7 @@ use std::sync::atomic::{AtomicBool, Ordering}; use common_base::readable_size::ReadableSize; use common_telemetry::{debug, error}; use common_time::Timestamp; +use itertools::Itertools; use partition::expr::PartitionExpr; use serde::{Deserialize, Serialize}; use smallvec::SmallVec; @@ -34,7 +35,7 @@ use store_api::storage::{ColumnId, FileId, IndexVersion, RegionId}; use crate::access_layer::AccessLayerRef; use crate::cache::CacheManagerRef; use crate::cache::file_cache::{FileType, IndexKey}; -use crate::error::OpenDalSnafu; +use crate::error::{DeleteSstSnafu, DeleteSstsSnafu, OpenDalSnafu}; use crate::sst::file_purger::FilePurgerRef; use crate::sst::location; @@ -577,25 +578,51 @@ pub async fn delete_files( match access_layer.object_store().deleter().await { Ok(mut deleter) => { - if let Err(e) = deleter.delete_iter(paths.iter().map(String::as_str)).await { + if let Err(e) = deleter + .delete_iter(paths.iter().map(String::as_str)) + .await + .context(DeleteSstsSnafu { + region_id, + file_ids: file_ids.iter().map(|f| f.0).collect_vec(), + }) + { error!(e; "Failed to batch delete sst and index files for region {}, now delete separately", region_id); - for path in &paths { - if let Err(e) = access_layer.object_store().delete(path).await { - error!(e; "Failed to delete sst/index file path: {}", path); + for (file_id, index_version) in file_ids { + let region_file_id = RegionFileId::new(region_id, *file_id); + let sst_path = location::sst_file_path(table_dir, region_file_id, path_type); + access_layer + .object_store() + .delete(&sst_path) + .await + .context(DeleteSstSnafu { file_id: *file_id })?; + + for version in 0..=*index_version { + let index_id = RegionIndexId::new(region_file_id, version); + access_layer.delete_index(index_id).await?; } } } - if let 
Err(e) = deleter.close().await { - error!(e; "Failed to close object store deleter for region {}", region_id); - } + deleter.close().await.context(DeleteSstsSnafu { + region_id, + file_ids: attempted_files.clone(), + })?; } Err(e) => { - error!(e; "Failed to create object store deleter for region {}", region_id); - for path in &paths { - if let Err(e) = access_layer.object_store().delete(path).await { - error!(e; "Failed to delete sst/index file path: {}", path); + error!(e; "Failed to create object store deleter for region {}, fallback to single delete", region_id); + for (file_id, index_version) in file_ids { + let region_file_id = RegionFileId::new(region_id, *file_id); + let sst_path = location::sst_file_path(table_dir, region_file_id, path_type); + access_layer + .object_store() + .delete(&sst_path) + .await + .context(DeleteSstSnafu { file_id: *file_id })?; + + for version in 0..=*index_version { + let index_id = RegionIndexId::new(region_file_id, version); + access_layer.delete_index(index_id).await?; } } }