From d67314789cc5e8b61b607b85fb2bc999eb04fa50 Mon Sep 17 00:00:00 2001 From: Jeremyhi Date: Thu, 1 Aug 2024 17:14:44 +0800 Subject: [PATCH] feat: export all schemas and data at once in export tool (#4478) * feat: export all schemas and data at once * feat: introduce export all to export schemas and data at once * feat: default value for target * feat: refactor export target * chore: fix unit test --- src/cmd/src/cli/export.rs | 141 ++++++++------------------------------ 1 file changed, 27 insertions(+), 114 deletions(-) diff --git a/src/cmd/src/cli/export.rs b/src/cmd/src/cli/export.rs index 171a0f2fa7..90699fae77 100644 --- a/src/cmd/src/cli/export.rs +++ b/src/cmd/src/cli/export.rs @@ -22,7 +22,7 @@ use base64::Engine; use clap::{Parser, ValueEnum}; use client::DEFAULT_SCHEMA_NAME; use common_catalog::consts::DEFAULT_CATALOG_NAME; -use common_telemetry::{debug, error, info, warn}; +use common_telemetry::{debug, error, info}; use serde_json::Value; use servers::http::greptime_result_v1::GreptimedbV1Response; use servers::http::GreptimeQueryOutput; @@ -42,14 +42,13 @@ type TableReference = (String, String, String); #[derive(Debug, Default, Clone, ValueEnum)] enum ExportTarget { - /// Corresponding to `SHOW CREATE TABLE` + /// Export all table schemas, corresponding to `SHOW CREATE TABLE`. + Schema, + /// Export all table data, corresponding to `COPY DATABASE TO`. + Data, + /// Export all table schemas and data at once. #[default] - CreateTable, - /// Corresponding to `EXPORT TABLE` - #[deprecated(note = "Please use `DatabaseData` instead.")] - TableData, - /// Corresponding to `EXPORT DATABASE` - DatabaseData, + All, } #[derive(Debug, Default, Parser)] @@ -75,7 +74,7 @@ pub struct ExportCommand { max_retry: usize, /// Things to export - #[clap(long, short = 't', value_enum)] + #[clap(long, short = 't', value_enum, default_value = "all")] target: ExportTarget, /// A half-open time range: [start_time, end_time). 
@@ -178,7 +177,7 @@ impl Export { if let Some(schema) = &self.schema { Ok(vec![(self.catalog.clone(), schema.clone())]) } else { - let result = self.sql("show databases").await?; + let result = self.sql("SHOW DATABASES").await?; let Some(records) = result else { EmptyResultSnafu.fail()? }; @@ -205,9 +204,11 @@ impl Export { ) -> Result<(Vec, Vec)> { // Puts all metric table first let sql = format!( - "select table_catalog, table_schema, table_name from \ - information_schema.columns where column_name = '__tsid' \ - and table_catalog = \'{catalog}\' and table_schema = \'{schema}\'" + "SELECT table_catalog, table_schema, table_name \ + FROM information_schema.columns \ + WHERE column_name = '__tsid' \ + and table_catalog = \'{catalog}\' \ + and table_schema = \'{schema}\'" ); let result = self.sql(&sql).await?; let Some(records) = result else { @@ -227,9 +228,11 @@ impl Export { // TODO: SQL injection hurts let sql = format!( - "select table_catalog, table_schema, table_name from \ - information_schema.tables where table_type = \'BASE TABLE\' \ - and table_catalog = \'{catalog}\' and table_schema = \'{schema}\'", + "SELECT table_catalog, table_schema, table_name \ + FROM information_schema.tables \ + WHERE table_type = \'BASE TABLE\' \ + and table_catalog = \'{catalog}\' \ + and table_schema = \'{schema}\'", ); let result = self.sql(&sql).await?; let Some(records) = result else { @@ -266,7 +269,7 @@ impl Export { async fn show_create_table(&self, catalog: &str, schema: &str, table: &str) -> Result { let sql = format!( - r#"show create table "{}"."{}"."{}""#, + r#"SHOW CREATE TABLE "{}"."{}"."{}""#, catalog, schema, table ); let result = self.sql(&sql).await?; @@ -341,99 +344,6 @@ impl Export { Ok(()) } - async fn export_table_data(&self) -> Result<()> { - let timer = Instant::now(); - let semaphore = Arc::new(Semaphore::new(self.parallelism)); - let db_names = self.iter_db_names().await?; - let db_count = db_names.len(); - let mut tasks = 
Vec::with_capacity(db_names.len()); - for (catalog, schema) in db_names { - let semaphore_moved = semaphore.clone(); - tasks.push(async move { - let _permit = semaphore_moved.acquire().await.unwrap(); - tokio::fs::create_dir_all(&self.output_dir) - .await - .context(FileIoSnafu)?; - let output_dir = Path::new(&self.output_dir).join(format!("{catalog}-{schema}/")); - // Ignores metric physical tables - let (metrics_tables, table_list) = self.get_table_list(&catalog, &schema).await?; - for (_, _, table_name) in metrics_tables { - warn!("Ignores metric physical table: {table_name}"); - } - for (catalog_name, schema_name, table_name) in table_list { - // copy table to - let sql = format!( - r#"Copy "{}"."{}"."{}" TO '{}{}.parquet' WITH (format='parquet');"#, - catalog_name, - schema_name, - table_name, - output_dir.to_str().unwrap(), - table_name, - ); - info!("Executing sql: {sql}"); - self.sql(&sql).await?; - } - info!("Finished exporting {catalog}.{schema} data"); - - // export copy from sql - let dir_filenames = match output_dir.read_dir() { - Ok(dir) => dir, - Err(_) => { - warn!("empty database {catalog}.{schema}"); - return Ok(()); - } - }; - - let copy_from_file = - Path::new(&self.output_dir).join(format!("{catalog}-{schema}_copy_from.sql")); - let mut writer = - BufWriter::new(File::create(copy_from_file).await.context(FileIoSnafu)?); - - for table_file in dir_filenames { - let table_file = table_file.unwrap(); - let table_name = table_file - .file_name() - .into_string() - .unwrap() - .replace(".parquet", ""); - - writer - .write( - format!( - "copy {} from '{}' with (format='parquet');\n", - table_name, - table_file.path().to_str().unwrap() - ) - .as_bytes(), - ) - .await - .context(FileIoSnafu)?; - } - writer.flush().await.context(FileIoSnafu)?; - - info!("finished exporting {catalog}.{schema} copy_from.sql"); - - Ok::<(), Error>(()) - }); - } - - let success = futures::future::join_all(tasks) - .await - .into_iter() - .filter(|r| match r { - Ok(_) => 
true, - Err(e) => { - error!(e; "export job failed"); - false - } - }) - .count(); - let elapsed = timer.elapsed(); - info!("Success {success}/{db_count} jobs, costs: {:?}", elapsed); - - Ok(()) - } - async fn export_database_data(&self) -> Result<()> { let timer = Instant::now(); let semaphore = Arc::new(Semaphore::new(self.parallelism)); @@ -530,9 +440,12 @@ impl Export { impl Tool for Export { async fn do_work(&self) -> Result<()> { match self.target { - ExportTarget::CreateTable => self.export_create_table().await, - ExportTarget::TableData => self.export_table_data().await, - ExportTarget::DatabaseData => self.export_database_data().await, + ExportTarget::Schema => self.export_create_table().await, + ExportTarget::Data => self.export_database_data().await, + ExportTarget::All => { + self.export_create_table().await?; + self.export_database_data().await + } } } } @@ -619,7 +532,7 @@ mod tests { "--output-dir", &*output_dir.path().to_string_lossy(), "--target", - "create-table", + "schema", ]); let mut cli_app = cli.build(LoggingOptions::default()).await?; cli_app.start().await?;