refactor(cli): split export-v2 verify into plan/scan/reconcile (#8206)

* refactor(cli): split export-v2 verify into plan/scan/reconcile

Signed-off-by: jeremyhi <fengjiachun@gmail.com>

* fix: by AI comments

Signed-off-by: jeremyhi <fengjiachun@gmail.com>

---------

Signed-off-by: jeremyhi <fengjiachun@gmail.com>
This commit is contained in:
jeremyhi
2026-06-02 20:10:10 -07:00
committed by GitHub
parent f3919a3117
commit aaffe1a048

View File

@@ -1077,7 +1077,9 @@ async fn verify_snapshot(storage: &OpenDalStorage) -> Result<VerifyReport> {
));
}
let data_files = storage.list_files_recursive("data/").await?;
if let Some(path) = data_files.first() {
// Report the lexicographically smallest path so the message is stable
// regardless of listing order across backends.
if let Some(path) = data_files.iter().min() {
report.push_error(format!(
"Schema-only snapshot should not contain data files (found '{}')",
path
@@ -1103,75 +1105,113 @@ fn summarize_chunks(manifest: &Manifest) -> VerifyChunkSummary {
}
}
/// A data file declared by a completed chunk that is expected to exist in storage.
#[derive(Debug)]
struct ChunkFile {
chunk_id: u32,
path: String,
}
/// Expected snapshot contents derived purely from the manifest (no object-store IO).
///
/// Separating planning from scanning makes it obvious which problems come from
/// the manifest alone and which require comparing against actual storage.
#[derive(Debug, Default)]
struct VerifyPlan {
/// Valid data files declared by completed chunks; each must exist in storage.
files_to_check: Vec<ChunkFile>,
/// All syntactically-safe data paths declared by any chunk, regardless of
/// status. Used as the orphan-detection baseline so a listed-but-invalid
/// file is not also reported as unexpected.
claimed_data_files: HashSet<String>,
/// Total data-file references in completed chunks (valid + invalid).
data_files_total: usize,
/// Problems detectable from the manifest alone.
problems: Vec<VerifyProblem>,
}
/// Actual data files discovered under `data/` (the only object-store IO in
/// chunk/data-file verification).
#[derive(Debug)]
struct VerifyDataScan {
existing_data_files: HashSet<String>,
}
/// Result of reconciling the manifest plan against the storage scan.
#[derive(Debug, Default)]
struct VerifyOutcome {
data_files_total: usize,
data_files_verified: usize,
problems: Vec<VerifyProblem>,
}
async fn verify_chunks_and_data_files(
storage: &OpenDalStorage,
report: &mut VerifyReport,
) -> Result<()> {
let existing_files: HashSet<_> = storage
.list_files_recursive("data/")
.await?
.into_iter()
.collect();
let mut data_files_total = 0;
let mut data_files_verified = 0;
let mut problems = Vec::new();
let mut seen_chunk_ids = HashSet::new();
let mut claimed_data_files = HashSet::new();
let plan = build_verify_plan(&report.manifest);
let scan = scan_data_files(storage).await?;
let outcome = reconcile_plan_with_scan(plan, &scan);
for chunk in &report.manifest.chunks {
report.data_files_total = outcome.data_files_total;
report.data_files_verified = outcome.data_files_verified;
report.problems.extend(outcome.problems);
Ok(())
}
/// Builds the expected-state plan from the manifest. Pure; performs no IO.
fn build_verify_plan(manifest: &Manifest) -> VerifyPlan {
let mut plan = VerifyPlan::default();
let mut seen_chunk_ids = HashSet::new();
for chunk in &manifest.chunks {
if !seen_chunk_ids.insert(chunk.id) {
problems.push(VerifyProblem {
plan.problems.push(VerifyProblem {
severity: VerifySeverity::Error,
message: format!("Chunk {}: duplicate chunk id", chunk.id),
});
}
for file in &chunk.files {
if let Some(path) = safe_manifest_data_file_path(file) {
claimed_data_files.insert(path.to_string());
plan.claimed_data_files.insert(path.to_string());
}
}
match chunk.status {
ChunkStatus::Completed => {
if chunk.files.is_empty() {
problems.push(VerifyProblem {
plan.problems.push(VerifyProblem {
severity: VerifySeverity::Error,
message: format!("Chunk {}: completed chunk has no data files", chunk.id),
});
continue;
}
let allowed_prefixes = report
.manifest
let allowed_prefixes = manifest
.schemas
.iter()
.map(|schema| data_dir_for_schema_chunk(schema, chunk.id))
.collect::<Vec<_>>();
for file in &chunk.files {
data_files_total += 1;
let Some(path) = valid_manifest_data_file_path(file, &allowed_prefixes) else {
problems.push(VerifyProblem {
plan.data_files_total += 1;
match valid_manifest_data_file_path(file, &allowed_prefixes) {
Some(path) => plan.files_to_check.push(ChunkFile {
chunk_id: chunk.id,
path: path.to_string(),
}),
None => plan.problems.push(VerifyProblem {
severity: VerifySeverity::Error,
message: format!(
"Chunk {}: invalid data file path '{}'",
chunk.id, file
),
});
continue;
};
if existing_files.contains(path) {
data_files_verified += 1;
} else {
problems.push(VerifyProblem {
severity: VerifySeverity::Error,
message: format!("Chunk {}: missing file '{}'", chunk.id, path),
});
}),
}
}
}
ChunkStatus::Skipped => {
if !chunk.files.is_empty() {
problems.push(VerifyProblem {
plan.problems.push(VerifyProblem {
severity: VerifySeverity::Error,
message: format!(
"Chunk {}: skipped chunk should not list data files",
@@ -1181,20 +1221,20 @@ async fn verify_chunks_and_data_files(
}
}
ChunkStatus::Pending => {
problems.push(VerifyProblem {
plan.problems.push(VerifyProblem {
severity: VerifySeverity::Error,
message: format!("Chunk {}: status is 'pending'", chunk.id),
});
}
ChunkStatus::InProgress => {
problems.push(VerifyProblem {
plan.problems.push(VerifyProblem {
severity: VerifySeverity::Error,
message: format!("Chunk {}: status is 'in_progress'", chunk.id),
});
}
ChunkStatus::Failed => {
let reason = chunk.error.as_deref().unwrap_or("unknown error");
problems.push(VerifyProblem {
plan.problems.push(VerifyProblem {
severity: VerifySeverity::Error,
message: format!("Chunk {}: status is 'failed' (error: {})", chunk.id, reason),
});
@@ -1202,20 +1242,60 @@ async fn verify_chunks_and_data_files(
}
}
for path in &existing_files {
if !claimed_data_files.contains(path) {
plan
}
/// Lists all data files under `data/`. This is the only object-store IO in
/// chunk/data-file verification.
async fn scan_data_files(storage: &OpenDalStorage) -> Result<VerifyDataScan> {
let existing_data_files = storage
.list_files_recursive("data/")
.await?
.into_iter()
.collect();
Ok(VerifyDataScan {
existing_data_files,
})
}
/// Reconciles the manifest plan against the storage scan. Pure; performs no IO.
///
/// Emits missing-file problems for expected files absent from storage and
/// unexpected-file problems for storage files no chunk claims. Unexpected files
/// are sorted by path so output is deterministic regardless of listing order.
fn reconcile_plan_with_scan(plan: VerifyPlan, scan: &VerifyDataScan) -> VerifyOutcome {
let mut problems = plan.problems;
let mut data_files_verified = 0;
for file in &plan.files_to_check {
if scan.existing_data_files.contains(&file.path) {
data_files_verified += 1;
} else {
problems.push(VerifyProblem {
severity: VerifySeverity::Error,
message: format!("Unexpected data file '{}' is not listed in manifest", path),
message: format!("Chunk {}: missing file '{}'", file.chunk_id, file.path),
});
}
}
report.data_files_total = data_files_total;
report.data_files_verified = data_files_verified;
report.problems.extend(problems);
let mut orphans: Vec<&String> = scan
.existing_data_files
.iter()
.filter(|path| !plan.claimed_data_files.contains(*path))
.collect();
orphans.sort();
for path in orphans {
problems.push(VerifyProblem {
severity: VerifySeverity::Error,
message: format!("Unexpected data file '{}' is not listed in manifest", path),
});
}
Ok(())
VerifyOutcome {
data_files_total: plan.data_files_total,
data_files_verified,
problems,
}
}
fn valid_manifest_data_file_path<'a>(
@@ -2294,6 +2374,90 @@ mod tests {
);
}
#[test]
fn test_build_verify_plan_classifies_chunks_without_io() {
let mut manifest = test_manifest(
chrono::Utc.with_ymd_and_hms(2026, 1, 1, 0, 0, 0).unwrap(),
false,
true,
);
// test_manifest(complete) gives: chunk 1 completed (1 file), chunk 2 skipped.
let mut failed = ChunkMeta::new(3, TimeRange::unbounded());
failed.mark_failed("boom".to_string());
manifest.chunks.push(failed);
manifest
.chunks
.push(ChunkMeta::new(4, TimeRange::unbounded()));
let plan = build_verify_plan(&manifest);
assert_eq!(plan.files_to_check.len(), 1);
assert_eq!(plan.files_to_check[0].chunk_id, 1);
assert_eq!(plan.files_to_check[0].path, "data/public/1/file.parquet");
assert_eq!(plan.data_files_total, 1);
assert!(
plan.claimed_data_files
.contains("data/public/1/file.parquet")
);
assert_eq!(plan.problems.len(), 2);
assert!(
plan.problems
.iter()
.any(|problem| problem.message.contains("status is 'failed'"))
);
assert!(
plan.problems
.iter()
.any(|problem| problem.message.contains("status is 'pending'"))
);
}
#[tokio::test]
async fn test_verify_snapshot_produces_deterministic_problem_output() {
let dir = tempdir().unwrap();
let manifest = test_manifest(
chrono::Utc.with_ymd_and_hms(2026, 1, 1, 0, 0, 0).unwrap(),
false,
true,
);
write_root_manifest(dir.path(), manifest);
write_snapshot_file(dir.path(), "schema/schemas.json", b"[]");
write_default_ddl_files(dir.path());
write_snapshot_file(dir.path(), "data/public/1/file.parquet", b"data");
// Many orphan files under a known chunk prefix to stress ordering.
for i in 0..50 {
write_snapshot_file(
dir.path(),
&format!("data/public/1/orphan_{:02}.parquet", i),
b"x",
);
}
let storage = file_storage_for_dir(dir.path());
let messages = |report: &VerifyReport| {
report
.problems
.iter()
.map(|problem| problem.message.clone())
.collect::<Vec<_>>()
};
let first = messages(&verify_snapshot(&storage).await.unwrap());
let second = messages(&verify_snapshot(&storage).await.unwrap());
// Output is identical across runs despite HashSet-based scanning.
assert_eq!(first, second);
let orphans = first
.iter()
.filter(|message| message.contains("Unexpected data file"))
.cloned()
.collect::<Vec<_>>();
assert_eq!(orphans.len(), 50);
let mut sorted = orphans.clone();
sorted.sort();
assert_eq!(orphans, sorted);
}
fn write_test_manifest(root: &std::path::Path, dir: &str, manifest: Manifest) {
let snapshot_dir = root.join(dir);
std::fs::create_dir_all(&snapshot_dir).unwrap();