From a049b68c2615d0abfac1c0efea3da21f822a6717 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Thu, 6 Nov 2025 15:33:33 +0800 Subject: [PATCH] feat: import backup data from local files (#7180) * feat: import backup data from local files Signed-off-by: Ruihang Xia * add unit tests Signed-off-by: Ruihang Xia * fix clippy Signed-off-by: Ruihang Xia --------- Signed-off-by: Ruihang Xia --- src/cli/src/data.rs | 2 + src/cli/src/data/export.rs | 26 +++++++-- src/cli/src/data/import.rs | 113 +++++++++++++++++++++++++++++++++++-- 3 files changed, 131 insertions(+), 10 deletions(-) diff --git a/src/cli/src/data.rs b/src/cli/src/data.rs index 86d2b43a98..be623f63a2 100644 --- a/src/cli/src/data.rs +++ b/src/cli/src/data.rs @@ -23,6 +23,8 @@ use crate::Tool; use crate::data::export::ExportCommand; use crate::data::import::ImportCommand; +pub(crate) const COPY_PATH_PLACEHOLDER: &str = ""; + /// Command for data operations including exporting data from and importing data into GreptimeDB. #[derive(Subcommand)] pub enum DataCommand { diff --git a/src/cli/src/data/export.rs b/src/cli/src/data/export.rs index 33ce5a7746..5ddc2a39bc 100644 --- a/src/cli/src/data/export.rs +++ b/src/cli/src/data/export.rs @@ -30,7 +30,7 @@ use snafu::{OptionExt, ResultExt}; use tokio::sync::Semaphore; use tokio::time::Instant; -use crate::data::default_database; +use crate::data::{COPY_PATH_PLACEHOLDER, default_database}; use crate::database::{DatabaseClient, parse_proxy_opts}; use crate::error::{ EmptyResultSnafu, Error, OpenDalSnafu, OutputDirNotSetSnafu, Result, S3ConfigNotSetSnafu, @@ -668,10 +668,26 @@ impl Export { ); // Create copy_from.sql file - let copy_database_from_sql = format!( - r#"COPY DATABASE "{}"."{}" FROM '{}' WITH ({}){};"#, - export_self.catalog, schema, path, with_options_clone, connection_part - ); + let copy_database_from_sql = { + let command_without_connection = format!( + r#"COPY DATABASE "{}"."{}" FROM '{}' WITH ({});"#, + export_self.catalog, schema, COPY_PATH_PLACEHOLDER, with_options_clone + ); + + if connection_part.is_empty() { + command_without_connection + } else { + let command_with_connection = format!( + r#"COPY DATABASE "{}"."{}" FROM '{}' WITH ({}){};"#, + export_self.catalog, schema, path, with_options_clone, connection_part + ); + + format!( + "-- {}\n{}", + command_with_connection, command_without_connection + ) + } + }; let copy_from_path = export_self.get_file_path(&schema, "copy_from.sql"); export_self diff --git a/src/cli/src/data/import.rs b/src/cli/src/data/import.rs index db2fd42e37..908f3d4c9f 100644 --- a/src/cli/src/data/import.rs +++ b/src/cli/src/data/import.rs @@ -21,13 +21,13 @@ use clap::{Parser, ValueEnum}; use common_catalog::consts::DEFAULT_SCHEMA_NAME; use common_error::ext::BoxedError; use common_telemetry::{error, info, warn}; -use snafu::{OptionExt, ResultExt}; +use snafu::{OptionExt, ResultExt, ensure}; use tokio::sync::Semaphore; use tokio::time::Instant; -use crate::data::default_database; +use crate::data::{COPY_PATH_PLACEHOLDER, default_database}; use crate::database::{DatabaseClient, parse_proxy_opts}; -use crate::error::{Error, FileIoSnafu, Result, SchemaNotFoundSnafu}; +use crate::error::{Error, FileIoSnafu, InvalidArgumentsSnafu, Result, SchemaNotFoundSnafu}; use crate::{Tool, database}; #[derive(Debug, Default, Clone, ValueEnum)] @@ -148,12 +148,15 @@ impl Import { let _permit = semaphore_moved.acquire().await.unwrap(); let database_input_dir = self.catalog_path().join(&schema); let sql_file = database_input_dir.join(filename); - let sql = tokio::fs::read_to_string(sql_file) + let mut sql = tokio::fs::read_to_string(sql_file) .await .context(FileIoSnafu)?; - if sql.is_empty() { + if sql.trim().is_empty() { info!("Empty `{filename}` {database_input_dir:?}"); } else { + if filename == "copy_from.sql" { + sql = self.rewrite_copy_database_sql(&schema, &sql)?; + } let db = exec_db.unwrap_or(&schema); self.database_client.sql(&sql, db).await?; info!("Imported `{filename}` for database {schema}"); @@ -226,6 +229,57 @@ impl Import { } Ok(db_names) } + + fn rewrite_copy_database_sql(&self, schema: &str, sql: &str) -> Result { + let target_location = self.build_copy_database_location(schema); + let escaped_location = target_location.replace('\'', "''"); + + let mut first_stmt_checked = false; + for line in sql.lines() { + let trimmed = line.trim_start(); + if trimmed.is_empty() || trimmed.starts_with("--") { + continue; + } + + ensure!( + trimmed.starts_with("COPY DATABASE"), + InvalidArgumentsSnafu { + msg: "Expected COPY DATABASE statement at start of copy_from.sql" + } + ); + first_stmt_checked = true; + break; + } + + ensure!( + first_stmt_checked, + InvalidArgumentsSnafu { + msg: "COPY DATABASE statement not found in copy_from.sql" + } + ); + + ensure!( + sql.contains(COPY_PATH_PLACEHOLDER), + InvalidArgumentsSnafu { + msg: format!( + "Placeholder `{}` not found in COPY DATABASE statement", + COPY_PATH_PLACEHOLDER + ) + } + ); + + Ok(sql.replacen(COPY_PATH_PLACEHOLDER, &escaped_location, 1)) + } + + fn build_copy_database_location(&self, schema: &str) -> String { + let mut path = self.catalog_path(); + path.push(schema); + let mut path_str = path.to_string_lossy().into_owned(); + if !path_str.ends_with('/') { + path_str.push('/'); + } + path_str + } } #[async_trait] @@ -241,3 +295,52 @@ impl Tool for Import { } } } + +#[cfg(test)] +mod tests { + use std::time::Duration; + + use super::*; + + fn build_import(input_dir: &str) -> Import { + Import { + catalog: "catalog".to_string(), + schema: None, + database_client: DatabaseClient::new( + "127.0.0.1:4000".to_string(), + "catalog".to_string(), + None, + Duration::from_secs(0), + None, + ), + input_dir: input_dir.to_string(), + parallelism: 1, + target: ImportTarget::Data, + } + } + + #[test] + fn rewrite_copy_database_sql_replaces_placeholder() { + let import = build_import("/tmp/export-path"); + let comment = "-- COPY DATABASE \"catalog\".\"schema\" FROM 's3://bucket/demo/' WITH (format = 'parquet') CONNECTION (region = 'us-west-2')"; + let sql = format!( + "{comment}\nCOPY DATABASE \"catalog\".\"schema\" FROM '{}' WITH (format = 'parquet');", + COPY_PATH_PLACEHOLDER + ); + + let rewritten = import.rewrite_copy_database_sql("schema", &sql).unwrap(); + let expected_location = import.build_copy_database_location("schema"); + let escaped = expected_location.replace('\'', "''"); + + assert!(rewritten.starts_with(comment)); + assert!(rewritten.contains(&format!("FROM '{escaped}'"))); + assert!(!rewritten.contains(COPY_PATH_PLACEHOLDER)); + } + + #[test] + fn rewrite_copy_database_sql_requires_placeholder() { + let import = build_import("/tmp/export-path"); + let sql = "COPY DATABASE \"catalog\".\"schema\" FROM '/tmp/export-path/catalog/schema/' WITH (format = 'parquet');"; + assert!(import.rewrite_copy_database_sql("schema", sql).is_err()); + } +}