diff --git a/src/cli/src/data/export_v2/command.rs b/src/cli/src/data/export_v2/command.rs index 3c069a72be..db0f576a4e 100644 --- a/src/cli/src/data/export_v2/command.rs +++ b/src/cli/src/data/export_v2/command.rs @@ -15,6 +15,7 @@ //! Export V2 CLI commands. use std::collections::HashSet; +use std::io::{self, Write}; use std::time::Duration; use async_trait::async_trait; @@ -28,7 +29,7 @@ use crate::Tool; use crate::common::ObjectStoreConfig; use crate::data::export_v2::coordinator::export_data; use crate::data::export_v2::error::{ - ChunkTimeWindowRequiresBoundsSnafu, DatabaseSnafu, EmptyResultSnafu, + ChunkTimeWindowRequiresBoundsSnafu, DatabaseSnafu, EmptyResultSnafu, IoSnafu, ManifestVersionMismatchSnafu, Result, ResumeConfigMismatchSnafu, SchemaOnlyArgsNotAllowedSnafu, SchemaOnlyModeMismatchSnafu, SnapshotVerifyFailedSnafu, UnexpectedValueTypeSnafu, }; @@ -38,7 +39,9 @@ use crate::data::export_v2::manifest::{ }; use crate::data::export_v2::schema::{DDL_DIR, SCHEMA_DIR, SCHEMAS_FILE}; use crate::data::path::{data_dir_for_schema_chunk, ddl_path_for_schema}; -use crate::data::snapshot_storage::{OpenDalStorage, SnapshotStorage, validate_uri}; +use crate::data::snapshot_storage::{ + OpenDalStorage, SnapshotStorage, validate_snapshot_uri, validate_uri, +}; use crate::data::sql::{escape_sql_identifier, escape_sql_literal}; use crate::database::{DatabaseClient, parse_proxy_opts}; @@ -51,6 +54,8 @@ pub enum ExportV2Command { List(ExportListCommand), /// Verify snapshot integrity. Verify(ExportVerifyCommand), + /// Delete a snapshot and all data under it. + Delete(ExportDeleteCommand), } impl ExportV2Command { @@ -59,6 +64,7 @@ impl ExportV2Command { ExportV2Command::Create(cmd) => cmd.build().await, ExportV2Command::List(cmd) => cmd.build().await, ExportV2Command::Verify(cmd) => cmd.build().await, + ExportV2Command::Delete(cmd) => cmd.build().await, } } } @@ -172,6 +178,75 @@ impl ExportVerify { } } +/// Delete a snapshot and all data under it. +#[derive(Debug, Parser)] +pub struct ExportDeleteCommand { + /// Snapshot storage location (e.g., s3://bucket/path, file:///tmp/backup). + #[clap(long)] + snapshot: String, + + /// Skip interactive confirmation. + #[clap(long = "no-confirm", alias = "yes")] + skip_confirmation: bool, + + /// Object store configuration for remote storage backends. + #[clap(flatten)] + storage: ObjectStoreConfig, +} + +impl ExportDeleteCommand { + pub async fn build(&self) -> std::result::Result, BoxedError> { + validate_snapshot_uri(&self.snapshot).map_err(BoxedError::new)?; + let storage = + OpenDalStorage::from_uri(&self.snapshot, &self.storage).map_err(BoxedError::new)?; + + Ok(Box::new(ExportDelete { + snapshot: self.snapshot.clone(), + skip_confirmation: self.skip_confirmation, + storage, + })) + } +} + +/// Export delete tool implementation. +pub struct ExportDelete { + snapshot: String, + skip_confirmation: bool, + storage: OpenDalStorage, +} + +#[async_trait] +impl Tool for ExportDelete { + async fn do_work(&self) -> std::result::Result<(), BoxedError> { + self.run().await.map_err(BoxedError::new) + } +} + +impl ExportDelete { + async fn run(&self) -> Result<()> { + self.run_with_confirmation(confirm_delete).await + } + + async fn run_with_confirmation(&self, confirm: F) -> Result<()> + where + F: FnOnce(&str) -> Result, + { + let manifest = self.storage.read_manifest().await?; + print_delete_summary(&self.snapshot, &manifest); + + if !self.skip_confirmation && !confirm(&self.snapshot)? { + println!("Deletion cancelled."); + return Ok(()); + } + + println!("Deleting snapshot..."); + self.storage.delete_snapshot().await?; + println!("Snapshot deleted successfully."); + + Ok(()) + } +} + /// Create a new snapshot. #[derive(Debug, Parser)] pub struct ExportCreateCommand { @@ -1239,6 +1314,79 @@ fn print_verify_report(snapshot: &str, report: &VerifyReport) { ); } +fn print_delete_summary(snapshot: &str, manifest: &Manifest) { + println!("Snapshot: {}", manifest.snapshot_id); + println!(" Location: {}", snapshot); + println!( + " Created: {} UTC", + manifest.created_at.format("%Y-%m-%d %H:%M:%S") + ); + println!(" Catalog: {}", manifest.catalog); + println!(" Schemas: {}", manifest.schemas.join(", ")); + println!(" Chunks: {}", format_delete_chunks(manifest)); +} + +fn format_delete_chunks(manifest: &Manifest) -> String { + if manifest.schema_only { + return "0 (schema-only)".to_string(); + } + + let summary = summarize_chunks(manifest); + if manifest.is_complete() { + format!("{} (all processed)", summary.total) + } else { + format!( + "{} ({} completed, {} skipped, {} pending, {} in_progress, {} failed)", + summary.total, + summary.completed, + summary.skipped, + summary.pending, + summary.in_progress, + summary.failed + ) + } +} + +fn confirm_delete(snapshot: &str) -> Result { + println!(); + println!( + "Warning: this removes the entire snapshot directory/prefix, not only files listed in manifest." + ); + println!("This will permanently delete all data under:"); + println!(" {}", display_snapshot_prefix(snapshot)); + print!("Type 'yes' to confirm deletion: "); + io::stdout().flush().map_err(|error| { + IoSnafu { + operation: "flushing delete confirmation prompt", + error, + } + .build() + })?; + + let mut input = String::new(); + io::stdin().read_line(&mut input).map_err(|error| { + IoSnafu { + operation: "reading delete confirmation", + error, + } + .build() + })?; + + Ok(delete_confirmation_matches(&input)) +} + +fn delete_confirmation_matches(input: &str) -> bool { + input.trim() == "yes" +} + +fn display_snapshot_prefix(snapshot: &str) -> String { + if snapshot.ends_with('/') { + snapshot.to_string() + } else { + format!("{}/", snapshot) + } +} + #[cfg(test)] mod tests { use chrono::TimeZone; @@ -1563,6 +1711,7 @@ mod tests { ); assert_eq!(snapshot_status(&complete), "complete"); assert_eq!(format_list_chunks(&complete), "2/2"); + assert_eq!(format_delete_chunks(&complete), "2 (all processed)"); let incomplete = test_manifest( chrono::Utc.with_ymd_and_hms(2026, 1, 1, 0, 0, 0).unwrap(), @@ -1571,6 +1720,150 @@ mod tests { ); assert_eq!(snapshot_status(&incomplete), "incomplete"); assert_eq!(format_list_chunks(&incomplete), "1/2"); + assert_eq!( + format_delete_chunks(&incomplete), + "2 (1 completed, 0 skipped, 1 pending, 0 in_progress, 0 failed)" + ); + } + + #[tokio::test] + async fn test_delete_build_rejects_bucket_root_uri() { + let cmd = ExportDeleteCommand::parse_from([ + "export-v2-delete", + "--snapshot", + "s3://bucket", + "--no-confirm", + ]); + + let error = cmd.build().await.err().unwrap().to_string(); + assert!(error.contains("non-empty path")); + } + + #[test] + fn test_delete_skip_confirmation_aliases() { + let no_confirm = ExportDeleteCommand::parse_from([ + "export-v2-delete", + "--snapshot", + "s3://bucket/snapshot", + "--no-confirm", + ]); + assert!(no_confirm.skip_confirmation); + + let yes = ExportDeleteCommand::parse_from([ + "export-v2-delete", + "--snapshot", + "s3://bucket/snapshot", + "--yes", + ]); + assert!(yes.skip_confirmation); + } + + #[tokio::test] + async fn test_delete_snapshot_with_no_confirm_removes_snapshot_contents() { + let parent = tempdir().unwrap(); + let snapshot = parent.path().join("snapshot"); + let sibling = parent.path().join("sibling"); + std::fs::create_dir_all(&snapshot).unwrap(); + std::fs::create_dir_all(&sibling).unwrap(); + std::fs::write(sibling.join("keep.txt"), b"keep").unwrap(); + write_root_manifest( + &snapshot, + test_manifest( + chrono::Utc.with_ymd_and_hms(2026, 1, 1, 0, 0, 0).unwrap(), + true, + true, + ), + ); + write_snapshot_file(&snapshot, "schema/schemas.json", b"[]"); + + let uri = Url::from_directory_path(&snapshot).unwrap().to_string(); + let delete = ExportDelete { + snapshot: uri, + skip_confirmation: true, + storage: file_storage_for_dir(&snapshot), + }; + + delete + .run_with_confirmation(|_| unreachable!()) + .await + .unwrap(); + + assert!(!snapshot.join(MANIFEST_FILE).exists()); + assert!(!snapshot.join("schema/schemas.json").exists()); + assert!(sibling.join("keep.txt").exists()); + } + + #[tokio::test] + async fn test_delete_snapshot_requires_manifest() { + let dir = tempdir().unwrap(); + let uri = Url::from_directory_path(dir.path()).unwrap().to_string(); + let delete = ExportDelete { + snapshot: uri, + skip_confirmation: true, + storage: file_storage_for_dir(dir.path()), + }; + + let error = delete + .run_with_confirmation(|_| unreachable!()) + .await + .err() + .unwrap() + .to_string(); + + assert!(error.contains("Snapshot not found")); + assert!(dir.path().exists()); + } + + #[tokio::test] + async fn test_delete_snapshot_cancels_without_exact_confirmation() { + let dir = tempdir().unwrap(); + write_root_manifest( + dir.path(), + test_manifest( + chrono::Utc.with_ymd_and_hms(2026, 1, 1, 0, 0, 0).unwrap(), + true, + true, + ), + ); + write_snapshot_file(dir.path(), "schema/schemas.json", b"[]"); + let uri = Url::from_directory_path(dir.path()).unwrap().to_string(); + let delete = ExportDelete { + snapshot: uri.clone(), + skip_confirmation: false, + storage: file_storage_for_dir(dir.path()), + }; + + delete + .run_with_confirmation(|snapshot| { + assert_eq!(snapshot, uri); + Ok(false) + }) + .await + .unwrap(); + + assert!(dir.path().join(MANIFEST_FILE).exists()); + assert!(dir.path().join("schema/schemas.json").exists()); + } + + #[test] + fn test_delete_confirmation_requires_exact_yes() { + assert!(delete_confirmation_matches("yes")); + assert!(delete_confirmation_matches(" yes\n")); + assert!(!delete_confirmation_matches("YES")); + assert!(!delete_confirmation_matches("y")); + assert!(!delete_confirmation_matches("yes please")); + } + + #[test] + fn test_display_snapshot_prefix_adds_trailing_slash() { + assert_eq!( + display_snapshot_prefix("s3://bucket/snapshot"), + "s3://bucket/snapshot/" + ); + assert_eq!( + display_snapshot_prefix("s3://bucket/snapshot/"), + "s3://bucket/snapshot/" + ); } #[tokio::test] diff --git a/src/cli/src/data/export_v2/error.rs b/src/cli/src/data/export_v2/error.rs index 8d9a53f186..e16e3a6176 100644 --- a/src/cli/src/data/export_v2/error.rs +++ b/src/cli/src/data/export_v2/error.rs @@ -71,6 +71,14 @@ pub enum Error { location: Location, }, + #[snafu(display("I/O error while {}: {}", operation, error))] + Io { + operation: &'static str, + error: std::io::Error, + #[snafu(implicit)] + location: Location, + }, + #[snafu(display( "Cannot resume snapshot with a different schema_only mode (existing: {}, requested: {}). Use --force to recreate.", existing_schema_only, @@ -223,6 +231,8 @@ impl ErrorExt for Error { | Error::UnexpectedValueType { .. } | Error::UrlParse { .. } => StatusCode::Internal, + Error::Io { .. } => StatusCode::External, + Error::Database { error, .. } => error.status_code(), Error::SnapshotNotFound { .. } => StatusCode::InvalidArguments, diff --git a/src/cli/src/data/snapshot_storage.rs b/src/cli/src/data/snapshot_storage.rs index da8fdf6ab1..93e211628a 100644 --- a/src/cli/src/data/snapshot_storage.rs +++ b/src/cli/src/data/snapshot_storage.rs @@ -18,6 +18,7 @@ //! to various storage backends (S3, OSS, GCS, Azure Blob, local filesystem). use std::collections::BTreeSet; +use std::path::Component; use async_trait::async_trait; use futures::TryStreamExt; @@ -131,6 +132,92 @@ pub fn validate_uri(uri: &str) -> Result { StorageScheme::from_uri(uri) } +/// Validates a URI for snapshot-scoped destructive operations. +/// +/// Unlike read-only parent scans, destructive commands must target a concrete +/// snapshot directory instead of a bucket/container root or filesystem root. +/// Remote storage buckets/containers already provide namespace isolation, so a +/// non-empty object prefix is enough; local filesystem paths require at least +/// two non-root path segments to avoid deleting broad system directories. +pub fn validate_snapshot_uri(uri: &str) -> Result { + let scheme = validate_uri(uri)?; + reject_query_or_fragment(uri)?; + match scheme { + StorageScheme::File => validate_file_snapshot_uri(uri)?, + StorageScheme::S3 | StorageScheme::Oss | StorageScheme::Gcs | StorageScheme::Azblob => { + extract_remote_location_with_root_policy(uri, false)?; + } + } + Ok(scheme) +} + +fn reject_query_or_fragment(uri: &str) -> Result<()> { + let url = Url::parse(uri).context(UrlParseSnafu)?; + if url.query().is_some() || url.fragment().is_some() { + return InvalidUriSnafu { + uri, + reason: "snapshot URI must not include query or fragment", + } + .fail(); + } + + Ok(()) +} + +fn validate_file_snapshot_uri(uri: &str) -> Result<()> { + if has_explicit_dot_segment(uri) { + return InvalidUriSnafu { + uri, + reason: "file snapshot URI must not contain '.' or '..' path segments", + } + .fail(); + } + + let path = extract_file_path_from_uri(uri)?; + let mut normal_component_count = 0; + + // This is only a path-shape guard for destructive operations. It does not + // resolve symlinks. Drive prefixes and root separators also do not count + // toward depth; delete still relies on the manifest check and explicit + // confirmation before removing the rooted storage prefix. + for component in std::path::Path::new(&path).components() { + match component { + Component::Normal(_) => normal_component_count += 1, + Component::CurDir | Component::ParentDir => { + return InvalidUriSnafu { + uri, + reason: "file snapshot URI must not contain '.' or '..' path segments", + } + .fail(); + } + Component::Prefix(_) | Component::RootDir => {} + } + } + + if normal_component_count < 2 { + return InvalidUriSnafu { + uri, + reason: "file snapshot URI must point to a directory at least two levels deep", + } + .fail(); + } + + Ok(()) +} + +fn has_explicit_dot_segment(uri: &str) -> bool { + // Defense in depth: catch dot segments at the raw URI level before + // `Url::to_file_path()` can normalize them away. The `Path::components()` + // check below still runs because URL decoding can reintroduce them. + let without_fragment = uri.split_once('#').map_or(uri, |(path, _)| path); + let path = without_fragment + .split_once('?') + .map_or(without_fragment, |(path, _)| path); + + path.split('/') + .any(|segment| segment == "." || segment == "..") +} + fn schema_index_path() -> String { format!("{}/{}", SCHEMA_DIR, SCHEMAS_FILE) } @@ -708,6 +795,43 @@ mod tests { assert!(OpenDalStorage::from_parent_uri("s3://bucket", &storage).is_ok()); } + #[test] + fn test_validate_snapshot_uri_rejects_dangerous_roots() { + assert!(validate_snapshot_uri("s3://bucket").is_err()); + assert!(validate_snapshot_uri("s3://bucket/").is_err()); + assert!(validate_snapshot_uri("oss://bucket").is_err()); + assert!(validate_snapshot_uri("gs://bucket").is_err()); + assert!(validate_snapshot_uri("azblob://container").is_err()); + assert!(validate_snapshot_uri("s3://bucket/snapshot?version=1").is_err()); + assert!(validate_snapshot_uri("file:///tmp/backup#fragment").is_err()); + assert!(validate_snapshot_uri("file:///").is_err()); + assert!(validate_snapshot_uri("file:///tmp").is_err()); + assert!(validate_snapshot_uri("file:///tmp/backup/.").is_err()); + assert!(validate_snapshot_uri("file:///tmp/backup/..").is_err()); + } + + #[test] + fn test_validate_snapshot_uri_accepts_snapshot_paths() { + assert_eq!( + validate_snapshot_uri("s3://bucket/snapshots/prod").unwrap(), + StorageScheme::S3 + ); + + let dir = tempdir().unwrap(); + let snapshot = dir.path().join("snapshot"); + std::fs::create_dir_all(&snapshot).unwrap(); + let uri = Url::from_directory_path(snapshot).unwrap().to_string(); + assert_eq!(validate_snapshot_uri(&uri).unwrap(), StorageScheme::File); + } + + #[cfg(windows)] + #[test] + fn test_validate_snapshot_uri_windows_drive_prefix_depth() { + assert!(validate_snapshot_uri("file:///C:/").is_err()); + assert!(validate_snapshot_uri("file:///C:/Users").is_err()); + assert!(validate_snapshot_uri("file:///C:/Users/snapshot").is_ok()); + } + #[cfg(not(windows))] #[test] fn test_extract_path_from_uri_unix_examples() {