From e3f1ef55b6c904d4787f12cef0fbfafc36040945 Mon Sep 17 00:00:00 2001 From: jeremyhi Date: Mon, 15 Jun 2026 21:16:49 +0800 Subject: [PATCH] feat(cli): allow overriding import-v2 state path Signed-off-by: jeremyhi --- src/cli/src/data/import_v2/command.rs | 84 +++++++++++++++++++++++++-- 1 file changed, 79 insertions(+), 5 deletions(-) diff --git a/src/cli/src/data/import_v2/command.rs b/src/cli/src/data/import_v2/command.rs index 43bef46be0..d885c4fc78 100644 --- a/src/cli/src/data/import_v2/command.rs +++ b/src/cli/src/data/import_v2/command.rs @@ -15,6 +15,7 @@ //! Import V2 CLI command. use std::collections::HashSet; +use std::path::{Path, PathBuf}; use std::time::Duration; use async_trait::async_trait; @@ -75,6 +76,12 @@ pub struct ImportV2Command { #[clap(long, default_value = "1", value_parser = parse_task_parallelism)] task_parallelism: usize, + /// Override the import resume state file path. + /// + /// Defaults to a stable path under `~/.greptime/import_state`. + #[clap(long)] + state_path: Option, + /// Basic authentication (user:password). #[clap(long)] auth_basic: Option, @@ -137,6 +144,7 @@ impl ImportV2Command { dry_run: self.dry_run, progress: self.progress, task_parallelism: self.task_parallelism, + state_path: self.state_path.clone(), snapshot_uri: self.from.clone(), storage_config: self.storage.clone(), storage: Box::new(storage), @@ -145,6 +153,26 @@ impl ImportV2Command { } } +/// Resolves the import resume state file path. When `override_path` is set it is +/// used verbatim; otherwise the stable default under `~/.greptime/import_state` +/// is derived from the import identity. +fn resolve_state_path( + override_path: Option<&Path>, + snapshot_id: &str, + target_addr: &str, + catalog: &str, + schemas: &[String], +) -> Result { + if let Some(path) = override_path { + return Ok(path.to_path_buf()); + } + default_state_path(snapshot_id, target_addr, catalog, schemas).context( + ImportStatePathUnavailableSnafu { + snapshot_id: snapshot_id.to_string(), + }, + ) +} + fn parse_task_parallelism(value: &str) -> std::result::Result { let parallelism = value .parse::() @@ -163,6 +191,7 @@ pub struct Import { dry_run: bool, progress: ProgressMode, task_parallelism: usize, + state_path: Option, snapshot_uri: String, storage_config: ObjectStoreConfig, storage: Box, @@ -241,15 +270,13 @@ impl Import { } let mut resume_session = if !data_tasks.is_empty() { - let state_path = default_state_path( + let state_path = resolve_state_path( + self.state_path.as_deref(), &manifest.snapshot_id.to_string(), self.database_client.addr(), &self.catalog, &schemas_to_import, - ) - .context(ImportStatePathUnavailableSnafu { - snapshot_id: manifest.snapshot_id.to_string(), - })?; + )?; Some( prepare_import_resume(ImportResumeConfig { snapshot_id: manifest.snapshot_id.to_string(), @@ -770,6 +797,53 @@ mod tests { ); } + #[test] + fn test_state_path_defaults_to_none() { + assert_eq!(parse_command(&[]).state_path, None); + } + + #[test] + fn test_state_path_parses_explicit_value() { + assert_eq!( + parse_command(&["--state-path", "/tmp/import_state.json"]).state_path, + Some(PathBuf::from("/tmp/import_state.json")) + ); + } + + #[test] + fn test_resolve_state_path_prefers_override() { + let override_path = PathBuf::from("/tmp/custom_import_state.json"); + let resolved = resolve_state_path( + Some(override_path.as_path()), + "snapshot-1", + "127.0.0.1:4000", + "greptime", + &["public".to_string()], + ) + .unwrap(); + assert_eq!(resolved, override_path); + } + + #[test] + fn test_resolve_state_path_uses_default_when_absent() { + let resolved = resolve_state_path( + None, + "snapshot-1", + "127.0.0.1:4000", + "greptime", + &["public".to_string()], + ) + .unwrap(); + let expected = default_state_path( + "snapshot-1", + "127.0.0.1:4000", + "greptime", + &["public".to_string()], + ) + .unwrap(); + assert_eq!(resolved, expected); + } + #[test] fn test_task_parallelism_rejects_invalid_values() { for value in ["0", "65", "abc"] {