diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs index d3503334a4..093e5bc23d 100644 --- a/src/mito2/src/error.rs +++ b/src/mito2/src/error.rs @@ -488,6 +488,15 @@ pub enum Error { location: Location, }, + #[snafu(display("Failed to batch delete index files, file ids: {:?}", file_ids))] + DeleteIndexes { + file_ids: Vec, + #[snafu(source)] + error: object_store::Error, + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("Failed to flush region {}", region_id))] FlushRegion { region_id: RegionId, @@ -1335,7 +1344,7 @@ impl ErrorExt for Error { PrimaryKeyLengthMismatch { .. } => StatusCode::InvalidArguments, InvalidSender { .. } => StatusCode::InvalidArguments, InvalidSchedulerState { .. } => StatusCode::InvalidArguments, - DeleteSst { .. } | DeleteSsts { .. } | DeleteIndex { .. } => { + DeleteSst { .. } | DeleteSsts { .. } | DeleteIndex { .. } | DeleteIndexes { .. } => { StatusCode::StorageUnavailable } FlushRegion { source, .. } | BuildIndexAsync { source, .. } => source.status_code(), diff --git a/src/mito2/src/sst/file.rs b/src/mito2/src/sst/file.rs index 7e576ad731..3af31351d7 100644 --- a/src/mito2/src/sst/file.rs +++ b/src/mito2/src/sst/file.rs @@ -35,7 +35,7 @@ use store_api::storage::{ColumnId, FileId, IndexVersion, RegionId}; use crate::access_layer::AccessLayerRef; use crate::cache::CacheManagerRef; use crate::cache::file_cache::{FileType, IndexKey}; -use crate::error::{DeleteSstSnafu, DeleteSstsSnafu, OpenDalSnafu}; +use crate::error::{DeleteIndexesSnafu, DeleteSstSnafu, DeleteSstsSnafu}; use crate::sst::file_purger::FilePurgerRef; use crate::sst::location; @@ -588,43 +588,19 @@ pub async fn delete_files( { error!(e; "Failed to batch delete sst and index files for region {}, now delete separately", region_id); - for (file_id, index_version) in file_ids { - let region_file_id = RegionFileId::new(region_id, *file_id); - let sst_path = location::sst_file_path(table_dir, region_file_id, path_type); - access_layer - .object_store() - .delete(&sst_path) - .await - .context(DeleteSstSnafu { file_id: *file_id })?; - - for version in 0..=*index_version { - let index_id = RegionIndexId::new(region_file_id, version); - access_layer.delete_index(index_id).await?; - } - } + delete_files_individually(region_id, file_ids, table_dir, path_type, access_layer) + .await?; + } else { + deleter.close().await.context(DeleteSstsSnafu { + region_id, + file_ids: attempted_files.clone(), + })?; } - - deleter.close().await.context(DeleteSstsSnafu { - region_id, - file_ids: attempted_files.clone(), - })?; } Err(e) => { error!(e; "Failed to create object store deleter for region {}, fallback to single delete", region_id); - for (file_id, index_version) in file_ids { - let region_file_id = RegionFileId::new(region_id, *file_id); - let sst_path = location::sst_file_path(table_dir, region_file_id, path_type); - access_layer - .object_store() - .delete(&sst_path) - .await - .context(DeleteSstSnafu { file_id: *file_id })?; - - for version in 0..=*index_version { - let index_id = RegionIndexId::new(region_file_id, version); - access_layer.delete_index(index_id).await?; - } - } + delete_files_individually(region_id, file_ids, table_dir, path_type, access_layer) + .await?; } } @@ -677,23 +653,40 @@ pub async fn delete_indexes( match access_layer.object_store().deleter().await { Ok(mut deleter) => { - if let Err(e) = deleter.delete_iter(paths.iter().map(String::as_str)).await { + let file_ids = index_ids + .iter() + .map(|index_id| index_id.file_id()) + .collect_vec(); + if let Err(e) = deleter + .delete_iter(paths.iter().map(String::as_str)) + .await + .context(DeleteIndexesSnafu { + file_ids: file_ids.clone(), + }) + { error!(e; "Failed to batch delete index files"); for index_id in index_ids { delete_index_and_purge(*index_id, access_layer, cache_manager).await?; } - if let Err(e) = deleter.close().await { + if let Err(e) = deleter + .close() + .await + .context(DeleteIndexesSnafu { file_ids }) + { error!(e; "Failed to close object store deleter after fallback index deletion"); } return Ok(()); + } else { + deleter + .close() + .await + .context(DeleteIndexesSnafu { file_ids })?; + + purge_indexes(index_ids, access_layer, cache_manager).await; } - - deleter.close().await.context(OpenDalSnafu)?; - - purge_indexes(index_ids, access_layer, cache_manager).await; } Err(e) => { error!(e; "Failed to create object store deleter for index files"); @@ -707,6 +700,31 @@ pub async fn delete_indexes( Ok(()) } +async fn delete_files_individually( + region_id: RegionId, + file_ids: &[(FileId, u64)], + table_dir: &str, + path_type: PathType, + access_layer: &AccessLayerRef, +) -> crate::error::Result<()> { + for (file_id, index_version) in file_ids { + let region_file_id = RegionFileId::new(region_id, *file_id); + let sst_path = location::sst_file_path(table_dir, region_file_id, path_type); + access_layer + .object_store() + .delete(&sst_path) + .await + .context(DeleteSstSnafu { file_id: *file_id })?; + + for version in 0..=*index_version { + let index_id = RegionIndexId::new(region_file_id, version); + access_layer.delete_index(index_id).await?; + } + } + + Ok(()) +} + async fn delete_index_and_purge( index_id: RegionIndexId, access_layer: &AccessLayerRef,