chore: export tool minor refactor (#2612)

* chore: cr issue

* chore: cr issue

* chore: skip information schema

* chore: fix clippy

* chore: add basic auth support to cli export

* chore: fix cr issue

* chore: reduce `flush` invocation

Co-authored-by: Yingwen <realevenyag@gmail.com>

* chore: use BufWriter

---------

Co-authored-by: Yingwen <realevenyag@gmail.com>
This commit is contained in:
shuiyisong
2023-11-07 00:47:27 +08:00
committed by GitHub
parent 51ddebdc73
commit 7e0dcfc797

View File

@@ -17,6 +17,8 @@ use std::sync::Arc;
use async_trait::async_trait;
use clap::{Parser, ValueEnum};
use client::api::v1::auth_header::AuthScheme;
use client::api::v1::Basic;
use client::{Client, Database, DEFAULT_SCHEMA_NAME};
use common_query::Output;
use common_recordbatch::util::collect;
@@ -25,13 +27,14 @@ use datatypes::scalars::ScalarVector;
use datatypes::vectors::{StringVector, Vector};
use snafu::{OptionExt, ResultExt};
use tokio::fs::File;
use tokio::io::AsyncWriteExt;
use tokio::io::{AsyncWriteExt, BufWriter};
use tokio::sync::Semaphore;
use crate::cli::{Instance, Tool};
use crate::error::{
CollectRecordBatchesSnafu, ConnectServerSnafu, EmptyResultSnafu, Error, FileIoSnafu,
InvalidDatabaseNameSnafu, NotDataFromOutputSnafu, RequestDatabaseSnafu, Result,
IllegalConfigSnafu, InvalidDatabaseNameSnafu, NotDataFromOutputSnafu, RequestDatabaseSnafu,
Result,
};
type TableReference = (String, String, String);
@@ -70,6 +73,10 @@ pub struct ExportCommand {
/// Things to export
#[clap(long, short = 't', value_enum)]
target: ExportTarget,
/// basic authentication for connecting to the server
#[clap(long)]
auth_basic: Option<String>,
}
impl ExportCommand {
@@ -82,12 +89,22 @@ impl ExportCommand {
addr: self.addr.clone(),
})?;
let (catalog, schema) = split_database(&self.database)?;
let database_client = Database::new(
let mut database_client = Database::new(
catalog.clone(),
schema.clone().unwrap_or(DEFAULT_SCHEMA_NAME.to_string()),
client,
);
if let Some(auth_basic) = &self.auth_basic {
let (username, password) = auth_basic.split_once(':').context(IllegalConfigSnafu {
msg: "auth_basic cannot be split by ':'".to_string(),
})?;
database_client.set_auth(AuthScheme::Basic(Basic {
username: username.to_string(),
password: password.to_string(),
}));
}
Ok(Instance::Tool(Box::new(Export {
client: database_client,
catalog,
@@ -141,6 +158,9 @@ impl Export {
let mut result = Vec::with_capacity(schemas.len());
for i in 0..schemas.len() {
let schema = schemas.get_data(i).unwrap().to_owned();
if schema == common_catalog::consts::INFORMATION_SCHEMA_NAME {
continue;
}
result.push((self.catalog.clone(), schema));
}
Ok(result)
@@ -326,25 +346,30 @@ impl Export {
let copy_from_file =
Path::new(&self.output_dir).join(format!("{catalog}-{schema}_copy_from.sql"));
let mut file = File::create(copy_from_file).await.context(FileIoSnafu)?;
let mut writer =
BufWriter::new(File::create(copy_from_file).await.context(FileIoSnafu)?);
let copy_from_sql = dir_filenames
.into_iter()
.map(|file| {
let file = file.unwrap();
let filename = file.file_name().into_string().unwrap();
for table_file in dir_filenames {
let table_file = table_file.unwrap();
let table_name = table_file
.file_name()
.into_string()
.unwrap()
.replace(".parquet", "");
format!(
"copy {} from '{}' with (format='parquet');\n",
filename.replace(".parquet", ""),
file.path().to_str().unwrap()
writer
.write(
format!(
"copy {} from '{}' with (format='parquet');\n",
table_name,
table_file.path().to_str().unwrap()
)
.as_bytes(),
)
})
.collect::<Vec<_>>()
.join("");
file.write_all(copy_from_sql.as_bytes())
.await
.context(FileIoSnafu)?;
.await
.context(FileIoSnafu)?;
}
writer.flush().await.context(FileIoSnafu)?;
info!("finished exporting {catalog}.{schema} copy_from.sql");