From aaffe1a048cab04ccec62f9fba3b1a837643e60e Mon Sep 17 00:00:00 2001 From: jeremyhi Date: Tue, 2 Jun 2026 20:10:10 -0700 Subject: [PATCH] refactor(cli): split export-v2 verify into plan/scan/reconcile (#8206) * refactor(cli): split export-v2 verify into plan/scan/reconcile Signed-off-by: jeremyhi * fix: by AI comments Signed-off-by: jeremyhi --------- Signed-off-by: jeremyhi --- src/cli/src/data/export_v2/command.rs | 248 +++++++++++++++++++++----- 1 file changed, 206 insertions(+), 42 deletions(-) diff --git a/src/cli/src/data/export_v2/command.rs b/src/cli/src/data/export_v2/command.rs index db0f576a4e..bb027bbef1 100644 --- a/src/cli/src/data/export_v2/command.rs +++ b/src/cli/src/data/export_v2/command.rs @@ -1077,7 +1077,9 @@ async fn verify_snapshot(storage: &OpenDalStorage) -> Result { )); } let data_files = storage.list_files_recursive("data/").await?; - if let Some(path) = data_files.first() { + // Report the lexicographically smallest path so the message is stable + // regardless of listing order across backends. + if let Some(path) = data_files.iter().min() { report.push_error(format!( "Schema-only snapshot should not contain data files (found '{}')", path @@ -1103,75 +1105,113 @@ fn summarize_chunks(manifest: &Manifest) -> VerifyChunkSummary { } } +/// A data file declared by a completed chunk that is expected to exist in storage. +#[derive(Debug)] +struct ChunkFile { + chunk_id: u32, + path: String, +} + +/// Expected snapshot contents derived purely from the manifest (no object-store IO). +/// +/// Separating planning from scanning makes it obvious which problems come from +/// the manifest alone and which require comparing against actual storage. +#[derive(Debug, Default)] +struct VerifyPlan { + /// Valid data files declared by completed chunks; each must exist in storage. + files_to_check: Vec, + /// All syntactically-safe data paths declared by any chunk, regardless of + /// status. Used as the orphan-detection baseline so a listed-but-invalid + /// file is not also reported as unexpected. + claimed_data_files: HashSet, + /// Total data-file references in completed chunks (valid + invalid). + data_files_total: usize, + /// Problems detectable from the manifest alone. + problems: Vec, +} + +/// Actual data files discovered under `data/` (the only object-store IO in +/// chunk/data-file verification). +#[derive(Debug)] +struct VerifyDataScan { + existing_data_files: HashSet, +} + +/// Result of reconciling the manifest plan against the storage scan. +#[derive(Debug, Default)] +struct VerifyOutcome { + data_files_total: usize, + data_files_verified: usize, + problems: Vec, +} + async fn verify_chunks_and_data_files( storage: &OpenDalStorage, report: &mut VerifyReport, ) -> Result<()> { - let existing_files: HashSet<_> = storage - .list_files_recursive("data/") - .await? - .into_iter() - .collect(); - let mut data_files_total = 0; - let mut data_files_verified = 0; - let mut problems = Vec::new(); - let mut seen_chunk_ids = HashSet::new(); - let mut claimed_data_files = HashSet::new(); + let plan = build_verify_plan(&report.manifest); + let scan = scan_data_files(storage).await?; + let outcome = reconcile_plan_with_scan(plan, &scan); - for chunk in &report.manifest.chunks { + report.data_files_total = outcome.data_files_total; + report.data_files_verified = outcome.data_files_verified; + report.problems.extend(outcome.problems); + + Ok(()) +} + +/// Builds the expected-state plan from the manifest. Pure; performs no IO. +fn build_verify_plan(manifest: &Manifest) -> VerifyPlan { + let mut plan = VerifyPlan::default(); + let mut seen_chunk_ids = HashSet::new(); + + for chunk in &manifest.chunks { if !seen_chunk_ids.insert(chunk.id) { - problems.push(VerifyProblem { + plan.problems.push(VerifyProblem { severity: VerifySeverity::Error, message: format!("Chunk {}: duplicate chunk id", chunk.id), }); } for file in &chunk.files { if let Some(path) = safe_manifest_data_file_path(file) { - claimed_data_files.insert(path.to_string()); + plan.claimed_data_files.insert(path.to_string()); } } match chunk.status { ChunkStatus::Completed => { if chunk.files.is_empty() { - problems.push(VerifyProblem { + plan.problems.push(VerifyProblem { severity: VerifySeverity::Error, message: format!("Chunk {}: completed chunk has no data files", chunk.id), }); continue; } - let allowed_prefixes = report - .manifest + let allowed_prefixes = manifest .schemas .iter() .map(|schema| data_dir_for_schema_chunk(schema, chunk.id)) .collect::>(); for file in &chunk.files { - data_files_total += 1; - let Some(path) = valid_manifest_data_file_path(file, &allowed_prefixes) else { - problems.push(VerifyProblem { + plan.data_files_total += 1; + match valid_manifest_data_file_path(file, &allowed_prefixes) { + Some(path) => plan.files_to_check.push(ChunkFile { + chunk_id: chunk.id, + path: path.to_string(), + }), + None => plan.problems.push(VerifyProblem { severity: VerifySeverity::Error, message: format!( "Chunk {}: invalid data file path '{}'", chunk.id, file ), - }); - continue; - }; - - if existing_files.contains(path) { - data_files_verified += 1; - } else { - problems.push(VerifyProblem { - severity: VerifySeverity::Error, - message: format!("Chunk {}: missing file '{}'", chunk.id, path), - }); + }), } } } ChunkStatus::Skipped => { if !chunk.files.is_empty() { - problems.push(VerifyProblem { + plan.problems.push(VerifyProblem { severity: VerifySeverity::Error, message: format!( "Chunk {}: skipped chunk should not list data files", @@ -1181,20 +1221,20 @@ async fn verify_chunks_and_data_files( } } ChunkStatus::Pending => { - problems.push(VerifyProblem { + plan.problems.push(VerifyProblem { severity: VerifySeverity::Error, message: format!("Chunk {}: status is 'pending'", chunk.id), }); } ChunkStatus::InProgress => { - problems.push(VerifyProblem { + plan.problems.push(VerifyProblem { severity: VerifySeverity::Error, message: format!("Chunk {}: status is 'in_progress'", chunk.id), }); } ChunkStatus::Failed => { let reason = chunk.error.as_deref().unwrap_or("unknown error"); - problems.push(VerifyProblem { + plan.problems.push(VerifyProblem { severity: VerifySeverity::Error, message: format!("Chunk {}: status is 'failed' (error: {})", chunk.id, reason), }); @@ -1202,20 +1242,60 @@ async fn verify_chunks_and_data_files( } } - for path in &existing_files { - if !claimed_data_files.contains(path) { + plan +} + +/// Lists all data files under `data/`. This is the only object-store IO in +/// chunk/data-file verification. +async fn scan_data_files(storage: &OpenDalStorage) -> Result { + let existing_data_files = storage + .list_files_recursive("data/") + .await? + .into_iter() + .collect(); + Ok(VerifyDataScan { + existing_data_files, + }) +} + +/// Reconciles the manifest plan against the storage scan. Pure; performs no IO. +/// +/// Emits missing-file problems for expected files absent from storage and +/// unexpected-file problems for storage files no chunk claims. Unexpected files +/// are sorted by path so output is deterministic regardless of listing order. +fn reconcile_plan_with_scan(plan: VerifyPlan, scan: &VerifyDataScan) -> VerifyOutcome { + let mut problems = plan.problems; + let mut data_files_verified = 0; + + for file in &plan.files_to_check { + if scan.existing_data_files.contains(&file.path) { + data_files_verified += 1; + } else { problems.push(VerifyProblem { severity: VerifySeverity::Error, - message: format!("Unexpected data file '{}' is not listed in manifest", path), + message: format!("Chunk {}: missing file '{}'", file.chunk_id, file.path), }); } } - report.data_files_total = data_files_total; - report.data_files_verified = data_files_verified; - report.problems.extend(problems); + let mut orphans: Vec<&String> = scan + .existing_data_files + .iter() + .filter(|path| !plan.claimed_data_files.contains(*path)) + .collect(); + orphans.sort(); + for path in orphans { + problems.push(VerifyProblem { + severity: VerifySeverity::Error, + message: format!("Unexpected data file '{}' is not listed in manifest", path), + }); + } - Ok(()) + VerifyOutcome { + data_files_total: plan.data_files_total, + data_files_verified, + problems, + } } fn valid_manifest_data_file_path<'a>( @@ -2294,6 +2374,90 @@ mod tests { ); } + #[test] + fn test_build_verify_plan_classifies_chunks_without_io() { + let mut manifest = test_manifest( + chrono::Utc.with_ymd_and_hms(2026, 1, 1, 0, 0, 0).unwrap(), + false, + true, + ); + // test_manifest(complete) gives: chunk 1 completed (1 file), chunk 2 skipped. + let mut failed = ChunkMeta::new(3, TimeRange::unbounded()); + failed.mark_failed("boom".to_string()); + manifest.chunks.push(failed); + manifest + .chunks + .push(ChunkMeta::new(4, TimeRange::unbounded())); + + let plan = build_verify_plan(&manifest); + + assert_eq!(plan.files_to_check.len(), 1); + assert_eq!(plan.files_to_check[0].chunk_id, 1); + assert_eq!(plan.files_to_check[0].path, "data/public/1/file.parquet"); + assert_eq!(plan.data_files_total, 1); + assert!( + plan.claimed_data_files + .contains("data/public/1/file.parquet") + ); + assert_eq!(plan.problems.len(), 2); + assert!( + plan.problems + .iter() + .any(|problem| problem.message.contains("status is 'failed'")) + ); + assert!( + plan.problems + .iter() + .any(|problem| problem.message.contains("status is 'pending'")) + ); + } + + #[tokio::test] + async fn test_verify_snapshot_produces_deterministic_problem_output() { + let dir = tempdir().unwrap(); + let manifest = test_manifest( + chrono::Utc.with_ymd_and_hms(2026, 1, 1, 0, 0, 0).unwrap(), + false, + true, + ); + write_root_manifest(dir.path(), manifest); + write_snapshot_file(dir.path(), "schema/schemas.json", b"[]"); + write_default_ddl_files(dir.path()); + write_snapshot_file(dir.path(), "data/public/1/file.parquet", b"data"); + // Many orphan files under a known chunk prefix to stress ordering. + for i in 0..50 { + write_snapshot_file( + dir.path(), + &format!("data/public/1/orphan_{:02}.parquet", i), + b"x", + ); + } + + let storage = file_storage_for_dir(dir.path()); + let messages = |report: &VerifyReport| { + report + .problems + .iter() + .map(|problem| problem.message.clone()) + .collect::>() + }; + let first = messages(&verify_snapshot(&storage).await.unwrap()); + let second = messages(&verify_snapshot(&storage).await.unwrap()); + + // Output is identical across runs despite HashSet-based scanning. + assert_eq!(first, second); + + let orphans = first + .iter() + .filter(|message| message.contains("Unexpected data file")) + .cloned() + .collect::>(); + assert_eq!(orphans.len(), 50); + let mut sorted = orphans.clone(); + sorted.sort(); + assert_eq!(orphans, sorted); + } + fn write_test_manifest(root: &std::path::Path, dir: &str, manifest: Manifest) { let snapshot_dir = root.join(dir); std::fs::create_dir_all(&snapshot_dir).unwrap();