refactor: relocate CLI to a dedicated directory (#5101)

* refactor: relocate CLI to a dedicated directory

* chore: expose method and const

* refactor: use BoxedError

* chore: expose DatabaseClient

* chore: add clone derive
This commit is contained in:
Weny Xu
2024-12-05 20:12:24 +08:00
committed by GitHub
parent cf0c84bed1
commit 01a308fe6b
27 changed files with 817 additions and 280 deletions

65
src/cli/Cargo.toml Normal file
View File

@@ -0,0 +1,65 @@
[package]
name = "cli"
version.workspace = true
edition.workspace = true
license.workspace = true
[lints]
workspace = true
[dependencies]
async-trait.workspace = true
auth.workspace = true
base64.workspace = true
cache.workspace = true
catalog.workspace = true
chrono.workspace = true
clap.workspace = true
client.workspace = true
common-base.workspace = true
common-catalog.workspace = true
common-config.workspace = true
common-error.workspace = true
common-grpc.workspace = true
common-macro.workspace = true
common-meta.workspace = true
common-options.workspace = true
common-procedure.workspace = true
common-query.workspace = true
common-recordbatch.workspace = true
common-runtime.workspace = true
common-telemetry = { workspace = true, features = [
"deadlock_detection",
] }
common-time.workspace = true
common-version.workspace = true
common-wal.workspace = true
datatypes.workspace = true
either = "1.8"
etcd-client.workspace = true
futures.workspace = true
humantime.workspace = true
meta-client.workspace = true
nu-ansi-term = "0.46"
query.workspace = true
rand.workspace = true
reqwest.workspace = true
rustyline = "10.1"
serde.workspace = true
serde_json.workspace = true
servers.workspace = true
session.workspace = true
snafu.workspace = true
store-api.workspace = true
substrait.workspace = true
table.workspace = true
tokio.workspace = true
tracing-appender.workspace = true
[dev-dependencies]
client = { workspace = true, features = ["testing"] }
common-test-util.workspace = true
common-version.workspace = true
serde.workspace = true
temp-env = "0.3"
tempfile.workspace = true

172
src/cli/src/bench.rs Normal file
View File

@@ -0,0 +1,172 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::{BTreeMap, HashMap};
use std::future::Future;
use std::sync::Arc;
use std::time::Duration;
use async_trait::async_trait;
use clap::Parser;
use common_error::ext::BoxedError;
use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
use common_meta::kv_backend::etcd::EtcdStore;
use common_meta::peer::Peer;
use common_meta::rpc::router::{Region, RegionRoute};
use common_telemetry::info;
use datatypes::data_type::ConcreteDataType;
use datatypes::schema::{ColumnSchema, RawSchema};
use rand::Rng;
use store_api::storage::RegionNumber;
use table::metadata::{RawTableInfo, RawTableMeta, TableId, TableIdent, TableType};
use table::table_name::TableName;
use self::metadata::TableMetadataBencher;
use crate::Tool;
mod metadata;
async fn bench_self_recorded<F, Fut>(desc: &str, f: F, count: u32)
where
F: Fn(u32) -> Fut,
Fut: Future<Output = Duration>,
{
let mut total = Duration::default();
for i in 1..=count {
total += f(i).await;
}
let cost = total.as_millis() as f64 / count as f64;
info!("{desc}, average operation cost: {cost:.2} ms");
}
#[derive(Debug, Default, Parser)]
pub struct BenchTableMetadataCommand {
#[clap(long)]
etcd_addr: String,
#[clap(long)]
count: u32,
}
impl BenchTableMetadataCommand {
pub async fn build(&self) -> std::result::Result<Box<dyn Tool>, BoxedError> {
let etcd_store = EtcdStore::with_endpoints([&self.etcd_addr], 128)
.await
.unwrap();
let table_metadata_manager = Arc::new(TableMetadataManager::new(etcd_store));
let tool = BenchTableMetadata {
table_metadata_manager,
count: self.count,
};
Ok(Box::new(tool))
}
}
struct BenchTableMetadata {
table_metadata_manager: TableMetadataManagerRef,
count: u32,
}
#[async_trait]
impl Tool for BenchTableMetadata {
async fn do_work(&self) -> std::result::Result<(), BoxedError> {
let bencher = TableMetadataBencher::new(self.table_metadata_manager.clone(), self.count);
bencher.bench_create().await;
bencher.bench_get().await;
bencher.bench_rename().await;
bencher.bench_delete().await;
Ok(())
}
}
fn create_table_info(table_id: TableId, table_name: TableName) -> RawTableInfo {
let columns = 100;
let mut column_schemas = Vec::with_capacity(columns);
column_schemas.push(
ColumnSchema::new(
"ts",
ConcreteDataType::timestamp_millisecond_datatype(),
true,
)
.with_time_index(true),
);
for i in 1..columns {
let column_name = format!("my_column_{i}");
column_schemas.push(ColumnSchema::new(
column_name,
ConcreteDataType::string_datatype(),
true,
));
}
let meta = RawTableMeta {
schema: RawSchema::new(column_schemas),
engine: "mito".to_string(),
created_on: chrono::DateTime::default(),
primary_key_indices: vec![],
next_column_id: columns as u32 + 1,
value_indices: vec![],
options: Default::default(),
region_numbers: (1..=100).collect(),
partition_key_indices: vec![],
};
RawTableInfo {
ident: TableIdent {
table_id,
version: 1,
},
name: table_name.table_name,
desc: Some("blah".to_string()),
catalog_name: table_name.catalog_name,
schema_name: table_name.schema_name,
meta,
table_type: TableType::Base,
}
}
fn create_region_routes(regions: Vec<RegionNumber>) -> Vec<RegionRoute> {
let mut region_routes = Vec::with_capacity(100);
let mut rng = rand::thread_rng();
for region_id in regions.into_iter().map(u64::from) {
region_routes.push(RegionRoute {
region: Region {
id: region_id.into(),
name: String::new(),
partition: None,
attrs: BTreeMap::new(),
},
leader_peer: Some(Peer {
id: rng.gen_range(0..10),
addr: String::new(),
}),
follower_peers: vec![],
leader_state: None,
leader_down_since: None,
});
}
region_routes
}
fn create_region_wal_options(regions: Vec<RegionNumber>) -> HashMap<RegionNumber, String> {
// TODO(niebayes): construct region wal options for benchmark.
let _ = regions;
HashMap::default()
}

View File

@@ -0,0 +1,149 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::time::Instant;
use common_meta::key::table_route::TableRouteValue;
use common_meta::key::TableMetadataManagerRef;
use table::table_name::TableName;
use crate::bench::{
bench_self_recorded, create_region_routes, create_region_wal_options, create_table_info,
};
pub struct TableMetadataBencher {
table_metadata_manager: TableMetadataManagerRef,
count: u32,
}
impl TableMetadataBencher {
pub fn new(table_metadata_manager: TableMetadataManagerRef, count: u32) -> Self {
Self {
table_metadata_manager,
count,
}
}
pub async fn bench_create(&self) {
let desc = format!(
"TableMetadataBencher: creating {} table metadata",
self.count
);
bench_self_recorded(
&desc,
|i| async move {
let table_name = format!("bench_table_name_{}", i);
let table_name = TableName::new("bench_catalog", "bench_schema", table_name);
let table_info = create_table_info(i, table_name);
let regions: Vec<_> = (0..64).collect();
let region_routes = create_region_routes(regions.clone());
let region_wal_options = create_region_wal_options(regions);
let start = Instant::now();
self.table_metadata_manager
.create_table_metadata(
table_info,
TableRouteValue::physical(region_routes),
region_wal_options,
)
.await
.unwrap();
start.elapsed()
},
self.count,
)
.await;
}
pub async fn bench_get(&self) {
let desc = format!(
"TableMetadataBencher: getting {} table info and region routes",
self.count
);
bench_self_recorded(
&desc,
|i| async move {
let start = Instant::now();
self.table_metadata_manager
.get_full_table_info(i)
.await
.unwrap();
start.elapsed()
},
self.count,
)
.await;
}
pub async fn bench_delete(&self) {
let desc = format!(
"TableMetadataBencher: deleting {} table metadata",
self.count
);
bench_self_recorded(
&desc,
|i| async move {
let (table_info, table_route) = self
.table_metadata_manager
.get_full_table_info(i)
.await
.unwrap();
let start = Instant::now();
let table_info = table_info.unwrap();
let table_route = table_route.unwrap();
let table_id = table_info.table_info.ident.table_id;
let _ = self
.table_metadata_manager
.delete_table_metadata(table_id, &table_info.table_name(), &table_route)
.await;
start.elapsed()
},
self.count,
)
.await;
}
pub async fn bench_rename(&self) {
let desc = format!("TableMetadataBencher: renaming {} table", self.count);
bench_self_recorded(
&desc,
|i| async move {
let (table_info, _) = self
.table_metadata_manager
.get_full_table_info(i)
.await
.unwrap();
let new_table_name = format!("renamed_{}", i);
let start = Instant::now();
let _ = self
.table_metadata_manager
.rename_table(&table_info.unwrap(), new_table_name)
.await;
start.elapsed()
},
self.count,
)
.await;
}
}

154
src/cli/src/cmd.rs Normal file
View File

@@ -0,0 +1,154 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::error::{Error, InvalidReplCommandSnafu, Result};
/// Represents the parsed command from the user (which may be over many lines)
#[derive(Debug, PartialEq)]
pub(crate) enum ReplCommand {
Help,
UseDatabase { db_name: String },
Sql { sql: String },
Exit,
}
impl TryFrom<&str> for ReplCommand {
type Error = Error;
fn try_from(input: &str) -> Result<Self> {
let input = input.trim();
if input.is_empty() {
return InvalidReplCommandSnafu {
reason: "No command specified".to_string(),
}
.fail();
}
// If line ends with ';', it must be treated as a complete input.
// However, the opposite is not true.
let input_is_completed = input.ends_with(';');
let input = input.strip_suffix(';').map(|x| x.trim()).unwrap_or(input);
let lowercase = input.to_lowercase();
match lowercase.as_str() {
"help" => Ok(Self::Help),
"exit" | "quit" => Ok(Self::Exit),
_ => match input.split_once(' ') {
Some((maybe_use, database)) if maybe_use.to_lowercase() == "use" => {
Ok(Self::UseDatabase {
db_name: database.trim().to_string(),
})
}
// Any valid SQL must contains at least one whitespace.
Some(_) if input_is_completed => Ok(Self::Sql {
sql: input.to_string(),
}),
_ => InvalidReplCommandSnafu {
reason: format!("unknown command '{input}', maybe input is not completed"),
}
.fail(),
},
}
}
}
impl ReplCommand {
pub fn help() -> &'static str {
r#"
Available commands (case insensitive):
- 'help': print this help
- 'exit' or 'quit': exit the REPL
- 'use <your database name>': switch to another database/schema context
- Other typed in text will be treated as SQL.
You can enter new line while typing, just remember to end it with ';'.
"#
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::error::Error::InvalidReplCommand;
#[test]
fn test_from_str() {
fn test_ok(s: &str, expected: ReplCommand) {
let actual: ReplCommand = s.try_into().unwrap();
assert_eq!(expected, actual, "'{}'", s);
}
fn test_err(s: &str) {
let result: Result<ReplCommand> = s.try_into();
assert!(matches!(result, Err(InvalidReplCommand { .. })))
}
test_err("");
test_err(" ");
test_err("\t");
test_ok("help", ReplCommand::Help);
test_ok("help", ReplCommand::Help);
test_ok(" help", ReplCommand::Help);
test_ok(" help ", ReplCommand::Help);
test_ok(" HELP ", ReplCommand::Help);
test_ok(" Help; ", ReplCommand::Help);
test_ok(" help ; ", ReplCommand::Help);
test_ok("exit", ReplCommand::Exit);
test_ok("exit;", ReplCommand::Exit);
test_ok("exit ;", ReplCommand::Exit);
test_ok("EXIT", ReplCommand::Exit);
test_ok("quit", ReplCommand::Exit);
test_ok("quit;", ReplCommand::Exit);
test_ok("quit ;", ReplCommand::Exit);
test_ok("QUIT", ReplCommand::Exit);
test_ok(
"use Foo",
ReplCommand::UseDatabase {
db_name: "Foo".to_string(),
},
);
test_ok(
" use Foo ; ",
ReplCommand::UseDatabase {
db_name: "Foo".to_string(),
},
);
// ensure that database name is case sensitive
test_ok(
" use FOO ; ",
ReplCommand::UseDatabase {
db_name: "FOO".to_string(),
},
);
// ensure that we aren't messing with capitalization
test_ok(
"SELECT * from foo;",
ReplCommand::Sql {
sql: "SELECT * from foo".to_string(),
},
);
// Input line (that don't belong to any other cases above) must ends with ';' to make it a valid SQL.
test_err("insert blah");
test_ok(
"insert blah;",
ReplCommand::Sql {
sql: "insert blah".to_string(),
},
);
}
}

136
src/cli/src/database.rs Normal file
View File

@@ -0,0 +1,136 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::time::Duration;
use base64::engine::general_purpose;
use base64::Engine;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use humantime::format_duration;
use serde_json::Value;
use servers::http::header::constants::GREPTIME_DB_HEADER_TIMEOUT;
use servers::http::result::greptime_result_v1::GreptimedbV1Response;
use servers::http::GreptimeQueryOutput;
use snafu::ResultExt;
use crate::error::{HttpQuerySqlSnafu, Result, SerdeJsonSnafu};
#[derive(Debug, Clone)]
pub struct DatabaseClient {
addr: String,
catalog: String,
auth_header: Option<String>,
timeout: Duration,
}
impl DatabaseClient {
pub fn new(
addr: String,
catalog: String,
auth_basic: Option<String>,
timeout: Duration,
) -> Self {
let auth_header = if let Some(basic) = auth_basic {
let encoded = general_purpose::STANDARD.encode(basic);
Some(format!("basic {}", encoded))
} else {
None
};
Self {
addr,
catalog,
auth_header,
timeout,
}
}
pub async fn sql_in_public(&self, sql: &str) -> Result<Option<Vec<Vec<Value>>>> {
self.sql(sql, DEFAULT_SCHEMA_NAME).await
}
/// Execute sql query.
pub async fn sql(&self, sql: &str, schema: &str) -> Result<Option<Vec<Vec<Value>>>> {
let url = format!("http://{}/v1/sql", self.addr);
let params = [
("db", format!("{}-{}", self.catalog, schema)),
("sql", sql.to_string()),
];
let mut request = reqwest::Client::new()
.post(&url)
.form(&params)
.header("Content-Type", "application/x-www-form-urlencoded");
if let Some(ref auth) = self.auth_header {
request = request.header("Authorization", auth);
}
request = request.header(
GREPTIME_DB_HEADER_TIMEOUT,
format_duration(self.timeout).to_string(),
);
let response = request.send().await.with_context(|_| HttpQuerySqlSnafu {
reason: format!("bad url: {}", url),
})?;
let response = response
.error_for_status()
.with_context(|_| HttpQuerySqlSnafu {
reason: format!("query failed: {}", sql),
})?;
let text = response.text().await.with_context(|_| HttpQuerySqlSnafu {
reason: "cannot get response text".to_string(),
})?;
let body = serde_json::from_str::<GreptimedbV1Response>(&text).context(SerdeJsonSnafu)?;
Ok(body.output().first().and_then(|output| match output {
GreptimeQueryOutput::Records(records) => Some(records.rows().clone()),
GreptimeQueryOutput::AffectedRows(_) => None,
}))
}
}
/// Split at `-`.
pub(crate) fn split_database(database: &str) -> Result<(String, Option<String>)> {
let (catalog, schema) = match database.split_once('-') {
Some((catalog, schema)) => (catalog, schema),
None => (DEFAULT_CATALOG_NAME, database),
};
if schema == "*" {
Ok((catalog.to_string(), None))
} else {
Ok((catalog.to_string(), Some(schema.to_string())))
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_split_database() {
let result = split_database("catalog-schema").unwrap();
assert_eq!(result, ("catalog".to_string(), Some("schema".to_string())));
let result = split_database("schema").unwrap();
assert_eq!(result, ("greptime".to_string(), Some("schema".to_string())));
let result = split_database("catalog-*").unwrap();
assert_eq!(result, ("catalog".to_string(), None));
let result = split_database("*").unwrap();
assert_eq!(result, ("greptime".to_string(), None));
}
}

316
src/cli/src/error.rs Normal file
View File

@@ -0,0 +1,316 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::any::Any;
use common_error::ext::{BoxedError, ErrorExt};
use common_error::status_code::StatusCode;
use common_macro::stack_trace_debug;
use rustyline::error::ReadlineError;
use snafu::{Location, Snafu};
#[derive(Snafu)]
#[snafu(visibility(pub))]
#[stack_trace_debug]
pub enum Error {
#[snafu(display("Failed to install ring crypto provider: {}", msg))]
InitTlsProvider {
#[snafu(implicit)]
location: Location,
msg: String,
},
#[snafu(display("Failed to create default catalog and schema"))]
InitMetadata {
#[snafu(implicit)]
location: Location,
source: common_meta::error::Error,
},
#[snafu(display("Failed to init DDL manager"))]
InitDdlManager {
#[snafu(implicit)]
location: Location,
source: common_meta::error::Error,
},
#[snafu(display("Failed to init default timezone"))]
InitTimezone {
#[snafu(implicit)]
location: Location,
source: common_time::error::Error,
},
#[snafu(display("Failed to start procedure manager"))]
StartProcedureManager {
#[snafu(implicit)]
location: Location,
source: common_procedure::error::Error,
},
#[snafu(display("Failed to stop procedure manager"))]
StopProcedureManager {
#[snafu(implicit)]
location: Location,
source: common_procedure::error::Error,
},
#[snafu(display("Failed to start wal options allocator"))]
StartWalOptionsAllocator {
#[snafu(implicit)]
location: Location,
source: common_meta::error::Error,
},
#[snafu(display("Missing config, msg: {}", msg))]
MissingConfig {
msg: String,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Illegal config: {}", msg))]
IllegalConfig {
msg: String,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Invalid REPL command: {reason}"))]
InvalidReplCommand { reason: String },
#[snafu(display("Cannot create REPL"))]
ReplCreation {
#[snafu(source)]
error: ReadlineError,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Error reading command"))]
Readline {
#[snafu(source)]
error: ReadlineError,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to request database, sql: {sql}"))]
RequestDatabase {
sql: String,
#[snafu(source)]
source: client::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to collect RecordBatches"))]
CollectRecordBatches {
#[snafu(implicit)]
location: Location,
source: common_recordbatch::error::Error,
},
#[snafu(display("Failed to pretty print Recordbatches"))]
PrettyPrintRecordBatches {
#[snafu(implicit)]
location: Location,
source: common_recordbatch::error::Error,
},
#[snafu(display("Failed to start Meta client"))]
StartMetaClient {
#[snafu(implicit)]
location: Location,
source: meta_client::error::Error,
},
#[snafu(display("Failed to parse SQL: {}", sql))]
ParseSql {
sql: String,
#[snafu(implicit)]
location: Location,
source: query::error::Error,
},
#[snafu(display("Failed to plan statement"))]
PlanStatement {
#[snafu(implicit)]
location: Location,
source: query::error::Error,
},
#[snafu(display("Failed to encode logical plan in substrait"))]
SubstraitEncodeLogicalPlan {
#[snafu(implicit)]
location: Location,
source: substrait::error::Error,
},
#[snafu(display("Failed to load layered config"))]
LoadLayeredConfig {
#[snafu(source(from(common_config::error::Error, Box::new)))]
source: Box<common_config::error::Error>,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to connect to Etcd at {etcd_addr}"))]
ConnectEtcd {
etcd_addr: String,
#[snafu(source)]
error: etcd_client::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to serde json"))]
SerdeJson {
#[snafu(source)]
error: serde_json::error::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to run http request: {reason}"))]
HttpQuerySql {
reason: String,
#[snafu(source)]
error: reqwest::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Empty result from output"))]
EmptyResult {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to manipulate file"))]
FileIo {
#[snafu(implicit)]
location: Location,
#[snafu(source)]
error: std::io::Error,
},
#[snafu(display("Failed to create directory {}", dir))]
CreateDir {
dir: String,
#[snafu(source)]
error: std::io::Error,
},
#[snafu(display("Failed to spawn thread"))]
SpawnThread {
#[snafu(source)]
error: std::io::Error,
},
#[snafu(display("Other error"))]
Other {
source: BoxedError,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to build runtime"))]
BuildRuntime {
#[snafu(implicit)]
location: Location,
source: common_runtime::error::Error,
},
#[snafu(display("Failed to get cache from cache registry: {}", name))]
CacheRequired {
#[snafu(implicit)]
location: Location,
name: String,
},
#[snafu(display("Failed to build cache registry"))]
BuildCacheRegistry {
#[snafu(implicit)]
location: Location,
source: cache::error::Error,
},
#[snafu(display("Failed to initialize meta client"))]
MetaClientInit {
#[snafu(implicit)]
location: Location,
source: meta_client::error::Error,
},
#[snafu(display("Cannot find schema {schema} in catalog {catalog}"))]
SchemaNotFound {
catalog: String,
schema: String,
#[snafu(implicit)]
location: Location,
},
}
pub type Result<T> = std::result::Result<T, Error>;
impl ErrorExt for Error {
fn status_code(&self) -> StatusCode {
match self {
Error::InitMetadata { source, .. } | Error::InitDdlManager { source, .. } => {
source.status_code()
}
Error::MissingConfig { .. }
| Error::LoadLayeredConfig { .. }
| Error::IllegalConfig { .. }
| Error::InvalidReplCommand { .. }
| Error::InitTimezone { .. }
| Error::ConnectEtcd { .. }
| Error::CreateDir { .. }
| Error::EmptyResult { .. } => StatusCode::InvalidArguments,
Error::StartProcedureManager { source, .. }
| Error::StopProcedureManager { source, .. } => source.status_code(),
Error::StartWalOptionsAllocator { source, .. } => source.status_code(),
Error::ReplCreation { .. } | Error::Readline { .. } | Error::HttpQuerySql { .. } => {
StatusCode::Internal
}
Error::RequestDatabase { source, .. } => source.status_code(),
Error::CollectRecordBatches { source, .. }
| Error::PrettyPrintRecordBatches { source, .. } => source.status_code(),
Error::StartMetaClient { source, .. } => source.status_code(),
Error::ParseSql { source, .. } | Error::PlanStatement { source, .. } => {
source.status_code()
}
Error::SubstraitEncodeLogicalPlan { source, .. } => source.status_code(),
Error::SerdeJson { .. }
| Error::FileIo { .. }
| Error::SpawnThread { .. }
| Error::InitTlsProvider { .. } => StatusCode::Unexpected,
Error::Other { source, .. } => source.status_code(),
Error::BuildRuntime { source, .. } => source.status_code(),
Error::CacheRequired { .. } | Error::BuildCacheRegistry { .. } => StatusCode::Internal,
Error::MetaClientInit { source, .. } => source.status_code(),
Error::SchemaNotFound { .. } => StatusCode::DatabaseNotFound,
}
}
fn as_any(&self) -> &dyn Any {
self
}
}

484
src/cli/src/export.rs Normal file
View File

@@ -0,0 +1,484 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashSet;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use async_trait::async_trait;
use clap::{Parser, ValueEnum};
use common_error::ext::BoxedError;
use common_telemetry::{debug, error, info};
use serde_json::Value;
use snafu::{OptionExt, ResultExt};
use tokio::fs::File;
use tokio::io::{AsyncWriteExt, BufWriter};
use tokio::sync::Semaphore;
use tokio::time::Instant;
use crate::database::DatabaseClient;
use crate::error::{EmptyResultSnafu, Error, FileIoSnafu, Result, SchemaNotFoundSnafu};
use crate::{database, Tool};
type TableReference = (String, String, String);
#[derive(Debug, Default, Clone, ValueEnum)]
enum ExportTarget {
/// Export all table schemas, corresponding to `SHOW CREATE TABLE`.
Schema,
/// Export all table data, corresponding to `COPY DATABASE TO`.
Data,
/// Export all table schemas and data at once.
#[default]
All,
}
#[derive(Debug, Default, Parser)]
pub struct ExportCommand {
/// Server address to connect
#[clap(long)]
addr: String,
/// Directory to put the exported data. E.g.: /tmp/greptimedb-export
#[clap(long)]
output_dir: String,
/// The name of the catalog to export.
#[clap(long, default_value = "greptime-*")]
database: String,
/// Parallelism of the export.
#[clap(long, short = 'j', default_value = "1")]
export_jobs: usize,
/// Max retry times for each job.
#[clap(long, default_value = "3")]
max_retry: usize,
/// Things to export
#[clap(long, short = 't', value_enum, default_value = "all")]
target: ExportTarget,
/// A half-open time range: [start_time, end_time).
/// The start of the time range (time-index column) for data export.
#[clap(long)]
start_time: Option<String>,
/// A half-open time range: [start_time, end_time).
/// The end of the time range (time-index column) for data export.
#[clap(long)]
end_time: Option<String>,
/// The basic authentication for connecting to the server
#[clap(long)]
auth_basic: Option<String>,
/// The timeout of invoking the database.
///
/// It is used to override the server-side timeout setting.
/// The default behavior will disable server-side default timeout(i.e. `0s`).
#[clap(long, value_parser = humantime::parse_duration)]
timeout: Option<Duration>,
}
impl ExportCommand {
pub async fn build(&self) -> std::result::Result<Box<dyn Tool>, BoxedError> {
let (catalog, schema) =
database::split_database(&self.database).map_err(BoxedError::new)?;
let database_client = DatabaseClient::new(
self.addr.clone(),
catalog.clone(),
self.auth_basic.clone(),
// Treats `None` as `0s` to disable server-side default timeout.
self.timeout.unwrap_or_default(),
);
Ok(Box::new(Export {
catalog,
schema,
database_client,
output_dir: self.output_dir.clone(),
parallelism: self.export_jobs,
target: self.target.clone(),
start_time: self.start_time.clone(),
end_time: self.end_time.clone(),
}))
}
}
pub struct Export {
catalog: String,
schema: Option<String>,
database_client: DatabaseClient,
output_dir: String,
parallelism: usize,
target: ExportTarget,
start_time: Option<String>,
end_time: Option<String>,
}
impl Export {
fn catalog_path(&self) -> PathBuf {
PathBuf::from(&self.output_dir).join(&self.catalog)
}
async fn get_db_names(&self) -> Result<Vec<String>> {
let db_names = self.all_db_names().await?;
let Some(schema) = &self.schema else {
return Ok(db_names);
};
// Check if the schema exists
db_names
.into_iter()
.find(|db_name| db_name.to_lowercase() == schema.to_lowercase())
.map(|name| vec![name])
.context(SchemaNotFoundSnafu {
catalog: &self.catalog,
schema,
})
}
/// Iterate over all db names.
async fn all_db_names(&self) -> Result<Vec<String>> {
let records = self
.database_client
.sql_in_public("SHOW DATABASES")
.await?
.context(EmptyResultSnafu)?;
let mut result = Vec::with_capacity(records.len());
for value in records {
let Value::String(schema) = &value[0] else {
unreachable!()
};
if schema == common_catalog::consts::INFORMATION_SCHEMA_NAME {
continue;
}
if schema == common_catalog::consts::PG_CATALOG_NAME {
continue;
}
result.push(schema.clone());
}
Ok(result)
}
/// Return a list of [`TableReference`] to be exported.
/// Includes all tables under the given `catalog` and `schema`.
async fn get_table_list(
&self,
catalog: &str,
schema: &str,
) -> Result<(
Vec<TableReference>,
Vec<TableReference>,
Vec<TableReference>,
)> {
// Puts all metric table first
let sql = format!(
"SELECT table_catalog, table_schema, table_name \
FROM information_schema.columns \
WHERE column_name = '__tsid' \
and table_catalog = \'{catalog}\' \
and table_schema = \'{schema}\'"
);
let records = self
.database_client
.sql_in_public(&sql)
.await?
.context(EmptyResultSnafu)?;
let mut metric_physical_tables = HashSet::with_capacity(records.len());
for value in records {
let mut t = Vec::with_capacity(3);
for v in &value {
let Value::String(value) = v else {
unreachable!()
};
t.push(value);
}
metric_physical_tables.insert((t[0].clone(), t[1].clone(), t[2].clone()));
}
let sql = format!(
"SELECT table_catalog, table_schema, table_name, table_type \
FROM information_schema.tables \
WHERE (table_type = \'BASE TABLE\' OR table_type = \'VIEW\') \
and table_catalog = \'{catalog}\' \
and table_schema = \'{schema}\'",
);
let records = self
.database_client
.sql_in_public(&sql)
.await?
.context(EmptyResultSnafu)?;
debug!("Fetched table/view list: {:?}", records);
if records.is_empty() {
return Ok((vec![], vec![], vec![]));
}
let mut remaining_tables = Vec::with_capacity(records.len());
let mut views = Vec::new();
for value in records {
let mut t = Vec::with_capacity(4);
for v in &value {
let Value::String(value) = v else {
unreachable!()
};
t.push(value);
}
let table = (t[0].clone(), t[1].clone(), t[2].clone());
let table_type = t[3].as_str();
// Ignores the physical table
if !metric_physical_tables.contains(&table) {
if table_type == "VIEW" {
views.push(table);
} else {
remaining_tables.push(table);
}
}
}
Ok((
metric_physical_tables.into_iter().collect(),
remaining_tables,
views,
))
}
async fn show_create(
&self,
show_type: &str,
catalog: &str,
schema: &str,
table: Option<&str>,
) -> Result<String> {
let sql = match table {
Some(table) => format!(
r#"SHOW CREATE {} "{}"."{}"."{}""#,
show_type, catalog, schema, table
),
None => format!(r#"SHOW CREATE {} "{}"."{}""#, show_type, catalog, schema),
};
let records = self
.database_client
.sql_in_public(&sql)
.await?
.context(EmptyResultSnafu)?;
let Value::String(create) = &records[0][1] else {
unreachable!()
};
Ok(format!("{};\n", create))
}
async fn export_create_database(&self) -> Result<()> {
let timer = Instant::now();
let db_names = self.get_db_names().await?;
let db_count = db_names.len();
for schema in db_names {
let db_dir = self.catalog_path().join(format!("{schema}/"));
tokio::fs::create_dir_all(&db_dir)
.await
.context(FileIoSnafu)?;
let file = db_dir.join("create_database.sql");
let mut file = File::create(file).await.context(FileIoSnafu)?;
let create_database = self
.show_create("DATABASE", &self.catalog, &schema, None)
.await?;
file.write_all(create_database.as_bytes())
.await
.context(FileIoSnafu)?;
}
let elapsed = timer.elapsed();
info!("Success {db_count} jobs, cost: {elapsed:?}");
Ok(())
}
async fn export_create_table(&self) -> Result<()> {
let timer = Instant::now();
let semaphore = Arc::new(Semaphore::new(self.parallelism));
let db_names = self.get_db_names().await?;
let db_count = db_names.len();
let mut tasks = Vec::with_capacity(db_names.len());
for schema in db_names {
let semaphore_moved = semaphore.clone();
tasks.push(async move {
let _permit = semaphore_moved.acquire().await.unwrap();
let (metric_physical_tables, remaining_tables, views) =
self.get_table_list(&self.catalog, &schema).await?;
let table_count =
metric_physical_tables.len() + remaining_tables.len() + views.len();
let db_dir = self.catalog_path().join(format!("{schema}/"));
tokio::fs::create_dir_all(&db_dir)
.await
.context(FileIoSnafu)?;
let file = db_dir.join("create_tables.sql");
let mut file = File::create(file).await.context(FileIoSnafu)?;
for (c, s, t) in metric_physical_tables.into_iter().chain(remaining_tables) {
let create_table = self.show_create("TABLE", &c, &s, Some(&t)).await?;
file.write_all(create_table.as_bytes())
.await
.context(FileIoSnafu)?;
}
for (c, s, v) in views {
let create_view = self.show_create("VIEW", &c, &s, Some(&v)).await?;
file.write_all(create_view.as_bytes())
.await
.context(FileIoSnafu)?;
}
info!(
"Finished exporting {}.{schema} with {table_count} table schemas to path: {}",
self.catalog,
db_dir.to_string_lossy()
);
Ok::<(), Error>(())
});
}
let success = futures::future::join_all(tasks)
.await
.into_iter()
.filter(|r| match r {
Ok(_) => true,
Err(e) => {
error!(e; "export schema job failed");
false
}
})
.count();
let elapsed = timer.elapsed();
info!("Success {success}/{db_count} jobs, cost: {elapsed:?}");
Ok(())
}
async fn export_database_data(&self) -> Result<()> {
let timer = Instant::now();
let semaphore = Arc::new(Semaphore::new(self.parallelism));
let db_names = self.get_db_names().await?;
let db_count = db_names.len();
let mut tasks = Vec::with_capacity(db_count);
for schema in db_names {
let semaphore_moved = semaphore.clone();
tasks.push(async move {
let _permit = semaphore_moved.acquire().await.unwrap();
let db_dir = self.catalog_path().join(format!("{schema}/"));
tokio::fs::create_dir_all(&db_dir)
.await
.context(FileIoSnafu)?;
let with_options = match (&self.start_time, &self.end_time) {
(Some(start_time), Some(end_time)) => {
format!(
"WITH (FORMAT='parquet', start_time='{}', end_time='{}')",
start_time, end_time
)
}
(Some(start_time), None) => {
format!("WITH (FORMAT='parquet', start_time='{}')", start_time)
}
(None, Some(end_time)) => {
format!("WITH (FORMAT='parquet', end_time='{}')", end_time)
}
(None, None) => "WITH (FORMAT='parquet')".to_string(),
};
let sql = format!(
r#"COPY DATABASE "{}"."{}" TO '{}' {};"#,
self.catalog,
schema,
db_dir.to_str().unwrap(),
with_options
);
info!("Executing sql: {sql}");
self.database_client.sql_in_public(&sql).await?;
info!(
"Finished exporting {}.{schema} data into path: {}",
self.catalog,
db_dir.to_string_lossy()
);
// The export copy from sql
let copy_from_file = db_dir.join("copy_from.sql");
let mut writer =
BufWriter::new(File::create(copy_from_file).await.context(FileIoSnafu)?);
let copy_database_from_sql = format!(
r#"COPY DATABASE "{}"."{}" FROM '{}' WITH (FORMAT='parquet');"#,
self.catalog,
schema,
db_dir.to_str().unwrap()
);
writer
.write(copy_database_from_sql.as_bytes())
.await
.context(FileIoSnafu)?;
writer.flush().await.context(FileIoSnafu)?;
info!("Finished exporting {}.{schema} copy_from.sql", self.catalog);
Ok::<(), Error>(())
})
}
let success = futures::future::join_all(tasks)
.await
.into_iter()
.filter(|r| match r {
Ok(_) => true,
Err(e) => {
error!(e; "export database job failed");
false
}
})
.count();
let elapsed = timer.elapsed();
info!("Success {success}/{db_count} jobs, costs: {elapsed:?}");
Ok(())
}
}
#[async_trait]
impl Tool for Export {
async fn do_work(&self) -> std::result::Result<(), BoxedError> {
match self.target {
ExportTarget::Schema => {
self.export_create_database()
.await
.map_err(BoxedError::new)?;
self.export_create_table().await.map_err(BoxedError::new)
}
ExportTarget::Data => self.export_database_data().await.map_err(BoxedError::new),
ExportTarget::All => {
self.export_create_database()
.await
.map_err(BoxedError::new)?;
self.export_create_table().await.map_err(BoxedError::new)?;
self.export_database_data().await.map_err(BoxedError::new)
}
}
}
}

112
src/cli/src/helper.rs Normal file
View File

@@ -0,0 +1,112 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::borrow::Cow;
use rustyline::completion::Completer;
use rustyline::highlight::{Highlighter, MatchingBracketHighlighter};
use rustyline::hint::{Hinter, HistoryHinter};
use rustyline::validate::{ValidationContext, ValidationResult, Validator};
use crate::cmd::ReplCommand;
pub(crate) struct RustylineHelper {
hinter: HistoryHinter,
highlighter: MatchingBracketHighlighter,
}
impl Default for RustylineHelper {
fn default() -> Self {
Self {
hinter: HistoryHinter {},
highlighter: MatchingBracketHighlighter::default(),
}
}
}
impl rustyline::Helper for RustylineHelper {}
impl Validator for RustylineHelper {
fn validate(&self, ctx: &mut ValidationContext<'_>) -> rustyline::Result<ValidationResult> {
let input = ctx.input();
match ReplCommand::try_from(input) {
Ok(_) => Ok(ValidationResult::Valid(None)),
Err(e) => {
if input.trim_end().ends_with(';') {
// If line ends with ';', it HAS to be a valid command.
Ok(ValidationResult::Invalid(Some(e.to_string())))
} else {
Ok(ValidationResult::Incomplete)
}
}
}
}
}
impl Hinter for RustylineHelper {
type Hint = String;
fn hint(&self, line: &str, pos: usize, ctx: &rustyline::Context<'_>) -> Option<Self::Hint> {
self.hinter.hint(line, pos, ctx)
}
}
impl Highlighter for RustylineHelper {
fn highlight<'l>(&self, line: &'l str, pos: usize) -> Cow<'l, str> {
self.highlighter.highlight(line, pos)
}
fn highlight_prompt<'b, 's: 'b, 'p: 'b>(
&'s self,
prompt: &'p str,
default: bool,
) -> Cow<'b, str> {
self.highlighter.highlight_prompt(prompt, default)
}
fn highlight_hint<'h>(&self, hint: &'h str) -> Cow<'h, str> {
use nu_ansi_term::Style;
Cow::Owned(Style::new().dimmed().paint(hint).to_string())
}
fn highlight_candidate<'c>(
&self,
candidate: &'c str,
completion: rustyline::CompletionType,
) -> Cow<'c, str> {
self.highlighter.highlight_candidate(candidate, completion)
}
fn highlight_char(&self, line: &str, pos: usize) -> bool {
self.highlighter.highlight_char(line, pos)
}
}
impl Completer for RustylineHelper {
type Candidate = String;
fn complete(
&self,
line: &str,
pos: usize,
ctx: &rustyline::Context<'_>,
) -> rustyline::Result<(usize, Vec<Self::Candidate>)> {
// If there is a hint, use that as the auto-complete when user hits `tab`
if let Some(hint) = self.hinter.hint(line, pos, ctx) {
Ok((pos, vec![hint]))
} else {
Ok((0, vec![]))
}
}
}

229
src/cli/src/import.rs Normal file
View File

@@ -0,0 +1,229 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use async_trait::async_trait;
use clap::{Parser, ValueEnum};
use common_catalog::consts::DEFAULT_SCHEMA_NAME;
use common_error::ext::BoxedError;
use common_telemetry::{error, info, warn};
use snafu::{OptionExt, ResultExt};
use tokio::sync::Semaphore;
use tokio::time::Instant;
use crate::database::DatabaseClient;
use crate::error::{Error, FileIoSnafu, Result, SchemaNotFoundSnafu};
use crate::{database, Tool};
#[derive(Debug, Default, Clone, ValueEnum)]
enum ImportTarget {
/// Import all table schemas into the database.
Schema,
/// Import all table data into the database.
Data,
/// Export all table schemas and data at once.
#[default]
All,
}
#[derive(Debug, Default, Parser)]
pub struct ImportCommand {
/// Server address to connect
#[clap(long)]
addr: String,
/// Directory of the data. E.g.: /tmp/greptimedb-backup
#[clap(long)]
input_dir: String,
/// The name of the catalog to import.
#[clap(long, default_value = "greptime-*")]
database: String,
/// Parallelism of the import.
#[clap(long, short = 'j', default_value = "1")]
import_jobs: usize,
/// Max retry times for each job.
#[clap(long, default_value = "3")]
max_retry: usize,
/// Things to export
#[clap(long, short = 't', value_enum, default_value = "all")]
target: ImportTarget,
/// The basic authentication for connecting to the server
#[clap(long)]
auth_basic: Option<String>,
/// The timeout of invoking the database.
///
/// It is used to override the server-side timeout setting.
/// The default behavior will disable server-side default timeout(i.e. `0s`).
#[clap(long, value_parser = humantime::parse_duration)]
timeout: Option<Duration>,
}
impl ImportCommand {
pub async fn build(&self) -> std::result::Result<Box<dyn Tool>, BoxedError> {
let (catalog, schema) =
database::split_database(&self.database).map_err(BoxedError::new)?;
let database_client = DatabaseClient::new(
self.addr.clone(),
catalog.clone(),
self.auth_basic.clone(),
// Treats `None` as `0s` to disable server-side default timeout.
self.timeout.unwrap_or_default(),
);
Ok(Box::new(Import {
catalog,
schema,
database_client,
input_dir: self.input_dir.clone(),
parallelism: self.import_jobs,
target: self.target.clone(),
}))
}
}
pub struct Import {
catalog: String,
schema: Option<String>,
database_client: DatabaseClient,
input_dir: String,
parallelism: usize,
target: ImportTarget,
}
impl Import {
async fn import_create_table(&self) -> Result<()> {
// Use default db to creates other dbs
self.do_sql_job("create_database.sql", Some(DEFAULT_SCHEMA_NAME))
.await?;
self.do_sql_job("create_tables.sql", None).await
}
async fn import_database_data(&self) -> Result<()> {
self.do_sql_job("copy_from.sql", None).await
}
async fn do_sql_job(&self, filename: &str, exec_db: Option<&str>) -> Result<()> {
let timer = Instant::now();
let semaphore = Arc::new(Semaphore::new(self.parallelism));
let db_names = self.get_db_names().await?;
let db_count = db_names.len();
let mut tasks = Vec::with_capacity(db_count);
for schema in db_names {
let semaphore_moved = semaphore.clone();
tasks.push(async move {
let _permit = semaphore_moved.acquire().await.unwrap();
let database_input_dir = self.catalog_path().join(&schema);
let sql_file = database_input_dir.join(filename);
let sql = tokio::fs::read_to_string(sql_file)
.await
.context(FileIoSnafu)?;
if sql.is_empty() {
info!("Empty `{filename}` {database_input_dir:?}");
} else {
let db = exec_db.unwrap_or(&schema);
self.database_client.sql(&sql, db).await?;
info!("Imported `{filename}` for database {schema}");
}
Ok::<(), Error>(())
})
}
let success = futures::future::join_all(tasks)
.await
.into_iter()
.filter(|r| match r {
Ok(_) => true,
Err(e) => {
error!(e; "import {filename} job failed");
false
}
})
.count();
let elapsed = timer.elapsed();
info!("Success {success}/{db_count} `{filename}` jobs, cost: {elapsed:?}");
Ok(())
}
fn catalog_path(&self) -> PathBuf {
PathBuf::from(&self.input_dir).join(&self.catalog)
}
async fn get_db_names(&self) -> Result<Vec<String>> {
let db_names = self.all_db_names().await?;
let Some(schema) = &self.schema else {
return Ok(db_names);
};
// Check if the schema exists
db_names
.into_iter()
.find(|db_name| db_name.to_lowercase() == schema.to_lowercase())
.map(|name| vec![name])
.context(SchemaNotFoundSnafu {
catalog: &self.catalog,
schema,
})
}
// Get all database names in the input directory.
// The directory structure should be like:
// /tmp/greptimedb-backup
// ├── greptime-1
// │ ├── db1
// │ └── db2
async fn all_db_names(&self) -> Result<Vec<String>> {
let mut db_names = vec![];
let path = self.catalog_path();
let mut entries = tokio::fs::read_dir(path).await.context(FileIoSnafu)?;
while let Some(entry) = entries.next_entry().await.context(FileIoSnafu)? {
let path = entry.path();
if path.is_dir() {
let db_name = match path.file_name() {
Some(name) => name.to_string_lossy().to_string(),
None => {
warn!("Failed to get the file name of {:?}", path);
continue;
}
};
db_names.push(db_name);
}
}
Ok(db_names)
}
}
#[async_trait]
impl Tool for Import {
async fn do_work(&self) -> std::result::Result<(), BoxedError> {
match self.target {
ImportTarget::Schema => self.import_create_table().await.map_err(BoxedError::new),
ImportTarget::Data => self.import_database_data().await.map_err(BoxedError::new),
ImportTarget::All => {
self.import_create_table().await.map_err(BoxedError::new)?;
self.import_database_data().await.map_err(BoxedError::new)
}
}
}
}

60
src/cli/src/lib.rs Normal file
View File

@@ -0,0 +1,60 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
mod bench;
pub mod error;
// Wait for https://github.com/GreptimeTeam/greptimedb/issues/2373
#[allow(unused)]
mod cmd;
mod export;
mod helper;
// Wait for https://github.com/GreptimeTeam/greptimedb/issues/2373
mod database;
mod import;
#[allow(unused)]
mod repl;
use async_trait::async_trait;
use clap::Parser;
use common_error::ext::BoxedError;
pub use database::DatabaseClient;
use error::Result;
pub use repl::Repl;
pub use crate::bench::BenchTableMetadataCommand;
pub use crate::export::ExportCommand;
pub use crate::import::ImportCommand;
#[async_trait]
pub trait Tool: Send + Sync {
async fn do_work(&self) -> std::result::Result<(), BoxedError>;
}
#[derive(Debug, Parser)]
pub(crate) struct AttachCommand {
#[clap(long)]
pub(crate) grpc_addr: String,
#[clap(long)]
pub(crate) meta_addr: Option<String>,
#[clap(long, action)]
pub(crate) disable_helper: bool,
}
impl AttachCommand {
#[allow(dead_code)]
async fn build(self) -> Result<Box<dyn Tool>> {
unimplemented!("Wait for https://github.com/GreptimeTeam/greptimedb/issues/2373")
}
}

299
src/cli/src/repl.rs Normal file
View File

@@ -0,0 +1,299 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Instant;
use cache::{
build_fundamental_cache_registry, with_default_composite_cache_registry, TABLE_CACHE_NAME,
TABLE_ROUTE_CACHE_NAME,
};
use catalog::information_extension::DistributedInformationExtension;
use catalog::kvbackend::{
CachedKvBackend, CachedKvBackendBuilder, KvBackendCatalogManager, MetaKvBackend,
};
use client::{Client, Database, OutputData, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_base::Plugins;
use common_config::Mode;
use common_error::ext::ErrorExt;
use common_meta::cache::{CacheRegistryBuilder, LayeredCacheRegistryBuilder};
use common_meta::kv_backend::KvBackendRef;
use common_query::Output;
use common_recordbatch::RecordBatches;
use common_telemetry::debug;
use either::Either;
use meta_client::client::MetaClientBuilder;
use query::datafusion::DatafusionQueryEngine;
use query::parser::QueryLanguageParser;
use query::query_engine::{DefaultSerializer, QueryEngineState};
use query::QueryEngine;
use rustyline::error::ReadlineError;
use rustyline::Editor;
use session::context::QueryContext;
use snafu::{OptionExt, ResultExt};
use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};
use crate::cmd::ReplCommand;
use crate::error::{
CollectRecordBatchesSnafu, ParseSqlSnafu, PlanStatementSnafu, PrettyPrintRecordBatchesSnafu,
ReadlineSnafu, ReplCreationSnafu, RequestDatabaseSnafu, Result, StartMetaClientSnafu,
SubstraitEncodeLogicalPlanSnafu,
};
use crate::helper::RustylineHelper;
use crate::{error, AttachCommand};
/// Captures the state of the repl, gathers commands and executes them one by one
pub struct Repl {
/// Rustyline editor for interacting with user on command line
rl: Editor<RustylineHelper>,
/// Current prompt
prompt: String,
/// Client for interacting with GreptimeDB
database: Database,
query_engine: Option<DatafusionQueryEngine>,
}
#[allow(clippy::print_stdout)]
impl Repl {
fn print_help(&self) {
println!("{}", ReplCommand::help())
}
pub(crate) async fn try_new(cmd: &AttachCommand) -> Result<Self> {
let mut rl = Editor::new().context(ReplCreationSnafu)?;
if !cmd.disable_helper {
rl.set_helper(Some(RustylineHelper::default()));
let history_file = history_file();
if let Err(e) = rl.load_history(&history_file) {
debug!(
"failed to load history file on {}, error: {e}",
history_file.display()
);
}
}
let client = Client::with_urls([&cmd.grpc_addr]);
let database = Database::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client);
let query_engine = if let Some(meta_addr) = &cmd.meta_addr {
create_query_engine(meta_addr).await.map(Some)?
} else {
None
};
Ok(Self {
rl,
prompt: "> ".to_string(),
database,
query_engine,
})
}
/// Parse the next command
fn next_command(&mut self) -> Result<ReplCommand> {
match self.rl.readline(&self.prompt) {
Ok(ref line) => {
let request = line.trim();
let _ = self.rl.add_history_entry(request.to_string());
request.try_into()
}
Err(ReadlineError::Eof) | Err(ReadlineError::Interrupted) => Ok(ReplCommand::Exit),
// Some sort of real underlying error
Err(e) => Err(e).context(ReadlineSnafu),
}
}
/// Read Evaluate Print Loop (interactive command line) for GreptimeDB
///
/// Inspired / based on repl.rs from InfluxDB IOX
pub(crate) async fn run(&mut self) -> Result<()> {
println!("Ready for commands. (Hint: try 'help')");
loop {
match self.next_command()? {
ReplCommand::Help => {
self.print_help();
}
ReplCommand::UseDatabase { db_name } => {
if self.execute_sql(format!("USE {db_name}")).await {
println!("Using {db_name}");
self.database.set_schema(&db_name);
self.prompt = format!("[{db_name}] > ");
}
}
ReplCommand::Sql { sql } => {
let _ = self.execute_sql(sql).await;
}
ReplCommand::Exit => {
return Ok(());
}
}
}
}
async fn execute_sql(&self, sql: String) -> bool {
self.do_execute_sql(sql)
.await
.map_err(|e| {
let status_code = e.status_code();
let root_cause = e.output_msg();
println!("Error: {}({status_code}), {root_cause}", status_code as u32)
})
.is_ok()
}
async fn do_execute_sql(&self, sql: String) -> Result<()> {
let start = Instant::now();
let output = if let Some(query_engine) = &self.query_engine {
let query_ctx = Arc::new(QueryContext::with(
self.database.catalog(),
self.database.schema(),
));
let stmt = QueryLanguageParser::parse_sql(&sql, &query_ctx)
.with_context(|_| ParseSqlSnafu { sql: sql.clone() })?;
let plan = query_engine
.planner()
.plan(&stmt, query_ctx.clone())
.await
.context(PlanStatementSnafu)?;
let plan = query_engine
.optimize(&query_engine.engine_context(query_ctx), &plan)
.context(PlanStatementSnafu)?;
let plan = DFLogicalSubstraitConvertor {}
.encode(&plan, DefaultSerializer)
.context(SubstraitEncodeLogicalPlanSnafu)?;
self.database.logical_plan(plan.to_vec()).await
} else {
self.database.sql(&sql).await
}
.context(RequestDatabaseSnafu { sql: &sql })?;
let either = match output.data {
OutputData::Stream(s) => {
let x = RecordBatches::try_collect(s)
.await
.context(CollectRecordBatchesSnafu)?;
Either::Left(x)
}
OutputData::RecordBatches(x) => Either::Left(x),
OutputData::AffectedRows(rows) => Either::Right(rows),
};
let end = Instant::now();
match either {
Either::Left(recordbatches) => {
let total_rows: usize = recordbatches.iter().map(|x| x.num_rows()).sum();
if total_rows > 0 {
println!(
"{}",
recordbatches
.pretty_print()
.context(PrettyPrintRecordBatchesSnafu)?
);
}
println!("Total Rows: {total_rows}")
}
Either::Right(rows) => println!("Affected Rows: {rows}"),
};
println!("Cost {} ms", (end - start).as_millis());
Ok(())
}
}
impl Drop for Repl {
fn drop(&mut self) {
if self.rl.helper().is_some() {
let history_file = history_file();
if let Err(e) = self.rl.save_history(&history_file) {
debug!(
"failed to save history file on {}, error: {e}",
history_file.display()
);
}
}
}
}
/// Return the location of the history file (defaults to $HOME/".greptimedb_cli_history")
fn history_file() -> PathBuf {
let mut buf = match std::env::var("HOME") {
Ok(home) => PathBuf::from(home),
Err(_) => PathBuf::new(),
};
buf.push(".greptimedb_cli_history");
buf
}
async fn create_query_engine(meta_addr: &str) -> Result<DatafusionQueryEngine> {
let mut meta_client = MetaClientBuilder::default().enable_store().build();
meta_client
.start([meta_addr])
.await
.context(StartMetaClientSnafu)?;
let meta_client = Arc::new(meta_client);
let cached_meta_backend = Arc::new(
CachedKvBackendBuilder::new(Arc::new(MetaKvBackend::new(meta_client.clone()))).build(),
);
let layered_cache_builder = LayeredCacheRegistryBuilder::default().add_cache_registry(
CacheRegistryBuilder::default()
.add_cache(cached_meta_backend.clone())
.build(),
);
let fundamental_cache_registry =
build_fundamental_cache_registry(Arc::new(MetaKvBackend::new(meta_client.clone())));
let layered_cache_registry = Arc::new(
with_default_composite_cache_registry(
layered_cache_builder.add_cache_registry(fundamental_cache_registry),
)
.context(error::BuildCacheRegistrySnafu)?
.build(),
);
let information_extension = Arc::new(DistributedInformationExtension::new(meta_client.clone()));
let catalog_manager = KvBackendCatalogManager::new(
information_extension,
cached_meta_backend.clone(),
layered_cache_registry,
None,
);
let plugins: Plugins = Default::default();
let state = Arc::new(QueryEngineState::new(
catalog_manager,
None,
None,
None,
None,
false,
plugins.clone(),
));
Ok(DatafusionQueryEngine::new(state, plugins))
}