diff --git a/src/cli/src/data/export_v2/command.rs b/src/cli/src/data/export_v2/command.rs index ddcb323fef..a7da6d3862 100644 --- a/src/cli/src/data/export_v2/command.rs +++ b/src/cli/src/data/export_v2/command.rs @@ -34,7 +34,7 @@ use crate::data::export_v2::error::{ }; use crate::data::export_v2::extractor::SchemaExtractor; use crate::data::export_v2::manifest::{ - ChunkMeta, DataFormat, MANIFEST_VERSION, Manifest, TimeRange, + ChunkMeta, DataFormat, MANIFEST_FILE, MANIFEST_VERSION, Manifest, TimeRange, }; use crate::data::path::ddl_path_for_schema; use crate::data::snapshot_storage::{OpenDalStorage, SnapshotStorage, validate_uri}; @@ -46,16 +46,73 @@ use crate::database::{DatabaseClient, parse_proxy_opts}; pub enum ExportV2Command { /// Create a new snapshot. Create(ExportCreateCommand), + /// List snapshots under a parent location. + List(ExportListCommand), } impl ExportV2Command { pub async fn build(&self) -> std::result::Result, BoxedError> { match self { ExportV2Command::Create(cmd) => cmd.build().await, + ExportV2Command::List(cmd) => cmd.build().await, } } } +/// List snapshots under a parent location. +#[derive(Debug, Parser)] +pub struct ExportListCommand { + /// Parent storage location whose direct subdirectories are snapshots. + #[clap(long)] + location: String, + + /// Object store configuration for remote storage backends. + #[clap(flatten)] + storage: ObjectStoreConfig, +} + +impl ExportListCommand { + pub async fn build(&self) -> std::result::Result, BoxedError> { + validate_uri(&self.location).map_err(BoxedError::new)?; + let storage = OpenDalStorage::from_parent_uri(&self.location, &self.storage) + .map_err(BoxedError::new)?; + + Ok(Box::new(ExportList { + location: self.location.clone(), + storage, + })) + } +} + +/// Export list tool implementation. +pub struct ExportList { + location: String, + storage: OpenDalStorage, +} + +#[async_trait] +impl Tool for ExportList { + async fn do_work(&self) -> std::result::Result<(), BoxedError> { + self.run().await.map_err(BoxedError::new) + } +} + +impl ExportList { + async fn run(&self) -> Result<()> { + let result = scan_snapshots(&self.storage).await?; + + println!("Scanning: {}", self.location); + if result.snapshots.is_empty() { + println!("No snapshots found."); + } else { + print_snapshot_list(&result.snapshots, result.unreadable.len()); + } + print_unreadable_warnings(&result.unreadable); + + Ok(()) + } +} + /// Create a new snapshot. #[derive(Debug, Parser)] pub struct ExportCreateCommand { @@ -628,10 +685,138 @@ fn format_chunk_plan(chunks: &[ChunkMeta]) -> String { format!("[{}]", items.join(", ")) } +#[derive(Debug)] +struct SnapshotListEntry { + path: String, + manifest: Manifest, +} + +#[derive(Debug, Default)] +struct SnapshotScanResult { + snapshots: Vec, + unreadable: Vec, +} + +async fn scan_snapshots(storage: &OpenDalStorage) -> Result { + let mut result = SnapshotScanResult::default(); + for dir in storage.list_direct_child_dirs().await? { + let manifest_path = format!("{}/{}", dir.trim_matches('/'), MANIFEST_FILE); + let Some(data) = storage.read_file_if_exists(&manifest_path).await? else { + continue; + }; + + match serde_json::from_slice::(&data) { + Ok(manifest) => result.snapshots.push(SnapshotListEntry { + path: format!("{}/", dir.trim_matches('/')), + manifest, + }), + Err(_) => result + .unreadable + .push(format!("{}/", dir.trim_matches('/'))), + } + } + + result + .snapshots + .sort_by_key(|entry| std::cmp::Reverse(entry.manifest.created_at)); + result.unreadable.sort(); + Ok(result) +} + +fn print_snapshot_list(snapshots: &[SnapshotListEntry], unreadable_count: usize) { + if unreadable_count == 0 { + println!("Found {} snapshots:", snapshots.len()); + } else { + println!( + "Found {} snapshots ({} {} skipped: unreadable manifest):", + snapshots.len(), + unreadable_count, + directory_word(unreadable_count) + ); + } + println!(); + println!( + " {:<24} {:<36} {:<19} {:<9} {:<7} {:<6} Status", + "Path", "ID", "Created", "Catalog", "Schemas", "Chunks" + ); + println!( + " {:<24} {:<36} {:<19} {:<9} {:<7} {:<6} {:<10}", + "-".repeat(24), + "-".repeat(36), + "-".repeat(19), + "-".repeat(9), + "-".repeat(7), + "-".repeat(6), + "-".repeat(10) + ); + for entry in snapshots { + let manifest = &entry.manifest; + println!( + " {:<24} {:<36} {:<19} {:<9} {:<7} {:<6} {}", + entry.path, + manifest.snapshot_id, + manifest.created_at.format("%Y-%m-%d %H:%M:%S"), + manifest.catalog, + manifest.schemas.len(), + format_list_chunks(manifest), + snapshot_status(manifest) + ); + } +} + +fn print_unreadable_warnings(unreadable: &[String]) { + if unreadable.is_empty() { + return; + } + + println!(); + println!( + "Warning: {} {} had corrupt/unreadable manifest.json:", + unreadable.len(), + directory_word(unreadable.len()) + ); + for path in unreadable { + println!(" - {}", path); + } +} + +fn directory_word(count: usize) -> &'static str { + if count == 1 { + "directory" + } else { + "directories" + } +} + +fn snapshot_status(manifest: &Manifest) -> &'static str { + if manifest.schema_only { + "schema-only" + } else if manifest.is_complete() { + "complete" + } else { + "incomplete" + } +} + +fn format_list_chunks(manifest: &Manifest) -> String { + let total = manifest.chunks.len(); + if total == 0 { + return "0".to_string(); + } + + format!( + "{}/{}", + manifest.completed_count() + manifest.skipped_count(), + total + ) +} + #[cfg(test)] mod tests { use chrono::TimeZone; use clap::Parser; + use tempfile::tempdir; + use url::Url; use super::*; use crate::data::path::ddl_path_for_schema; @@ -886,4 +1071,124 @@ mod tests { .to_string(); assert!(error.contains("time_range")); } + + #[tokio::test] + async fn test_scan_snapshots_sorts_and_tracks_unreadable_manifests() { + let dir = tempdir().unwrap(); + write_test_manifest( + dir.path(), + "older", + test_manifest( + chrono::Utc.with_ymd_and_hms(2026, 1, 1, 0, 0, 0).unwrap(), + false, + true, + ), + ); + write_test_manifest( + dir.path(), + "newer", + test_manifest( + chrono::Utc.with_ymd_and_hms(2026, 2, 1, 0, 0, 0).unwrap(), + false, + true, + ), + ); + + std::fs::create_dir_all(dir.path().join("empty-dir")).unwrap(); + std::fs::create_dir_all(dir.path().join("not-snapshot")).unwrap(); + std::fs::write(dir.path().join("not-snapshot").join("data.txt"), "x").unwrap(); + std::fs::create_dir_all(dir.path().join("broken")).unwrap(); + std::fs::write(dir.path().join("broken").join(MANIFEST_FILE), "{not-json").unwrap(); + + let uri = Url::from_directory_path(dir.path()).unwrap().to_string(); + let storage = OpenDalStorage::from_file_uri(&uri).unwrap(); + let result = scan_snapshots(&storage).await.unwrap(); + + assert_eq!(result.snapshots.len(), 2); + assert_eq!( + result.snapshots[0].manifest.created_at, + chrono::Utc.with_ymd_and_hms(2026, 2, 1, 0, 0, 0).unwrap() + ); + assert_eq!( + result.snapshots[1].manifest.created_at, + chrono::Utc.with_ymd_and_hms(2026, 1, 1, 0, 0, 0).unwrap() + ); + assert_eq!(result.unreadable, vec!["broken/".to_string()]); + assert_eq!(result.snapshots[0].path, "newer/"); + assert_eq!(result.snapshots[1].path, "older/"); + } + + #[test] + fn test_snapshot_list_status_and_chunk_summary() { + let schema_only = test_manifest( + chrono::Utc.with_ymd_and_hms(2026, 1, 1, 0, 0, 0).unwrap(), + true, + true, + ); + assert_eq!(snapshot_status(&schema_only), "schema-only"); + assert_eq!(format_list_chunks(&schema_only), "0"); + + let complete = test_manifest( + chrono::Utc.with_ymd_and_hms(2026, 1, 1, 0, 0, 0).unwrap(), + false, + true, + ); + assert_eq!(snapshot_status(&complete), "complete"); + assert_eq!(format_list_chunks(&complete), "2/2"); + + let incomplete = test_manifest( + chrono::Utc.with_ymd_and_hms(2026, 1, 1, 0, 0, 0).unwrap(), + false, + false, + ); + assert_eq!(snapshot_status(&incomplete), "incomplete"); + assert_eq!(format_list_chunks(&incomplete), "1/2"); + } + + fn write_test_manifest(root: &std::path::Path, dir: &str, manifest: Manifest) { + let snapshot_dir = root.join(dir); + std::fs::create_dir_all(&snapshot_dir).unwrap(); + std::fs::write( + snapshot_dir.join(MANIFEST_FILE), + serde_json::to_vec_pretty(&manifest).unwrap(), + ) + .unwrap(); + } + + fn test_manifest( + created_at: chrono::DateTime, + schema_only: bool, + complete: bool, + ) -> Manifest { + let mut manifest = Manifest::new_for_export( + "greptime".to_string(), + vec!["public".to_string(), "analytics".to_string()], + schema_only, + TimeRange::unbounded(), + DataFormat::Parquet, + None, + ) + .unwrap(); + manifest.created_at = created_at; + manifest.updated_at = created_at; + + if !schema_only { + manifest.chunks.clear(); + let mut first = ChunkMeta::new(1, TimeRange::unbounded()); + first.mark_completed(vec!["data/public/chunk_1/file.parquet".to_string()], None); + manifest.chunks.push(first); + + if complete { + manifest + .chunks + .push(ChunkMeta::skipped(2, TimeRange::unbounded())); + } else { + manifest + .chunks + .push(ChunkMeta::new(2, TimeRange::unbounded())); + } + } + + manifest + } } diff --git a/src/cli/src/data/snapshot_storage.rs b/src/cli/src/data/snapshot_storage.rs index be94b197df..2ee07b5586 100644 --- a/src/cli/src/data/snapshot_storage.rs +++ b/src/cli/src/data/snapshot_storage.rs @@ -17,6 +17,8 @@ //! This module provides a unified interface for reading and writing snapshot data //! to various storage backends (S3, OSS, GCS, Azure Blob, local filesystem). +use std::collections::BTreeSet; + use async_trait::async_trait; use futures::TryStreamExt; use object_store::services::{Azblob, Fs, Gcs, Oss, S3}; @@ -75,7 +77,10 @@ impl StorageScheme { } /// Extracts bucket/container and root path from a URI. -fn extract_remote_location(uri: &str) -> Result { +fn extract_remote_location_with_root_policy( + uri: &str, + allow_empty_root: bool, +) -> Result { let url = Url::parse(uri).context(UrlParseSnafu)?; let bucket_or_container = url.host_str().unwrap_or("").to_string(); if bucket_or_container.is_empty() { @@ -87,7 +92,7 @@ fn extract_remote_location(uri: &str) -> Result { } let root = url.path().trim_start_matches('/').to_string(); - if root.is_empty() { + if root.is_empty() && !allow_empty_root { return InvalidUriSnafu { uri, reason: "snapshot URI must include a non-empty path after the bucket/container", @@ -268,13 +273,21 @@ impl OpenDalStorage { } fn from_s3_uri(uri: &str, storage: &ObjectStoreConfig) -> Result { + Self::from_s3_uri_with_root_policy(uri, storage, false) + } + + fn from_s3_uri_with_root_policy( + uri: &str, + storage: &ObjectStoreConfig, + allow_empty_root: bool, + ) -> Result { Self::ensure_backend_enabled( uri, storage.enable_s3, "s3:// requires --s3 and related options", )?; - let location = extract_remote_location(uri)?; + let location = extract_remote_location_with_root_policy(uri, allow_empty_root)?; let mut config = storage.s3.clone(); config.s3_bucket = location.bucket_or_container; config.s3_root = location.root; @@ -291,13 +304,21 @@ impl OpenDalStorage { } fn from_oss_uri(uri: &str, storage: &ObjectStoreConfig) -> Result { + Self::from_oss_uri_with_root_policy(uri, storage, false) + } + + fn from_oss_uri_with_root_policy( + uri: &str, + storage: &ObjectStoreConfig, + allow_empty_root: bool, + ) -> Result { Self::ensure_backend_enabled( uri, storage.enable_oss, "oss:// requires --oss and related options", )?; - let location = extract_remote_location(uri)?; + let location = extract_remote_location_with_root_policy(uri, allow_empty_root)?; let mut config = storage.oss.clone(); config.oss_bucket = location.bucket_or_container; config.oss_root = location.root; @@ -314,17 +335,30 @@ impl OpenDalStorage { } fn from_gcs_uri(uri: &str, storage: &ObjectStoreConfig) -> Result { + Self::from_gcs_uri_with_root_policy(uri, storage, false) + } + + fn from_gcs_uri_with_root_policy( + uri: &str, + storage: &ObjectStoreConfig, + allow_empty_root: bool, + ) -> Result { Self::ensure_backend_enabled( uri, storage.enable_gcs, "gs:// or gcs:// requires --gcs and related options", )?; - let location = extract_remote_location(uri)?; + let location = extract_remote_location_with_root_policy(uri, allow_empty_root)?; let mut config = storage.gcs.clone(); config.gcs_bucket = location.bucket_or_container; config.gcs_root = location.root; - Self::validate_remote_config(uri, "gcs", config.validate())?; + // GCS validate() rejects empty root, unlike S3/OSS/Azblob. + if allow_empty_root && config.gcs_root.is_empty() { + Self::validate_gcs_parent_config(uri, &config)?; + } else { + Self::validate_remote_config(uri, "gcs", config.validate())?; + } let conn: GcsConnection = config.into(); let object_store = ObjectStore::new(Gcs::from(&conn)) @@ -336,14 +370,43 @@ impl OpenDalStorage { )) } + fn validate_gcs_parent_config( + uri: &str, + config: &crate::common::PrefixedGcsConnection, + ) -> Result<()> { + if config.gcs_bucket.is_empty() { + return InvalidUriSnafu { + uri, + reason: "invalid gcs config: GCS bucket must be set when --gcs is enabled.", + } + .fail(); + } + if config.gcs_scope.is_empty() { + return InvalidUriSnafu { + uri, + reason: "invalid gcs config: GCS scope must be set when --gcs is enabled.", + } + .fail(); + } + Ok(()) + } + fn from_azblob_uri(uri: &str, storage: &ObjectStoreConfig) -> Result { + Self::from_azblob_uri_with_root_policy(uri, storage, false) + } + + fn from_azblob_uri_with_root_policy( + uri: &str, + storage: &ObjectStoreConfig, + allow_empty_root: bool, + ) -> Result { Self::ensure_backend_enabled( uri, storage.enable_azblob, "azblob:// requires --azblob and related options", )?; - let location = extract_remote_location(uri)?; + let location = extract_remote_location_with_root_policy(uri, allow_empty_root)?; let mut config = storage.azblob.clone(); config.azblob_container = location.bucket_or_container; config.azblob_root = location.root; @@ -370,6 +433,21 @@ impl OpenDalStorage { } } + /// Creates storage rooted at a snapshot parent URI. + /// + /// Parent-oriented commands such as `export-v2 list` may scan bucket/container + /// roots. Snapshot-oriented commands must keep using `from_uri`, which rejects + /// empty remote roots to avoid unsafe snapshot operations at bucket scope. + pub fn from_parent_uri(uri: &str, storage: &ObjectStoreConfig) -> Result { + match StorageScheme::from_uri(uri)? { + StorageScheme::File => Self::from_file_uri_with_config(uri, storage), + StorageScheme::S3 => Self::from_s3_uri_with_root_policy(uri, storage, true), + StorageScheme::Oss => Self::from_oss_uri_with_root_policy(uri, storage, true), + StorageScheme::Gcs => Self::from_gcs_uri_with_root_policy(uri, storage, true), + StorageScheme::Azblob => Self::from_azblob_uri_with_root_policy(uri, storage, true), + } + } + /// Reads a file as bytes. async fn read_file(&self, path: &str) -> Result> { let data = self @@ -382,6 +460,17 @@ impl OpenDalStorage { Ok(data.to_vec()) } + /// Reads a file as bytes if it exists. + pub(crate) async fn read_file_if_exists(&self, path: &str) -> Result>> { + match self.object_store.read(path).await { + Ok(data) => Ok(Some(data.to_vec())), + Err(error) if error.kind() == ErrorKind::NotFound => Ok(None), + Err(error) => Err(error).context(StorageOperationSnafu { + operation: format!("read {}", path), + }), + } + } + /// Writes bytes to a file. async fn write_file(&self, path: &str, data: Vec) -> Result<()> { self.object_store @@ -404,6 +493,37 @@ impl OpenDalStorage { } } + /// Lists direct child directory names under the storage root. + pub(crate) async fn list_direct_child_dirs(&self) -> Result> { + let mut lister = match self.object_store.lister_with("/").recursive(false).await { + Ok(lister) => lister, + Err(error) if error.kind() == ErrorKind::NotFound => return Ok(Vec::new()), + Err(error) => { + return Err(error).context(StorageOperationSnafu { + operation: "list /", + }); + } + }; + + let mut dirs = BTreeSet::new(); + while let Some(entry) = lister.try_next().await.context(StorageOperationSnafu { + operation: "list /", + })? { + let path = entry.path().trim_matches('/'); + if path.is_empty() { + continue; + } + + if entry.metadata().is_dir() + && let Some(name) = path.split('/').next() + { + dirs.insert(name.to_string()); + } + } + + Ok(dirs.into_iter().collect()) + } + #[cfg(test)] pub async fn read_schema(&self) -> Result { let schemas_path = schema_index_path(); @@ -557,11 +677,35 @@ mod tests { #[test] fn test_extract_remote_location_requires_non_empty_root() { - assert!(extract_remote_location("s3://bucket").is_err()); - assert!(extract_remote_location("s3://bucket/").is_err()); - assert!(extract_remote_location("oss://bucket").is_err()); - assert!(extract_remote_location("gs://bucket").is_err()); - assert!(extract_remote_location("azblob://container").is_err()); + assert!(extract_remote_location_with_root_policy("s3://bucket", false).is_err()); + assert!(extract_remote_location_with_root_policy("s3://bucket/", false).is_err()); + assert!(extract_remote_location_with_root_policy("oss://bucket", false).is_err()); + assert!(extract_remote_location_with_root_policy("gs://bucket", false).is_err()); + assert!(extract_remote_location_with_root_policy("azblob://container", false).is_err()); + } + + #[test] + fn test_extract_remote_location_allows_empty_root_when_permitted() { + let location = extract_remote_location_with_root_policy("s3://bucket", true).unwrap(); + assert_eq!(location.bucket_or_container, "bucket"); + assert_eq!(location.root, ""); + + let location = + extract_remote_location_with_root_policy("azblob://container/", true).unwrap(); + assert_eq!(location.bucket_or_container, "container"); + assert_eq!(location.root, ""); + } + + #[test] + fn test_parent_storage_allows_s3_bucket_root() { + let mut storage = ObjectStoreConfig { + enable_s3: true, + ..Default::default() + }; + storage.s3.s3_region = Some("us-east-1".to_string()); + + assert!(OpenDalStorage::from_uri("s3://bucket", &storage).is_err()); + assert!(OpenDalStorage::from_parent_uri("s3://bucket", &storage).is_ok()); } #[cfg(not(windows))]