diff --git a/src/cli/src/data/export_v2/command.rs b/src/cli/src/data/export_v2/command.rs index a7da6d3862..3c069a72be 100644 --- a/src/cli/src/data/export_v2/command.rs +++ b/src/cli/src/data/export_v2/command.rs @@ -30,13 +30,14 @@ use crate::data::export_v2::coordinator::export_data; use crate::data::export_v2::error::{ ChunkTimeWindowRequiresBoundsSnafu, DatabaseSnafu, EmptyResultSnafu, ManifestVersionMismatchSnafu, Result, ResumeConfigMismatchSnafu, SchemaOnlyArgsNotAllowedSnafu, - SchemaOnlyModeMismatchSnafu, UnexpectedValueTypeSnafu, + SchemaOnlyModeMismatchSnafu, SnapshotVerifyFailedSnafu, UnexpectedValueTypeSnafu, }; use crate::data::export_v2::extractor::SchemaExtractor; use crate::data::export_v2::manifest::{ - ChunkMeta, DataFormat, MANIFEST_FILE, MANIFEST_VERSION, Manifest, TimeRange, + ChunkMeta, ChunkStatus, DataFormat, MANIFEST_FILE, MANIFEST_VERSION, Manifest, TimeRange, }; -use crate::data::path::ddl_path_for_schema; +use crate::data::export_v2::schema::{DDL_DIR, SCHEMA_DIR, SCHEMAS_FILE}; +use crate::data::path::{data_dir_for_schema_chunk, ddl_path_for_schema}; use crate::data::snapshot_storage::{OpenDalStorage, SnapshotStorage, validate_uri}; use crate::data::sql::{escape_sql_identifier, escape_sql_literal}; use crate::database::{DatabaseClient, parse_proxy_opts}; @@ -48,6 +49,8 @@ pub enum ExportV2Command { Create(ExportCreateCommand), /// List snapshots under a parent location. List(ExportListCommand), + /// Verify snapshot integrity. + Verify(ExportVerifyCommand), } impl ExportV2Command { @@ -55,6 +58,7 @@ impl ExportV2Command { match self { ExportV2Command::Create(cmd) => cmd.build().await, ExportV2Command::List(cmd) => cmd.build().await, + ExportV2Command::Verify(cmd) => cmd.build().await, } } } @@ -113,6 +117,61 @@ impl ExportList { } } +/// Verify snapshot integrity. +#[derive(Debug, Parser)] +pub struct ExportVerifyCommand { + /// Snapshot storage location (e.g., s3://bucket/path, file:///tmp/backup). + #[clap(long)] + snapshot: String, + + /// Object store configuration for remote storage backends. + #[clap(flatten)] + storage: ObjectStoreConfig, +} + +impl ExportVerifyCommand { + pub async fn build(&self) -> std::result::Result, BoxedError> { + validate_uri(&self.snapshot).map_err(BoxedError::new)?; + let storage = + OpenDalStorage::from_uri(&self.snapshot, &self.storage).map_err(BoxedError::new)?; + + Ok(Box::new(ExportVerify { + snapshot: self.snapshot.clone(), + storage, + })) + } +} + +/// Export verify tool implementation. +pub struct ExportVerify { + snapshot: String, + storage: OpenDalStorage, +} + +#[async_trait] +impl Tool for ExportVerify { + async fn do_work(&self) -> std::result::Result<(), BoxedError> { + self.run().await.map_err(BoxedError::new) + } +} + +impl ExportVerify { + async fn run(&self) -> Result<()> { + let report = verify_snapshot(&self.storage).await?; + print_verify_report(&self.snapshot, &report); + + if report.has_problems() { + return SnapshotVerifyFailedSnafu { + errors: report.error_count(), + warnings: report.warning_count(), + } + .fail(); + } + + Ok(()) + } +} + /// Create a new snapshot. #[derive(Debug, Parser)] pub struct ExportCreateCommand { @@ -811,6 +870,375 @@ fn format_list_chunks(manifest: &Manifest) -> String { ) } +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +enum VerifySeverity { + Error, + Warn, +} + +impl VerifySeverity { + fn as_str(self) -> &'static str { + match self { + VerifySeverity::Error => "ERROR", + VerifySeverity::Warn => "WARN", + } + } +} + +#[derive(Debug)] +struct VerifyProblem { + severity: VerifySeverity, + message: String, +} + +#[derive(Debug, Default)] +struct VerifyChunkSummary { + total: usize, + completed: usize, + skipped: usize, + pending: usize, + in_progress: usize, + failed: usize, +} + +#[derive(Debug)] +struct VerifyReport { + manifest: Manifest, + schema_index_exists: bool, + ddl_file_count: usize, + chunk_summary: VerifyChunkSummary, + data_files_total: usize, + data_files_verified: usize, + problems: Vec, +} + +impl VerifyReport { + fn error_count(&self) -> usize { + self.problems + .iter() + .filter(|problem| problem.severity == VerifySeverity::Error) + .count() + } + + fn warning_count(&self) -> usize { + self.problems + .iter() + .filter(|problem| problem.severity == VerifySeverity::Warn) + .count() + } + + fn has_problems(&self) -> bool { + !self.problems.is_empty() + } + + fn push_error(&mut self, message: impl Into) { + self.problems.push(VerifyProblem { + severity: VerifySeverity::Error, + message: message.into(), + }); + } + + fn push_warn(&mut self, message: impl Into) { + self.problems.push(VerifyProblem { + severity: VerifySeverity::Warn, + message: message.into(), + }); + } +} + +async fn verify_snapshot(storage: &OpenDalStorage) -> Result { + let manifest = storage.read_manifest().await?; + let schema_index_path = format!("{}/{}", SCHEMA_DIR, SCHEMAS_FILE); + let ddl_prefix = format!("{}/{}/", SCHEMA_DIR, DDL_DIR); + let schema_index_exists = storage.file_exists(&schema_index_path).await?; + let ddl_files: HashSet<_> = storage + .list_files_recursive(&ddl_prefix) + .await? + .into_iter() + .collect(); + let ddl_file_count = ddl_files + .iter() + .filter(|path| path.ends_with(".sql")) + .count(); + + let mut report = VerifyReport { + manifest, + schema_index_exists, + ddl_file_count, + chunk_summary: VerifyChunkSummary::default(), + data_files_total: 0, + data_files_verified: 0, + problems: Vec::new(), + }; + + if report.manifest.version != MANIFEST_VERSION { + report.push_error(format!( + "Manifest version mismatch: expected {}, found {}", + MANIFEST_VERSION, report.manifest.version + )); + } + + if !report.schema_index_exists { + report.push_warn(format!("Missing schema index '{}'", schema_index_path)); + } + + for schema in &report.manifest.schemas { + let ddl_path = ddl_path_for_schema(schema); + if !ddl_files.contains(ddl_path.as_str()) { + report.problems.push(VerifyProblem { + severity: VerifySeverity::Error, + message: format!("Schema '{}': missing DDL file '{}'", schema, ddl_path), + }); + } + } + + report.chunk_summary = summarize_chunks(&report.manifest); + if report.manifest.schema_only { + let chunk_count = report.manifest.chunks.len(); + if chunk_count > 0 { + report.push_error(format!( + "Schema-only snapshot should not contain data chunks (found {})", + chunk_count + )); + } + let data_files = storage.list_files_recursive("data/").await?; + if let Some(path) = data_files.first() { + report.push_error(format!( + "Schema-only snapshot should not contain data files (found '{}')", + path + )); + } + } else if report.manifest.chunks.is_empty() { + report.push_error("Full snapshot should contain at least one data chunk"); + } else { + verify_chunks_and_data_files(storage, &mut report).await?; + } + + Ok(report) +} + +fn summarize_chunks(manifest: &Manifest) -> VerifyChunkSummary { + VerifyChunkSummary { + total: manifest.chunks.len(), + completed: manifest.completed_count(), + skipped: manifest.skipped_count(), + pending: manifest.pending_count(), + in_progress: manifest.in_progress_count(), + failed: manifest.failed_count(), + } +} + +async fn verify_chunks_and_data_files( + storage: &OpenDalStorage, + report: &mut VerifyReport, +) -> Result<()> { + let existing_files: HashSet<_> = storage + .list_files_recursive("data/") + .await? + .into_iter() + .collect(); + let mut data_files_total = 0; + let mut data_files_verified = 0; + let mut problems = Vec::new(); + let mut seen_chunk_ids = HashSet::new(); + let mut claimed_data_files = HashSet::new(); + + for chunk in &report.manifest.chunks { + if !seen_chunk_ids.insert(chunk.id) { + problems.push(VerifyProblem { + severity: VerifySeverity::Error, + message: format!("Chunk {}: duplicate chunk id", chunk.id), + }); + } + for file in &chunk.files { + if let Some(path) = safe_manifest_data_file_path(file) { + claimed_data_files.insert(path.to_string()); + } + } + + match chunk.status { + ChunkStatus::Completed => { + if chunk.files.is_empty() { + problems.push(VerifyProblem { + severity: VerifySeverity::Error, + message: format!("Chunk {}: completed chunk has no data files", chunk.id), + }); + continue; + } + let allowed_prefixes = report + .manifest + .schemas + .iter() + .map(|schema| data_dir_for_schema_chunk(schema, chunk.id)) + .collect::>(); + for file in &chunk.files { + data_files_total += 1; + let Some(path) = valid_manifest_data_file_path(file, &allowed_prefixes) else { + problems.push(VerifyProblem { + severity: VerifySeverity::Error, + message: format!( + "Chunk {}: invalid data file path '{}'", + chunk.id, file + ), + }); + continue; + }; + + if existing_files.contains(path) { + data_files_verified += 1; + } else { + problems.push(VerifyProblem { + severity: VerifySeverity::Error, + message: format!("Chunk {}: missing file '{}'", chunk.id, path), + }); + } + } + } + ChunkStatus::Skipped => { + if !chunk.files.is_empty() { + problems.push(VerifyProblem { + severity: VerifySeverity::Error, + message: format!( + "Chunk {}: skipped chunk should not list data files", + chunk.id + ), + }); + } + } + ChunkStatus::Pending => { + problems.push(VerifyProblem { + severity: VerifySeverity::Error, + message: format!("Chunk {}: status is 'pending'", chunk.id), + }); + } + ChunkStatus::InProgress => { + problems.push(VerifyProblem { + severity: VerifySeverity::Error, + message: format!("Chunk {}: status is 'in_progress'", chunk.id), + }); + } + ChunkStatus::Failed => { + let reason = chunk.error.as_deref().unwrap_or("unknown error"); + problems.push(VerifyProblem { + severity: VerifySeverity::Error, + message: format!("Chunk {}: status is 'failed' (error: {})", chunk.id, reason), + }); + } + } + } + + for path in &existing_files { + if !claimed_data_files.contains(path) { + problems.push(VerifyProblem { + severity: VerifySeverity::Error, + message: format!("Unexpected data file '{}' is not listed in manifest", path), + }); + } + } + + report.data_files_total = data_files_total; + report.data_files_verified = data_files_verified; + report.problems.extend(problems); + + Ok(()) +} + +fn valid_manifest_data_file_path<'a>( + path: &'a str, + allowed_prefixes: &[String], +) -> Option<&'a str> { + let normalized = safe_manifest_data_file_path(path)?; + + if !allowed_prefixes + .iter() + .any(|prefix| normalized.starts_with(prefix)) + { + return None; + } + + Some(normalized) +} + +fn safe_manifest_data_file_path(path: &str) -> Option<&str> { + let normalized = path.trim_start_matches('/'); + if normalized.is_empty() || !normalized.starts_with("data/") { + return None; + } + + if normalized + .split('/') + .any(|segment| segment.is_empty() || segment == "." || segment == "..") + { + return None; + } + + Some(normalized) +} + +fn print_verify_report(snapshot: &str, report: &VerifyReport) { + println!("Verifying snapshot: {}", report.manifest.snapshot_id); + println!(" Location: {}", snapshot); + if report.manifest.version == MANIFEST_VERSION { + println!(" Manifest: OK (version {})", report.manifest.version); + } else { + println!( + " Manifest: ERROR (version {}, expected {})", + report.manifest.version, MANIFEST_VERSION + ); + } + println!( + " Schema files: {}", + if report.schema_index_exists { + format!("OK ({})", SCHEMAS_FILE) + } else { + format!("WARN (missing {})", SCHEMAS_FILE) + } + ); + if report.ddl_file_count > 0 { + println!(" DDL files: {} file(s) found", report.ddl_file_count); + } else { + println!(" DDL files: not present"); + } + + let chunks = &report.chunk_summary; + println!( + " Chunks: {} total ({} completed, {} skipped, {} pending, {} in_progress, {} failed)", + chunks.total, + chunks.completed, + chunks.skipped, + chunks.pending, + chunks.in_progress, + chunks.failed + ); + + if report.manifest.schema_only { + println!(" Data files: skipped (schema-only)"); + } else { + println!( + " Data files: {}/{} files verified", + report.data_files_verified, report.data_files_total + ); + } + + if report.problems.is_empty() { + println!(); + println!("Snapshot is valid."); + return; + } + + println!(); + println!("Problems found:"); + for problem in &report.problems { + println!(" [{}] {}", problem.severity.as_str(), problem.message); + } + println!(); + println!( + "Snapshot has {} error(s), {} warning(s).", + report.error_count(), + report.warning_count() + ); +} + #[cfg(test)] mod tests { use chrono::TimeZone; @@ -1145,6 +1573,434 @@ mod tests { assert_eq!(format_list_chunks(&incomplete), "1/2"); } + #[tokio::test] + async fn test_verify_snapshot_accepts_valid_full_snapshot() { + let dir = tempdir().unwrap(); + let manifest = test_manifest( + chrono::Utc.with_ymd_and_hms(2026, 1, 1, 0, 0, 0).unwrap(), + false, + true, + ); + write_root_manifest(dir.path(), manifest); + write_snapshot_file(dir.path(), "schema/schemas.json", b"[]"); + write_default_ddl_files(dir.path()); + write_snapshot_file(dir.path(), "data/public/1/file.parquet", b"data"); + + let storage = file_storage_for_dir(dir.path()); + let report = verify_snapshot(&storage).await.unwrap(); + + assert_eq!(report.error_count(), 0); + assert_eq!(report.warning_count(), 0); + assert_eq!(report.data_files_total, 1); + assert_eq!(report.data_files_verified, 1); + } + + #[tokio::test] + async fn test_verify_snapshot_reports_missing_data_file_and_failed_chunk() { + let dir = tempdir().unwrap(); + let mut manifest = test_manifest( + chrono::Utc.with_ymd_and_hms(2026, 1, 1, 0, 0, 0).unwrap(), + false, + true, + ); + manifest.chunks[1].mark_failed("copy failed".to_string()); + write_root_manifest(dir.path(), manifest); + write_snapshot_file(dir.path(), "schema/schemas.json", b"[]"); + write_default_ddl_files(dir.path()); + + let storage = file_storage_for_dir(dir.path()); + let report = verify_snapshot(&storage).await.unwrap(); + + assert_eq!(report.error_count(), 2); + assert!( + report + .problems + .iter() + .any(|problem| problem.message.contains("missing file")) + ); + assert!( + report + .problems + .iter() + .any(|problem| problem.message.contains("status is 'failed'")) + ); + } + + #[tokio::test] + async fn test_verify_snapshot_reports_missing_schema_index_as_warning() { + let dir = tempdir().unwrap(); + let manifest = test_manifest( + chrono::Utc.with_ymd_and_hms(2026, 1, 1, 0, 0, 0).unwrap(), + false, + true, + ); + write_root_manifest(dir.path(), manifest); + write_default_ddl_files(dir.path()); + write_snapshot_file(dir.path(), "data/public/1/file.parquet", b"data"); + + let storage = file_storage_for_dir(dir.path()); + let report = verify_snapshot(&storage).await.unwrap(); + + assert_eq!(report.error_count(), 0); + assert_eq!(report.warning_count(), 1); + assert!( + report + .problems + .iter() + .any(|problem| problem.message.contains("Missing schema index")) + ); + } + + #[tokio::test] + async fn test_verify_snapshot_rejects_schema_only_snapshot_with_chunks() { + let dir = tempdir().unwrap(); + let mut manifest = test_manifest( + chrono::Utc.with_ymd_and_hms(2026, 1, 1, 0, 0, 0).unwrap(), + true, + true, + ); + let mut chunk = ChunkMeta::new(1, TimeRange::unbounded()); + chunk.mark_completed(vec!["data/public/1/file.parquet".to_string()], None); + manifest.chunks.push(chunk); + write_root_manifest(dir.path(), manifest); + write_snapshot_file(dir.path(), "schema/schemas.json", b"[]"); + write_default_ddl_files(dir.path()); + + let storage = file_storage_for_dir(dir.path()); + let report = verify_snapshot(&storage).await.unwrap(); + + assert_eq!(report.error_count(), 1); + assert_eq!(report.data_files_total, 0); + assert!( + report + .problems + .iter() + .any(|problem| problem.message.contains("should not contain data chunks")) + ); + } + + #[tokio::test] + async fn test_verify_snapshot_rejects_schema_only_snapshot_with_data_files() { + let dir = tempdir().unwrap(); + let manifest = test_manifest( + chrono::Utc.with_ymd_and_hms(2026, 1, 1, 0, 0, 0).unwrap(), + true, + true, + ); + write_root_manifest(dir.path(), manifest); + write_snapshot_file(dir.path(), "schema/schemas.json", b"[]"); + write_default_ddl_files(dir.path()); + write_snapshot_file(dir.path(), "data/public/1/file.parquet", b"data"); + + let storage = file_storage_for_dir(dir.path()); + let report = verify_snapshot(&storage).await.unwrap(); + + assert_eq!(report.error_count(), 1); + assert_eq!(report.data_files_total, 0); + assert!( + report + .problems + .iter() + .any(|problem| problem.message.contains("should not contain data files")) + ); + } + + #[tokio::test] + async fn test_verify_snapshot_rejects_full_snapshot_without_chunks() { + let dir = tempdir().unwrap(); + let mut manifest = test_manifest( + chrono::Utc.with_ymd_and_hms(2026, 1, 1, 0, 0, 0).unwrap(), + false, + true, + ); + manifest.chunks.clear(); + write_root_manifest(dir.path(), manifest); + write_snapshot_file(dir.path(), "schema/schemas.json", b"[]"); + write_default_ddl_files(dir.path()); + + let storage = file_storage_for_dir(dir.path()); + let report = verify_snapshot(&storage).await.unwrap(); + + assert_eq!(report.error_count(), 1); + assert_eq!(report.data_files_total, 0); + assert!( + report + .problems + .iter() + .any(|problem| problem.message.contains("at least one data chunk")) + ); + } + + #[tokio::test] + async fn test_verify_snapshot_rejects_skipped_chunk_data_files() { + let dir = tempdir().unwrap(); + let manifest = test_manifest( + chrono::Utc.with_ymd_and_hms(2026, 1, 1, 0, 0, 0).unwrap(), + false, + true, + ); + write_root_manifest(dir.path(), manifest); + write_snapshot_file(dir.path(), "schema/schemas.json", b"[]"); + write_default_ddl_files(dir.path()); + write_snapshot_file(dir.path(), "data/public/1/file.parquet", b"data"); + write_snapshot_file(dir.path(), "data/public/2/file.parquet", b"data"); + + let storage = file_storage_for_dir(dir.path()); + let report = verify_snapshot(&storage).await.unwrap(); + + assert_eq!(report.error_count(), 1); + assert!( + report + .problems + .iter() + .any(|problem| { problem.message.contains("Unexpected data file") }) + ); + } + + #[tokio::test] + async fn test_verify_snapshot_rejects_duplicate_chunk_ids() { + let dir = tempdir().unwrap(); + let mut manifest = test_manifest( + chrono::Utc.with_ymd_and_hms(2026, 1, 1, 0, 0, 0).unwrap(), + false, + true, + ); + let mut duplicate = ChunkMeta::new(1, TimeRange::unbounded()); + duplicate.mark_completed(vec!["data/public/1/file.parquet".to_string()], None); + manifest.chunks.push(duplicate); + write_root_manifest(dir.path(), manifest); + write_snapshot_file(dir.path(), "schema/schemas.json", b"[]"); + write_default_ddl_files(dir.path()); + write_snapshot_file(dir.path(), "data/public/1/file.parquet", b"data"); + + let storage = file_storage_for_dir(dir.path()); + let report = verify_snapshot(&storage).await.unwrap(); + + assert_eq!(report.error_count(), 1); + assert!( + report + .problems + .iter() + .any(|problem| problem.message.contains("duplicate chunk id")) + ); + } + + #[tokio::test] + async fn test_verify_snapshot_requires_all_schema_ddl() { + let dir = tempdir().unwrap(); + let manifest = test_manifest( + chrono::Utc.with_ymd_and_hms(2026, 1, 1, 0, 0, 0).unwrap(), + true, + true, + ); + write_root_manifest(dir.path(), manifest); + write_snapshot_file(dir.path(), "schema/schemas.json", b"[]"); + write_snapshot_file( + dir.path(), + "schema/ddl/public.sql", + b"CREATE DATABASE public;", + ); + + let storage = file_storage_for_dir(dir.path()); + let report = verify_snapshot(&storage).await.unwrap(); + + assert_eq!(report.error_count(), 1); + assert!( + report + .problems + .iter() + .any(|problem| problem.message.contains("analytics")) + ); + } + + #[tokio::test] + async fn test_verify_snapshot_reports_missing_ddl_dir() { + let dir = tempdir().unwrap(); + let manifest = test_manifest( + chrono::Utc.with_ymd_and_hms(2026, 1, 1, 0, 0, 0).unwrap(), + false, + true, + ); + write_root_manifest(dir.path(), manifest); + write_snapshot_file(dir.path(), "schema/schemas.json", b"[]"); + write_snapshot_file(dir.path(), "data/public/1/file.parquet", b"data"); + + let storage = file_storage_for_dir(dir.path()); + let report = verify_snapshot(&storage).await.unwrap(); + + assert_eq!(report.error_count(), 2); + assert!( + report + .problems + .iter() + .any(|problem| problem.message.contains("schema/ddl/public.sql")) + ); + assert!( + report + .problems + .iter() + .any(|problem| problem.message.contains("schema/ddl/analytics.sql")) + ); + } + + #[tokio::test] + async fn test_verify_snapshot_reports_manifest_version_mismatch() { + let dir = tempdir().unwrap(); + let mut manifest = test_manifest( + chrono::Utc.with_ymd_and_hms(2026, 1, 1, 0, 0, 0).unwrap(), + false, + true, + ); + manifest.version = MANIFEST_VERSION + 1; + write_root_manifest(dir.path(), manifest); + write_snapshot_file(dir.path(), "schema/schemas.json", b"[]"); + write_default_ddl_files(dir.path()); + write_snapshot_file(dir.path(), "data/public/1/file.parquet", b"data"); + + let storage = file_storage_for_dir(dir.path()); + let report = verify_snapshot(&storage).await.unwrap(); + + assert_eq!(report.error_count(), 1); + assert!( + report + .problems + .iter() + .any(|problem| problem.message.contains("Manifest version mismatch")) + ); + } + + #[tokio::test] + async fn test_verify_snapshot_rejects_invalid_data_file_paths() { + let dir = tempdir().unwrap(); + let mut manifest = test_manifest( + chrono::Utc.with_ymd_and_hms(2026, 1, 1, 0, 0, 0).unwrap(), + false, + true, + ); + manifest.chunks[0].files = vec!["data/public/1/../file.parquet".to_string()]; + write_root_manifest(dir.path(), manifest); + write_snapshot_file(dir.path(), "schema/schemas.json", b"[]"); + write_default_ddl_files(dir.path()); + + let storage = file_storage_for_dir(dir.path()); + let report = verify_snapshot(&storage).await.unwrap(); + + assert_eq!(report.error_count(), 1); + assert!( + report + .problems + .iter() + .any(|problem| problem.message.contains("invalid data file path")) + ); + assert_eq!(report.data_files_verified, 0); + } + + #[tokio::test] + async fn test_verify_snapshot_accepts_leading_slash_manifest_data_paths() { + let dir = tempdir().unwrap(); + let mut manifest = test_manifest( + chrono::Utc.with_ymd_and_hms(2026, 1, 1, 0, 0, 0).unwrap(), + false, + true, + ); + manifest.chunks[0].files = vec!["/data/public/1/file.parquet".to_string()]; + write_root_manifest(dir.path(), manifest); + write_snapshot_file(dir.path(), "schema/schemas.json", b"[]"); + write_default_ddl_files(dir.path()); + write_snapshot_file(dir.path(), "data/public/1/file.parquet", b"data"); + + let storage = file_storage_for_dir(dir.path()); + let report = verify_snapshot(&storage).await.unwrap(); + + assert_eq!(report.error_count(), 0); + assert_eq!(report.data_files_verified, 1); + } + + #[tokio::test] + async fn test_verify_snapshot_rejects_unlisted_files_under_completed_chunk_prefix() { + let dir = tempdir().unwrap(); + let manifest = test_manifest( + chrono::Utc.with_ymd_and_hms(2026, 1, 1, 0, 0, 0).unwrap(), + false, + true, + ); + write_root_manifest(dir.path(), manifest); + write_snapshot_file(dir.path(), "schema/schemas.json", b"[]"); + write_default_ddl_files(dir.path()); + write_snapshot_file(dir.path(), "data/public/1/file.parquet", b"data"); + write_snapshot_file(dir.path(), "data/public/1/extra.parquet", b"data"); + + let storage = file_storage_for_dir(dir.path()); + let report = verify_snapshot(&storage).await.unwrap(); + + assert_eq!(report.error_count(), 1); + assert!( + report + .problems + .iter() + .any(|problem| problem.message.contains("Unexpected data file")) + ); + assert_eq!(report.data_files_verified, 1); + } + + #[tokio::test] + async fn test_verify_snapshot_rejects_orphan_data_files_outside_known_chunk_prefixes() { + let dir = tempdir().unwrap(); + let manifest = test_manifest( + chrono::Utc.with_ymd_and_hms(2026, 1, 1, 0, 0, 0).unwrap(), + false, + true, + ); + write_root_manifest(dir.path(), manifest); + write_snapshot_file(dir.path(), "schema/schemas.json", b"[]"); + write_default_ddl_files(dir.path()); + write_snapshot_file(dir.path(), "data/public/1/file.parquet", b"data"); + write_snapshot_file(dir.path(), "data/public/99/file.parquet", b"data"); + + let storage = file_storage_for_dir(dir.path()); + let report = verify_snapshot(&storage).await.unwrap(); + + assert_eq!(report.error_count(), 1); + assert!( + report + .problems + .iter() + .any(|problem| problem.message.contains("Unexpected data file")) + ); + assert_eq!(report.data_files_verified, 1); + } + + #[tokio::test] + async fn test_verify_snapshot_rejects_data_files_under_wrong_chunk_or_schema() { + let dir = tempdir().unwrap(); + let mut manifest = test_manifest( + chrono::Utc.with_ymd_and_hms(2026, 1, 1, 0, 0, 0).unwrap(), + false, + true, + ); + manifest.chunks[0].files = vec![ + "data/public/99/file.parquet".to_string(), + "data/metrics/1/file.parquet".to_string(), + ]; + write_root_manifest(dir.path(), manifest); + write_snapshot_file(dir.path(), "schema/schemas.json", b"[]"); + write_default_ddl_files(dir.path()); + write_snapshot_file(dir.path(), "data/public/99/file.parquet", b"data"); + write_snapshot_file(dir.path(), "data/metrics/1/file.parquet", b"data"); + + let storage = file_storage_for_dir(dir.path()); + let report = verify_snapshot(&storage).await.unwrap(); + + assert_eq!(report.error_count(), 2); + assert_eq!(report.data_files_verified, 0); + assert!( + report + .problems + .iter() + .all(|problem| problem.message.contains("invalid data file path")) + ); + } + fn write_test_manifest(root: &std::path::Path, dir: &str, manifest: Manifest) { let snapshot_dir = root.join(dir); std::fs::create_dir_all(&snapshot_dir).unwrap(); @@ -1155,6 +2011,37 @@ mod tests { .unwrap(); } + fn write_root_manifest(root: &std::path::Path, manifest: Manifest) { + std::fs::write( + root.join(MANIFEST_FILE), + serde_json::to_vec_pretty(&manifest).unwrap(), + ) + .unwrap(); + } + + fn write_snapshot_file(root: &std::path::Path, relative_path: &str, content: &[u8]) { + let mut path = root.to_path_buf(); + for segment in relative_path.split('/') { + path.push(segment); + } + std::fs::create_dir_all(path.parent().unwrap()).unwrap(); + std::fs::write(path, content).unwrap(); + } + + fn write_default_ddl_files(root: &std::path::Path) { + write_snapshot_file(root, "schema/ddl/public.sql", b"CREATE DATABASE public;"); + write_snapshot_file( + root, + "schema/ddl/analytics.sql", + b"CREATE DATABASE analytics;", + ); + } + + fn file_storage_for_dir(root: &std::path::Path) -> OpenDalStorage { + let uri = Url::from_directory_path(root).unwrap().to_string(); + OpenDalStorage::from_file_uri(&uri).unwrap() + } + fn test_manifest( created_at: chrono::DateTime, schema_only: bool, @@ -1175,7 +2062,7 @@ mod tests { if !schema_only { manifest.chunks.clear(); let mut first = ChunkMeta::new(1, TimeRange::unbounded()); - first.mark_completed(vec!["data/public/chunk_1/file.parquet".to_string()], None); + first.mark_completed(vec!["data/public/1/file.parquet".to_string()], None); manifest.chunks.push(first); if complete { diff --git a/src/cli/src/data/export_v2/error.rs b/src/cli/src/data/export_v2/error.rs index ec860fecfa..8d9a53f186 100644 --- a/src/cli/src/data/export_v2/error.rs +++ b/src/cli/src/data/export_v2/error.rs @@ -183,6 +183,18 @@ pub enum Error { #[snafu(implicit)] location: Location, }, + + #[snafu(display( + "Snapshot verification failed: {} error(s), {} warning(s)", + errors, + warnings + ))] + SnapshotVerifyFailed { + errors: usize, + warnings: usize, + #[snafu(implicit)] + location: Location, + }, } pub type Result = std::result::Result; @@ -195,7 +207,8 @@ impl ErrorExt for Error { | Error::SchemaOnlyModeMismatch { .. } | Error::ResumeConfigMismatch { .. } | Error::ManifestVersionMismatch { .. } - | Error::SchemaOnlyArgsNotAllowed { .. } => StatusCode::InvalidArguments, + | Error::SchemaOnlyArgsNotAllowed { .. } + | Error::SnapshotVerifyFailed { .. } => StatusCode::InvalidArguments, Error::TimeParseInvalidFormat { .. } | Error::TimeParseEndBeforeStart { .. } | Error::ChunkTimeWindowRequiresBounds { .. } => StatusCode::InvalidArguments, diff --git a/src/cli/src/data/snapshot_storage.rs b/src/cli/src/data/snapshot_storage.rs index 2ee07b5586..da8fdf6ab1 100644 --- a/src/cli/src/data/snapshot_storage.rs +++ b/src/cli/src/data/snapshot_storage.rs @@ -483,9 +483,9 @@ impl OpenDalStorage { } /// Checks if a file exists using stat. - async fn file_exists(&self, path: &str) -> Result { + pub(crate) async fn file_exists(&self, path: &str) -> Result { match self.object_store.stat(path).await { - Ok(_) => Ok(true), + Ok(metadata) => Ok(!metadata.is_dir()), Err(e) if e.kind() == object_store::ErrorKind::NotFound => Ok(false), Err(e) => Err(e).context(StorageOperationSnafu { operation: format!("check exists {}", path),