From 3aabeef192c86f6e3447ce83525404de98f0481f Mon Sep 17 00:00:00 2001 From: jeremyhi Date: Tue, 16 Jun 2026 23:19:26 +0800 Subject: [PATCH] test(cli): add minio export-import v2 e2e Signed-off-by: jeremyhi --- src/cli/src/data/export_v2/tests.rs | 204 +++++++++++++++++++++++++++- 1 file changed, 203 insertions(+), 1 deletion(-) diff --git a/src/cli/src/data/export_v2/tests.rs b/src/cli/src/data/export_v2/tests.rs index fd96aff6b1..501b9f48ba 100644 --- a/src/cli/src/data/export_v2/tests.rs +++ b/src/cli/src/data/export_v2/tests.rs @@ -22,7 +22,7 @@ use snafu::ResultExt; use tempfile::tempdir; use url::Url; -use super::command::{ExportCreateCommand, ExportVerifyCommand}; +use super::command::{ExportCreateCommand, ExportDeleteCommand, ExportVerifyCommand}; use crate::common::ObjectStoreConfig; use crate::data::export_v2::manifest::ChunkStatus; use crate::data::import_v2::ImportV2Command; @@ -552,6 +552,208 @@ async fn export_import_v2_data_roundtrip_e2e() -> Result<()> { Ok(()) } +#[tokio::test] +#[ignore] +async fn export_import_v2_minio_roundtrip_e2e() -> Result<()> { + let required_env = [ + "GT_MINIO_BUCKET", + "GT_MINIO_ACCESS_KEY_ID", + "GT_MINIO_ACCESS_KEY", + "GT_MINIO_ENDPOINT_URL", + ]; + let missing_env = required_env + .iter() + .filter(|key| env::var(key).is_err()) + .copied() + .collect::>(); + if !missing_env.is_empty() { + eprintln!( + "skipping export_import_v2_minio_roundtrip_e2e; missing env vars: {}", + missing_env.join(", ") + ); + return Ok(()); + } + + let addr = env::var("GREPTIME_ADDR").unwrap_or_else(|_| "127.0.0.1:4000".to_string()); + let catalog = env::var("GREPTIME_CATALOG").unwrap_or_else(|_| "greptime".to_string()); + let auth_basic = env::var("GREPTIME_AUTH_BASIC").ok(); + let bucket = env::var("GT_MINIO_BUCKET").expect("checked above"); + let access_key_id = env::var("GT_MINIO_ACCESS_KEY_ID").expect("checked above"); + let secret_access_key = env::var("GT_MINIO_ACCESS_KEY").expect("checked above"); + let endpoint = env::var("GT_MINIO_ENDPOINT_URL").expect("checked above"); + let region = env::var("GT_MINIO_REGION").unwrap_or_else(|_| "us-west-2".to_string()); + let schema = "test_db_minio_roundtrip"; + let snapshot_uri = format!( + "s3://{}/export-import-v2-e2e/{}", + bucket, + uuid::Uuid::new_v4() + ); + + let append_common_storage_args = |args: &mut Vec| { + args.extend([ + "--s3".to_string(), + "--s3-region".to_string(), + region.clone(), + "--s3-access-key-id".to_string(), + access_key_id.clone(), + "--s3-secret-access-key".to_string(), + secret_access_key.clone(), + "--s3-endpoint".to_string(), + endpoint.clone(), + ]); + }; + let append_auth_args = |args: &mut Vec| { + if let Some(auth) = &auth_basic { + args.extend(["--auth-basic".to_string(), auth.clone()]); + } + }; + + let database_client = DatabaseClient::new( + addr.clone(), + catalog.clone(), + auth_basic.clone(), + Duration::from_secs(60), + None, + false, + ); + + database_client + .sql_in_public(&format!("DROP DATABASE IF EXISTS {schema}")) + .await?; + database_client + .sql_in_public(&format!("CREATE DATABASE {schema}")) + .await?; + database_client + .sql( + "CREATE TABLE metrics (\ + ts TIMESTAMP TIME INDEX, \ + host STRING PRIMARY KEY, \ + cpu DOUBLE \ + ) ENGINE=mito", + schema, + ) + .await?; + database_client + .sql( + "INSERT INTO metrics (ts, host, cpu) VALUES \ + ('2025-01-01T00:00:00Z', 'minio-h1', 1.0), \ + ('2025-01-01T01:00:00Z', 'minio-h2', 2.0)", + schema, + ) + .await?; + + let expected_rows = query_count(&database_client, schema, "metrics").await?; + assert_eq!(expected_rows, 2); + + let mut export_args = vec![ + "export-v2-create".to_string(), + "--addr".to_string(), + addr.clone(), + "--to".to_string(), + snapshot_uri.clone(), + "--catalog".to_string(), + catalog.clone(), + "--schemas".to_string(), + schema.to_string(), + "--start-time".to_string(), + "2025-01-01T00:00:00Z".to_string(), + "--end-time".to_string(), + "2025-01-01T02:00:00Z".to_string(), + "--chunk-time-window".to_string(), + "1h".to_string(), + "--chunk-parallelism".to_string(), + "2".to_string(), + "--progress".to_string(), + "never".to_string(), + ]; + append_auth_args(&mut export_args); + append_common_storage_args(&mut export_args); + let export_cmd = ExportCreateCommand::parse_from(export_args); + export_cmd + .build() + .await + .context(OtherSnafu)? + .do_work() + .await + .context(OtherSnafu)?; + + let mut verify_args = vec![ + "export-v2-verify".to_string(), + "--snapshot".to_string(), + snapshot_uri.clone(), + ]; + append_common_storage_args(&mut verify_args); + let verify_cmd = ExportVerifyCommand::parse_from(verify_args); + verify_cmd + .build() + .await + .context(OtherSnafu)? + .do_work() + .await + .context(OtherSnafu)?; + + database_client + .sql_in_public(&format!("DROP DATABASE IF EXISTS {schema}")) + .await?; + + let import_state_dir = tempdir().context(FileIoSnafu)?; + let import_state_path = import_state_dir.path().join("import-state.json"); + let import_state_path = import_state_path.to_string_lossy().into_owned(); + let mut import_args = vec![ + "import-v2".to_string(), + "--addr".to_string(), + addr.clone(), + "--from".to_string(), + snapshot_uri.clone(), + "--catalog".to_string(), + catalog.clone(), + "--schemas".to_string(), + schema.to_string(), + "--task-parallelism".to_string(), + "2".to_string(), + "--state-path".to_string(), + import_state_path, + "--progress".to_string(), + "never".to_string(), + ]; + append_auth_args(&mut import_args); + append_common_storage_args(&mut import_args); + let import_cmd = ImportV2Command::parse_from(import_args); + import_cmd + .build() + .await + .context(OtherSnafu)? + .do_work() + .await + .context(OtherSnafu)?; + + let actual_rows = query_count(&database_client, schema, "metrics").await?; + assert_eq!(actual_rows, expected_rows); + + let mut delete_args = vec![ + "export-v2-delete".to_string(), + "--snapshot".to_string(), + snapshot_uri.clone(), + "--no-confirm".to_string(), + ]; + append_common_storage_args(&mut delete_args); + let delete_cmd = ExportDeleteCommand::parse_from(delete_args); + match delete_cmd.build().await { + Ok(delete) => { + if let Err(err) = delete.do_work().await { + eprintln!("best-effort failed to delete snapshot {snapshot_uri}: {err}"); + } + } + Err(err) => eprintln!("best-effort failed to build delete for {snapshot_uri}: {err}"), + } + + database_client + .sql_in_public(&format!("DROP DATABASE IF EXISTS {schema}")) + .await?; + + Ok(()) +} + #[tokio::test] #[ignore] async fn import_v2_resume_from_completed_chunk_e2e() -> Result<()> {