fix: update manifest state before deleting delta files (#8001)

* fix: update state before deleting deltas

Signed-off-by: evenyag <realevenyag@gmail.com>

* chore: update comment

Signed-off-by: evenyag <realevenyag@gmail.com>

* chore: update log level

Signed-off-by: evenyag <realevenyag@gmail.com>

---------

Signed-off-by: evenyag <realevenyag@gmail.com>
This commit is contained in:
Yingwen
2026-04-21 19:14:13 +08:00
committed by GitHub
parent 555741a9f1
commit 449243a175
5 changed files with 178 additions and 21 deletions

View File

@@ -16,7 +16,7 @@ use std::fmt::Debug;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use common_telemetry::{error, info};
use common_telemetry::{error, info, warn};
use store_api::storage::RegionId;
use store_api::{MIN_VERSION, ManifestVersion};
@@ -69,14 +69,18 @@ impl Inner {
return;
}
if let Err(e) = self.manifest_store.delete_until(version, true).await {
error!(e; "Failed to delete manifest actions until version {} for region {}", version, region_id);
return;
}
// Advance the in-memory checkpoint version as soon as the checkpoint file
// is durable. If the subsequent delta cleanup fails, the on-disk state is
// still consistent (the `_last_checkpoint` metadata points at the new
// checkpoint) and `maybe_do_checkpoint` must not re-checkpoint the same
// range.
self.last_checkpoint_version
.store(version, Ordering::Relaxed);
if let Err(e) = self.manifest_store.delete_until(version, true).await {
warn!(e; "Failed to delete manifest actions until version {} for region {}, leftover files will be ignored on recovery", version, region_id);
}
info!(
"Checkpoint for region {} success, version: {}",
region_id, version

View File

@@ -14,6 +14,7 @@
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Instant;
use common_datasource::compression::CompressionType;
use common_telemetry::{debug, info};
@@ -255,6 +256,7 @@ impl RegionManifestManager {
let _t = MANIFEST_OP_ELAPSED
.with_label_values(&["open"])
.start_timer();
let open_start = Instant::now();
// construct storage
let mut store = ManifestObjectStore::new(
@@ -290,8 +292,15 @@ impl RegionManifestManager {
RegionManifestBuilder::default()
};
let replay_start_version = version;
info!(
"Replaying region manifest {} from version {}, last checkpoint version: {}",
options.manifest_dir, replay_start_version, last_checkpoint_version,
);
// apply actions from storage
let manifests = store.fetch_manifests(version, MAX_VERSION).await?;
let replayed_deltas = manifests.len();
for (manifest_version, raw_action_list) in manifests {
let action_list = RegionMetaActionList::decode(&raw_action_list)?;
@@ -334,6 +343,7 @@ impl RegionManifestManager {
);
let version = manifest.manifest_version;
let manifest_dir = options.manifest_dir.clone();
let checkpointer = Checkpointer::new(
manifest.metadata.region_id,
options,
@@ -344,6 +354,16 @@ impl RegionManifestManager {
manifest
.removed_files
.update_file_removed_cnt_to_stats(stats);
info!(
"Opened region manifest {}, region_id: {}, start_version: {}, last_checkpoint_version: {}, replayed_deltas: {}, final_version: {}, cost: {:?}",
manifest_dir,
manifest.metadata.region_id,
replay_start_version,
last_checkpoint_version,
replayed_deltas,
version,
open_start.elapsed(),
);
Ok(Some(Self {
store,
last_version: manifest_version,

View File

@@ -24,18 +24,22 @@ use std::sync::Arc;
use std::sync::atomic::AtomicU64;
use common_datasource::compression::CompressionType;
use common_telemetry::debug;
use common_telemetry::{debug, warn};
use crc32fast::Hasher;
use lazy_static::lazy_static;
use object_store::util::join_dir;
use object_store::{Lister, ObjectStore, util};
use regex::Regex;
use snafu::{ResultExt, ensure};
#[cfg(test)]
use snafu::ResultExt;
use snafu::ensure;
use store_api::ManifestVersion;
use store_api::storage::RegionId;
use crate::cache::manifest_cache::ManifestCache;
use crate::error::{ChecksumMismatchSnafu, OpenDalSnafu, Result};
#[cfg(test)]
use crate::error::OpenDalSnafu;
use crate::error::{ChecksumMismatchSnafu, Result};
use crate::manifest::storage::checkpoint::CheckpointStorage;
use crate::manifest::storage::delta::DeltaStorage;
use crate::manifest::storage::size_tracker::{CheckpointTracker, DeltaTracker, SizeTracker};
@@ -287,11 +291,11 @@ impl ManifestObjectStore {
.iter()
.map(|(e, _, _)| e.path().to_string())
.collect::<Vec<_>>();
let ret = paths.len();
let total = paths.len();
debug!(
"Deleting {} logs from manifest storage path {} until {}, checkpoint_version: {:?}, paths: {:?}",
ret, self.path, end, checkpoint_version, paths,
total, self.path, end, checkpoint_version, paths,
);
// Remove from cache first
@@ -299,13 +303,37 @@ impl ManifestObjectStore {
remove_from_cache(self.manifest_cache.as_ref(), entry.path()).await;
}
self.object_store
.delete_iter(paths)
.await
.context(OpenDalSnafu)?;
// Try batch delete first. On failure, fall back to per-file deletes.
// This is a workaround for S3-compatible object stores that do not support batch delete. See issue #7986.
let mut succeeded = vec![false; del_entries.len()];
match self.object_store.delete_iter(paths.clone()).await {
Ok(()) => succeeded.fill(true),
Err(batch_err) => {
warn!(
batch_err;
"Batch delete failed for manifest path {}, falling back to per-file delete for {} paths",
self.path, total,
);
for (i, path) in paths.iter().enumerate() {
if let Err(e) = self.object_store.delete(path).await {
warn!(
e;
"Failed to delete manifest file {} under {}, aborting fallback, {} files will be retried on next checkpoint",
path, self.path, total - i,
);
break;
}
succeeded[i] = true;
}
}
}
// delete manifest sizes
for (_, is_checkpoint, version) in &del_entries {
let mut deleted = 0usize;
for (i, (_, is_checkpoint, version)) in del_entries.iter().enumerate() {
if !succeeded[i] {
continue;
}
deleted += 1;
if *is_checkpoint {
self.size_tracker
.remove(&size_tracker::FileKey::Checkpoint(*version));
@@ -315,7 +343,7 @@ impl ManifestObjectStore {
}
}
Ok(ret)
Ok(deleted)
}
/// Save the delta manifest file.

View File

@@ -14,9 +14,13 @@
use std::assert_matches;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Duration;
use common_datasource::compression::CompressionType;
use object_store::layers::mock::{
Error as MockError, ErrorKind, MockLayerBuilder, OpDelete, Result as MockResult, oio,
};
use store_api::storage::{FileId, RegionId};
use strum::IntoEnumIterator;
@@ -26,6 +30,7 @@ use crate::manifest::action::{
};
use crate::manifest::manager::RegionManifestManager;
use crate::manifest::storage::checkpoint::CheckpointMetadata;
use crate::manifest::storage::is_delta_file;
use crate::manifest::tests::utils::basic_region_metadata;
use crate::sst::file::FileMeta;
use crate::test_util::TestEnv;
@@ -491,3 +496,96 @@ async fn test_checkpoint_bypass_in_staging_mode() {
// Checkpoint should include all 16 actions (15 from staging + 1 from writable)
assert_eq!(last_version, 16);
}
/// A deleter that fails on `flush`, simulating the S3 batch-delete failure
/// described in issue #7986.
///
/// `delete` calls are forwarded to the wrapped deleter unchanged, while every
/// `flush` call is counted and then rejected with an `ErrorKind::Unexpected`
/// error.
struct FailingDeleter {
/// Wrapped deleter that receives the forwarded `delete` calls.
inner: oio::Deleter,
/// Number of `flush` calls observed so far; held in an `Arc` so the test
/// that installs this deleter can read the count.
flush_calls: Arc<AtomicUsize>,
}
impl oio::Delete for FailingDeleter {
    /// Forwards the delete request to the wrapped deleter unchanged.
    fn delete(&mut self, path: &str, args: OpDelete) -> MockResult<()> {
        self.inner.delete(path, args)
    }

    /// Counts the call and then always fails, so no flush issued through
    /// this deleter ever reports success.
    async fn flush(&mut self) -> MockResult<usize> {
        // Count first so the attempt is visible to the test even though the
        // flush itself is rejected.
        self.flush_calls.fetch_add(1, Ordering::Relaxed);

        let injected_failure = MockError::new(
            ErrorKind::Unexpected,
            "mock manifest delete flush failure",
        );
        Err(injected_failure)
    }
}
/// Verifies that a checkpoint is still recorded, both in memory and in the
/// durable `_last_checkpoint` metadata, when every delete flush fails, and
/// that the manifest can be reopened with the leftover delta files in place.
#[tokio::test]
async fn checkpoint_advances_and_recovery_works_when_delete_fails() {
    common_telemetry::init_default_ut_logging();

    // Shared counter: each deleter produced by the mock layer bumps it on `flush`.
    let flush_calls = Arc::new(AtomicUsize::new(0));
    let failing_layer = {
        let counter = flush_calls.clone();
        MockLayerBuilder::default()
            .deleter_factory(Arc::new(move |inner| {
                Box::new(FailingDeleter {
                    inner,
                    flush_calls: counter.clone(),
                })
            }))
            .build()
            .unwrap()
    };

    let env = TestEnv::new().await.with_mock_layer(failing_layer);
    let region_metadata = Arc::new(basic_region_metadata());
    let mut manager = env
        .create_manifest_manager(CompressionType::Uncompressed, 1, Some(region_metadata))
        .await
        .unwrap()
        .unwrap();

    for _ in 0..10 {
        manager.update(nop_action(), false).await.unwrap();
        // Poll until no checkpoint is in progress before issuing the next update.
        loop {
            if !manager.checkpointer().is_doing_checkpoint() {
                break;
            }
            tokio::time::sleep(Duration::from_millis(10)).await;
        }
    }

    // At least one cleanup attempt must have reached the failing deleter.
    assert!(flush_calls.load(Ordering::Relaxed) > 0);

    // The in-memory checkpoint marker advances even though cleanup failed.
    assert_eq!(manager.checkpointer().last_checkpoint_version(), 10);

    // The durable `_last_checkpoint` metadata points at the same version.
    let (durable_version, _) = manager
        .store()
        .load_last_checkpoint()
        .await
        .unwrap()
        .expect("checkpoint should be durable");
    assert_eq!(durable_version, 10);

    // The mocked deleter rejected every flush, so delta files must remain.
    let remaining_files = manager
        .store()
        .delta_storage()
        .get_paths(|entry| Some(entry.name().to_string()))
        .await
        .unwrap();
    let leftover_deltas = remaining_files
        .iter()
        .filter(|name| is_delta_file(name))
        .count();
    assert!(
        leftover_deltas > 0,
        "expected leftover delta files after failed delete, got {:?}",
        remaining_files,
    );

    // Reopening must succeed and land on the checkpointed version.
    manager.stop().await;
    let reopened = env
        .create_manifest_manager(CompressionType::Uncompressed, 1, None)
        .await
        .unwrap()
        .expect("manifest should be recoverable");
    assert_eq!(reopened.manifest().manifest_version, 10);
}

View File

@@ -610,9 +610,16 @@ impl TestEnv {
let manifest_dir = data_home.join("manifest").as_path().display().to_string();
let builder = Fs::default();
let object_store = ObjectStore::new(builder.root(&manifest_dir))
.unwrap()
.finish();
let object_store = if let Some(mock_layer) = self.object_store_mock_layer.as_ref() {
ObjectStore::new(builder.root(&manifest_dir))
.unwrap()
.layer(mock_layer.clone())
.finish()
} else {
ObjectStore::new(builder.root(&manifest_dir))
.unwrap()
.finish()
};
// The "manifest_dir" here should be the relative path from the `object_store`'s root.
// Otherwise the OpenDal's list operation would fail with "StripPrefixError". This is