feat: add export-v2 snapshot listing (#8096)

* feat: add export-v2 snapshot listing

Signed-off-by: jeremyhi <fengjiachun@gmail.com>

* fix: by AI comments

Signed-off-by: jeremyhi <fengjiachun@gmail.com>

* fix: allow export-v2 list at storage roots

Signed-off-by: jeremyhi <fengjiachun@gmail.com>

---------

Signed-off-by: jeremyhi <fengjiachun@gmail.com>
This commit is contained in:
jeremyhi
2026-05-12 20:04:04 -07:00
committed by GitHub
parent 117a460d34
commit 0518567e5a
2 changed files with 462 additions and 13 deletions

View File

@@ -34,7 +34,7 @@ use crate::data::export_v2::error::{
};
use crate::data::export_v2::extractor::SchemaExtractor;
use crate::data::export_v2::manifest::{
ChunkMeta, DataFormat, MANIFEST_VERSION, Manifest, TimeRange,
ChunkMeta, DataFormat, MANIFEST_FILE, MANIFEST_VERSION, Manifest, TimeRange,
};
use crate::data::path::ddl_path_for_schema;
use crate::data::snapshot_storage::{OpenDalStorage, SnapshotStorage, validate_uri};
@@ -46,16 +46,73 @@ use crate::database::{DatabaseClient, parse_proxy_opts};
pub enum ExportV2Command {
/// Create a new snapshot.
Create(ExportCreateCommand),
/// List snapshots under a parent location.
List(ExportListCommand),
}
impl ExportV2Command {
pub async fn build(&self) -> std::result::Result<Box<dyn Tool>, BoxedError> {
match self {
ExportV2Command::Create(cmd) => cmd.build().await,
ExportV2Command::List(cmd) => cmd.build().await,
}
}
}
/// List snapshots under a parent location.
#[derive(Debug, Parser)]
pub struct ExportListCommand {
/// Parent storage location whose direct subdirectories are snapshots.
#[clap(long)]
location: String,
/// Object store configuration for remote storage backends.
#[clap(flatten)]
storage: ObjectStoreConfig,
}
impl ExportListCommand {
pub async fn build(&self) -> std::result::Result<Box<dyn Tool>, BoxedError> {
validate_uri(&self.location).map_err(BoxedError::new)?;
let storage = OpenDalStorage::from_parent_uri(&self.location, &self.storage)
.map_err(BoxedError::new)?;
Ok(Box::new(ExportList {
location: self.location.clone(),
storage,
}))
}
}
/// Export list tool implementation.
pub struct ExportList {
location: String,
storage: OpenDalStorage,
}
#[async_trait]
impl Tool for ExportList {
async fn do_work(&self) -> std::result::Result<(), BoxedError> {
self.run().await.map_err(BoxedError::new)
}
}
impl ExportList {
async fn run(&self) -> Result<()> {
let result = scan_snapshots(&self.storage).await?;
println!("Scanning: {}", self.location);
if result.snapshots.is_empty() {
println!("No snapshots found.");
} else {
print_snapshot_list(&result.snapshots, result.unreadable.len());
}
print_unreadable_warnings(&result.unreadable);
Ok(())
}
}
/// Create a new snapshot.
#[derive(Debug, Parser)]
pub struct ExportCreateCommand {
@@ -628,10 +685,138 @@ fn format_chunk_plan(chunks: &[ChunkMeta]) -> String {
format!("[{}]", items.join(", "))
}
#[derive(Debug)]
struct SnapshotListEntry {
path: String,
manifest: Manifest,
}
#[derive(Debug, Default)]
struct SnapshotScanResult {
snapshots: Vec<SnapshotListEntry>,
unreadable: Vec<String>,
}
async fn scan_snapshots(storage: &OpenDalStorage) -> Result<SnapshotScanResult> {
let mut result = SnapshotScanResult::default();
for dir in storage.list_direct_child_dirs().await? {
let manifest_path = format!("{}/{}", dir.trim_matches('/'), MANIFEST_FILE);
let Some(data) = storage.read_file_if_exists(&manifest_path).await? else {
continue;
};
match serde_json::from_slice::<Manifest>(&data) {
Ok(manifest) => result.snapshots.push(SnapshotListEntry {
path: format!("{}/", dir.trim_matches('/')),
manifest,
}),
Err(_) => result
.unreadable
.push(format!("{}/", dir.trim_matches('/'))),
}
}
result
.snapshots
.sort_by_key(|entry| std::cmp::Reverse(entry.manifest.created_at));
result.unreadable.sort();
Ok(result)
}
fn print_snapshot_list(snapshots: &[SnapshotListEntry], unreadable_count: usize) {
if unreadable_count == 0 {
println!("Found {} snapshots:", snapshots.len());
} else {
println!(
"Found {} snapshots ({} {} skipped: unreadable manifest):",
snapshots.len(),
unreadable_count,
directory_word(unreadable_count)
);
}
println!();
println!(
" {:<24} {:<36} {:<19} {:<9} {:<7} {:<6} Status",
"Path", "ID", "Created", "Catalog", "Schemas", "Chunks"
);
println!(
" {:<24} {:<36} {:<19} {:<9} {:<7} {:<6} {:<10}",
"-".repeat(24),
"-".repeat(36),
"-".repeat(19),
"-".repeat(9),
"-".repeat(7),
"-".repeat(6),
"-".repeat(10)
);
for entry in snapshots {
let manifest = &entry.manifest;
println!(
" {:<24} {:<36} {:<19} {:<9} {:<7} {:<6} {}",
entry.path,
manifest.snapshot_id,
manifest.created_at.format("%Y-%m-%d %H:%M:%S"),
manifest.catalog,
manifest.schemas.len(),
format_list_chunks(manifest),
snapshot_status(manifest)
);
}
}
fn print_unreadable_warnings(unreadable: &[String]) {
if unreadable.is_empty() {
return;
}
println!();
println!(
"Warning: {} {} had corrupt/unreadable manifest.json:",
unreadable.len(),
directory_word(unreadable.len())
);
for path in unreadable {
println!(" - {}", path);
}
}
fn directory_word(count: usize) -> &'static str {
if count == 1 {
"directory"
} else {
"directories"
}
}
fn snapshot_status(manifest: &Manifest) -> &'static str {
if manifest.schema_only {
"schema-only"
} else if manifest.is_complete() {
"complete"
} else {
"incomplete"
}
}
fn format_list_chunks(manifest: &Manifest) -> String {
let total = manifest.chunks.len();
if total == 0 {
return "0".to_string();
}
format!(
"{}/{}",
manifest.completed_count() + manifest.skipped_count(),
total
)
}
#[cfg(test)]
mod tests {
use chrono::TimeZone;
use clap::Parser;
use tempfile::tempdir;
use url::Url;
use super::*;
use crate::data::path::ddl_path_for_schema;
@@ -886,4 +1071,124 @@ mod tests {
.to_string();
assert!(error.contains("time_range"));
}
#[tokio::test]
async fn test_scan_snapshots_sorts_and_tracks_unreadable_manifests() {
let dir = tempdir().unwrap();
write_test_manifest(
dir.path(),
"older",
test_manifest(
chrono::Utc.with_ymd_and_hms(2026, 1, 1, 0, 0, 0).unwrap(),
false,
true,
),
);
write_test_manifest(
dir.path(),
"newer",
test_manifest(
chrono::Utc.with_ymd_and_hms(2026, 2, 1, 0, 0, 0).unwrap(),
false,
true,
),
);
std::fs::create_dir_all(dir.path().join("empty-dir")).unwrap();
std::fs::create_dir_all(dir.path().join("not-snapshot")).unwrap();
std::fs::write(dir.path().join("not-snapshot").join("data.txt"), "x").unwrap();
std::fs::create_dir_all(dir.path().join("broken")).unwrap();
std::fs::write(dir.path().join("broken").join(MANIFEST_FILE), "{not-json").unwrap();
let uri = Url::from_directory_path(dir.path()).unwrap().to_string();
let storage = OpenDalStorage::from_file_uri(&uri).unwrap();
let result = scan_snapshots(&storage).await.unwrap();
assert_eq!(result.snapshots.len(), 2);
assert_eq!(
result.snapshots[0].manifest.created_at,
chrono::Utc.with_ymd_and_hms(2026, 2, 1, 0, 0, 0).unwrap()
);
assert_eq!(
result.snapshots[1].manifest.created_at,
chrono::Utc.with_ymd_and_hms(2026, 1, 1, 0, 0, 0).unwrap()
);
assert_eq!(result.unreadable, vec!["broken/".to_string()]);
assert_eq!(result.snapshots[0].path, "newer/");
assert_eq!(result.snapshots[1].path, "older/");
}
#[test]
fn test_snapshot_list_status_and_chunk_summary() {
let schema_only = test_manifest(
chrono::Utc.with_ymd_and_hms(2026, 1, 1, 0, 0, 0).unwrap(),
true,
true,
);
assert_eq!(snapshot_status(&schema_only), "schema-only");
assert_eq!(format_list_chunks(&schema_only), "0");
let complete = test_manifest(
chrono::Utc.with_ymd_and_hms(2026, 1, 1, 0, 0, 0).unwrap(),
false,
true,
);
assert_eq!(snapshot_status(&complete), "complete");
assert_eq!(format_list_chunks(&complete), "2/2");
let incomplete = test_manifest(
chrono::Utc.with_ymd_and_hms(2026, 1, 1, 0, 0, 0).unwrap(),
false,
false,
);
assert_eq!(snapshot_status(&incomplete), "incomplete");
assert_eq!(format_list_chunks(&incomplete), "1/2");
}
fn write_test_manifest(root: &std::path::Path, dir: &str, manifest: Manifest) {
let snapshot_dir = root.join(dir);
std::fs::create_dir_all(&snapshot_dir).unwrap();
std::fs::write(
snapshot_dir.join(MANIFEST_FILE),
serde_json::to_vec_pretty(&manifest).unwrap(),
)
.unwrap();
}
fn test_manifest(
created_at: chrono::DateTime<chrono::Utc>,
schema_only: bool,
complete: bool,
) -> Manifest {
let mut manifest = Manifest::new_for_export(
"greptime".to_string(),
vec!["public".to_string(), "analytics".to_string()],
schema_only,
TimeRange::unbounded(),
DataFormat::Parquet,
None,
)
.unwrap();
manifest.created_at = created_at;
manifest.updated_at = created_at;
if !schema_only {
manifest.chunks.clear();
let mut first = ChunkMeta::new(1, TimeRange::unbounded());
first.mark_completed(vec!["data/public/chunk_1/file.parquet".to_string()], None);
manifest.chunks.push(first);
if complete {
manifest
.chunks
.push(ChunkMeta::skipped(2, TimeRange::unbounded()));
} else {
manifest
.chunks
.push(ChunkMeta::new(2, TimeRange::unbounded()));
}
}
manifest
}
}

View File

@@ -17,6 +17,8 @@
//! This module provides a unified interface for reading and writing snapshot data
//! to various storage backends (S3, OSS, GCS, Azure Blob, local filesystem).
use std::collections::BTreeSet;
use async_trait::async_trait;
use futures::TryStreamExt;
use object_store::services::{Azblob, Fs, Gcs, Oss, S3};
@@ -75,7 +77,10 @@ impl StorageScheme {
}
/// Extracts bucket/container and root path from a URI.
fn extract_remote_location(uri: &str) -> Result<RemoteLocation> {
fn extract_remote_location_with_root_policy(
uri: &str,
allow_empty_root: bool,
) -> Result<RemoteLocation> {
let url = Url::parse(uri).context(UrlParseSnafu)?;
let bucket_or_container = url.host_str().unwrap_or("").to_string();
if bucket_or_container.is_empty() {
@@ -87,7 +92,7 @@ fn extract_remote_location(uri: &str) -> Result<RemoteLocation> {
}
let root = url.path().trim_start_matches('/').to_string();
if root.is_empty() {
if root.is_empty() && !allow_empty_root {
return InvalidUriSnafu {
uri,
reason: "snapshot URI must include a non-empty path after the bucket/container",
@@ -268,13 +273,21 @@ impl OpenDalStorage {
}
fn from_s3_uri(uri: &str, storage: &ObjectStoreConfig) -> Result<Self> {
Self::from_s3_uri_with_root_policy(uri, storage, false)
}
fn from_s3_uri_with_root_policy(
uri: &str,
storage: &ObjectStoreConfig,
allow_empty_root: bool,
) -> Result<Self> {
Self::ensure_backend_enabled(
uri,
storage.enable_s3,
"s3:// requires --s3 and related options",
)?;
let location = extract_remote_location(uri)?;
let location = extract_remote_location_with_root_policy(uri, allow_empty_root)?;
let mut config = storage.s3.clone();
config.s3_bucket = location.bucket_or_container;
config.s3_root = location.root;
@@ -291,13 +304,21 @@ impl OpenDalStorage {
}
fn from_oss_uri(uri: &str, storage: &ObjectStoreConfig) -> Result<Self> {
Self::from_oss_uri_with_root_policy(uri, storage, false)
}
fn from_oss_uri_with_root_policy(
uri: &str,
storage: &ObjectStoreConfig,
allow_empty_root: bool,
) -> Result<Self> {
Self::ensure_backend_enabled(
uri,
storage.enable_oss,
"oss:// requires --oss and related options",
)?;
let location = extract_remote_location(uri)?;
let location = extract_remote_location_with_root_policy(uri, allow_empty_root)?;
let mut config = storage.oss.clone();
config.oss_bucket = location.bucket_or_container;
config.oss_root = location.root;
@@ -314,17 +335,30 @@ impl OpenDalStorage {
}
fn from_gcs_uri(uri: &str, storage: &ObjectStoreConfig) -> Result<Self> {
Self::from_gcs_uri_with_root_policy(uri, storage, false)
}
fn from_gcs_uri_with_root_policy(
uri: &str,
storage: &ObjectStoreConfig,
allow_empty_root: bool,
) -> Result<Self> {
Self::ensure_backend_enabled(
uri,
storage.enable_gcs,
"gs:// or gcs:// requires --gcs and related options",
)?;
let location = extract_remote_location(uri)?;
let location = extract_remote_location_with_root_policy(uri, allow_empty_root)?;
let mut config = storage.gcs.clone();
config.gcs_bucket = location.bucket_or_container;
config.gcs_root = location.root;
Self::validate_remote_config(uri, "gcs", config.validate())?;
// GCS validate() rejects empty root, unlike S3/OSS/Azblob.
if allow_empty_root && config.gcs_root.is_empty() {
Self::validate_gcs_parent_config(uri, &config)?;
} else {
Self::validate_remote_config(uri, "gcs", config.validate())?;
}
let conn: GcsConnection = config.into();
let object_store = ObjectStore::new(Gcs::from(&conn))
@@ -336,14 +370,43 @@ impl OpenDalStorage {
))
}
fn validate_gcs_parent_config(
uri: &str,
config: &crate::common::PrefixedGcsConnection,
) -> Result<()> {
if config.gcs_bucket.is_empty() {
return InvalidUriSnafu {
uri,
reason: "invalid gcs config: GCS bucket must be set when --gcs is enabled.",
}
.fail();
}
if config.gcs_scope.is_empty() {
return InvalidUriSnafu {
uri,
reason: "invalid gcs config: GCS scope must be set when --gcs is enabled.",
}
.fail();
}
Ok(())
}
fn from_azblob_uri(uri: &str, storage: &ObjectStoreConfig) -> Result<Self> {
Self::from_azblob_uri_with_root_policy(uri, storage, false)
}
fn from_azblob_uri_with_root_policy(
uri: &str,
storage: &ObjectStoreConfig,
allow_empty_root: bool,
) -> Result<Self> {
Self::ensure_backend_enabled(
uri,
storage.enable_azblob,
"azblob:// requires --azblob and related options",
)?;
let location = extract_remote_location(uri)?;
let location = extract_remote_location_with_root_policy(uri, allow_empty_root)?;
let mut config = storage.azblob.clone();
config.azblob_container = location.bucket_or_container;
config.azblob_root = location.root;
@@ -370,6 +433,21 @@ impl OpenDalStorage {
}
}
/// Creates storage rooted at a snapshot parent URI.
///
/// Parent-oriented commands such as `export-v2 list` may scan bucket/container
/// roots. Snapshot-oriented commands must keep using `from_uri`, which rejects
/// empty remote roots to avoid unsafe snapshot operations at bucket scope.
pub fn from_parent_uri(uri: &str, storage: &ObjectStoreConfig) -> Result<Self> {
match StorageScheme::from_uri(uri)? {
StorageScheme::File => Self::from_file_uri_with_config(uri, storage),
StorageScheme::S3 => Self::from_s3_uri_with_root_policy(uri, storage, true),
StorageScheme::Oss => Self::from_oss_uri_with_root_policy(uri, storage, true),
StorageScheme::Gcs => Self::from_gcs_uri_with_root_policy(uri, storage, true),
StorageScheme::Azblob => Self::from_azblob_uri_with_root_policy(uri, storage, true),
}
}
/// Reads a file as bytes.
async fn read_file(&self, path: &str) -> Result<Vec<u8>> {
let data = self
@@ -382,6 +460,17 @@ impl OpenDalStorage {
Ok(data.to_vec())
}
/// Reads a file as bytes if it exists.
pub(crate) async fn read_file_if_exists(&self, path: &str) -> Result<Option<Vec<u8>>> {
match self.object_store.read(path).await {
Ok(data) => Ok(Some(data.to_vec())),
Err(error) if error.kind() == ErrorKind::NotFound => Ok(None),
Err(error) => Err(error).context(StorageOperationSnafu {
operation: format!("read {}", path),
}),
}
}
/// Writes bytes to a file.
async fn write_file(&self, path: &str, data: Vec<u8>) -> Result<()> {
self.object_store
@@ -404,6 +493,37 @@ impl OpenDalStorage {
}
}
/// Lists direct child directory names under the storage root.
pub(crate) async fn list_direct_child_dirs(&self) -> Result<Vec<String>> {
let mut lister = match self.object_store.lister_with("/").recursive(false).await {
Ok(lister) => lister,
Err(error) if error.kind() == ErrorKind::NotFound => return Ok(Vec::new()),
Err(error) => {
return Err(error).context(StorageOperationSnafu {
operation: "list /",
});
}
};
let mut dirs = BTreeSet::new();
while let Some(entry) = lister.try_next().await.context(StorageOperationSnafu {
operation: "list /",
})? {
let path = entry.path().trim_matches('/');
if path.is_empty() {
continue;
}
if entry.metadata().is_dir()
&& let Some(name) = path.split('/').next()
{
dirs.insert(name.to_string());
}
}
Ok(dirs.into_iter().collect())
}
#[cfg(test)]
pub async fn read_schema(&self) -> Result<SchemaSnapshot> {
let schemas_path = schema_index_path();
@@ -557,11 +677,35 @@ mod tests {
#[test]
fn test_extract_remote_location_requires_non_empty_root() {
assert!(extract_remote_location("s3://bucket").is_err());
assert!(extract_remote_location("s3://bucket/").is_err());
assert!(extract_remote_location("oss://bucket").is_err());
assert!(extract_remote_location("gs://bucket").is_err());
assert!(extract_remote_location("azblob://container").is_err());
assert!(extract_remote_location_with_root_policy("s3://bucket", false).is_err());
assert!(extract_remote_location_with_root_policy("s3://bucket/", false).is_err());
assert!(extract_remote_location_with_root_policy("oss://bucket", false).is_err());
assert!(extract_remote_location_with_root_policy("gs://bucket", false).is_err());
assert!(extract_remote_location_with_root_policy("azblob://container", false).is_err());
}
#[test]
fn test_extract_remote_location_allows_empty_root_when_permitted() {
let location = extract_remote_location_with_root_policy("s3://bucket", true).unwrap();
assert_eq!(location.bucket_or_container, "bucket");
assert_eq!(location.root, "");
let location =
extract_remote_location_with_root_policy("azblob://container/", true).unwrap();
assert_eq!(location.bucket_or_container, "container");
assert_eq!(location.root, "");
}
#[test]
fn test_parent_storage_allows_s3_bucket_root() {
let mut storage = ObjectStoreConfig {
enable_s3: true,
..Default::default()
};
storage.s3.s3_region = Some("us-east-1".to_string());
assert!(OpenDalStorage::from_uri("s3://bucket", &storage).is_err());
assert!(OpenDalStorage::from_parent_uri("s3://bucket", &storage).is_ok());
}
#[cfg(not(windows))]