feat(cli): allow overriding import-v2 state path

Signed-off-by: jeremyhi <fengjiachun@gmail.com>
This commit is contained in:
jeremyhi
2026-06-15 21:16:49 +08:00
parent 4373c77d35
commit e3f1ef55b6

View File

@@ -15,6 +15,7 @@
//! Import V2 CLI command.
use std::collections::HashSet;
use std::path::{Path, PathBuf};
use std::time::Duration;
use async_trait::async_trait;
@@ -75,6 +76,12 @@ pub struct ImportV2Command {
#[clap(long, default_value = "1", value_parser = parse_task_parallelism)]
task_parallelism: usize,
/// Override the import resume state file path.
///
/// Defaults to a stable path under `~/.greptime/import_state`.
#[clap(long)]
state_path: Option<PathBuf>,
/// Basic authentication (user:password).
#[clap(long)]
auth_basic: Option<String>,
@@ -137,6 +144,7 @@ impl ImportV2Command {
dry_run: self.dry_run,
progress: self.progress,
task_parallelism: self.task_parallelism,
state_path: self.state_path.clone(),
snapshot_uri: self.from.clone(),
storage_config: self.storage.clone(),
storage: Box::new(storage),
@@ -145,6 +153,26 @@ impl ImportV2Command {
}
}
/// Resolves the import resume state file path. When `override_path` is set it is
/// used verbatim; otherwise the stable default under `~/.greptime/import_state`
/// is derived from the import identity.
fn resolve_state_path(
override_path: Option<&Path>,
snapshot_id: &str,
target_addr: &str,
catalog: &str,
schemas: &[String],
) -> Result<PathBuf> {
if let Some(path) = override_path {
return Ok(path.to_path_buf());
}
default_state_path(snapshot_id, target_addr, catalog, schemas).context(
ImportStatePathUnavailableSnafu {
snapshot_id: snapshot_id.to_string(),
},
)
}
fn parse_task_parallelism(value: &str) -> std::result::Result<usize, String> {
let parallelism = value
.parse::<usize>()
@@ -163,6 +191,7 @@ pub struct Import {
dry_run: bool,
progress: ProgressMode,
task_parallelism: usize,
state_path: Option<PathBuf>,
snapshot_uri: String,
storage_config: ObjectStoreConfig,
storage: Box<dyn SnapshotStorage>,
@@ -241,15 +270,13 @@ impl Import {
}
let mut resume_session = if !data_tasks.is_empty() {
let state_path = default_state_path(
let state_path = resolve_state_path(
self.state_path.as_deref(),
&manifest.snapshot_id.to_string(),
self.database_client.addr(),
&self.catalog,
&schemas_to_import,
)
.context(ImportStatePathUnavailableSnafu {
snapshot_id: manifest.snapshot_id.to_string(),
})?;
)?;
Some(
prepare_import_resume(ImportResumeConfig {
snapshot_id: manifest.snapshot_id.to_string(),
@@ -770,6 +797,53 @@ mod tests {
);
}
#[test]
fn test_state_path_defaults_to_none() {
assert_eq!(parse_command(&[]).state_path, None);
}
#[test]
fn test_state_path_parses_explicit_value() {
assert_eq!(
parse_command(&["--state-path", "/tmp/import_state.json"]).state_path,
Some(PathBuf::from("/tmp/import_state.json"))
);
}
#[test]
fn test_resolve_state_path_prefers_override() {
let override_path = PathBuf::from("/tmp/custom_import_state.json");
let resolved = resolve_state_path(
Some(override_path.as_path()),
"snapshot-1",
"127.0.0.1:4000",
"greptime",
&["public".to_string()],
)
.unwrap();
assert_eq!(resolved, override_path);
}
#[test]
fn test_resolve_state_path_uses_default_when_absent() {
let resolved = resolve_state_path(
None,
"snapshot-1",
"127.0.0.1:4000",
"greptime",
&["public".to_string()],
)
.unwrap();
let expected = default_state_path(
"snapshot-1",
"127.0.0.1:4000",
"greptime",
&["public".to_string()],
)
.unwrap();
assert_eq!(resolved, expected);
}
#[test]
fn test_task_parallelism_rejects_invalid_values() {
for value in ["0", "65", "abc"] {