diff --git a/src/mito2/src/manifest/checkpointer.rs b/src/mito2/src/manifest/checkpointer.rs index 1da03dda21..9dd35e189a 100644 --- a/src/mito2/src/manifest/checkpointer.rs +++ b/src/mito2/src/manifest/checkpointer.rs @@ -16,7 +16,7 @@ use std::fmt::Debug; use std::sync::Arc; use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; -use common_telemetry::{error, info}; +use common_telemetry::{error, info, warn}; use store_api::storage::RegionId; use store_api::{MIN_VERSION, ManifestVersion}; @@ -69,14 +69,18 @@ impl Inner { return; } - if let Err(e) = self.manifest_store.delete_until(version, true).await { - error!(e; "Failed to delete manifest actions until version {} for region {}", version, region_id); - return; - } - + // Advance the in-memory checkpoint version as soon as the checkpoint file + // is durable. If the subsequent delta cleanup fails, the on-disk state is + // still consistent (the `_last_checkpoint` metadata points at the new + // checkpoint) and `maybe_do_checkpoint` must not re-checkpoint the same + // range. self.last_checkpoint_version .store(version, Ordering::Relaxed); + if let Err(e) = self.manifest_store.delete_until(version, true).await { + warn!(e; "Failed to delete manifest actions until version {} for region {}, leftover files will be ignored on recovery", version, region_id); + } + info!( "Checkpoint for region {} success, version: {}", region_id, version diff --git a/src/mito2/src/manifest/manager.rs b/src/mito2/src/manifest/manager.rs index e489c6756d..97e840520e 100644 --- a/src/mito2/src/manifest/manager.rs +++ b/src/mito2/src/manifest/manager.rs @@ -14,6 +14,7 @@ use std::sync::Arc; use std::sync::atomic::{AtomicU64, Ordering}; +use std::time::Instant; use common_datasource::compression::CompressionType; use common_telemetry::{debug, info}; @@ -255,6 +256,7 @@ impl RegionManifestManager { let _t = MANIFEST_OP_ELAPSED .with_label_values(&["open"]) .start_timer(); + let open_start = Instant::now(); // construct storage let mut store = ManifestObjectStore::new( @@ -290,8 +292,15 @@ impl RegionManifestManager { RegionManifestBuilder::default() }; + let replay_start_version = version; + info!( + "Replaying region manifest {} from version {}, last checkpoint version: {}", + options.manifest_dir, replay_start_version, last_checkpoint_version, + ); + // apply actions from storage let manifests = store.fetch_manifests(version, MAX_VERSION).await?; + let replayed_deltas = manifests.len(); for (manifest_version, raw_action_list) in manifests { let action_list = RegionMetaActionList::decode(&raw_action_list)?; @@ -334,6 +343,7 @@ impl RegionManifestManager { ); let version = manifest.manifest_version; + let manifest_dir = options.manifest_dir.clone(); let checkpointer = Checkpointer::new( manifest.metadata.region_id, options, @@ -344,6 +354,16 @@ impl RegionManifestManager { manifest .removed_files .update_file_removed_cnt_to_stats(stats); + info!( + "Opened region manifest {}, region_id: {}, start_version: {}, last_checkpoint_version: {}, replayed_deltas: {}, final_version: {}, cost: {:?}", + manifest_dir, + manifest.metadata.region_id, + replay_start_version, + last_checkpoint_version, + replayed_deltas, + version, + open_start.elapsed(), + ); Ok(Some(Self { store, last_version: manifest_version, diff --git a/src/mito2/src/manifest/storage.rs b/src/mito2/src/manifest/storage.rs index 6013c0393f..34061139f9 100644 --- a/src/mito2/src/manifest/storage.rs +++ b/src/mito2/src/manifest/storage.rs @@ -24,18 +24,22 @@ use std::sync::Arc; use std::sync::atomic::AtomicU64; use common_datasource::compression::CompressionType; -use common_telemetry::debug; +use common_telemetry::{debug, warn}; use crc32fast::Hasher; use lazy_static::lazy_static; use object_store::util::join_dir; use object_store::{Lister, ObjectStore, util}; use regex::Regex; -use snafu::{ResultExt, ensure}; +#[cfg(test)] +use snafu::ResultExt; +use snafu::ensure; use store_api::ManifestVersion; use store_api::storage::RegionId; use crate::cache::manifest_cache::ManifestCache; -use crate::error::{ChecksumMismatchSnafu, OpenDalSnafu, Result}; +#[cfg(test)] +use crate::error::OpenDalSnafu; +use crate::error::{ChecksumMismatchSnafu, Result}; use crate::manifest::storage::checkpoint::CheckpointStorage; use crate::manifest::storage::delta::DeltaStorage; use crate::manifest::storage::size_tracker::{CheckpointTracker, DeltaTracker, SizeTracker}; @@ -287,11 +291,11 @@ impl ManifestObjectStore { .iter() .map(|(e, _, _)| e.path().to_string()) .collect::>(); - let ret = paths.len(); + let total = paths.len(); debug!( "Deleting {} logs from manifest storage path {} until {}, checkpoint_version: {:?}, paths: {:?}", - ret, self.path, end, checkpoint_version, paths, + total, self.path, end, checkpoint_version, paths, ); // Remove from cache first @@ -299,13 +303,37 @@ impl ManifestObjectStore { remove_from_cache(self.manifest_cache.as_ref(), entry.path()).await; } - self.object_store - .delete_iter(paths) - .await - .context(OpenDalSnafu)?; + // Try batch delete first. On failure, fall back to per-file deletes. + // This is a workaround for S3-compatible object stores that do not support batch delete. See issue #7986. + let mut succeeded = vec![false; del_entries.len()]; + match self.object_store.delete_iter(paths.clone()).await { + Ok(()) => succeeded.fill(true), + Err(batch_err) => { + warn!( + batch_err; + "Batch delete failed for manifest path {}, falling back to per-file delete for {} paths", + self.path, total, + ); + for (i, path) in paths.iter().enumerate() { + if let Err(e) = self.object_store.delete(path).await { + warn!( + e; + "Failed to delete manifest file {} under {}, aborting fallback, {} files will be retried on next checkpoint", + path, self.path, total - i, + ); + break; + } + succeeded[i] = true; + } + } + } - // delete manifest sizes - for (_, is_checkpoint, version) in &del_entries { + let mut deleted = 0usize; + for (i, (_, is_checkpoint, version)) in del_entries.iter().enumerate() { + if !succeeded[i] { + continue; + } + deleted += 1; if *is_checkpoint { self.size_tracker .remove(&size_tracker::FileKey::Checkpoint(*version)); @@ -315,7 +343,7 @@ impl ManifestObjectStore { } } - Ok(ret) + Ok(deleted) } /// Save the delta manifest file. diff --git a/src/mito2/src/manifest/tests/checkpoint.rs b/src/mito2/src/manifest/tests/checkpoint.rs index e50650ccee..a2d64c990e 100644 --- a/src/mito2/src/manifest/tests/checkpoint.rs +++ b/src/mito2/src/manifest/tests/checkpoint.rs @@ -14,9 +14,13 @@ use std::assert_matches; use std::sync::Arc; +use std::sync::atomic::{AtomicUsize, Ordering}; use std::time::Duration; use common_datasource::compression::CompressionType; +use object_store::layers::mock::{ + Error as MockError, ErrorKind, MockLayerBuilder, OpDelete, Result as MockResult, oio, +}; use store_api::storage::{FileId, RegionId}; use strum::IntoEnumIterator; @@ -26,6 +30,7 @@ use crate::manifest::action::{ }; use crate::manifest::manager::RegionManifestManager; use crate::manifest::storage::checkpoint::CheckpointMetadata; +use crate::manifest::storage::is_delta_file; use crate::manifest::tests::utils::basic_region_metadata; use crate::sst::file::FileMeta; use crate::test_util::TestEnv; @@ -491,3 +496,96 @@ async fn test_checkpoint_bypass_in_staging_mode() { // Checkpoint should include all 16 actions (15 from staging + 1 from writable) assert_eq!(last_version, 16); } + +/// A deleter that fails on `flush`, simulating the S3 batch-delete failure +/// described in issue #7986. +struct FailingDeleter { + inner: oio::Deleter, + flush_calls: Arc, +} + +impl oio::Delete for FailingDeleter { + fn delete(&mut self, path: &str, args: OpDelete) -> MockResult<()> { + self.inner.delete(path, args) + } + + async fn flush(&mut self) -> MockResult { + self.flush_calls.fetch_add(1, Ordering::Relaxed); + Err(MockError::new( + ErrorKind::Unexpected, + "mock manifest delete flush failure", + )) + } +} + +#[tokio::test] +async fn checkpoint_advances_and_recovery_works_when_delete_fails() { + common_telemetry::init_default_ut_logging(); + + let flush_calls = Arc::new(AtomicUsize::new(0)); + let factory_flush_calls = flush_calls.clone(); + let mock_layer = MockLayerBuilder::default() + .deleter_factory(Arc::new(move |inner| { + Box::new(FailingDeleter { + inner, + flush_calls: factory_flush_calls.clone(), + }) + })) + .build() + .unwrap(); + + let env = TestEnv::new().await.with_mock_layer(mock_layer); + let metadata = Arc::new(basic_region_metadata()); + let mut manager = env + .create_manifest_manager(CompressionType::Uncompressed, 1, Some(metadata)) + .await + .unwrap() + .unwrap(); + + for _ in 0..10 { + manager.update(nop_action(), false).await.unwrap(); + while manager.checkpointer().is_doing_checkpoint() { + tokio::time::sleep(Duration::from_millis(10)).await; + } + } + + // The checkpointer must have attempted to delete stale files at least once. + assert!(flush_calls.load(Ordering::Relaxed) > 0); + + // Despite delete failures, the in-memory checkpoint marker advances so + // subsequent `maybe_do_checkpoint` calls compute correct ranges. + assert_eq!(manager.checkpointer().last_checkpoint_version(), 10); + + // And the durable `_last_checkpoint` metadata reflects the latest version. + let (last_version, _) = manager + .store() + .load_last_checkpoint() + .await + .unwrap() + .expect("checkpoint should be durable"); + assert_eq!(last_version, 10); + + // Stale deltas below the checkpoint version must still be present because + // the mocked deleter refused them. + let file_names = manager + .store() + .delta_storage() + .get_paths(|e| Some(e.name().to_string())) + .await + .unwrap(); + let stale_delta_count = file_names.iter().filter(|name| is_delta_file(name)).count(); + assert!( + stale_delta_count > 0, + "expected leftover delta files after failed delete, got {:?}", + file_names, + ); + + // Recovery must succeed despite the leftover deltas. + manager.stop().await; + let reopened = env + .create_manifest_manager(CompressionType::Uncompressed, 1, None) + .await + .unwrap() + .expect("manifest should be recoverable"); + assert_eq!(reopened.manifest().manifest_version, 10); +} diff --git a/src/mito2/src/test_util.rs b/src/mito2/src/test_util.rs index 350195bfa9..15785d8ac7 100644 --- a/src/mito2/src/test_util.rs +++ b/src/mito2/src/test_util.rs @@ -610,9 +610,16 @@ impl TestEnv { let manifest_dir = data_home.join("manifest").as_path().display().to_string(); let builder = Fs::default(); - let object_store = ObjectStore::new(builder.root(&manifest_dir)) - .unwrap() - .finish(); + let object_store = if let Some(mock_layer) = self.object_store_mock_layer.as_ref() { + ObjectStore::new(builder.root(&manifest_dir)) + .unwrap() + .layer(mock_layer.clone()) + .finish() + } else { + ObjectStore::new(builder.root(&manifest_dir)) + .unwrap() + .finish() + }; // The "manifest_dir" here should be the relative path from the `object_store`'s root. // Otherwise the OpenDal's list operation would fail with "StripPrefixError". This is