fix: manifest recovery scans after last version if possible (#8009)

* feat: suppport scan with start after

Signed-off-by: evenyag <realevenyag@gmail.com>

* test: add start_after test

Signed-off-by: evenyag <realevenyag@gmail.com>

* chore: adjust remove dir warning

Signed-off-by: evenyag <realevenyag@gmail.com>

* test: test list_with_start_after

Signed-off-by: evenyag <realevenyag@gmail.com>

* fix: update get_paths call with start_after arg in checkpoint test

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: log scan metrics

Signed-off-by: evenyag <realevenyag@gmail.com>

* fix: fix start_after on manifest dir

Signed-off-by: evenyag <realevenyag@gmail.com>

---------

Signed-off-by: evenyag <realevenyag@gmail.com>
This commit is contained in:
Yingwen
2026-04-23 10:42:59 +08:00
committed by GitHub
parent 1440924955
commit dc2f2cbfae
7 changed files with 270 additions and 29 deletions

View File

@@ -378,7 +378,7 @@ impl ManifestCache {
warn!(e; "Failed to remove empty root dir {}", dir.display());
return Err(e);
} else {
warn!("Empty root dir not found before removal {}", dir.display());
info!("Empty root dir not found before removal {}", dir.display());
}
} else {
info!(

View File

@@ -35,8 +35,8 @@ use crate::manifest::action::{
};
use crate::manifest::checkpointer::Checkpointer;
use crate::manifest::storage::{
ManifestObjectStore, file_version, is_checkpoint_file, is_delta_file, manifest_compress_type,
manifest_dir,
ManifestObjectStore, file_version, is_checkpoint_file, is_delta_file, list_start_after,
manifest_compress_type, manifest_dir,
};
use crate::metrics::MANIFEST_OP_ELAPSED;
use crate::region::{ManifestStats, RegionLeaderState, RegionRoleState};
@@ -652,13 +652,17 @@ impl RegionManifestManager {
pub async fn has_update(&self) -> Result<bool> {
let last_version = self.last_version();
let streamer =
self.store
.manifest_lister(false)
.await?
.context(error::EmptyManifestDirSnafu {
manifest_dir: self.store.manifest_dir(),
})?;
// Skip older files at the object-store layer. Files for `v == last_version`
// may still appear (`{path}{v:020}` sorts before `{path}{v:020}.json`) but
// they are filtered out below by the `version > last_version` check.
let start_after = list_start_after(self.store.manifest_dir(), last_version);
let streamer = self
.store
.manifest_lister(false, Some(&start_after))
.await?
.context(error::EmptyManifestDirSnafu {
manifest_dir: self.store.manifest_dir(),
})?;
let need_update = streamer
.try_any(|entry| async move {

View File

@@ -80,6 +80,24 @@ pub fn checkpoint_file(version: ManifestVersion) -> String {
format!("{version:020}.checkpoint")
}
/// Returns a lexicographic `start_after` key for an object-store `list`
/// request over the manifest directory at `path`.
///
/// `path` must be the same directory prefix passed to `lister_with(path)`
/// and must end with `/`. OpenDAL resolves `start_after` against the
/// operator root, not relative to the listed path, so the caller must
/// supply the full prefix — otherwise the bound is compared against keys
/// that already share a longer prefix and is silently a no-op.
pub(crate) fn list_start_after(path: &str, version: ManifestVersion) -> String {
debug_assert!(
path.ends_with('/'),
"list_start_after: path must end with '/', got {path:?}",
);
// Manifest files are named `{version:020}.{json,checkpoint}[.gz]` and sort lexicographically;
// `{path}{version:020}` is a strict prefix of `{path}{version:020}.{json,checkpoint}[.gz]`.
format!("{path}{version:020}")
}
pub fn gen_path(path: &str, file: &str, compress_type: CompressionType) -> String {
if compress_type == CompressionType::Uncompressed {
format!("{}{}", path, file)
@@ -198,11 +216,19 @@ impl ManifestObjectStore {
}
/// Returns an iterator of manifests from normal or staging directory.
pub(crate) async fn manifest_lister(&self, is_staging: bool) -> Result<Option<Lister>> {
///
/// `start_after` is forwarded to the non-staging lister to skip entries
/// whose name is lexicographically less than or equal to it. It is
/// ignored for the staging directory.
pub(crate) async fn manifest_lister(
&self,
is_staging: bool,
start_after: Option<&str>,
) -> Result<Option<Lister>> {
if is_staging {
self.staging_storage.manifest_lister().await
} else {
self.delta_storage.manifest_lister().await
self.delta_storage.manifest_lister(start_after).await
}
}
@@ -243,9 +269,14 @@ impl ManifestObjectStore {
keep_last_checkpoint: bool,
) -> Result<usize> {
// Stores (entry, is_checkpoint, version) in a Vec.
//
// `start_after` is intentionally `None` here: a previous deletion
// may have been interrupted and left stale files at versions below
// the current checkpoint; we need the lister to surface them so
// cleanup can finish.
let entries: Vec<_> = self
.delta_storage
.get_paths(|entry| {
.get_paths(None, |entry| {
let file_name = entry.name();
let is_checkpoint = is_checkpoint_file(file_name);
if is_delta_file(file_name) || is_checkpoint_file(file_name) {
@@ -448,12 +479,16 @@ mod tests {
use crate::manifest::storage::checkpoint::CheckpointMetadata;
fn new_test_manifest_store() -> ManifestObjectStore {
new_test_manifest_store_at("/")
}
fn new_test_manifest_store_at(path: &str) -> ManifestObjectStore {
common_telemetry::init_default_ut_logging();
let tmp_dir = create_temp_dir("test_manifest_log_store");
let builder = Fs::default().root(&tmp_dir.path().to_string_lossy());
let object_store = ObjectStore::new(builder).unwrap().finish();
ManifestObjectStore::new(
"/",
path,
object_store,
CompressionType::Uncompressed,
Default::default(),
@@ -718,4 +753,66 @@ mod tests {
assert_eq!(log_store.total_manifest_size(), 0);
}
#[tokio::test]
async fn test_scan_with_start_after_uncompress() {
let mut log_store = new_test_manifest_store();
log_store.set_compress_type(CompressionType::Uncompressed);
test_scan_with_start_after_case(log_store).await;
}
#[tokio::test]
async fn test_scan_with_start_after_compress() {
let mut log_store = new_test_manifest_store();
log_store.set_compress_type(CompressionType::Gzip);
test_scan_with_start_after_case(log_store).await;
}
// OpenDAL resolves `start_after` against the operator
// root, so the bound must embed the manifest directory prefix. Running the
// same assertions against a non-root path exercises that composition.
#[tokio::test]
async fn test_scan_with_start_after_nested_path() {
let mut log_store = new_test_manifest_store_at("/nested/region-1/");
log_store.set_compress_type(CompressionType::Uncompressed);
test_scan_with_start_after_case(log_store).await;
}
async fn test_scan_with_start_after_case(mut log_store: ManifestObjectStore) {
for v in 0..10 {
log_store
.save(v, format!("hello, {v}").as_bytes(), false)
.await
.unwrap();
}
// A checkpoint at version 5 shares the directory; scan must still
// return only delta files in range.
log_store
.save_checkpoint(5, "checkpoint".as_bytes())
.await
.unwrap();
// start > 0: `start_after` must skip pre-start deltas without losing any.
let entries = log_store.delta_storage.scan(3, 10).await.unwrap();
let versions: Vec<_> = entries.iter().map(|(v, _)| *v).collect();
assert_eq!(versions, vec![3, 4, 5, 6, 7, 8, 9]);
// start == 0: `start_after` is skipped; every delta is returned.
let entries = log_store.delta_storage.scan(0, 10).await.unwrap();
let versions: Vec<_> = entries.iter().map(|(v, _)| *v).collect();
assert_eq!(versions, (0..10).collect::<Vec<_>>());
// Upper bound exclusive.
let entries = log_store.delta_storage.scan(7, 9).await.unwrap();
let versions: Vec<_> = entries.iter().map(|(v, _)| *v).collect();
assert_eq!(versions, vec![7, 8]);
// Start beyond any existing file returns empty.
let entries = log_store
.delta_storage
.scan(10, ManifestVersion::MAX)
.await
.unwrap();
assert!(entries.is_empty());
}
}

View File

@@ -34,7 +34,7 @@ use crate::manifest::storage::utils::{
};
use crate::manifest::storage::{
FETCH_MANIFEST_PARALLELISM, delta_file, file_compress_type, file_version, gen_path,
is_delta_file,
is_delta_file, list_start_after,
};
#[derive(Debug, Clone)]
@@ -76,8 +76,18 @@ impl<T: Tracker> DeltaStorage<T> {
}
/// Returns an iterator of manifests from path directory.
pub(crate) async fn manifest_lister(&self) -> Result<Option<Lister>> {
match self.object_store.lister_with(&self.path).await {
///
/// If `start_after` is `Some`, the lister will skip entries whose name is
/// lexicographically less than or equal to it (see OpenDAL's `start_after`).
pub(crate) async fn manifest_lister(
&self,
start_after: Option<&str>,
) -> Result<Option<Lister>> {
let mut builder = self.object_store.lister_with(&self.path);
if let Some(s) = start_after {
builder = builder.start_after(s);
}
match builder.await {
Ok(streamer) => Ok(Some(streamer)),
Err(e) if e.kind() == ErrorKind::NotFound => {
debug!("Manifest directory does not exist: {}", self.path);
@@ -90,16 +100,22 @@ impl<T: Tracker> DeltaStorage<T> {
/// Return all `R`s in the directory that meet the `filter` conditions (that is, the `filter` closure returns `Some(R)`),
/// and discard `R` that does not meet the conditions (that is, the `filter` closure returns `None`)
/// Return an empty vector when directory is not found.
pub async fn get_paths<F, R>(&self, filter: F) -> Result<Vec<R>>
///
/// `start_after` is forwarded to the underlying lister to skip entries
/// whose name is lexicographically less than or equal to it.
pub async fn get_paths<F, R>(&self, start_after: Option<&str>, mut filter: F) -> Result<Vec<R>>
where
F: Fn(Entry) -> Option<R>,
F: FnMut(Entry) -> Option<R>,
{
let Some(streamer) = self.manifest_lister().await? else {
let Some(streamer) = self.manifest_lister(start_after).await? else {
return Ok(vec![]);
};
streamer
.try_filter_map(|e| async { Ok(filter(e)) })
.try_filter_map(|e| {
let result = filter(e);
async { Ok(result) }
})
.try_collect::<Vec<_>>()
.await
.context(OpenDalSnafu)
@@ -113,8 +129,13 @@ impl<T: Tracker> DeltaStorage<T> {
) -> Result<Vec<(ManifestVersion, Entry)>> {
ensure!(start <= end, InvalidScanIndexSnafu { start, end });
// Push the version lower bound into the list request via
// `list_start_after`; skip the hint when `start == 0` (nothing to skip).
let start_after = (start > 0).then(|| list_start_after(&self.path, start));
let mut total_paths = 0;
let mut entries: Vec<(ManifestVersion, Entry)> = self
.get_paths(|entry| {
.get_paths(start_after.as_deref(), |entry| {
total_paths += 1;
let file_name = entry.name();
if is_delta_file(file_name) {
let version = file_version(file_name);
@@ -128,6 +149,16 @@ impl<T: Tracker> DeltaStorage<T> {
sort_manifests(&mut entries);
common_telemetry::debug!(
"DeltaStorage get paths for {}, start: {}, end: {}, start_after: {:?}, total_paths: {}, entries: {}",
self.path,
start,
end,
start_after,
total_paths,
entries.len()
);
Ok(entries)
}

View File

@@ -156,14 +156,14 @@ impl StagingStorage {
/// Returns an iterator of manifests from staging directory.
pub(crate) async fn manifest_lister(&self) -> Result<Option<Lister>> {
self.delta_storage.manifest_lister().await
self.delta_storage.manifest_lister(None).await
}
/// Fetch all staging manifest files and return them as (version, action_list) pairs.
pub(crate) async fn fetch_manifests(&self) -> Result<Vec<(ManifestVersion, Vec<u8>)>> {
let manifest_entries = self
.delta_storage
.get_paths(|entry| {
.get_paths(None, |entry| {
let file_name = entry.name();
if is_delta_file(file_name) {
let version = file_version(file_name);

View File

@@ -123,7 +123,7 @@ async fn manager_without_checkpoint() {
let mut paths = manager
.store()
.delta_storage()
.get_paths(|e| Some(e.name().to_string()))
.get_paths(None, |e| Some(e.name().to_string()))
.await
.unwrap();
paths.sort_unstable();
@@ -166,7 +166,7 @@ async fn manager_with_checkpoint_distance_1() {
let mut paths = manager
.store()
.delta_storage()
.get_paths(|e| Some(e.name().to_string()))
.get_paths(None, |e| Some(e.name().to_string()))
.await
.unwrap();
paths.sort_unstable();
@@ -421,7 +421,7 @@ async fn manifest_install_manifest_to_with_checkpoint() {
let mut paths = manager
.store()
.delta_storage()
.get_paths(|e| Some(e.name().to_string()))
.get_paths(None, |e| Some(e.name().to_string()))
.await
.unwrap();
@@ -567,10 +567,10 @@ async fn checkpoint_advances_and_recovery_works_when_delete_fails() {
// Stale deltas below the checkpoint version must still be present because
// the mocked deleter refused them.
let file_names = manager
let file_names: Vec<String> = manager
.store()
.delta_storage()
.get_paths(|e| Some(e.name().to_string()))
.get_paths(None, |e| Some(e.name().to_string()))
.await
.unwrap();
let stale_delta_count = file_names.iter().filter(|name| is_delta_file(name)).count();