mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-26 09:50:40 +00:00
feat(cli): prevent exporting physical table data (#3978)
* feat: prevent exporting physical table data * chore: apply suggestions from CR
This commit is contained in:
@@ -176,8 +176,12 @@ impl Export {
|
||||
}
|
||||
|
||||
/// Return a list of [`TableReference`] to be exported.
|
||||
/// Includes all tables under the given `catalog` and `schema`
|
||||
async fn get_table_list(&self, catalog: &str, schema: &str) -> Result<Vec<TableReference>> {
|
||||
/// Includes all tables under the given `catalog` and `schema`.
|
||||
async fn get_table_list(
|
||||
&self,
|
||||
catalog: &str,
|
||||
schema: &str,
|
||||
) -> Result<(Vec<TableReference>, Vec<TableReference>)> {
|
||||
// Puts all metric table first
|
||||
let sql = format!(
|
||||
"select table_catalog, table_schema, table_name from \
|
||||
@@ -214,7 +218,7 @@ impl Export {
|
||||
debug!("Fetched table list: {:?}", records);
|
||||
|
||||
if records.is_empty() {
|
||||
return Ok(vec![]);
|
||||
return Ok((vec![], vec![]));
|
||||
}
|
||||
|
||||
let mut remaining_tables = Vec::with_capacity(records.len());
|
||||
@@ -232,11 +236,11 @@ impl Export {
|
||||
remaining_tables.push(table);
|
||||
}
|
||||
}
|
||||
let mut tables = Vec::with_capacity(metric_physical_tables.len() + remaining_tables.len());
|
||||
tables.extend(metric_physical_tables.into_iter());
|
||||
tables.extend(remaining_tables);
|
||||
|
||||
Ok(tables)
|
||||
Ok((
|
||||
metric_physical_tables.into_iter().collect(),
|
||||
remaining_tables,
|
||||
))
|
||||
}
|
||||
|
||||
async fn show_create_table(&self, catalog: &str, schema: &str, table: &str) -> Result<String> {
|
||||
@@ -265,15 +269,16 @@ impl Export {
|
||||
let semaphore_moved = semaphore.clone();
|
||||
tasks.push(async move {
|
||||
let _permit = semaphore_moved.acquire().await.unwrap();
|
||||
let table_list = self.get_table_list(&catalog, &schema).await?;
|
||||
let table_count = table_list.len();
|
||||
let (metric_physical_tables, remaining_tables) =
|
||||
self.get_table_list(&catalog, &schema).await?;
|
||||
let table_count = metric_physical_tables.len() + remaining_tables.len();
|
||||
tokio::fs::create_dir_all(&self.output_dir)
|
||||
.await
|
||||
.context(FileIoSnafu)?;
|
||||
let output_file =
|
||||
Path::new(&self.output_dir).join(format!("{catalog}-{schema}.sql"));
|
||||
let mut file = File::create(output_file).await.context(FileIoSnafu)?;
|
||||
for (c, s, t) in table_list {
|
||||
for (c, s, t) in metric_physical_tables.into_iter().chain(remaining_tables) {
|
||||
match self.show_create_table(&c, &s, &t).await {
|
||||
Err(e) => {
|
||||
error!(e; r#"Failed to export table "{}"."{}"."{}""#, c, s, t)
|
||||
@@ -322,15 +327,25 @@ impl Export {
|
||||
.await
|
||||
.context(FileIoSnafu)?;
|
||||
let output_dir = Path::new(&self.output_dir).join(format!("{catalog}-{schema}/"));
|
||||
|
||||
// copy database to
|
||||
let sql = format!(
|
||||
"copy database {} to '{}' with (format='parquet');",
|
||||
schema,
|
||||
output_dir.to_str().unwrap()
|
||||
);
|
||||
self.sql(&sql).await?;
|
||||
info!("finished exporting {catalog}.{schema} data");
|
||||
// Ignores metric physical tables
|
||||
let (metrics_tables, table_list) = self.get_table_list(&catalog, &schema).await?;
|
||||
for (_, _, table_name) in metrics_tables {
|
||||
warn!("Ignores metric physical table: {table_name}");
|
||||
}
|
||||
for (catalog_name, schema_name, table_name) in table_list {
|
||||
// copy table to
|
||||
let sql = format!(
|
||||
r#"Copy "{}"."{}"."{}" TO '{}{}.parquet' WITH (format='parquet');"#,
|
||||
catalog_name,
|
||||
schema_name,
|
||||
table_name,
|
||||
output_dir.to_str().unwrap(),
|
||||
table_name,
|
||||
);
|
||||
info!("Executing sql: {sql}");
|
||||
self.sql(&sql).await?;
|
||||
}
|
||||
info!("Finished exporting {catalog}.{schema} data");
|
||||
|
||||
// export copy from sql
|
||||
let dir_filenames = match output_dir.read_dir() {
|
||||
|
||||
Reference in New Issue
Block a user