feat(cli): implement import-v2 data import pipeline (#7898)

* feat(cli): implement import-v2 data import pipeline

Signed-off-by: jeremyhi <fengjiachun@gmail.com>

* fix: cargo fmt

Signed-off-by: jeremyhi <fengjiachun@gmail.com>

* fix: by AI comments

Signed-off-by: jeremyhi <fengjiachun@gmail.com>

* fix(cli): harden import-v2 snapshot validation

Signed-off-by: jeremyhi <fengjiachun@gmail.com>

* fix: excape sql

Signed-off-by: jeremyhi <fengjiachun@gmail.com>

* fix(cli): redact escaped secrets in copy sql logs

Signed-off-by: jeremyhi <fengjiachun@gmail.com>

* test(cli): tighten export v2 e2e helpers

Signed-off-by: jeremyhi <fengjiachun@gmail.com>

* fix(cli): log execution time

Signed-off-by: jeremyhi <fengjiachun@gmail.com>

---------

Signed-off-by: jeremyhi <fengjiachun@gmail.com>
This commit is contained in:
jeremyhi
2026-04-13 20:15:36 -07:00
committed by GitHub
parent 8ad77ce649
commit e3f7ea8783
5 changed files with 1072 additions and 73 deletions

View File

@@ -41,7 +41,7 @@
mod chunker;
mod command;
mod coordinator;
mod data;
pub(crate) mod data;
pub mod error;
pub mod extractor;
pub mod manifest;

View File

@@ -38,18 +38,64 @@ pub(super) struct CopyTarget {
secrets: Vec<Option<String>>,
}
pub(crate) struct CopySource {
pub(crate) location: String,
pub(crate) connection: String,
secrets: Vec<Option<String>>,
}
impl CopyTarget {
fn mask_sql(&self, sql: &str) -> String {
mask_secrets(sql, &self.secrets)
}
}
impl CopySource {
fn mask_sql(&self, sql: &str) -> String {
mask_secrets(sql, &self.secrets)
}
}
pub(super) fn build_copy_target(
snapshot_uri: &str,
storage: &ObjectStoreConfig,
schema: &str,
chunk_id: u32,
) -> Result<CopyTarget> {
let location = build_copy_location(snapshot_uri, storage, schema, chunk_id)?;
Ok(CopyTarget {
location: location.location,
connection: location.connection,
secrets: location.secrets,
})
}
pub(crate) fn build_copy_source(
snapshot_uri: &str,
storage: &ObjectStoreConfig,
schema: &str,
chunk_id: u32,
) -> Result<CopySource> {
let location = build_copy_location(snapshot_uri, storage, schema, chunk_id)?;
Ok(CopySource {
location: location.location,
connection: location.connection,
secrets: location.secrets,
})
}
struct CopyLocation {
location: String,
connection: String,
secrets: Vec<Option<String>>,
}
fn build_copy_location(
snapshot_uri: &str,
storage: &ObjectStoreConfig,
schema: &str,
chunk_id: u32,
) -> Result<CopyLocation> {
let url = Url::parse(snapshot_uri).context(UrlParseSnafu)?;
let scheme = StorageScheme::from_uri(snapshot_uri)?;
let suffix = data_dir_for_schema_chunk(schema, chunk_id);
@@ -64,7 +110,7 @@ pub(super) fn build_copy_target(
.build()
})?;
let location = normalize_path(&format!("{}/{}", root.to_string_lossy(), suffix));
Ok(CopyTarget {
Ok(CopyLocation {
location,
connection: String::new(),
secrets: Vec::new(),
@@ -74,7 +120,7 @@ pub(super) fn build_copy_target(
let (bucket, root) = extract_bucket_root(&url, snapshot_uri)?;
let location = format!("s3://{}/{}", bucket, join_root(&root, &suffix));
let (connection, secrets) = build_s3_connection(storage);
Ok(CopyTarget {
Ok(CopyLocation {
location,
connection,
secrets,
@@ -84,7 +130,7 @@ pub(super) fn build_copy_target(
let (bucket, root) = extract_bucket_root(&url, snapshot_uri)?;
let location = format!("oss://{}/{}", bucket, join_root(&root, &suffix));
let (connection, secrets) = build_oss_connection(storage);
Ok(CopyTarget {
Ok(CopyLocation {
location,
connection,
secrets,
@@ -94,7 +140,7 @@ pub(super) fn build_copy_target(
let (bucket, root) = extract_bucket_root(&url, snapshot_uri)?;
let location = format!("gcs://{}/{}", bucket, join_root(&root, &suffix));
let (connection, secrets) = build_gcs_connection(storage, snapshot_uri)?;
Ok(CopyTarget {
Ok(CopyLocation {
location,
connection,
secrets,
@@ -104,7 +150,7 @@ pub(super) fn build_copy_target(
let (bucket, root) = extract_bucket_root(&url, snapshot_uri)?;
let location = format!("azblob://{}/{}", bucket, join_root(&root, &suffix));
let (connection, secrets) = build_azblob_connection(storage);
Ok(CopyTarget {
Ok(CopyLocation {
location,
connection,
secrets,
@@ -138,6 +184,30 @@ pub(super) async fn execute_copy_database(
Ok(())
}
pub(crate) async fn execute_copy_database_from(
database_client: &DatabaseClient,
catalog: &str,
schema: &str,
source: &CopySource,
format: DataFormat,
) -> Result<()> {
let sql = format!(
r#"COPY DATABASE "{}"."{}" FROM '{}' WITH (FORMAT='{}'){};"#,
escape_sql_identifier(catalog),
escape_sql_identifier(schema),
escape_sql_literal(&source.location),
format,
source.connection
);
let safe_sql = source.mask_sql(&sql);
info!("Executing sql: {}", safe_sql);
database_client
.sql_in_public(&sql)
.await
.context(DatabaseSnafu)?;
Ok(())
}
fn build_with_options(options: &CopyOptions) -> String {
let mut parts = vec![format!("FORMAT='{}'", options.format)];
if let Some(start) = options.time_range.start {
@@ -328,6 +398,10 @@ fn mask_secrets(sql: &str, secrets: &[Option<String>]) -> String {
if let Some(secret) = secret
&& !secret.is_empty()
{
let escaped = escape_sql_literal(secret);
if escaped != *secret {
masked = masked.replace(&escaped, "[REDACTED]");
}
masked = masked.replace(secret, "[REDACTED]");
}
}
@@ -430,6 +504,17 @@ mod tests {
assert!(!masked.contains("sig=secret-token"));
}
#[test]
fn test_mask_secrets_redacts_sql_escaped_literals() {
let sql =
"COPY DATABASE \"greptime\".\"public\" TO 's3://bucket' CONNECTION (SECRET='ab''cd');";
let masked = mask_secrets(sql, &[Some("ab'cd".to_string())]);
assert!(!masked.contains("ab'cd"));
assert!(!masked.contains("ab''cd"));
assert!(masked.contains("SECRET='[REDACTED]'"));
}
#[test]
fn test_build_copy_target_decodes_file_uri_path() {
let storage = ObjectStoreConfig::default();

View File

@@ -17,17 +17,49 @@ use std::time::Duration;
use clap::Parser;
use common_error::ext::BoxedError;
use serde_json::Value;
use snafu::ResultExt;
use tempfile::tempdir;
use url::Url;
use super::command::ExportCreateCommand;
use crate::common::ObjectStoreConfig;
use crate::data::export_v2::manifest::ChunkStatus;
use crate::data::import_v2::ImportV2Command;
use crate::data::snapshot_storage::OpenDalStorage;
use crate::data::snapshot_storage::{OpenDalStorage, SnapshotStorage};
use crate::data::sql::escape_sql_identifier;
use crate::database::DatabaseClient;
use crate::error::{FileIoSnafu, InvalidArgumentsSnafu, OtherSnafu, Result};
async fn query_count(database_client: &DatabaseClient, schema: &str, table: &str) -> Result<u64> {
let sql = format!("SELECT COUNT(*) FROM {}", escape_sql_identifier(table));
let rows = database_client.sql(&sql, schema).await?;
let first_row = rows.as_ref().and_then(|rows| rows.first()).ok_or_else(|| {
InvalidArgumentsSnafu {
msg: format!("empty result for query: {sql}"),
}
.build()
})?;
let first_value = first_row.first().ok_or_else(|| {
InvalidArgumentsSnafu {
msg: format!("no first column for query: {sql}"),
}
.build()
})?;
match first_value {
Value::Number(n) => n.as_u64().ok_or_else(|| {
InvalidArgumentsSnafu {
msg: format!("count is not u64 for query: {sql}"),
}
.build()
}),
_ => InvalidArgumentsSnafu {
msg: format!("unexpected count type for query: {sql}"),
}
.fail(),
}
}
#[tokio::test]
#[ignore]
async fn export_import_v2_schema_parity_e2e() -> Result<()> {
@@ -339,3 +371,515 @@ async fn import_v2_ddl_dry_run_e2e() -> Result<()> {
Ok(())
}
#[tokio::test]
#[ignore]
async fn export_import_v2_data_roundtrip_e2e() -> Result<()> {
let addr = env::var("GREPTIME_ADDR").unwrap_or_else(|_| "127.0.0.1:4000".to_string());
let catalog = env::var("GREPTIME_CATALOG").unwrap_or_else(|_| "greptime".to_string());
let auth_basic = env::var("GREPTIME_AUTH_BASIC").ok();
let schema = "test_db_data_roundtrip";
let database_client = DatabaseClient::new(
addr.clone(),
catalog.clone(),
auth_basic.clone(),
Duration::from_secs(60),
None,
false,
);
database_client
.sql_in_public(&format!("DROP DATABASE IF EXISTS {schema}"))
.await?;
database_client
.sql_in_public(&format!("CREATE DATABASE {schema}"))
.await?;
database_client
.sql(
"CREATE TABLE metrics (\
ts TIMESTAMP TIME INDEX, \
host STRING PRIMARY KEY, \
cpu DOUBLE \
) ENGINE=mito",
schema,
)
.await?;
database_client
.sql(
"INSERT INTO metrics (ts, host, cpu) VALUES \
('2025-01-01T00:00:00Z', 'h1', 1.0), \
('2025-01-01T01:00:00Z', 'h2', 2.0)",
schema,
)
.await?;
let expected_rows = query_count(&database_client, schema, "metrics").await?;
let src_dir = tempdir().context(FileIoSnafu)?;
let src_uri = Url::from_directory_path(src_dir.path())
.map_err(|_| {
InvalidArgumentsSnafu {
msg: "invalid temp dir path".to_string(),
}
.build()
})?
.to_string();
let mut export_args = vec![
"export-v2-create",
"--addr",
&addr,
"--to",
&src_uri,
"--catalog",
&catalog,
"--schemas",
schema,
];
if let Some(auth) = &auth_basic {
export_args.push("--auth-basic");
export_args.push(auth);
}
let export_cmd = ExportCreateCommand::parse_from(export_args);
export_cmd
.build()
.await
.context(OtherSnafu)?
.do_work()
.await
.context(OtherSnafu)?;
database_client
.sql_in_public(&format!("DROP DATABASE IF EXISTS {schema}"))
.await?;
let mut import_args = vec![
"import-v2",
"--addr",
&addr,
"--from",
&src_uri,
"--catalog",
&catalog,
"--schemas",
schema,
];
if let Some(auth) = &auth_basic {
import_args.push("--auth-basic");
import_args.push(auth);
}
let import_cmd = ImportV2Command::parse_from(import_args);
import_cmd
.build()
.await
.context(OtherSnafu)?
.do_work()
.await
.context(OtherSnafu)?;
let actual_rows = query_count(&database_client, schema, "metrics").await?;
assert_eq!(actual_rows, expected_rows);
database_client
.sql_in_public(&format!("DROP DATABASE IF EXISTS {schema}"))
.await?;
Ok(())
}
#[tokio::test]
#[ignore]
async fn import_v2_fails_on_incomplete_snapshot_e2e() -> Result<()> {
let addr = env::var("GREPTIME_ADDR").unwrap_or_else(|_| "127.0.0.1:4000".to_string());
let catalog = env::var("GREPTIME_CATALOG").unwrap_or_else(|_| "greptime".to_string());
let auth_basic = env::var("GREPTIME_AUTH_BASIC").ok();
let schema = "test_db_incomplete_snapshot";
let database_client = DatabaseClient::new(
addr.clone(),
catalog.clone(),
auth_basic.clone(),
Duration::from_secs(60),
None,
false,
);
database_client
.sql_in_public(&format!("DROP DATABASE IF EXISTS {schema}"))
.await?;
database_client
.sql_in_public(&format!("CREATE DATABASE {schema}"))
.await?;
database_client
.sql(
"CREATE TABLE metrics (\
ts TIMESTAMP TIME INDEX, \
host STRING PRIMARY KEY, \
cpu DOUBLE \
) ENGINE=mito",
schema,
)
.await?;
database_client
.sql(
"INSERT INTO metrics (ts, host, cpu) VALUES ('2025-01-01T00:00:00Z', 'h1', 1.0)",
schema,
)
.await?;
let src_dir = tempdir().context(FileIoSnafu)?;
let src_uri = Url::from_directory_path(src_dir.path())
.map_err(|_| {
InvalidArgumentsSnafu {
msg: "invalid temp dir path".to_string(),
}
.build()
})?
.to_string();
let mut export_args = vec![
"export-v2-create",
"--addr",
&addr,
"--to",
&src_uri,
"--catalog",
&catalog,
"--schemas",
schema,
];
if let Some(auth) = &auth_basic {
export_args.push("--auth-basic");
export_args.push(auth);
}
let export_cmd = ExportCreateCommand::parse_from(export_args);
export_cmd
.build()
.await
.context(OtherSnafu)?
.do_work()
.await
.context(OtherSnafu)?;
let storage_config = ObjectStoreConfig::default();
let storage = OpenDalStorage::from_uri(&src_uri, &storage_config)
.map_err(BoxedError::new)
.context(OtherSnafu)?;
let mut manifest = storage
.read_manifest()
.await
.map_err(BoxedError::new)
.context(OtherSnafu)?;
if let Some(first_chunk) = manifest.chunks.first_mut() {
first_chunk.status = ChunkStatus::Failed;
}
storage
.write_manifest(&manifest)
.await
.map_err(BoxedError::new)
.context(OtherSnafu)?;
database_client
.sql_in_public(&format!("DROP DATABASE IF EXISTS {schema}"))
.await?;
let mut import_args = vec![
"import-v2",
"--addr",
&addr,
"--from",
&src_uri,
"--catalog",
&catalog,
"--schemas",
schema,
];
if let Some(auth) = &auth_basic {
import_args.push("--auth-basic");
import_args.push(auth);
}
let import_cmd = ImportV2Command::parse_from(import_args);
let err = import_cmd
.build()
.await
.context(OtherSnafu)?
.do_work()
.await
.expect_err("import should fail on incomplete snapshot");
assert!(err.to_string().contains("Incomplete snapshot"));
Ok(())
}
#[tokio::test]
#[ignore]
async fn import_v2_schema_filter_data_e2e() -> Result<()> {
let addr = env::var("GREPTIME_ADDR").unwrap_or_else(|_| "127.0.0.1:4000".to_string());
let catalog = env::var("GREPTIME_CATALOG").unwrap_or_else(|_| "greptime".to_string());
let auth_basic = env::var("GREPTIME_AUTH_BASIC").ok();
let schema_a = "test_db_filter_a";
let schema_b = "test_db_filter_b";
let database_client = DatabaseClient::new(
addr.clone(),
catalog.clone(),
auth_basic.clone(),
Duration::from_secs(60),
None,
false,
);
for schema in [schema_a, schema_b] {
database_client
.sql_in_public(&format!("DROP DATABASE IF EXISTS {schema}"))
.await?;
database_client
.sql_in_public(&format!("CREATE DATABASE {schema}"))
.await?;
database_client
.sql(
"CREATE TABLE metrics (\
ts TIMESTAMP TIME INDEX, \
host STRING PRIMARY KEY, \
cpu DOUBLE \
) ENGINE=mito",
schema,
)
.await?;
}
database_client
.sql(
"INSERT INTO metrics (ts, host, cpu) VALUES ('2025-01-01T00:00:00Z', 'a1', 1.0)",
schema_a,
)
.await?;
database_client
.sql(
"INSERT INTO metrics (ts, host, cpu) VALUES ('2025-01-01T00:00:00Z', 'b1', 2.0)",
schema_b,
)
.await?;
let expected_rows_a = query_count(&database_client, schema_a, "metrics").await?;
let src_dir = tempdir().context(FileIoSnafu)?;
let src_uri = Url::from_directory_path(src_dir.path())
.map_err(|_| {
InvalidArgumentsSnafu {
msg: "invalid temp dir path".to_string(),
}
.build()
})?
.to_string();
let mut export_args = vec![
"export-v2-create",
"--addr",
&addr,
"--to",
&src_uri,
"--catalog",
&catalog,
"--schemas",
schema_a,
"--schemas",
schema_b,
];
if let Some(auth) = &auth_basic {
export_args.push("--auth-basic");
export_args.push(auth);
}
let export_cmd = ExportCreateCommand::parse_from(export_args);
export_cmd
.build()
.await
.context(OtherSnafu)?
.do_work()
.await
.context(OtherSnafu)?;
for schema in [schema_a, schema_b] {
database_client
.sql_in_public(&format!("DROP DATABASE IF EXISTS {schema}"))
.await?;
}
let mut import_args = vec![
"import-v2",
"--addr",
&addr,
"--from",
&src_uri,
"--catalog",
&catalog,
"--schemas",
schema_a,
];
if let Some(auth) = &auth_basic {
import_args.push("--auth-basic");
import_args.push(auth);
}
let import_cmd = ImportV2Command::parse_from(import_args);
import_cmd
.build()
.await
.context(OtherSnafu)?
.do_work()
.await
.context(OtherSnafu)?;
let actual_rows_a = query_count(&database_client, schema_a, "metrics").await?;
assert_eq!(actual_rows_a, expected_rows_a);
let schema_b_query = database_client
.sql("SELECT COUNT(*) FROM metrics", schema_b)
.await;
assert!(schema_b_query.is_err(), "schema_b should not be imported");
for schema in [schema_a, schema_b] {
database_client
.sql_in_public(&format!("DROP DATABASE IF EXISTS {schema}"))
.await?;
}
Ok(())
}
#[tokio::test]
#[ignore]
async fn export_import_v2_skipped_chunk_e2e() -> Result<()> {
let addr = env::var("GREPTIME_ADDR").unwrap_or_else(|_| "127.0.0.1:4000".to_string());
let catalog = env::var("GREPTIME_CATALOG").unwrap_or_else(|_| "greptime".to_string());
let auth_basic = env::var("GREPTIME_AUTH_BASIC").ok();
let schema = "test_db_skipped_chunk";
let database_client = DatabaseClient::new(
addr.clone(),
catalog.clone(),
auth_basic.clone(),
Duration::from_secs(60),
None,
false,
);
database_client
.sql_in_public(&format!("DROP DATABASE IF EXISTS {schema}"))
.await?;
database_client
.sql_in_public(&format!("CREATE DATABASE {schema}"))
.await?;
database_client
.sql(
"CREATE TABLE metrics (\
ts TIMESTAMP TIME INDEX, \
host STRING PRIMARY KEY, \
cpu DOUBLE \
) ENGINE=mito",
schema,
)
.await?;
database_client
.sql(
"INSERT INTO metrics (ts, host, cpu) VALUES \
('2025-01-01T00:00:00Z', 'h1', 1.0), \
('2025-01-01T01:00:00Z', 'h2', 2.0)",
schema,
)
.await?;
let src_dir = tempdir().context(FileIoSnafu)?;
let src_uri = Url::from_directory_path(src_dir.path())
.map_err(|_| {
InvalidArgumentsSnafu {
msg: "invalid temp dir path".to_string(),
}
.build()
})?
.to_string();
let mut export_args = vec![
"export-v2-create",
"--addr",
&addr,
"--to",
&src_uri,
"--catalog",
&catalog,
"--schemas",
schema,
"--start-time",
"2025-01-01T00:00:00Z",
"--end-time",
"2025-01-01T02:00:00Z",
"--chunk-time-window",
"1h",
];
if let Some(auth) = &auth_basic {
export_args.push("--auth-basic");
export_args.push(auth);
}
let export_cmd = ExportCreateCommand::parse_from(export_args);
export_cmd
.build()
.await
.context(OtherSnafu)?
.do_work()
.await
.context(OtherSnafu)?;
let storage_config = ObjectStoreConfig::default();
let storage = OpenDalStorage::from_uri(&src_uri, &storage_config)
.map_err(BoxedError::new)
.context(OtherSnafu)?;
let mut manifest = storage
.read_manifest()
.await
.map_err(BoxedError::new)
.context(OtherSnafu)?;
assert_eq!(manifest.chunks.len(), 2);
manifest.chunks[0].status = ChunkStatus::Skipped;
manifest.chunks[0].files.clear();
storage
.write_manifest(&manifest)
.await
.map_err(BoxedError::new)
.context(OtherSnafu)?;
database_client
.sql_in_public(&format!("DROP DATABASE IF EXISTS {schema}"))
.await?;
let mut import_args = vec![
"import-v2",
"--addr",
&addr,
"--from",
&src_uri,
"--catalog",
&catalog,
"--schemas",
schema,
];
if let Some(auth) = &auth_basic {
import_args.push("--auth-basic");
import_args.push(auth);
}
let import_cmd = ImportV2Command::parse_from(import_args);
import_cmd
.build()
.await
.context(OtherSnafu)?
.do_work()
.await
.context(OtherSnafu)?;
let actual_rows = query_count(&database_client, schema, "metrics").await?;
assert_eq!(actual_rows, 1);
database_client
.sql_in_public(&format!("DROP DATABASE IF EXISTS {schema}"))
.await?;
Ok(())
}

View File

@@ -15,7 +15,7 @@
//! Import V2 CLI command.
use std::collections::HashSet;
use std::time::Duration;
use std::time::{Duration, Instant};
use async_trait::async_trait;
use clap::Parser;
@@ -25,13 +25,15 @@ use snafu::ResultExt;
use crate::Tool;
use crate::common::ObjectStoreConfig;
use crate::data::export_v2::manifest::MANIFEST_VERSION;
use crate::data::export_v2::data::{build_copy_source, execute_copy_database_from};
use crate::data::export_v2::manifest::{ChunkMeta, ChunkStatus, DataFormat, MANIFEST_VERSION};
use crate::data::import_v2::error::{
FullSnapshotImportNotSupportedSnafu, ManifestVersionMismatchSnafu, Result,
SchemaNotInSnapshotSnafu, SnapshotStorageSnafu,
ChunkImportFailedSnafu, EmptyChunkManifestSnafu, IncompleteSnapshotSnafu,
ManifestVersionMismatchSnafu, MissingChunkDataSnafu, Result, SchemaNotInSnapshotSnafu,
SnapshotStorageSnafu,
};
use crate::data::import_v2::executor::{DdlExecutor, DdlStatement};
use crate::data::path::ddl_path_for_schema;
use crate::data::path::{data_dir_for_schema_chunk, ddl_path_for_schema};
use crate::data::snapshot_storage::{OpenDalStorage, SnapshotStorage, validate_uri};
use crate::database::{DatabaseClient, parse_proxy_opts};
@@ -116,8 +118,11 @@ impl ImportV2Command {
);
Ok(Box::new(Import {
catalog: self.catalog.clone(),
schemas,
dry_run: self.dry_run,
snapshot_uri: self.from.clone(),
storage_config: self.storage.clone(),
storage: Box::new(storage),
database_client,
}))
@@ -126,8 +131,11 @@ impl ImportV2Command {
/// Import tool implementation.
pub struct Import {
catalog: String,
schemas: Option<Vec<String>>,
dry_run: bool,
snapshot_uri: String,
storage_config: ObjectStoreConfig,
storage: Box<dyn SnapshotStorage>,
database_client: DatabaseClient,
}
@@ -164,13 +172,6 @@ impl Import {
info!("Snapshot contains {} schema(s)", manifest.schemas.len());
if !manifest.schema_only && !manifest.chunks.is_empty() {
return FullSnapshotImportNotSupportedSnafu {
chunk_count: manifest.chunks.len(),
}
.fail();
}
// 2. Determine schemas to import
let schemas_to_import = match &self.schemas {
Some(filter) => canonicalize_schema_filter(filter, &manifest.schemas)?,
@@ -184,6 +185,15 @@ impl Import {
info!("Generated {} DDL statements", ddl_statements.len());
let data_prefixes = if !manifest.schema_only && !manifest.chunks.is_empty() {
Some(
validate_data_snapshot(self.storage.as_ref(), &manifest.chunks, &schemas_to_import)
.await?,
)
} else {
None
};
// 4. Dry-run mode: print DDL and exit
if self.dry_run {
info!("Dry-run mode - DDL statements to execute:");
@@ -193,6 +203,12 @@ impl Import {
println!("{};", stmt.sql);
println!();
}
if !manifest.schema_only && !manifest.chunks.is_empty() {
for line in format_data_import_plan(&manifest.chunks, &schemas_to_import) {
println!("{line}");
}
println!();
}
return Ok(());
}
@@ -200,6 +216,16 @@ impl Import {
let executor = DdlExecutor::new(&self.database_client);
executor.execute_strict(&ddl_statements).await?;
if !manifest.schema_only && !manifest.chunks.is_empty() {
self.import_data(
&manifest.chunks,
&schemas_to_import,
manifest.format,
data_prefixes.expect("validated full snapshot must provide data prefixes"),
)
.await?;
}
info!(
"Import completed: {} DDL statements executed",
ddl_statements.len()
@@ -226,6 +252,80 @@ impl Import {
Ok(statements)
}
async fn import_data(
&self,
chunks: &[ChunkMeta],
schemas: &[String],
format: DataFormat,
actual_prefixes: HashSet<String>,
) -> Result<()> {
let import_start = Instant::now();
let total_chunks = chunks
.iter()
.filter(|chunk| chunk.status == ChunkStatus::Completed)
.count();
info!(
"Importing data: {} chunks, {} schemas",
total_chunks,
schemas.len()
);
for (idx, chunk) in chunks.iter().enumerate() {
if chunk.status == ChunkStatus::Skipped {
info!(
"[{}/{}] Chunk {}: skipped (no data)",
idx + 1,
chunks.len(),
chunk.id
);
continue;
}
info!(
"[{}/{}] Chunk {} ({:?} ~ {:?})",
idx + 1,
chunks.len(),
chunk.id,
chunk.time_range.start,
chunk.time_range.end
);
for schema in schemas {
if !validate_chunk_schema_files(chunk, schema, &actual_prefixes)? {
info!(" {}: no data, skipped", schema);
continue;
}
info!(" {}: importing...", schema);
let copy_start = Instant::now();
let source =
build_copy_source(&self.snapshot_uri, &self.storage_config, schema, chunk.id)
.context(ChunkImportFailedSnafu {
chunk_id: chunk.id,
schema: schema.clone(),
})?;
execute_copy_database_from(
&self.database_client,
&self.catalog,
schema,
&source,
format,
)
.await
.context(ChunkImportFailedSnafu {
chunk_id: chunk.id,
schema: schema.clone(),
})?;
info!(" {}: done in {:?}", schema, copy_start.elapsed());
}
}
info!("Data import finished in {:?}", import_start.elapsed());
Ok(())
}
}
fn parse_ddl_statements(content: &str) -> Vec<String> {
@@ -395,21 +495,130 @@ fn canonicalize_schema_filter(
Ok(canonicalized)
}
fn validate_chunk_statuses(chunks: &[ChunkMeta]) -> Result<()> {
let invalid_chunk = chunks
.iter()
.find(|chunk| !matches!(chunk.status, ChunkStatus::Completed | ChunkStatus::Skipped));
if let Some(chunk) = invalid_chunk {
return IncompleteSnapshotSnafu {
chunk_id: chunk.id,
status: chunk.status,
}
.fail();
}
Ok(())
}
fn chunk_has_schema_files(chunk: &ChunkMeta, schema: &str) -> bool {
let prefix = data_dir_for_schema_chunk(schema, chunk.id);
chunk.files.iter().any(|path| {
let normalized = path.trim_start_matches('/');
normalized.starts_with(&prefix)
})
}
fn format_data_import_plan(chunks: &[ChunkMeta], schemas: &[String]) -> Vec<String> {
let mut lines = vec!["-- Data import plan:".to_string()];
for chunk in chunks {
lines.push(format!("-- Chunk {}: {:?}", chunk.id, chunk.status));
for schema in schemas {
if chunk_has_schema_files(chunk, schema) {
lines.push(format!("-- {} -> COPY DATABASE FROM", schema));
}
}
}
lines
}
async fn validate_data_snapshot(
storage: &dyn SnapshotStorage,
chunks: &[ChunkMeta],
schemas: &[String],
) -> Result<HashSet<String>> {
validate_chunk_statuses(chunks)?;
let actual_prefixes = collect_chunk_data_prefixes(storage).await?;
for chunk in chunks {
if chunk.status == ChunkStatus::Skipped {
continue;
}
if chunk.files.is_empty() {
return EmptyChunkManifestSnafu { chunk_id: chunk.id }.fail();
}
for schema in schemas {
validate_chunk_schema_files(chunk, schema, &actual_prefixes)?;
}
}
Ok(actual_prefixes)
}
async fn collect_chunk_data_prefixes(storage: &dyn SnapshotStorage) -> Result<HashSet<String>> {
let files = storage
.list_files_recursive("data/")
.await
.context(SnapshotStorageSnafu)?;
let mut prefixes = HashSet::new();
for path in files {
let normalized = path.trim_start_matches('/');
let mut parts = normalized.splitn(4, '/');
let Some(root) = parts.next() else {
continue;
};
let Some(schema) = parts.next() else {
continue;
};
let Some(chunk_id) = parts.next() else {
continue;
};
if root != "data" {
continue;
}
prefixes.insert(format!("data/{schema}/{chunk_id}/"));
}
Ok(prefixes)
}
fn validate_chunk_schema_files(
chunk: &ChunkMeta,
schema: &str,
actual_prefixes: &HashSet<String>,
) -> Result<bool> {
if !chunk_has_schema_files(chunk, schema) {
return Ok(false);
}
let prefix = data_dir_for_schema_chunk(schema, chunk.id);
if !actual_prefixes.contains(&prefix) {
return MissingChunkDataSnafu {
chunk_id: chunk.id,
schema: schema.to_string(),
path: prefix,
}
.fail();
}
Ok(true)
}
#[cfg(test)]
mod tests {
use std::time::Duration;
use std::collections::{HashMap, HashSet};
use async_trait::async_trait;
use super::*;
use crate::Tool;
use crate::data::export_v2::manifest::{ChunkMeta, DataFormat, Manifest, TimeRange};
use crate::data::export_v2::manifest::{ChunkMeta, ChunkStatus, Manifest, TimeRange};
use crate::data::export_v2::schema::SchemaSnapshot;
use crate::data::snapshot_storage::SnapshotStorage;
use crate::database::DatabaseClient;
struct StubStorage {
manifest: Manifest,
files_by_prefix: HashMap<String, Vec<String>>,
}
#[async_trait]
@@ -454,9 +663,14 @@ mod tests {
async fn list_files_recursive(
&self,
_prefix: &str,
prefix: &str,
) -> crate::data::export_v2::error::Result<Vec<String>> {
unimplemented!("not needed in import_v2::command tests")
Ok(self
.files_by_prefix
.iter()
.filter(|(candidate, _)| candidate.starts_with(prefix))
.flat_map(|(_, files)| files.clone())
.collect())
}
async fn delete_snapshot(&self) -> crate::data::export_v2::error::Result<()> {
@@ -464,48 +678,6 @@ mod tests {
}
}
fn test_database_client() -> DatabaseClient {
DatabaseClient::new(
"127.0.0.1:4000".to_string(),
"greptime".to_string(),
None,
Duration::from_secs(1),
None,
false,
)
}
#[tokio::test]
async fn test_import_rejects_full_snapshot_before_schema_execution() {
let mut manifest = Manifest::new_full(
"greptime".to_string(),
vec!["public".to_string()],
TimeRange::unbounded(),
DataFormat::Parquet,
);
manifest
.chunks
.push(ChunkMeta::new(1, TimeRange::unbounded()));
let import = Import {
schemas: None,
dry_run: false,
storage: Box::new(StubStorage { manifest }),
database_client: test_database_client(),
};
let error = import
.do_work()
.await
.expect_err("full snapshot import should fail");
assert!(
error
.to_string()
.contains("Importing data from full snapshots is not implemented yet")
);
}
#[test]
fn test_parse_ddl_statements() {
let content = r#"
@@ -640,4 +812,164 @@ CREATE VIEW v AS SELECT 1;
assert!(!starts_with_keyword("CREATED TABLE t", "CREATE"));
assert!(!starts_with_keyword("TABLESPACE foo", "TABLE"));
}
#[test]
fn test_validate_chunk_statuses_rejects_failed_chunk() {
let mut failed = ChunkMeta::new(3, TimeRange::unbounded());
failed.status = ChunkStatus::Failed;
let error = validate_chunk_statuses(&[failed]).expect_err("failed chunk should error");
assert!(error.to_string().contains("Incomplete snapshot"));
}
#[test]
fn test_validate_chunk_statuses_accepts_completed_and_skipped_chunks() {
let mut completed = ChunkMeta::new(1, TimeRange::unbounded());
completed.status = ChunkStatus::Completed;
let skipped = ChunkMeta::skipped(2, TimeRange::unbounded());
assert!(validate_chunk_statuses(&[completed, skipped]).is_ok());
}
#[test]
fn test_chunk_has_schema_files_matches_encoded_schema_prefix() {
let mut chunk = ChunkMeta::new(7, TimeRange::unbounded());
chunk.files = vec![
"data/public/7/a.parquet".to_string(),
"data/%E6%B5%8B%E8%AF%95/7/b.parquet".to_string(),
];
assert!(chunk_has_schema_files(&chunk, "public"));
assert!(chunk_has_schema_files(&chunk, "测试"));
assert!(!chunk_has_schema_files(&chunk, "metrics"));
}
#[test]
fn test_format_data_import_plan_includes_matching_schemas_only() {
let mut completed = ChunkMeta::new(1, TimeRange::unbounded());
completed.status = ChunkStatus::Completed;
completed.files = vec![
"data/public/1/a.parquet".to_string(),
"data/%E6%B5%8B%E8%AF%95/1/b.parquet".to_string(),
];
let skipped = ChunkMeta::skipped(2, TimeRange::unbounded());
let lines = format_data_import_plan(
&[completed, skipped],
&[
"public".to_string(),
"测试".to_string(),
"metrics".to_string(),
],
);
assert_eq!(lines[0], "-- Data import plan:");
assert!(lines.contains(&"-- Chunk 1: Completed".to_string()));
assert!(lines.contains(&"-- public -> COPY DATABASE FROM".to_string()));
assert!(lines.contains(&"-- 测试 -> COPY DATABASE FROM".to_string()));
assert!(!lines.contains(&"-- metrics -> COPY DATABASE FROM".to_string()));
assert!(lines.contains(&"-- Chunk 2: Skipped".to_string()));
}
#[tokio::test]
async fn test_collect_chunk_data_prefixes_indexes_present_prefixes() {
let storage = StubStorage {
manifest: Manifest::new_schema_only("greptime".to_string(), vec!["public".to_string()]),
files_by_prefix: HashMap::from([
(
"data/public/7/".to_string(),
vec!["data/public/7/a.parquet".to_string()],
),
(
"data/%E6%B5%8B%E8%AF%95/9/".to_string(),
vec!["data/%E6%B5%8B%E8%AF%95/9/b.parquet".to_string()],
),
]),
};
let prefixes = collect_chunk_data_prefixes(&storage).await.unwrap();
assert!(prefixes.contains("data/public/7/"));
assert!(prefixes.contains("data/%E6%B5%8B%E8%AF%95/9/"));
}
#[test]
fn test_validate_chunk_schema_files_accepts_present_prefix() {
let mut chunk = ChunkMeta::new(7, TimeRange::unbounded());
chunk.files = vec!["data/public/7/a.parquet".to_string()];
let actual_prefixes = HashSet::from(["data/public/7/".to_string()]);
assert!(validate_chunk_schema_files(&chunk, "public", &actual_prefixes).unwrap());
}
#[test]
fn test_validate_chunk_schema_files_rejects_missing_prefix() {
let mut chunk = ChunkMeta::new(7, TimeRange::unbounded());
chunk.files = vec!["data/public/7/a.parquet".to_string()];
let error = validate_chunk_schema_files(&chunk, "public", &HashSet::new())
.expect_err("missing chunk prefix should fail")
.to_string();
assert!(error.contains("marked completed but no files were found"));
}
#[test]
fn test_validate_chunk_schema_files_skips_absent_schema() {
let mut chunk = ChunkMeta::new(7, TimeRange::unbounded());
chunk.files = vec!["data/public/7/a.parquet".to_string()];
assert!(!validate_chunk_schema_files(&chunk, "metrics", &HashSet::new()).unwrap());
}
#[tokio::test]
async fn test_validate_data_snapshot_rejects_failed_chunk_before_dry_run() {
let mut failed = ChunkMeta::new(3, TimeRange::unbounded());
failed.status = ChunkStatus::Failed;
let storage = StubStorage {
manifest: Manifest::new_schema_only("greptime".to_string(), vec!["public".to_string()]),
files_by_prefix: HashMap::new(),
};
let error = validate_data_snapshot(&storage, &[failed], &["public".to_string()])
.await
.expect_err("failed chunk should reject dry-run validation")
.to_string();
assert!(error.contains("Incomplete snapshot"));
}
#[tokio::test]
async fn test_validate_data_snapshot_rejects_missing_chunk_prefix_before_dry_run() {
let mut completed = ChunkMeta::new(7, TimeRange::unbounded());
completed.status = ChunkStatus::Completed;
completed.files = vec!["data/public/7/a.parquet".to_string()];
let storage = StubStorage {
manifest: Manifest::new_schema_only("greptime".to_string(), vec!["public".to_string()]),
files_by_prefix: HashMap::new(),
};
let error = validate_data_snapshot(&storage, &[completed], &["public".to_string()])
.await
.expect_err("missing chunk prefix should reject dry-run validation")
.to_string();
assert!(error.contains("marked completed but no files were found"));
}
#[tokio::test]
async fn test_validate_data_snapshot_rejects_completed_chunk_with_empty_manifest() {
let mut completed = ChunkMeta::new(7, TimeRange::unbounded());
completed.status = ChunkStatus::Completed;
let storage = StubStorage {
manifest: Manifest::new_schema_only("greptime".to_string(), vec!["public".to_string()]),
files_by_prefix: HashMap::new(),
};
let error = validate_data_snapshot(&storage, &[completed], &["public".to_string()])
.await
.expect_err("empty completed chunk should reject validation")
.to_string();
assert!(error.contains("file manifest is empty"));
}
}

View File

@@ -19,6 +19,8 @@ use common_error::status_code::StatusCode;
use common_macro::stack_trace_debug;
use snafu::{Location, Snafu};
use crate::data::export_v2::manifest::ChunkStatus;
#[derive(Snafu)]
#[snafu(visibility(pub))]
#[stack_trace_debug]
@@ -45,12 +47,44 @@ pub enum Error {
location: Location,
},
#[snafu(display("Incomplete snapshot: chunk {} has status {:?}", chunk_id, status))]
IncompleteSnapshot {
chunk_id: u32,
status: ChunkStatus,
#[snafu(implicit)]
location: Location,
},
#[snafu(display(
"Importing data from full snapshots is not implemented yet (snapshot has {} chunk(s))",
chunk_count
"Snapshot is inconsistent: chunk {} is marked completed but its file manifest is empty",
chunk_id
))]
FullSnapshotImportNotSupported {
chunk_count: usize,
EmptyChunkManifest {
chunk_id: u32,
#[snafu(implicit)]
location: Location,
},
#[snafu(display(
"Snapshot is inconsistent: chunk {} for schema '{}' is marked completed but no files were found under '{}'",
chunk_id,
schema,
path
))]
MissingChunkData {
chunk_id: u32,
schema: String,
path: String,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Chunk {} import failed for schema '{}'", chunk_id, schema))]
ChunkImportFailed {
chunk_id: u32,
schema: String,
#[snafu(source)]
error: crate::data::export_v2::error::Error,
#[snafu(implicit)]
location: Location,
},
@@ -80,9 +114,13 @@ impl ErrorExt for Error {
Error::SnapshotNotFound { .. }
| Error::SchemaNotInSnapshot { .. }
| Error::ManifestVersionMismatch { .. }
| Error::FullSnapshotImportNotSupported { .. } => StatusCode::InvalidArguments,
| Error::IncompleteSnapshot { .. }
| Error::EmptyChunkManifest { .. }
| Error::MissingChunkData { .. } => StatusCode::InvalidArguments,
Error::Database { error, .. } => error.status_code(),
Error::SnapshotStorage { error, .. } => error.status_code(),
Error::SnapshotStorage { error, .. } | Error::ChunkImportFailed { error, .. } => {
error.status_code()
}
}
}