chore: per review

Signed-off-by: discord9 <discord9@163.com>
This commit is contained in:
discord9
2026-02-28 14:52:51 +08:00
parent e784e74ecd
commit d7a873296f
2 changed files with 68 additions and 41 deletions

View File

@@ -488,6 +488,15 @@ pub enum Error {
location: Location,
},
#[snafu(display("Failed to batch delete index files, file ids: {:?}", file_ids))]
DeleteIndexes {
file_ids: Vec<FileId>,
#[snafu(source)]
error: object_store::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to flush region {}", region_id))]
FlushRegion {
region_id: RegionId,
@@ -1335,7 +1344,7 @@ impl ErrorExt for Error {
PrimaryKeyLengthMismatch { .. } => StatusCode::InvalidArguments,
InvalidSender { .. } => StatusCode::InvalidArguments,
InvalidSchedulerState { .. } => StatusCode::InvalidArguments,
DeleteSst { .. } | DeleteSsts { .. } | DeleteIndex { .. } => {
DeleteSst { .. } | DeleteSsts { .. } | DeleteIndex { .. } | DeleteIndexes { .. } => {
StatusCode::StorageUnavailable
}
FlushRegion { source, .. } | BuildIndexAsync { source, .. } => source.status_code(),

View File

@@ -35,7 +35,7 @@ use store_api::storage::{ColumnId, FileId, IndexVersion, RegionId};
use crate::access_layer::AccessLayerRef;
use crate::cache::CacheManagerRef;
use crate::cache::file_cache::{FileType, IndexKey};
use crate::error::{DeleteSstSnafu, DeleteSstsSnafu, OpenDalSnafu};
use crate::error::{DeleteIndexesSnafu, DeleteSstSnafu, DeleteSstsSnafu};
use crate::sst::file_purger::FilePurgerRef;
use crate::sst::location;
@@ -588,43 +588,19 @@ pub async fn delete_files(
{
error!(e; "Failed to batch delete sst and index files for region {}, now delete separately", region_id);
for (file_id, index_version) in file_ids {
let region_file_id = RegionFileId::new(region_id, *file_id);
let sst_path = location::sst_file_path(table_dir, region_file_id, path_type);
access_layer
.object_store()
.delete(&sst_path)
.await
.context(DeleteSstSnafu { file_id: *file_id })?;
for version in 0..=*index_version {
let index_id = RegionIndexId::new(region_file_id, version);
access_layer.delete_index(index_id).await?;
}
}
delete_files_individually(region_id, file_ids, table_dir, path_type, access_layer)
.await?;
} else {
deleter.close().await.context(DeleteSstsSnafu {
region_id,
file_ids: attempted_files.clone(),
})?;
}
deleter.close().await.context(DeleteSstsSnafu {
region_id,
file_ids: attempted_files.clone(),
})?;
}
Err(e) => {
error!(e; "Failed to create object store deleter for region {}, fallback to single delete", region_id);
for (file_id, index_version) in file_ids {
let region_file_id = RegionFileId::new(region_id, *file_id);
let sst_path = location::sst_file_path(table_dir, region_file_id, path_type);
access_layer
.object_store()
.delete(&sst_path)
.await
.context(DeleteSstSnafu { file_id: *file_id })?;
for version in 0..=*index_version {
let index_id = RegionIndexId::new(region_file_id, version);
access_layer.delete_index(index_id).await?;
}
}
delete_files_individually(region_id, file_ids, table_dir, path_type, access_layer)
.await?;
}
}
@@ -677,23 +653,40 @@ pub async fn delete_indexes(
match access_layer.object_store().deleter().await {
Ok(mut deleter) => {
if let Err(e) = deleter.delete_iter(paths.iter().map(String::as_str)).await {
let file_ids = index_ids
.iter()
.map(|index_id| index_id.file_id())
.collect_vec();
if let Err(e) = deleter
.delete_iter(paths.iter().map(String::as_str))
.await
.context(DeleteIndexesSnafu {
file_ids: file_ids.clone(),
})
{
error!(e; "Failed to batch delete index files");
for index_id in index_ids {
delete_index_and_purge(*index_id, access_layer, cache_manager).await?;
}
if let Err(e) = deleter.close().await {
if let Err(e) = deleter
.close()
.await
.context(DeleteIndexesSnafu { file_ids })
{
error!(e; "Failed to close object store deleter after fallback index deletion");
}
return Ok(());
} else {
deleter
.close()
.await
.context(DeleteIndexesSnafu { file_ids })?;
purge_indexes(index_ids, access_layer, cache_manager).await;
}
deleter.close().await.context(OpenDalSnafu)?;
purge_indexes(index_ids, access_layer, cache_manager).await;
}
Err(e) => {
error!(e; "Failed to create object store deleter for index files");
@@ -707,6 +700,31 @@ pub async fn delete_indexes(
Ok(())
}
async fn delete_files_individually(
region_id: RegionId,
file_ids: &[(FileId, u64)],
table_dir: &str,
path_type: PathType,
access_layer: &AccessLayerRef,
) -> crate::error::Result<()> {
for (file_id, index_version) in file_ids {
let region_file_id = RegionFileId::new(region_id, *file_id);
let sst_path = location::sst_file_path(table_dir, region_file_id, path_type);
access_layer
.object_store()
.delete(&sst_path)
.await
.context(DeleteSstSnafu { file_id: *file_id })?;
for version in 0..=*index_version {
let index_id = RegionIndexId::new(region_file_id, version);
access_layer.delete_index(index_id).await?;
}
}
Ok(())
}
async fn delete_index_and_purge(
index_id: RegionIndexId,
access_layer: &AccessLayerRef,