mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-16 04:50:38 +00:00
@@ -465,6 +465,20 @@ pub enum Error {
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display(
|
||||
"Failed to batch delete SST files, region id: {}, file ids: {:?}",
|
||||
region_id,
|
||||
file_ids
|
||||
))]
|
||||
DeleteSsts {
|
||||
region_id: RegionId,
|
||||
file_ids: Vec<FileId>,
|
||||
#[snafu(source)]
|
||||
error: object_store::Error,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to delete index file, file id: {}", file_id))]
|
||||
DeleteIndex {
|
||||
file_id: FileId,
|
||||
@@ -1321,7 +1335,9 @@ impl ErrorExt for Error {
|
||||
PrimaryKeyLengthMismatch { .. } => StatusCode::InvalidArguments,
|
||||
InvalidSender { .. } => StatusCode::InvalidArguments,
|
||||
InvalidSchedulerState { .. } => StatusCode::InvalidArguments,
|
||||
DeleteSst { .. } | DeleteIndex { .. } => StatusCode::StorageUnavailable,
|
||||
DeleteSst { .. } | DeleteSsts { .. } | DeleteIndex { .. } => {
|
||||
StatusCode::StorageUnavailable
|
||||
}
|
||||
FlushRegion { source, .. } | BuildIndexAsync { source, .. } => source.status_code(),
|
||||
RegionDropped { .. } => StatusCode::Cancelled,
|
||||
RegionClosed { .. } => StatusCode::Cancelled,
|
||||
|
||||
@@ -23,6 +23,7 @@ use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use common_base::readable_size::ReadableSize;
|
||||
use common_telemetry::{debug, error};
|
||||
use common_time::Timestamp;
|
||||
use itertools::Itertools;
|
||||
use partition::expr::PartitionExpr;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use smallvec::SmallVec;
|
||||
@@ -34,7 +35,7 @@ use store_api::storage::{ColumnId, FileId, IndexVersion, RegionId};
|
||||
use crate::access_layer::AccessLayerRef;
|
||||
use crate::cache::CacheManagerRef;
|
||||
use crate::cache::file_cache::{FileType, IndexKey};
|
||||
use crate::error::OpenDalSnafu;
|
||||
use crate::error::{DeleteSstSnafu, DeleteSstsSnafu, OpenDalSnafu};
|
||||
use crate::sst::file_purger::FilePurgerRef;
|
||||
use crate::sst::location;
|
||||
|
||||
@@ -577,25 +578,51 @@ pub async fn delete_files(
|
||||
|
||||
match access_layer.object_store().deleter().await {
|
||||
Ok(mut deleter) => {
|
||||
if let Err(e) = deleter.delete_iter(paths.iter().map(String::as_str)).await {
|
||||
if let Err(e) = deleter
|
||||
.delete_iter(paths.iter().map(String::as_str))
|
||||
.await
|
||||
.context(DeleteSstsSnafu {
|
||||
region_id,
|
||||
file_ids: file_ids.iter().map(|f| f.0).collect_vec(),
|
||||
})
|
||||
{
|
||||
error!(e; "Failed to batch delete sst and index files for region {}, now delete separately", region_id);
|
||||
|
||||
for path in &paths {
|
||||
if let Err(e) = access_layer.object_store().delete(path).await {
|
||||
error!(e; "Failed to delete sst/index file path: {}", path);
|
||||
for (file_id, index_version) in file_ids {
|
||||
let region_file_id = RegionFileId::new(region_id, *file_id);
|
||||
let sst_path = location::sst_file_path(table_dir, region_file_id, path_type);
|
||||
access_layer
|
||||
.object_store()
|
||||
.delete(&sst_path)
|
||||
.await
|
||||
.context(DeleteSstSnafu { file_id: *file_id })?;
|
||||
|
||||
for version in 0..=*index_version {
|
||||
let index_id = RegionIndexId::new(region_file_id, version);
|
||||
access_layer.delete_index(index_id).await?;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if let Err(e) = deleter.close().await {
|
||||
error!(e; "Failed to close object store deleter for region {}", region_id);
|
||||
}
|
||||
deleter.close().await.context(DeleteSstsSnafu {
|
||||
region_id,
|
||||
file_ids: attempted_files.clone(),
|
||||
})?;
|
||||
}
|
||||
Err(e) => {
|
||||
error!(e; "Failed to create object store deleter for region {}", region_id);
|
||||
for path in &paths {
|
||||
if let Err(e) = access_layer.object_store().delete(path).await {
|
||||
error!(e; "Failed to delete sst/index file path: {}", path);
|
||||
error!(e; "Failed to create object store deleter for region {}, fallback to single delete", region_id);
|
||||
for (file_id, index_version) in file_ids {
|
||||
let region_file_id = RegionFileId::new(region_id, *file_id);
|
||||
let sst_path = location::sst_file_path(table_dir, region_file_id, path_type);
|
||||
access_layer
|
||||
.object_store()
|
||||
.delete(&sst_path)
|
||||
.await
|
||||
.context(DeleteSstSnafu { file_id: *file_id })?;
|
||||
|
||||
for version in 0..=*index_version {
|
||||
let index_id = RegionIndexId::new(region_file_id, version);
|
||||
access_layer.delete_index(index_id).await?;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user