mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-15 04:20:39 +00:00
refactor: not fallback when batch failure
Signed-off-by: discord9 <discord9@163.com>
This commit is contained in:
@@ -456,15 +456,6 @@ pub enum Error {
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to delete SST file, file id: {}", file_id))]
|
||||
DeleteSst {
|
||||
file_id: FileId,
|
||||
#[snafu(source)]
|
||||
error: object_store::Error,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display(
|
||||
"Failed to batch delete SST files, region id: {}, file ids: {:?}",
|
||||
region_id,
|
||||
@@ -1344,7 +1335,7 @@ impl ErrorExt for Error {
|
||||
PrimaryKeyLengthMismatch { .. } => StatusCode::InvalidArguments,
|
||||
InvalidSender { .. } => StatusCode::InvalidArguments,
|
||||
InvalidSchedulerState { .. } => StatusCode::InvalidArguments,
|
||||
DeleteSst { .. } | DeleteSsts { .. } | DeleteIndex { .. } | DeleteIndexes { .. } => {
|
||||
DeleteSsts { .. } | DeleteIndex { .. } | DeleteIndexes { .. } => {
|
||||
StatusCode::StorageUnavailable
|
||||
}
|
||||
FlushRegion { source, .. } | BuildIndexAsync { source, .. } => source.status_code(),
|
||||
|
||||
@@ -35,7 +35,7 @@ use store_api::storage::{ColumnId, FileId, IndexVersion, RegionId};
|
||||
use crate::access_layer::AccessLayerRef;
|
||||
use crate::cache::CacheManagerRef;
|
||||
use crate::cache::file_cache::{FileType, IndexKey};
|
||||
use crate::error::{DeleteIndexesSnafu, DeleteSstSnafu, DeleteSstsSnafu};
|
||||
use crate::error::{DeleteIndexesSnafu, DeleteSstsSnafu};
|
||||
use crate::sst::file_purger::FilePurgerRef;
|
||||
use crate::sst::location;
|
||||
|
||||
@@ -576,33 +576,25 @@ pub async fn delete_files(
|
||||
}
|
||||
}
|
||||
|
||||
match access_layer.object_store().deleter().await {
|
||||
Ok(mut deleter) => {
|
||||
if let Err(e) = deleter
|
||||
.delete_iter(paths.iter().map(String::as_str))
|
||||
.await
|
||||
.context(DeleteSstsSnafu {
|
||||
region_id,
|
||||
file_ids: file_ids.iter().map(|f| f.0).collect_vec(),
|
||||
})
|
||||
{
|
||||
error!(e; "Failed to batch delete sst and index files for region {}, now delete separately", region_id);
|
||||
|
||||
delete_files_individually(region_id, file_ids, table_dir, path_type, access_layer)
|
||||
.await?;
|
||||
} else {
|
||||
deleter.close().await.context(DeleteSstsSnafu {
|
||||
region_id,
|
||||
file_ids: attempted_files.clone(),
|
||||
})?;
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
error!(e; "Failed to create object store deleter for region {}, fallback to single delete", region_id);
|
||||
delete_files_individually(region_id, file_ids, table_dir, path_type, access_layer)
|
||||
.await?;
|
||||
}
|
||||
}
|
||||
let mut deleter = access_layer
|
||||
.object_store()
|
||||
.deleter()
|
||||
.await
|
||||
.context(DeleteSstsSnafu {
|
||||
region_id,
|
||||
file_ids: attempted_files.clone(),
|
||||
})?;
|
||||
deleter
|
||||
.delete_iter(paths.iter().map(String::as_str))
|
||||
.await
|
||||
.context(DeleteSstsSnafu {
|
||||
region_id,
|
||||
file_ids: file_ids.iter().map(|f| f.0).collect_vec(),
|
||||
})?;
|
||||
deleter.close().await.context(DeleteSstsSnafu {
|
||||
region_id,
|
||||
file_ids: attempted_files.clone(),
|
||||
})?;
|
||||
|
||||
debug!(
|
||||
"Attempted to delete {} files for region {}: {:?}",
|
||||
@@ -700,31 +692,6 @@ pub async fn delete_indexes(
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn delete_files_individually(
|
||||
region_id: RegionId,
|
||||
file_ids: &[(FileId, u64)],
|
||||
table_dir: &str,
|
||||
path_type: PathType,
|
||||
access_layer: &AccessLayerRef,
|
||||
) -> crate::error::Result<()> {
|
||||
for (file_id, index_version) in file_ids {
|
||||
let region_file_id = RegionFileId::new(region_id, *file_id);
|
||||
let sst_path = location::sst_file_path(table_dir, region_file_id, path_type);
|
||||
access_layer
|
||||
.object_store()
|
||||
.delete(&sst_path)
|
||||
.await
|
||||
.context(DeleteSstSnafu { file_id: *file_id })?;
|
||||
|
||||
for version in 0..=*index_version {
|
||||
let index_id = RegionIndexId::new(region_file_id, version);
|
||||
access_layer.delete_index(index_id).await?;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn delete_index_and_purge(
|
||||
index_id: RegionIndexId,
|
||||
access_layer: &AccessLayerRef,
|
||||
|
||||
Reference in New Issue
Block a user