From af935671b24a64faca98b2394b362af7f000a3a3 Mon Sep 17 00:00:00 2001 From: LFC Date: Thu, 2 Feb 2023 20:02:56 +0800 Subject: [PATCH] feat: support "use" in GRPC requests (#922) * feat: support "use catalog and schema"(behave like the "use" in MySQL) in GRPC requests * fix: rebase develop --- Cargo.lock | 9 +- benchmarks/src/bin/nyc-taxi.rs | 4 +- src/api/greptime/v1/database.proto | 17 +- src/client/Cargo.toml | 1 + src/client/examples/logical.rs | 3 +- src/client/src/database.rs | 85 +++---- src/cmd/src/metasrv.rs | 11 +- src/common/grpc-expr/src/alter.rs | 20 +- src/common/grpc-expr/src/insert.rs | 28 +-- src/common/grpc-expr/src/lib.rs | 4 +- src/datanode/src/error.rs | 5 +- src/datanode/src/instance.rs | 2 +- src/datanode/src/instance/grpc.rs | 169 +++++++------- src/datanode/src/instance/sql.rs | 44 ++-- src/datanode/src/sql.rs | 27 ++- src/datanode/src/sql/alter.rs | 29 +-- src/datanode/src/sql/create.rs | 12 +- src/datanode/src/sql/drop_table.rs | 9 - src/datanode/src/tests/instance_test.rs | 5 +- src/datanode/src/tests/promql_test.rs | 8 +- src/frontend/src/error.rs | 7 + src/frontend/src/instance.rs | 117 +++++----- src/frontend/src/instance/distributed.rs | 31 ++- src/frontend/src/instance/distributed/grpc.rs | 9 +- src/frontend/src/instance/grpc.rs | 209 ++++++++---------- src/frontend/src/instance/influxdb.rs | 12 +- src/frontend/src/instance/opentsdb.rs | 3 +- src/frontend/src/instance/prometheus.rs | 41 ++-- src/frontend/src/instance/standalone.rs | 6 +- src/frontend/src/table.rs | 19 +- src/frontend/src/table/insert.rs | 7 +- src/meta-srv/src/bootstrap.rs | 15 +- src/meta-srv/src/metasrv.rs | 2 + src/mito/src/engine.rs | 24 +- src/query/src/datafusion/planner.rs | 3 +- src/query/src/query_engine/state.rs | 17 +- src/query/src/sql.rs | 165 +------------- src/servers/src/grpc/flight.rs | 26 ++- src/servers/src/http.rs | 11 +- src/servers/src/http/influxdb.rs | 14 +- src/servers/src/http/prometheus.rs | 24 +- src/servers/src/influxdb.rs | 6 - src/servers/src/mysql/federated.rs | 4 +- src/servers/src/opentsdb/codec.rs | 3 - src/servers/src/prometheus.rs | 37 +--- src/servers/src/query_handler.rs | 7 +- src/servers/src/query_handler/grpc.rs | 13 +- src/servers/tests/http/influxdb_test.rs | 4 +- src/servers/tests/http/prometheus_test.rs | 8 +- src/session/Cargo.toml | 1 + src/session/src/context.rs | 49 ++-- src/sql/src/parser.rs | 45 ++-- src/sql/src/statements/describe.rs | 27 +-- src/sql/src/statements/drop.rs | 18 +- src/table/src/engine.rs | 1 + src/table/src/requests.rs | 4 +- tests-integration/tests/grpc.rs | 12 +- .../standalone/aggregate/distinct.result | 61 ++--- tests/cases/standalone/aggregate/distinct.sql | 24 +- tests/cases/standalone/aggregate/sum.result | 5 + tests/cases/standalone/aggregate/sum.sql | 2 + tests/cases/standalone/alter/add_col.result | 19 +- tests/cases/standalone/alter/add_col.sql | 14 +- .../standalone/alter/rename_table.result | 17 +- tests/cases/standalone/alter/rename_table.sql | 4 + tests/cases/standalone/catalog/schema.result | 97 ++++---- tests/cases/standalone/catalog/schema.sql | 30 +-- .../standalone/insert/insert_invalid.result | 23 +- .../standalone/insert/insert_invalid.sql | 12 +- tests/cases/standalone/limit/limit.result | 5 + tests/cases/standalone/limit/limit.sql | 2 + .../order/order_variable_size_payload.result | 11 +- .../order/order_variable_size_payload.sql | 6 +- tests/conf/standalone-test.toml.template | 18 ++ tests/runner/Cargo.toml | 5 +- tests/runner/src/env.rs | 72 ++++-- 76 files changed, 957 insertions(+), 963 deletions(-) create mode 100644 tests/conf/standalone-test.toml.template diff --git a/Cargo.lock b/Cargo.lock index 2b9b8f6384..5bca4768e0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1304,6 +1304,7 @@ dependencies = [ "arrow-flight", "async-stream", "common-base", + "common-catalog", "common-error", "common-grpc", "common-grpc-expr", @@ -6697,6 +6698,7 @@ name = "session" version = "0.1.0" dependencies = [ "arc-swap", + "common-catalog", "common-telemetry", ] @@ -6938,9 +6940,9 @@ dependencies = [ [[package]] name = "sqlness" -version = "0.1.1" +version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3ffa69a2ae10018ec72a3cb7574e3a33a3fc322ed03740f6e435fd7f0c1db4a7" +checksum = "16a494ea677f9de93e8c25ec33b1073f8f72d61466d4595ecf1462ba877fe924" dependencies = [ "async-trait", "derive_builder 0.11.2", @@ -6962,7 +6964,10 @@ dependencies = [ "common-error", "common-grpc", "common-query", + "common-time", + "serde", "sqlness", + "tinytemplate", "tokio", ] diff --git a/benchmarks/src/bin/nyc-taxi.rs b/benchmarks/src/bin/nyc-taxi.rs index 4d857aa561..368dde8ce3 100644 --- a/benchmarks/src/bin/nyc-taxi.rs +++ b/benchmarks/src/bin/nyc-taxi.rs @@ -32,7 +32,6 @@ use indicatif::{MultiProgress, ProgressBar, ProgressStyle}; use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder; use tokio::task::JoinSet; -const DATABASE_NAME: &str = "greptime"; const CATALOG_NAME: &str = "greptime"; const SCHEMA_NAME: &str = "public"; const TABLE_NAME: &str = "nyc_taxi"; @@ -100,7 +99,6 @@ async fn write_data( let record_batch = record_batch.unwrap(); let (columns, row_count) = convert_record_batch(record_batch); let request = InsertRequest { - schema_name: "public".to_string(), table_name: TABLE_NAME.to_string(), region_number: 0, columns, @@ -424,7 +422,7 @@ fn main() { .unwrap() .block_on(async { let client = Client::with_urls(vec![&args.endpoint]); - let db = Database::new(DATABASE_NAME, client); + let db = Database::with_client(client); if !args.skip_write { do_write(&args, &db).await; diff --git a/src/api/greptime/v1/database.proto b/src/api/greptime/v1/database.proto index 56db558293..354cd9861e 100644 --- a/src/api/greptime/v1/database.proto +++ b/src/api/greptime/v1/database.proto @@ -5,11 +5,19 @@ package greptime.v1; import "greptime/v1/ddl.proto"; import "greptime/v1/column.proto"; +message RequestHeader { + // The `catalog` that is selected to be used in this request. + string catalog = 1; + // The `schema` that is selected to be used in this request. + string schema = 2; +} + message GreptimeRequest { + RequestHeader header = 1; oneof request { - InsertRequest insert = 1; - QueryRequest query = 2; - DdlRequest ddl = 3; + InsertRequest insert = 2; + QueryRequest query = 3; + DdlRequest ddl = 4; } } @@ -21,8 +29,7 @@ message QueryRequest { } message InsertRequest { - string schema_name = 1; - string table_name = 2; + string table_name = 1; // Data is represented here. repeated Column columns = 3; diff --git a/src/client/Cargo.toml b/src/client/Cargo.toml index ac83eb2610..bf56b1ead3 100644 --- a/src/client/Cargo.toml +++ b/src/client/Cargo.toml @@ -9,6 +9,7 @@ api = { path = "../api" } arrow-flight.workspace = true async-stream.workspace = true common-base = { path = "../common/base" } +common-catalog = { path = "../common/catalog" } common-error = { path = "../common/error" } common-grpc = { path = "../common/grpc" } common-grpc-expr = { path = "../common/grpc-expr" } diff --git a/src/client/examples/logical.rs b/src/client/examples/logical.rs index 368104d52a..07debec679 100644 --- a/src/client/examples/logical.rs +++ b/src/client/examples/logical.rs @@ -65,13 +65,12 @@ async fn run() { region_ids: vec![0], }; - let db = Database::new("create table", client.clone()); + let db = Database::with_client(client); let result = db.create(create_table_expr).await.unwrap(); event!(Level::INFO, "create table result: {:#?}", result); let logical = mock_logical_plan(); event!(Level::INFO, "plan size: {:#?}", logical.len()); - let db = Database::new("greptime", client); let result = db.logical_plan(logical).await.unwrap(); event!(Level::INFO, "result: {:#?}", result); diff --git a/src/client/src/database.rs b/src/client/src/database.rs index 956c1db5b0..bdf63b748e 100644 --- a/src/client/src/database.rs +++ b/src/client/src/database.rs @@ -19,9 +19,10 @@ use api::v1::greptime_request::Request; use api::v1::query_request::Query; use api::v1::{ AlterExpr, CreateTableExpr, DdlRequest, DropTableExpr, GreptimeRequest, InsertRequest, - QueryRequest, + QueryRequest, RequestHeader, }; use arrow_flight::{FlightData, Ticket}; +use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_error::prelude::*; use common_grpc::flight::{flight_messages_to_recordbatches, FlightDecoder, FlightMessage}; use common_query::Output; @@ -34,83 +35,89 @@ use crate::{error, Client, Result}; #[derive(Clone, Debug)] pub struct Database { - name: String, + // The "catalog" and "schema" to be used in processing the requests at the server side. + // They are the "hint" or "context", just like how the "database" in "USE" statement is treated in MySQL. + // They will be carried in the request header. + catalog: String, + schema: String, + client: Client, } impl Database { - pub fn new(name: impl Into, client: Client) -> Self { + pub fn new(catalog: impl Into, schema: impl Into, client: Client) -> Self { Self { - name: name.into(), + catalog: catalog.into(), + schema: schema.into(), client, } } - pub fn name(&self) -> &str { - &self.name + pub fn with_client(client: Client) -> Self { + Self::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client) + } + + pub fn set_schema(&mut self, schema: impl Into) { + self.schema = schema.into(); } pub async fn insert(&self, request: InsertRequest) -> Result { - self.do_get(GreptimeRequest { - request: Some(Request::Insert(request)), - }) - .await + self.do_get(Request::Insert(request)).await } pub async fn sql(&self, sql: &str) -> Result { - self.do_get(GreptimeRequest { - request: Some(Request::Query(QueryRequest { - query: Some(Query::Sql(sql.to_string())), - })), - }) + self.do_get(Request::Query(QueryRequest { + query: Some(Query::Sql(sql.to_string())), + })) .await } pub async fn logical_plan(&self, logical_plan: Vec) -> Result { - self.do_get(GreptimeRequest { - request: Some(Request::Query(QueryRequest { - query: Some(Query::LogicalPlan(logical_plan)), - })), - }) + self.do_get(Request::Query(QueryRequest { + query: Some(Query::LogicalPlan(logical_plan)), + })) .await } pub async fn create(&self, expr: CreateTableExpr) -> Result { - self.do_get(GreptimeRequest { - request: Some(Request::Ddl(DdlRequest { - expr: Some(DdlExpr::CreateTable(expr)), - })), - }) + self.do_get(Request::Ddl(DdlRequest { + expr: Some(DdlExpr::CreateTable(expr)), + })) .await } pub async fn alter(&self, expr: AlterExpr) -> Result { - self.do_get(GreptimeRequest { - request: Some(Request::Ddl(DdlRequest { - expr: Some(DdlExpr::Alter(expr)), - })), - }) + self.do_get(Request::Ddl(DdlRequest { + expr: Some(DdlExpr::Alter(expr)), + })) .await } pub async fn drop_table(&self, expr: DropTableExpr) -> Result { - self.do_get(GreptimeRequest { - request: Some(Request::Ddl(DdlRequest { - expr: Some(DdlExpr::DropTable(expr)), - })), - }) + self.do_get(Request::Ddl(DdlRequest { + expr: Some(DdlExpr::DropTable(expr)), + })) .await } - async fn do_get(&self, request: GreptimeRequest) -> Result { + async fn do_get(&self, request: Request) -> Result { + let request = GreptimeRequest { + header: Some(RequestHeader { + catalog: self.catalog.clone(), + schema: self.schema.clone(), + }), + request: Some(request), + }; + let request = Ticket { + ticket: request.encode_to_vec(), + }; + let mut client = self.client.make_client()?; // TODO(LFC): Streaming get flight data. let flight_data: Vec = client .mut_inner() - .do_get(Ticket { - ticket: request.encode_to_vec(), - }) + .do_get(request) .and_then(|response| response.into_inner().try_collect()) .await .map_err(|e| { diff --git a/src/cmd/src/metasrv.rs b/src/cmd/src/metasrv.rs index a7c726e2fb..52be21f279 100644 --- a/src/cmd/src/metasrv.rs +++ b/src/cmd/src/metasrv.rs @@ -13,7 +13,7 @@ // limitations under the License. use clap::Parser; -use common_telemetry::{info, logging}; +use common_telemetry::{info, logging, warn}; use meta_srv::bootstrap; use meta_srv::metasrv::MetaSrvOptions; use snafu::ResultExt; @@ -58,6 +58,8 @@ struct StartCommand { config_file: Option, #[clap(short, long)] selector: Option, + #[clap(long)] + use_memory_store: bool, } impl StartCommand { @@ -100,6 +102,11 @@ impl TryFrom for MetaSrvOptions { info!("Using {} selector", selector_type); } + if cmd.use_memory_store { + warn!("Using memory store for Meta. Make sure you are in running tests."); + opts.use_memory_store = true; + } + Ok(opts) } } @@ -118,6 +125,7 @@ mod tests { store_addr: Some("127.0.0.1:2380".to_string()), config_file: None, selector: Some("LoadBased".to_string()), + use_memory_store: false, }; let options: MetaSrvOptions = cmd.try_into().unwrap(); assert_eq!("127.0.0.1:3002".to_string(), options.bind_addr); @@ -137,6 +145,7 @@ mod tests { "{}/../../config/metasrv.example.toml", std::env::current_dir().unwrap().as_path().to_str().unwrap() )), + use_memory_store: false, }; let options: MetaSrvOptions = cmd.try_into().unwrap(); assert_eq!("127.0.0.1:3002".to_string(), options.bind_addr); diff --git a/src/common/grpc-expr/src/alter.rs b/src/common/grpc-expr/src/alter.rs index 0c40e9f959..16fc2ca86c 100644 --- a/src/common/grpc-expr/src/alter.rs +++ b/src/common/grpc-expr/src/alter.rs @@ -29,16 +29,8 @@ use crate::error::{ /// Convert an [`AlterExpr`] to an [`AlterTableRequest`] pub fn alter_expr_to_request(expr: AlterExpr) -> Result { - let catalog_name = if expr.catalog_name.is_empty() { - None - } else { - Some(expr.catalog_name) - }; - let schema_name = if expr.schema_name.is_empty() { - None - } else { - Some(expr.schema_name) - }; + let catalog_name = expr.catalog_name; + let schema_name = expr.schema_name; let kind = expr.kind.context(MissingFieldSnafu { field: "kind" })?; match kind { Kind::AddColumns(add_columns) => { @@ -219,8 +211,8 @@ mod tests { }; let alter_request = alter_expr_to_request(expr).unwrap(); - assert_eq!(None, alter_request.catalog_name); - assert_eq!(None, alter_request.schema_name); + assert_eq!(alter_request.catalog_name, ""); + assert_eq!(alter_request.schema_name, ""); assert_eq!("monitor".to_string(), alter_request.table_name); let add_column = match alter_request.alter_kind { AlterKind::AddColumns { mut columns } => columns.pop().unwrap(), @@ -250,8 +242,8 @@ mod tests { }; let alter_request = alter_expr_to_request(expr).unwrap(); - assert_eq!(Some("test_catalog".to_string()), alter_request.catalog_name); - assert_eq!(Some("test_schema".to_string()), alter_request.schema_name); + assert_eq!(alter_request.catalog_name, "test_catalog"); + assert_eq!(alter_request.schema_name, "test_schema"); assert_eq!("monitor".to_string(), alter_request.table_name); let mut drop_names = match alter_request.alter_kind { diff --git a/src/common/grpc-expr/src/insert.rs b/src/common/grpc-expr/src/insert.rs index 959f1a5a40..4295ec6f7c 100644 --- a/src/common/grpc-expr/src/insert.rs +++ b/src/common/grpc-expr/src/insert.rs @@ -21,7 +21,6 @@ use api::v1::{ InsertRequest as GrpcInsertRequest, }; use common_base::BitVec; -use common_catalog::consts::DEFAULT_CATALOG_NAME; use common_time::timestamp::Timestamp; use common_time::{Date, DateTime}; use datatypes::data_type::{ConcreteDataType, DataType}; @@ -31,7 +30,7 @@ use datatypes::value::Value; use datatypes::vectors::MutableVector; use snafu::{ensure, OptionExt, ResultExt}; use table::metadata::TableId; -use table::requests::{AddColumnRequest, AlterKind, AlterTableRequest, InsertRequest}; +use table::requests::InsertRequest; use crate::error::{ ColumnDataTypeSnafu, CreateVectorSnafu, DuplicatedTimestampColumnSnafu, IllegalInsertDataSnafu, @@ -81,20 +80,6 @@ pub fn find_new_columns(schema: &SchemaRef, columns: &[Column]) -> Result