diff --git a/src/mito2/src/cache/manifest_cache.rs b/src/mito2/src/cache/manifest_cache.rs index 458b199386..08ab0fb14a 100644 --- a/src/mito2/src/cache/manifest_cache.rs +++ b/src/mito2/src/cache/manifest_cache.rs @@ -378,7 +378,7 @@ impl ManifestCache { warn!(e; "Failed to remove empty root dir {}", dir.display()); return Err(e); } else { - warn!("Empty root dir not found before removal {}", dir.display()); + info!("Empty root dir not found before removal {}", dir.display()); } } else { info!( diff --git a/src/mito2/src/manifest/manager.rs b/src/mito2/src/manifest/manager.rs index 97e840520e..a69f08693c 100644 --- a/src/mito2/src/manifest/manager.rs +++ b/src/mito2/src/manifest/manager.rs @@ -35,8 +35,8 @@ use crate::manifest::action::{ }; use crate::manifest::checkpointer::Checkpointer; use crate::manifest::storage::{ - ManifestObjectStore, file_version, is_checkpoint_file, is_delta_file, manifest_compress_type, - manifest_dir, + ManifestObjectStore, file_version, is_checkpoint_file, is_delta_file, list_start_after, + manifest_compress_type, manifest_dir, }; use crate::metrics::MANIFEST_OP_ELAPSED; use crate::region::{ManifestStats, RegionLeaderState, RegionRoleState}; @@ -652,13 +652,17 @@ impl RegionManifestManager { pub async fn has_update(&self) -> Result { let last_version = self.last_version(); - let streamer = - self.store - .manifest_lister(false) - .await? - .context(error::EmptyManifestDirSnafu { - manifest_dir: self.store.manifest_dir(), - })?; + // Skip older files at the object-store layer. Files for `v == last_version` + // may still appear (`{path}{v:020}` sorts before `{path}{v:020}.json`) but + // they are filtered out below by the `version > last_version` check. + let start_after = list_start_after(self.store.manifest_dir(), last_version); + let streamer = self + .store + .manifest_lister(false, Some(&start_after)) + .await? + .context(error::EmptyManifestDirSnafu { + manifest_dir: self.store.manifest_dir(), + })?; let need_update = streamer .try_any(|entry| async move { diff --git a/src/mito2/src/manifest/storage.rs b/src/mito2/src/manifest/storage.rs index 34061139f9..5f795cc1c9 100644 --- a/src/mito2/src/manifest/storage.rs +++ b/src/mito2/src/manifest/storage.rs @@ -80,6 +80,24 @@ pub fn checkpoint_file(version: ManifestVersion) -> String { format!("{version:020}.checkpoint") } +/// Returns a lexicographic `start_after` key for an object-store `list` +/// request over the manifest directory at `path`. +/// +/// `path` must be the same directory prefix passed to `lister_with(path)` +/// and must end with `/`. OpenDAL resolves `start_after` against the +/// operator root, not relative to the listed path, so the caller must +/// supply the full prefix — otherwise the bound is compared against keys +/// that already share a longer prefix and is silently a no-op. +pub(crate) fn list_start_after(path: &str, version: ManifestVersion) -> String { + debug_assert!( + path.ends_with('/'), + "list_start_after: path must end with '/', got {path:?}", + ); + // Manifest files are named `{version:020}.{json,checkpoint}[.gz]` and sort lexicographically; + // `{path}{version:020}` is a strict prefix of `{path}{version:020}.{json,checkpoint}[.gz]`. + format!("{path}{version:020}") +} + pub fn gen_path(path: &str, file: &str, compress_type: CompressionType) -> String { if compress_type == CompressionType::Uncompressed { format!("{}{}", path, file) @@ -198,11 +216,19 @@ impl ManifestObjectStore { } /// Returns an iterator of manifests from normal or staging directory. - pub(crate) async fn manifest_lister(&self, is_staging: bool) -> Result> { + /// + /// `start_after` is forwarded to the non-staging lister to skip entries + /// whose name is lexicographically less than or equal to it. It is + /// ignored for the staging directory. + pub(crate) async fn manifest_lister( + &self, + is_staging: bool, + start_after: Option<&str>, + ) -> Result> { if is_staging { self.staging_storage.manifest_lister().await } else { - self.delta_storage.manifest_lister().await + self.delta_storage.manifest_lister(start_after).await } } @@ -243,9 +269,14 @@ impl ManifestObjectStore { keep_last_checkpoint: bool, ) -> Result { // Stores (entry, is_checkpoint, version) in a Vec. + // + // `start_after` is intentionally `None` here: a previous deletion + // may have been interrupted and left stale files at versions below + // the current checkpoint; we need the lister to surface them so + // cleanup can finish. let entries: Vec<_> = self .delta_storage - .get_paths(|entry| { + .get_paths(None, |entry| { let file_name = entry.name(); let is_checkpoint = is_checkpoint_file(file_name); if is_delta_file(file_name) || is_checkpoint_file(file_name) { @@ -448,12 +479,16 @@ mod tests { use crate::manifest::storage::checkpoint::CheckpointMetadata; fn new_test_manifest_store() -> ManifestObjectStore { + new_test_manifest_store_at("/") + } + + fn new_test_manifest_store_at(path: &str) -> ManifestObjectStore { common_telemetry::init_default_ut_logging(); let tmp_dir = create_temp_dir("test_manifest_log_store"); let builder = Fs::default().root(&tmp_dir.path().to_string_lossy()); let object_store = ObjectStore::new(builder).unwrap().finish(); ManifestObjectStore::new( - "/", + path, object_store, CompressionType::Uncompressed, Default::default(), @@ -718,4 +753,66 @@ mod tests { assert_eq!(log_store.total_manifest_size(), 0); } + + #[tokio::test] + async fn test_scan_with_start_after_uncompress() { + let mut log_store = new_test_manifest_store(); + log_store.set_compress_type(CompressionType::Uncompressed); + test_scan_with_start_after_case(log_store).await; + } + + #[tokio::test] + async fn test_scan_with_start_after_compress() { + let mut log_store = new_test_manifest_store(); + log_store.set_compress_type(CompressionType::Gzip); + test_scan_with_start_after_case(log_store).await; + } + + // OpenDAL resolves `start_after` against the operator + // root, so the bound must embed the manifest directory prefix. Running the + // same assertions against a non-root path exercises that composition. + #[tokio::test] + async fn test_scan_with_start_after_nested_path() { + let mut log_store = new_test_manifest_store_at("/nested/region-1/"); + log_store.set_compress_type(CompressionType::Uncompressed); + test_scan_with_start_after_case(log_store).await; + } + + async fn test_scan_with_start_after_case(mut log_store: ManifestObjectStore) { + for v in 0..10 { + log_store + .save(v, format!("hello, {v}").as_bytes(), false) + .await + .unwrap(); + } + // A checkpoint at version 5 shares the directory; scan must still + // return only delta files in range. + log_store + .save_checkpoint(5, "checkpoint".as_bytes()) + .await + .unwrap(); + + // start > 0: `start_after` must skip pre-start deltas without losing any. + let entries = log_store.delta_storage.scan(3, 10).await.unwrap(); + let versions: Vec<_> = entries.iter().map(|(v, _)| *v).collect(); + assert_eq!(versions, vec![3, 4, 5, 6, 7, 8, 9]); + + // start == 0: `start_after` is skipped; every delta is returned. + let entries = log_store.delta_storage.scan(0, 10).await.unwrap(); + let versions: Vec<_> = entries.iter().map(|(v, _)| *v).collect(); + assert_eq!(versions, (0..10).collect::>()); + + // Upper bound exclusive. + let entries = log_store.delta_storage.scan(7, 9).await.unwrap(); + let versions: Vec<_> = entries.iter().map(|(v, _)| *v).collect(); + assert_eq!(versions, vec![7, 8]); + + // Start beyond any existing file returns empty. + let entries = log_store + .delta_storage + .scan(10, ManifestVersion::MAX) + .await + .unwrap(); + assert!(entries.is_empty()); + } } diff --git a/src/mito2/src/manifest/storage/delta.rs b/src/mito2/src/manifest/storage/delta.rs index 3f312f1d07..594b56ddae 100644 --- a/src/mito2/src/manifest/storage/delta.rs +++ b/src/mito2/src/manifest/storage/delta.rs @@ -34,7 +34,7 @@ use crate::manifest::storage::utils::{ }; use crate::manifest::storage::{ FETCH_MANIFEST_PARALLELISM, delta_file, file_compress_type, file_version, gen_path, - is_delta_file, + is_delta_file, list_start_after, }; #[derive(Debug, Clone)] @@ -76,8 +76,18 @@ impl DeltaStorage { } /// Returns an iterator of manifests from path directory. - pub(crate) async fn manifest_lister(&self) -> Result> { - match self.object_store.lister_with(&self.path).await { + /// + /// If `start_after` is `Some`, the lister will skip entries whose name is + /// lexicographically less than or equal to it (see OpenDAL's `start_after`). + pub(crate) async fn manifest_lister( + &self, + start_after: Option<&str>, + ) -> Result> { + let mut builder = self.object_store.lister_with(&self.path); + if let Some(s) = start_after { + builder = builder.start_after(s); + } + match builder.await { Ok(streamer) => Ok(Some(streamer)), Err(e) if e.kind() == ErrorKind::NotFound => { debug!("Manifest directory does not exist: {}", self.path); @@ -90,16 +100,22 @@ impl DeltaStorage { /// Return all `R`s in the directory that meet the `filter` conditions (that is, the `filter` closure returns `Some(R)`), /// and discard `R` that does not meet the conditions (that is, the `filter` closure returns `None`) /// Return an empty vector when directory is not found. - pub async fn get_paths(&self, filter: F) -> Result> + /// + /// `start_after` is forwarded to the underlying lister to skip entries + /// whose name is lexicographically less than or equal to it. + pub async fn get_paths(&self, start_after: Option<&str>, mut filter: F) -> Result> where - F: Fn(Entry) -> Option, + F: FnMut(Entry) -> Option, { - let Some(streamer) = self.manifest_lister().await? else { + let Some(streamer) = self.manifest_lister(start_after).await? else { return Ok(vec![]); }; streamer - .try_filter_map(|e| async { Ok(filter(e)) }) + .try_filter_map(|e| { + let result = filter(e); + async { Ok(result) } + }) .try_collect::>() .await .context(OpenDalSnafu) @@ -113,8 +129,13 @@ impl DeltaStorage { ) -> Result> { ensure!(start <= end, InvalidScanIndexSnafu { start, end }); + // Push the version lower bound into the list request via + // `list_start_after`; skip the hint when `start == 0` (nothing to skip). + let start_after = (start > 0).then(|| list_start_after(&self.path, start)); + let mut total_paths = 0; let mut entries: Vec<(ManifestVersion, Entry)> = self - .get_paths(|entry| { + .get_paths(start_after.as_deref(), |entry| { + total_paths += 1; let file_name = entry.name(); if is_delta_file(file_name) { let version = file_version(file_name); @@ -128,6 +149,16 @@ impl DeltaStorage { sort_manifests(&mut entries); + common_telemetry::debug!( + "DeltaStorage get paths for {}, start: {}, end: {}, start_after: {:?}, total_paths: {}, entries: {}", + self.path, + start, + end, + start_after, + total_paths, + entries.len() + ); + Ok(entries) } diff --git a/src/mito2/src/manifest/storage/staging.rs b/src/mito2/src/manifest/storage/staging.rs index 17d341605c..95ad00fb8c 100644 --- a/src/mito2/src/manifest/storage/staging.rs +++ b/src/mito2/src/manifest/storage/staging.rs @@ -156,14 +156,14 @@ impl StagingStorage { /// Returns an iterator of manifests from staging directory. pub(crate) async fn manifest_lister(&self) -> Result> { - self.delta_storage.manifest_lister().await + self.delta_storage.manifest_lister(None).await } /// Fetch all staging manifest files and return them as (version, action_list) pairs. pub(crate) async fn fetch_manifests(&self) -> Result)>> { let manifest_entries = self .delta_storage - .get_paths(|entry| { + .get_paths(None, |entry| { let file_name = entry.name(); if is_delta_file(file_name) { let version = file_version(file_name); diff --git a/src/mito2/src/manifest/tests/checkpoint.rs b/src/mito2/src/manifest/tests/checkpoint.rs index a2d64c990e..c1511de47a 100644 --- a/src/mito2/src/manifest/tests/checkpoint.rs +++ b/src/mito2/src/manifest/tests/checkpoint.rs @@ -123,7 +123,7 @@ async fn manager_without_checkpoint() { let mut paths = manager .store() .delta_storage() - .get_paths(|e| Some(e.name().to_string())) + .get_paths(None, |e| Some(e.name().to_string())) .await .unwrap(); paths.sort_unstable(); @@ -166,7 +166,7 @@ async fn manager_with_checkpoint_distance_1() { let mut paths = manager .store() .delta_storage() - .get_paths(|e| Some(e.name().to_string())) + .get_paths(None, |e| Some(e.name().to_string())) .await .unwrap(); paths.sort_unstable(); @@ -421,7 +421,7 @@ async fn manifest_install_manifest_to_with_checkpoint() { let mut paths = manager .store() .delta_storage() - .get_paths(|e| Some(e.name().to_string())) + .get_paths(None, |e| Some(e.name().to_string())) .await .unwrap(); @@ -567,10 +567,10 @@ async fn checkpoint_advances_and_recovery_works_when_delete_fails() { // Stale deltas below the checkpoint version must still be present because // the mocked deleter refused them. - let file_names = manager + let file_names: Vec = manager .store() .delta_storage() - .get_paths(|e| Some(e.name().to_string())) + .get_paths(None, |e| Some(e.name().to_string())) .await .unwrap(); let stale_delta_count = file_names.iter().filter(|name| is_delta_file(name)).count(); diff --git a/src/object-store/tests/object_store_test.rs b/src/object-store/tests/object_store_test.rs index 5df6ab04bd..3dac15da46 100644 --- a/src/object-store/tests/object_store_test.rs +++ b/src/object-store/tests/object_store_test.rs @@ -17,6 +17,7 @@ use std::env; use anyhow::Result; use common_telemetry::info; use common_test_util::temp_dir::create_temp_dir; +use futures::TryStreamExt; use object_store::ObjectStore; use object_store::services::{Fs, S3}; use object_store::test_util::TempFolder; @@ -103,6 +104,109 @@ async fn test_object_list(store: &ObjectStore) -> Result<()> { Ok(()) } +async fn test_object_list_start_after(store: &ObjectStore) -> Result<()> { + let scheme = store.info().scheme(); + // `start_after` is a service-level capability. Skip the checks when the + // backend (e.g. the local Fs service) doesn't honor it natively — the + // bound would be silently ignored and the full listing returned. + if !store.info().native_capability().list_with_start_after { + info!("Skip test_object_list_start_after: backend {scheme} lacks start_after support"); + return Ok(()); + } + info!("Run test_object_list_start_after on backend {scheme}"); + + let files = [ + "00000000000000000001.json", + "00000000000000000002.json", + "00000000000000000003.checkpoint", + "00000000000000000003.json", + "00000000000000000004.json", + ]; + for name in files { + store.write(name, "x").await?; + } + + // Bare 20-digit bound: versions 1..=2 are skipped; version-3 deltas and + // checkpoint are kept (their `.` suffix sorts after the bound). + let lister = store + .lister_with("/") + .start_after("00000000000000000003") + .await?; + let mut got: Vec = lister + .try_collect::>() + .await? + .into_iter() + .filter(|e| e.metadata().mode() == EntryMode::FILE) + .map(|e| e.name().to_string()) + .collect(); + got.sort(); + let mut expected = vec![ + "00000000000000000003.checkpoint".to_string(), + "00000000000000000003.json".to_string(), + "00000000000000000004.json".to_string(), + ]; + expected.sort(); + assert_eq!(expected, got); + + // A bound that matches an existing name exactly excludes that name. + let lister = store + .lister_with("/") + .start_after("00000000000000000003.json") + .await?; + let got: Vec = lister + .try_collect::>() + .await? + .into_iter() + .filter(|e| e.metadata().mode() == EntryMode::FILE) + .map(|e| e.name().to_string()) + .collect(); + assert_eq!(vec!["00000000000000000004.json".to_string()], got); + + for name in files { + store.delete(name).await?; + } + + // OpenDAL resolves `start_after` against the operator root, not the + // `lister_with` path. For a nested prefix like `manifest/`, the bound + // must also embed that prefix — passing only the bare 20-digit name is + // silently a no-op because the full keys start with `m...` > `0...`. + let nested_files = [ + "manifest/00000000000000000001.json", + "manifest/00000000000000000002.json", + "manifest/00000000000000000003.checkpoint", + "manifest/00000000000000000003.json", + "manifest/00000000000000000004.json", + ]; + for name in nested_files { + store.write(name, "x").await?; + } + + let lister = store + .lister_with("manifest/") + .start_after("manifest/00000000000000000003") + .await?; + let mut got: Vec = lister + .try_collect::>() + .await? + .into_iter() + .filter(|e| e.metadata().mode() == EntryMode::FILE) + .map(|e| e.name().to_string()) + .collect(); + got.sort(); + let mut expected = vec![ + "00000000000000000003.checkpoint".to_string(), + "00000000000000000003.json".to_string(), + "00000000000000000004.json".to_string(), + ]; + expected.sort(); + assert_eq!(expected, got); + + for name in nested_files { + store.delete(name).await?; + } + Ok(()) +} + fn assert_opendal_metrics() { let metric_families = prometheus::gather(); let mut buffer = Vec::new(); @@ -129,6 +233,7 @@ async fn test_fs_backend() -> Result<()> { test_object_crud(&store).await?; test_object_list(&store).await?; + test_object_list_start_after(&store).await?; assert_opendal_metrics(); @@ -158,6 +263,7 @@ async fn test_s3_backend() -> Result<()> { let guard = TempFolder::new(&store, "/"); test_object_crud(&store).await?; test_object_list(&store).await?; + test_object_list_start_after(&store).await?; assert_opendal_metrics(); guard.remove_all().await?; } @@ -187,6 +293,7 @@ async fn test_oss_backend() -> Result<()> { let guard = TempFolder::new(&store, "/"); test_object_crud(&store).await?; test_object_list(&store).await?; + test_object_list_start_after(&store).await?; assert_opendal_metrics(); guard.remove_all().await?; } @@ -216,6 +323,7 @@ async fn test_azblob_backend() -> Result<()> { let guard = TempFolder::new(&store, "/"); test_object_crud(&store).await?; test_object_list(&store).await?; + test_object_list_start_after(&store).await?; assert_opendal_metrics(); guard.remove_all().await?; } @@ -244,6 +352,7 @@ async fn test_gcs_backend() -> Result<()> { let guard = TempFolder::new(&store, "/"); test_object_crud(&store).await?; test_object_list(&store).await?; + test_object_list_start_after(&store).await?; assert_opendal_metrics(); guard.remove_all().await?; }