test(cli): add minio export-import v2 e2e

Signed-off-by: jeremyhi <fengjiachun@gmail.com>
This commit is contained in:
jeremyhi
2026-06-16 23:19:26 +08:00
parent 301e8f72b0
commit 3aabeef192

View File

@@ -22,7 +22,7 @@ use snafu::ResultExt;
use tempfile::tempdir;
use url::Url;
use super::command::{ExportCreateCommand, ExportVerifyCommand};
use super::command::{ExportCreateCommand, ExportDeleteCommand, ExportVerifyCommand};
use crate::common::ObjectStoreConfig;
use crate::data::export_v2::manifest::ChunkStatus;
use crate::data::import_v2::ImportV2Command;
@@ -552,6 +552,208 @@ async fn export_import_v2_data_roundtrip_e2e() -> Result<()> {
Ok(())
}
#[tokio::test]
#[ignore]
async fn export_import_v2_minio_roundtrip_e2e() -> Result<()> {
let required_env = [
"GT_MINIO_BUCKET",
"GT_MINIO_ACCESS_KEY_ID",
"GT_MINIO_ACCESS_KEY",
"GT_MINIO_ENDPOINT_URL",
];
let missing_env = required_env
.iter()
.filter(|key| env::var(key).is_err())
.copied()
.collect::<Vec<_>>();
if !missing_env.is_empty() {
eprintln!(
"skipping export_import_v2_minio_roundtrip_e2e; missing env vars: {}",
missing_env.join(", ")
);
return Ok(());
}
let addr = env::var("GREPTIME_ADDR").unwrap_or_else(|_| "127.0.0.1:4000".to_string());
let catalog = env::var("GREPTIME_CATALOG").unwrap_or_else(|_| "greptime".to_string());
let auth_basic = env::var("GREPTIME_AUTH_BASIC").ok();
let bucket = env::var("GT_MINIO_BUCKET").expect("checked above");
let access_key_id = env::var("GT_MINIO_ACCESS_KEY_ID").expect("checked above");
let secret_access_key = env::var("GT_MINIO_ACCESS_KEY").expect("checked above");
let endpoint = env::var("GT_MINIO_ENDPOINT_URL").expect("checked above");
let region = env::var("GT_MINIO_REGION").unwrap_or_else(|_| "us-west-2".to_string());
let schema = "test_db_minio_roundtrip";
let snapshot_uri = format!(
"s3://{}/export-import-v2-e2e/{}",
bucket,
uuid::Uuid::new_v4()
);
let append_common_storage_args = |args: &mut Vec<String>| {
args.extend([
"--s3".to_string(),
"--s3-region".to_string(),
region.clone(),
"--s3-access-key-id".to_string(),
access_key_id.clone(),
"--s3-secret-access-key".to_string(),
secret_access_key.clone(),
"--s3-endpoint".to_string(),
endpoint.clone(),
]);
};
let append_auth_args = |args: &mut Vec<String>| {
if let Some(auth) = &auth_basic {
args.extend(["--auth-basic".to_string(), auth.clone()]);
}
};
let database_client = DatabaseClient::new(
addr.clone(),
catalog.clone(),
auth_basic.clone(),
Duration::from_secs(60),
None,
false,
);
database_client
.sql_in_public(&format!("DROP DATABASE IF EXISTS {schema}"))
.await?;
database_client
.sql_in_public(&format!("CREATE DATABASE {schema}"))
.await?;
database_client
.sql(
"CREATE TABLE metrics (\
ts TIMESTAMP TIME INDEX, \
host STRING PRIMARY KEY, \
cpu DOUBLE \
) ENGINE=mito",
schema,
)
.await?;
database_client
.sql(
"INSERT INTO metrics (ts, host, cpu) VALUES \
('2025-01-01T00:00:00Z', 'minio-h1', 1.0), \
('2025-01-01T01:00:00Z', 'minio-h2', 2.0)",
schema,
)
.await?;
let expected_rows = query_count(&database_client, schema, "metrics").await?;
assert_eq!(expected_rows, 2);
let mut export_args = vec![
"export-v2-create".to_string(),
"--addr".to_string(),
addr.clone(),
"--to".to_string(),
snapshot_uri.clone(),
"--catalog".to_string(),
catalog.clone(),
"--schemas".to_string(),
schema.to_string(),
"--start-time".to_string(),
"2025-01-01T00:00:00Z".to_string(),
"--end-time".to_string(),
"2025-01-01T02:00:00Z".to_string(),
"--chunk-time-window".to_string(),
"1h".to_string(),
"--chunk-parallelism".to_string(),
"2".to_string(),
"--progress".to_string(),
"never".to_string(),
];
append_auth_args(&mut export_args);
append_common_storage_args(&mut export_args);
let export_cmd = ExportCreateCommand::parse_from(export_args);
export_cmd
.build()
.await
.context(OtherSnafu)?
.do_work()
.await
.context(OtherSnafu)?;
let mut verify_args = vec![
"export-v2-verify".to_string(),
"--snapshot".to_string(),
snapshot_uri.clone(),
];
append_common_storage_args(&mut verify_args);
let verify_cmd = ExportVerifyCommand::parse_from(verify_args);
verify_cmd
.build()
.await
.context(OtherSnafu)?
.do_work()
.await
.context(OtherSnafu)?;
database_client
.sql_in_public(&format!("DROP DATABASE IF EXISTS {schema}"))
.await?;
let import_state_dir = tempdir().context(FileIoSnafu)?;
let import_state_path = import_state_dir.path().join("import-state.json");
let import_state_path = import_state_path.to_string_lossy().into_owned();
let mut import_args = vec![
"import-v2".to_string(),
"--addr".to_string(),
addr.clone(),
"--from".to_string(),
snapshot_uri.clone(),
"--catalog".to_string(),
catalog.clone(),
"--schemas".to_string(),
schema.to_string(),
"--task-parallelism".to_string(),
"2".to_string(),
"--state-path".to_string(),
import_state_path,
"--progress".to_string(),
"never".to_string(),
];
append_auth_args(&mut import_args);
append_common_storage_args(&mut import_args);
let import_cmd = ImportV2Command::parse_from(import_args);
import_cmd
.build()
.await
.context(OtherSnafu)?
.do_work()
.await
.context(OtherSnafu)?;
let actual_rows = query_count(&database_client, schema, "metrics").await?;
assert_eq!(actual_rows, expected_rows);
let mut delete_args = vec![
"export-v2-delete".to_string(),
"--snapshot".to_string(),
snapshot_uri.clone(),
"--no-confirm".to_string(),
];
append_common_storage_args(&mut delete_args);
let delete_cmd = ExportDeleteCommand::parse_from(delete_args);
match delete_cmd.build().await {
Ok(delete) => {
if let Err(err) = delete.do_work().await {
eprintln!("best-effort failed to delete snapshot {snapshot_uri}: {err}");
}
}
Err(err) => eprintln!("best-effort failed to build delete for {snapshot_uri}: {err}"),
}
database_client
.sql_in_public(&format!("DROP DATABASE IF EXISTS {schema}"))
.await?;
Ok(())
}
#[tokio::test]
#[ignore]
async fn import_v2_resume_from_completed_chunk_e2e() -> Result<()> {