feat(cli): add export-v2 delete command (#8162)

* feat(cli): add export-v2 delete command

Signed-off-by: jeremyhi <fengjiachun@gmail.com>

* fix: address AI review comments

Signed-off-by: jeremyhi <fengjiachun@gmail.com>

* feat(cli): refine delete confirmation flag

Signed-off-by: jeremyhi <fengjiachun@gmail.com>

---------

Signed-off-by: jeremyhi <fengjiachun@gmail.com>
This commit is contained in:
jeremyhi
2026-05-25 21:28:23 -07:00
committed by GitHub
parent fd53ebc8a3
commit c84462bdc1
3 changed files with 429 additions and 2 deletions

View File

@@ -15,6 +15,7 @@
//! Export V2 CLI commands.
use std::collections::HashSet;
use std::io::{self, Write};
use std::time::Duration;
use async_trait::async_trait;
@@ -28,7 +29,7 @@ use crate::Tool;
use crate::common::ObjectStoreConfig;
use crate::data::export_v2::coordinator::export_data;
use crate::data::export_v2::error::{
ChunkTimeWindowRequiresBoundsSnafu, DatabaseSnafu, EmptyResultSnafu,
ChunkTimeWindowRequiresBoundsSnafu, DatabaseSnafu, EmptyResultSnafu, IoSnafu,
ManifestVersionMismatchSnafu, Result, ResumeConfigMismatchSnafu, SchemaOnlyArgsNotAllowedSnafu,
SchemaOnlyModeMismatchSnafu, SnapshotVerifyFailedSnafu, UnexpectedValueTypeSnafu,
};
@@ -38,7 +39,9 @@ use crate::data::export_v2::manifest::{
};
use crate::data::export_v2::schema::{DDL_DIR, SCHEMA_DIR, SCHEMAS_FILE};
use crate::data::path::{data_dir_for_schema_chunk, ddl_path_for_schema};
use crate::data::snapshot_storage::{OpenDalStorage, SnapshotStorage, validate_uri};
use crate::data::snapshot_storage::{
OpenDalStorage, SnapshotStorage, validate_snapshot_uri, validate_uri,
};
use crate::data::sql::{escape_sql_identifier, escape_sql_literal};
use crate::database::{DatabaseClient, parse_proxy_opts};
@@ -51,6 +54,8 @@ pub enum ExportV2Command {
List(ExportListCommand),
/// Verify snapshot integrity.
Verify(ExportVerifyCommand),
/// Delete a snapshot and all data under it.
Delete(ExportDeleteCommand),
}
impl ExportV2Command {
@@ -59,6 +64,7 @@ impl ExportV2Command {
ExportV2Command::Create(cmd) => cmd.build().await,
ExportV2Command::List(cmd) => cmd.build().await,
ExportV2Command::Verify(cmd) => cmd.build().await,
ExportV2Command::Delete(cmd) => cmd.build().await,
}
}
}
@@ -172,6 +178,75 @@ impl ExportVerify {
}
}
// NOTE: the `///` comments in this struct are consumed by clap's derive and
// become CLI help text, so maintainer-facing notes use plain `//` comments.
/// Delete a snapshot and all data under it.
#[derive(Debug, Parser)]
pub struct ExportDeleteCommand {
/// Snapshot storage location (e.g., s3://bucket/path, file:///tmp/backup).
// Checked with `validate_snapshot_uri` in `build` before storage is opened.
#[clap(long)]
snapshot: String,
/// Skip interactive confirmation.
// The flag is spelled `--no-confirm`; `--yes` is accepted as an alias.
#[clap(long = "no-confirm", alias = "yes")]
skip_confirmation: bool,
/// Object store configuration for remote storage backends.
// Flattened: the fields of `ObjectStoreConfig` appear as options of this command.
#[clap(flatten)]
storage: ObjectStoreConfig,
}
impl ExportDeleteCommand {
    /// Builds the delete tool from the parsed command-line arguments.
    ///
    /// The snapshot URI is checked with `validate_snapshot_uri` first; only a
    /// URI that passes is used to open the storage backend. Any failure is
    /// returned boxed as a `BoxedError`.
    pub async fn build(&self) -> std::result::Result<Box<dyn Tool>, BoxedError> {
        if let Err(invalid) = validate_snapshot_uri(&self.snapshot) {
            return Err(BoxedError::new(invalid));
        }

        let storage = match OpenDalStorage::from_uri(&self.snapshot, &self.storage) {
            Ok(opened) => opened,
            Err(failure) => return Err(BoxedError::new(failure)),
        };

        let tool = ExportDelete {
            snapshot: self.snapshot.clone(),
            skip_confirmation: self.skip_confirmation,
            storage,
        };
        Ok(Box::new(tool))
    }
}
/// Export delete tool implementation.
///
/// Built by `ExportDeleteCommand::build`; the delete workflow itself lives in
/// `run_with_confirmation`.
pub struct ExportDelete {
/// Snapshot URI as given by the user; shown in the summary and passed to the
/// confirmation callback.
snapshot: String,
/// When `true`, the confirmation callback is never invoked.
skip_confirmation: bool,
/// Storage handle used to read the manifest and to delete the snapshot.
storage: OpenDalStorage,
}
#[async_trait]
impl Tool for ExportDelete {
    /// Runs the delete workflow, boxing any failure as a `BoxedError`.
    async fn do_work(&self) -> std::result::Result<(), BoxedError> {
        match self.run().await {
            Ok(()) => Ok(()),
            Err(failure) => Err(BoxedError::new(failure)),
        }
    }
}
impl ExportDelete {
    /// Runs the delete workflow with the interactive `confirm_delete` prompt.
    async fn run(&self) -> Result<()> {
        self.run_with_confirmation(confirm_delete).await
    }

    /// Reads the manifest, prints a summary, and deletes the snapshot once approved.
    ///
    /// The manifest is read first, so a location without a readable manifest
    /// fails before anything is removed. `confirm` receives the snapshot URI
    /// and is consulted only when `skip_confirmation` is `false`; when it
    /// returns `Ok(false)` the run ends successfully without deleting.
    async fn run_with_confirmation<F>(&self, confirm: F) -> Result<()>
    where
        F: FnOnce(&str) -> Result<bool>,
    {
        let manifest = self.storage.read_manifest().await?;
        print_delete_summary(&self.snapshot, &manifest);

        // Short-circuits: the callback is not called when confirmation is skipped.
        let approved = self.skip_confirmation || confirm(&self.snapshot)?;
        if approved {
            println!("Deleting snapshot...");
            self.storage.delete_snapshot().await?;
            println!("Snapshot deleted successfully.");
        } else {
            println!("Deletion cancelled.");
        }
        Ok(())
    }
}
/// Create a new snapshot.
#[derive(Debug, Parser)]
pub struct ExportCreateCommand {
@@ -1239,6 +1314,79 @@ fn print_verify_report(snapshot: &str, report: &VerifyReport) {
);
}
/// Prints a human-readable description of the snapshot about to be deleted.
///
/// `snapshot` is the location as given by the user; every other value comes
/// from `manifest`.
fn print_delete_summary(snapshot: &str, manifest: &Manifest) {
    let created_at = manifest.created_at.format("%Y-%m-%d %H:%M:%S");
    let schema_list = manifest.schemas.join(", ");

    println!("Snapshot: {}", manifest.snapshot_id);
    println!(" Location: {snapshot}");
    println!(" Created: {created_at} UTC");
    println!(" Catalog: {}", manifest.catalog);
    println!(" Schemas: {schema_list}");
    println!(" Chunks: {}", format_delete_chunks(manifest));
}
/// Renders the chunk line of the delete summary.
///
/// Schema-only snapshots report zero chunks. A complete snapshot reports only
/// the total; otherwise the per-status breakdown is listed.
fn format_delete_chunks(manifest: &Manifest) -> String {
    if manifest.schema_only {
        return String::from("0 (schema-only)");
    }

    let counts = summarize_chunks(manifest);
    if !manifest.is_complete() {
        return format!(
            "{} ({} completed, {} skipped, {} pending, {} in_progress, {} failed)",
            counts.total,
            counts.completed,
            counts.skipped,
            counts.pending,
            counts.in_progress,
            counts.failed
        );
    }

    format!("{} (all processed)", counts.total)
}
/// Prompts on stdout and reads one line from stdin to confirm the deletion.
///
/// Returns `Ok(true)` only when the typed line satisfies
/// `delete_confirmation_matches`. Failing to flush the prompt or to read the
/// answer is reported as an `Io` error naming the operation.
fn confirm_delete(snapshot: &str) -> Result<bool> {
    let target = display_snapshot_prefix(snapshot);

    println!();
    println!(
        "Warning: this removes the entire snapshot directory/prefix, not only files listed in manifest."
    );
    println!("This will permanently delete all data under:");
    println!(" {target}");
    print!("Type 'yes' to confirm deletion: ");

    // `print!` does not end the line, so flush to make the prompt visible
    // before blocking on stdin.
    if let Err(flush_failure) = io::stdout().flush() {
        return Err(IoSnafu {
            operation: "flushing delete confirmation prompt",
            error: flush_failure,
        }
        .build());
    }

    let mut answer = String::new();
    match io::stdin().read_line(&mut answer) {
        Ok(_) => Ok(delete_confirmation_matches(&answer)),
        Err(read_failure) => Err(IoSnafu {
            operation: "reading delete confirmation",
            error: read_failure,
        }
        .build()),
    }
}
/// Returns `true` only when `input`, ignoring surrounding whitespace, is
/// exactly the lowercase word `yes`.
fn delete_confirmation_matches(input: &str) -> bool {
    matches!(input.trim(), "yes")
}
/// Returns `snapshot` with exactly one trailing `/` appended when it does not
/// already end with one, so the prompt shows it as a directory/prefix.
fn display_snapshot_prefix(snapshot: &str) -> String {
    let mut prefix = String::from(snapshot);
    if !prefix.ends_with('/') {
        prefix.push('/');
    }
    prefix
}
#[cfg(test)]
mod tests {
use chrono::TimeZone;
@@ -1563,6 +1711,7 @@ mod tests {
);
assert_eq!(snapshot_status(&complete), "complete");
assert_eq!(format_list_chunks(&complete), "2/2");
assert_eq!(format_delete_chunks(&complete), "2 (all processed)");
let incomplete = test_manifest(
chrono::Utc.with_ymd_and_hms(2026, 1, 1, 0, 0, 0).unwrap(),
@@ -1571,6 +1720,150 @@ mod tests {
);
assert_eq!(snapshot_status(&incomplete), "incomplete");
assert_eq!(format_list_chunks(&incomplete), "1/2");
assert_eq!(
format_delete_chunks(&incomplete),
"2 (1 completed, 0 skipped, 1 pending, 0 in_progress, 0 failed)"
);
}
#[tokio::test]
async fn test_delete_build_rejects_bucket_root_uri() {
    // A bare bucket has no object prefix, so `build` must refuse it.
    let args = [
        "export-v2-delete",
        "--snapshot",
        "s3://bucket",
        "--no-confirm",
    ];
    let command = ExportDeleteCommand::parse_from(args);

    let outcome = command.build().await;
    let message = outcome.err().unwrap().to_string();

    assert!(message.contains("non-empty path"));
}
#[test]
fn test_delete_skip_confirmation_aliases() {
    // Both spellings of the flag must set `skip_confirmation`.
    for flag in ["--no-confirm", "--yes"] {
        let command = ExportDeleteCommand::parse_from([
            "export-v2-delete",
            "--snapshot",
            "s3://bucket/snapshot",
            flag,
        ]);
        assert!(
            command.skip_confirmation,
            "{flag} should skip confirmation"
        );
    }
}
#[tokio::test]
async fn test_delete_snapshot_with_no_confirm_removes_snapshot_contents() {
    // Layout: <root>/snapshot is deleted, <root>/sibling must survive.
    let root = tempdir().unwrap();
    let snapshot_dir = root.path().join("snapshot");
    let sibling_dir = root.path().join("sibling");
    std::fs::create_dir_all(&snapshot_dir).unwrap();
    std::fs::create_dir_all(&sibling_dir).unwrap();

    let kept_file = sibling_dir.join("keep.txt");
    std::fs::write(&kept_file, b"keep").unwrap();

    let created_at = chrono::Utc.with_ymd_and_hms(2026, 1, 1, 0, 0, 0).unwrap();
    write_root_manifest(&snapshot_dir, test_manifest(created_at, true, true));
    write_snapshot_file(&snapshot_dir, "schema/schemas.json", b"[]");

    let delete = ExportDelete {
        snapshot: Url::from_directory_path(&snapshot_dir).unwrap().to_string(),
        skip_confirmation: true,
        storage: file_storage_for_dir(&snapshot_dir),
    };

    // With confirmation skipped the callback must never run.
    let outcome = delete.run_with_confirmation(|_| unreachable!()).await;
    outcome.unwrap();

    assert!(!snapshot_dir.join(MANIFEST_FILE).exists());
    assert!(!snapshot_dir.join("schema/schemas.json").exists());
    assert!(kept_file.exists());
}
#[tokio::test]
async fn test_delete_snapshot_requires_manifest() {
    // An empty directory has no manifest, so delete must fail and leave it alone.
    let empty_dir = tempdir().unwrap();
    let delete = ExportDelete {
        snapshot: Url::from_directory_path(empty_dir.path())
            .unwrap()
            .to_string(),
        skip_confirmation: true,
        storage: file_storage_for_dir(empty_dir.path()),
    };

    let outcome = delete.run_with_confirmation(|_| unreachable!()).await;
    let message = outcome.err().unwrap().to_string();

    assert!(message.contains("Snapshot not found"));
    assert!(empty_dir.path().exists());
}
#[tokio::test]
async fn test_delete_snapshot_cancels_without_exact_confirmation() {
    let snapshot_dir = tempdir().unwrap();
    let created_at = chrono::Utc.with_ymd_and_hms(2026, 1, 1, 0, 0, 0).unwrap();
    write_root_manifest(snapshot_dir.path(), test_manifest(created_at, true, true));
    write_snapshot_file(snapshot_dir.path(), "schema/schemas.json", b"[]");

    let expected_uri = Url::from_directory_path(snapshot_dir.path())
        .unwrap()
        .to_string();
    let delete = ExportDelete {
        snapshot: expected_uri.clone(),
        skip_confirmation: false,
        storage: file_storage_for_dir(snapshot_dir.path()),
    };

    // The callback declines; it must be handed the snapshot URI unchanged.
    let outcome = delete
        .run_with_confirmation(|prompted| {
            assert_eq!(prompted, expected_uri);
            Ok(false)
        })
        .await;
    outcome.unwrap();

    // Nothing may be removed after a declined confirmation.
    assert!(snapshot_dir.path().join(MANIFEST_FILE).exists());
    assert!(snapshot_dir.path().join("schema/schemas.json").exists());
}
#[test]
fn test_delete_confirmation_requires_exact_yes() {
    // Surrounding whitespace is tolerated ...
    for accepted in ["yes", " yes\n"] {
        assert!(
            delete_confirmation_matches(accepted),
            "{accepted:?} should confirm"
        );
    }
    // ... but case changes, abbreviations and extra words are not.
    for rejected in ["YES", "y", "yes please"] {
        assert!(
            !delete_confirmation_matches(rejected),
            "{rejected:?} should not confirm"
        );
    }
}
#[test]
fn test_display_snapshot_prefix_adds_trailing_slash() {
    // With or without a trailing slash, the displayed prefix ends in exactly one.
    let expected = "s3://bucket/snapshot/";
    for input in ["s3://bucket/snapshot", "s3://bucket/snapshot/"] {
        assert_eq!(display_snapshot_prefix(input), expected);
    }
}
#[tokio::test]

View File

@@ -71,6 +71,14 @@ pub enum Error {
location: Location,
},
#[snafu(display("I/O error while {}: {}", operation, error))]
Io {
operation: &'static str,
error: std::io::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display(
"Cannot resume snapshot with a different schema_only mode (existing: {}, requested: {}). Use --force to recreate.",
existing_schema_only,
@@ -223,6 +231,8 @@ impl ErrorExt for Error {
| Error::UnexpectedValueType { .. }
| Error::UrlParse { .. } => StatusCode::Internal,
Error::Io { .. } => StatusCode::External,
Error::Database { error, .. } => error.status_code(),
Error::SnapshotNotFound { .. } => StatusCode::InvalidArguments,

View File

@@ -18,6 +18,7 @@
//! to various storage backends (S3, OSS, GCS, Azure Blob, local filesystem).
use std::collections::BTreeSet;
use std::path::Component;
use async_trait::async_trait;
use futures::TryStreamExt;
@@ -131,6 +132,92 @@ pub fn validate_uri(uri: &str) -> Result<StorageScheme> {
StorageScheme::from_uri(uri)
}
/// Checks that `uri` is specific enough to be the target of a destructive,
/// snapshot-scoped operation, and returns its storage scheme.
///
/// Read-only parent scans may point at a bucket/container root, but a
/// destructive command must name a concrete snapshot directory. For remote
/// backends a non-empty object prefix is enough, because the bucket/container
/// already isolates the namespace. Local filesystem paths need at least two
/// non-root path segments so that broad system directories cannot be
/// targeted. A URI carrying a query or fragment is rejected for every scheme.
pub fn validate_snapshot_uri(uri: &str) -> Result<StorageScheme> {
    let scheme = validate_uri(uri)?;
    reject_query_or_fragment(uri)?;

    match scheme {
        StorageScheme::S3 | StorageScheme::Oss | StorageScheme::Gcs | StorageScheme::Azblob => {
            // Only the success/failure of the extraction matters here; the
            // extracted location itself is not needed.
            let _location = extract_remote_location_with_root_policy(uri, false)?;
        }
        StorageScheme::File => {
            validate_file_snapshot_uri(uri)?;
        }
    }

    Ok(scheme)
}
/// Fails when `uri` does not parse as a URL or carries a `?query` or
/// `#fragment` component.
fn reject_query_or_fragment(uri: &str) -> Result<()> {
    let parsed = Url::parse(uri).context(UrlParseSnafu)?;

    match (parsed.query(), parsed.fragment()) {
        (None, None) => Ok(()),
        _ => InvalidUriSnafu {
            uri,
            reason: "snapshot URI must not include query or fragment",
        }
        .fail(),
    }
}
/// Shape check for `file://` snapshot URIs used by destructive operations.
///
/// Rejects URIs with `.`/`..` segments and paths with fewer than two normal
/// components.
fn validate_file_snapshot_uri(uri: &str) -> Result<()> {
    const DOT_SEGMENT_REASON: &str =
        "file snapshot URI must not contain '.' or '..' path segments";

    if has_explicit_dot_segment(uri) {
        return InvalidUriSnafu {
            uri,
            reason: DOT_SEGMENT_REASON,
        }
        .fail();
    }

    let path = extract_file_path_from_uri(uri)?;
    let file_path = std::path::Path::new(&path);

    // This is only a path-shape guard for destructive operations. It does not
    // resolve symlinks. Drive prefixes and root separators also do not count
    // toward depth; delete still relies on the manifest check and explicit
    // confirmation before removing the rooted storage prefix.
    let has_dot_component = file_path
        .components()
        .any(|part| matches!(part, Component::CurDir | Component::ParentDir));
    if has_dot_component {
        return InvalidUriSnafu {
            uri,
            reason: DOT_SEGMENT_REASON,
        }
        .fail();
    }

    let depth = file_path
        .components()
        .filter(|part| matches!(part, Component::Normal(_)))
        .count();
    if depth < 2 {
        return InvalidUriSnafu {
            uri,
            reason: "file snapshot URI must point to a directory at least two levels deep",
        }
        .fail();
    }

    Ok(())
}
/// Reports whether the path part of `uri` (everything before any `?` or `#`)
/// contains a segment that URL parsing would treat as `.` or `..`.
///
/// Besides the literal spellings, this recognizes the percent-encoded forms
/// (`%2e`, `%2E`, and mixes such as `.%2E`), and it splits on `\` as well as
/// `/`. The WHATWG URL rules resolve all of these as dot segments for `file:`
/// URLs, so a check limited to literal `.`/`..` between `/` separators would
/// let `file:///a/b/c/%2e%2e` through and the parsed path would silently
/// become the parent directory.
fn has_explicit_dot_segment(uri: &str) -> bool {
    /// True when `segment` is `.` or `..` after decoding `%2e`/`%2E` to `.`.
    fn is_dot_segment(segment: &str) -> bool {
        let decoded = segment.replace("%2e", ".").replace("%2E", ".");
        decoded == "." || decoded == ".."
    }

    // Defense in depth: catch dot segments at the raw URI level before the
    // URL parser can normalize them away. The `Path::components()` check in
    // the caller still runs on the decoded path afterwards.
    let without_fragment = uri.split_once('#').map_or(uri, |(path, _)| path);
    let path = without_fragment
        .split_once('?')
        .map_or(without_fragment, |(path, _)| path);

    path.split(|separator: char| separator == '/' || separator == '\\')
        .any(is_dot_segment)
}
/// Returns the schema index path: `SCHEMAS_FILE` under `SCHEMA_DIR`, joined
/// with `/`.
fn schema_index_path() -> String {
    format!("{SCHEMA_DIR}/{SCHEMAS_FILE}")
}
@@ -708,6 +795,43 @@ mod tests {
assert!(OpenDalStorage::from_parent_uri("s3://bucket", &storage).is_ok());
}
#[test]
fn test_validate_snapshot_uri_rejects_dangerous_roots() {
    // Bucket/container roots, query/fragment URIs, shallow local paths and
    // dot segments must all be refused.
    let rejected = [
        "s3://bucket",
        "s3://bucket/",
        "oss://bucket",
        "gs://bucket",
        "azblob://container",
        "s3://bucket/snapshot?version=1",
        "file:///tmp/backup#fragment",
        "file:///",
        "file:///tmp",
        "file:///tmp/backup/.",
        "file:///tmp/backup/..",
    ];

    for uri in rejected {
        assert!(
            validate_snapshot_uri(uri).is_err(),
            "{uri} should be rejected"
        );
    }
}
#[test]
fn test_validate_snapshot_uri_accepts_snapshot_paths() {
    // Remote URI with a non-empty prefix.
    let remote_scheme = validate_snapshot_uri("s3://bucket/snapshots/prod").unwrap();
    assert_eq!(remote_scheme, StorageScheme::S3);

    // Local directory nested inside a temporary directory.
    let parent = tempdir().unwrap();
    let snapshot_dir = parent.path().join("snapshot");
    std::fs::create_dir_all(&snapshot_dir).unwrap();
    let file_uri = Url::from_directory_path(snapshot_dir).unwrap().to_string();

    let local_scheme = validate_snapshot_uri(&file_uri).unwrap();
    assert_eq!(local_scheme, StorageScheme::File);
}
#[cfg(windows)]
#[test]
fn test_validate_snapshot_uri_windows_drive_prefix_depth() {
    // The drive prefix does not count toward the required depth of two.
    for too_shallow in ["file:///C:/", "file:///C:/Users"] {
        assert!(
            validate_snapshot_uri(too_shallow).is_err(),
            "{too_shallow} should be rejected"
        );
    }
    assert!(validate_snapshot_uri("file:///C:/Users/snapshot").is_ok());
}
#[cfg(not(windows))]
#[test]
fn test_extract_path_from_uri_unix_examples() {