diff --git a/src/cli/src/data/export_v2.rs b/src/cli/src/data/export_v2.rs
index 91020d2f2e..1921ffe4b4 100644
--- a/src/cli/src/data/export_v2.rs
+++ b/src/cli/src/data/export_v2.rs
@@ -30,7 +30,7 @@
 //! --to file:///tmp/snapshot \
 //! --schema-only
 //!
-//! # Export with time range (M2)
+//! # Export with time range
 //! greptime cli data export-v2 create \
 //! --addr 127.0.0.1:4000 \
 //! --to s3://bucket/snapshots/prod-20250101 \
@@ -38,7 +38,10 @@
 //! --end-time 2025-01-31T23:59:59Z
 //! ```

+mod chunker;
 mod command;
+mod coordinator;
+mod data;
 pub mod error;
 pub mod extractor;
 pub mod manifest;
diff --git a/src/cli/src/data/export_v2/chunker.rs b/src/cli/src/data/export_v2/chunker.rs
new file mode 100644
index 0000000000..260d95fae9
--- /dev/null
+++ b/src/cli/src/data/export_v2/chunker.rs
@@ -0,0 +1,103 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::time::Duration;
+
+use chrono::Duration as ChronoDuration;
+
+use crate::data::export_v2::manifest::{ChunkMeta, TimeRange};
+
+pub fn generate_chunks(time_range: &TimeRange, window: Duration) -> Vec<ChunkMeta> {
+    let (Some(start), Some(end)) = (time_range.start, time_range.end) else {
+        return vec![ChunkMeta::new(1, time_range.clone())];
+    };
+
+    if start == end {
+        return vec![ChunkMeta::skipped(1, time_range.clone())];
+    }
+
+    if start > end {
+        return Vec::new();
+    }
+
+    let window = match ChronoDuration::from_std(window) {
+        Ok(window) if window > ChronoDuration::zero() => window,
+        _ => return vec![ChunkMeta::new(1, time_range.clone())],
+    };
+
+    let mut chunks = Vec::new();
+    let mut cursor = start;
+    let mut id = 1;
+
+    while cursor < end {
+        let next = cursor
+            .checked_add_signed(window)
+            .map_or(end, |timestamp| timestamp.min(end));
+        chunks.push(ChunkMeta::new(id, TimeRange::new(Some(cursor), Some(next))));
+        id += 1;
+        cursor = next;
+    }
+
+    chunks
+}
+
+#[cfg(test)]
+mod tests {
+    use chrono::{TimeZone, Utc};
+
+    use super::*;
+    use crate::data::export_v2::manifest::ChunkStatus;
+
+    #[test]
+    fn test_generate_chunks_unbounded() {
+        let range = TimeRange::unbounded();
+        let chunks = generate_chunks(&range, Duration::from_secs(3600));
+        assert_eq!(chunks.len(), 1);
+        assert_eq!(chunks[0].time_range, range);
+    }
+
+    #[test]
+    fn test_generate_chunks_split() {
+        let start = Utc.with_ymd_and_hms(2025, 1, 1, 0, 0, 0).unwrap();
+        let end = Utc.with_ymd_and_hms(2025, 1, 1, 3, 0, 0).unwrap();
+        let range = TimeRange::new(Some(start), Some(end));
+
+        let chunks = generate_chunks(&range, Duration::from_secs(3600));
+        assert_eq!(chunks.len(), 3);
+        assert_eq!(chunks[0].time_range.start, Some(start));
+        assert_eq!(
+            chunks[2].time_range.end,
+            Some(Utc.with_ymd_and_hms(2025, 1, 1, 3, 0, 0).unwrap())
+        );
+    }
+
+    #[test]
+    fn test_generate_chunks_empty_range() {
+        let start = Utc.with_ymd_and_hms(2025, 1, 1, 0, 0, 0).unwrap();
+        let range = TimeRange::new(Some(start), Some(start));
+        let chunks = generate_chunks(&range, Duration::from_secs(3600));
+        assert_eq!(chunks.len(), 1);
+        assert_eq!(chunks[0].status, ChunkStatus::Skipped);
+        assert_eq!(chunks[0].time_range, range);
+    }
+
+    #[test]
+    fn test_generate_chunks_invalid_range_is_empty() {
+        let start = Utc.with_ymd_and_hms(2025, 1, 1, 1, 0, 0).unwrap();
+        let end = Utc.with_ymd_and_hms(2025, 1, 1, 0, 0, 0).unwrap();
+        let range = TimeRange::new(Some(start), Some(end));
+        let chunks = generate_chunks(&range, Duration::from_secs(3600));
+        assert!(chunks.is_empty());
+    }
+}
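For intuition on the windowing math: the cursor walks from `start` in `window`-sized steps and each step is clamped to `end`, so a range that is not a whole multiple of the window ends with a short final chunk. A minimal sketch of that behavior (hedged; written as if it were another unit test inside the `chunker` module, since `generate_chunks` is crate-internal):

```rust
use std::time::Duration;
use chrono::{TimeZone, Utc};

// A 7-hour range with a 3h window: expect chunks of 3h, 3h, and a final 1h.
let start = Utc.with_ymd_and_hms(2025, 1, 1, 0, 0, 0).unwrap();
let end = Utc.with_ymd_and_hms(2025, 1, 1, 7, 0, 0).unwrap();
let chunks = generate_chunks(
    &TimeRange::new(Some(start), Some(end)),
    Duration::from_secs(3 * 3600),
);

assert_eq!(chunks.len(), 3);
// Chunk ids are 1-based and contiguous.
assert_eq!(chunks[2].id, 3);
// The last chunk starts at 06:00 and is clamped to `end` (07:00) rather
// than extending a full window past it.
assert_eq!(
    chunks[2].time_range.start,
    Some(Utc.with_ymd_and_hms(2025, 1, 1, 6, 0, 0).unwrap())
);
assert_eq!(chunks[2].time_range.end, Some(end));
```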
diff --git a/src/cli/src/data/export_v2/command.rs b/src/cli/src/data/export_v2/command.rs
index 341436fe0f..ddcb323fef 100644
--- a/src/cli/src/data/export_v2/command.rs
+++ b/src/cli/src/data/export_v2/command.rs
@@ -26,12 +26,16 @@ use snafu::{OptionExt, ResultExt};
 use crate::Tool;
 use crate::common::ObjectStoreConfig;
+use crate::data::export_v2::coordinator::export_data;
 use crate::data::export_v2::error::{
-    CannotResumeSchemaOnlySnafu, DataExportNotImplementedSnafu, DatabaseSnafu, EmptyResultSnafu,
-    ManifestVersionMismatchSnafu, Result, UnexpectedValueTypeSnafu,
+    ChunkTimeWindowRequiresBoundsSnafu, DatabaseSnafu, EmptyResultSnafu,
+    ManifestVersionMismatchSnafu, Result, ResumeConfigMismatchSnafu, SchemaOnlyArgsNotAllowedSnafu,
+    SchemaOnlyModeMismatchSnafu, UnexpectedValueTypeSnafu,
 };
 use crate::data::export_v2::extractor::SchemaExtractor;
-use crate::data::export_v2::manifest::{DataFormat, MANIFEST_VERSION, Manifest};
+use crate::data::export_v2::manifest::{
+    ChunkMeta, DataFormat, MANIFEST_VERSION, Manifest, TimeRange,
+};
 use crate::data::path::ddl_path_for_schema;
 use crate::data::snapshot_storage::{OpenDalStorage, SnapshotStorage, validate_uri};
 use crate::data::sql::{escape_sql_identifier, escape_sql_literal};
@@ -84,6 +88,11 @@ pub struct ExportCreateCommand {
     #[clap(long)]
     end_time: Option<String>,

+    /// Chunk time window (e.g., 1h, 6h, 1d, 7d).
+    /// Requires both --start-time and --end-time when specified.
+    #[clap(long, value_parser = humantime::parse_duration)]
+    chunk_time_window: Option<Duration>,
+
     /// Data format: parquet, csv, json.
     #[clap(long, value_enum, default_value = "parquet")]
     format: DataFormat,
@@ -92,7 +101,7 @@
     #[clap(long)]
     force: bool,

-    /// Concurrency level (for future use).
+    /// Parallelism for COPY DATABASE execution (server-side, per schema per chunk).
     #[clap(long, default_value = "1")]
     parallelism: usize,
@@ -127,11 +136,38 @@ impl ExportCreateCommand {
         // Validate URI format
         validate_uri(&self.to).map_err(BoxedError::new)?;

-        if !self.schema_only {
-            return DataExportNotImplementedSnafu
+        let time_range = TimeRange::parse(self.start_time.as_deref(), self.end_time.as_deref())
+            .map_err(BoxedError::new)?;
+        if self.chunk_time_window.is_some() && !time_range.is_bounded() {
+            return ChunkTimeWindowRequiresBoundsSnafu
                 .fail()
                 .map_err(BoxedError::new);
         }
+        if self.schema_only {
+            let mut invalid_args = Vec::new();
+            if self.start_time.is_some() {
+                invalid_args.push("--start-time");
+            }
+            if self.end_time.is_some() {
+                invalid_args.push("--end-time");
+            }
+            if self.chunk_time_window.is_some() {
+                invalid_args.push("--chunk-time-window");
+            }
+            if self.format != DataFormat::Parquet {
+                invalid_args.push("--format");
+            }
+            if self.parallelism != 1 {
+                invalid_args.push("--parallelism");
+            }
+            if !invalid_args.is_empty() {
+                return SchemaOnlyArgsNotAllowedSnafu {
+                    args: invalid_args.join(", "),
+                }
+                .fail()
+                .map_err(BoxedError::new);
+            }
+        }

         // Parse schemas (empty vec means all schemas)
         let schemas = if self.schemas.is_empty() {
@@ -155,12 +191,18 @@
         );

         Ok(Box::new(ExportCreate {
-            catalog: self.catalog.clone(),
-            schemas,
-            schema_only: self.schema_only,
-            _format: self.format,
-            force: self.force,
-            _parallelism: self.parallelism,
+            config: ExportConfig {
+                catalog: self.catalog.clone(),
+                schemas,
+                schema_only: self.schema_only,
+                format: self.format,
+                force: self.force,
+                time_range,
+                chunk_time_window: self.chunk_time_window,
+                parallelism: self.parallelism,
+                snapshot_uri: self.to.clone(),
+                storage_config: self.storage.clone(),
+            },
             storage: Box::new(storage),
             database_client,
         }))
@@ -169,14 +211,22 @@
 /// Export tool implementation.
 pub struct ExportCreate {
+    config: ExportConfig,
+    storage: Box<dyn SnapshotStorage>,
+    database_client: DatabaseClient,
+}
+
+struct ExportConfig {
     catalog: String,
     schemas: Option<Vec<String>>,
     schema_only: bool,
-    _format: DataFormat,
+    format: DataFormat,
     force: bool,
-    _parallelism: usize,
-    storage: Box<dyn SnapshotStorage>,
-    database_client: DatabaseClient,
+    time_range: TimeRange,
+    chunk_time_window: Option<Duration>,
+    parallelism: usize,
+    snapshot_uri: String,
+    storage_config: ObjectStoreConfig,
 }

 #[async_trait]
@@ -192,12 +242,12 @@ impl ExportCreate {
         let exists = self.storage.exists().await?;

         if exists {
-            if self.force {
+            if self.config.force {
                 info!("Deleting existing snapshot (--force)");
                 self.storage.delete_snapshot().await?;
             } else {
                 // Resume mode - read existing manifest
-                let manifest = self.storage.read_manifest().await?;
+                let mut manifest = self.storage.read_manifest().await?;

                 // Check version compatibility
                 if manifest.version != MANIFEST_VERSION {
@@ -208,10 +258,7 @@
                     .fail();
                 }

-                // Cannot resume schema-only with data export
-                if manifest.schema_only && !self.schema_only {
-                    return CannotResumeSchemaOnlySnafu.fail();
-                }
+                validate_resume_config(&manifest, &self.config)?;

                 info!(
                     "Resuming existing snapshot: {} (completed: {}/{} chunks)",
@@ -220,22 +267,31 @@
                     manifest.chunks.len()
                 );

-                // For M1, we only handle schema-only exports
-                // M2 will add chunk resume logic
                 if manifest.is_complete() {
                     info!("Snapshot is already complete");
                     return Ok(());
                 }

-                // TODO: Resume data export in M2
-                info!("Data export resume not yet implemented (M2)");
+                if manifest.schema_only {
+                    return Ok(());
+                }
+
+                export_data(
+                    self.storage.as_ref(),
+                    &self.database_client,
+                    &self.config.snapshot_uri,
+                    &self.config.storage_config,
+                    &mut manifest,
+                    self.config.parallelism,
+                )
+                .await?;
                 return Ok(());
             }
         }

         // 2. Get schema list
-        let extractor = SchemaExtractor::new(&self.database_client, &self.catalog);
-        let schema_snapshot = extractor.extract(self.schemas.as_deref()).await?;
+        let extractor = SchemaExtractor::new(&self.database_client, &self.config.catalog);
+        let schema_snapshot = extractor.extract(self.config.schemas.as_deref()).await?;

         let schema_names: Vec<String> = schema_snapshot
             .schemas
@@ -245,7 +301,14 @@
         info!("Exporting schemas: {:?}", schema_names);

         // 3. Create manifest
-        let manifest = Manifest::new_schema_only(self.catalog.clone(), schema_names.clone());
+        let mut manifest = Manifest::new_for_export(
+            self.config.catalog.clone(),
+            schema_names.clone(),
+            self.config.schema_only,
+            self.config.time_range.clone(),
+            self.config.format,
+            self.config.chunk_time_window,
+        )?;

         // 4. Write schema files
         self.storage.write_schema(&schema_snapshot).await?;
@@ -259,14 +322,28 @@
             info!("Exported DDL for schema {} to {}", schema, ddl_path);
         }

-        // 6. Write manifest last.
+        // 6. Write manifest after schema artifacts and before any data export.
         //
         // The manifest is the snapshot commit point: only write it after the schema
         // index and all DDL files are durable, so a crash cannot leave a "valid"
-        // snapshot that is missing required schema artifacts.
+        // snapshot that is missing required schema artifacts. For full exports we
+        // still need the manifest before data copy starts, because chunk resume is
+        // tracked by updating this manifest in place.
         self.storage.write_manifest(&manifest).await?;

         info!("Snapshot created: {}", manifest.snapshot_id);

+        if !self.config.schema_only {
+            export_data(
+                self.storage.as_ref(),
+                &self.database_client,
+                &self.config.snapshot_uri,
+                &self.config.storage_config,
+                &mut manifest,
+                self.config.parallelism,
+            )
+            .await?;
+        }
+
         Ok(())
     }
@@ -321,7 +398,7 @@
             "SELECT table_name, table_type FROM information_schema.tables \
              WHERE table_catalog = '{}' AND table_schema = '{}' \
              AND (table_type = 'BASE TABLE' OR table_type = 'VIEW')",
-            escape_sql_literal(&self.catalog),
+            escape_sql_literal(&self.config.catalog),
             escape_sql_literal(schema)
         );
         let records: Option<Vec<Vec<Value>>> = self
@@ -359,7 +436,7 @@
         let sql = format!(
             "SELECT DISTINCT table_name FROM information_schema.columns \
              WHERE table_catalog = '{}' AND table_schema = '{}' AND column_name = '__tsid'",
-            escape_sql_literal(&self.catalog),
+            escape_sql_literal(&self.config.catalog),
             escape_sql_literal(schema)
         );
         let records: Option<Vec<Vec<Value>>> = self
@@ -392,14 +469,14 @@
             Some(table) => format!(
                 r#"SHOW CREATE {} "{}"."{}"."{}""#,
                 show_type,
-                escape_sql_identifier(&self.catalog),
+                escape_sql_identifier(&self.config.catalog),
                 escape_sql_identifier(schema),
                 escape_sql_identifier(table)
             ),
             None => format!(
                 r#"SHOW CREATE {} "{}"."{}""#,
                 show_type,
-                escape_sql_identifier(&self.catalog),
+                escape_sql_identifier(&self.config.catalog),
                 escape_sql_identifier(schema)
             ),
         };
@@ -442,8 +519,118 @@ fn build_schema_ddl(
     ddl
 }

+fn validate_resume_config(manifest: &Manifest, config: &ExportConfig) -> Result<()> {
+    if manifest.schema_only != config.schema_only {
+        return SchemaOnlyModeMismatchSnafu {
+            existing_schema_only: manifest.schema_only,
+            requested_schema_only: config.schema_only,
+        }
+        .fail();
+    }
+
+    if manifest.catalog != config.catalog {
+        return ResumeConfigMismatchSnafu {
+            field: "catalog",
+            existing: manifest.catalog.clone(),
+            requested: config.catalog.clone(),
+        }
+        .fail();
+    }
+
+    // If no schema filter is provided on resume, inherit the existing snapshot
+    // selection instead of reinterpreting the request as "all schemas".
+    if let Some(requested_schemas) = &config.schemas
+        && !schema_selection_matches(&manifest.schemas, requested_schemas)
+    {
+        return ResumeConfigMismatchSnafu {
+            field: "schemas",
+            existing: format_schema_selection(&manifest.schemas),
+            requested: format_schema_selection(requested_schemas),
+        }
+        .fail();
+    }
+
+    if manifest.time_range != config.time_range {
+        return ResumeConfigMismatchSnafu {
+            field: "time_range",
+            existing: format!("{:?}", manifest.time_range),
+            requested: format!("{:?}", config.time_range),
+        }
+        .fail();
+    }
+
+    if manifest.format != config.format {
+        return ResumeConfigMismatchSnafu {
+            field: "format",
+            existing: manifest.format.to_string(),
+            requested: config.format.to_string(),
+        }
+        .fail();
+    }
+
+    let expected_plan = Manifest::new_for_export(
+        manifest.catalog.clone(),
+        manifest.schemas.clone(),
+        config.schema_only,
+        config.time_range.clone(),
+        config.format,
+        config.chunk_time_window,
+    )?;
+    if !chunk_plan_matches(manifest, &expected_plan) {
+        return ResumeConfigMismatchSnafu {
+            field: "chunk plan",
+            existing: format_chunk_plan(&manifest.chunks),
+            requested: format_chunk_plan(&expected_plan.chunks),
+        }
+        .fail();
+    }
+
+    Ok(())
+}
+
+fn schema_selection_matches(existing: &[String], requested: &[String]) -> bool {
+    canonical_schema_selection(existing) == canonical_schema_selection(requested)
+}
+
+fn canonical_schema_selection(schemas: &[String]) -> Vec<String> {
+    let mut canonicalized = Vec::new();
+    let mut seen = HashSet::new();
+
+    for schema in schemas {
+        let normalized = schema.to_ascii_lowercase();
+        if seen.insert(normalized.clone()) {
+            canonicalized.push(normalized);
+        }
+    }
+
+    canonicalized.sort();
+    canonicalized
+}
+
+fn format_schema_selection(schemas: &[String]) -> String {
+    format!("[{}]", schemas.join(", "))
+}
+
+fn chunk_plan_matches(existing: &Manifest, expected: &Manifest) -> bool {
+    existing.chunks.len() == expected.chunks.len()
+        && existing
+            .chunks
+            .iter()
+            .zip(&expected.chunks)
+            .all(|(left, right)| left.id == right.id && left.time_range == right.time_range)
+}
+
+fn format_chunk_plan(chunks: &[ChunkMeta]) -> String {
+    let items = chunks
+        .iter()
+        .map(|chunk| format!("#{}:{:?}", chunk.id, chunk.time_range))
+        .collect::<Vec<_>>();
+    format!("[{}]", items.join(", "))
+}
+
 #[cfg(test)]
 mod tests {
+    use chrono::TimeZone;
     use clap::Parser;

     use super::*;
@@ -478,19 +665,225 @@
     }

     #[tokio::test]
-    async fn test_build_rejects_non_schema_only_export() {
+    async fn test_build_rejects_chunk_window_without_bounds() {
         let cmd = ExportCreateCommand::parse_from([
             "export-v2-create",
             "--addr",
             "127.0.0.1:4000",
             "--to",
             "file:///tmp/export-v2-test",
+            "--chunk-time-window",
+            "1h",
         ]);

         let result = cmd.build().await;
         assert!(result.is_err());
         let error = result.err().unwrap().to_string();
-        assert!(error.contains("Data export is not implemented yet"));
+        assert!(error.contains("chunk_time_window requires both --start-time and --end-time"));
+    }
+
+    #[tokio::test]
+    async fn test_build_rejects_data_export_args_in_schema_only_mode() {
+        let cmd = ExportCreateCommand::parse_from([
+            "export-v2-create",
+            "--addr",
+            "127.0.0.1:4000",
+            "--to",
+            "file:///tmp/export-v2-test",
+            "--schema-only",
+            "--start-time",
+            "2024-01-01T00:00:00Z",
+            "--end-time",
+            "2024-01-02T00:00:00Z",
+            "--chunk-time-window",
+            "1h",
+            "--format",
+            "csv",
+            "--parallelism",
+            "2",
+        ]);
+
+        let error = cmd.build().await.err().unwrap().to_string();
+
+        assert!(error.contains("--schema-only cannot be used with data export arguments"));
+        assert!(error.contains("--start-time"));
+        assert!(error.contains("--end-time"));
+        assert!(error.contains("--chunk-time-window"));
+        assert!(error.contains("--format"));
+        assert!(error.contains("--parallelism"));
+    }
+
+    #[test]
+    fn test_schema_only_mode_mismatch_error_message() {
+        let error = crate::data::export_v2::error::SchemaOnlyModeMismatchSnafu {
+            existing_schema_only: false,
+            requested_schema_only: true,
+        }
+        .build()
+        .to_string();
+
+        assert!(error.contains("existing: false"));
+        assert!(error.contains("requested: true"));
+    }
+
+    #[test]
+    fn test_validate_resume_config_rejects_catalog_mismatch() {
+        let manifest = Manifest::new_for_export(
+            "greptime".to_string(),
+            vec!["public".to_string()],
+            false,
+            TimeRange::unbounded(),
+            DataFormat::Parquet,
+            None,
+        )
+        .unwrap();
+        let config = ExportConfig {
+            catalog: "other".to_string(),
+            schemas: None,
+            schema_only: false,
+            format: DataFormat::Parquet,
+            force: false,
+            time_range: TimeRange::unbounded(),
+            chunk_time_window: None,
+            parallelism: 1,
+            snapshot_uri: "file:///tmp/snapshot".to_string(),
+            storage_config: ObjectStoreConfig::default(),
+        };
+
+        let error = validate_resume_config(&manifest, &config)
+            .err()
+            .unwrap()
+            .to_string();
+        assert!(error.contains("catalog"));
+    }
+
+    #[test]
+    fn test_validate_resume_config_accepts_schema_selection_with_different_case_and_order() {
+        let manifest = Manifest::new_for_export(
+            "greptime".to_string(),
+            vec!["public".to_string(), "analytics".to_string()],
+            false,
+            TimeRange::unbounded(),
+            DataFormat::Parquet,
+            None,
+        )
+        .unwrap();
+        let config = ExportConfig {
+            catalog: "greptime".to_string(),
+            schemas: Some(vec![
+                "ANALYTICS".to_string(),
+                "PUBLIC".to_string(),
+                "public".to_string(),
+            ]),
+            schema_only: false,
+            format: DataFormat::Parquet,
+            force: false,
+            time_range: TimeRange::unbounded(),
+            chunk_time_window: None,
+            parallelism: 1,
+            snapshot_uri: "file:///tmp/snapshot".to_string(),
+            storage_config: ObjectStoreConfig::default(),
+        };
+
+        assert!(validate_resume_config(&manifest, &config).is_ok());
+    }
+
+    #[test]
+    fn test_validate_resume_config_rejects_chunk_plan_mismatch() {
+        let start = chrono::Utc.with_ymd_and_hms(2025, 1, 1, 0, 0, 0).unwrap();
+        let end = chrono::Utc.with_ymd_and_hms(2025, 1, 1, 2, 0, 0).unwrap();
+        let time_range = TimeRange::new(Some(start), Some(end));
+        let manifest = Manifest::new_for_export(
+            "greptime".to_string(),
+            vec!["public".to_string()],
+            false,
+            time_range.clone(),
+            DataFormat::Parquet,
+            None,
+        )
+        .unwrap();
+        let config = ExportConfig {
+            catalog: "greptime".to_string(),
+            schemas: None,
+            schema_only: false,
+            format: DataFormat::Parquet,
+            force: false,
+            time_range,
+            chunk_time_window: Some(Duration::from_secs(3600)),
+            parallelism: 1,
+            snapshot_uri: "file:///tmp/snapshot".to_string(),
+            storage_config: ObjectStoreConfig::default(),
+        };
+
+        let error = validate_resume_config(&manifest, &config)
+            .err()
+            .unwrap()
+            .to_string();
+        assert!(error.contains("chunk plan"));
+    }
+
+    #[test]
+    fn test_validate_resume_config_rejects_format_mismatch() {
+        let manifest = Manifest::new_for_export(
+            "greptime".to_string(),
+            vec!["public".to_string()],
+            false,
+            TimeRange::unbounded(),
+            DataFormat::Parquet,
+            None,
+        )
+        .unwrap();
+        let config = ExportConfig {
+            catalog: "greptime".to_string(),
+            schemas: None,
+            schema_only: false,
+            format: DataFormat::Csv,
+            force: false,
+            time_range: TimeRange::unbounded(),
+            chunk_time_window: None,
+            parallelism: 1,
+            snapshot_uri: "file:///tmp/snapshot".to_string(),
+            storage_config: ObjectStoreConfig::default(),
+        };
+
+        let error = validate_resume_config(&manifest, &config)
+            .err()
+            .unwrap()
+            .to_string();
+        assert!(error.contains("format"));
+    }
+
+    #[test]
+    fn test_validate_resume_config_rejects_time_range_mismatch() {
+        let start = chrono::Utc.with_ymd_and_hms(2025, 1, 1, 0, 0, 0).unwrap();
+        let end = chrono::Utc.with_ymd_and_hms(2025, 1, 1, 1, 0, 0).unwrap();
+        let manifest = Manifest::new_for_export(
+            "greptime".to_string(),
+            vec!["public".to_string()],
+            false,
+            TimeRange::new(Some(start), Some(end)),
+            DataFormat::Parquet,
+            None,
+        )
+        .unwrap();
+        let config = ExportConfig {
+            catalog: "greptime".to_string(),
+            schemas: None,
+            schema_only: false,
+            format: DataFormat::Parquet,
+            force: false,
+            time_range: TimeRange::new(Some(start), Some(start)),
+            chunk_time_window: None,
+            parallelism: 1,
+            snapshot_uri: "file:///tmp/snapshot".to_string(),
+            storage_config: ObjectStoreConfig::default(),
+        };
+
+        let error = validate_resume_config(&manifest, &config)
+            .err()
+            .unwrap()
+            .to_string();
+        assert!(error.contains("time_range"));
     }
 }
diff --git a/src/cli/src/data/export_v2/coordinator.rs b/src/cli/src/data/export_v2/coordinator.rs
new file mode 100644
index 0000000000..d96c01d693
--- /dev/null
+++ b/src/cli/src/data/export_v2/coordinator.rs
@@ -0,0 +1,166 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use common_telemetry::info;
+
+use crate::common::ObjectStoreConfig;
+use crate::data::export_v2::data::{CopyOptions, build_copy_target, execute_copy_database};
+use crate::data::export_v2::error::Result;
+use crate::data::export_v2::manifest::{ChunkStatus, DataFormat, Manifest, TimeRange};
+use crate::data::path::data_dir_for_schema_chunk;
+use crate::data::snapshot_storage::{SnapshotStorage, StorageScheme};
+use crate::database::DatabaseClient;
+
+struct ExportContext<'a> {
+    storage: &'a dyn SnapshotStorage,
+    database_client: &'a DatabaseClient,
+    snapshot_uri: &'a str,
+    storage_config: &'a ObjectStoreConfig,
+    catalog: &'a str,
+    schemas: &'a [String],
+    format: DataFormat,
+    parallelism: usize,
+}
+
+pub async fn export_data(
+    storage: &dyn SnapshotStorage,
+    database_client: &DatabaseClient,
+    snapshot_uri: &str,
+    storage_config: &ObjectStoreConfig,
+    manifest: &mut Manifest,
+    parallelism: usize,
+) -> Result<()> {
+    if manifest.chunks.is_empty() {
+        return Ok(());
+    }
+
+    for idx in 0..manifest.chunks.len() {
+        if matches!(
+            manifest.chunks[idx].status,
+            ChunkStatus::Completed | ChunkStatus::Skipped
+        ) {
+            continue;
+        }
+
+        let (chunk_id, time_range) = mark_chunk_in_progress(manifest, idx);
+        manifest.touch();
+        storage.write_manifest(manifest).await?;
+
+        let context = ExportContext {
+            storage,
+            database_client,
+            snapshot_uri,
+            storage_config,
+            catalog: &manifest.catalog,
+            schemas: &manifest.schemas,
+            format: manifest.format,
+            parallelism,
+        };
+        let export_result = export_chunk(&context, chunk_id, time_range).await;
+
+        let result = match export_result {
+            Ok(files) => {
+                mark_chunk_completed(manifest, idx, files);
+                Ok(())
+            }
+            Err(err) => {
+                mark_chunk_failed(manifest, idx, err.to_string());
+                Err(err)
+            }
+        };
+
+        manifest.touch();
+        storage.write_manifest(manifest).await?;
+
+        result?;
+    }
+
+    Ok(())
+}
+
+fn mark_chunk_in_progress(manifest: &mut Manifest, idx: usize) -> (u32, TimeRange) {
+    let chunk = &mut manifest.chunks[idx];
+    chunk.mark_in_progress();
+    (chunk.id, chunk.time_range.clone())
+}
+
+fn mark_chunk_completed(manifest: &mut Manifest, idx: usize, files: Vec<String>) {
+    let chunk = &mut manifest.chunks[idx];
+    if files.is_empty() {
+        chunk.mark_skipped();
+    } else {
+        chunk.mark_completed(files, None);
+    }
+}
+
+fn mark_chunk_failed(manifest: &mut Manifest, idx: usize, error: String) {
+    let chunk = &mut manifest.chunks[idx];
+    chunk.mark_failed(error);
+}
+
+async fn export_chunk(
+    context: &ExportContext<'_>,
+    chunk_id: u32,
+    time_range: TimeRange,
+) -> Result<Vec<String>> {
+    let scheme = StorageScheme::from_uri(context.snapshot_uri)?;
+    let needs_dir = matches!(scheme, StorageScheme::File);
+    let copy_options = CopyOptions {
+        format: context.format,
+        time_range,
+        parallelism: context.parallelism,
+    };
+
+    for schema in context.schemas {
+        let prefix = data_dir_for_schema_chunk(schema, chunk_id);
+        if needs_dir {
+            context.storage.create_dir_all(&prefix).await?;
+        }
+
+        let target = build_copy_target(
+            context.snapshot_uri,
+            context.storage_config,
+            schema,
+            chunk_id,
+        )?;
+        execute_copy_database(
+            context.database_client,
+            context.catalog,
+            schema,
+            &target,
+            &copy_options,
+        )
+        .await?;
+    }
+
+    let files = list_chunk_files(context.storage, context.schemas, chunk_id).await?;
+    info!("Collected {} files for chunk {}", files.len(), chunk_id);
+    Ok(files)
+}
+
+async fn list_chunk_files(
+    storage: &dyn SnapshotStorage,
+    schemas: &[String],
+    chunk_id: u32,
+) -> Result<Vec<String>> {
+    let mut files = Vec::new();
+
+    for schema in schemas {
+        let prefix = data_dir_for_schema_chunk(schema, chunk_id);
+        files.extend(storage.list_files_recursive(&prefix).await?);
+    }
+
+    files.sort();
+    Ok(files)
+}
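The loop in `export_data` persists the manifest twice per chunk: once after marking it `InProgress` (a write-ahead claim) and once after the chunk reaches a terminal state, so a crash can at worst leave one chunk in `InProgress` or `Failed`, which a resumed run simply retries. A minimal sketch of the per-chunk state machine using the manifest types (hedged; assumes crate-internal access):

```rust
use crate::data::export_v2::manifest::{ChunkMeta, ChunkStatus, TimeRange};

let mut chunk = ChunkMeta::new(1, TimeRange::unbounded());

// Claim the chunk before issuing COPY DATABASE; the coordinator then
// checkpoints the manifest so an interrupted run is visible on resume.
chunk.mark_in_progress();
assert_eq!(chunk.status, ChunkStatus::InProgress);

// COPY produced no files for this window: terminal state is Skipped, not Completed.
chunk.mark_skipped();
assert_eq!(chunk.status, ChunkStatus::Skipped);
assert!(chunk.files.is_empty());

// A failed copy records the error message and is retried by the next resume.
chunk.mark_failed("connection reset".to_string());
assert_eq!(chunk.status, ChunkStatus::Failed);
```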
diff --git a/src/cli/src/data/export_v2/data.rs b/src/cli/src/data/export_v2/data.rs
new file mode 100644
index 0000000000..fe2ec7c051
--- /dev/null
+++ b/src/cli/src/data/export_v2/data.rs
@@ -0,0 +1,440 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use common_base::secrets::{ExposeSecret, SecretString};
+use common_telemetry::info;
+use object_store::util::{join_path, normalize_path};
+use snafu::ResultExt;
+use url::Url;
+
+use crate::common::ObjectStoreConfig;
+use crate::data::export_v2::error::{DatabaseSnafu, InvalidUriSnafu, Result, UrlParseSnafu};
+use crate::data::export_v2::manifest::{DataFormat, TimeRange};
+use crate::data::path::data_dir_for_schema_chunk;
+use crate::data::snapshot_storage::StorageScheme;
+use crate::data::sql::{escape_sql_identifier, escape_sql_literal};
+use crate::database::DatabaseClient;
+
+pub(super) struct CopyOptions {
+    pub(super) format: DataFormat,
+    pub(super) time_range: TimeRange,
+    pub(super) parallelism: usize,
+}
+
+pub(super) struct CopyTarget {
+    pub(super) location: String,
+    pub(super) connection: String,
+    secrets: Vec<Option<String>>,
+}
+
+impl CopyTarget {
+    fn mask_sql(&self, sql: &str) -> String {
+        mask_secrets(sql, &self.secrets)
+    }
+}
+
+pub(super) fn build_copy_target(
+    snapshot_uri: &str,
+    storage: &ObjectStoreConfig,
+    schema: &str,
+    chunk_id: u32,
+) -> Result<CopyTarget> {
+    let url = Url::parse(snapshot_uri).context(UrlParseSnafu)?;
+    let scheme = StorageScheme::from_uri(snapshot_uri)?;
+    let suffix = data_dir_for_schema_chunk(schema, chunk_id);
+
+    match scheme {
+        StorageScheme::File => {
+            let root = url.to_file_path().map_err(|_| {
+                InvalidUriSnafu {
+                    uri: snapshot_uri,
+                    reason: "file:// URI must use an absolute path like file:///tmp/backup",
+                }
+                .build()
+            })?;
+            let location = normalize_path(&format!("{}/{}", root.to_string_lossy(), suffix));
+            Ok(CopyTarget {
+                location,
+                connection: String::new(),
+                secrets: Vec::new(),
+            })
+        }
+        StorageScheme::S3 => {
+            let (bucket, root) = extract_bucket_root(&url, snapshot_uri)?;
+            let location = format!("s3://{}/{}", bucket, join_root(&root, &suffix));
+            let (connection, secrets) = build_s3_connection(storage);
+            Ok(CopyTarget {
+                location,
+                connection,
+                secrets,
+            })
+        }
+        StorageScheme::Oss => {
+            let (bucket, root) = extract_bucket_root(&url, snapshot_uri)?;
+            let location = format!("oss://{}/{}", bucket, join_root(&root, &suffix));
+            let (connection, secrets) = build_oss_connection(storage);
+            Ok(CopyTarget {
+                location,
+                connection,
+                secrets,
+            })
+        }
+        StorageScheme::Gcs => {
+            let (bucket, root) = extract_bucket_root(&url, snapshot_uri)?;
+            let location = format!("gcs://{}/{}", bucket, join_root(&root, &suffix));
+            let (connection, secrets) = build_gcs_connection(storage, snapshot_uri)?;
+            Ok(CopyTarget {
+                location,
+                connection,
+                secrets,
+            })
+        }
+        StorageScheme::Azblob => {
+            let (bucket, root) = extract_bucket_root(&url, snapshot_uri)?;
+            let location = format!("azblob://{}/{}", bucket, join_root(&root, &suffix));
+            let (connection, secrets) = build_azblob_connection(storage);
+            Ok(CopyTarget {
+                location,
+                connection,
+                secrets,
+            })
+        }
+    }
+}
+
+pub(super) async fn execute_copy_database(
+    database_client: &DatabaseClient,
+    catalog: &str,
+    schema: &str,
+    target: &CopyTarget,
+    options: &CopyOptions,
+) -> Result<()> {
+    let with_options = build_with_options(options);
+    let sql = format!(
+        r#"COPY DATABASE "{}"."{}" TO '{}' WITH ({}){};"#,
+        escape_sql_identifier(catalog),
+        escape_sql_identifier(schema),
+        escape_sql_literal(&target.location),
+        with_options,
+        target.connection
+    );
+    let safe_sql = target.mask_sql(&sql);
+    info!("Executing sql: {}", safe_sql);
+    database_client
+        .sql_in_public(&sql)
+        .await
+        .context(DatabaseSnafu)?;
+    Ok(())
+}
+
+fn build_with_options(options: &CopyOptions) -> String {
+    let mut parts = vec![format!("FORMAT='{}'", options.format)];
+    if let Some(start) = options.time_range.start {
+        parts.push(format!(
+            "START_TIME='{}'",
+            escape_sql_literal(&start.to_rfc3339())
+        ));
+    }
+    if let Some(end) = options.time_range.end {
+        parts.push(format!(
+            "END_TIME='{}'",
+            escape_sql_literal(&end.to_rfc3339())
+        ));
+    }
+    parts.push(format!("PARALLELISM={}", options.parallelism));
+    parts.join(", ")
+}
+
+fn extract_bucket_root(url: &Url, snapshot_uri: &str) -> Result<(String, String)> {
+    let bucket = url.host_str().unwrap_or("").to_string();
+    if bucket.is_empty() {
+        return InvalidUriSnafu {
+            uri: snapshot_uri,
+            reason: "URI must include bucket/container in host",
+        }
+        .fail();
+    }
+    let root = url
+        .path()
+        .trim_start_matches('/')
+        .trim_end_matches('/')
+        .to_string();
+    Ok((bucket, root))
+}
+
+fn join_root(root: &str, suffix: &str) -> String {
+    join_path(root, suffix).trim_start_matches('/').to_string()
+}
+
+fn build_s3_connection(storage: &ObjectStoreConfig) -> (String, Vec<Option<String>>) {
+    let access_key_id = expose_optional_secret(&storage.s3.s3_access_key_id);
+    let secret_access_key = expose_optional_secret(&storage.s3.s3_secret_access_key);
+
+    let mut options = Vec::new();
+    if let Some(access_key_id) = &access_key_id {
+        options.push(format!(
+            "ACCESS_KEY_ID='{}'",
+            escape_sql_literal(access_key_id)
+        ));
+    }
+    if let Some(secret_access_key) = &secret_access_key {
+        options.push(format!(
+            "SECRET_ACCESS_KEY='{}'",
+            escape_sql_literal(secret_access_key)
+        ));
+    }
+    if let Some(region) = &storage.s3.s3_region {
+        options.push(format!("REGION='{}'", escape_sql_literal(region)));
+    }
+    if let Some(endpoint) = &storage.s3.s3_endpoint {
+        options.push(format!("ENDPOINT='{}'", escape_sql_literal(endpoint)));
+    }
+
+    let secrets = vec![access_key_id, secret_access_key];
+    let connection = if options.is_empty() {
+        String::new()
+    } else {
+        format!(" CONNECTION ({})", options.join(", "))
+    };
+    (connection, secrets)
+}
+
+fn build_oss_connection(storage: &ObjectStoreConfig) -> (String, Vec<Option<String>>) {
+    let access_key_id = expose_optional_secret(&storage.oss.oss_access_key_id);
+    let access_key_secret = expose_optional_secret(&storage.oss.oss_access_key_secret);
+
+    let mut options = Vec::new();
+    if let Some(access_key_id) = &access_key_id {
+        options.push(format!(
+            "ACCESS_KEY_ID='{}'",
+            escape_sql_literal(access_key_id)
+        ));
+    }
+    if let Some(access_key_secret) = &access_key_secret {
+        options.push(format!(
+            "ACCESS_KEY_SECRET='{}'",
+            escape_sql_literal(access_key_secret)
+        ));
+    }
+    if !storage.oss.oss_endpoint.is_empty() {
+        options.push(format!(
+            "ENDPOINT='{}'",
+            escape_sql_literal(&storage.oss.oss_endpoint)
+        ));
+    }
+
+    let secrets = vec![access_key_id, access_key_secret];
+    let connection = if options.is_empty() {
+        String::new()
+    } else {
+        format!(" CONNECTION ({})", options.join(", "))
+    };
+    (connection, secrets)
+}
+
+fn build_gcs_connection(
+    storage: &ObjectStoreConfig,
+    snapshot_uri: &str,
+) -> Result<(String, Vec<Option<String>>)> {
+    let credential_path = expose_optional_secret(&storage.gcs.gcs_credential_path);
+    let credential = expose_optional_secret(&storage.gcs.gcs_credential);
+
+    if credential.is_none() && credential_path.is_some() {
+        return InvalidUriSnafu {
+            uri: snapshot_uri,
+            reason: "gcs_credential_path is not supported for server-side COPY; provide gcs_credential or rely on server-side ADC",
+        }
+        .fail();
+    }
+
+    let mut options = Vec::new();
+    if let Some(credential) = &credential {
+        options.push(format!("CREDENTIAL='{}'", escape_sql_literal(credential)));
+    }
+    if !storage.gcs.gcs_scope.is_empty() {
+        options.push(format!(
+            "SCOPE='{}'",
+            escape_sql_literal(&storage.gcs.gcs_scope)
+        ));
+    }
+    if !storage.gcs.gcs_endpoint.is_empty() {
+        options.push(format!(
+            "ENDPOINT='{}'",
+            escape_sql_literal(&storage.gcs.gcs_endpoint)
+        ));
+    }
+
+    let connection = if options.is_empty() {
+        String::new()
+    } else {
+        format!(" CONNECTION ({})", options.join(", "))
+    };
+    let secrets = vec![credential_path, credential];
+    Ok((connection, secrets))
+}
+
+fn build_azblob_connection(storage: &ObjectStoreConfig) -> (String, Vec<Option<String>>) {
+    let account_name = expose_optional_secret(&storage.azblob.azblob_account_name);
+    let account_key = expose_optional_secret(&storage.azblob.azblob_account_key);
+    let sas_token = storage.azblob.azblob_sas_token.clone();
+
+    let mut options = Vec::new();
+    if let Some(account_name) = &account_name {
+        options.push(format!(
+            "ACCOUNT_NAME='{}'",
+            escape_sql_literal(account_name)
+        ));
+    }
+    if let Some(account_key) = &account_key {
+        options.push(format!("ACCOUNT_KEY='{}'", escape_sql_literal(account_key)));
+    }
+    if let Some(sas_token) = &sas_token {
+        options.push(format!("SAS_TOKEN='{}'", escape_sql_literal(sas_token)));
+    }
+    if !storage.azblob.azblob_endpoint.is_empty() {
+        options.push(format!(
+            "ENDPOINT='{}'",
+            escape_sql_literal(&storage.azblob.azblob_endpoint)
+        ));
+    }
+
+    let secrets = vec![account_name, account_key, sas_token];
+    let connection = if options.is_empty() {
+        String::new()
+    } else {
+        format!(" CONNECTION ({})", options.join(", "))
+    };
+    (connection, secrets)
+}
+
+fn expose_optional_secret(secret: &Option<SecretString>) -> Option<String> {
+    secret.as_ref().map(|s| s.expose_secret().to_owned())
+}
+
+fn mask_secrets(sql: &str, secrets: &[Option<String>]) -> String {
+    let mut masked = sql.to_string();
+    for secret in secrets {
+        if let Some(secret) = secret
+            && !secret.is_empty()
+        {
+            masked = masked.replace(secret, "[REDACTED]");
+        }
+    }
+    masked
+}
+
+#[cfg(test)]
+mod tests {
+    use common_base::secrets::SecretString;
+
+    use super::*;
+    use crate::common::{PrefixedAzblobConnection, PrefixedGcsConnection, PrefixedOssConnection};
+
+    #[test]
+    fn test_build_oss_connection_includes_endpoint() {
+        let storage = ObjectStoreConfig {
+            oss: PrefixedOssConnection {
+                oss_endpoint: "https://oss.example.com".to_string(),
+                oss_access_key_id: Some(SecretString::from("key_id".to_string())),
+                oss_access_key_secret: Some(SecretString::from("key_secret".to_string())),
+                ..Default::default()
+            },
+            ..Default::default()
+        };
+
+        let (connection, _) = build_oss_connection(&storage);
+        assert!(connection.contains("ENDPOINT='https://oss.example.com'"));
+    }
+
+    #[test]
+    fn test_build_gcs_connection_uses_scope_and_inline_credential() {
+        let storage = ObjectStoreConfig {
+            gcs: PrefixedGcsConnection {
+                gcs_scope: "scope-a".to_string(),
+                gcs_endpoint: "https://storage.googleapis.com".to_string(),
+                gcs_credential: Some(SecretString::from("credential-json".to_string())),
+                ..Default::default()
+            },
+            ..Default::default()
+        };
+
+        let (connection, _) = build_gcs_connection(&storage, "gcs://bucket/root").unwrap();
+        assert!(connection.contains("CREDENTIAL='credential-json'"));
+        assert!(connection.contains("SCOPE='scope-a'"));
+        assert!(connection.contains("ENDPOINT='https://storage.googleapis.com'"));
+        assert!(!connection.contains("CREDENTIAL_PATH"));
+    }
+
+    #[test]
+    fn test_build_gcs_connection_rejects_credential_path_only() {
+        let storage = ObjectStoreConfig {
+            gcs: PrefixedGcsConnection {
+                gcs_scope: "scope-a".to_string(),
+                gcs_credential_path: Some(SecretString::from("/tmp/creds.json".to_string())),
+                ..Default::default()
+            },
+            ..Default::default()
+        };
+
+        let error = build_gcs_connection(&storage, "gcs://bucket/root")
+            .expect_err("credential_path-only should be rejected")
+            .to_string();
+        assert!(error.contains("gcs_credential_path is not supported"));
+    }
+
+    #[test]
+    fn test_build_azblob_connection_includes_endpoint() {
+        let storage = ObjectStoreConfig {
+            azblob: PrefixedAzblobConnection {
+                azblob_account_name: Some(SecretString::from("account".to_string())),
+                azblob_account_key: Some(SecretString::from("key".to_string())),
+                azblob_endpoint: "https://blob.example.com".to_string(),
+                ..Default::default()
+            },
+            ..Default::default()
+        };
+
+        let (connection, _) = build_azblob_connection(&storage);
+        assert!(connection.contains("ENDPOINT='https://blob.example.com'"));
+    }
+
+    #[test]
+    fn test_build_azblob_connection_redacts_sas_token() {
+        let storage = ObjectStoreConfig {
+            azblob: PrefixedAzblobConnection {
+                azblob_account_name: Some(SecretString::from("account".to_string())),
+                azblob_account_key: Some(SecretString::from("key".to_string())),
+                azblob_sas_token: Some("sig=secret-token".to_string()),
+                ..Default::default()
+            },
+            ..Default::default()
+        };
+
+        let (connection, secrets) = build_azblob_connection(&storage);
+        let masked = mask_secrets(&connection, &secrets);
+
+        assert!(connection.contains("SAS_TOKEN='sig=secret-token'"));
+        assert!(masked.contains("SAS_TOKEN='[REDACTED]'"));
+        assert!(!masked.contains("sig=secret-token"));
+    }
+
+    #[test]
+    fn test_build_copy_target_decodes_file_uri_path() {
+        let storage = ObjectStoreConfig::default();
+        let target = build_copy_target("file:///tmp/my%20backup", &storage, "public", 7)
+            .expect("file:// copy target should be built");
+
+        assert_eq!(target.location, "/tmp/my backup/data/public/7/");
+    }
+}
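For reference, the statement `execute_copy_database` assembles for a bounded chunk against S3 has roughly this shape (emitted as a single line; identifiers, times, and credentials illustrative). `TimeRange` bounds are rendered with `to_rfc3339`, so they carry an explicit `+00:00` offset, and the logged copy of the statement passes through `mask_secrets` so credential values appear as `[REDACTED]`:

```sql
COPY DATABASE "greptime"."public" TO 's3://bucket/snapshots/prod-20250101/data/public/3/'
  WITH (FORMAT='parquet', START_TIME='2025-01-03T00:00:00+00:00',
        END_TIME='2025-01-04T00:00:00+00:00', PARALLELISM=4)
  CONNECTION (ACCESS_KEY_ID='...', SECRET_ACCESS_KEY='...', REGION='us-east-1');
```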
diff --git a/src/cli/src/data/export_v2/error.rs b/src/cli/src/data/export_v2/error.rs
index 2db71d5326..ec860fecfa 100644
--- a/src/cli/src/data/export_v2/error.rs
+++ b/src/cli/src/data/export_v2/error.rs
@@ -72,17 +72,55 @@ pub enum Error {
     },

     #[snafu(display(
-        "Cannot resume schema-only snapshot with data export. Use --force to recreate."
+        "Cannot resume snapshot with a different schema_only mode (existing: {}, requested: {}). Use --force to recreate.",
+        existing_schema_only,
+        requested_schema_only
     ))]
-    CannotResumeSchemaOnly {
+    SchemaOnlyModeMismatch {
+        existing_schema_only: bool,
+        requested_schema_only: bool,
         #[snafu(implicit)]
         location: Location,
     },

     #[snafu(display(
-        "Data export is not implemented yet. Use --schema-only to create a schema snapshot."
+        "Cannot resume snapshot with different {} (existing: {}, requested: {}). Use --force to recreate.",
+        field,
+        existing,
+        requested
     ))]
-    DataExportNotImplemented {
+    ResumeConfigMismatch {
+        field: String,
+        existing: String,
+        requested: String,
+        #[snafu(implicit)]
+        location: Location,
+    },
+
+    #[snafu(display("Failed to parse time: invalid format: {}", input))]
+    TimeParseInvalidFormat {
+        input: String,
+        #[snafu(implicit)]
+        location: Location,
+    },
+
+    #[snafu(display("Failed to parse time: end_time is before start_time"))]
+    TimeParseEndBeforeStart {
+        #[snafu(implicit)]
+        location: Location,
+    },
+
+    #[snafu(display(
+        "chunk_time_window requires both --start-time and --end-time to be specified"
+    ))]
+    ChunkTimeWindowRequiresBounds {
+        #[snafu(implicit)]
+        location: Location,
+    },
+
+    #[snafu(display("--schema-only cannot be used with data export arguments: {}", args))]
+    SchemaOnlyArgsNotAllowed {
+        args: String,
         #[snafu(implicit)]
         location: Location,
     },
@@ -154,9 +192,13 @@ impl ErrorExt for Error {
         match self {
             Error::InvalidUri { .. }
             | Error::UnsupportedScheme { .. }
-            | Error::CannotResumeSchemaOnly { .. }
-            | Error::DataExportNotImplemented { .. }
-            | Error::ManifestVersionMismatch { .. } => StatusCode::InvalidArguments,
+            | Error::SchemaOnlyModeMismatch { .. }
+            | Error::ResumeConfigMismatch { .. }
+            | Error::ManifestVersionMismatch { .. }
+            | Error::SchemaOnlyArgsNotAllowed { .. } => StatusCode::InvalidArguments,
+            Error::TimeParseInvalidFormat { .. }
+            | Error::TimeParseEndBeforeStart { .. }
+            | Error::ChunkTimeWindowRequiresBounds { .. } => StatusCode::InvalidArguments,

             Error::StorageOperation { .. }
             | Error::ManifestParse { .. }
diff --git a/src/cli/src/data/export_v2/manifest.rs b/src/cli/src/data/export_v2/manifest.rs
index 0ebf753fa4..918288bb51 100644
--- a/src/cli/src/data/export_v2/manifest.rs
+++ b/src/cli/src/data/export_v2/manifest.rs
@@ -14,12 +14,19 @@

 //! Manifest data structures for Export/Import V2.

+use std::time::Duration;
 use std::{fmt, str};

 use chrono::{DateTime, Utc};
 use serde::{Deserialize, Serialize};
 use uuid::Uuid;

+use crate::data::export_v2::chunker::generate_chunks;
+use crate::data::export_v2::error::{
+    ChunkTimeWindowRequiresBoundsSnafu, Result as ExportResult, TimeParseEndBeforeStartSnafu,
+    TimeParseInvalidFormatSnafu,
+};
+
 /// Current manifest format version.
 pub const MANIFEST_VERSION: u32 = 1;
@@ -55,6 +62,31 @@ impl TimeRange {
     pub fn is_unbounded(&self) -> bool {
         self.start.is_none() && self.end.is_none()
     }
+
+    /// Returns true if both bounds are specified.
+    pub fn is_bounded(&self) -> bool {
+        self.start.is_some() && self.end.is_some()
+    }
+
+    /// Parses a time range from optional RFC3339 strings.
+    pub fn parse(start: Option<&str>, end: Option<&str>) -> ExportResult<Self> {
+        let start = start.map(parse_time).transpose()?;
+        let end = end.map(parse_time).transpose()?;
+
+        if let (Some(start), Some(end)) = (start, end)
+            && end < start
+        {
+            return TimeParseEndBeforeStartSnafu.fail();
+        }
+
+        Ok(Self::new(start, end))
+    }
+}
+
+fn parse_time(input: &str) -> ExportResult<DateTime<Utc>> {
+    DateTime::parse_from_rfc3339(input)
+        .map(|dt| dt.with_timezone(&Utc))
+        .map_err(|_| TimeParseInvalidFormatSnafu { input }.build())
 }

 impl Default for TimeRange {
@@ -74,6 +106,8 @@ pub enum ChunkStatus {
     InProgress,
     /// Chunk export completed successfully.
     Completed,
+    /// Chunk had no data to export.
+    Skipped,
     /// Chunk export failed.
     Failed,
 }
@@ -111,6 +145,13 @@ impl ChunkMeta {
         }
     }

+    /// Creates a skipped chunk with the given id and time range.
+    pub fn skipped(id: u32, time_range: TimeRange) -> Self {
+        let mut chunk = Self::new(id, time_range);
+        chunk.mark_skipped();
+        chunk
+    }
+
     /// Marks this chunk as in progress.
     pub fn mark_in_progress(&mut self) {
         self.status = ChunkStatus::InProgress;
@@ -125,6 +166,14 @@
         self.error = None;
     }

+    /// Marks this chunk as skipped because no data files were produced.
+    pub fn mark_skipped(&mut self) {
+        self.status = ChunkStatus::Skipped;
+        self.files.clear();
+        self.checksum = None;
+        self.error = None;
+    }
+
     /// Marks this chunk as failed with the given error message.
     pub fn mark_failed(&mut self, error: String) {
         self.status = ChunkStatus::Failed;
@@ -210,6 +259,35 @@ pub struct Manifest {
 }

 impl Manifest {
+    pub fn new_for_export(
+        catalog: String,
+        schemas: Vec<String>,
+        schema_only: bool,
+        time_range: TimeRange,
+        format: DataFormat,
+        chunk_time_window: Option<Duration>,
+    ) -> ExportResult<Self> {
+        if chunk_time_window.is_some() && !time_range.is_bounded() {
+            return ChunkTimeWindowRequiresBoundsSnafu.fail();
+        }
+
+        let mut manifest = if schema_only {
+            Self::new_schema_only(catalog, schemas)
+        } else {
+            Self::new_full(catalog, schemas, time_range, format)
+        };
+
+        if !schema_only {
+            manifest.chunks = match chunk_time_window {
+                Some(window) => generate_chunks(&manifest.time_range, window),
+                None => generate_single_chunk(&manifest.time_range),
+            };
+            manifest.touch();
+        }
+
+        Ok(manifest)
+    }
+
     /// Creates a new manifest for schema-only export.
     pub fn new_schema_only(catalog: String, schemas: Vec<String>) -> Self {
         let now = Utc::now();
@@ -258,7 +336,7 @@
             && self
                 .chunks
                 .iter()
-                .all(|c| c.status == ChunkStatus::Completed))
+                .all(|c| matches!(c.status, ChunkStatus::Completed | ChunkStatus::Skipped)))
     }

     /// Returns the number of pending chunks.
@@ -285,6 +363,14 @@
             .count()
     }

+    /// Returns the number of skipped chunks.
+    pub fn skipped_count(&self) -> usize {
+        self.chunks
+            .iter()
+            .filter(|c| c.status == ChunkStatus::Skipped)
+            .count()
+    }
+
     /// Returns the number of failed chunks.
     pub fn failed_count(&self) -> usize {
         self.chunks
@@ -313,8 +399,24 @@
     }
 }

+fn generate_single_chunk(time_range: &TimeRange) -> Vec<ChunkMeta> {
+    if let (Some(start), Some(end)) = (time_range.start, time_range.end) {
+        if start == end {
+            return vec![ChunkMeta::skipped(1, time_range.clone())];
+        }
+        if start > end {
+            return Vec::new();
+        }
+    }
+    vec![ChunkMeta::new(1, time_range.clone())]
+}
+
 #[cfg(test)]
 mod tests {
+    use std::time::Duration;
+
+    use chrono::{TimeZone, Utc};
+
     use super::*;

     #[test]
@@ -338,6 +440,26 @@
         assert!(manifest.is_complete());
     }

+    #[test]
+    fn test_generate_single_chunk_zero_width_range_is_skipped() {
+        let ts = Utc.with_ymd_and_hms(2025, 1, 1, 0, 0, 0).unwrap();
+        let chunks = generate_single_chunk(&TimeRange::new(Some(ts), Some(ts)));
+
+        assert_eq!(chunks.len(), 1);
+        assert_eq!(chunks[0].status, ChunkStatus::Skipped);
+        assert_eq!(chunks[0].time_range.start, Some(ts));
+        assert_eq!(chunks[0].time_range.end, Some(ts));
+    }
+
+    #[test]
+    fn test_generate_single_chunk_invalid_range_is_empty() {
+        let start = Utc.with_ymd_and_hms(2025, 1, 1, 1, 0, 0).unwrap();
+        let end = Utc.with_ymd_and_hms(2025, 1, 1, 0, 0, 0).unwrap();
+        let chunks = generate_single_chunk(&TimeRange::new(Some(start), Some(end)));
+
+        assert!(chunks.is_empty());
+    }
+
     #[test]
     fn test_manifest_full() {
         let manifest = Manifest::new_full(
@@ -377,5 +499,71 @@
         );
         assert_eq!(chunk.status, ChunkStatus::Completed);
         assert_eq!(chunk.files.len(), 1);
+
+        chunk.mark_skipped();
+        assert_eq!(chunk.status, ChunkStatus::Skipped);
+        assert!(chunk.files.is_empty());
+    }
+
+    #[test]
+    fn test_manifest_is_complete_when_chunks_are_completed_or_skipped() {
+        let mut manifest = Manifest::new_full(
+            "greptime".to_string(),
+            vec!["public".to_string()],
+            TimeRange::unbounded(),
+            DataFormat::Parquet,
+        );
+        manifest.add_chunk(ChunkMeta::new(1, TimeRange::unbounded()));
+        manifest.add_chunk(ChunkMeta::new(2, TimeRange::unbounded()));

+        manifest.update_chunk(1, |chunk| {
+            chunk.mark_completed(vec!["a.parquet".to_string()], None)
+        });
+        manifest.update_chunk(2, |chunk| chunk.mark_skipped());
+
+        assert!(manifest.is_complete());
+        assert_eq!(manifest.completed_count(), 1);
+        assert_eq!(manifest.skipped_count(), 1);
+    }
+
+    #[test]
+    fn test_manifest_chunk_time_window_none_single_chunk() {
+        let start = Utc.with_ymd_and_hms(2025, 1, 1, 0, 0, 0).unwrap();
+        let end = Utc.with_ymd_and_hms(2025, 1, 2, 0, 0, 0).unwrap();
+        let range = TimeRange::new(Some(start), Some(end));
+        let manifest = Manifest::new_for_export(
+            "greptime".to_string(),
+            vec!["public".to_string()],
+            false,
+            range.clone(),
+            DataFormat::Parquet,
+            None,
+        )
+        .unwrap();
+
+        assert_eq!(manifest.chunks.len(), 1);
+        assert_eq!(manifest.chunks[0].time_range, range);
+    }
+
+    #[test]
+    fn test_time_range_parse_requires_order() {
+        let result = TimeRange::parse(Some("2025-01-02T00:00:00Z"), Some("2025-01-01T00:00:00Z"));
+        assert!(result.is_err());
+    }
+
+    #[test]
+    fn test_new_for_export_with_chunk_window_requires_bounded_range() {
+        let result = Manifest::new_for_export(
+            "greptime".to_string(),
+            vec!["public".to_string()],
+            false,
+            TimeRange::new(
+                None,
+                Some(Utc.with_ymd_and_hms(2025, 1, 2, 0, 0, 0).unwrap()),
+            ),
+            DataFormat::Parquet,
+            Some(Duration::from_secs(3600)),
+        );
+        assert!(result.is_err());
     }
 }
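How the planning pieces compose, as a hedged sketch (test-style code assuming crate-internal access to the manifest module): a bounded two-day range with a one-day window plans two pending chunks, while omitting the window keeps a single chunk spanning the whole range.

```rust
use std::time::Duration;

use crate::data::export_v2::manifest::{DataFormat, Manifest, TimeRange};

let range =
    TimeRange::parse(Some("2025-01-01T00:00:00Z"), Some("2025-01-03T00:00:00Z")).unwrap();
let manifest = Manifest::new_for_export(
    "greptime".to_string(),
    vec!["public".to_string()],
    false,                             // full export, not schema-only
    range,
    DataFormat::Parquet,
    Some(Duration::from_secs(86_400)), // 1d window -> two one-day chunks
)
.unwrap();

assert_eq!(manifest.chunks.len(), 2);
assert_eq!(manifest.pending_count(), 2);
assert!(!manifest.is_complete());
```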
diff --git a/src/cli/src/data/import_v2/command.rs b/src/cli/src/data/import_v2/command.rs
index 544763d92b..6a9d440071 100644
--- a/src/cli/src/data/import_v2/command.rs
+++ b/src/cli/src/data/import_v2/command.rs
@@ -27,7 +28,8 @@ use crate::Tool;
 use crate::common::ObjectStoreConfig;
 use crate::data::export_v2::manifest::MANIFEST_VERSION;
 use crate::data::import_v2::error::{
-    ManifestVersionMismatchSnafu, Result, SchemaNotInSnapshotSnafu, SnapshotStorageSnafu,
+    FullSnapshotImportNotSupportedSnafu, ManifestVersionMismatchSnafu, Result,
+    SchemaNotInSnapshotSnafu, SnapshotStorageSnafu,
 };
 use crate::data::import_v2::executor::{DdlExecutor, DdlStatement};
 use crate::data::path::ddl_path_for_schema;
@@ -58,10 +59,6 @@ pub struct ImportV2Command {
     #[clap(long)]
     dry_run: bool,

-    /// Concurrency level (for future use).
-    #[clap(long, default_value = "1")]
-    parallelism: usize,
-
     /// Basic authentication (user:password).
     #[clap(long)]
     auth_basic: Option<String>,
@@ -121,7 +118,6 @@
         Ok(Box::new(Import {
             schemas,
             dry_run: self.dry_run,
-            _parallelism: self.parallelism,
             storage: Box::new(storage),
             database_client,
         }))
@@ -132,7 +128,6 @@
 pub struct Import {
     schemas: Option<Vec<String>>,
     dry_run: bool,
-    _parallelism: usize,
     storage: Box<dyn SnapshotStorage>,
     database_client: DatabaseClient,
 }
@@ -169,6 +164,13 @@
         info!("Snapshot contains {} schema(s)", manifest.schemas.len());

+        if !manifest.schema_only && !manifest.chunks.is_empty() {
+            return FullSnapshotImportNotSupportedSnafu {
+                chunk_count: manifest.chunks.len(),
+            }
+            .fail();
+        }
+
         // 2. Determine schemas to import
         let schemas_to_import = match &self.schemas {
             Some(filter) => canonicalize_schema_filter(filter, &manifest.schemas)?,
@@ -203,14 +205,6 @@
             ddl_statements.len()
         );

-        // 6. Data import would happen here for non-schema-only snapshots (M2/M3)
-        if !manifest.schema_only && !manifest.chunks.is_empty() {
-            info!(
-                "Data import not yet implemented (M3). {} chunks pending.",
-                manifest.chunks.len()
-            );
-        }
-
         Ok(())
     }
@@ -403,7 +397,114 @@ fn canonicalize_schema_filter(
 #[cfg(test)]
 mod tests {
+    use std::time::Duration;
+
+    use async_trait::async_trait;
+
     use super::*;
+    use crate::Tool;
+    use crate::data::export_v2::manifest::{ChunkMeta, DataFormat, Manifest, TimeRange};
+    use crate::data::export_v2::schema::SchemaSnapshot;
+    use crate::data::snapshot_storage::SnapshotStorage;
+    use crate::database::DatabaseClient;
+
+    struct StubStorage {
+        manifest: Manifest,
+    }
+
+    #[async_trait]
+    impl SnapshotStorage for StubStorage {
+        async fn exists(&self) -> crate::data::export_v2::error::Result<bool> {
+            Ok(true)
+        }
+
+        async fn read_manifest(&self) -> crate::data::export_v2::error::Result<Manifest> {
+            Ok(self.manifest.clone())
+        }
+
+        async fn write_manifest(
+            &self,
+            _manifest: &Manifest,
+        ) -> crate::data::export_v2::error::Result<()> {
+            unimplemented!("not needed in import_v2::command tests")
+        }
+
+        async fn read_text(&self, _path: &str) -> crate::data::export_v2::error::Result<String> {
+            unimplemented!("not needed in import_v2::command tests")
+        }
+
+        async fn write_text(
+            &self,
+            _path: &str,
+            _content: &str,
+        ) -> crate::data::export_v2::error::Result<()> {
+            unimplemented!("not needed in import_v2::command tests")
+        }
+
+        async fn write_schema(
+            &self,
+            _snapshot: &SchemaSnapshot,
+        ) -> crate::data::export_v2::error::Result<()> {
+            unimplemented!("not needed in import_v2::command tests")
+        }
+
+        async fn create_dir_all(&self, _path: &str) -> crate::data::export_v2::error::Result<()> {
+            unimplemented!("not needed in import_v2::command tests")
+        }
+
+        async fn list_files_recursive(
+            &self,
+            _prefix: &str,
+        ) -> crate::data::export_v2::error::Result<Vec<String>> {
+            unimplemented!("not needed in import_v2::command tests")
+        }
+
+        async fn delete_snapshot(&self) -> crate::data::export_v2::error::Result<()> {
+            unimplemented!("not needed in import_v2::command tests")
+        }
+    }
+
+    fn test_database_client() -> DatabaseClient {
+        DatabaseClient::new(
+            "127.0.0.1:4000".to_string(),
+            "greptime".to_string(),
+            None,
+            Duration::from_secs(1),
+            None,
+            false,
+        )
+    }
+
+    #[tokio::test]
+    async fn test_import_rejects_full_snapshot_before_schema_execution() {
+        let mut manifest = Manifest::new_full(
+            "greptime".to_string(),
+            vec!["public".to_string()],
+            TimeRange::unbounded(),
+            DataFormat::Parquet,
+        );
+        manifest
+            .chunks
+            .push(ChunkMeta::new(1, TimeRange::unbounded()));
+
+        let import = Import {
+            schemas: None,
+            dry_run: false,
+            storage: Box::new(StubStorage { manifest }),
+            database_client: test_database_client(),
+        };
+
+        let error = import
+            .do_work()
+            .await
+            .expect_err("full snapshot import should fail");
+
+        assert!(
+            error
+                .to_string()
+                .contains("Importing data from full snapshots is not implemented yet")
+        );
+    }

     #[test]
     fn test_parse_ddl_statements() {
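Until chunk-aware import lands, pointing `import-v2` at a full snapshot now fails fast, before any DDL executes; that fail-before-side-effects ordering is what the test above pins down. Operators see the new `FullSnapshotImportNotSupported` error (variant added in `import_v2/error.rs` below), roughly:

```text
Importing data from full snapshots is not implemented yet (snapshot has 31 chunk(s))
```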
diff --git a/src/cli/src/data/import_v2/error.rs b/src/cli/src/data/import_v2/error.rs
index 5ae3db1583..169f11c0fa 100644
--- a/src/cli/src/data/import_v2/error.rs
+++ b/src/cli/src/data/import_v2/error.rs
@@ -45,6 +45,16 @@ pub enum Error {
         location: Location,
     },

+    #[snafu(display(
+        "Importing data from full snapshots is not implemented yet (snapshot has {} chunk(s))",
+        chunk_count
+    ))]
+    FullSnapshotImportNotSupported {
+        chunk_count: usize,
+        #[snafu(implicit)]
+        location: Location,
+    },
+
     #[snafu(display("Snapshot storage error"))]
     SnapshotStorage {
         #[snafu(source)]
@@ -67,10 +77,10 @@ pub type Result<T> = std::result::Result<T, Error>;

 impl ErrorExt for Error {
     fn status_code(&self) -> StatusCode {
         match self {
-            Error::SnapshotNotFound { .. } | Error::SchemaNotInSnapshot { .. } => {
-                StatusCode::InvalidArguments
-            }
-            Error::ManifestVersionMismatch { .. } => StatusCode::InvalidArguments,
+            Error::SnapshotNotFound { .. }
+            | Error::SchemaNotInSnapshot { .. }
+            | Error::ManifestVersionMismatch { .. }
+            | Error::FullSnapshotImportNotSupported { .. } => StatusCode::InvalidArguments,
             Error::Database { error, .. } => error.status_code(),
             Error::SnapshotStorage { error, .. } => error.status_code(),
         }
diff --git a/src/cli/src/data/path.rs b/src/cli/src/data/path.rs
index 2e0f5d3f1a..2df81f62c8 100644
--- a/src/cli/src/data/path.rs
+++ b/src/cli/src/data/path.rs
@@ -25,6 +25,10 @@ pub(crate) fn ddl_path_for_schema(schema: &str) -> String {
     )
 }

+pub(crate) fn data_dir_for_schema_chunk(schema: &str, chunk_id: u32) -> String {
+    format!("data/{}/{}/", encode_path_segment(schema), chunk_id)
+}
+
 pub(crate) fn encode_path_segment(value: &str) -> String {
     let mut encoded = String::with_capacity(value.len());
     for byte in value.bytes() {
@@ -73,4 +77,13 @@ mod tests {
             "schema/ddl/%2E%2E%2Fevil.sql"
         );
     }
+
+    #[test]
+    fn test_data_dir_for_schema_chunk_encodes_schema_segment() {
+        assert_eq!(data_dir_for_schema_chunk("public", 1), "data/public/1/");
+        assert_eq!(
+            data_dir_for_schema_chunk("../evil", 7),
+            "data/%2E%2E%2Fevil/7/"
+        );
+    }
 }
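With `data_dir_for_schema_chunk` alongside the existing `ddl_path_for_schema`, a full snapshot lays out under the root URI roughly as below (the manifest and schema files are written through `SnapshotStorage`; per-chunk leaf file names are chosen server-side by COPY DATABASE, so they are illustrative here):

```text
<snapshot root>
├── <manifest file, written and updated via SnapshotStorage>
├── schema/
│   └── ddl/
│       └── public.sql        <- ddl_path_for_schema("public")
└── data/
    └── public/
        ├── 1/                <- data_dir_for_schema_chunk("public", 1)
        │   └── <files produced by COPY DATABASE>
        └── 2/
```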
diff --git a/src/cli/src/data/snapshot_storage.rs b/src/cli/src/data/snapshot_storage.rs
index 50c8734a67..6bc71153df 100644
--- a/src/cli/src/data/snapshot_storage.rs
+++ b/src/cli/src/data/snapshot_storage.rs
@@ -18,9 +18,12 @@
 //! to various storage backends (S3, OSS, GCS, Azure Blob, local filesystem).

 use async_trait::async_trait;
+use futures::TryStreamExt;
 use object_store::services::{Azblob, Fs, Gcs, Oss, S3};
 use object_store::util::{with_instrument_layers, with_retry_layers};
-use object_store::{AzblobConnection, GcsConnection, ObjectStore, OssConnection, S3Connection};
+use object_store::{
+    AzblobConnection, ErrorKind, GcsConnection, ObjectStore, OssConnection, S3Connection,
+};
 use snafu::ResultExt;
 use url::Url;
@@ -139,14 +142,14 @@ fn extract_file_path_from_uri(uri: &str) -> Result<String> {
         .fail(),
         _ => url
             .to_file_path()
-            .map(|path| path.to_string_lossy().into_owned())
             .map_err(|_| {
                 InvalidUriSnafu {
                     uri,
-                    reason: "file:// URI must use a valid absolute filesystem path",
+                    reason: "file:// URI must use an absolute path like file:///tmp/backup",
                 }
                 .build()
-            }),
+            })
+            .map(|path| path.to_string_lossy().into_owned()),
     }
 }
@@ -184,6 +187,12 @@ pub trait SnapshotStorage: Send + Sync {
     /// Reads a text file from a relative path under the snapshot root.
     async fn read_text(&self, path: &str) -> Result<String>;

+    /// Creates a directory-like prefix under the snapshot root when needed by the backend.
+    async fn create_dir_all(&self, path: &str) -> Result<()>;
+
+    /// Lists files recursively under a relative prefix.
+    async fn list_files_recursive(&self, prefix: &str) -> Result<Vec<String>>;
+
     /// Deletes the entire snapshot (for --force).
     async fn delete_snapshot(&self) -> Result<()>;
 }
@@ -443,6 +452,38 @@ impl SnapshotStorage for OpenDalStorage {
         String::from_utf8(data).context(TextDecodeSnafu)
     }

+    async fn create_dir_all(&self, path: &str) -> Result<()> {
+        self.object_store
+            .create_dir(path)
+            .await
+            .context(StorageOperationSnafu {
+                operation: format!("create dir {}", path),
+            })
+    }
+
+    async fn list_files_recursive(&self, prefix: &str) -> Result<Vec<String>> {
+        let mut lister = match self.object_store.lister_with(prefix).recursive(true).await {
+            Ok(lister) => lister,
+            Err(error) if error.kind() == ErrorKind::NotFound => return Ok(Vec::new()),
+            Err(error) => {
+                return Err(error).context(StorageOperationSnafu {
+                    operation: format!("list {}", prefix),
+                });
+            }
+        };
+
+        let mut files = Vec::new();
+        while let Some(entry) = lister.try_next().await.context(StorageOperationSnafu {
+            operation: format!("list {}", prefix),
+        })? {
+            if entry.metadata().is_dir() {
+                continue;
+            }
+            files.push(entry.path().to_string());
+        }
+        Ok(files)
+    }
+
     async fn delete_snapshot(&self) -> Result<()> {
         self.object_store
             .remove_all("/")
@@ -533,6 +574,14 @@ mod tests {
             extract_file_path_from_uri("file://localhost/tmp/backup").unwrap(),
             "/tmp/backup"
         );
+        assert_eq!(
+            extract_file_path_from_uri("file:///tmp/my%20backup").unwrap(),
+            "/tmp/my backup"
+        );
+        assert_eq!(
+            extract_file_path_from_uri("file://localhost/tmp/my%20backup").unwrap(),
+            "/tmp/my backup"
+        );
     }

     #[test]
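Two storage behaviors here matter to the coordinator: a prefix that was never created lists as empty rather than erroring (the `ErrorKind::NotFound` arm), which is what lets an empty chunk be recorded as `Skipped`; and `file://` URIs are percent-decoded through `Url::to_file_path`, as the new tests show. A hedged usage sketch:

```rust
// Assuming `storage: &dyn SnapshotStorage` is rooted at the snapshot URI.
let files = storage.list_files_recursive("data/public/999/").await?;
// Listing a prefix that no chunk ever wrote to yields Ok(vec![]), not NotFound,
// so the coordinator can classify the chunk as Skipped instead of failing.
assert!(files.is_empty());
```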