From 11bfb17328d87b18e51bf7d032e4f0bf65e90f9c Mon Sep 17 00:00:00 2001
From: yihong
Date: Tue, 11 Mar 2025 16:56:59 +0800
Subject: [PATCH] feat: support exporting data to S3 in the export command
 (#5585)

* feat: s3 first step

Signed-off-by: yihong0618

* fix: finish s3 export

Signed-off-by: yihong0618

* fix: drop useless comment

Signed-off-by: yihong0618

* fix: forget to create_database and copy_from

Signed-off-by: yihong0618

* fix: address comment use opendal Fs

Signed-off-by: yihong0618

* refactor: make the export mess code clean

Signed-off-by: yihong0618

---------

Signed-off-by: yihong0618
---
 Cargo.lock            |   1 +
 src/cli/Cargo.toml    |   4 +
 src/cli/src/error.rs  |  21 +++
 src/cli/src/export.rs | 410 +++++++++++++++++++++++++++++++-----------
 4 files changed, 330 insertions(+), 106 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 6f5ecef85e..e0e505f365 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1693,6 +1693,7 @@ dependencies = [
  "humantime",
  "meta-client",
  "nu-ansi-term",
+ "opendal",
  "query",
  "rand",
  "reqwest",

diff --git a/src/cli/Cargo.toml b/src/cli/Cargo.toml
index b27ae727b9..8904c91935 100644
--- a/src/cli/Cargo.toml
+++ b/src/cli/Cargo.toml
@@ -43,6 +43,10 @@ futures.workspace = true
 humantime.workspace = true
 meta-client.workspace = true
 nu-ansi-term = "0.46"
+opendal = { version = "0.51.1", features = [
+    "services-fs",
+    "services-s3",
+] }
 query.workspace = true
 rand.workspace = true
 reqwest.workspace = true

diff --git a/src/cli/src/error.rs b/src/cli/src/error.rs
index 1b79ee759b..be852e7d73 100644
--- a/src/cli/src/error.rs
+++ b/src/cli/src/error.rs
@@ -276,6 +276,24 @@ pub enum Error {
         #[snafu(implicit)]
         location: Location,
     },
+
+    #[snafu(display("OpenDAL operator failed"))]
+    OpenDal {
+        #[snafu(implicit)]
+        location: Location,
+        #[snafu(source)]
+        error: opendal::Error,
+    },
+    #[snafu(display("S3 config must be set"))]
+    S3ConfigNotSet {
+        #[snafu(implicit)]
+        location: Location,
+    },
+    #[snafu(display("Output directory not set"))]
+    OutputDirNotSet {
+        #[snafu(implicit)]
+        location: Location,
+    },
 }
 
 pub type Result<T> = std::result::Result<T, Error>;
@@ -319,6 +337,9 @@ impl ErrorExt for Error {
             | Error::BuildClient { .. } => StatusCode::Unexpected,
 
             Error::Other { source, .. } => source.status_code(),
+            Error::OpenDal { .. } => StatusCode::Internal,
+            Error::S3ConfigNotSet { .. } => StatusCode::InvalidArguments,
+            Error::OutputDirNotSet { .. } => StatusCode::InvalidArguments,
 
             Error::BuildRuntime { source, .. } => source.status_code(),

diff --git a/src/cli/src/export.rs b/src/cli/src/export.rs
index 846e2a49ad..e771df12ec 100644
--- a/src/cli/src/export.rs
+++ b/src/cli/src/export.rs
@@ -21,15 +21,18 @@ use async_trait::async_trait;
 use clap::{Parser, ValueEnum};
 use common_error::ext::BoxedError;
 use common_telemetry::{debug, error, info};
+use opendal::layers::LoggingLayer;
+use opendal::{services, Operator};
 use serde_json::Value;
 use snafu::{OptionExt, ResultExt};
-use tokio::fs::File;
-use tokio::io::{AsyncWriteExt, BufWriter};
 use tokio::sync::Semaphore;
 use tokio::time::Instant;
 
 use crate::database::{parse_proxy_opts, DatabaseClient};
-use crate::error::{EmptyResultSnafu, Error, FileIoSnafu, Result, SchemaNotFoundSnafu};
+use crate::error::{
+    EmptyResultSnafu, Error, OpenDalSnafu, OutputDirNotSetSnafu, Result, S3ConfigNotSetSnafu,
+    SchemaNotFoundSnafu,
+};
 use crate::{database, Tool};
 
 type TableReference = (String, String, String);
@@ -52,8 +55,9 @@ pub struct ExportCommand {
     addr: String,
 
     /// Directory to put the exported data. E.g.: /tmp/greptimedb-export
+    /// for local export.
     #[clap(long)]
-    output_dir: String,
+    output_dir: Option<String>,
 
     /// The name of the catalog to export.
     #[clap(long, default_value = "greptime-*")]
@@ -101,10 +105,51 @@ pub struct ExportCommand {
     /// Disable proxy server, if set, will not use any proxy.
     #[clap(long)]
     no_proxy: bool,
+
+    /// Whether to export data to S3.
+    #[clap(long)]
+    s3: bool,
+
+    /// The S3 bucket name.
+    /// Required when `s3` is set.
+    #[clap(long)]
+    s3_bucket: Option<String>,
+
+    /// The S3 endpoint.
+    /// Required when `s3` is set.
+    #[clap(long)]
+    s3_endpoint: Option<String>,
+
+    /// The S3 access key.
+    /// Required when `s3` is set.
+    #[clap(long)]
+    s3_access_key: Option<String>,
+
+    /// The S3 secret key.
+    /// Required when `s3` is set.
+    #[clap(long)]
+    s3_secret_key: Option<String>,
+
+    /// The S3 region.
+    /// Required when `s3` is set.
+    #[clap(long)]
+    s3_region: Option<String>,
 }
 
 impl ExportCommand {
     pub async fn build(&self) -> std::result::Result<Box<dyn Tool>, BoxedError> {
+        if self.s3
+            && (self.s3_bucket.is_none()
+                || self.s3_endpoint.is_none()
+                || self.s3_access_key.is_none()
+                || self.s3_secret_key.is_none()
+                || self.s3_region.is_none())
+        {
+            return Err(BoxedError::new(S3ConfigNotSetSnafu {}.build()));
+        }
+        if !self.s3 && self.output_dir.is_none() {
+            return Err(BoxedError::new(OutputDirNotSetSnafu {}.build()));
+        }
         let (catalog, schema) =
             database::split_database(&self.database).map_err(BoxedError::new)?;
         let proxy = parse_proxy_opts(self.proxy.clone(), self.no_proxy)?;
@@ -126,24 +171,43 @@ impl ExportCommand {
             target: self.target.clone(),
             start_time: self.start_time.clone(),
             end_time: self.end_time.clone(),
+            s3: self.s3,
+            s3_bucket: self.s3_bucket.clone(),
+            s3_endpoint: self.s3_endpoint.clone(),
+            s3_access_key: self.s3_access_key.clone(),
+            s3_secret_key: self.s3_secret_key.clone(),
+            s3_region: self.s3_region.clone(),
         }))
     }
 }
 
+#[derive(Clone)]
 pub struct Export {
     catalog: String,
     schema: Option<String>,
     database_client: DatabaseClient,
-    output_dir: String,
+    output_dir: Option<String>,
     parallelism: usize,
     target: ExportTarget,
     start_time: Option<String>,
    end_time: Option<String>,
+    s3: bool,
+    s3_bucket: Option<String>,
+    s3_endpoint: Option<String>,
+    s3_access_key: Option<String>,
+    s3_secret_key: Option<String>,
+    s3_region: Option<String>,
 }
 
 impl Export {
     fn catalog_path(&self) -> PathBuf {
-        PathBuf::from(&self.output_dir).join(&self.catalog)
+        if self.s3 {
+            PathBuf::from(&self.catalog)
+        } else if let Some(dir) = &self.output_dir {
+            PathBuf::from(dir).join(&self.catalog)
+        } else {
+            unreachable!("catalog_path: output_dir must be set when not using s3")
+        }
     }
 
     async fn get_db_names(&self) -> Result<Vec<String>> {
@@ -300,19 +364,23 @@ impl Export {
         let timer = Instant::now();
         let db_names = self.get_db_names().await?;
         let db_count = db_names.len();
+        let operator = self.build_operator().await?;
+
         for schema in db_names {
-            let db_dir = self.catalog_path().join(format!("{schema}/"));
-            tokio::fs::create_dir_all(&db_dir)
-                .await
-                .context(FileIoSnafu)?;
-            let file = db_dir.join("create_database.sql");
-            let mut file = File::create(file).await.context(FileIoSnafu)?;
             let create_database = self
                 .show_create("DATABASE", &self.catalog, &schema, None)
                 .await?;
-            file.write_all(create_database.as_bytes())
-                .await
-                .context(FileIoSnafu)?;
+
+            let file_path = self.get_file_path(&schema, "create_database.sql");
+            self.write_to_storage(&operator, &file_path, create_database.into_bytes())
+                .await?;
+
+            info!(
+                "Exported {}.{} database creation SQL to {}",
+                self.catalog,
+                schema,
+                self.format_output_path(&file_path)
+            );
         }
 
         let elapsed = timer.elapsed();
@@ -326,149 +394,267 @@ impl Export {
         let semaphore = Arc::new(Semaphore::new(self.parallelism));
         let db_names = self.get_db_names().await?;
         let db_count = db_names.len();
+        let operator = Arc::new(self.build_operator().await?);
         let mut tasks = Vec::with_capacity(db_names.len());
+
         for schema in db_names {
             let semaphore_moved = semaphore.clone();
+            let export_self = self.clone();
+            let operator = operator.clone();
             tasks.push(async move {
                 let _permit = semaphore_moved.acquire().await.unwrap();
-                let (metric_physical_tables, remaining_tables, views) =
-                    self.get_table_list(&self.catalog, &schema).await?;
-                let table_count =
-                    metric_physical_tables.len() + remaining_tables.len() + views.len();
-                let db_dir = self.catalog_path().join(format!("{schema}/"));
-                tokio::fs::create_dir_all(&db_dir)
-                    .await
-                    .context(FileIoSnafu)?;
-                let file = db_dir.join("create_tables.sql");
-                let mut file = File::create(file).await.context(FileIoSnafu)?;
-                for (c, s, t) in metric_physical_tables.into_iter().chain(remaining_tables) {
-                    let create_table = self.show_create("TABLE", &c, &s, Some(&t)).await?;
-                    file.write_all(create_table.as_bytes())
-                        .await
-                        .context(FileIoSnafu)?;
-                }
-                for (c, s, v) in views {
-                    let create_view = self.show_create("VIEW", &c, &s, Some(&v)).await?;
-                    file.write_all(create_view.as_bytes())
-                        .await
-                        .context(FileIoSnafu)?;
+                let (metric_physical_tables, remaining_tables, views) = export_self
+                    .get_table_list(&export_self.catalog, &schema)
+                    .await?;
+
+                // Create directory if needed for file system storage
+                if !export_self.s3 {
+                    let db_dir = format!("{}/{}/", export_self.catalog, schema);
+                    operator.create_dir(&db_dir).await.context(OpenDalSnafu)?;
                 }
+                let file_path = export_self.get_file_path(&schema, "create_tables.sql");
+                let mut content = Vec::new();
+
+                // Add table creation SQL
+                for (c, s, t) in metric_physical_tables.iter().chain(&remaining_tables) {
+                    let create_table = export_self.show_create("TABLE", c, s, Some(t)).await?;
+                    content.extend_from_slice(create_table.as_bytes());
+                }
+
+                // Add view creation SQL
+                for (c, s, v) in &views {
+                    let create_view = export_self.show_create("VIEW", c, s, Some(v)).await?;
+                    content.extend_from_slice(create_view.as_bytes());
+                }
+
+                // Write to storage
+                export_self
+                    .write_to_storage(&operator, &file_path, content)
+                    .await?;
+
                 info!(
-                    "Finished exporting {}.{schema} with {table_count} table schemas to path: {}",
-                    self.catalog,
-                    db_dir.to_string_lossy()
+                    "Finished exporting {}.{schema} with {} table schemas to path: {}",
+                    export_self.catalog,
+                    metric_physical_tables.len() + remaining_tables.len() + views.len(),
+                    export_self.format_output_path(&file_path)
                 );
                 Ok::<(), Error>(())
             });
         }
 
-        let success = futures::future::join_all(tasks)
-            .await
-            .into_iter()
-            .filter(|r| match r {
-                Ok(_) => true,
-                Err(e) => {
-                    error!(e; "export schema job failed");
-                    false
-                }
-            })
-            .count();
-
+        let success = self.execute_tasks(tasks).await;
         let elapsed = timer.elapsed();
         info!("Success {success}/{db_count} jobs, cost: {elapsed:?}");
 
         Ok(())
     }
 
+    async fn build_operator(&self) -> Result<Operator> {
+        if self.s3 {
+            self.build_s3_operator().await
+        } else {
+            self.build_fs_operator().await
+        }
+    }
+
+    async fn build_s3_operator(&self) -> Result<Operator> {
+        let mut builder = services::S3::default().root("").bucket(
+            self.s3_bucket
+                .as_ref()
+                .expect("s3_bucket must be provided when s3 is enabled"),
+        );
+
+        if let Some(endpoint) = self.s3_endpoint.as_ref() {
+            builder = builder.endpoint(endpoint);
+        }
+
+        if let Some(region) = self.s3_region.as_ref() {
+            builder = builder.region(region);
+        }
+
+        if let Some(key_id) = self.s3_access_key.as_ref() {
+            builder = builder.access_key_id(key_id);
+        }
+
+        if let Some(secret_key) = self.s3_secret_key.as_ref() {
+            builder = builder.secret_access_key(secret_key);
+        }
+
+        let op = Operator::new(builder)
+            .context(OpenDalSnafu)?
+            .layer(LoggingLayer::default())
+            .finish();
+        Ok(op)
+    }
+
+    async fn build_fs_operator(&self) -> Result<Operator> {
+        let root = self
+            .output_dir
+            .as_ref()
+            .context(OutputDirNotSetSnafu)?
+            .clone();
+        let op = Operator::new(services::Fs::default().root(&root))
+            .context(OpenDalSnafu)?
+            .layer(LoggingLayer::default())
+            .finish();
+        Ok(op)
+    }
+
     async fn export_database_data(&self) -> Result<()> {
         let timer = Instant::now();
         let semaphore = Arc::new(Semaphore::new(self.parallelism));
         let db_names = self.get_db_names().await?;
         let db_count = db_names.len();
         let mut tasks = Vec::with_capacity(db_count);
+        let operator = Arc::new(self.build_operator().await?);
+        let with_options = build_with_options(&self.start_time, &self.end_time);
+
         for schema in db_names {
             let semaphore_moved = semaphore.clone();
+            let export_self = self.clone();
+            let with_options_clone = with_options.clone();
+            let operator = operator.clone();
+
             tasks.push(async move {
                 let _permit = semaphore_moved.acquire().await.unwrap();
-                let db_dir = self.catalog_path().join(format!("{schema}/"));
-                tokio::fs::create_dir_all(&db_dir)
-                    .await
-                    .context(FileIoSnafu)?;
-                let with_options = match (&self.start_time, &self.end_time) {
-                    (Some(start_time), Some(end_time)) => {
-                        format!(
-                            "WITH (FORMAT='parquet', start_time='{}', end_time='{}')",
-                            start_time, end_time
-                        )
-                    }
-                    (Some(start_time), None) => {
-                        format!("WITH (FORMAT='parquet', start_time='{}')", start_time)
-                    }
-                    (None, Some(end_time)) => {
-                        format!("WITH (FORMAT='parquet', end_time='{}')", end_time)
-                    }
-                    (None, None) => "WITH (FORMAT='parquet')".to_string(),
-                };
+
+                // Create directory if not using S3
+                if !export_self.s3 {
+                    let db_dir = format!("{}/{}/", export_self.catalog, schema);
+                    operator.create_dir(&db_dir).await.context(OpenDalSnafu)?;
+                }
 
+                let (path, connection_part) = export_self.get_storage_params(&schema);
+
+                // Execute COPY DATABASE TO command
                 let sql = format!(
-                    r#"COPY DATABASE "{}"."{}" TO '{}' {};"#,
-                    self.catalog,
-                    schema,
-                    db_dir.to_str().unwrap(),
-                    with_options
+                    r#"COPY DATABASE "{}"."{}" TO '{}' WITH ({}){};"#,
+                    export_self.catalog, schema, path, with_options_clone, connection_part
+                );
+                info!("Executing sql: {sql}");
+                export_self.database_client.sql_in_public(&sql).await?;
+                info!(
+                    "Finished exporting {}.{} data to {}",
+                    export_self.catalog, schema, path
                 );
 
-                info!("Executing sql: {sql}");
+                // Create copy_from.sql file
+                let copy_database_from_sql = format!(
+                    r#"COPY DATABASE "{}"."{}" FROM '{}' WITH ({}){};"#,
+                    export_self.catalog, schema, path, with_options_clone, connection_part
+                );
 
-                self.database_client.sql_in_public(&sql).await?;
+                let copy_from_path = export_self.get_file_path(&schema, "copy_from.sql");
+                export_self
+                    .write_to_storage(
+                        &operator,
+                        &copy_from_path,
+                        copy_database_from_sql.into_bytes(),
+                    )
+                    .await?;
 
                 info!(
-                    "Finished exporting {}.{schema} data into path: {}",
-                    self.catalog,
-                    db_dir.to_string_lossy()
-                );
-
-                // The export copy from sql
-                let copy_from_file = db_dir.join("copy_from.sql");
-                let mut writer =
-                    BufWriter::new(File::create(copy_from_file).await.context(FileIoSnafu)?);
-                let copy_database_from_sql = format!(
-                    r#"COPY DATABASE "{}"."{}" FROM '{}' WITH (FORMAT='parquet');"#,
-                    self.catalog,
+                    "Finished exporting {}.{} copy_from.sql to {}",
copy_from.sql to {}", + export_self.catalog, schema, - db_dir.to_str().unwrap() + export_self.format_output_path(©_from_path) ); - writer - .write(copy_database_from_sql.as_bytes()) - .await - .context(FileIoSnafu)?; - writer.flush().await.context(FileIoSnafu)?; - - info!("Finished exporting {}.{schema} copy_from.sql", self.catalog); Ok::<(), Error>(()) - }) + }); } - let success = futures::future::join_all(tasks) + let success = self.execute_tasks(tasks).await; + let elapsed = timer.elapsed(); + info!("Success {success}/{db_count} jobs, costs: {elapsed:?}"); + + Ok(()) + } + + fn get_file_path(&self, schema: &str, file_name: &str) -> String { + format!("{}/{}/{}", self.catalog, schema, file_name) + } + + fn format_output_path(&self, file_path: &str) -> String { + if self.s3 { + format!( + "s3://{}/{}", + self.s3_bucket.as_ref().unwrap_or(&String::new()), + file_path + ) + } else { + format!( + "{}/{}", + self.output_dir.as_ref().unwrap_or(&String::new()), + file_path + ) + } + } + + async fn write_to_storage( + &self, + op: &Operator, + file_path: &str, + content: Vec, + ) -> Result<()> { + op.write(file_path, content).await.context(OpenDalSnafu) + } + + fn get_storage_params(&self, schema: &str) -> (String, String) { + if self.s3 { + let s3_path = format!( + "s3://{}/{}/{}/", + // Safety: s3_bucket is required when s3 is enabled + self.s3_bucket.as_ref().unwrap(), + self.catalog, + schema + ); + + // endpoint is optional + let endpoint_option = if let Some(endpoint) = self.s3_endpoint.as_ref() { + format!(", ENDPOINT='{}'", endpoint) + } else { + String::new() + }; + + // Safety: All s3 options are required + let connection_options = format!( + "ACCESS_KEY_ID='{}', SECRET_ACCESS_KEY='{}', REGION='{}'{}", + self.s3_access_key.as_ref().unwrap(), + self.s3_secret_key.as_ref().unwrap(), + self.s3_region.as_ref().unwrap(), + endpoint_option + ); + + (s3_path, format!(" CONNECTION ({})", connection_options)) + } else { + ( + self.catalog_path() + .join(format!("{schema}/")) + .to_string_lossy() + .to_string(), + String::new(), + ) + } + } + + async fn execute_tasks( + &self, + tasks: Vec>>, + ) -> usize { + futures::future::join_all(tasks) .await .into_iter() .filter(|r| match r { Ok(_) => true, Err(e) => { - error!(e; "export database job failed"); + error!(e; "export job failed"); false } }) - .count(); - let elapsed = timer.elapsed(); - - info!("Success {success}/{db_count} jobs, costs: {elapsed:?}"); - - Ok(()) + .count() } } @@ -493,3 +679,15 @@ impl Tool for Export { } } } + +/// Builds the WITH options string for SQL commands, assuming consistent syntax across S3 and local exports. +fn build_with_options(start_time: &Option, end_time: &Option) -> String { + let mut options = vec!["format = 'parquet'".to_string()]; + if let Some(start) = start_time { + options.push(format!("start_time = '{}'", start)); + } + if let Some(end) = end_time { + options.push(format!("end_time = '{}'", end)); + } + options.join(", ") +}