diff --git a/src/cli/src/data/export_v2.rs b/src/cli/src/data/export_v2.rs index 1921ffe4b4..9bdf3c63f5 100644 --- a/src/cli/src/data/export_v2.rs +++ b/src/cli/src/data/export_v2.rs @@ -41,7 +41,7 @@ mod chunker; mod command; mod coordinator; -mod data; +pub(crate) mod data; pub mod error; pub mod extractor; pub mod manifest; diff --git a/src/cli/src/data/export_v2/data.rs b/src/cli/src/data/export_v2/data.rs index 25d70ee118..dce924a717 100644 --- a/src/cli/src/data/export_v2/data.rs +++ b/src/cli/src/data/export_v2/data.rs @@ -38,18 +38,64 @@ pub(super) struct CopyTarget { secrets: Vec>, } +pub(crate) struct CopySource { + pub(crate) location: String, + pub(crate) connection: String, + secrets: Vec>, +} + impl CopyTarget { fn mask_sql(&self, sql: &str) -> String { mask_secrets(sql, &self.secrets) } } +impl CopySource { + fn mask_sql(&self, sql: &str) -> String { + mask_secrets(sql, &self.secrets) + } +} + pub(super) fn build_copy_target( snapshot_uri: &str, storage: &ObjectStoreConfig, schema: &str, chunk_id: u32, ) -> Result { + let location = build_copy_location(snapshot_uri, storage, schema, chunk_id)?; + Ok(CopyTarget { + location: location.location, + connection: location.connection, + secrets: location.secrets, + }) +} + +pub(crate) fn build_copy_source( + snapshot_uri: &str, + storage: &ObjectStoreConfig, + schema: &str, + chunk_id: u32, +) -> Result { + let location = build_copy_location(snapshot_uri, storage, schema, chunk_id)?; + Ok(CopySource { + location: location.location, + connection: location.connection, + secrets: location.secrets, + }) +} + +struct CopyLocation { + location: String, + connection: String, + secrets: Vec>, +} + +fn build_copy_location( + snapshot_uri: &str, + storage: &ObjectStoreConfig, + schema: &str, + chunk_id: u32, +) -> Result { let url = Url::parse(snapshot_uri).context(UrlParseSnafu)?; let scheme = StorageScheme::from_uri(snapshot_uri)?; let suffix = data_dir_for_schema_chunk(schema, chunk_id); @@ -64,7 +110,7 @@ pub(super) fn build_copy_target( .build() })?; let location = normalize_path(&format!("{}/{}", root.to_string_lossy(), suffix)); - Ok(CopyTarget { + Ok(CopyLocation { location, connection: String::new(), secrets: Vec::new(), @@ -74,7 +120,7 @@ pub(super) fn build_copy_target( let (bucket, root) = extract_bucket_root(&url, snapshot_uri)?; let location = format!("s3://{}/{}", bucket, join_root(&root, &suffix)); let (connection, secrets) = build_s3_connection(storage); - Ok(CopyTarget { + Ok(CopyLocation { location, connection, secrets, @@ -84,7 +130,7 @@ pub(super) fn build_copy_target( let (bucket, root) = extract_bucket_root(&url, snapshot_uri)?; let location = format!("oss://{}/{}", bucket, join_root(&root, &suffix)); let (connection, secrets) = build_oss_connection(storage); - Ok(CopyTarget { + Ok(CopyLocation { location, connection, secrets, @@ -94,7 +140,7 @@ pub(super) fn build_copy_target( let (bucket, root) = extract_bucket_root(&url, snapshot_uri)?; let location = format!("gcs://{}/{}", bucket, join_root(&root, &suffix)); let (connection, secrets) = build_gcs_connection(storage, snapshot_uri)?; - Ok(CopyTarget { + Ok(CopyLocation { location, connection, secrets, @@ -104,7 +150,7 @@ pub(super) fn build_copy_target( let (bucket, root) = extract_bucket_root(&url, snapshot_uri)?; let location = format!("azblob://{}/{}", bucket, join_root(&root, &suffix)); let (connection, secrets) = build_azblob_connection(storage); - Ok(CopyTarget { + Ok(CopyLocation { location, connection, secrets, @@ -138,6 +184,30 @@ pub(super) async fn execute_copy_database( Ok(()) } +pub(crate) async fn execute_copy_database_from( + database_client: &DatabaseClient, + catalog: &str, + schema: &str, + source: &CopySource, + format: DataFormat, +) -> Result<()> { + let sql = format!( + r#"COPY DATABASE "{}"."{}" FROM '{}' WITH (FORMAT='{}'){};"#, + escape_sql_identifier(catalog), + escape_sql_identifier(schema), + escape_sql_literal(&source.location), + format, + source.connection + ); + let safe_sql = source.mask_sql(&sql); + info!("Executing sql: {}", safe_sql); + database_client + .sql_in_public(&sql) + .await + .context(DatabaseSnafu)?; + Ok(()) +} + fn build_with_options(options: &CopyOptions) -> String { let mut parts = vec![format!("FORMAT='{}'", options.format)]; if let Some(start) = options.time_range.start { @@ -328,6 +398,10 @@ fn mask_secrets(sql: &str, secrets: &[Option]) -> String { if let Some(secret) = secret && !secret.is_empty() { + let escaped = escape_sql_literal(secret); + if escaped != *secret { + masked = masked.replace(&escaped, "[REDACTED]"); + } masked = masked.replace(secret, "[REDACTED]"); } } @@ -430,6 +504,17 @@ mod tests { assert!(!masked.contains("sig=secret-token")); } + #[test] + fn test_mask_secrets_redacts_sql_escaped_literals() { + let sql = + "COPY DATABASE \"greptime\".\"public\" TO 's3://bucket' CONNECTION (SECRET='ab''cd');"; + let masked = mask_secrets(sql, &[Some("ab'cd".to_string())]); + + assert!(!masked.contains("ab'cd")); + assert!(!masked.contains("ab''cd")); + assert!(masked.contains("SECRET='[REDACTED]'")); + } + #[test] fn test_build_copy_target_decodes_file_uri_path() { let storage = ObjectStoreConfig::default(); diff --git a/src/cli/src/data/export_v2/tests.rs b/src/cli/src/data/export_v2/tests.rs index bd28801a0d..92f45c6aec 100644 --- a/src/cli/src/data/export_v2/tests.rs +++ b/src/cli/src/data/export_v2/tests.rs @@ -17,17 +17,49 @@ use std::time::Duration; use clap::Parser; use common_error::ext::BoxedError; +use serde_json::Value; use snafu::ResultExt; use tempfile::tempdir; use url::Url; use super::command::ExportCreateCommand; use crate::common::ObjectStoreConfig; +use crate::data::export_v2::manifest::ChunkStatus; use crate::data::import_v2::ImportV2Command; -use crate::data::snapshot_storage::OpenDalStorage; +use crate::data::snapshot_storage::{OpenDalStorage, SnapshotStorage}; +use crate::data::sql::escape_sql_identifier; use crate::database::DatabaseClient; use crate::error::{FileIoSnafu, InvalidArgumentsSnafu, OtherSnafu, Result}; +async fn query_count(database_client: &DatabaseClient, schema: &str, table: &str) -> Result { + let sql = format!("SELECT COUNT(*) FROM {}", escape_sql_identifier(table)); + let rows = database_client.sql(&sql, schema).await?; + let first_row = rows.as_ref().and_then(|rows| rows.first()).ok_or_else(|| { + InvalidArgumentsSnafu { + msg: format!("empty result for query: {sql}"), + } + .build() + })?; + let first_value = first_row.first().ok_or_else(|| { + InvalidArgumentsSnafu { + msg: format!("no first column for query: {sql}"), + } + .build() + })?; + match first_value { + Value::Number(n) => n.as_u64().ok_or_else(|| { + InvalidArgumentsSnafu { + msg: format!("count is not u64 for query: {sql}"), + } + .build() + }), + _ => InvalidArgumentsSnafu { + msg: format!("unexpected count type for query: {sql}"), + } + .fail(), + } +} + #[tokio::test] #[ignore] async fn export_import_v2_schema_parity_e2e() -> Result<()> { @@ -339,3 +371,515 @@ async fn import_v2_ddl_dry_run_e2e() -> Result<()> { Ok(()) } + +#[tokio::test] +#[ignore] +async fn export_import_v2_data_roundtrip_e2e() -> Result<()> { + let addr = env::var("GREPTIME_ADDR").unwrap_or_else(|_| "127.0.0.1:4000".to_string()); + let catalog = env::var("GREPTIME_CATALOG").unwrap_or_else(|_| "greptime".to_string()); + let auth_basic = env::var("GREPTIME_AUTH_BASIC").ok(); + let schema = "test_db_data_roundtrip"; + + let database_client = DatabaseClient::new( + addr.clone(), + catalog.clone(), + auth_basic.clone(), + Duration::from_secs(60), + None, + false, + ); + + database_client + .sql_in_public(&format!("DROP DATABASE IF EXISTS {schema}")) + .await?; + database_client + .sql_in_public(&format!("CREATE DATABASE {schema}")) + .await?; + database_client + .sql( + "CREATE TABLE metrics (\ + ts TIMESTAMP TIME INDEX, \ + host STRING PRIMARY KEY, \ + cpu DOUBLE \ + ) ENGINE=mito", + schema, + ) + .await?; + database_client + .sql( + "INSERT INTO metrics (ts, host, cpu) VALUES \ + ('2025-01-01T00:00:00Z', 'h1', 1.0), \ + ('2025-01-01T01:00:00Z', 'h2', 2.0)", + schema, + ) + .await?; + + let expected_rows = query_count(&database_client, schema, "metrics").await?; + + let src_dir = tempdir().context(FileIoSnafu)?; + let src_uri = Url::from_directory_path(src_dir.path()) + .map_err(|_| { + InvalidArgumentsSnafu { + msg: "invalid temp dir path".to_string(), + } + .build() + })? + .to_string(); + + let mut export_args = vec![ + "export-v2-create", + "--addr", + &addr, + "--to", + &src_uri, + "--catalog", + &catalog, + "--schemas", + schema, + ]; + if let Some(auth) = &auth_basic { + export_args.push("--auth-basic"); + export_args.push(auth); + } + let export_cmd = ExportCreateCommand::parse_from(export_args); + export_cmd + .build() + .await + .context(OtherSnafu)? + .do_work() + .await + .context(OtherSnafu)?; + + database_client + .sql_in_public(&format!("DROP DATABASE IF EXISTS {schema}")) + .await?; + + let mut import_args = vec![ + "import-v2", + "--addr", + &addr, + "--from", + &src_uri, + "--catalog", + &catalog, + "--schemas", + schema, + ]; + if let Some(auth) = &auth_basic { + import_args.push("--auth-basic"); + import_args.push(auth); + } + let import_cmd = ImportV2Command::parse_from(import_args); + import_cmd + .build() + .await + .context(OtherSnafu)? + .do_work() + .await + .context(OtherSnafu)?; + + let actual_rows = query_count(&database_client, schema, "metrics").await?; + assert_eq!(actual_rows, expected_rows); + + database_client + .sql_in_public(&format!("DROP DATABASE IF EXISTS {schema}")) + .await?; + + Ok(()) +} + +#[tokio::test] +#[ignore] +async fn import_v2_fails_on_incomplete_snapshot_e2e() -> Result<()> { + let addr = env::var("GREPTIME_ADDR").unwrap_or_else(|_| "127.0.0.1:4000".to_string()); + let catalog = env::var("GREPTIME_CATALOG").unwrap_or_else(|_| "greptime".to_string()); + let auth_basic = env::var("GREPTIME_AUTH_BASIC").ok(); + let schema = "test_db_incomplete_snapshot"; + + let database_client = DatabaseClient::new( + addr.clone(), + catalog.clone(), + auth_basic.clone(), + Duration::from_secs(60), + None, + false, + ); + + database_client + .sql_in_public(&format!("DROP DATABASE IF EXISTS {schema}")) + .await?; + database_client + .sql_in_public(&format!("CREATE DATABASE {schema}")) + .await?; + database_client + .sql( + "CREATE TABLE metrics (\ + ts TIMESTAMP TIME INDEX, \ + host STRING PRIMARY KEY, \ + cpu DOUBLE \ + ) ENGINE=mito", + schema, + ) + .await?; + database_client + .sql( + "INSERT INTO metrics (ts, host, cpu) VALUES ('2025-01-01T00:00:00Z', 'h1', 1.0)", + schema, + ) + .await?; + + let src_dir = tempdir().context(FileIoSnafu)?; + let src_uri = Url::from_directory_path(src_dir.path()) + .map_err(|_| { + InvalidArgumentsSnafu { + msg: "invalid temp dir path".to_string(), + } + .build() + })? + .to_string(); + + let mut export_args = vec![ + "export-v2-create", + "--addr", + &addr, + "--to", + &src_uri, + "--catalog", + &catalog, + "--schemas", + schema, + ]; + if let Some(auth) = &auth_basic { + export_args.push("--auth-basic"); + export_args.push(auth); + } + let export_cmd = ExportCreateCommand::parse_from(export_args); + export_cmd + .build() + .await + .context(OtherSnafu)? + .do_work() + .await + .context(OtherSnafu)?; + + let storage_config = ObjectStoreConfig::default(); + let storage = OpenDalStorage::from_uri(&src_uri, &storage_config) + .map_err(BoxedError::new) + .context(OtherSnafu)?; + let mut manifest = storage + .read_manifest() + .await + .map_err(BoxedError::new) + .context(OtherSnafu)?; + if let Some(first_chunk) = manifest.chunks.first_mut() { + first_chunk.status = ChunkStatus::Failed; + } + storage + .write_manifest(&manifest) + .await + .map_err(BoxedError::new) + .context(OtherSnafu)?; + + database_client + .sql_in_public(&format!("DROP DATABASE IF EXISTS {schema}")) + .await?; + + let mut import_args = vec![ + "import-v2", + "--addr", + &addr, + "--from", + &src_uri, + "--catalog", + &catalog, + "--schemas", + schema, + ]; + if let Some(auth) = &auth_basic { + import_args.push("--auth-basic"); + import_args.push(auth); + } + let import_cmd = ImportV2Command::parse_from(import_args); + let err = import_cmd + .build() + .await + .context(OtherSnafu)? + .do_work() + .await + .expect_err("import should fail on incomplete snapshot"); + assert!(err.to_string().contains("Incomplete snapshot")); + + Ok(()) +} + +#[tokio::test] +#[ignore] +async fn import_v2_schema_filter_data_e2e() -> Result<()> { + let addr = env::var("GREPTIME_ADDR").unwrap_or_else(|_| "127.0.0.1:4000".to_string()); + let catalog = env::var("GREPTIME_CATALOG").unwrap_or_else(|_| "greptime".to_string()); + let auth_basic = env::var("GREPTIME_AUTH_BASIC").ok(); + let schema_a = "test_db_filter_a"; + let schema_b = "test_db_filter_b"; + + let database_client = DatabaseClient::new( + addr.clone(), + catalog.clone(), + auth_basic.clone(), + Duration::from_secs(60), + None, + false, + ); + + for schema in [schema_a, schema_b] { + database_client + .sql_in_public(&format!("DROP DATABASE IF EXISTS {schema}")) + .await?; + database_client + .sql_in_public(&format!("CREATE DATABASE {schema}")) + .await?; + database_client + .sql( + "CREATE TABLE metrics (\ + ts TIMESTAMP TIME INDEX, \ + host STRING PRIMARY KEY, \ + cpu DOUBLE \ + ) ENGINE=mito", + schema, + ) + .await?; + } + database_client + .sql( + "INSERT INTO metrics (ts, host, cpu) VALUES ('2025-01-01T00:00:00Z', 'a1', 1.0)", + schema_a, + ) + .await?; + database_client + .sql( + "INSERT INTO metrics (ts, host, cpu) VALUES ('2025-01-01T00:00:00Z', 'b1', 2.0)", + schema_b, + ) + .await?; + + let expected_rows_a = query_count(&database_client, schema_a, "metrics").await?; + + let src_dir = tempdir().context(FileIoSnafu)?; + let src_uri = Url::from_directory_path(src_dir.path()) + .map_err(|_| { + InvalidArgumentsSnafu { + msg: "invalid temp dir path".to_string(), + } + .build() + })? + .to_string(); + + let mut export_args = vec![ + "export-v2-create", + "--addr", + &addr, + "--to", + &src_uri, + "--catalog", + &catalog, + "--schemas", + schema_a, + "--schemas", + schema_b, + ]; + if let Some(auth) = &auth_basic { + export_args.push("--auth-basic"); + export_args.push(auth); + } + let export_cmd = ExportCreateCommand::parse_from(export_args); + export_cmd + .build() + .await + .context(OtherSnafu)? + .do_work() + .await + .context(OtherSnafu)?; + + for schema in [schema_a, schema_b] { + database_client + .sql_in_public(&format!("DROP DATABASE IF EXISTS {schema}")) + .await?; + } + + let mut import_args = vec![ + "import-v2", + "--addr", + &addr, + "--from", + &src_uri, + "--catalog", + &catalog, + "--schemas", + schema_a, + ]; + if let Some(auth) = &auth_basic { + import_args.push("--auth-basic"); + import_args.push(auth); + } + let import_cmd = ImportV2Command::parse_from(import_args); + import_cmd + .build() + .await + .context(OtherSnafu)? + .do_work() + .await + .context(OtherSnafu)?; + + let actual_rows_a = query_count(&database_client, schema_a, "metrics").await?; + assert_eq!(actual_rows_a, expected_rows_a); + + let schema_b_query = database_client + .sql("SELECT COUNT(*) FROM metrics", schema_b) + .await; + assert!(schema_b_query.is_err(), "schema_b should not be imported"); + + for schema in [schema_a, schema_b] { + database_client + .sql_in_public(&format!("DROP DATABASE IF EXISTS {schema}")) + .await?; + } + + Ok(()) +} + +#[tokio::test] +#[ignore] +async fn export_import_v2_skipped_chunk_e2e() -> Result<()> { + let addr = env::var("GREPTIME_ADDR").unwrap_or_else(|_| "127.0.0.1:4000".to_string()); + let catalog = env::var("GREPTIME_CATALOG").unwrap_or_else(|_| "greptime".to_string()); + let auth_basic = env::var("GREPTIME_AUTH_BASIC").ok(); + let schema = "test_db_skipped_chunk"; + + let database_client = DatabaseClient::new( + addr.clone(), + catalog.clone(), + auth_basic.clone(), + Duration::from_secs(60), + None, + false, + ); + + database_client + .sql_in_public(&format!("DROP DATABASE IF EXISTS {schema}")) + .await?; + database_client + .sql_in_public(&format!("CREATE DATABASE {schema}")) + .await?; + database_client + .sql( + "CREATE TABLE metrics (\ + ts TIMESTAMP TIME INDEX, \ + host STRING PRIMARY KEY, \ + cpu DOUBLE \ + ) ENGINE=mito", + schema, + ) + .await?; + database_client + .sql( + "INSERT INTO metrics (ts, host, cpu) VALUES \ + ('2025-01-01T00:00:00Z', 'h1', 1.0), \ + ('2025-01-01T01:00:00Z', 'h2', 2.0)", + schema, + ) + .await?; + + let src_dir = tempdir().context(FileIoSnafu)?; + let src_uri = Url::from_directory_path(src_dir.path()) + .map_err(|_| { + InvalidArgumentsSnafu { + msg: "invalid temp dir path".to_string(), + } + .build() + })? + .to_string(); + + let mut export_args = vec![ + "export-v2-create", + "--addr", + &addr, + "--to", + &src_uri, + "--catalog", + &catalog, + "--schemas", + schema, + "--start-time", + "2025-01-01T00:00:00Z", + "--end-time", + "2025-01-01T02:00:00Z", + "--chunk-time-window", + "1h", + ]; + if let Some(auth) = &auth_basic { + export_args.push("--auth-basic"); + export_args.push(auth); + } + let export_cmd = ExportCreateCommand::parse_from(export_args); + export_cmd + .build() + .await + .context(OtherSnafu)? + .do_work() + .await + .context(OtherSnafu)?; + + let storage_config = ObjectStoreConfig::default(); + let storage = OpenDalStorage::from_uri(&src_uri, &storage_config) + .map_err(BoxedError::new) + .context(OtherSnafu)?; + let mut manifest = storage + .read_manifest() + .await + .map_err(BoxedError::new) + .context(OtherSnafu)?; + assert_eq!(manifest.chunks.len(), 2); + manifest.chunks[0].status = ChunkStatus::Skipped; + manifest.chunks[0].files.clear(); + storage + .write_manifest(&manifest) + .await + .map_err(BoxedError::new) + .context(OtherSnafu)?; + + database_client + .sql_in_public(&format!("DROP DATABASE IF EXISTS {schema}")) + .await?; + + let mut import_args = vec![ + "import-v2", + "--addr", + &addr, + "--from", + &src_uri, + "--catalog", + &catalog, + "--schemas", + schema, + ]; + if let Some(auth) = &auth_basic { + import_args.push("--auth-basic"); + import_args.push(auth); + } + let import_cmd = ImportV2Command::parse_from(import_args); + import_cmd + .build() + .await + .context(OtherSnafu)? + .do_work() + .await + .context(OtherSnafu)?; + + let actual_rows = query_count(&database_client, schema, "metrics").await?; + assert_eq!(actual_rows, 1); + + database_client + .sql_in_public(&format!("DROP DATABASE IF EXISTS {schema}")) + .await?; + + Ok(()) +} diff --git a/src/cli/src/data/import_v2/command.rs b/src/cli/src/data/import_v2/command.rs index 6a9d440071..ea92e15ee1 100644 --- a/src/cli/src/data/import_v2/command.rs +++ b/src/cli/src/data/import_v2/command.rs @@ -15,7 +15,7 @@ //! Import V2 CLI command. use std::collections::HashSet; -use std::time::Duration; +use std::time::{Duration, Instant}; use async_trait::async_trait; use clap::Parser; @@ -25,13 +25,15 @@ use snafu::ResultExt; use crate::Tool; use crate::common::ObjectStoreConfig; -use crate::data::export_v2::manifest::MANIFEST_VERSION; +use crate::data::export_v2::data::{build_copy_source, execute_copy_database_from}; +use crate::data::export_v2::manifest::{ChunkMeta, ChunkStatus, DataFormat, MANIFEST_VERSION}; use crate::data::import_v2::error::{ - FullSnapshotImportNotSupportedSnafu, ManifestVersionMismatchSnafu, Result, - SchemaNotInSnapshotSnafu, SnapshotStorageSnafu, + ChunkImportFailedSnafu, EmptyChunkManifestSnafu, IncompleteSnapshotSnafu, + ManifestVersionMismatchSnafu, MissingChunkDataSnafu, Result, SchemaNotInSnapshotSnafu, + SnapshotStorageSnafu, }; use crate::data::import_v2::executor::{DdlExecutor, DdlStatement}; -use crate::data::path::ddl_path_for_schema; +use crate::data::path::{data_dir_for_schema_chunk, ddl_path_for_schema}; use crate::data::snapshot_storage::{OpenDalStorage, SnapshotStorage, validate_uri}; use crate::database::{DatabaseClient, parse_proxy_opts}; @@ -116,8 +118,11 @@ impl ImportV2Command { ); Ok(Box::new(Import { + catalog: self.catalog.clone(), schemas, dry_run: self.dry_run, + snapshot_uri: self.from.clone(), + storage_config: self.storage.clone(), storage: Box::new(storage), database_client, })) @@ -126,8 +131,11 @@ impl ImportV2Command { /// Import tool implementation. pub struct Import { + catalog: String, schemas: Option>, dry_run: bool, + snapshot_uri: String, + storage_config: ObjectStoreConfig, storage: Box, database_client: DatabaseClient, } @@ -164,13 +172,6 @@ impl Import { info!("Snapshot contains {} schema(s)", manifest.schemas.len()); - if !manifest.schema_only && !manifest.chunks.is_empty() { - return FullSnapshotImportNotSupportedSnafu { - chunk_count: manifest.chunks.len(), - } - .fail(); - } - // 2. Determine schemas to import let schemas_to_import = match &self.schemas { Some(filter) => canonicalize_schema_filter(filter, &manifest.schemas)?, @@ -184,6 +185,15 @@ impl Import { info!("Generated {} DDL statements", ddl_statements.len()); + let data_prefixes = if !manifest.schema_only && !manifest.chunks.is_empty() { + Some( + validate_data_snapshot(self.storage.as_ref(), &manifest.chunks, &schemas_to_import) + .await?, + ) + } else { + None + }; + // 4. Dry-run mode: print DDL and exit if self.dry_run { info!("Dry-run mode - DDL statements to execute:"); @@ -193,6 +203,12 @@ impl Import { println!("{};", stmt.sql); println!(); } + if !manifest.schema_only && !manifest.chunks.is_empty() { + for line in format_data_import_plan(&manifest.chunks, &schemas_to_import) { + println!("{line}"); + } + println!(); + } return Ok(()); } @@ -200,6 +216,16 @@ impl Import { let executor = DdlExecutor::new(&self.database_client); executor.execute_strict(&ddl_statements).await?; + if !manifest.schema_only && !manifest.chunks.is_empty() { + self.import_data( + &manifest.chunks, + &schemas_to_import, + manifest.format, + data_prefixes.expect("validated full snapshot must provide data prefixes"), + ) + .await?; + } + info!( "Import completed: {} DDL statements executed", ddl_statements.len() @@ -226,6 +252,80 @@ impl Import { Ok(statements) } + + async fn import_data( + &self, + chunks: &[ChunkMeta], + schemas: &[String], + format: DataFormat, + actual_prefixes: HashSet, + ) -> Result<()> { + let import_start = Instant::now(); + let total_chunks = chunks + .iter() + .filter(|chunk| chunk.status == ChunkStatus::Completed) + .count(); + info!( + "Importing data: {} chunks, {} schemas", + total_chunks, + schemas.len() + ); + + for (idx, chunk) in chunks.iter().enumerate() { + if chunk.status == ChunkStatus::Skipped { + info!( + "[{}/{}] Chunk {}: skipped (no data)", + idx + 1, + chunks.len(), + chunk.id + ); + continue; + } + + info!( + "[{}/{}] Chunk {} ({:?} ~ {:?})", + idx + 1, + chunks.len(), + chunk.id, + chunk.time_range.start, + chunk.time_range.end + ); + + for schema in schemas { + if !validate_chunk_schema_files(chunk, schema, &actual_prefixes)? { + info!(" {}: no data, skipped", schema); + continue; + } + + info!(" {}: importing...", schema); + let copy_start = Instant::now(); + let source = + build_copy_source(&self.snapshot_uri, &self.storage_config, schema, chunk.id) + .context(ChunkImportFailedSnafu { + chunk_id: chunk.id, + schema: schema.clone(), + })?; + + execute_copy_database_from( + &self.database_client, + &self.catalog, + schema, + &source, + format, + ) + .await + .context(ChunkImportFailedSnafu { + chunk_id: chunk.id, + schema: schema.clone(), + })?; + + info!(" {}: done in {:?}", schema, copy_start.elapsed()); + } + } + + info!("Data import finished in {:?}", import_start.elapsed()); + Ok(()) + } } fn parse_ddl_statements(content: &str) -> Vec { @@ -395,21 +495,130 @@ fn canonicalize_schema_filter( Ok(canonicalized) } +fn validate_chunk_statuses(chunks: &[ChunkMeta]) -> Result<()> { + let invalid_chunk = chunks + .iter() + .find(|chunk| !matches!(chunk.status, ChunkStatus::Completed | ChunkStatus::Skipped)); + + if let Some(chunk) = invalid_chunk { + return IncompleteSnapshotSnafu { + chunk_id: chunk.id, + status: chunk.status, + } + .fail(); + } + + Ok(()) +} + +fn chunk_has_schema_files(chunk: &ChunkMeta, schema: &str) -> bool { + let prefix = data_dir_for_schema_chunk(schema, chunk.id); + chunk.files.iter().any(|path| { + let normalized = path.trim_start_matches('/'); + normalized.starts_with(&prefix) + }) +} + +fn format_data_import_plan(chunks: &[ChunkMeta], schemas: &[String]) -> Vec { + let mut lines = vec!["-- Data import plan:".to_string()]; + for chunk in chunks { + lines.push(format!("-- Chunk {}: {:?}", chunk.id, chunk.status)); + for schema in schemas { + if chunk_has_schema_files(chunk, schema) { + lines.push(format!("-- {} -> COPY DATABASE FROM", schema)); + } + } + } + lines +} + +async fn validate_data_snapshot( + storage: &dyn SnapshotStorage, + chunks: &[ChunkMeta], + schemas: &[String], +) -> Result> { + validate_chunk_statuses(chunks)?; + let actual_prefixes = collect_chunk_data_prefixes(storage).await?; + + for chunk in chunks { + if chunk.status == ChunkStatus::Skipped { + continue; + } + if chunk.files.is_empty() { + return EmptyChunkManifestSnafu { chunk_id: chunk.id }.fail(); + } + for schema in schemas { + validate_chunk_schema_files(chunk, schema, &actual_prefixes)?; + } + } + + Ok(actual_prefixes) +} + +async fn collect_chunk_data_prefixes(storage: &dyn SnapshotStorage) -> Result> { + let files = storage + .list_files_recursive("data/") + .await + .context(SnapshotStorageSnafu)?; + let mut prefixes = HashSet::new(); + + for path in files { + let normalized = path.trim_start_matches('/'); + let mut parts = normalized.splitn(4, '/'); + let Some(root) = parts.next() else { + continue; + }; + let Some(schema) = parts.next() else { + continue; + }; + let Some(chunk_id) = parts.next() else { + continue; + }; + if root != "data" { + continue; + } + prefixes.insert(format!("data/{schema}/{chunk_id}/")); + } + + Ok(prefixes) +} + +fn validate_chunk_schema_files( + chunk: &ChunkMeta, + schema: &str, + actual_prefixes: &HashSet, +) -> Result { + if !chunk_has_schema_files(chunk, schema) { + return Ok(false); + } + + let prefix = data_dir_for_schema_chunk(schema, chunk.id); + if !actual_prefixes.contains(&prefix) { + return MissingChunkDataSnafu { + chunk_id: chunk.id, + schema: schema.to_string(), + path: prefix, + } + .fail(); + } + + Ok(true) +} + #[cfg(test)] mod tests { - use std::time::Duration; + use std::collections::{HashMap, HashSet}; use async_trait::async_trait; use super::*; - use crate::Tool; - use crate::data::export_v2::manifest::{ChunkMeta, DataFormat, Manifest, TimeRange}; + use crate::data::export_v2::manifest::{ChunkMeta, ChunkStatus, Manifest, TimeRange}; use crate::data::export_v2::schema::SchemaSnapshot; use crate::data::snapshot_storage::SnapshotStorage; - use crate::database::DatabaseClient; struct StubStorage { manifest: Manifest, + files_by_prefix: HashMap>, } #[async_trait] @@ -454,9 +663,14 @@ mod tests { async fn list_files_recursive( &self, - _prefix: &str, + prefix: &str, ) -> crate::data::export_v2::error::Result> { - unimplemented!("not needed in import_v2::command tests") + Ok(self + .files_by_prefix + .iter() + .filter(|(candidate, _)| candidate.starts_with(prefix)) + .flat_map(|(_, files)| files.clone()) + .collect()) } async fn delete_snapshot(&self) -> crate::data::export_v2::error::Result<()> { @@ -464,48 +678,6 @@ mod tests { } } - fn test_database_client() -> DatabaseClient { - DatabaseClient::new( - "127.0.0.1:4000".to_string(), - "greptime".to_string(), - None, - Duration::from_secs(1), - None, - false, - ) - } - - #[tokio::test] - async fn test_import_rejects_full_snapshot_before_schema_execution() { - let mut manifest = Manifest::new_full( - "greptime".to_string(), - vec!["public".to_string()], - TimeRange::unbounded(), - DataFormat::Parquet, - ); - manifest - .chunks - .push(ChunkMeta::new(1, TimeRange::unbounded())); - - let import = Import { - schemas: None, - dry_run: false, - storage: Box::new(StubStorage { manifest }), - database_client: test_database_client(), - }; - - let error = import - .do_work() - .await - .expect_err("full snapshot import should fail"); - - assert!( - error - .to_string() - .contains("Importing data from full snapshots is not implemented yet") - ); - } - #[test] fn test_parse_ddl_statements() { let content = r#" @@ -640,4 +812,164 @@ CREATE VIEW v AS SELECT 1; assert!(!starts_with_keyword("CREATED TABLE t", "CREATE")); assert!(!starts_with_keyword("TABLESPACE foo", "TABLE")); } + + #[test] + fn test_validate_chunk_statuses_rejects_failed_chunk() { + let mut failed = ChunkMeta::new(3, TimeRange::unbounded()); + failed.status = ChunkStatus::Failed; + + let error = validate_chunk_statuses(&[failed]).expect_err("failed chunk should error"); + assert!(error.to_string().contains("Incomplete snapshot")); + } + + #[test] + fn test_validate_chunk_statuses_accepts_completed_and_skipped_chunks() { + let mut completed = ChunkMeta::new(1, TimeRange::unbounded()); + completed.status = ChunkStatus::Completed; + let skipped = ChunkMeta::skipped(2, TimeRange::unbounded()); + + assert!(validate_chunk_statuses(&[completed, skipped]).is_ok()); + } + + #[test] + fn test_chunk_has_schema_files_matches_encoded_schema_prefix() { + let mut chunk = ChunkMeta::new(7, TimeRange::unbounded()); + chunk.files = vec![ + "data/public/7/a.parquet".to_string(), + "data/%E6%B5%8B%E8%AF%95/7/b.parquet".to_string(), + ]; + + assert!(chunk_has_schema_files(&chunk, "public")); + assert!(chunk_has_schema_files(&chunk, "测试")); + assert!(!chunk_has_schema_files(&chunk, "metrics")); + } + + #[test] + fn test_format_data_import_plan_includes_matching_schemas_only() { + let mut completed = ChunkMeta::new(1, TimeRange::unbounded()); + completed.status = ChunkStatus::Completed; + completed.files = vec![ + "data/public/1/a.parquet".to_string(), + "data/%E6%B5%8B%E8%AF%95/1/b.parquet".to_string(), + ]; + let skipped = ChunkMeta::skipped(2, TimeRange::unbounded()); + + let lines = format_data_import_plan( + &[completed, skipped], + &[ + "public".to_string(), + "测试".to_string(), + "metrics".to_string(), + ], + ); + + assert_eq!(lines[0], "-- Data import plan:"); + assert!(lines.contains(&"-- Chunk 1: Completed".to_string())); + assert!(lines.contains(&"-- public -> COPY DATABASE FROM".to_string())); + assert!(lines.contains(&"-- 测试 -> COPY DATABASE FROM".to_string())); + assert!(!lines.contains(&"-- metrics -> COPY DATABASE FROM".to_string())); + assert!(lines.contains(&"-- Chunk 2: Skipped".to_string())); + } + + #[tokio::test] + async fn test_collect_chunk_data_prefixes_indexes_present_prefixes() { + let storage = StubStorage { + manifest: Manifest::new_schema_only("greptime".to_string(), vec!["public".to_string()]), + files_by_prefix: HashMap::from([ + ( + "data/public/7/".to_string(), + vec!["data/public/7/a.parquet".to_string()], + ), + ( + "data/%E6%B5%8B%E8%AF%95/9/".to_string(), + vec!["data/%E6%B5%8B%E8%AF%95/9/b.parquet".to_string()], + ), + ]), + }; + + let prefixes = collect_chunk_data_prefixes(&storage).await.unwrap(); + + assert!(prefixes.contains("data/public/7/")); + assert!(prefixes.contains("data/%E6%B5%8B%E8%AF%95/9/")); + } + + #[test] + fn test_validate_chunk_schema_files_accepts_present_prefix() { + let mut chunk = ChunkMeta::new(7, TimeRange::unbounded()); + chunk.files = vec!["data/public/7/a.parquet".to_string()]; + let actual_prefixes = HashSet::from(["data/public/7/".to_string()]); + + assert!(validate_chunk_schema_files(&chunk, "public", &actual_prefixes).unwrap()); + } + + #[test] + fn test_validate_chunk_schema_files_rejects_missing_prefix() { + let mut chunk = ChunkMeta::new(7, TimeRange::unbounded()); + chunk.files = vec!["data/public/7/a.parquet".to_string()]; + + let error = validate_chunk_schema_files(&chunk, "public", &HashSet::new()) + .expect_err("missing chunk prefix should fail") + .to_string(); + assert!(error.contains("marked completed but no files were found")); + } + + #[test] + fn test_validate_chunk_schema_files_skips_absent_schema() { + let mut chunk = ChunkMeta::new(7, TimeRange::unbounded()); + chunk.files = vec!["data/public/7/a.parquet".to_string()]; + + assert!(!validate_chunk_schema_files(&chunk, "metrics", &HashSet::new()).unwrap()); + } + + #[tokio::test] + async fn test_validate_data_snapshot_rejects_failed_chunk_before_dry_run() { + let mut failed = ChunkMeta::new(3, TimeRange::unbounded()); + failed.status = ChunkStatus::Failed; + + let storage = StubStorage { + manifest: Manifest::new_schema_only("greptime".to_string(), vec!["public".to_string()]), + files_by_prefix: HashMap::new(), + }; + + let error = validate_data_snapshot(&storage, &[failed], &["public".to_string()]) + .await + .expect_err("failed chunk should reject dry-run validation") + .to_string(); + assert!(error.contains("Incomplete snapshot")); + } + + #[tokio::test] + async fn test_validate_data_snapshot_rejects_missing_chunk_prefix_before_dry_run() { + let mut completed = ChunkMeta::new(7, TimeRange::unbounded()); + completed.status = ChunkStatus::Completed; + completed.files = vec!["data/public/7/a.parquet".to_string()]; + + let storage = StubStorage { + manifest: Manifest::new_schema_only("greptime".to_string(), vec!["public".to_string()]), + files_by_prefix: HashMap::new(), + }; + + let error = validate_data_snapshot(&storage, &[completed], &["public".to_string()]) + .await + .expect_err("missing chunk prefix should reject dry-run validation") + .to_string(); + assert!(error.contains("marked completed but no files were found")); + } + + #[tokio::test] + async fn test_validate_data_snapshot_rejects_completed_chunk_with_empty_manifest() { + let mut completed = ChunkMeta::new(7, TimeRange::unbounded()); + completed.status = ChunkStatus::Completed; + + let storage = StubStorage { + manifest: Manifest::new_schema_only("greptime".to_string(), vec!["public".to_string()]), + files_by_prefix: HashMap::new(), + }; + + let error = validate_data_snapshot(&storage, &[completed], &["public".to_string()]) + .await + .expect_err("empty completed chunk should reject validation") + .to_string(); + assert!(error.contains("file manifest is empty")); + } } diff --git a/src/cli/src/data/import_v2/error.rs b/src/cli/src/data/import_v2/error.rs index 169f11c0fa..e2b6f90d1f 100644 --- a/src/cli/src/data/import_v2/error.rs +++ b/src/cli/src/data/import_v2/error.rs @@ -19,6 +19,8 @@ use common_error::status_code::StatusCode; use common_macro::stack_trace_debug; use snafu::{Location, Snafu}; +use crate::data::export_v2::manifest::ChunkStatus; + #[derive(Snafu)] #[snafu(visibility(pub))] #[stack_trace_debug] @@ -45,12 +47,44 @@ pub enum Error { location: Location, }, + #[snafu(display("Incomplete snapshot: chunk {} has status {:?}", chunk_id, status))] + IncompleteSnapshot { + chunk_id: u32, + status: ChunkStatus, + #[snafu(implicit)] + location: Location, + }, + #[snafu(display( - "Importing data from full snapshots is not implemented yet (snapshot has {} chunk(s))", - chunk_count + "Snapshot is inconsistent: chunk {} is marked completed but its file manifest is empty", + chunk_id ))] - FullSnapshotImportNotSupported { - chunk_count: usize, + EmptyChunkManifest { + chunk_id: u32, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display( + "Snapshot is inconsistent: chunk {} for schema '{}' is marked completed but no files were found under '{}'", + chunk_id, + schema, + path + ))] + MissingChunkData { + chunk_id: u32, + schema: String, + path: String, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("Chunk {} import failed for schema '{}'", chunk_id, schema))] + ChunkImportFailed { + chunk_id: u32, + schema: String, + #[snafu(source)] + error: crate::data::export_v2::error::Error, #[snafu(implicit)] location: Location, }, @@ -80,9 +114,13 @@ impl ErrorExt for Error { Error::SnapshotNotFound { .. } | Error::SchemaNotInSnapshot { .. } | Error::ManifestVersionMismatch { .. } - | Error::FullSnapshotImportNotSupported { .. } => StatusCode::InvalidArguments, + | Error::IncompleteSnapshot { .. } + | Error::EmptyChunkManifest { .. } + | Error::MissingChunkData { .. } => StatusCode::InvalidArguments, Error::Database { error, .. } => error.status_code(), - Error::SnapshotStorage { error, .. } => error.status_code(), + Error::SnapshotStorage { error, .. } | Error::ChunkImportFailed { error, .. } => { + error.status_code() + } } }