feat(cli): export metric physical tables first (#3949)

* feat: export metric physical tables first

* chore: apply suggestions from CR
This commit is contained in:
Weny Xu
2024-05-16 15:30:20 +09:00
committed by GitHub
parent dff7ba7598
commit c915916b62
3 changed files with 56 additions and 11 deletions

View File

@@ -64,6 +64,10 @@ impl App for Instance {
self.tool.do_work().await
}
fn wait_signal(&self) -> bool {
false
}
async fn stop(&self) -> Result<()> {
Ok(())
}

View File

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashSet;
use std::path::Path;
use std::sync::Arc;
@@ -28,6 +29,7 @@ use snafu::{OptionExt, ResultExt};
use tokio::fs::File;
use tokio::io::{AsyncWriteExt, BufWriter};
use tokio::sync::Semaphore;
use tokio::time::Instant;
use crate::cli::{Instance, Tool};
use crate::error::{
@@ -176,6 +178,28 @@ impl Export {
/// Return a list of [`TableReference`] to be exported.
/// Includes all tables under the given `catalog` and `schema`
async fn get_table_list(&self, catalog: &str, schema: &str) -> Result<Vec<TableReference>> {
// Puts all metric table first
let sql = format!(
"select table_catalog, table_schema, table_name from \
information_schema.columns where column_name = '__tsid' \
and table_catalog = \'{catalog}\' and table_schema = \'{schema}\'"
);
let result = self.sql(&sql).await?;
let Some(records) = result else {
EmptyResultSnafu.fail()?
};
let mut metric_physical_tables = HashSet::with_capacity(records.len());
for value in records {
let mut t = Vec::with_capacity(3);
for v in &value {
let serde_json::Value::String(value) = v else {
unreachable!()
};
t.push(value);
}
metric_physical_tables.insert((t[0].clone(), t[1].clone(), t[2].clone()));
}
// TODO: SQL injection hurts
let sql = format!(
"select table_catalog, table_schema, table_name from \
@@ -193,7 +217,7 @@ impl Export {
return Ok(vec![]);
}
let mut result = Vec::with_capacity(records.len());
let mut remaining_tables = Vec::with_capacity(records.len());
for value in records {
let mut t = Vec::with_capacity(3);
for v in &value {
@@ -202,10 +226,17 @@ impl Export {
};
t.push(value);
}
result.push((t[0].clone(), t[1].clone(), t[2].clone()));
let table = (t[0].clone(), t[1].clone(), t[2].clone());
// Ignores the physical table
if !metric_physical_tables.contains(&table) {
remaining_tables.push(table);
}
}
let mut tables = Vec::with_capacity(metric_physical_tables.len() + remaining_tables.len());
tables.extend(metric_physical_tables.into_iter());
tables.extend(remaining_tables);
Ok(result)
Ok(tables)
}
async fn show_create_table(&self, catalog: &str, schema: &str, table: &str) -> Result<String> {
@@ -225,6 +256,7 @@ impl Export {
}
async fn export_create_table(&self) -> Result<()> {
let timer = Instant::now();
let semaphore = Arc::new(Semaphore::new(self.parallelism));
let db_names = self.iter_db_names().await?;
let db_count = db_names.len();
@@ -270,12 +302,14 @@ impl Export {
})
.count();
info!("success {success}/{db_count} jobs");
let elapsed = timer.elapsed();
info!("Success {success}/{db_count} jobs, cost: {:?}", elapsed);
Ok(())
}
async fn export_table_data(&self) -> Result<()> {
let timer = Instant::now();
let semaphore = Arc::new(Semaphore::new(self.parallelism));
let db_names = self.iter_db_names().await?;
let db_count = db_names.len();
@@ -351,8 +385,8 @@ impl Export {
}
})
.count();
info!("success {success}/{db_count} jobs");
let elapsed = timer.elapsed();
info!("Success {success}/{db_count} jobs, costs: {:?}", elapsed);
Ok(())
}

View File

@@ -41,6 +41,11 @@ pub trait App: Send {
async fn start(&mut self) -> error::Result<()>;
/// Waits the quit signal by default.
fn wait_signal(&self) -> bool {
true
}
async fn stop(&self) -> error::Result<()>;
}
@@ -51,11 +56,13 @@ pub async fn start_app(mut app: Box<dyn App>) -> error::Result<()> {
app.start().await?;
if let Err(e) = tokio::signal::ctrl_c().await {
error!("Failed to listen for ctrl-c signal: {}", e);
// It's unusual to fail to listen for ctrl-c signal, maybe there's something unexpected in
// the underlying system. So we stop the app instead of running nonetheless to let people
// investigate the issue.
if app.wait_signal() {
if let Err(e) = tokio::signal::ctrl_c().await {
error!("Failed to listen for ctrl-c signal: {}", e);
// It's unusual to fail to listen for ctrl-c signal, maybe there's something unexpected in
// the underlying system. So we stop the app instead of running nonetheless to let people
// investigate the issue.
}
}
app.stop().await?;