feat: support export command export data to s3 (#5585)

* feat: s3 first step

Signed-off-by: yihong0618 <zouzou0208@gmail.com>

* fix: finish s3 export

Signed-off-by: yihong0618 <zouzou0208@gmail.com>

* fix: drop useless comment

Signed-off-by: yihong0618 <zouzou0208@gmail.com>

* fix: forget to create_database and copy_from

Signed-off-by: yihong0618 <zouzou0208@gmail.com>

* fix: address comment use opendal Fs

Signed-off-by: yihong0618 <zouzou0208@gmail.com>

* refactor: make the export mess code clean

Signed-off-by: yihong0618 <zouzou0208@gmail.com>

---------

Signed-off-by: yihong0618 <zouzou0208@gmail.com>
This commit is contained in:
yihong
2025-03-11 16:56:59 +08:00
committed by GitHub
parent 1d87bd2d43
commit 11bfb17328
4 changed files with 330 additions and 106 deletions

1
Cargo.lock generated
View File

@@ -1693,6 +1693,7 @@ dependencies = [
"humantime",
"meta-client",
"nu-ansi-term",
"opendal",
"query",
"rand",
"reqwest",

View File

@@ -43,6 +43,10 @@ futures.workspace = true
humantime.workspace = true
meta-client.workspace = true
nu-ansi-term = "0.46"
opendal = { version = "0.51.1", features = [
"services-fs",
"services-s3",
] }
query.workspace = true
rand.workspace = true
reqwest.workspace = true

View File

@@ -276,6 +276,24 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("OpenDAL operator failed"))]
OpenDal {
#[snafu(implicit)]
location: Location,
#[snafu(source)]
error: opendal::Error,
},
#[snafu(display("S3 config need be set"))]
S3ConfigNotSet {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Output directory not set"))]
OutputDirNotSet {
#[snafu(implicit)]
location: Location,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -319,6 +337,9 @@ impl ErrorExt for Error {
| Error::BuildClient { .. } => StatusCode::Unexpected,
Error::Other { source, .. } => source.status_code(),
Error::OpenDal { .. } => StatusCode::Internal,
Error::S3ConfigNotSet { .. } => StatusCode::InvalidArguments,
Error::OutputDirNotSet { .. } => StatusCode::InvalidArguments,
Error::BuildRuntime { source, .. } => source.status_code(),

View File

@@ -21,15 +21,18 @@ use async_trait::async_trait;
use clap::{Parser, ValueEnum};
use common_error::ext::BoxedError;
use common_telemetry::{debug, error, info};
use opendal::layers::LoggingLayer;
use opendal::{services, Operator};
use serde_json::Value;
use snafu::{OptionExt, ResultExt};
use tokio::fs::File;
use tokio::io::{AsyncWriteExt, BufWriter};
use tokio::sync::Semaphore;
use tokio::time::Instant;
use crate::database::{parse_proxy_opts, DatabaseClient};
use crate::error::{EmptyResultSnafu, Error, FileIoSnafu, Result, SchemaNotFoundSnafu};
use crate::error::{
EmptyResultSnafu, Error, OpenDalSnafu, OutputDirNotSetSnafu, Result, S3ConfigNotSetSnafu,
SchemaNotFoundSnafu,
};
use crate::{database, Tool};
type TableReference = (String, String, String);
@@ -52,8 +55,9 @@ pub struct ExportCommand {
addr: String,
/// Directory to put the exported data. E.g.: /tmp/greptimedb-export
/// for local export.
#[clap(long)]
output_dir: String,
output_dir: Option<String>,
/// The name of the catalog to export.
#[clap(long, default_value = "greptime-*")]
@@ -101,10 +105,51 @@ pub struct ExportCommand {
/// Disable proxy server, if set, will not use any proxy.
#[clap(long)]
no_proxy: bool,
/// if export data to s3
#[clap(long)]
s3: bool,
/// The s3 bucket name
/// if s3 is set, this is required
#[clap(long)]
s3_bucket: Option<String>,
/// The s3 endpoint
/// if s3 is set, this is required
#[clap(long)]
s3_endpoint: Option<String>,
/// The s3 access key
/// if s3 is set, this is required
#[clap(long)]
s3_access_key: Option<String>,
/// The s3 secret key
/// if s3 is set, this is required
#[clap(long)]
s3_secret_key: Option<String>,
/// The s3 region
/// if s3 is set, this is required
#[clap(long)]
s3_region: Option<String>,
}
impl ExportCommand {
pub async fn build(&self) -> std::result::Result<Box<dyn Tool>, BoxedError> {
if self.s3
&& (self.s3_bucket.is_none()
|| self.s3_endpoint.is_none()
|| self.s3_access_key.is_none()
|| self.s3_secret_key.is_none()
|| self.s3_region.is_none())
{
return Err(BoxedError::new(S3ConfigNotSetSnafu {}.build()));
}
if !self.s3 && self.output_dir.is_none() {
return Err(BoxedError::new(OutputDirNotSetSnafu {}.build()));
}
let (catalog, schema) =
database::split_database(&self.database).map_err(BoxedError::new)?;
let proxy = parse_proxy_opts(self.proxy.clone(), self.no_proxy)?;
@@ -126,24 +171,43 @@ impl ExportCommand {
target: self.target.clone(),
start_time: self.start_time.clone(),
end_time: self.end_time.clone(),
s3: self.s3,
s3_bucket: self.s3_bucket.clone(),
s3_endpoint: self.s3_endpoint.clone(),
s3_access_key: self.s3_access_key.clone(),
s3_secret_key: self.s3_secret_key.clone(),
s3_region: self.s3_region.clone(),
}))
}
}
#[derive(Clone)]
pub struct Export {
catalog: String,
schema: Option<String>,
database_client: DatabaseClient,
output_dir: String,
output_dir: Option<String>,
parallelism: usize,
target: ExportTarget,
start_time: Option<String>,
end_time: Option<String>,
s3: bool,
s3_bucket: Option<String>,
s3_endpoint: Option<String>,
s3_access_key: Option<String>,
s3_secret_key: Option<String>,
s3_region: Option<String>,
}
impl Export {
fn catalog_path(&self) -> PathBuf {
PathBuf::from(&self.output_dir).join(&self.catalog)
if self.s3 {
PathBuf::from(&self.catalog)
} else if let Some(dir) = &self.output_dir {
PathBuf::from(dir).join(&self.catalog)
} else {
unreachable!("catalog_path: output_dir must be set when not using s3")
}
}
async fn get_db_names(&self) -> Result<Vec<String>> {
@@ -300,19 +364,23 @@ impl Export {
let timer = Instant::now();
let db_names = self.get_db_names().await?;
let db_count = db_names.len();
let operator = self.build_operator().await?;
for schema in db_names {
let db_dir = self.catalog_path().join(format!("{schema}/"));
tokio::fs::create_dir_all(&db_dir)
.await
.context(FileIoSnafu)?;
let file = db_dir.join("create_database.sql");
let mut file = File::create(file).await.context(FileIoSnafu)?;
let create_database = self
.show_create("DATABASE", &self.catalog, &schema, None)
.await?;
file.write_all(create_database.as_bytes())
.await
.context(FileIoSnafu)?;
let file_path = self.get_file_path(&schema, "create_database.sql");
self.write_to_storage(&operator, &file_path, create_database.into_bytes())
.await?;
info!(
"Exported {}.{} database creation SQL to {}",
self.catalog,
schema,
self.format_output_path(&file_path)
);
}
let elapsed = timer.elapsed();
@@ -326,149 +394,267 @@ impl Export {
let semaphore = Arc::new(Semaphore::new(self.parallelism));
let db_names = self.get_db_names().await?;
let db_count = db_names.len();
let operator = Arc::new(self.build_operator().await?);
let mut tasks = Vec::with_capacity(db_names.len());
for schema in db_names {
let semaphore_moved = semaphore.clone();
let export_self = self.clone();
let operator = operator.clone();
tasks.push(async move {
let _permit = semaphore_moved.acquire().await.unwrap();
let (metric_physical_tables, remaining_tables, views) =
self.get_table_list(&self.catalog, &schema).await?;
let table_count =
metric_physical_tables.len() + remaining_tables.len() + views.len();
let db_dir = self.catalog_path().join(format!("{schema}/"));
tokio::fs::create_dir_all(&db_dir)
.await
.context(FileIoSnafu)?;
let file = db_dir.join("create_tables.sql");
let mut file = File::create(file).await.context(FileIoSnafu)?;
for (c, s, t) in metric_physical_tables.into_iter().chain(remaining_tables) {
let create_table = self.show_create("TABLE", &c, &s, Some(&t)).await?;
file.write_all(create_table.as_bytes())
.await
.context(FileIoSnafu)?;
}
for (c, s, v) in views {
let create_view = self.show_create("VIEW", &c, &s, Some(&v)).await?;
file.write_all(create_view.as_bytes())
.await
.context(FileIoSnafu)?;
let (metric_physical_tables, remaining_tables, views) = export_self
.get_table_list(&export_self.catalog, &schema)
.await?;
// Create directory if needed for file system storage
if !export_self.s3 {
let db_dir = format!("{}/{}/", export_self.catalog, schema);
operator.create_dir(&db_dir).await.context(OpenDalSnafu)?;
}
let file_path = export_self.get_file_path(&schema, "create_tables.sql");
let mut content = Vec::new();
// Add table creation SQL
for (c, s, t) in metric_physical_tables.iter().chain(&remaining_tables) {
let create_table = export_self.show_create("TABLE", c, s, Some(t)).await?;
content.extend_from_slice(create_table.as_bytes());
}
// Add view creation SQL
for (c, s, v) in &views {
let create_view = export_self.show_create("VIEW", c, s, Some(v)).await?;
content.extend_from_slice(create_view.as_bytes());
}
// Write to storage
export_self
.write_to_storage(&operator, &file_path, content)
.await?;
info!(
"Finished exporting {}.{schema} with {table_count} table schemas to path: {}",
self.catalog,
db_dir.to_string_lossy()
"Finished exporting {}.{schema} with {} table schemas to path: {}",
export_self.catalog,
metric_physical_tables.len() + remaining_tables.len() + views.len(),
export_self.format_output_path(&file_path)
);
Ok::<(), Error>(())
});
}
let success = futures::future::join_all(tasks)
.await
.into_iter()
.filter(|r| match r {
Ok(_) => true,
Err(e) => {
error!(e; "export schema job failed");
false
}
})
.count();
let success = self.execute_tasks(tasks).await;
let elapsed = timer.elapsed();
info!("Success {success}/{db_count} jobs, cost: {elapsed:?}");
Ok(())
}
async fn build_operator(&self) -> Result<Operator> {
if self.s3 {
self.build_s3_operator().await
} else {
self.build_fs_operator().await
}
}
async fn build_s3_operator(&self) -> Result<Operator> {
let mut builder = services::S3::default().root("").bucket(
self.s3_bucket
.as_ref()
.expect("s3_bucket must be provided when s3 is enabled"),
);
if let Some(endpoint) = self.s3_endpoint.as_ref() {
builder = builder.endpoint(endpoint);
}
if let Some(region) = self.s3_region.as_ref() {
builder = builder.region(region);
}
if let Some(key_id) = self.s3_access_key.as_ref() {
builder = builder.access_key_id(key_id);
}
if let Some(secret_key) = self.s3_secret_key.as_ref() {
builder = builder.secret_access_key(secret_key);
}
let op = Operator::new(builder)
.context(OpenDalSnafu)?
.layer(LoggingLayer::default())
.finish();
Ok(op)
}
async fn build_fs_operator(&self) -> Result<Operator> {
let root = self
.output_dir
.as_ref()
.context(OutputDirNotSetSnafu)?
.clone();
let op = Operator::new(services::Fs::default().root(&root))
.context(OpenDalSnafu)?
.layer(LoggingLayer::default())
.finish();
Ok(op)
}
async fn export_database_data(&self) -> Result<()> {
let timer = Instant::now();
let semaphore = Arc::new(Semaphore::new(self.parallelism));
let db_names = self.get_db_names().await?;
let db_count = db_names.len();
let mut tasks = Vec::with_capacity(db_count);
let operator = Arc::new(self.build_operator().await?);
let with_options = build_with_options(&self.start_time, &self.end_time);
for schema in db_names {
let semaphore_moved = semaphore.clone();
let export_self = self.clone();
let with_options_clone = with_options.clone();
let operator = operator.clone();
tasks.push(async move {
let _permit = semaphore_moved.acquire().await.unwrap();
let db_dir = self.catalog_path().join(format!("{schema}/"));
tokio::fs::create_dir_all(&db_dir)
.await
.context(FileIoSnafu)?;
let with_options = match (&self.start_time, &self.end_time) {
(Some(start_time), Some(end_time)) => {
format!(
"WITH (FORMAT='parquet', start_time='{}', end_time='{}')",
start_time, end_time
)
}
(Some(start_time), None) => {
format!("WITH (FORMAT='parquet', start_time='{}')", start_time)
}
(None, Some(end_time)) => {
format!("WITH (FORMAT='parquet', end_time='{}')", end_time)
}
(None, None) => "WITH (FORMAT='parquet')".to_string(),
};
// Create directory if not using S3
if !export_self.s3 {
let db_dir = format!("{}/{}/", export_self.catalog, schema);
operator.create_dir(&db_dir).await.context(OpenDalSnafu)?;
}
let (path, connection_part) = export_self.get_storage_params(&schema);
// Execute COPY DATABASE TO command
let sql = format!(
r#"COPY DATABASE "{}"."{}" TO '{}' {};"#,
self.catalog,
schema,
db_dir.to_str().unwrap(),
with_options
r#"COPY DATABASE "{}"."{}" TO '{}' WITH ({}){};"#,
export_self.catalog, schema, path, with_options_clone, connection_part
);
info!("Executing sql: {sql}");
export_self.database_client.sql_in_public(&sql).await?;
info!(
"Finished exporting {}.{} data to {}",
export_self.catalog, schema, path
);
info!("Executing sql: {sql}");
// Create copy_from.sql file
let copy_database_from_sql = format!(
r#"COPY DATABASE "{}"."{}" FROM '{}' WITH ({}){};"#,
export_self.catalog, schema, path, with_options_clone, connection_part
);
self.database_client.sql_in_public(&sql).await?;
let copy_from_path = export_self.get_file_path(&schema, "copy_from.sql");
export_self
.write_to_storage(
&operator,
&copy_from_path,
copy_database_from_sql.into_bytes(),
)
.await?;
info!(
"Finished exporting {}.{schema} data into path: {}",
self.catalog,
db_dir.to_string_lossy()
);
// The export copy from sql
let copy_from_file = db_dir.join("copy_from.sql");
let mut writer =
BufWriter::new(File::create(copy_from_file).await.context(FileIoSnafu)?);
let copy_database_from_sql = format!(
r#"COPY DATABASE "{}"."{}" FROM '{}' WITH (FORMAT='parquet');"#,
self.catalog,
"Finished exporting {}.{} copy_from.sql to {}",
export_self.catalog,
schema,
db_dir.to_str().unwrap()
export_self.format_output_path(&copy_from_path)
);
writer
.write(copy_database_from_sql.as_bytes())
.await
.context(FileIoSnafu)?;
writer.flush().await.context(FileIoSnafu)?;
info!("Finished exporting {}.{schema} copy_from.sql", self.catalog);
Ok::<(), Error>(())
})
});
}
let success = futures::future::join_all(tasks)
let success = self.execute_tasks(tasks).await;
let elapsed = timer.elapsed();
info!("Success {success}/{db_count} jobs, costs: {elapsed:?}");
Ok(())
}
fn get_file_path(&self, schema: &str, file_name: &str) -> String {
format!("{}/{}/{}", self.catalog, schema, file_name)
}
fn format_output_path(&self, file_path: &str) -> String {
if self.s3 {
format!(
"s3://{}/{}",
self.s3_bucket.as_ref().unwrap_or(&String::new()),
file_path
)
} else {
format!(
"{}/{}",
self.output_dir.as_ref().unwrap_or(&String::new()),
file_path
)
}
}
async fn write_to_storage(
&self,
op: &Operator,
file_path: &str,
content: Vec<u8>,
) -> Result<()> {
op.write(file_path, content).await.context(OpenDalSnafu)
}
fn get_storage_params(&self, schema: &str) -> (String, String) {
if self.s3 {
let s3_path = format!(
"s3://{}/{}/{}/",
// Safety: s3_bucket is required when s3 is enabled
self.s3_bucket.as_ref().unwrap(),
self.catalog,
schema
);
// endpoint is optional
let endpoint_option = if let Some(endpoint) = self.s3_endpoint.as_ref() {
format!(", ENDPOINT='{}'", endpoint)
} else {
String::new()
};
// Safety: All s3 options are required
let connection_options = format!(
"ACCESS_KEY_ID='{}', SECRET_ACCESS_KEY='{}', REGION='{}'{}",
self.s3_access_key.as_ref().unwrap(),
self.s3_secret_key.as_ref().unwrap(),
self.s3_region.as_ref().unwrap(),
endpoint_option
);
(s3_path, format!(" CONNECTION ({})", connection_options))
} else {
(
self.catalog_path()
.join(format!("{schema}/"))
.to_string_lossy()
.to_string(),
String::new(),
)
}
}
async fn execute_tasks(
&self,
tasks: Vec<impl std::future::Future<Output = Result<()>>>,
) -> usize {
futures::future::join_all(tasks)
.await
.into_iter()
.filter(|r| match r {
Ok(_) => true,
Err(e) => {
error!(e; "export database job failed");
error!(e; "export job failed");
false
}
})
.count();
let elapsed = timer.elapsed();
info!("Success {success}/{db_count} jobs, costs: {elapsed:?}");
Ok(())
.count()
}
}
@@ -493,3 +679,15 @@ impl Tool for Export {
}
}
}
/// Builds the WITH options string for SQL commands, assuming consistent syntax across S3 and local exports.
fn build_with_options(start_time: &Option<String>, end_time: &Option<String>) -> String {
let mut options = vec!["format = 'parquet'".to_string()];
if let Some(start) = start_time {
options.push(format!("start_time = '{}'", start));
}
if let Some(end) = end_time {
options.push(format!("end_time = '{}'", end));
}
options.join(", ")
}