feat: add export-v2 snapshot verification (#8111)

* feat: add export-v2 snapshot verification

Signed-off-by: jeremyhi <fengjiachun@gmail.com>

* fix: by AI comments

Signed-off-by: jeremyhi <fengjiachun@gmail.com>

* fix: by comments

Signed-off-by: jeremyhi <fengjiachun@gmail.com>

* fix: by AI comments

Signed-off-by: jeremyhi <fengjiachun@gmail.com>

* fix: reject full export snapshots without chunks

Signed-off-by: jeremyhi <fengjiachun@gmail.com>

* fix: by AI comments

Signed-off-by: jeremyhi <fengjiachun@gmail.com>

* fix: by AI comments

Signed-off-by: jeremyhi <fengjiachun@gmail.com>

* fix: by AI comments

Signed-off-by: jeremyhi <fengjiachun@gmail.com>

* fix: enforce strict export snapshot data mainifest

Signed-off-by: jeremyhi <fengjiachun@gmail.com>

---------

Signed-off-by: jeremyhi <fengjiachun@gmail.com>
This commit is contained in:
jeremyhi
2026-05-19 00:15:51 -07:00
committed by GitHub
parent b97f630adb
commit 72434ee5d7
3 changed files with 907 additions and 7 deletions

View File

@@ -30,13 +30,14 @@ use crate::data::export_v2::coordinator::export_data;
use crate::data::export_v2::error::{
ChunkTimeWindowRequiresBoundsSnafu, DatabaseSnafu, EmptyResultSnafu,
ManifestVersionMismatchSnafu, Result, ResumeConfigMismatchSnafu, SchemaOnlyArgsNotAllowedSnafu,
SchemaOnlyModeMismatchSnafu, UnexpectedValueTypeSnafu,
SchemaOnlyModeMismatchSnafu, SnapshotVerifyFailedSnafu, UnexpectedValueTypeSnafu,
};
use crate::data::export_v2::extractor::SchemaExtractor;
use crate::data::export_v2::manifest::{
ChunkMeta, DataFormat, MANIFEST_FILE, MANIFEST_VERSION, Manifest, TimeRange,
ChunkMeta, ChunkStatus, DataFormat, MANIFEST_FILE, MANIFEST_VERSION, Manifest, TimeRange,
};
use crate::data::path::ddl_path_for_schema;
use crate::data::export_v2::schema::{DDL_DIR, SCHEMA_DIR, SCHEMAS_FILE};
use crate::data::path::{data_dir_for_schema_chunk, ddl_path_for_schema};
use crate::data::snapshot_storage::{OpenDalStorage, SnapshotStorage, validate_uri};
use crate::data::sql::{escape_sql_identifier, escape_sql_literal};
use crate::database::{DatabaseClient, parse_proxy_opts};
@@ -48,6 +49,8 @@ pub enum ExportV2Command {
Create(ExportCreateCommand),
/// List snapshots under a parent location.
List(ExportListCommand),
/// Verify snapshot integrity.
Verify(ExportVerifyCommand),
}
impl ExportV2Command {
@@ -55,6 +58,7 @@ impl ExportV2Command {
match self {
ExportV2Command::Create(cmd) => cmd.build().await,
ExportV2Command::List(cmd) => cmd.build().await,
ExportV2Command::Verify(cmd) => cmd.build().await,
}
}
}
@@ -113,6 +117,61 @@ impl ExportList {
}
}
/// Verify snapshot integrity.
#[derive(Debug, Parser)]
pub struct ExportVerifyCommand {
/// Snapshot storage location (e.g., s3://bucket/path, file:///tmp/backup).
#[clap(long)]
snapshot: String,
/// Object store configuration for remote storage backends.
#[clap(flatten)]
storage: ObjectStoreConfig,
}
impl ExportVerifyCommand {
pub async fn build(&self) -> std::result::Result<Box<dyn Tool>, BoxedError> {
validate_uri(&self.snapshot).map_err(BoxedError::new)?;
let storage =
OpenDalStorage::from_uri(&self.snapshot, &self.storage).map_err(BoxedError::new)?;
Ok(Box::new(ExportVerify {
snapshot: self.snapshot.clone(),
storage,
}))
}
}
/// Export verify tool implementation.
pub struct ExportVerify {
snapshot: String,
storage: OpenDalStorage,
}
#[async_trait]
impl Tool for ExportVerify {
async fn do_work(&self) -> std::result::Result<(), BoxedError> {
self.run().await.map_err(BoxedError::new)
}
}
impl ExportVerify {
async fn run(&self) -> Result<()> {
let report = verify_snapshot(&self.storage).await?;
print_verify_report(&self.snapshot, &report);
if report.has_problems() {
return SnapshotVerifyFailedSnafu {
errors: report.error_count(),
warnings: report.warning_count(),
}
.fail();
}
Ok(())
}
}
/// Create a new snapshot.
#[derive(Debug, Parser)]
pub struct ExportCreateCommand {
@@ -811,6 +870,375 @@ fn format_list_chunks(manifest: &Manifest) -> String {
)
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum VerifySeverity {
Error,
Warn,
}
impl VerifySeverity {
fn as_str(self) -> &'static str {
match self {
VerifySeverity::Error => "ERROR",
VerifySeverity::Warn => "WARN",
}
}
}
#[derive(Debug)]
struct VerifyProblem {
severity: VerifySeverity,
message: String,
}
#[derive(Debug, Default)]
struct VerifyChunkSummary {
total: usize,
completed: usize,
skipped: usize,
pending: usize,
in_progress: usize,
failed: usize,
}
#[derive(Debug)]
struct VerifyReport {
manifest: Manifest,
schema_index_exists: bool,
ddl_file_count: usize,
chunk_summary: VerifyChunkSummary,
data_files_total: usize,
data_files_verified: usize,
problems: Vec<VerifyProblem>,
}
impl VerifyReport {
fn error_count(&self) -> usize {
self.problems
.iter()
.filter(|problem| problem.severity == VerifySeverity::Error)
.count()
}
fn warning_count(&self) -> usize {
self.problems
.iter()
.filter(|problem| problem.severity == VerifySeverity::Warn)
.count()
}
fn has_problems(&self) -> bool {
!self.problems.is_empty()
}
fn push_error(&mut self, message: impl Into<String>) {
self.problems.push(VerifyProblem {
severity: VerifySeverity::Error,
message: message.into(),
});
}
fn push_warn(&mut self, message: impl Into<String>) {
self.problems.push(VerifyProblem {
severity: VerifySeverity::Warn,
message: message.into(),
});
}
}
async fn verify_snapshot(storage: &OpenDalStorage) -> Result<VerifyReport> {
let manifest = storage.read_manifest().await?;
let schema_index_path = format!("{}/{}", SCHEMA_DIR, SCHEMAS_FILE);
let ddl_prefix = format!("{}/{}/", SCHEMA_DIR, DDL_DIR);
let schema_index_exists = storage.file_exists(&schema_index_path).await?;
let ddl_files: HashSet<_> = storage
.list_files_recursive(&ddl_prefix)
.await?
.into_iter()
.collect();
let ddl_file_count = ddl_files
.iter()
.filter(|path| path.ends_with(".sql"))
.count();
let mut report = VerifyReport {
manifest,
schema_index_exists,
ddl_file_count,
chunk_summary: VerifyChunkSummary::default(),
data_files_total: 0,
data_files_verified: 0,
problems: Vec::new(),
};
if report.manifest.version != MANIFEST_VERSION {
report.push_error(format!(
"Manifest version mismatch: expected {}, found {}",
MANIFEST_VERSION, report.manifest.version
));
}
if !report.schema_index_exists {
report.push_warn(format!("Missing schema index '{}'", schema_index_path));
}
for schema in &report.manifest.schemas {
let ddl_path = ddl_path_for_schema(schema);
if !ddl_files.contains(ddl_path.as_str()) {
report.problems.push(VerifyProblem {
severity: VerifySeverity::Error,
message: format!("Schema '{}': missing DDL file '{}'", schema, ddl_path),
});
}
}
report.chunk_summary = summarize_chunks(&report.manifest);
if report.manifest.schema_only {
let chunk_count = report.manifest.chunks.len();
if chunk_count > 0 {
report.push_error(format!(
"Schema-only snapshot should not contain data chunks (found {})",
chunk_count
));
}
let data_files = storage.list_files_recursive("data/").await?;
if let Some(path) = data_files.first() {
report.push_error(format!(
"Schema-only snapshot should not contain data files (found '{}')",
path
));
}
} else if report.manifest.chunks.is_empty() {
report.push_error("Full snapshot should contain at least one data chunk");
} else {
verify_chunks_and_data_files(storage, &mut report).await?;
}
Ok(report)
}
fn summarize_chunks(manifest: &Manifest) -> VerifyChunkSummary {
VerifyChunkSummary {
total: manifest.chunks.len(),
completed: manifest.completed_count(),
skipped: manifest.skipped_count(),
pending: manifest.pending_count(),
in_progress: manifest.in_progress_count(),
failed: manifest.failed_count(),
}
}
async fn verify_chunks_and_data_files(
storage: &OpenDalStorage,
report: &mut VerifyReport,
) -> Result<()> {
let existing_files: HashSet<_> = storage
.list_files_recursive("data/")
.await?
.into_iter()
.collect();
let mut data_files_total = 0;
let mut data_files_verified = 0;
let mut problems = Vec::new();
let mut seen_chunk_ids = HashSet::new();
let mut claimed_data_files = HashSet::new();
for chunk in &report.manifest.chunks {
if !seen_chunk_ids.insert(chunk.id) {
problems.push(VerifyProblem {
severity: VerifySeverity::Error,
message: format!("Chunk {}: duplicate chunk id", chunk.id),
});
}
for file in &chunk.files {
if let Some(path) = safe_manifest_data_file_path(file) {
claimed_data_files.insert(path.to_string());
}
}
match chunk.status {
ChunkStatus::Completed => {
if chunk.files.is_empty() {
problems.push(VerifyProblem {
severity: VerifySeverity::Error,
message: format!("Chunk {}: completed chunk has no data files", chunk.id),
});
continue;
}
let allowed_prefixes = report
.manifest
.schemas
.iter()
.map(|schema| data_dir_for_schema_chunk(schema, chunk.id))
.collect::<Vec<_>>();
for file in &chunk.files {
data_files_total += 1;
let Some(path) = valid_manifest_data_file_path(file, &allowed_prefixes) else {
problems.push(VerifyProblem {
severity: VerifySeverity::Error,
message: format!(
"Chunk {}: invalid data file path '{}'",
chunk.id, file
),
});
continue;
};
if existing_files.contains(path) {
data_files_verified += 1;
} else {
problems.push(VerifyProblem {
severity: VerifySeverity::Error,
message: format!("Chunk {}: missing file '{}'", chunk.id, path),
});
}
}
}
ChunkStatus::Skipped => {
if !chunk.files.is_empty() {
problems.push(VerifyProblem {
severity: VerifySeverity::Error,
message: format!(
"Chunk {}: skipped chunk should not list data files",
chunk.id
),
});
}
}
ChunkStatus::Pending => {
problems.push(VerifyProblem {
severity: VerifySeverity::Error,
message: format!("Chunk {}: status is 'pending'", chunk.id),
});
}
ChunkStatus::InProgress => {
problems.push(VerifyProblem {
severity: VerifySeverity::Error,
message: format!("Chunk {}: status is 'in_progress'", chunk.id),
});
}
ChunkStatus::Failed => {
let reason = chunk.error.as_deref().unwrap_or("unknown error");
problems.push(VerifyProblem {
severity: VerifySeverity::Error,
message: format!("Chunk {}: status is 'failed' (error: {})", chunk.id, reason),
});
}
}
}
for path in &existing_files {
if !claimed_data_files.contains(path) {
problems.push(VerifyProblem {
severity: VerifySeverity::Error,
message: format!("Unexpected data file '{}' is not listed in manifest", path),
});
}
}
report.data_files_total = data_files_total;
report.data_files_verified = data_files_verified;
report.problems.extend(problems);
Ok(())
}
fn valid_manifest_data_file_path<'a>(
path: &'a str,
allowed_prefixes: &[String],
) -> Option<&'a str> {
let normalized = safe_manifest_data_file_path(path)?;
if !allowed_prefixes
.iter()
.any(|prefix| normalized.starts_with(prefix))
{
return None;
}
Some(normalized)
}
fn safe_manifest_data_file_path(path: &str) -> Option<&str> {
let normalized = path.trim_start_matches('/');
if normalized.is_empty() || !normalized.starts_with("data/") {
return None;
}
if normalized
.split('/')
.any(|segment| segment.is_empty() || segment == "." || segment == "..")
{
return None;
}
Some(normalized)
}
fn print_verify_report(snapshot: &str, report: &VerifyReport) {
println!("Verifying snapshot: {}", report.manifest.snapshot_id);
println!(" Location: {}", snapshot);
if report.manifest.version == MANIFEST_VERSION {
println!(" Manifest: OK (version {})", report.manifest.version);
} else {
println!(
" Manifest: ERROR (version {}, expected {})",
report.manifest.version, MANIFEST_VERSION
);
}
println!(
" Schema files: {}",
if report.schema_index_exists {
format!("OK ({})", SCHEMAS_FILE)
} else {
format!("WARN (missing {})", SCHEMAS_FILE)
}
);
if report.ddl_file_count > 0 {
println!(" DDL files: {} file(s) found", report.ddl_file_count);
} else {
println!(" DDL files: not present");
}
let chunks = &report.chunk_summary;
println!(
" Chunks: {} total ({} completed, {} skipped, {} pending, {} in_progress, {} failed)",
chunks.total,
chunks.completed,
chunks.skipped,
chunks.pending,
chunks.in_progress,
chunks.failed
);
if report.manifest.schema_only {
println!(" Data files: skipped (schema-only)");
} else {
println!(
" Data files: {}/{} files verified",
report.data_files_verified, report.data_files_total
);
}
if report.problems.is_empty() {
println!();
println!("Snapshot is valid.");
return;
}
println!();
println!("Problems found:");
for problem in &report.problems {
println!(" [{}] {}", problem.severity.as_str(), problem.message);
}
println!();
println!(
"Snapshot has {} error(s), {} warning(s).",
report.error_count(),
report.warning_count()
);
}
#[cfg(test)]
mod tests {
use chrono::TimeZone;
@@ -1145,6 +1573,434 @@ mod tests {
assert_eq!(format_list_chunks(&incomplete), "1/2");
}
#[tokio::test]
async fn test_verify_snapshot_accepts_valid_full_snapshot() {
let dir = tempdir().unwrap();
let manifest = test_manifest(
chrono::Utc.with_ymd_and_hms(2026, 1, 1, 0, 0, 0).unwrap(),
false,
true,
);
write_root_manifest(dir.path(), manifest);
write_snapshot_file(dir.path(), "schema/schemas.json", b"[]");
write_default_ddl_files(dir.path());
write_snapshot_file(dir.path(), "data/public/1/file.parquet", b"data");
let storage = file_storage_for_dir(dir.path());
let report = verify_snapshot(&storage).await.unwrap();
assert_eq!(report.error_count(), 0);
assert_eq!(report.warning_count(), 0);
assert_eq!(report.data_files_total, 1);
assert_eq!(report.data_files_verified, 1);
}
#[tokio::test]
async fn test_verify_snapshot_reports_missing_data_file_and_failed_chunk() {
let dir = tempdir().unwrap();
let mut manifest = test_manifest(
chrono::Utc.with_ymd_and_hms(2026, 1, 1, 0, 0, 0).unwrap(),
false,
true,
);
manifest.chunks[1].mark_failed("copy failed".to_string());
write_root_manifest(dir.path(), manifest);
write_snapshot_file(dir.path(), "schema/schemas.json", b"[]");
write_default_ddl_files(dir.path());
let storage = file_storage_for_dir(dir.path());
let report = verify_snapshot(&storage).await.unwrap();
assert_eq!(report.error_count(), 2);
assert!(
report
.problems
.iter()
.any(|problem| problem.message.contains("missing file"))
);
assert!(
report
.problems
.iter()
.any(|problem| problem.message.contains("status is 'failed'"))
);
}
#[tokio::test]
async fn test_verify_snapshot_reports_missing_schema_index_as_warning() {
let dir = tempdir().unwrap();
let manifest = test_manifest(
chrono::Utc.with_ymd_and_hms(2026, 1, 1, 0, 0, 0).unwrap(),
false,
true,
);
write_root_manifest(dir.path(), manifest);
write_default_ddl_files(dir.path());
write_snapshot_file(dir.path(), "data/public/1/file.parquet", b"data");
let storage = file_storage_for_dir(dir.path());
let report = verify_snapshot(&storage).await.unwrap();
assert_eq!(report.error_count(), 0);
assert_eq!(report.warning_count(), 1);
assert!(
report
.problems
.iter()
.any(|problem| problem.message.contains("Missing schema index"))
);
}
#[tokio::test]
async fn test_verify_snapshot_rejects_schema_only_snapshot_with_chunks() {
let dir = tempdir().unwrap();
let mut manifest = test_manifest(
chrono::Utc.with_ymd_and_hms(2026, 1, 1, 0, 0, 0).unwrap(),
true,
true,
);
let mut chunk = ChunkMeta::new(1, TimeRange::unbounded());
chunk.mark_completed(vec!["data/public/1/file.parquet".to_string()], None);
manifest.chunks.push(chunk);
write_root_manifest(dir.path(), manifest);
write_snapshot_file(dir.path(), "schema/schemas.json", b"[]");
write_default_ddl_files(dir.path());
let storage = file_storage_for_dir(dir.path());
let report = verify_snapshot(&storage).await.unwrap();
assert_eq!(report.error_count(), 1);
assert_eq!(report.data_files_total, 0);
assert!(
report
.problems
.iter()
.any(|problem| problem.message.contains("should not contain data chunks"))
);
}
#[tokio::test]
async fn test_verify_snapshot_rejects_schema_only_snapshot_with_data_files() {
let dir = tempdir().unwrap();
let manifest = test_manifest(
chrono::Utc.with_ymd_and_hms(2026, 1, 1, 0, 0, 0).unwrap(),
true,
true,
);
write_root_manifest(dir.path(), manifest);
write_snapshot_file(dir.path(), "schema/schemas.json", b"[]");
write_default_ddl_files(dir.path());
write_snapshot_file(dir.path(), "data/public/1/file.parquet", b"data");
let storage = file_storage_for_dir(dir.path());
let report = verify_snapshot(&storage).await.unwrap();
assert_eq!(report.error_count(), 1);
assert_eq!(report.data_files_total, 0);
assert!(
report
.problems
.iter()
.any(|problem| problem.message.contains("should not contain data files"))
);
}
#[tokio::test]
async fn test_verify_snapshot_rejects_full_snapshot_without_chunks() {
let dir = tempdir().unwrap();
let mut manifest = test_manifest(
chrono::Utc.with_ymd_and_hms(2026, 1, 1, 0, 0, 0).unwrap(),
false,
true,
);
manifest.chunks.clear();
write_root_manifest(dir.path(), manifest);
write_snapshot_file(dir.path(), "schema/schemas.json", b"[]");
write_default_ddl_files(dir.path());
let storage = file_storage_for_dir(dir.path());
let report = verify_snapshot(&storage).await.unwrap();
assert_eq!(report.error_count(), 1);
assert_eq!(report.data_files_total, 0);
assert!(
report
.problems
.iter()
.any(|problem| problem.message.contains("at least one data chunk"))
);
}
#[tokio::test]
async fn test_verify_snapshot_rejects_skipped_chunk_data_files() {
let dir = tempdir().unwrap();
let manifest = test_manifest(
chrono::Utc.with_ymd_and_hms(2026, 1, 1, 0, 0, 0).unwrap(),
false,
true,
);
write_root_manifest(dir.path(), manifest);
write_snapshot_file(dir.path(), "schema/schemas.json", b"[]");
write_default_ddl_files(dir.path());
write_snapshot_file(dir.path(), "data/public/1/file.parquet", b"data");
write_snapshot_file(dir.path(), "data/public/2/file.parquet", b"data");
let storage = file_storage_for_dir(dir.path());
let report = verify_snapshot(&storage).await.unwrap();
assert_eq!(report.error_count(), 1);
assert!(
report
.problems
.iter()
.any(|problem| { problem.message.contains("Unexpected data file") })
);
}
#[tokio::test]
async fn test_verify_snapshot_rejects_duplicate_chunk_ids() {
let dir = tempdir().unwrap();
let mut manifest = test_manifest(
chrono::Utc.with_ymd_and_hms(2026, 1, 1, 0, 0, 0).unwrap(),
false,
true,
);
let mut duplicate = ChunkMeta::new(1, TimeRange::unbounded());
duplicate.mark_completed(vec!["data/public/1/file.parquet".to_string()], None);
manifest.chunks.push(duplicate);
write_root_manifest(dir.path(), manifest);
write_snapshot_file(dir.path(), "schema/schemas.json", b"[]");
write_default_ddl_files(dir.path());
write_snapshot_file(dir.path(), "data/public/1/file.parquet", b"data");
let storage = file_storage_for_dir(dir.path());
let report = verify_snapshot(&storage).await.unwrap();
assert_eq!(report.error_count(), 1);
assert!(
report
.problems
.iter()
.any(|problem| problem.message.contains("duplicate chunk id"))
);
}
#[tokio::test]
async fn test_verify_snapshot_requires_all_schema_ddl() {
let dir = tempdir().unwrap();
let manifest = test_manifest(
chrono::Utc.with_ymd_and_hms(2026, 1, 1, 0, 0, 0).unwrap(),
true,
true,
);
write_root_manifest(dir.path(), manifest);
write_snapshot_file(dir.path(), "schema/schemas.json", b"[]");
write_snapshot_file(
dir.path(),
"schema/ddl/public.sql",
b"CREATE DATABASE public;",
);
let storage = file_storage_for_dir(dir.path());
let report = verify_snapshot(&storage).await.unwrap();
assert_eq!(report.error_count(), 1);
assert!(
report
.problems
.iter()
.any(|problem| problem.message.contains("analytics"))
);
}
#[tokio::test]
async fn test_verify_snapshot_reports_missing_ddl_dir() {
let dir = tempdir().unwrap();
let manifest = test_manifest(
chrono::Utc.with_ymd_and_hms(2026, 1, 1, 0, 0, 0).unwrap(),
false,
true,
);
write_root_manifest(dir.path(), manifest);
write_snapshot_file(dir.path(), "schema/schemas.json", b"[]");
write_snapshot_file(dir.path(), "data/public/1/file.parquet", b"data");
let storage = file_storage_for_dir(dir.path());
let report = verify_snapshot(&storage).await.unwrap();
assert_eq!(report.error_count(), 2);
assert!(
report
.problems
.iter()
.any(|problem| problem.message.contains("schema/ddl/public.sql"))
);
assert!(
report
.problems
.iter()
.any(|problem| problem.message.contains("schema/ddl/analytics.sql"))
);
}
#[tokio::test]
async fn test_verify_snapshot_reports_manifest_version_mismatch() {
let dir = tempdir().unwrap();
let mut manifest = test_manifest(
chrono::Utc.with_ymd_and_hms(2026, 1, 1, 0, 0, 0).unwrap(),
false,
true,
);
manifest.version = MANIFEST_VERSION + 1;
write_root_manifest(dir.path(), manifest);
write_snapshot_file(dir.path(), "schema/schemas.json", b"[]");
write_default_ddl_files(dir.path());
write_snapshot_file(dir.path(), "data/public/1/file.parquet", b"data");
let storage = file_storage_for_dir(dir.path());
let report = verify_snapshot(&storage).await.unwrap();
assert_eq!(report.error_count(), 1);
assert!(
report
.problems
.iter()
.any(|problem| problem.message.contains("Manifest version mismatch"))
);
}
#[tokio::test]
async fn test_verify_snapshot_rejects_invalid_data_file_paths() {
let dir = tempdir().unwrap();
let mut manifest = test_manifest(
chrono::Utc.with_ymd_and_hms(2026, 1, 1, 0, 0, 0).unwrap(),
false,
true,
);
manifest.chunks[0].files = vec!["data/public/1/../file.parquet".to_string()];
write_root_manifest(dir.path(), manifest);
write_snapshot_file(dir.path(), "schema/schemas.json", b"[]");
write_default_ddl_files(dir.path());
let storage = file_storage_for_dir(dir.path());
let report = verify_snapshot(&storage).await.unwrap();
assert_eq!(report.error_count(), 1);
assert!(
report
.problems
.iter()
.any(|problem| problem.message.contains("invalid data file path"))
);
assert_eq!(report.data_files_verified, 0);
}
#[tokio::test]
async fn test_verify_snapshot_accepts_leading_slash_manifest_data_paths() {
let dir = tempdir().unwrap();
let mut manifest = test_manifest(
chrono::Utc.with_ymd_and_hms(2026, 1, 1, 0, 0, 0).unwrap(),
false,
true,
);
manifest.chunks[0].files = vec!["/data/public/1/file.parquet".to_string()];
write_root_manifest(dir.path(), manifest);
write_snapshot_file(dir.path(), "schema/schemas.json", b"[]");
write_default_ddl_files(dir.path());
write_snapshot_file(dir.path(), "data/public/1/file.parquet", b"data");
let storage = file_storage_for_dir(dir.path());
let report = verify_snapshot(&storage).await.unwrap();
assert_eq!(report.error_count(), 0);
assert_eq!(report.data_files_verified, 1);
}
#[tokio::test]
async fn test_verify_snapshot_rejects_unlisted_files_under_completed_chunk_prefix() {
let dir = tempdir().unwrap();
let manifest = test_manifest(
chrono::Utc.with_ymd_and_hms(2026, 1, 1, 0, 0, 0).unwrap(),
false,
true,
);
write_root_manifest(dir.path(), manifest);
write_snapshot_file(dir.path(), "schema/schemas.json", b"[]");
write_default_ddl_files(dir.path());
write_snapshot_file(dir.path(), "data/public/1/file.parquet", b"data");
write_snapshot_file(dir.path(), "data/public/1/extra.parquet", b"data");
let storage = file_storage_for_dir(dir.path());
let report = verify_snapshot(&storage).await.unwrap();
assert_eq!(report.error_count(), 1);
assert!(
report
.problems
.iter()
.any(|problem| problem.message.contains("Unexpected data file"))
);
assert_eq!(report.data_files_verified, 1);
}
#[tokio::test]
async fn test_verify_snapshot_rejects_orphan_data_files_outside_known_chunk_prefixes() {
let dir = tempdir().unwrap();
let manifest = test_manifest(
chrono::Utc.with_ymd_and_hms(2026, 1, 1, 0, 0, 0).unwrap(),
false,
true,
);
write_root_manifest(dir.path(), manifest);
write_snapshot_file(dir.path(), "schema/schemas.json", b"[]");
write_default_ddl_files(dir.path());
write_snapshot_file(dir.path(), "data/public/1/file.parquet", b"data");
write_snapshot_file(dir.path(), "data/public/99/file.parquet", b"data");
let storage = file_storage_for_dir(dir.path());
let report = verify_snapshot(&storage).await.unwrap();
assert_eq!(report.error_count(), 1);
assert!(
report
.problems
.iter()
.any(|problem| problem.message.contains("Unexpected data file"))
);
assert_eq!(report.data_files_verified, 1);
}
#[tokio::test]
async fn test_verify_snapshot_rejects_data_files_under_wrong_chunk_or_schema() {
let dir = tempdir().unwrap();
let mut manifest = test_manifest(
chrono::Utc.with_ymd_and_hms(2026, 1, 1, 0, 0, 0).unwrap(),
false,
true,
);
manifest.chunks[0].files = vec![
"data/public/99/file.parquet".to_string(),
"data/metrics/1/file.parquet".to_string(),
];
write_root_manifest(dir.path(), manifest);
write_snapshot_file(dir.path(), "schema/schemas.json", b"[]");
write_default_ddl_files(dir.path());
write_snapshot_file(dir.path(), "data/public/99/file.parquet", b"data");
write_snapshot_file(dir.path(), "data/metrics/1/file.parquet", b"data");
let storage = file_storage_for_dir(dir.path());
let report = verify_snapshot(&storage).await.unwrap();
assert_eq!(report.error_count(), 2);
assert_eq!(report.data_files_verified, 0);
assert!(
report
.problems
.iter()
.all(|problem| problem.message.contains("invalid data file path"))
);
}
fn write_test_manifest(root: &std::path::Path, dir: &str, manifest: Manifest) {
let snapshot_dir = root.join(dir);
std::fs::create_dir_all(&snapshot_dir).unwrap();
@@ -1155,6 +2011,37 @@ mod tests {
.unwrap();
}
fn write_root_manifest(root: &std::path::Path, manifest: Manifest) {
std::fs::write(
root.join(MANIFEST_FILE),
serde_json::to_vec_pretty(&manifest).unwrap(),
)
.unwrap();
}
fn write_snapshot_file(root: &std::path::Path, relative_path: &str, content: &[u8]) {
let mut path = root.to_path_buf();
for segment in relative_path.split('/') {
path.push(segment);
}
std::fs::create_dir_all(path.parent().unwrap()).unwrap();
std::fs::write(path, content).unwrap();
}
fn write_default_ddl_files(root: &std::path::Path) {
write_snapshot_file(root, "schema/ddl/public.sql", b"CREATE DATABASE public;");
write_snapshot_file(
root,
"schema/ddl/analytics.sql",
b"CREATE DATABASE analytics;",
);
}
fn file_storage_for_dir(root: &std::path::Path) -> OpenDalStorage {
let uri = Url::from_directory_path(root).unwrap().to_string();
OpenDalStorage::from_file_uri(&uri).unwrap()
}
fn test_manifest(
created_at: chrono::DateTime<chrono::Utc>,
schema_only: bool,
@@ -1175,7 +2062,7 @@ mod tests {
if !schema_only {
manifest.chunks.clear();
let mut first = ChunkMeta::new(1, TimeRange::unbounded());
first.mark_completed(vec!["data/public/chunk_1/file.parquet".to_string()], None);
first.mark_completed(vec!["data/public/1/file.parquet".to_string()], None);
manifest.chunks.push(first);
if complete {

View File

@@ -183,6 +183,18 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display(
"Snapshot verification failed: {} error(s), {} warning(s)",
errors,
warnings
))]
SnapshotVerifyFailed {
errors: usize,
warnings: usize,
#[snafu(implicit)]
location: Location,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -195,7 +207,8 @@ impl ErrorExt for Error {
| Error::SchemaOnlyModeMismatch { .. }
| Error::ResumeConfigMismatch { .. }
| Error::ManifestVersionMismatch { .. }
| Error::SchemaOnlyArgsNotAllowed { .. } => StatusCode::InvalidArguments,
| Error::SchemaOnlyArgsNotAllowed { .. }
| Error::SnapshotVerifyFailed { .. } => StatusCode::InvalidArguments,
Error::TimeParseInvalidFormat { .. }
| Error::TimeParseEndBeforeStart { .. }
| Error::ChunkTimeWindowRequiresBounds { .. } => StatusCode::InvalidArguments,

View File

@@ -483,9 +483,9 @@ impl OpenDalStorage {
}
/// Checks if a file exists using stat.
async fn file_exists(&self, path: &str) -> Result<bool> {
pub(crate) async fn file_exists(&self, path: &str) -> Result<bool> {
match self.object_store.stat(path).await {
Ok(_) => Ok(true),
Ok(metadata) => Ok(!metadata.is_dir()),
Err(e) if e.kind() == object_store::ErrorKind::NotFound => Ok(false),
Err(e) => Err(e).context(StorageOperationSnafu {
operation: format!("check exists {}", path),