feat: implement export-v2 chunked data export flow (#7841)

* feat: implement export-v2 chunked data export flow

Signed-off-by: jeremyhi <fengjiachun@gmail.com>

* fix: address codex review comment

Signed-off-by: jeremyhi <fengjiachun@gmail.com>

* fix: address gemini review comment

Signed-off-by: jeremyhi <fengjiachun@gmail.com>

* fix: clippy

Signed-off-by: jeremyhi <fengjiachun@gmail.com>

* fix: address review comment

Signed-off-by: jeremyhi <fengjiachun@gmail.com>

* fix: handle empty export ranges consistently

Signed-off-by: jeremyhi <fengjiachun@gmail.com>

* fix: validate resume config

Signed-off-by: jeremyhi <fengjiachun@gmail.com>

* fix: file-uri paths

Signed-off-by: jeremyhi <fengjiachun@gmail.com>

* feat: check args on schema-only mode

Signed-off-by: jeremyhi <fengjiachun@gmail.com>

---------

Signed-off-by: jeremyhi <fengjiachun@gmail.com>
jeremyhi
2026-03-31 15:43:40 -07:00
committed by GitHub
parent 358524566a
commit ab10696657
11 changed files with 1577 additions and 69 deletions

View File

@@ -30,7 +30,7 @@
//! --to file:///tmp/snapshot \
//! --schema-only
//!
//! # Export with time range (M2)
//! # Export with time range
//! greptime cli data export-v2 create \
//! --addr 127.0.0.1:4000 \
//! --to s3://bucket/snapshots/prod-20250101 \
@@ -38,7 +38,10 @@
//! --end-time 2025-01-31T23:59:59Z
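//!
//! # Chunked export with a daily window (illustrative sketch using the
//! # --chunk-time-window flag added in this commit)
//! greptime cli data export-v2 create \
//! --addr 127.0.0.1:4000 \
//! --to s3://bucket/snapshots/prod-20250101 \
//! --start-time 2025-01-01T00:00:00Z \
//! --end-time 2025-01-31T23:59:59Z \
//! --chunk-time-window 1d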
//! ```
mod chunker;
mod command;
mod coordinator;
mod data;
pub mod error;
pub mod extractor;
pub mod manifest;

View File

@@ -0,0 +1,103 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::time::Duration;
use chrono::Duration as ChronoDuration;
use crate::data::export_v2::manifest::{ChunkMeta, TimeRange};
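/// Splits a bounded `time_range` into consecutive chunks of at most `window`.
/// Unbounded ranges and non-positive windows fall back to a single chunk; a
/// zero-width range yields one skipped chunk, and an inverted range yields none.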
pub fn generate_chunks(time_range: &TimeRange, window: Duration) -> Vec<ChunkMeta> {
let (Some(start), Some(end)) = (time_range.start, time_range.end) else {
return vec![ChunkMeta::new(1, time_range.clone())];
};
if start == end {
return vec![ChunkMeta::skipped(1, time_range.clone())];
}
if start > end {
return Vec::new();
}
let window = match ChronoDuration::from_std(window) {
Ok(window) if window > ChronoDuration::zero() => window,
_ => return vec![ChunkMeta::new(1, time_range.clone())],
};
let mut chunks = Vec::new();
let mut cursor = start;
let mut id = 1;
while cursor < end {
let next = cursor
.checked_add_signed(window)
.map_or(end, |timestamp| timestamp.min(end));
chunks.push(ChunkMeta::new(id, TimeRange::new(Some(cursor), Some(next))));
id += 1;
cursor = next;
}
chunks
}
#[cfg(test)]
mod tests {
use chrono::{TimeZone, Utc};
use super::*;
use crate::data::export_v2::manifest::ChunkStatus;
#[test]
fn test_generate_chunks_unbounded() {
let range = TimeRange::unbounded();
let chunks = generate_chunks(&range, Duration::from_secs(3600));
assert_eq!(chunks.len(), 1);
assert_eq!(chunks[0].time_range, range);
}
#[test]
fn test_generate_chunks_split() {
let start = Utc.with_ymd_and_hms(2025, 1, 1, 0, 0, 0).unwrap();
let end = Utc.with_ymd_and_hms(2025, 1, 1, 3, 0, 0).unwrap();
let range = TimeRange::new(Some(start), Some(end));
let chunks = generate_chunks(&range, Duration::from_secs(3600));
assert_eq!(chunks.len(), 3);
assert_eq!(chunks[0].time_range.start, Some(start));
assert_eq!(
chunks[2].time_range.end,
Some(Utc.with_ymd_and_hms(2025, 1, 1, 3, 0, 0).unwrap())
);
}
#[test]
fn test_generate_chunks_empty_range() {
let start = Utc.with_ymd_and_hms(2025, 1, 1, 0, 0, 0).unwrap();
let range = TimeRange::new(Some(start), Some(start));
let chunks = generate_chunks(&range, Duration::from_secs(3600));
assert_eq!(chunks.len(), 1);
assert_eq!(chunks[0].status, ChunkStatus::Skipped);
assert_eq!(chunks[0].time_range, range);
}
#[test]
fn test_generate_chunks_invalid_range_is_empty() {
let start = Utc.with_ymd_and_hms(2025, 1, 1, 1, 0, 0).unwrap();
let end = Utc.with_ymd_and_hms(2025, 1, 1, 0, 0, 0).unwrap();
let range = TimeRange::new(Some(start), Some(end));
let chunks = generate_chunks(&range, Duration::from_secs(3600));
assert!(chunks.is_empty());
}
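// Illustrative sketch: the splitter clamps the final chunk at `end`, so a
// range that is not a multiple of the window yields a shorter last chunk
// instead of overshooting.
#[test]
fn test_generate_chunks_truncates_last_window() {
let start = Utc.with_ymd_and_hms(2025, 1, 1, 0, 0, 0).unwrap();
let end = Utc.with_ymd_and_hms(2025, 1, 1, 2, 30, 0).unwrap();
let range = TimeRange::new(Some(start), Some(end));
let chunks = generate_chunks(&range, Duration::from_secs(3600));
assert_eq!(chunks.len(), 3);
assert_eq!(
chunks[2].time_range.start,
Some(Utc.with_ymd_and_hms(2025, 1, 1, 2, 0, 0).unwrap())
);
assert_eq!(chunks[2].time_range.end, Some(end));
}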
}

View File

@@ -26,12 +26,16 @@ use snafu::{OptionExt, ResultExt};
use crate::Tool;
use crate::common::ObjectStoreConfig;
use crate::data::export_v2::coordinator::export_data;
use crate::data::export_v2::error::{
CannotResumeSchemaOnlySnafu, DataExportNotImplementedSnafu, DatabaseSnafu, EmptyResultSnafu,
ManifestVersionMismatchSnafu, Result, UnexpectedValueTypeSnafu,
ChunkTimeWindowRequiresBoundsSnafu, DatabaseSnafu, EmptyResultSnafu,
ManifestVersionMismatchSnafu, Result, ResumeConfigMismatchSnafu, SchemaOnlyArgsNotAllowedSnafu,
SchemaOnlyModeMismatchSnafu, UnexpectedValueTypeSnafu,
};
use crate::data::export_v2::extractor::SchemaExtractor;
use crate::data::export_v2::manifest::{DataFormat, MANIFEST_VERSION, Manifest};
use crate::data::export_v2::manifest::{
ChunkMeta, DataFormat, MANIFEST_VERSION, Manifest, TimeRange,
};
use crate::data::path::ddl_path_for_schema;
use crate::data::snapshot_storage::{OpenDalStorage, SnapshotStorage, validate_uri};
use crate::data::sql::{escape_sql_identifier, escape_sql_literal};
@@ -84,6 +88,11 @@ pub struct ExportCreateCommand {
#[clap(long)]
end_time: Option<String>,
/// Chunk time window (e.g., 1h, 6h, 1d, 7d).
/// Requires both --start-time and --end-time when specified.
#[clap(long, value_parser = humantime::parse_duration)]
chunk_time_window: Option<Duration>,
/// Data format: parquet, csv, json.
#[clap(long, value_enum, default_value = "parquet")]
format: DataFormat,
@@ -92,7 +101,7 @@ pub struct ExportCreateCommand {
#[clap(long)]
force: bool,
/// Concurrency level (for future use).
/// Parallelism for COPY DATABASE execution (server-side, per schema per chunk).
#[clap(long, default_value = "1")]
parallelism: usize,
@@ -127,11 +136,38 @@ impl ExportCreateCommand {
// Validate URI format
validate_uri(&self.to).map_err(BoxedError::new)?;
if !self.schema_only {
return DataExportNotImplementedSnafu
let time_range = TimeRange::parse(self.start_time.as_deref(), self.end_time.as_deref())
.map_err(BoxedError::new)?;
if self.chunk_time_window.is_some() && !time_range.is_bounded() {
return ChunkTimeWindowRequiresBoundsSnafu
.fail()
.map_err(BoxedError::new);
}
if self.schema_only {
let mut invalid_args = Vec::new();
if self.start_time.is_some() {
invalid_args.push("--start-time");
}
if self.end_time.is_some() {
invalid_args.push("--end-time");
}
if self.chunk_time_window.is_some() {
invalid_args.push("--chunk-time-window");
}
if self.format != DataFormat::Parquet {
invalid_args.push("--format");
}
if self.parallelism != 1 {
invalid_args.push("--parallelism");
}
if !invalid_args.is_empty() {
return SchemaOnlyArgsNotAllowedSnafu {
args: invalid_args.join(", "),
}
.fail()
.map_err(BoxedError::new);
}
}
// Parse schemas (empty vec means all schemas)
let schemas = if self.schemas.is_empty() {
@@ -155,12 +191,18 @@ impl ExportCreateCommand {
);
Ok(Box::new(ExportCreate {
catalog: self.catalog.clone(),
schemas,
schema_only: self.schema_only,
_format: self.format,
force: self.force,
_parallelism: self.parallelism,
config: ExportConfig {
catalog: self.catalog.clone(),
schemas,
schema_only: self.schema_only,
format: self.format,
force: self.force,
time_range,
chunk_time_window: self.chunk_time_window,
parallelism: self.parallelism,
snapshot_uri: self.to.clone(),
storage_config: self.storage.clone(),
},
storage: Box::new(storage),
database_client,
}))
@@ -169,14 +211,22 @@ impl ExportCreateCommand {
/// Export tool implementation.
pub struct ExportCreate {
config: ExportConfig,
storage: Box<dyn SnapshotStorage>,
database_client: DatabaseClient,
}
struct ExportConfig {
catalog: String,
schemas: Option<Vec<String>>,
schema_only: bool,
_format: DataFormat,
format: DataFormat,
force: bool,
_parallelism: usize,
storage: Box<dyn SnapshotStorage>,
database_client: DatabaseClient,
time_range: TimeRange,
chunk_time_window: Option<Duration>,
parallelism: usize,
snapshot_uri: String,
storage_config: ObjectStoreConfig,
}
#[async_trait]
@@ -192,12 +242,12 @@ impl ExportCreate {
let exists = self.storage.exists().await?;
if exists {
if self.force {
if self.config.force {
info!("Deleting existing snapshot (--force)");
self.storage.delete_snapshot().await?;
} else {
// Resume mode - read existing manifest
let manifest = self.storage.read_manifest().await?;
let mut manifest = self.storage.read_manifest().await?;
// Check version compatibility
if manifest.version != MANIFEST_VERSION {
@@ -208,10 +258,7 @@ impl ExportCreate {
.fail();
}
// Cannot resume schema-only with data export
if manifest.schema_only && !self.schema_only {
return CannotResumeSchemaOnlySnafu.fail();
}
validate_resume_config(&manifest, &self.config)?;
info!(
"Resuming existing snapshot: {} (completed: {}/{} chunks)",
@@ -220,22 +267,31 @@ impl ExportCreate {
manifest.chunks.len()
);
// For M1, we only handle schema-only exports
// M2 will add chunk resume logic
if manifest.is_complete() {
info!("Snapshot is already complete");
return Ok(());
}
// TODO: Resume data export in M2
info!("Data export resume not yet implemented (M2)");
if manifest.schema_only {
return Ok(());
}
export_data(
self.storage.as_ref(),
&self.database_client,
&self.config.snapshot_uri,
&self.config.storage_config,
&mut manifest,
self.config.parallelism,
)
.await?;
return Ok(());
}
}
// 2. Get schema list
let extractor = SchemaExtractor::new(&self.database_client, &self.catalog);
let schema_snapshot = extractor.extract(self.schemas.as_deref()).await?;
let extractor = SchemaExtractor::new(&self.database_client, &self.config.catalog);
let schema_snapshot = extractor.extract(self.config.schemas.as_deref()).await?;
let schema_names: Vec<String> = schema_snapshot
.schemas
@@ -245,7 +301,14 @@ impl ExportCreate {
info!("Exporting schemas: {:?}", schema_names);
// 3. Create manifest
let manifest = Manifest::new_schema_only(self.catalog.clone(), schema_names.clone());
let mut manifest = Manifest::new_for_export(
self.config.catalog.clone(),
schema_names.clone(),
self.config.schema_only,
self.config.time_range.clone(),
self.config.format,
self.config.chunk_time_window,
)?;
// 4. Write schema files
self.storage.write_schema(&schema_snapshot).await?;
@@ -259,14 +322,28 @@ impl ExportCreate {
info!("Exported DDL for schema {} to {}", schema, ddl_path);
}
// 6. Write manifest last.
// 6. Write manifest after schema artifacts and before any data export.
//
// The manifest is the snapshot commit point: only write it after the schema
// index and all DDL files are durable, so a crash cannot leave a "valid"
// snapshot that is missing required schema artifacts.
// snapshot that is missing required schema artifacts. For full exports we
// still need the manifest before data copy starts, because chunk resume is
// tracked by updating this manifest in place.
self.storage.write_manifest(&manifest).await?;
info!("Snapshot created: {}", manifest.snapshot_id);
if !self.config.schema_only {
export_data(
self.storage.as_ref(),
&self.database_client,
&self.config.snapshot_uri,
&self.config.storage_config,
&mut manifest,
self.config.parallelism,
)
.await?;
}
Ok(())
}
@@ -321,7 +398,7 @@ impl ExportCreate {
"SELECT table_name, table_type FROM information_schema.tables \
WHERE table_catalog = '{}' AND table_schema = '{}' \
AND (table_type = 'BASE TABLE' OR table_type = 'VIEW')",
escape_sql_literal(&self.catalog),
escape_sql_literal(&self.config.catalog),
escape_sql_literal(schema)
);
let records: Option<Vec<Vec<Value>>> = self
@@ -359,7 +436,7 @@ impl ExportCreate {
let sql = format!(
"SELECT DISTINCT table_name FROM information_schema.columns \
WHERE table_catalog = '{}' AND table_schema = '{}' AND column_name = '__tsid'",
escape_sql_literal(&self.catalog),
escape_sql_literal(&self.config.catalog),
escape_sql_literal(schema)
);
let records: Option<Vec<Vec<Value>>> = self
@@ -392,14 +469,14 @@ impl ExportCreate {
Some(table) => format!(
r#"SHOW CREATE {} "{}"."{}"."{}""#,
show_type,
escape_sql_identifier(&self.catalog),
escape_sql_identifier(&self.config.catalog),
escape_sql_identifier(schema),
escape_sql_identifier(table)
),
None => format!(
r#"SHOW CREATE {} "{}"."{}""#,
show_type,
escape_sql_identifier(&self.catalog),
escape_sql_identifier(&self.config.catalog),
escape_sql_identifier(schema)
),
};
@@ -442,8 +519,118 @@ fn build_schema_ddl(
ddl
}
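/// Rejects resuming a snapshot when the request would change what it contains:
/// schema-only mode, catalog, schema selection, time range, format, and the
/// derived chunk plan must all match the existing manifest (use --force to
/// recreate instead).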
fn validate_resume_config(manifest: &Manifest, config: &ExportConfig) -> Result<()> {
if manifest.schema_only != config.schema_only {
return SchemaOnlyModeMismatchSnafu {
existing_schema_only: manifest.schema_only,
requested_schema_only: config.schema_only,
}
.fail();
}
if manifest.catalog != config.catalog {
return ResumeConfigMismatchSnafu {
field: "catalog",
existing: manifest.catalog.clone(),
requested: config.catalog.clone(),
}
.fail();
}
// If no schema filter is provided on resume, inherit the existing snapshot
// selection instead of reinterpreting the request as "all schemas".
if let Some(requested_schemas) = &config.schemas
&& !schema_selection_matches(&manifest.schemas, requested_schemas)
{
return ResumeConfigMismatchSnafu {
field: "schemas",
existing: format_schema_selection(&manifest.schemas),
requested: format_schema_selection(requested_schemas),
}
.fail();
}
if manifest.time_range != config.time_range {
return ResumeConfigMismatchSnafu {
field: "time_range",
existing: format!("{:?}", manifest.time_range),
requested: format!("{:?}", config.time_range),
}
.fail();
}
if manifest.format != config.format {
return ResumeConfigMismatchSnafu {
field: "format",
existing: manifest.format.to_string(),
requested: config.format.to_string(),
}
.fail();
}
let expected_plan = Manifest::new_for_export(
manifest.catalog.clone(),
manifest.schemas.clone(),
config.schema_only,
config.time_range.clone(),
config.format,
config.chunk_time_window,
)?;
if !chunk_plan_matches(manifest, &expected_plan) {
return ResumeConfigMismatchSnafu {
field: "chunk plan",
existing: format_chunk_plan(&manifest.chunks),
requested: format_chunk_plan(&expected_plan.chunks),
}
.fail();
}
Ok(())
}
fn schema_selection_matches(existing: &[String], requested: &[String]) -> bool {
canonical_schema_selection(existing) == canonical_schema_selection(requested)
}
fn canonical_schema_selection(schemas: &[String]) -> Vec<String> {
let mut canonicalized = Vec::new();
let mut seen = HashSet::new();
for schema in schemas {
let normalized = schema.to_ascii_lowercase();
if seen.insert(normalized.clone()) {
canonicalized.push(normalized);
}
}
canonicalized.sort();
canonicalized
}
fn format_schema_selection(schemas: &[String]) -> String {
format!("[{}]", schemas.join(", "))
}
fn chunk_plan_matches(existing: &Manifest, expected: &Manifest) -> bool {
existing.chunks.len() == expected.chunks.len()
&& existing
.chunks
.iter()
.zip(&expected.chunks)
.all(|(left, right)| left.id == right.id && left.time_range == right.time_range)
}
fn format_chunk_plan(chunks: &[ChunkMeta]) -> String {
let items = chunks
.iter()
.map(|chunk| format!("#{}:{:?}", chunk.id, chunk.time_range))
.collect::<Vec<_>>();
format!("[{}]", items.join(", "))
}
#[cfg(test)]
mod tests {
use chrono::TimeZone;
use clap::Parser;
use super::*;
@@ -478,19 +665,225 @@ mod tests {
}
#[tokio::test]
async fn test_build_rejects_non_schema_only_export() {
async fn test_build_rejects_chunk_window_without_bounds() {
let cmd = ExportCreateCommand::parse_from([
"export-v2-create",
"--addr",
"127.0.0.1:4000",
"--to",
"file:///tmp/export-v2-test",
"--chunk-time-window",
"1h",
]);
let result = cmd.build().await;
assert!(result.is_err());
let error = result.err().unwrap().to_string();
assert!(error.contains("Data export is not implemented yet"));
assert!(error.contains("chunk_time_window requires both --start-time and --end-time"));
}
#[tokio::test]
async fn test_build_rejects_data_export_args_in_schema_only_mode() {
let cmd = ExportCreateCommand::parse_from([
"export-v2-create",
"--addr",
"127.0.0.1:4000",
"--to",
"file:///tmp/export-v2-test",
"--schema-only",
"--start-time",
"2024-01-01T00:00:00Z",
"--end-time",
"2024-01-02T00:00:00Z",
"--chunk-time-window",
"1h",
"--format",
"csv",
"--parallelism",
"2",
]);
let error = cmd.build().await.err().unwrap().to_string();
assert!(error.contains("--schema-only cannot be used with data export arguments"));
assert!(error.contains("--start-time"));
assert!(error.contains("--end-time"));
assert!(error.contains("--chunk-time-window"));
assert!(error.contains("--format"));
assert!(error.contains("--parallelism"));
}
#[test]
fn test_schema_only_mode_mismatch_error_message() {
let error = crate::data::export_v2::error::SchemaOnlyModeMismatchSnafu {
existing_schema_only: false,
requested_schema_only: true,
}
.build()
.to_string();
assert!(error.contains("existing: false"));
assert!(error.contains("requested: true"));
}
#[test]
fn test_validate_resume_config_rejects_catalog_mismatch() {
let manifest = Manifest::new_for_export(
"greptime".to_string(),
vec!["public".to_string()],
false,
TimeRange::unbounded(),
DataFormat::Parquet,
None,
)
.unwrap();
let config = ExportConfig {
catalog: "other".to_string(),
schemas: None,
schema_only: false,
format: DataFormat::Parquet,
force: false,
time_range: TimeRange::unbounded(),
chunk_time_window: None,
parallelism: 1,
snapshot_uri: "file:///tmp/snapshot".to_string(),
storage_config: ObjectStoreConfig::default(),
};
let error = validate_resume_config(&manifest, &config)
.err()
.unwrap()
.to_string();
assert!(error.contains("catalog"));
}
#[test]
fn test_validate_resume_config_accepts_schema_selection_with_different_case_and_order() {
let manifest = Manifest::new_for_export(
"greptime".to_string(),
vec!["public".to_string(), "analytics".to_string()],
false,
TimeRange::unbounded(),
DataFormat::Parquet,
None,
)
.unwrap();
let config = ExportConfig {
catalog: "greptime".to_string(),
schemas: Some(vec![
"ANALYTICS".to_string(),
"PUBLIC".to_string(),
"public".to_string(),
]),
schema_only: false,
format: DataFormat::Parquet,
force: false,
time_range: TimeRange::unbounded(),
chunk_time_window: None,
parallelism: 1,
snapshot_uri: "file:///tmp/snapshot".to_string(),
storage_config: ObjectStoreConfig::default(),
};
assert!(validate_resume_config(&manifest, &config).is_ok());
}
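// Sketch: resuming without --schemas inherits the snapshot's selection, so a
// request that omits the filter validates against any existing schema list.
#[test]
fn test_validate_resume_config_inherits_selection_when_schemas_omitted() {
let manifest = Manifest::new_for_export(
"greptime".to_string(),
vec!["public".to_string(), "analytics".to_string()],
false,
TimeRange::unbounded(),
DataFormat::Parquet,
None,
)
.unwrap();
let config = ExportConfig {
catalog: "greptime".to_string(),
schemas: None,
schema_only: false,
format: DataFormat::Parquet,
force: false,
time_range: TimeRange::unbounded(),
chunk_time_window: None,
parallelism: 1,
snapshot_uri: "file:///tmp/snapshot".to_string(),
storage_config: ObjectStoreConfig::default(),
};
assert!(validate_resume_config(&manifest, &config).is_ok());
}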
#[test]
fn test_validate_resume_config_rejects_chunk_plan_mismatch() {
let start = chrono::Utc.with_ymd_and_hms(2025, 1, 1, 0, 0, 0).unwrap();
let end = chrono::Utc.with_ymd_and_hms(2025, 1, 1, 2, 0, 0).unwrap();
let time_range = TimeRange::new(Some(start), Some(end));
let manifest = Manifest::new_for_export(
"greptime".to_string(),
vec!["public".to_string()],
false,
time_range.clone(),
DataFormat::Parquet,
None,
)
.unwrap();
let config = ExportConfig {
catalog: "greptime".to_string(),
schemas: None,
schema_only: false,
format: DataFormat::Parquet,
force: false,
time_range,
chunk_time_window: Some(Duration::from_secs(3600)),
parallelism: 1,
snapshot_uri: "file:///tmp/snapshot".to_string(),
storage_config: ObjectStoreConfig::default(),
};
let error = validate_resume_config(&manifest, &config)
.err()
.unwrap()
.to_string();
assert!(error.contains("chunk plan"));
}
#[test]
fn test_validate_resume_config_rejects_format_mismatch() {
let manifest = Manifest::new_for_export(
"greptime".to_string(),
vec!["public".to_string()],
false,
TimeRange::unbounded(),
DataFormat::Parquet,
None,
)
.unwrap();
let config = ExportConfig {
catalog: "greptime".to_string(),
schemas: None,
schema_only: false,
format: DataFormat::Csv,
force: false,
time_range: TimeRange::unbounded(),
chunk_time_window: None,
parallelism: 1,
snapshot_uri: "file:///tmp/snapshot".to_string(),
storage_config: ObjectStoreConfig::default(),
};
let error = validate_resume_config(&manifest, &config)
.err()
.unwrap()
.to_string();
assert!(error.contains("format"));
}
#[test]
fn test_validate_resume_config_rejects_time_range_mismatch() {
let start = chrono::Utc.with_ymd_and_hms(2025, 1, 1, 0, 0, 0).unwrap();
let end = chrono::Utc.with_ymd_and_hms(2025, 1, 1, 1, 0, 0).unwrap();
let manifest = Manifest::new_for_export(
"greptime".to_string(),
vec!["public".to_string()],
false,
TimeRange::new(Some(start), Some(end)),
DataFormat::Parquet,
None,
)
.unwrap();
let config = ExportConfig {
catalog: "greptime".to_string(),
schemas: None,
schema_only: false,
format: DataFormat::Parquet,
force: false,
time_range: TimeRange::new(Some(start), Some(start)),
chunk_time_window: None,
parallelism: 1,
snapshot_uri: "file:///tmp/snapshot".to_string(),
storage_config: ObjectStoreConfig::default(),
};
let error = validate_resume_config(&manifest, &config)
.err()
.unwrap()
.to_string();
assert!(error.contains("time_range"));
}
}

View File

@@ -0,0 +1,166 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use common_telemetry::info;
use crate::common::ObjectStoreConfig;
use crate::data::export_v2::data::{CopyOptions, build_copy_target, execute_copy_database};
use crate::data::export_v2::error::Result;
use crate::data::export_v2::manifest::{ChunkStatus, DataFormat, Manifest, TimeRange};
use crate::data::path::data_dir_for_schema_chunk;
use crate::data::snapshot_storage::{SnapshotStorage, StorageScheme};
use crate::database::DatabaseClient;
struct ExportContext<'a> {
storage: &'a dyn SnapshotStorage,
database_client: &'a DatabaseClient,
snapshot_uri: &'a str,
storage_config: &'a ObjectStoreConfig,
catalog: &'a str,
schemas: &'a [String],
format: DataFormat,
parallelism: usize,
}
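/// Drives the chunk export loop. The manifest is rewritten before each chunk
/// runs (InProgress) and after it finishes (Completed/Skipped/Failed), so a
/// crash leaves at most one chunk to redo and a resume skips finished chunks.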
pub async fn export_data(
storage: &dyn SnapshotStorage,
database_client: &DatabaseClient,
snapshot_uri: &str,
storage_config: &ObjectStoreConfig,
manifest: &mut Manifest,
parallelism: usize,
) -> Result<()> {
if manifest.chunks.is_empty() {
return Ok(());
}
for idx in 0..manifest.chunks.len() {
if matches!(
manifest.chunks[idx].status,
ChunkStatus::Completed | ChunkStatus::Skipped
) {
continue;
}
let (chunk_id, time_range) = mark_chunk_in_progress(manifest, idx);
manifest.touch();
storage.write_manifest(manifest).await?;
let context = ExportContext {
storage,
database_client,
snapshot_uri,
storage_config,
catalog: &manifest.catalog,
schemas: &manifest.schemas,
format: manifest.format,
parallelism,
};
let export_result = export_chunk(&context, chunk_id, time_range).await;
let result = match export_result {
Ok(files) => {
mark_chunk_completed(manifest, idx, files);
Ok(())
}
Err(err) => {
mark_chunk_failed(manifest, idx, err.to_string());
Err(err)
}
};
manifest.touch();
storage.write_manifest(manifest).await?;
result?;
}
Ok(())
}
fn mark_chunk_in_progress(manifest: &mut Manifest, idx: usize) -> (u32, TimeRange) {
let chunk = &mut manifest.chunks[idx];
chunk.mark_in_progress();
(chunk.id, chunk.time_range.clone())
}
fn mark_chunk_completed(manifest: &mut Manifest, idx: usize, files: Vec<String>) {
let chunk = &mut manifest.chunks[idx];
if files.is_empty() {
chunk.mark_skipped();
} else {
chunk.mark_completed(files, None);
}
}
fn mark_chunk_failed(manifest: &mut Manifest, idx: usize, error: String) {
let chunk = &mut manifest.chunks[idx];
chunk.mark_failed(error);
}
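/// Exports one chunk: runs COPY DATABASE per schema into the chunk's data
/// directory, then lists the files that were produced so the caller can record
/// them (an empty listing makes the caller mark the chunk as skipped).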
async fn export_chunk(
context: &ExportContext<'_>,
chunk_id: u32,
time_range: TimeRange,
) -> Result<Vec<String>> {
let scheme = StorageScheme::from_uri(context.snapshot_uri)?;
let needs_dir = matches!(scheme, StorageScheme::File);
let copy_options = CopyOptions {
format: context.format,
time_range,
parallelism: context.parallelism,
};
for schema in context.schemas {
let prefix = data_dir_for_schema_chunk(schema, chunk_id);
if needs_dir {
context.storage.create_dir_all(&prefix).await?;
}
let target = build_copy_target(
context.snapshot_uri,
context.storage_config,
schema,
chunk_id,
)?;
execute_copy_database(
context.database_client,
context.catalog,
schema,
&target,
&copy_options,
)
.await?;
}
let files = list_chunk_files(context.storage, context.schemas, chunk_id).await?;
info!("Collected {} files for chunk {}", files.len(), chunk_id);
Ok(files)
}
async fn list_chunk_files(
storage: &dyn SnapshotStorage,
schemas: &[String],
chunk_id: u32,
) -> Result<Vec<String>> {
let mut files = Vec::new();
for schema in schemas {
let prefix = data_dir_for_schema_chunk(schema, chunk_id);
files.extend(storage.list_files_recursive(&prefix).await?);
}
files.sort();
Ok(files)
}

View File

@@ -0,0 +1,440 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use common_base::secrets::{ExposeSecret, SecretString};
use common_telemetry::info;
use object_store::util::{join_path, normalize_path};
use snafu::ResultExt;
use url::Url;
use crate::common::ObjectStoreConfig;
use crate::data::export_v2::error::{DatabaseSnafu, InvalidUriSnafu, Result, UrlParseSnafu};
use crate::data::export_v2::manifest::{DataFormat, TimeRange};
use crate::data::path::data_dir_for_schema_chunk;
use crate::data::snapshot_storage::StorageScheme;
use crate::data::sql::{escape_sql_identifier, escape_sql_literal};
use crate::database::DatabaseClient;
pub(super) struct CopyOptions {
pub(super) format: DataFormat,
pub(super) time_range: TimeRange,
pub(super) parallelism: usize,
}
pub(super) struct CopyTarget {
pub(super) location: String,
pub(super) connection: String,
secrets: Vec<Option<String>>,
}
impl CopyTarget {
fn mask_sql(&self, sql: &str) -> String {
mask_secrets(sql, &self.secrets)
}
}
pub(super) fn build_copy_target(
snapshot_uri: &str,
storage: &ObjectStoreConfig,
schema: &str,
chunk_id: u32,
) -> Result<CopyTarget> {
let url = Url::parse(snapshot_uri).context(UrlParseSnafu)?;
let scheme = StorageScheme::from_uri(snapshot_uri)?;
let suffix = data_dir_for_schema_chunk(schema, chunk_id);
match scheme {
StorageScheme::File => {
let root = url.to_file_path().map_err(|_| {
InvalidUriSnafu {
uri: snapshot_uri,
reason: "file:// URI must use an absolute path like file:///tmp/backup",
}
.build()
})?;
let location = normalize_path(&format!("{}/{}", root.to_string_lossy(), suffix));
Ok(CopyTarget {
location,
connection: String::new(),
secrets: Vec::new(),
})
}
StorageScheme::S3 => {
let (bucket, root) = extract_bucket_root(&url, snapshot_uri)?;
let location = format!("s3://{}/{}", bucket, join_root(&root, &suffix));
let (connection, secrets) = build_s3_connection(storage);
Ok(CopyTarget {
location,
connection,
secrets,
})
}
StorageScheme::Oss => {
let (bucket, root) = extract_bucket_root(&url, snapshot_uri)?;
let location = format!("oss://{}/{}", bucket, join_root(&root, &suffix));
let (connection, secrets) = build_oss_connection(storage);
Ok(CopyTarget {
location,
connection,
secrets,
})
}
StorageScheme::Gcs => {
let (bucket, root) = extract_bucket_root(&url, snapshot_uri)?;
let location = format!("gcs://{}/{}", bucket, join_root(&root, &suffix));
let (connection, secrets) = build_gcs_connection(storage, snapshot_uri)?;
Ok(CopyTarget {
location,
connection,
secrets,
})
}
StorageScheme::Azblob => {
let (bucket, root) = extract_bucket_root(&url, snapshot_uri)?;
let location = format!("azblob://{}/{}", bucket, join_root(&root, &suffix));
let (connection, secrets) = build_azblob_connection(storage);
Ok(CopyTarget {
location,
connection,
secrets,
})
}
}
}
pub(super) async fn execute_copy_database(
database_client: &DatabaseClient,
catalog: &str,
schema: &str,
target: &CopyTarget,
options: &CopyOptions,
) -> Result<()> {
let with_options = build_with_options(options);
let sql = format!(
r#"COPY DATABASE "{}"."{}" TO '{}' WITH ({}){};"#,
escape_sql_identifier(catalog),
escape_sql_identifier(schema),
escape_sql_literal(&target.location),
with_options,
target.connection
);
let safe_sql = target.mask_sql(&sql);
info!("Executing sql: {}", safe_sql);
database_client
.sql_in_public(&sql)
.await
.context(DatabaseSnafu)?;
Ok(())
}
fn build_with_options(options: &CopyOptions) -> String {
let mut parts = vec![format!("FORMAT='{}'", options.format)];
if let Some(start) = options.time_range.start {
parts.push(format!(
"START_TIME='{}'",
escape_sql_literal(&start.to_rfc3339())
));
}
if let Some(end) = options.time_range.end {
parts.push(format!(
"END_TIME='{}'",
escape_sql_literal(&end.to_rfc3339())
));
}
parts.push(format!("PARALLELISM={}", options.parallelism));
parts.join(", ")
}
fn extract_bucket_root(url: &Url, snapshot_uri: &str) -> Result<(String, String)> {
let bucket = url.host_str().unwrap_or("").to_string();
if bucket.is_empty() {
return InvalidUriSnafu {
uri: snapshot_uri,
reason: "URI must include bucket/container in host",
}
.fail();
}
let root = url
.path()
.trim_start_matches('/')
.trim_end_matches('/')
.to_string();
Ok((bucket, root))
}
fn join_root(root: &str, suffix: &str) -> String {
join_path(root, suffix).trim_start_matches('/').to_string()
}
fn build_s3_connection(storage: &ObjectStoreConfig) -> (String, Vec<Option<String>>) {
let access_key_id = expose_optional_secret(&storage.s3.s3_access_key_id);
let secret_access_key = expose_optional_secret(&storage.s3.s3_secret_access_key);
let mut options = Vec::new();
if let Some(access_key_id) = &access_key_id {
options.push(format!(
"ACCESS_KEY_ID='{}'",
escape_sql_literal(access_key_id)
));
}
if let Some(secret_access_key) = &secret_access_key {
options.push(format!(
"SECRET_ACCESS_KEY='{}'",
escape_sql_literal(secret_access_key)
));
}
if let Some(region) = &storage.s3.s3_region {
options.push(format!("REGION='{}'", escape_sql_literal(region)));
}
if let Some(endpoint) = &storage.s3.s3_endpoint {
options.push(format!("ENDPOINT='{}'", escape_sql_literal(endpoint)));
}
let secrets = vec![access_key_id, secret_access_key];
let connection = if options.is_empty() {
String::new()
} else {
format!(" CONNECTION ({})", options.join(", "))
};
(connection, secrets)
}
fn build_oss_connection(storage: &ObjectStoreConfig) -> (String, Vec<Option<String>>) {
let access_key_id = expose_optional_secret(&storage.oss.oss_access_key_id);
let access_key_secret = expose_optional_secret(&storage.oss.oss_access_key_secret);
let mut options = Vec::new();
if let Some(access_key_id) = &access_key_id {
options.push(format!(
"ACCESS_KEY_ID='{}'",
escape_sql_literal(access_key_id)
));
}
if let Some(access_key_secret) = &access_key_secret {
options.push(format!(
"ACCESS_KEY_SECRET='{}'",
escape_sql_literal(access_key_secret)
));
}
if !storage.oss.oss_endpoint.is_empty() {
options.push(format!(
"ENDPOINT='{}'",
escape_sql_literal(&storage.oss.oss_endpoint)
));
}
let secrets = vec![access_key_id, access_key_secret];
let connection = if options.is_empty() {
String::new()
} else {
format!(" CONNECTION ({})", options.join(", "))
};
(connection, secrets)
}
fn build_gcs_connection(
storage: &ObjectStoreConfig,
snapshot_uri: &str,
) -> Result<(String, Vec<Option<String>>)> {
let credential_path = expose_optional_secret(&storage.gcs.gcs_credential_path);
let credential = expose_optional_secret(&storage.gcs.gcs_credential);
if credential.is_none() && credential_path.is_some() {
return InvalidUriSnafu {
uri: snapshot_uri,
reason: "gcs_credential_path is not supported for server-side COPY; provide gcs_credential or rely on server-side ADC",
}
.fail();
}
let mut options = Vec::new();
if let Some(credential) = &credential {
options.push(format!("CREDENTIAL='{}'", escape_sql_literal(credential)));
}
if !storage.gcs.gcs_scope.is_empty() {
options.push(format!(
"SCOPE='{}'",
escape_sql_literal(&storage.gcs.gcs_scope)
));
}
if !storage.gcs.gcs_endpoint.is_empty() {
options.push(format!(
"ENDPOINT='{}'",
escape_sql_literal(&storage.gcs.gcs_endpoint)
));
}
let connection = if options.is_empty() {
String::new()
} else {
format!(" CONNECTION ({})", options.join(", "))
};
let secrets = vec![credential_path, credential];
Ok((connection, secrets))
}
fn build_azblob_connection(storage: &ObjectStoreConfig) -> (String, Vec<Option<String>>) {
let account_name = expose_optional_secret(&storage.azblob.azblob_account_name);
let account_key = expose_optional_secret(&storage.azblob.azblob_account_key);
let sas_token = storage.azblob.azblob_sas_token.clone();
let mut options = Vec::new();
if let Some(account_name) = &account_name {
options.push(format!(
"ACCOUNT_NAME='{}'",
escape_sql_literal(account_name)
));
}
if let Some(account_key) = &account_key {
options.push(format!("ACCOUNT_KEY='{}'", escape_sql_literal(account_key)));
}
if let Some(sas_token) = &sas_token {
options.push(format!("SAS_TOKEN='{}'", escape_sql_literal(sas_token)));
}
if !storage.azblob.azblob_endpoint.is_empty() {
options.push(format!(
"ENDPOINT='{}'",
escape_sql_literal(&storage.azblob.azblob_endpoint)
));
}
let secrets = vec![account_name, account_key, sas_token];
let connection = if options.is_empty() {
String::new()
} else {
format!(" CONNECTION ({})", options.join(", "))
};
(connection, secrets)
}
fn expose_optional_secret(secret: &Option<SecretString>) -> Option<String> {
secret.as_ref().map(|s| s.expose_secret().to_owned())
}
fn mask_secrets(sql: &str, secrets: &[Option<String>]) -> String {
let mut masked = sql.to_string();
for secret in secrets {
if let Some(secret) = secret
&& !secret.is_empty()
{
masked = masked.replace(secret, "[REDACTED]");
}
}
masked
}
#[cfg(test)]
mod tests {
use common_base::secrets::SecretString;
use super::*;
use crate::common::{PrefixedAzblobConnection, PrefixedGcsConnection, PrefixedOssConnection};
#[test]
fn test_build_oss_connection_includes_endpoint() {
let storage = ObjectStoreConfig {
oss: PrefixedOssConnection {
oss_endpoint: "https://oss.example.com".to_string(),
oss_access_key_id: Some(SecretString::from("key_id".to_string())),
oss_access_key_secret: Some(SecretString::from("key_secret".to_string())),
..Default::default()
},
..Default::default()
};
let (connection, _) = build_oss_connection(&storage);
assert!(connection.contains("ENDPOINT='https://oss.example.com'"));
}
#[test]
fn test_build_gcs_connection_uses_scope_and_inline_credential() {
let storage = ObjectStoreConfig {
gcs: PrefixedGcsConnection {
gcs_scope: "scope-a".to_string(),
gcs_endpoint: "https://storage.googleapis.com".to_string(),
gcs_credential: Some(SecretString::from("credential-json".to_string())),
..Default::default()
},
..Default::default()
};
let (connection, _) = build_gcs_connection(&storage, "gcs://bucket/root").unwrap();
assert!(connection.contains("CREDENTIAL='credential-json'"));
assert!(connection.contains("SCOPE='scope-a'"));
assert!(connection.contains("ENDPOINT='https://storage.googleapis.com'"));
assert!(!connection.contains("CREDENTIAL_PATH"));
}
#[test]
fn test_build_gcs_connection_rejects_credential_path_only() {
let storage = ObjectStoreConfig {
gcs: PrefixedGcsConnection {
gcs_scope: "scope-a".to_string(),
gcs_credential_path: Some(SecretString::from("/tmp/creds.json".to_string())),
..Default::default()
},
..Default::default()
};
let error = build_gcs_connection(&storage, "gcs://bucket/root")
.expect_err("credential_path-only should be rejected")
.to_string();
assert!(error.contains("gcs_credential_path is not supported"));
}
#[test]
fn test_build_azblob_connection_includes_endpoint() {
let storage = ObjectStoreConfig {
azblob: PrefixedAzblobConnection {
azblob_account_name: Some(SecretString::from("account".to_string())),
azblob_account_key: Some(SecretString::from("key".to_string())),
azblob_endpoint: "https://blob.example.com".to_string(),
..Default::default()
},
..Default::default()
};
let (connection, _) = build_azblob_connection(&storage);
assert!(connection.contains("ENDPOINT='https://blob.example.com'"));
}
#[test]
fn test_build_azblob_connection_redacts_sas_token() {
let storage = ObjectStoreConfig {
azblob: PrefixedAzblobConnection {
azblob_account_name: Some(SecretString::from("account".to_string())),
azblob_account_key: Some(SecretString::from("key".to_string())),
azblob_sas_token: Some("sig=secret-token".to_string()),
..Default::default()
},
..Default::default()
};
let (connection, secrets) = build_azblob_connection(&storage);
let masked = mask_secrets(&connection, &secrets);
assert!(connection.contains("SAS_TOKEN='sig=secret-token'"));
assert!(masked.contains("SAS_TOKEN='[REDACTED]'"));
assert!(!masked.contains("sig=secret-token"));
}
#[test]
fn test_build_copy_target_decodes_file_uri_path() {
let storage = ObjectStoreConfig::default();
let target = build_copy_target("file:///tmp/my%20backup", &storage, "public", 7)
.expect("file:// copy target should be built");
assert_eq!(target.location, "/tmp/my backup/data/public/7/");
}
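// Hedged sketch of the WITH-clause builder: a bounded range should emit both
// time bounds (assuming chrono renders UTC offsets as +00:00 in to_rfc3339)
// and the parallelism is always appended last.
#[test]
fn test_build_with_options_bounded_range() {
use chrono::TimeZone;
let start = chrono::Utc.with_ymd_and_hms(2025, 1, 1, 0, 0, 0).unwrap();
let end = chrono::Utc.with_ymd_and_hms(2025, 1, 2, 0, 0, 0).unwrap();
let options = CopyOptions {
format: DataFormat::Parquet,
time_range: TimeRange::new(Some(start), Some(end)),
parallelism: 2,
};
let clause = build_with_options(&options);
assert!(clause.contains("START_TIME='2025-01-01T00:00:00+00:00'"));
assert!(clause.contains("END_TIME='2025-01-02T00:00:00+00:00'"));
assert!(clause.ends_with("PARALLELISM=2"));
}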
}

View File

@@ -72,17 +72,55 @@ pub enum Error {
},
#[snafu(display(
"Cannot resume schema-only snapshot with data export. Use --force to recreate."
"Cannot resume snapshot with a different schema_only mode (existing: {}, requested: {}). Use --force to recreate.",
existing_schema_only,
requested_schema_only
))]
CannotResumeSchemaOnly {
SchemaOnlyModeMismatch {
existing_schema_only: bool,
requested_schema_only: bool,
#[snafu(implicit)]
location: Location,
},
#[snafu(display(
"Data export is not implemented yet. Use --schema-only to create a schema snapshot."
"Cannot resume snapshot with different {} (existing: {}, requested: {}). Use --force to recreate.",
field,
existing,
requested
))]
DataExportNotImplemented {
ResumeConfigMismatch {
field: String,
existing: String,
requested: String,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to parse time: invalid format: {}", input))]
TimeParseInvalidFormat {
input: String,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to parse time: end_time is before start_time"))]
TimeParseEndBeforeStart {
#[snafu(implicit)]
location: Location,
},
#[snafu(display(
"chunk_time_window requires both --start-time and --end-time to be specified"
))]
ChunkTimeWindowRequiresBounds {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("--schema-only cannot be used with data export arguments: {}", args))]
SchemaOnlyArgsNotAllowed {
args: String,
#[snafu(implicit)]
location: Location,
},
@@ -154,9 +192,13 @@ impl ErrorExt for Error {
match self {
Error::InvalidUri { .. }
| Error::UnsupportedScheme { .. }
| Error::CannotResumeSchemaOnly { .. }
| Error::DataExportNotImplemented { .. }
| Error::ManifestVersionMismatch { .. } => StatusCode::InvalidArguments,
| Error::SchemaOnlyModeMismatch { .. }
| Error::ResumeConfigMismatch { .. }
| Error::ManifestVersionMismatch { .. }
| Error::SchemaOnlyArgsNotAllowed { .. } => StatusCode::InvalidArguments,
Error::TimeParseInvalidFormat { .. }
| Error::TimeParseEndBeforeStart { .. }
| Error::ChunkTimeWindowRequiresBounds { .. } => StatusCode::InvalidArguments,
Error::StorageOperation { .. }
| Error::ManifestParse { .. }

View File

@@ -14,12 +14,19 @@
//! Manifest data structures for Export/Import V2.
use std::time::Duration;
use std::{fmt, str};
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use uuid::Uuid;
use crate::data::export_v2::chunker::generate_chunks;
use crate::data::export_v2::error::{
ChunkTimeWindowRequiresBoundsSnafu, Result as ExportResult, TimeParseEndBeforeStartSnafu,
TimeParseInvalidFormatSnafu,
};
/// Current manifest format version.
pub const MANIFEST_VERSION: u32 = 1;
@@ -55,6 +62,31 @@ impl TimeRange {
pub fn is_unbounded(&self) -> bool {
self.start.is_none() && self.end.is_none()
}
/// Returns true if both bounds are specified.
pub fn is_bounded(&self) -> bool {
self.start.is_some() && self.end.is_some()
}
/// Parses a time range from optional RFC3339 strings.
pub fn parse(start: Option<&str>, end: Option<&str>) -> ExportResult<Self> {
let start = start.map(parse_time).transpose()?;
let end = end.map(parse_time).transpose()?;
if let (Some(start), Some(end)) = (start, end)
&& end < start
{
return TimeParseEndBeforeStartSnafu.fail();
}
Ok(Self::new(start, end))
}
}
fn parse_time(input: &str) -> ExportResult<DateTime<Utc>> {
DateTime::parse_from_rfc3339(input)
.map(|dt| dt.with_timezone(&Utc))
.map_err(|_| TimeParseInvalidFormatSnafu { input }.build())
}
impl Default for TimeRange {
@@ -74,6 +106,8 @@ pub enum ChunkStatus {
InProgress,
/// Chunk export completed successfully.
Completed,
/// Chunk had no data to export.
Skipped,
/// Chunk export failed.
Failed,
}
@@ -111,6 +145,13 @@ impl ChunkMeta {
}
}
/// Creates a skipped chunk with the given id and time range.
pub fn skipped(id: u32, time_range: TimeRange) -> Self {
let mut chunk = Self::new(id, time_range);
chunk.mark_skipped();
chunk
}
/// Marks this chunk as in progress.
pub fn mark_in_progress(&mut self) {
self.status = ChunkStatus::InProgress;
@@ -125,6 +166,14 @@ impl ChunkMeta {
self.error = None;
}
/// Marks this chunk as skipped because no data files were produced.
pub fn mark_skipped(&mut self) {
self.status = ChunkStatus::Skipped;
self.files.clear();
self.checksum = None;
self.error = None;
}
/// Marks this chunk as failed with the given error message.
pub fn mark_failed(&mut self, error: String) {
self.status = ChunkStatus::Failed;
@@ -210,6 +259,35 @@ pub struct Manifest {
}
impl Manifest {
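/// Builds a manifest for either export mode. Full exports get their chunk plan
/// up front: windowed via `generate_chunks` when `chunk_time_window` is set,
/// otherwise a single chunk covering the whole range.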
pub fn new_for_export(
catalog: String,
schemas: Vec<String>,
schema_only: bool,
time_range: TimeRange,
format: DataFormat,
chunk_time_window: Option<Duration>,
) -> ExportResult<Self> {
if chunk_time_window.is_some() && !time_range.is_bounded() {
return ChunkTimeWindowRequiresBoundsSnafu.fail();
}
let mut manifest = if schema_only {
Self::new_schema_only(catalog, schemas)
} else {
Self::new_full(catalog, schemas, time_range, format)
};
if !schema_only {
manifest.chunks = match chunk_time_window {
Some(window) => generate_chunks(&manifest.time_range, window),
None => generate_single_chunk(&manifest.time_range),
};
manifest.touch();
}
Ok(manifest)
}
/// Creates a new manifest for schema-only export.
pub fn new_schema_only(catalog: String, schemas: Vec<String>) -> Self {
let now = Utc::now();
@@ -258,7 +336,7 @@ impl Manifest {
&& self
.chunks
.iter()
.all(|c| c.status == ChunkStatus::Completed))
.all(|c| matches!(c.status, ChunkStatus::Completed | ChunkStatus::Skipped)))
}
/// Returns the number of pending chunks.
@@ -285,6 +363,14 @@ impl Manifest {
.count()
}
/// Returns the number of skipped chunks.
pub fn skipped_count(&self) -> usize {
self.chunks
.iter()
.filter(|c| c.status == ChunkStatus::Skipped)
.count()
}
/// Returns the number of failed chunks.
pub fn failed_count(&self) -> usize {
self.chunks
@@ -313,8 +399,24 @@ impl Manifest {
}
}
fn generate_single_chunk(time_range: &TimeRange) -> Vec<ChunkMeta> {
if let (Some(start), Some(end)) = (time_range.start, time_range.end) {
if start == end {
return vec![ChunkMeta::skipped(1, time_range.clone())];
}
if start > end {
return Vec::new();
}
}
vec![ChunkMeta::new(1, time_range.clone())]
}
#[cfg(test)]
mod tests {
use std::time::Duration;
use chrono::{TimeZone, Utc};
use super::*;
#[test]
@@ -338,6 +440,26 @@ mod tests {
assert!(manifest.is_complete());
}
#[test]
fn test_generate_single_chunk_zero_width_range_is_skipped() {
let ts = Utc.with_ymd_and_hms(2025, 1, 1, 0, 0, 0).unwrap();
let chunks = generate_single_chunk(&TimeRange::new(Some(ts), Some(ts)));
assert_eq!(chunks.len(), 1);
assert_eq!(chunks[0].status, ChunkStatus::Skipped);
assert_eq!(chunks[0].time_range.start, Some(ts));
assert_eq!(chunks[0].time_range.end, Some(ts));
}
#[test]
fn test_generate_single_chunk_invalid_range_is_empty() {
let start = Utc.with_ymd_and_hms(2025, 1, 1, 1, 0, 0).unwrap();
let end = Utc.with_ymd_and_hms(2025, 1, 1, 0, 0, 0).unwrap();
let chunks = generate_single_chunk(&TimeRange::new(Some(start), Some(end)));
assert!(chunks.is_empty());
}
#[test]
fn test_manifest_full() {
let manifest = Manifest::new_full(
@@ -377,5 +499,71 @@ mod tests {
);
assert_eq!(chunk.status, ChunkStatus::Completed);
assert_eq!(chunk.files.len(), 1);
chunk.mark_skipped();
assert_eq!(chunk.status, ChunkStatus::Skipped);
assert!(chunk.files.is_empty());
}
#[test]
fn test_manifest_is_complete_when_chunks_are_completed_or_skipped() {
let mut manifest = Manifest::new_full(
"greptime".to_string(),
vec!["public".to_string()],
TimeRange::unbounded(),
DataFormat::Parquet,
);
manifest.add_chunk(ChunkMeta::new(1, TimeRange::unbounded()));
manifest.add_chunk(ChunkMeta::new(2, TimeRange::unbounded()));
manifest.update_chunk(1, |chunk| {
chunk.mark_completed(vec!["a.parquet".to_string()], None)
});
manifest.update_chunk(2, |chunk| chunk.mark_skipped());
assert!(manifest.is_complete());
assert_eq!(manifest.completed_count(), 1);
assert_eq!(manifest.skipped_count(), 1);
}
#[test]
fn test_manifest_chunk_time_window_none_single_chunk() {
let start = Utc.with_ymd_and_hms(2025, 1, 1, 0, 0, 0).unwrap();
let end = Utc.with_ymd_and_hms(2025, 1, 2, 0, 0, 0).unwrap();
let range = TimeRange::new(Some(start), Some(end));
let manifest = Manifest::new_for_export(
"greptime".to_string(),
vec!["public".to_string()],
false,
range.clone(),
DataFormat::Parquet,
None,
)
.unwrap();
assert_eq!(manifest.chunks.len(), 1);
assert_eq!(manifest.chunks[0].time_range, range);
}
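// Sketch: with a window set, the chunk plan comes from generate_chunks, so a
// 24h range with a 6h window should produce four aligned chunks.
#[test]
fn test_manifest_chunk_time_window_splits_range() {
let start = Utc.with_ymd_and_hms(2025, 1, 1, 0, 0, 0).unwrap();
let end = Utc.with_ymd_and_hms(2025, 1, 2, 0, 0, 0).unwrap();
let manifest = Manifest::new_for_export(
"greptime".to_string(),
vec!["public".to_string()],
false,
TimeRange::new(Some(start), Some(end)),
DataFormat::Parquet,
Some(Duration::from_secs(6 * 3600)),
)
.unwrap();
assert_eq!(manifest.chunks.len(), 4);
assert_eq!(manifest.chunks[0].time_range.start, Some(start));
assert_eq!(manifest.chunks[3].time_range.end, Some(end));
}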
#[test]
fn test_time_range_parse_requires_order() {
let result = TimeRange::parse(Some("2025-01-02T00:00:00Z"), Some("2025-01-01T00:00:00Z"));
assert!(result.is_err());
}
#[test]
fn test_new_for_export_with_chunk_window_requires_bounded_range() {
let result = Manifest::new_for_export(
"greptime".to_string(),
vec!["public".to_string()],
false,
TimeRange::new(
None,
Some(Utc.with_ymd_and_hms(2025, 1, 2, 0, 0, 0).unwrap()),
),
DataFormat::Parquet,
Some(Duration::from_secs(3600)),
);
assert!(result.is_err());
}
}

View File

@@ -27,7 +27,8 @@ use crate::Tool;
use crate::common::ObjectStoreConfig;
use crate::data::export_v2::manifest::MANIFEST_VERSION;
use crate::data::import_v2::error::{
ManifestVersionMismatchSnafu, Result, SchemaNotInSnapshotSnafu, SnapshotStorageSnafu,
FullSnapshotImportNotSupportedSnafu, ManifestVersionMismatchSnafu, Result,
SchemaNotInSnapshotSnafu, SnapshotStorageSnafu,
};
use crate::data::import_v2::executor::{DdlExecutor, DdlStatement};
use crate::data::path::ddl_path_for_schema;
@@ -58,10 +59,6 @@ pub struct ImportV2Command {
#[clap(long)]
dry_run: bool,
/// Concurrency level (for future use).
#[clap(long, default_value = "1")]
parallelism: usize,
/// Basic authentication (user:password).
#[clap(long)]
auth_basic: Option<String>,
@@ -121,7 +118,6 @@ impl ImportV2Command {
Ok(Box::new(Import {
schemas,
dry_run: self.dry_run,
_parallelism: self.parallelism,
storage: Box::new(storage),
database_client,
}))
@@ -132,7 +128,6 @@ impl ImportV2Command {
pub struct Import {
schemas: Option<Vec<String>>,
dry_run: bool,
_parallelism: usize,
storage: Box<dyn SnapshotStorage>,
database_client: DatabaseClient,
}
@@ -169,6 +164,13 @@ impl Import {
info!("Snapshot contains {} schema(s)", manifest.schemas.len());
if !manifest.schema_only && !manifest.chunks.is_empty() {
return FullSnapshotImportNotSupportedSnafu {
chunk_count: manifest.chunks.len(),
}
.fail();
}
// 2. Determine schemas to import
let schemas_to_import = match &self.schemas {
Some(filter) => canonicalize_schema_filter(filter, &manifest.schemas)?,
@@ -203,14 +205,6 @@ impl Import {
ddl_statements.len()
);
// 6. Data import would happen here for non-schema-only snapshots (M2/M3)
if !manifest.schema_only && !manifest.chunks.is_empty() {
info!(
"Data import not yet implemented (M3). {} chunks pending.",
manifest.chunks.len()
);
}
Ok(())
}
@@ -403,7 +397,114 @@ fn canonicalize_schema_filter(
#[cfg(test)]
mod tests {
use std::time::Duration;
use async_trait::async_trait;
use super::*;
use crate::Tool;
use crate::data::export_v2::manifest::{ChunkMeta, DataFormat, Manifest, TimeRange};
use crate::data::export_v2::schema::SchemaSnapshot;
use crate::data::snapshot_storage::SnapshotStorage;
use crate::database::DatabaseClient;
struct StubStorage {
manifest: Manifest,
}
#[async_trait]
impl SnapshotStorage for StubStorage {
async fn exists(&self) -> crate::data::export_v2::error::Result<bool> {
Ok(true)
}
async fn read_manifest(&self) -> crate::data::export_v2::error::Result<Manifest> {
Ok(self.manifest.clone())
}
async fn write_manifest(
&self,
_manifest: &Manifest,
) -> crate::data::export_v2::error::Result<()> {
unimplemented!("not needed in import_v2::command tests")
}
async fn read_text(&self, _path: &str) -> crate::data::export_v2::error::Result<String> {
unimplemented!("not needed in import_v2::command tests")
}
async fn write_text(
&self,
_path: &str,
_content: &str,
) -> crate::data::export_v2::error::Result<()> {
unimplemented!("not needed in import_v2::command tests")
}
async fn write_schema(
&self,
_snapshot: &SchemaSnapshot,
) -> crate::data::export_v2::error::Result<()> {
unimplemented!("not needed in import_v2::command tests")
}
async fn create_dir_all(&self, _path: &str) -> crate::data::export_v2::error::Result<()> {
unimplemented!("not needed in import_v2::command tests")
}
async fn list_files_recursive(
&self,
_prefix: &str,
) -> crate::data::export_v2::error::Result<Vec<String>> {
unimplemented!("not needed in import_v2::command tests")
}
async fn delete_snapshot(&self) -> crate::data::export_v2::error::Result<()> {
unimplemented!("not needed in import_v2::command tests")
}
}
fn test_database_client() -> DatabaseClient {
DatabaseClient::new(
"127.0.0.1:4000".to_string(),
"greptime".to_string(),
None,
Duration::from_secs(1),
None,
false,
)
}
#[tokio::test]
async fn test_import_rejects_full_snapshot_before_schema_execution() {
let mut manifest = Manifest::new_full(
"greptime".to_string(),
vec!["public".to_string()],
TimeRange::unbounded(),
DataFormat::Parquet,
);
manifest
.chunks
.push(ChunkMeta::new(1, TimeRange::unbounded()));
let import = Import {
schemas: None,
dry_run: false,
storage: Box::new(StubStorage { manifest }),
database_client: test_database_client(),
};
let error = import
.do_work()
.await
.expect_err("full snapshot import should fail");
assert!(
error
.to_string()
.contains("Importing data from full snapshots is not implemented yet")
);
}
#[test]
fn test_parse_ddl_statements() {

View File

@@ -45,6 +45,16 @@ pub enum Error {
location: Location,
},
#[snafu(display(
"Importing data from full snapshots is not implemented yet (snapshot has {} chunk(s))",
chunk_count
))]
FullSnapshotImportNotSupported {
chunk_count: usize,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Snapshot storage error"))]
SnapshotStorage {
#[snafu(source)]
@@ -67,10 +77,10 @@ pub type Result<T> = std::result::Result<T, Error>;
impl ErrorExt for Error {
fn status_code(&self) -> StatusCode {
match self {
Error::SnapshotNotFound { .. } | Error::SchemaNotInSnapshot { .. } => {
StatusCode::InvalidArguments
}
Error::ManifestVersionMismatch { .. } => StatusCode::InvalidArguments,
Error::SnapshotNotFound { .. }
| Error::SchemaNotInSnapshot { .. }
| Error::ManifestVersionMismatch { .. }
| Error::FullSnapshotImportNotSupported { .. } => StatusCode::InvalidArguments,
Error::Database { error, .. } => error.status_code(),
Error::SnapshotStorage { error, .. } => error.status_code(),
}

View File

@@ -25,6 +25,10 @@ pub(crate) fn ddl_path_for_schema(schema: &str) -> String {
)
}
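/// Relative data directory for one schema/chunk pair, e.g. `data/public/1/`.
/// The schema segment is percent-encoded so hostile names cannot escape the
/// snapshot root.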
pub(crate) fn data_dir_for_schema_chunk(schema: &str, chunk_id: u32) -> String {
format!("data/{}/{}/", encode_path_segment(schema), chunk_id)
}
pub(crate) fn encode_path_segment(value: &str) -> String {
let mut encoded = String::with_capacity(value.len());
for byte in value.bytes() {
@@ -73,4 +77,13 @@ mod tests {
"schema/ddl/%2E%2E%2Fevil.sql"
);
}
#[test]
fn test_data_dir_for_schema_chunk_encodes_schema_segment() {
assert_eq!(data_dir_for_schema_chunk("public", 1), "data/public/1/");
assert_eq!(
data_dir_for_schema_chunk("../evil", 7),
"data/%2E%2E%2Fevil/7/"
);
}
}

View File

@@ -18,9 +18,12 @@
//! to various storage backends (S3, OSS, GCS, Azure Blob, local filesystem).
use async_trait::async_trait;
use futures::TryStreamExt;
use object_store::services::{Azblob, Fs, Gcs, Oss, S3};
use object_store::util::{with_instrument_layers, with_retry_layers};
use object_store::{AzblobConnection, GcsConnection, ObjectStore, OssConnection, S3Connection};
use object_store::{
AzblobConnection, ErrorKind, GcsConnection, ObjectStore, OssConnection, S3Connection,
};
use snafu::ResultExt;
use url::Url;
@@ -139,14 +142,14 @@ fn extract_file_path_from_uri(uri: &str) -> Result<String> {
.fail(),
_ => url
.to_file_path()
.map(|path| path.to_string_lossy().into_owned())
.map_err(|_| {
InvalidUriSnafu {
uri,
reason: "file:// URI must use a valid absolute filesystem path",
reason: "file:// URI must use an absolute path like file:///tmp/backup",
}
.build()
}),
})
.map(|path| path.to_string_lossy().into_owned()),
}
}
@@ -184,6 +187,12 @@ pub trait SnapshotStorage: Send + Sync {
/// Reads a text file from a relative path under the snapshot root.
async fn read_text(&self, path: &str) -> Result<String>;
/// Creates a directory-like prefix under the snapshot root when needed by the backend.
async fn create_dir_all(&self, path: &str) -> Result<()>;
/// Lists files recursively under a relative prefix.
async fn list_files_recursive(&self, prefix: &str) -> Result<Vec<String>>;
/// Deletes the entire snapshot (for --force).
async fn delete_snapshot(&self) -> Result<()>;
}
@@ -443,6 +452,38 @@ impl SnapshotStorage for OpenDalStorage {
String::from_utf8(data).context(TextDecodeSnafu)
}
async fn create_dir_all(&self, path: &str) -> Result<()> {
self.object_store
.create_dir(path)
.await
.context(StorageOperationSnafu {
operation: format!("create dir {}", path),
})
}
async fn list_files_recursive(&self, prefix: &str) -> Result<Vec<String>> {
let mut lister = match self.object_store.lister_with(prefix).recursive(true).await {
Ok(lister) => lister,
Err(error) if error.kind() == ErrorKind::NotFound => return Ok(Vec::new()),
Err(error) => {
return Err(error).context(StorageOperationSnafu {
operation: format!("list {}", prefix),
});
}
};
let mut files = Vec::new();
while let Some(entry) = lister.try_next().await.context(StorageOperationSnafu {
operation: format!("list {}", prefix),
})? {
if entry.metadata().is_dir() {
continue;
}
files.push(entry.path().to_string());
}
Ok(files)
}
async fn delete_snapshot(&self) -> Result<()> {
self.object_store
.remove_all("/")
@@ -533,6 +574,14 @@ mod tests {
extract_file_path_from_uri("file://localhost/tmp/backup").unwrap(),
"/tmp/backup"
);
assert_eq!(
extract_file_path_from_uri("file:///tmp/my%20backup").unwrap(),
"/tmp/my backup"
);
assert_eq!(
extract_file_path_from_uri("file://localhost/tmp/my%20backup").unwrap(),
"/tmp/my backup"
);
}
#[test]