diff --git a/Cargo.lock b/Cargo.lock index 2b9b8f6384..5bca4768e0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1304,6 +1304,7 @@ dependencies = [ "arrow-flight", "async-stream", "common-base", + "common-catalog", "common-error", "common-grpc", "common-grpc-expr", @@ -6697,6 +6698,7 @@ name = "session" version = "0.1.0" dependencies = [ "arc-swap", + "common-catalog", "common-telemetry", ] @@ -6938,9 +6940,9 @@ dependencies = [ [[package]] name = "sqlness" -version = "0.1.1" +version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3ffa69a2ae10018ec72a3cb7574e3a33a3fc322ed03740f6e435fd7f0c1db4a7" +checksum = "16a494ea677f9de93e8c25ec33b1073f8f72d61466d4595ecf1462ba877fe924" dependencies = [ "async-trait", "derive_builder 0.11.2", @@ -6962,7 +6964,10 @@ dependencies = [ "common-error", "common-grpc", "common-query", + "common-time", + "serde", "sqlness", + "tinytemplate", "tokio", ] diff --git a/benchmarks/src/bin/nyc-taxi.rs b/benchmarks/src/bin/nyc-taxi.rs index 4d857aa561..368dde8ce3 100644 --- a/benchmarks/src/bin/nyc-taxi.rs +++ b/benchmarks/src/bin/nyc-taxi.rs @@ -32,7 +32,6 @@ use indicatif::{MultiProgress, ProgressBar, ProgressStyle}; use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder; use tokio::task::JoinSet; -const DATABASE_NAME: &str = "greptime"; const CATALOG_NAME: &str = "greptime"; const SCHEMA_NAME: &str = "public"; const TABLE_NAME: &str = "nyc_taxi"; @@ -100,7 +99,6 @@ async fn write_data( let record_batch = record_batch.unwrap(); let (columns, row_count) = convert_record_batch(record_batch); let request = InsertRequest { - schema_name: "public".to_string(), table_name: TABLE_NAME.to_string(), region_number: 0, columns, @@ -424,7 +422,7 @@ fn main() { .unwrap() .block_on(async { let client = Client::with_urls(vec![&args.endpoint]); - let db = Database::new(DATABASE_NAME, client); + let db = Database::with_client(client); if !args.skip_write { do_write(&args, &db).await; diff --git a/src/api/greptime/v1/database.proto b/src/api/greptime/v1/database.proto index 56db558293..354cd9861e 100644 --- a/src/api/greptime/v1/database.proto +++ b/src/api/greptime/v1/database.proto @@ -5,11 +5,19 @@ package greptime.v1; import "greptime/v1/ddl.proto"; import "greptime/v1/column.proto"; +message RequestHeader { + // The `catalog` that is selected to be used in this request. + string catalog = 1; + // The `schema` that is selected to be used in this request. + string schema = 2; +} + message GreptimeRequest { + RequestHeader header = 1; oneof request { - InsertRequest insert = 1; - QueryRequest query = 2; - DdlRequest ddl = 3; + InsertRequest insert = 2; + QueryRequest query = 3; + DdlRequest ddl = 4; } } @@ -21,8 +29,7 @@ message QueryRequest { } message InsertRequest { - string schema_name = 1; - string table_name = 2; + string table_name = 1; // Data is represented here. repeated Column columns = 3; diff --git a/src/client/Cargo.toml b/src/client/Cargo.toml index ac83eb2610..bf56b1ead3 100644 --- a/src/client/Cargo.toml +++ b/src/client/Cargo.toml @@ -9,6 +9,7 @@ api = { path = "../api" } arrow-flight.workspace = true async-stream.workspace = true common-base = { path = "../common/base" } +common-catalog = { path = "../common/catalog" } common-error = { path = "../common/error" } common-grpc = { path = "../common/grpc" } common-grpc-expr = { path = "../common/grpc-expr" } diff --git a/src/client/examples/logical.rs b/src/client/examples/logical.rs index 368104d52a..07debec679 100644 --- a/src/client/examples/logical.rs +++ b/src/client/examples/logical.rs @@ -65,13 +65,12 @@ async fn run() { region_ids: vec![0], }; - let db = Database::new("create table", client.clone()); + let db = Database::with_client(client); let result = db.create(create_table_expr).await.unwrap(); event!(Level::INFO, "create table result: {:#?}", result); let logical = mock_logical_plan(); event!(Level::INFO, "plan size: {:#?}", logical.len()); - let db = Database::new("greptime", client); let result = db.logical_plan(logical).await.unwrap(); event!(Level::INFO, "result: {:#?}", result); diff --git a/src/client/src/database.rs b/src/client/src/database.rs index 956c1db5b0..bdf63b748e 100644 --- a/src/client/src/database.rs +++ b/src/client/src/database.rs @@ -19,9 +19,10 @@ use api::v1::greptime_request::Request; use api::v1::query_request::Query; use api::v1::{ AlterExpr, CreateTableExpr, DdlRequest, DropTableExpr, GreptimeRequest, InsertRequest, - QueryRequest, + QueryRequest, RequestHeader, }; use arrow_flight::{FlightData, Ticket}; +use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_error::prelude::*; use common_grpc::flight::{flight_messages_to_recordbatches, FlightDecoder, FlightMessage}; use common_query::Output; @@ -34,83 +35,89 @@ use crate::{error, Client, Result}; #[derive(Clone, Debug)] pub struct Database { - name: String, + // The "catalog" and "schema" to be used in processing the requests at the server side. + // They are the "hint" or "context", just like how the "database" in "USE" statement is treated in MySQL. + // They will be carried in the request header. + catalog: String, + schema: String, + client: Client, } impl Database { - pub fn new(name: impl Into, client: Client) -> Self { + pub fn new(catalog: impl Into, schema: impl Into, client: Client) -> Self { Self { - name: name.into(), + catalog: catalog.into(), + schema: schema.into(), client, } } - pub fn name(&self) -> &str { - &self.name + pub fn with_client(client: Client) -> Self { + Self::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client) + } + + pub fn set_schema(&mut self, schema: impl Into) { + self.schema = schema.into(); } pub async fn insert(&self, request: InsertRequest) -> Result { - self.do_get(GreptimeRequest { - request: Some(Request::Insert(request)), - }) - .await + self.do_get(Request::Insert(request)).await } pub async fn sql(&self, sql: &str) -> Result { - self.do_get(GreptimeRequest { - request: Some(Request::Query(QueryRequest { - query: Some(Query::Sql(sql.to_string())), - })), - }) + self.do_get(Request::Query(QueryRequest { + query: Some(Query::Sql(sql.to_string())), + })) .await } pub async fn logical_plan(&self, logical_plan: Vec) -> Result { - self.do_get(GreptimeRequest { - request: Some(Request::Query(QueryRequest { - query: Some(Query::LogicalPlan(logical_plan)), - })), - }) + self.do_get(Request::Query(QueryRequest { + query: Some(Query::LogicalPlan(logical_plan)), + })) .await } pub async fn create(&self, expr: CreateTableExpr) -> Result { - self.do_get(GreptimeRequest { - request: Some(Request::Ddl(DdlRequest { - expr: Some(DdlExpr::CreateTable(expr)), - })), - }) + self.do_get(Request::Ddl(DdlRequest { + expr: Some(DdlExpr::CreateTable(expr)), + })) .await } pub async fn alter(&self, expr: AlterExpr) -> Result { - self.do_get(GreptimeRequest { - request: Some(Request::Ddl(DdlRequest { - expr: Some(DdlExpr::Alter(expr)), - })), - }) + self.do_get(Request::Ddl(DdlRequest { + expr: Some(DdlExpr::Alter(expr)), + })) .await } pub async fn drop_table(&self, expr: DropTableExpr) -> Result { - self.do_get(GreptimeRequest { - request: Some(Request::Ddl(DdlRequest { - expr: Some(DdlExpr::DropTable(expr)), - })), - }) + self.do_get(Request::Ddl(DdlRequest { + expr: Some(DdlExpr::DropTable(expr)), + })) .await } - async fn do_get(&self, request: GreptimeRequest) -> Result { + async fn do_get(&self, request: Request) -> Result { + let request = GreptimeRequest { + header: Some(RequestHeader { + catalog: self.catalog.clone(), + schema: self.schema.clone(), + }), + request: Some(request), + }; + let request = Ticket { + ticket: request.encode_to_vec(), + }; + let mut client = self.client.make_client()?; // TODO(LFC): Streaming get flight data. let flight_data: Vec = client .mut_inner() - .do_get(Ticket { - ticket: request.encode_to_vec(), - }) + .do_get(request) .and_then(|response| response.into_inner().try_collect()) .await .map_err(|e| { diff --git a/src/cmd/src/metasrv.rs b/src/cmd/src/metasrv.rs index a7c726e2fb..52be21f279 100644 --- a/src/cmd/src/metasrv.rs +++ b/src/cmd/src/metasrv.rs @@ -13,7 +13,7 @@ // limitations under the License. use clap::Parser; -use common_telemetry::{info, logging}; +use common_telemetry::{info, logging, warn}; use meta_srv::bootstrap; use meta_srv::metasrv::MetaSrvOptions; use snafu::ResultExt; @@ -58,6 +58,8 @@ struct StartCommand { config_file: Option, #[clap(short, long)] selector: Option, + #[clap(long)] + use_memory_store: bool, } impl StartCommand { @@ -100,6 +102,11 @@ impl TryFrom for MetaSrvOptions { info!("Using {} selector", selector_type); } + if cmd.use_memory_store { + warn!("Using memory store for Meta. Make sure you are in running tests."); + opts.use_memory_store = true; + } + Ok(opts) } } @@ -118,6 +125,7 @@ mod tests { store_addr: Some("127.0.0.1:2380".to_string()), config_file: None, selector: Some("LoadBased".to_string()), + use_memory_store: false, }; let options: MetaSrvOptions = cmd.try_into().unwrap(); assert_eq!("127.0.0.1:3002".to_string(), options.bind_addr); @@ -137,6 +145,7 @@ mod tests { "{}/../../config/metasrv.example.toml", std::env::current_dir().unwrap().as_path().to_str().unwrap() )), + use_memory_store: false, }; let options: MetaSrvOptions = cmd.try_into().unwrap(); assert_eq!("127.0.0.1:3002".to_string(), options.bind_addr); diff --git a/src/common/grpc-expr/src/alter.rs b/src/common/grpc-expr/src/alter.rs index 0c40e9f959..16fc2ca86c 100644 --- a/src/common/grpc-expr/src/alter.rs +++ b/src/common/grpc-expr/src/alter.rs @@ -29,16 +29,8 @@ use crate::error::{ /// Convert an [`AlterExpr`] to an [`AlterTableRequest`] pub fn alter_expr_to_request(expr: AlterExpr) -> Result { - let catalog_name = if expr.catalog_name.is_empty() { - None - } else { - Some(expr.catalog_name) - }; - let schema_name = if expr.schema_name.is_empty() { - None - } else { - Some(expr.schema_name) - }; + let catalog_name = expr.catalog_name; + let schema_name = expr.schema_name; let kind = expr.kind.context(MissingFieldSnafu { field: "kind" })?; match kind { Kind::AddColumns(add_columns) => { @@ -219,8 +211,8 @@ mod tests { }; let alter_request = alter_expr_to_request(expr).unwrap(); - assert_eq!(None, alter_request.catalog_name); - assert_eq!(None, alter_request.schema_name); + assert_eq!(alter_request.catalog_name, ""); + assert_eq!(alter_request.schema_name, ""); assert_eq!("monitor".to_string(), alter_request.table_name); let add_column = match alter_request.alter_kind { AlterKind::AddColumns { mut columns } => columns.pop().unwrap(), @@ -250,8 +242,8 @@ mod tests { }; let alter_request = alter_expr_to_request(expr).unwrap(); - assert_eq!(Some("test_catalog".to_string()), alter_request.catalog_name); - assert_eq!(Some("test_schema".to_string()), alter_request.schema_name); + assert_eq!(alter_request.catalog_name, "test_catalog"); + assert_eq!(alter_request.schema_name, "test_schema"); assert_eq!("monitor".to_string(), alter_request.table_name); let mut drop_names = match alter_request.alter_kind { diff --git a/src/common/grpc-expr/src/insert.rs b/src/common/grpc-expr/src/insert.rs index 959f1a5a40..4295ec6f7c 100644 --- a/src/common/grpc-expr/src/insert.rs +++ b/src/common/grpc-expr/src/insert.rs @@ -21,7 +21,6 @@ use api::v1::{ InsertRequest as GrpcInsertRequest, }; use common_base::BitVec; -use common_catalog::consts::DEFAULT_CATALOG_NAME; use common_time::timestamp::Timestamp; use common_time::{Date, DateTime}; use datatypes::data_type::{ConcreteDataType, DataType}; @@ -31,7 +30,7 @@ use datatypes::value::Value; use datatypes::vectors::MutableVector; use snafu::{ensure, OptionExt, ResultExt}; use table::metadata::TableId; -use table::requests::{AddColumnRequest, AlterKind, AlterTableRequest, InsertRequest}; +use table::requests::InsertRequest; use crate::error::{ ColumnDataTypeSnafu, CreateVectorSnafu, DuplicatedTimestampColumnSnafu, IllegalInsertDataSnafu, @@ -81,20 +80,6 @@ pub fn find_new_columns(schema: &SchemaRef, columns: &[Column]) -> Result