feat: export database data (#4382)

* feat: export database data

* feat: export data with time range

* feat: refactor the data dir

* feat: address review comments
Author: Jeremyhi
Date: 2024-07-19 17:29:45 +08:00
Committed by: GitHub
Parent: 9fa9156bde
Commit: b90267dd80


@@ -46,7 +46,10 @@ enum ExportTarget {
     #[default]
     CreateTable,
     /// Corresponding to `EXPORT TABLE`
+    #[deprecated(note = "Please use `DatabaseData` instead.")]
     TableData,
+    /// Corresponding to `EXPORT DATABASE`
+    DatabaseData,
 }
 
 #[derive(Debug, Default, Parser)]
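With clap's `ValueEnum` derive, variant names surface on the command line in kebab-case, so the new variant should be selectable as `--target database-data` (or `-t database-data`, given `short = 't'` below), while the deprecated one stays reachable as `--target table-data`. A hedged invocation sketch; the binary name, subcommand, and flag values here are assumptions, not taken from this diff:

    greptime cli export --output-dir /tmp/backup --target database-data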
@@ -75,7 +78,17 @@ pub struct ExportCommand {
     #[clap(long, short = 't', value_enum)]
     target: ExportTarget,
 
-    /// basic authentication for connecting to the server
+    /// A half-open time range: [start_time, end_time).
+    /// The start of the time range (time-index column) for data export.
+    #[clap(long)]
+    start_time: Option<String>,
+
+    /// A half-open time range: [start_time, end_time).
+    /// The end of the time range (time-index column) for data export.
+    #[clap(long)]
+    end_time: Option<String>,
+
+    /// The basic authentication for connecting to the server
     #[clap(long)]
     auth_basic: Option<String>,
 }
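The two flags form a half-open interval: a row whose time-index value is `ts` is exported when `start_time <= ts < end_time`, and either bound may be omitted. A minimal standalone sketch of that predicate, with timestamps reduced to `i64` for illustration (none of these names come from the diff):

    fn in_range(ts: i64, start: Option<i64>, end: Option<i64>) -> bool {
        // A missing bound is unbounded: [start, +inf) or (-inf, end).
        start.map_or(true, |s| ts >= s) && end.map_or(true, |e| ts < e)
    }

Half-open ranges make repeated exports composable: consecutive runs over [t0, t1) and [t1, t2) neither overlap nor miss a row at the boundary.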
@@ -99,6 +112,8 @@ impl ExportCommand {
                 output_dir: self.output_dir.clone(),
                 parallelism: self.export_jobs,
                 target: self.target.clone(),
+                start_time: self.start_time.clone(),
+                end_time: self.end_time.clone(),
                 auth_header,
             }),
             guard,
@@ -113,6 +128,8 @@ pub struct Export {
     output_dir: String,
     parallelism: usize,
     target: ExportTarget,
+    start_time: Option<String>,
+    end_time: Option<String>,
     auth_header: Option<String>,
 }
@@ -167,7 +184,7 @@ impl Export {
         };
         let mut result = Vec::with_capacity(records.len());
         for value in records {
-            let serde_json::Value::String(schema) = &value[0] else {
+            let Value::String(schema) = &value[0] else {
                 unreachable!()
             };
             if schema == common_catalog::consts::INFORMATION_SCHEMA_NAME {
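The `let ... else` here is Rust's refutable-pattern binding: when the value is not a `Value::String`, control must diverge, and `unreachable!()` encodes the invariant that the first column of this result set is always a string. A self-contained sketch of the same pattern, assuming only `serde_json` (the function and message are invented for illustration):

    use serde_json::Value;

    fn schema_name(row: &[Value]) -> &str {
        // The else-branch must diverge; here it panics on a broken invariant.
        let Value::String(s) = &row[0] else {
            unreachable!("first column is always a string")
        };
        s
    }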
@@ -256,7 +273,7 @@ impl Export {
         let Some(records) = result else {
             EmptyResultSnafu.fail()?
         };
-        let serde_json::Value::String(create_table) = &records[0][1] else {
+        let Value::String(create_table) = &records[0][1] else {
             unreachable!()
         };
@@ -276,11 +293,13 @@ impl Export {
let (metric_physical_tables, remaining_tables) =
self.get_table_list(&catalog, &schema).await?;
let table_count = metric_physical_tables.len() + remaining_tables.len();
tokio::fs::create_dir_all(&self.output_dir)
let output_dir = Path::new(&self.output_dir)
.join(&catalog)
.join(format!("{schema}/"));
tokio::fs::create_dir_all(&output_dir)
.await
.context(FileIoSnafu)?;
let output_file =
Path::new(&self.output_dir).join(format!("{catalog}-{schema}.sql"));
let output_file = Path::new(&output_dir).join("create_tables.sql");
let mut file = File::create(output_file).await.context(FileIoSnafu)?;
for (c, s, t) in metric_physical_tables.into_iter().chain(remaining_tables) {
match self.show_create_table(&c, &s, &t).await {
@@ -294,7 +313,12 @@ impl Export {
                         }
                     }
                 }
-                info!("finished exporting {catalog}.{schema} with {table_count} tables",);
+                info!(
+                    "Finished exporting {catalog}.{schema} with {table_count} table schemas to path: {}",
+                    output_dir.to_string_lossy()
+                );
 
                 Ok::<(), Error>(())
             });
         }
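After this refactor, both the schema export above and the data export below write into a per-database directory `<output_dir>/<catalog>/<schema>/` instead of flat `<catalog>-<schema>.sql` files. For the default catalog and schema, the resulting layout would look roughly like this (paths illustrative; the file set is inferred from the code paths in this diff):

    /tmp/backup/
      greptime/
        public/
          create_tables.sql    <- written by export_create_table
          copy_from.sql        <- written by export_database_data
          *.parquet            <- written by COPY DATABASE ... TO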
@@ -409,14 +433,106 @@ impl Export {
         Ok(())
     }
 
+    async fn export_database_data(&self) -> Result<()> {
+        let timer = Instant::now();
+        let semaphore = Arc::new(Semaphore::new(self.parallelism));
+        let db_names = self.iter_db_names().await?;
+        let db_count = db_names.len();
+        let mut tasks = Vec::with_capacity(db_names.len());
+        for (catalog, schema) in db_names {
+            let semaphore_moved = semaphore.clone();
+            tasks.push(async move {
+                let _permit = semaphore_moved.acquire().await.unwrap();
+                let output_dir = Path::new(&self.output_dir)
+                    .join(&catalog)
+                    .join(format!("{schema}/"));
+                tokio::fs::create_dir_all(&output_dir)
+                    .await
+                    .context(FileIoSnafu)?;
+
+                let with_options = match (&self.start_time, &self.end_time) {
+                    (Some(start_time), Some(end_time)) => {
+                        format!(
+                            "WITH (FORMAT='parquet', start_time='{}', end_time='{}')",
+                            start_time, end_time
+                        )
+                    }
+                    (Some(start_time), None) => {
+                        format!("WITH (FORMAT='parquet', start_time='{}')", start_time)
+                    }
+                    (None, Some(end_time)) => {
+                        format!("WITH (FORMAT='parquet', end_time='{}')", end_time)
+                    }
+                    (None, None) => "WITH (FORMAT='parquet')".to_string(),
+                };
+
+                let sql = format!(
+                    r#"COPY DATABASE "{}"."{}" TO '{}' {};"#,
+                    catalog,
+                    schema,
+                    output_dir.to_str().unwrap(),
+                    with_options
+                );
+                info!("Executing sql: {sql}");
+                self.sql(&sql).await?;
+                info!(
+                    "Finished exporting {catalog}.{schema} data into path: {}",
+                    output_dir.to_string_lossy()
+                );
+
+                // Write the matching `COPY DATABASE ... FROM` statement so the
+                // exported data can be imported back later.
+                let copy_from_file = output_dir.join("copy_from.sql");
+                let mut writer =
+                    BufWriter::new(File::create(copy_from_file).await.context(FileIoSnafu)?);
+                let copy_database_from_sql = format!(
+                    r#"COPY DATABASE "{}"."{}" FROM '{}' WITH (FORMAT='parquet');"#,
+                    catalog,
+                    schema,
+                    output_dir.to_str().unwrap()
+                );
+                writer
+                    .write_all(copy_database_from_sql.as_bytes())
+                    .await
+                    .context(FileIoSnafu)?;
+                writer.flush().await.context(FileIoSnafu)?;
+
+                info!("Finished exporting {catalog}.{schema} copy_from.sql");
+
+                Ok::<(), Error>(())
+            });
+        }
+
+        let success = futures::future::join_all(tasks)
+            .await
+            .into_iter()
+            .filter(|r| match r {
+                Ok(_) => true,
+                Err(e) => {
+                    error!(e; "export database job failed");
+                    false
+                }
+            })
+            .count();
+        let elapsed = timer.elapsed();
+        info!("Success {success}/{db_count} jobs, costs: {:?}", elapsed);
+
+        Ok(())
+    }
 }
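With both bounds supplied, the statement built above expands to something like the following (catalog, schema, path, and timestamps are illustrative stand-ins, not values from this diff):

    COPY DATABASE "greptime"."public" TO '/tmp/backup/greptime/public/' WITH (FORMAT='parquet', start_time='2024-07-01 00:00:00', end_time='2024-07-19 00:00:00');

and the companion `copy_from.sql` written next to the data holds the inverse statement:

    COPY DATABASE "greptime"."public" FROM '/tmp/backup/greptime/public/' WITH (FORMAT='parquet');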
+#[allow(deprecated)]
 #[async_trait]
 impl Tool for Export {
     async fn do_work(&self) -> Result<()> {
         match self.target {
             ExportTarget::CreateTable => self.export_create_table().await,
             ExportTarget::TableData => self.export_table_data().await,
+            ExportTarget::DatabaseData => self.export_database_data().await,
         }
     }
 }
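The export loop caps concurrency with a semaphore: each per-database task first awaits a permit, so at most `parallelism` `COPY DATABASE` statements run at once, and `join_all` drives every task to completion and counts failures instead of aborting on the first error. A minimal standalone sketch of that pattern, with names and job payloads invented for illustration (requires the `tokio` and `futures` crates):

    use std::sync::Arc;
    use tokio::sync::Semaphore;

    #[tokio::main]
    async fn main() {
        let parallelism = 4;
        let semaphore = Arc::new(Semaphore::new(parallelism));
        let tasks = (0..16).map(|job| {
            let semaphore = semaphore.clone();
            async move {
                // At most `parallelism` tasks get past this line at any moment.
                let _permit = semaphore.acquire().await.unwrap();
                println!("running job {job}");
            }
        });
        // Poll all futures to completion; per-task errors would be tallied
        // here rather than propagated, mirroring the diff's filter/count.
        futures::future::join_all(tasks).await;
    }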
@@ -496,7 +612,9 @@ mod tests {
         let output_file = output_dir
             .path()
-            .join("greptime-cli.export.create_table.sql");
+            .join("greptime")
+            .join("cli.export.create_table")
+            .join("create_tables.sql");
         let res = std::fs::read_to_string(output_file).unwrap();
         let expect = r#"CREATE TABLE IF NOT EXISTS "a.b.c" (
 "ts" TIMESTAMP(3) NOT NULL,