From 81eab74b904741f57e476a77200e04101173f535 Mon Sep 17 00:00:00 2001 From: Ning Sun Date: Fri, 24 Feb 2023 11:06:14 +0800 Subject: [PATCH] refactor: remove grpc client constructor with default catalog/schema (#1060) * refactor: remove grpc client with default catalog/schema * refactor: re-export consts in client module --- benchmarks/src/bin/nyc-taxi.rs | 4 ++-- src/client/examples/logical.rs | 3 ++- src/client/src/database.rs | 5 ++--- src/client/src/lib.rs | 1 + src/frontend/src/instance/distributed.rs | 2 +- src/frontend/src/table.rs | 2 +- src/servers/tests/grpc/mod.rs | 4 ++-- tests-integration/tests/grpc.rs | 8 ++++---- tests/runner/src/env.rs | 8 +++++--- 9 files changed, 20 insertions(+), 17 deletions(-) diff --git a/benchmarks/src/bin/nyc-taxi.rs b/benchmarks/src/bin/nyc-taxi.rs index 368dde8ce3..62de8bc0eb 100644 --- a/benchmarks/src/bin/nyc-taxi.rs +++ b/benchmarks/src/bin/nyc-taxi.rs @@ -27,7 +27,7 @@ use arrow::record_batch::RecordBatch; use clap::Parser; use client::api::v1::column::Values; use client::api::v1::{Column, ColumnDataType, ColumnDef, CreateTableExpr, InsertRequest, TableId}; -use client::{Client, Database}; +use client::{Client, Database, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use indicatif::{MultiProgress, ProgressBar, ProgressStyle}; use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder; use tokio::task::JoinSet; @@ -422,7 +422,7 @@ fn main() { .unwrap() .block_on(async { let client = Client::with_urls(vec![&args.endpoint]); - let db = Database::with_client(client); + let db = Database::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client); if !args.skip_write { do_write(&args, &db).await; diff --git a/src/client/examples/logical.rs b/src/client/examples/logical.rs index 07debec679..e3cd1f7747 100644 --- a/src/client/examples/logical.rs +++ b/src/client/examples/logical.rs @@ -14,6 +14,7 @@ use api::v1::{ColumnDataType, ColumnDef, CreateTableExpr, TableId}; use client::{Client, Database}; +use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use prost_09::Message; use substrait_proto::protobuf::plan_rel::RelType as PlanRelType; use substrait_proto::protobuf::read_rel::{NamedTable, ReadType}; @@ -65,7 +66,7 @@ async fn run() { region_ids: vec![0], }; - let db = Database::with_client(client); + let db = Database::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client); let result = db.create(create_table_expr).await.unwrap(); event!(Level::INFO, "create table result: {:#?}", result); diff --git a/src/client/src/database.rs b/src/client/src/database.rs index efc1eacfa8..7e0505dd04 100644 --- a/src/client/src/database.rs +++ b/src/client/src/database.rs @@ -23,7 +23,6 @@ use api::v1::{ InsertRequest, QueryRequest, RequestHeader, }; use arrow_flight::{FlightData, Ticket}; -use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_error::prelude::*; use common_grpc::flight::{flight_messages_to_recordbatches, FlightDecoder, FlightMessage}; use common_query::Output; @@ -56,8 +55,8 @@ impl Database { } } - pub fn with_client(client: Client) -> Self { - Self::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client) + pub fn set_catalog(&mut self, catalog: impl Into) { + self.catalog = catalog.into(); } pub fn set_schema(&mut self, schema: impl Into) { diff --git a/src/client/src/lib.rs b/src/client/src/lib.rs index fbee1356d9..2b8942e21f 100644 --- a/src/client/src/lib.rs +++ b/src/client/src/lib.rs @@ -18,6 +18,7 @@ mod error; pub mod load_balance; pub use api; +pub use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; pub use self::client::Client; pub use self::database::Database; diff --git a/src/frontend/src/instance/distributed.rs b/src/frontend/src/instance/distributed.rs index 0394c3d914..8ab2179842 100644 --- a/src/frontend/src/instance/distributed.rs +++ b/src/frontend/src/instance/distributed.rs @@ -186,7 +186,7 @@ impl DistInstance { for datanode in table_route.find_leaders() { let client = self.datanode_clients.get_client(&datanode).await; - let client = Database::with_client(client); + let client = Database::new(&table_name.catalog_name, &table_name.schema_name, client); let regions = table_route.find_leader_regions(&datanode); let mut create_expr_for_region = create_table.clone(); diff --git a/src/frontend/src/table.rs b/src/frontend/src/table.rs index 65a18ef615..a264182257 100644 --- a/src/frontend/src/table.rs +++ b/src/frontend/src/table.rs @@ -258,7 +258,7 @@ impl DistTable { ); for datanode in leaders { let client = self.datanode_clients.get_client(&datanode).await; - let db = Database::with_client(client); + let db = Database::new(&expr.catalog_name, &expr.schema_name, client); debug!("Sending {:?} to {:?}", expr, db); let result = db .alter(expr.clone()) diff --git a/src/servers/tests/grpc/mod.rs b/src/servers/tests/grpc/mod.rs index c2b943dfba..e4a6606b63 100644 --- a/src/servers/tests/grpc/mod.rs +++ b/src/servers/tests/grpc/mod.rs @@ -19,7 +19,7 @@ use api::v1::auth_header::AuthScheme; use api::v1::Basic; use arrow_flight::flight_service_server::{FlightService, FlightServiceServer}; use async_trait::async_trait; -use client::{Client, Database}; +use client::{Client, Database, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_runtime::{Builder as RuntimeBuilder, Runtime}; use servers::auth::UserProviderRef; use servers::error::{Result, StartGrpcSnafu, TcpBindSnafu}; @@ -126,7 +126,7 @@ async fn test_grpc_query() { assert!(re.is_ok()); let grpc_client = Client::with_urls(vec![re.unwrap().to_string()]); - let mut db = Database::with_client(grpc_client); + let mut db = Database::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, grpc_client); let re = db.sql("select * from numbers").await; assert!(re.is_err()); diff --git a/tests-integration/tests/grpc.rs b/tests-integration/tests/grpc.rs index 15e2687b42..77ac062ef9 100644 --- a/tests-integration/tests/grpc.rs +++ b/tests-integration/tests/grpc.rs @@ -17,8 +17,8 @@ use api::v1::{ column, AddColumn, AddColumns, AlterExpr, Column, ColumnDataType, ColumnDef, CreateTableExpr, InsertRequest, TableId, }; -use client::{Client, Database}; -use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MIN_USER_TABLE_ID}; +use client::{Client, Database, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; +use common_catalog::consts::MIN_USER_TABLE_ID; use common_query::Output; use servers::server::Server; use tests_integration::test_util::{setup_grpc_server, StorageType}; @@ -65,7 +65,7 @@ pub async fn test_auto_create_table(store_type: StorageType) { setup_grpc_server(store_type, "auto_create_table").await; let grpc_client = Client::with_urls(vec![addr]); - let db = Database::with_client(grpc_client); + let db = Database::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, grpc_client); insert_and_assert(&db).await; let _ = fe_grpc_server.shutdown().await; guard.remove_all().await; @@ -131,7 +131,7 @@ pub async fn test_insert_and_select(store_type: StorageType) { setup_grpc_server(store_type, "insert_and_select").await; let grpc_client = Client::with_urls(vec![addr]); - let db = Database::with_client(grpc_client); + let db = Database::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, grpc_client); // create let expr = testing_create_expr(); diff --git a/tests/runner/src/env.rs b/tests/runner/src/env.rs index 98b2142fc0..840ec5a9b8 100644 --- a/tests/runner/src/env.rs +++ b/tests/runner/src/env.rs @@ -19,7 +19,9 @@ use std::process::Stdio; use std::time::Duration; use async_trait::async_trait; -use client::{Client, Database as DB, Error as ClientError}; +use client::{ + Client, Database as DB, Error as ClientError, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, +}; use common_error::ext::ErrorExt; use common_error::snafu::ErrorCompat; use common_query::Output; @@ -110,7 +112,7 @@ impl Env { println!("Started, going to test. Log will be write to {SERVER_LOG_FILE}"); let client = Client::with_urls(vec![SERVER_ADDR]); - let db = DB::with_client(client); + let db = DB::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client); GreptimeDB { server_process, @@ -180,7 +182,7 @@ impl Env { } let client = Client::with_urls(vec![SERVER_ADDR]); - let db = DB::with_client(client); + let db = DB::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client); GreptimeDB { server_process: frontend,