mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-08 14:22:58 +00:00
feat: import backup data from local files (#7180)
* feat: import backup data from local files Signed-off-by: Ruihang Xia <waynestxia@gmail.com> * add unit tests Signed-off-by: Ruihang Xia <waynestxia@gmail.com> * fix clippy Signed-off-by: Ruihang Xia <waynestxia@gmail.com> --------- Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
@@ -23,6 +23,8 @@ use crate::Tool;
|
||||
use crate::data::export::ExportCommand;
|
||||
use crate::data::import::ImportCommand;
|
||||
|
||||
pub(crate) const COPY_PATH_PLACEHOLDER: &str = "<PATH/TO/FILES>";
|
||||
|
||||
/// Command for data operations including exporting data from and importing data into GreptimeDB.
|
||||
#[derive(Subcommand)]
|
||||
pub enum DataCommand {
|
||||
|
||||
@@ -30,7 +30,7 @@ use snafu::{OptionExt, ResultExt};
|
||||
use tokio::sync::Semaphore;
|
||||
use tokio::time::Instant;
|
||||
|
||||
use crate::data::default_database;
|
||||
use crate::data::{COPY_PATH_PLACEHOLDER, default_database};
|
||||
use crate::database::{DatabaseClient, parse_proxy_opts};
|
||||
use crate::error::{
|
||||
EmptyResultSnafu, Error, OpenDalSnafu, OutputDirNotSetSnafu, Result, S3ConfigNotSetSnafu,
|
||||
@@ -668,10 +668,26 @@ impl Export {
|
||||
);
|
||||
|
||||
// Create copy_from.sql file
|
||||
let copy_database_from_sql = format!(
|
||||
r#"COPY DATABASE "{}"."{}" FROM '{}' WITH ({}){};"#,
|
||||
export_self.catalog, schema, path, with_options_clone, connection_part
|
||||
);
|
||||
let copy_database_from_sql = {
|
||||
let command_without_connection = format!(
|
||||
r#"COPY DATABASE "{}"."{}" FROM '{}' WITH ({});"#,
|
||||
export_self.catalog, schema, COPY_PATH_PLACEHOLDER, with_options_clone
|
||||
);
|
||||
|
||||
if connection_part.is_empty() {
|
||||
command_without_connection
|
||||
} else {
|
||||
let command_with_connection = format!(
|
||||
r#"COPY DATABASE "{}"."{}" FROM '{}' WITH ({}){};"#,
|
||||
export_self.catalog, schema, path, with_options_clone, connection_part
|
||||
);
|
||||
|
||||
format!(
|
||||
"-- {}\n{}",
|
||||
command_with_connection, command_without_connection
|
||||
)
|
||||
}
|
||||
};
|
||||
|
||||
let copy_from_path = export_self.get_file_path(&schema, "copy_from.sql");
|
||||
export_self
|
||||
|
||||
@@ -21,13 +21,13 @@ use clap::{Parser, ValueEnum};
|
||||
use common_catalog::consts::DEFAULT_SCHEMA_NAME;
|
||||
use common_error::ext::BoxedError;
|
||||
use common_telemetry::{error, info, warn};
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
use snafu::{OptionExt, ResultExt, ensure};
|
||||
use tokio::sync::Semaphore;
|
||||
use tokio::time::Instant;
|
||||
|
||||
use crate::data::default_database;
|
||||
use crate::data::{COPY_PATH_PLACEHOLDER, default_database};
|
||||
use crate::database::{DatabaseClient, parse_proxy_opts};
|
||||
use crate::error::{Error, FileIoSnafu, Result, SchemaNotFoundSnafu};
|
||||
use crate::error::{Error, FileIoSnafu, InvalidArgumentsSnafu, Result, SchemaNotFoundSnafu};
|
||||
use crate::{Tool, database};
|
||||
|
||||
#[derive(Debug, Default, Clone, ValueEnum)]
|
||||
@@ -148,12 +148,15 @@ impl Import {
|
||||
let _permit = semaphore_moved.acquire().await.unwrap();
|
||||
let database_input_dir = self.catalog_path().join(&schema);
|
||||
let sql_file = database_input_dir.join(filename);
|
||||
let sql = tokio::fs::read_to_string(sql_file)
|
||||
let mut sql = tokio::fs::read_to_string(sql_file)
|
||||
.await
|
||||
.context(FileIoSnafu)?;
|
||||
if sql.is_empty() {
|
||||
if sql.trim().is_empty() {
|
||||
info!("Empty `{filename}` {database_input_dir:?}");
|
||||
} else {
|
||||
if filename == "copy_from.sql" {
|
||||
sql = self.rewrite_copy_database_sql(&schema, &sql)?;
|
||||
}
|
||||
let db = exec_db.unwrap_or(&schema);
|
||||
self.database_client.sql(&sql, db).await?;
|
||||
info!("Imported `{filename}` for database {schema}");
|
||||
@@ -226,6 +229,57 @@ impl Import {
|
||||
}
|
||||
Ok(db_names)
|
||||
}
|
||||
|
||||
fn rewrite_copy_database_sql(&self, schema: &str, sql: &str) -> Result<String> {
|
||||
let target_location = self.build_copy_database_location(schema);
|
||||
let escaped_location = target_location.replace('\'', "''");
|
||||
|
||||
let mut first_stmt_checked = false;
|
||||
for line in sql.lines() {
|
||||
let trimmed = line.trim_start();
|
||||
if trimmed.is_empty() || trimmed.starts_with("--") {
|
||||
continue;
|
||||
}
|
||||
|
||||
ensure!(
|
||||
trimmed.starts_with("COPY DATABASE"),
|
||||
InvalidArgumentsSnafu {
|
||||
msg: "Expected COPY DATABASE statement at start of copy_from.sql"
|
||||
}
|
||||
);
|
||||
first_stmt_checked = true;
|
||||
break;
|
||||
}
|
||||
|
||||
ensure!(
|
||||
first_stmt_checked,
|
||||
InvalidArgumentsSnafu {
|
||||
msg: "COPY DATABASE statement not found in copy_from.sql"
|
||||
}
|
||||
);
|
||||
|
||||
ensure!(
|
||||
sql.contains(COPY_PATH_PLACEHOLDER),
|
||||
InvalidArgumentsSnafu {
|
||||
msg: format!(
|
||||
"Placeholder `{}` not found in COPY DATABASE statement",
|
||||
COPY_PATH_PLACEHOLDER
|
||||
)
|
||||
}
|
||||
);
|
||||
|
||||
Ok(sql.replacen(COPY_PATH_PLACEHOLDER, &escaped_location, 1))
|
||||
}
|
||||
|
||||
fn build_copy_database_location(&self, schema: &str) -> String {
|
||||
let mut path = self.catalog_path();
|
||||
path.push(schema);
|
||||
let mut path_str = path.to_string_lossy().into_owned();
|
||||
if !path_str.ends_with('/') {
|
||||
path_str.push('/');
|
||||
}
|
||||
path_str
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
@@ -241,3 +295,52 @@ impl Tool for Import {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use super::*;

    /// Creates an `Import` rooted at `input_dir` with fixed catalog/client settings, for
    /// exercising the SQL rewriting helpers.
    fn build_import(input_dir: &str) -> Import {
        let database_client = DatabaseClient::new(
            "127.0.0.1:4000".to_string(),
            "catalog".to_string(),
            None,
            Duration::from_secs(0),
            None,
        );

        Import {
            catalog: "catalog".to_string(),
            schema: None,
            database_client,
            input_dir: input_dir.to_string(),
            parallelism: 1,
            target: ImportTarget::Data,
        }
    }

    /// The placeholder in the statement is swapped for the escaped local location while
    /// the leading comment line is left as written.
    #[test]
    fn rewrite_copy_database_sql_replaces_placeholder() {
        let importer = build_import("/tmp/export-path");
        let comment = "-- COPY DATABASE \"catalog\".\"schema\" FROM 's3://bucket/demo/' WITH (format = 'parquet') CONNECTION (region = 'us-west-2')";
        let original_sql = format!(
            "{comment}\nCOPY DATABASE \"catalog\".\"schema\" FROM '{}' WITH (format = 'parquet');",
            COPY_PATH_PLACEHOLDER
        );

        let escaped_location = importer
            .build_copy_database_location("schema")
            .replace('\'', "''");
        let expected_from_clause = format!("FROM '{escaped_location}'");

        let rewritten = importer
            .rewrite_copy_database_sql("schema", &original_sql)
            .unwrap();

        assert!(rewritten.starts_with(comment));
        assert!(rewritten.contains(&expected_from_clause));
        assert!(!rewritten.contains(COPY_PATH_PLACEHOLDER));
    }

    /// A statement that carries a concrete path instead of the placeholder is rejected.
    #[test]
    fn rewrite_copy_database_sql_requires_placeholder() {
        let importer = build_import("/tmp/export-path");
        let sql_without_placeholder = "COPY DATABASE \"catalog\".\"schema\" FROM '/tmp/export-path/catalog/schema/' WITH (format = 'parquet');";

        let outcome = importer.rewrite_copy_database_sql("schema", sql_without_placeholder);

        assert!(outcome.is_err());
    }
}
|
||||
|
||||
Reference in New Issue
Block a user