diff --git a/Cargo.lock b/Cargo.lock index 1a3c89a186..b8947d5c2b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -767,7 +767,7 @@ version = "0.3.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cdca6a10ecad987bda04e95606ef85a5417dcaac1a78455242d72e031e2b6b62" dependencies = [ - "heck 0.4.1", + "heck", "proc-macro2", "quote", "syn 2.0.28", @@ -1475,7 +1475,7 @@ version = "3.2.25" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ae6371b8bdc8b7d3959e9cf7b22d4435ef3e79e138688421ec654acf8c81b008" dependencies = [ - "heck 0.4.1", + "heck", "proc-macro-error", "proc-macro2", "quote", @@ -1488,7 +1488,7 @@ version = "4.3.12" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "54a9bb5758fc5dfe728d1019941681eccaf0cf8a4189b692a0ee2f2ecf90a050" dependencies = [ - "heck 0.4.1", + "heck", "proc-macro2", "quote", "syn 2.0.28", @@ -1530,6 +1530,7 @@ dependencies = [ "datanode", "datatypes", "derive-new", + "derive_builder 0.12.0", "enum_dispatch", "futures-util", "moka 0.9.9", @@ -1689,7 +1690,7 @@ dependencies = [ "paste", "regex", "snafu", - "strum 0.21.0", + "strum 0.25.0", "tokio", "tokio-util", "url", @@ -1700,7 +1701,7 @@ name = "common-error" version = "0.4.0-nightly" dependencies = [ "snafu", - "strum 0.24.1", + "strum 0.25.0", ] [[package]] @@ -4153,11 +4154,13 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" [[package]] name = "greptime-proto" version = "0.1.0" -source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=39b0ea8d086d0ab762046b0f473aa3ef8bd347f9#39b0ea8d086d0ab762046b0f473aa3ef8bd347f9" +source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=ec2d346e09a2f6db3b1d0aaf010e89ed8a69eccc#ec2d346e09a2f6db3b1d0aaf010e89ed8a69eccc" dependencies = [ "prost", "serde", "serde_json", + "strum 0.25.0", + "strum_macros 0.25.2", "tonic 0.9.2", "tonic-build", ] @@ -4273,15 +4276,6 @@ dependencies = [ "http", ] -[[package]] -name = "heck" -version = "0.3.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6d621efb26863f0e9924c6ac577e8275e5e6b77455db64ffa6c65c904e9e132c" -dependencies = [ - "unicode-segmentation", -] - [[package]] name = "heck" version = "0.4.1" @@ -5345,6 +5339,7 @@ dependencies = [ "session", "snafu", "store-api", + "strum 0.25.0", "table", "tokio", "tokio-stream", @@ -5574,7 +5569,7 @@ dependencies = [ "snafu", "storage", "store-api", - "strum 0.21.0", + "strum 0.25.0", "table", "tokio", "tokio-util", @@ -5654,7 +5649,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "56b0d8a0db9bf6d2213e11f2c701cb91387b0614361625ab7b9743b41aa4938f" dependencies = [ "darling 0.20.3", - "heck 0.4.1", + "heck", "num-bigint", "proc-macro-crate 1.3.1", "proc-macro-error", @@ -7098,7 +7093,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "119533552c9a7ffacc21e099c24a0ac8bb19c2a2a3f363de84cd9b844feab270" dependencies = [ "bytes", - "heck 0.4.1", + "heck", "itertools 0.10.5", "lazy_static", "log", @@ -8918,7 +8913,7 @@ dependencies = [ "snafu", "snap", "sql", - "strum 0.24.1", + "strum 0.25.0", "table", "tikv-jemalloc-ctl", "tokio", @@ -9140,7 +9135,7 @@ version = "0.7.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "990079665f075b699031e9c08fd3ab99be5029b96f3b78dc0709e8f77e4efebf" dependencies = [ - "heck 0.4.1", + "heck", "proc-macro2", "quote", "syn 1.0.109", @@ -9403,7 +9398,7 @@ checksum = "9966e64ae989e7e575b19d7265cb79d7fc3cbbdf179835cb0d716f294c2049c9" dependencies = [ "dotenvy", "either", - "heck 0.4.1", + "heck", "once_cell", "proc-macro2", "quote", @@ -9608,23 +9603,11 @@ version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623" -[[package]] -name = "strum" -version = "0.21.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "aaf86bbcfd1fa9670b7a129f64fc0c9fcbbfe4f1bc4210e9e98fe71ffc12cde2" -dependencies = [ - "strum_macros 0.21.1", -] - [[package]] name = "strum" version = "0.24.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "063e6045c0e62079840579a7e47a355ae92f60eb74daaf156fb1e84ba164e63f" -dependencies = [ - "strum_macros 0.24.3", -] [[package]] name = "strum" @@ -9635,25 +9618,13 @@ dependencies = [ "strum_macros 0.25.2", ] -[[package]] -name = "strum_macros" -version = "0.21.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d06aaeeee809dbc59eb4556183dd927df67db1540de5be8d3ec0b6636358a5ec" -dependencies = [ - "heck 0.3.3", - "proc-macro2", - "quote", - "syn 1.0.109", -] - [[package]] name = "strum_macros" version = "0.24.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1e385be0d24f186b4ce2f9982191e7101bb737312ad61c1f2f984f34bcf85d59" dependencies = [ - "heck 0.4.1", + "heck", "proc-macro2", "quote", "rustversion", @@ -9666,7 +9637,7 @@ version = "0.25.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ad8d03b598d3d0fff69bf533ee3ef19b8eeb342729596df84bcc7e1f96ec4059" dependencies = [ - "heck 0.4.1", + "heck", "proc-macro2", "quote", "rustversion", @@ -9716,7 +9687,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e3ae64fb7ad0670c7d6d53d57b1b91beb2212afc30e164cc8edb02d6b2cff32a" dependencies = [ "gix", - "heck 0.4.1", + "heck", "prettyplease 0.2.12", "prost", "prost-build", @@ -9738,7 +9709,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "658f6cbbd29a250869b87e1bb5a4b42db534cacfc1c03284f2536cd36b6c1617" dependencies = [ "git2", - "heck 0.4.1", + "heck", "prettyplease 0.2.12", "prost", "prost-build", @@ -10777,7 +10748,7 @@ version = "0.0.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "95d27d749378ceab6ec22188ed7ad102205c89ddb92ab662371c850ffc71aa1a" dependencies = [ - "heck 0.4.1", + "heck", "log", "proc-macro2", "quote", @@ -10795,7 +10766,7 @@ version = "0.0.13" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5c8d9ecedde2fd77e975c38eeb9ca40b34ad0247b2259c6e6bbd2a8d6cc2444f" dependencies = [ - "heck 0.4.1", + "heck", "log", "proc-macro2", "quote", diff --git a/Cargo.toml b/Cargo.toml index d81105b40f..bfbc1049d9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -77,7 +77,7 @@ datafusion-substrait = { git = "https://github.com/waynexia/arrow-datafusion.git derive_builder = "0.12" futures = "0.3" futures-util = "0.3" -greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "39b0ea8d086d0ab762046b0f473aa3ef8bd347f9" } +greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "ec2d346e09a2f6db3b1d0aaf010e89ed8a69eccc" } itertools = "0.10" lazy_static = "1.4" once_cell = "1.18" @@ -93,6 +93,7 @@ snafu = { version = "0.7", features = ["backtraces"] } sqlparser = { git = "https://github.com/GreptimeTeam/sqlparser-rs.git", rev = "296a4f6c73b129d6f565a42a2e5e53c6bc2b9da4", features = [ "visitor", ] } +strum = { version = "0.25", features = ["derive"] } tempfile = "3" tokio = { version = "1.28", features = ["full"] } tokio-util = { version = "0.7", features = ["io-util", "compat"] } diff --git a/src/api/src/helper.rs b/src/api/src/helper.rs index 20c27c82aa..1f91313b60 100644 --- a/src/api/src/helper.rs +++ b/src/api/src/helper.rs @@ -37,7 +37,6 @@ use greptime_proto::v1; use greptime_proto::v1::ddl_request::Expr; use greptime_proto::v1::greptime_request::Request; use greptime_proto::v1::query_request::Query; -use greptime_proto::v1::region::region_request; use greptime_proto::v1::value::ValueData; use greptime_proto::v1::{DdlRequest, IntervalMonthDayNano, QueryRequest, SemanticType}; use snafu::prelude::*; @@ -333,21 +332,6 @@ fn query_request_type(request: &QueryRequest) -> &'static str { } } -/// Returns the type name of the [RegionRequest]. -pub fn region_request_type(request: ®ion_request::Body) -> &'static str { - match request { - region_request::Body::Inserts(_) => "region.inserts", - region_request::Body::Deletes(_) => "region.deletes", - region_request::Body::Create(_) => "region.create", - region_request::Body::Drop(_) => "region.drop", - region_request::Body::Open(_) => "region.open", - region_request::Body::Close(_) => "region.close", - region_request::Body::Alter(_) => "region.alter", - region_request::Body::Flush(_) => "region.flush", - region_request::Body::Compact(_) => "region.compact", - } -} - /// Returns the type name of the [DdlRequest]. fn ddl_request_type(request: &DdlRequest) -> &'static str { match request.expr { diff --git a/src/client/Cargo.toml b/src/client/Cargo.toml index a0b0f99e94..aa37d26a3d 100644 --- a/src/client/Cargo.toml +++ b/src/client/Cargo.toml @@ -22,6 +22,7 @@ common-telemetry = { workspace = true } common-time = { workspace = true } datafusion.workspace = true datatypes = { workspace = true } +derive_builder.workspace = true enum_dispatch = "0.3" futures-util.workspace = true moka = { version = "0.9", features = ["future"] } diff --git a/src/client/src/client.rs b/src/client/src/client.rs index f5a686cc02..2af1d8ae8f 100644 --- a/src/client/src/client.rs +++ b/src/client/src/client.rs @@ -17,6 +17,7 @@ use std::sync::Arc; use api::v1::greptime_database_client::GreptimeDatabaseClient; use api::v1::health_check_client::HealthCheckClient; use api::v1::prometheus_gateway_client::PrometheusGatewayClient; +use api::v1::region::region_client::RegionClient as PbRegionClient; use api::v1::HealthCheckRequest; use arrow_flight::flight_service_client::FlightServiceClient; use common_grpc::channel_manager::ChannelManager; @@ -82,11 +83,6 @@ impl Client { Default::default() } - pub fn with_manager(channel_manager: ChannelManager) -> Self { - let inner = Arc::new(Inner::with_manager(channel_manager)); - Self { inner } - } - pub fn with_urls(urls: A) -> Self where U: AsRef, @@ -157,6 +153,11 @@ impl Client { }) } + pub(crate) fn raw_region_client(&self) -> Result> { + let (_, channel) = self.find_channel()?; + Ok(PbRegionClient::new(channel)) + } + pub fn make_prometheus_gateway_client(&self) -> Result> { let (_, channel) = self.find_channel()?; Ok(PrometheusGatewayClient::new(channel)) diff --git a/src/client/src/lib.rs b/src/client/src/lib.rs index 45ae26440b..23a67ebae1 100644 --- a/src/client/src/lib.rs +++ b/src/client/src/lib.rs @@ -18,6 +18,7 @@ mod database; pub mod error; pub mod load_balance; mod metrics; +pub mod region; mod stream_insert; pub use api; diff --git a/src/client/src/metrics.rs b/src/client/src/metrics.rs index 49ba8bf341..06a2bb791f 100644 --- a/src/client/src/metrics.rs +++ b/src/client/src/metrics.rs @@ -25,3 +25,4 @@ pub const METRIC_GRPC_FLUSH_TABLE: &str = "grpc.flush_table"; pub const METRIC_GRPC_COMPACT_TABLE: &str = "grpc.compact_table"; pub const METRIC_GRPC_TRUNCATE_TABLE: &str = "grpc.truncate_table"; pub const METRIC_GRPC_DO_GET: &str = "grpc.do_get"; +pub(crate) const METRIC_REGION_REQUEST_GRPC: &str = "grpc.region_request"; diff --git a/src/client/src/region.rs b/src/client/src/region.rs new file mode 100644 index 0000000000..b45937edeb --- /dev/null +++ b/src/client/src/region.rs @@ -0,0 +1,146 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use api::v1::region::{region_request, RegionRequest, RegionRequestHeader, RegionResponse}; +use api::v1::ResponseHeader; +use common_error::status_code::StatusCode; +use common_telemetry::timer; +use snafu::OptionExt; + +use crate::error::{IllegalDatabaseResponseSnafu, Result, ServerSnafu}; +use crate::{metrics, Client}; + +type AffectedRows = u64; + +#[derive(Debug)] +pub struct RegionRequester { + trace_id: Option, + span_id: Option, + client: Client, +} + +impl RegionRequester { + pub fn new(client: Client) -> Self { + // TODO(LFC): Pass in trace_id and span_id from some context when we have it. + Self { + trace_id: None, + span_id: None, + client, + } + } + + pub async fn handle(self, request: region_request::Body) -> Result { + let request_type = request.as_ref().to_string(); + + let request = RegionRequest { + header: Some(RegionRequestHeader { + trace_id: self.trace_id, + span_id: self.span_id, + }), + body: Some(request), + }; + + let _timer = timer!( + metrics::METRIC_REGION_REQUEST_GRPC, + &[("request_type", request_type)] + ); + + let mut client = self.client.raw_region_client()?; + + let RegionResponse { + header, + affected_rows, + } = client.handle(request).await?.into_inner(); + + check_response_header(header)?; + + Ok(affected_rows) + } +} + +fn check_response_header(header: Option) -> Result<()> { + let status = header + .and_then(|header| header.status) + .context(IllegalDatabaseResponseSnafu { + err_msg: "either response header or status is missing", + })?; + + if StatusCode::is_success(status.status_code) { + Ok(()) + } else { + let code = + StatusCode::from_u32(status.status_code).context(IllegalDatabaseResponseSnafu { + err_msg: format!("unknown server status: {:?}", status), + })?; + ServerSnafu { + code, + msg: status.err_msg, + } + .fail() + } +} + +#[cfg(test)] +mod test { + use api::v1::Status as PbStatus; + + use super::*; + use crate::Error::{IllegalDatabaseResponse, Server}; + + #[test] + fn test_check_response_header() { + let result = check_response_header(None); + assert!(matches!( + result.unwrap_err(), + IllegalDatabaseResponse { .. } + )); + + let result = check_response_header(Some(ResponseHeader { status: None })); + assert!(matches!( + result.unwrap_err(), + IllegalDatabaseResponse { .. } + )); + + let result = check_response_header(Some(ResponseHeader { + status: Some(PbStatus { + status_code: StatusCode::Success as u32, + err_msg: "".to_string(), + }), + })); + assert!(result.is_ok()); + + let result = check_response_header(Some(ResponseHeader { + status: Some(PbStatus { + status_code: u32::MAX, + err_msg: "".to_string(), + }), + })); + assert!(matches!( + result.unwrap_err(), + IllegalDatabaseResponse { .. } + )); + + let result = check_response_header(Some(ResponseHeader { + status: Some(PbStatus { + status_code: StatusCode::Internal as u32, + err_msg: "blabla".to_string(), + }), + })); + let Server { code, msg } = result.unwrap_err() else { + unreachable!() + }; + assert_eq!(code, StatusCode::Internal); + assert_eq!(msg, "blabla"); + } +} diff --git a/src/common/catalog/src/consts.rs b/src/common/catalog/src/consts.rs index 3978e3aba8..59037d7702 100644 --- a/src/common/catalog/src/consts.rs +++ b/src/common/catalog/src/consts.rs @@ -35,6 +35,12 @@ pub const INFORMATION_SCHEMA_TABLES_TABLE_ID: u32 = 3; pub const INFORMATION_SCHEMA_COLUMNS_TABLE_ID: u32 = 4; pub const MITO_ENGINE: &str = "mito"; +pub const MITO2_ENGINE: &str = "mito2"; + +pub fn default_engine() -> &'static str { + MITO_ENGINE +} + pub const IMMUTABLE_FILE_ENGINE: &str = "file"; pub const SEMANTIC_TYPE_PRIMARY_KEY: &str = "TAG"; diff --git a/src/common/datasource/Cargo.toml b/src/common/datasource/Cargo.toml index f67ca0a7cd..48e52b4373 100644 --- a/src/common/datasource/Cargo.toml +++ b/src/common/datasource/Cargo.toml @@ -27,7 +27,7 @@ orc-rust = "0.2" paste = "1.0" regex = "1.7" snafu.workspace = true -strum = { version = "0.21", features = ["derive"] } +strum.workspace = true tokio-util.workspace = true tokio.workspace = true url = "2.3" diff --git a/src/common/error/Cargo.toml b/src/common/error/Cargo.toml index e7c6abfc1a..28ce054822 100644 --- a/src/common/error/Cargo.toml +++ b/src/common/error/Cargo.toml @@ -6,4 +6,4 @@ license.workspace = true [dependencies] snafu = { version = "0.7", features = ["backtraces"] } -strum = { version = "0.24", features = ["std", "derive"] } +strum.workspace = true diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs index 12ee2c01f9..c91832eee5 100644 --- a/src/frontend/src/instance.rs +++ b/src/frontend/src/instance.rs @@ -38,7 +38,7 @@ use catalog::remote::CachedMetaKvBackend; use catalog::CatalogManagerRef; use client::client_manager::DatanodeClients; use common_base::Plugins; -use common_catalog::consts::MITO_ENGINE; +use common_catalog::consts::default_engine; use common_error::ext::BoxedError; use common_grpc::channel_manager::{ChannelConfig, ChannelManager}; use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler; @@ -213,7 +213,6 @@ impl Instance { let create_expr_factory = CreateExprFactory; let row_inserter = Arc::new(RowInserter::new( - MITO_ENGINE.to_string(), catalog_manager.clone(), create_expr_factory, dist_instance.clone(), @@ -286,7 +285,6 @@ impl Instance { let grpc_query_handler = StandaloneGrpcQueryHandler::arc(dn_instance.clone()); let row_inserter = Arc::new(RowInserter::new( - MITO_ENGINE.to_string(), catalog_manager.clone(), create_expr_factory, grpc_query_handler.clone(), @@ -366,7 +364,7 @@ impl Instance { catalog_name, schema_name, table_name, ); let _ = self - .create_table_by_columns(ctx, table_name, columns, MITO_ENGINE) + .create_table_by_columns(ctx, table_name, columns, default_engine()) .await?; info!( "Successfully created table on insertion: {}.{}.{}", diff --git a/src/frontend/src/row_inserter.rs b/src/frontend/src/row_inserter.rs index d83af0bd65..803ca52dc5 100644 --- a/src/frontend/src/row_inserter.rs +++ b/src/frontend/src/row_inserter.rs @@ -17,6 +17,7 @@ use api::v1::ddl_request::Expr; use api::v1::greptime_request::Request; use api::v1::{AlterExpr, ColumnSchema, DdlRequest, Row, RowInsertRequest, RowInsertRequests}; use catalog::CatalogManagerRef; +use common_catalog::consts::default_engine; use common_grpc_expr::util::{extract_new_columns, ColumnExpr}; use common_query::Output; use common_telemetry::info; @@ -30,7 +31,6 @@ use crate::error::{CatalogSnafu, EmptyDataSnafu, Error, FindNewColumnsOnInsertio use crate::expr_factory::CreateExprFactory; pub struct RowInserter { - engine_name: String, catalog_manager: CatalogManagerRef, create_expr_factory: CreateExprFactory, grpc_query_handler: GrpcQueryHandlerRef, @@ -38,13 +38,11 @@ pub struct RowInserter { impl RowInserter { pub fn new( - engine_name: String, catalog_manager: CatalogManagerRef, create_expr_factory: CreateExprFactory, grpc_query_handler: GrpcQueryHandlerRef, ) -> Self { Self { - engine_name, catalog_manager, create_expr_factory, grpc_query_handler, @@ -105,7 +103,7 @@ impl RowInserter { let (column_schemas, _) = extract_schema_and_rows(req)?; let create_table_expr = self .create_expr_factory - .create_table_expr_by_column_schemas(table_name, column_schemas, &self.engine_name)?; + .create_table_expr_by_column_schemas(table_name, column_schemas, default_engine())?; let req = Request::Ddl(DdlRequest { expr: Some(Expr::CreateTable(create_table_expr)), diff --git a/src/meta-srv/Cargo.toml b/src/meta-srv/Cargo.toml index 488c28fabe..0942583b38 100644 --- a/src/meta-srv/Cargo.toml +++ b/src/meta-srv/Cargo.toml @@ -44,6 +44,7 @@ serde_json = "1.0" servers = { workspace = true } snafu.workspace = true store-api = { workspace = true } +strum.workspace = true table = { workspace = true } tokio-stream = { version = "0.1", features = ["net"] } tokio.workspace = true @@ -56,6 +57,7 @@ uuid.workspace = true [dev-dependencies] chrono.workspace = true +client = { workspace = true, features = ["testing"] } common-procedure-test = { workspace = true } session = { workspace = true } tracing = "0.1" diff --git a/src/meta-srv/src/error.rs b/src/meta-srv/src/error.rs index eaafae1a40..f608bd90b1 100644 --- a/src/meta-srv/src/error.rs +++ b/src/meta-srv/src/error.rs @@ -516,6 +516,9 @@ pub enum Error { operation: String, location: Location, }, + + #[snafu(display("Primary key '{key}' not found when creating region request, at {location}"))] + PrimaryKeyNotFound { key: String, location: Location }, } pub type Result = std::result::Result; @@ -570,6 +573,7 @@ impl ErrorExt for Error { | Error::UnsupportedSelectorType { .. } | Error::InvalidArguments { .. } | Error::InvalidHeartbeatRequest { .. } + | Error::PrimaryKeyNotFound { .. } | Error::TooManyPartitions { .. } => StatusCode::InvalidArguments, Error::LeaseKeyFromUtf8 { .. } | Error::LeaseValueFromUtf8 { .. } diff --git a/src/meta-srv/src/metrics.rs b/src/meta-srv/src/metrics.rs index 0cb93e47b4..98e0d62d3c 100644 --- a/src/meta-srv/src/metrics.rs +++ b/src/meta-srv/src/metrics.rs @@ -19,7 +19,4 @@ pub(crate) const METRIC_META_ROUTE_REQUEST: &str = "meta.route_request"; pub(crate) const METRIC_META_HEARTBEAT_CONNECTION_NUM: &str = "meta.heartbeat_connection_num"; pub(crate) const METRIC_META_HANDLER_EXECUTE: &str = "meta.handler_execute"; -pub(crate) const METRIC_META_CREATE_TABLE_PROCEDURE_CREATE_META: &str = - "meta.procedure.create_table.create_meta"; -pub(crate) const METRIC_META_CREATE_TABLE_PROCEDURE_CREATE_TABLE: &str = - "meta.procedure.create_table.create_table"; +pub(crate) const METRIC_META_PROCEDURE_CREATE_TABLE: &str = "meta.procedure.create_table"; diff --git a/src/meta-srv/src/procedure/create_table.rs b/src/meta-srv/src/procedure/create_table.rs index 01d1813728..1a4713d628 100644 --- a/src/meta-srv/src/procedure/create_table.rs +++ b/src/meta-srv/src/procedure/create_table.rs @@ -12,8 +12,13 @@ // See the License for the specific language governing permissions and // limitations under the License. +use api::v1::region::region_request::Body as PbRegionRequest; +use api::v1::region::{ColumnDef, CreateRequest as PbCreateRegionRequest}; +use api::v1::SemanticType; use async_trait::async_trait; +use client::region::RegionRequester; use client::Database; +use common_catalog::consts::MITO2_ENGINE; use common_error::ext::ErrorExt; use common_error::status_code::StatusCode; use common_meta::key::table_name::TableNameKey; @@ -25,13 +30,16 @@ use common_procedure::{Context as ProcedureContext, LockKey, Procedure, Status}; use common_telemetry::info; use futures::future::join_all; use serde::{Deserialize, Serialize}; -use snafu::{ensure, ResultExt}; -use table::engine::TableReference; +use snafu::{ensure, OptionExt, ResultExt}; +use store_api::storage::RegionId; +use strum::AsRefStr; +use table::engine::{region_dir, TableReference}; use table::metadata::{RawTableInfo, TableId}; use super::utils::{handle_request_datanode_error, handle_retry_error}; use crate::ddl::DdlContext; -use crate::error::{self, Result, TableMetadataManagerSnafu}; +use crate::error::{self, PrimaryKeyNotFoundSnafu, Result, TableMetadataManagerSnafu}; +use crate::metrics; pub struct CreateTableProcedure { context: DdlContext, @@ -69,6 +77,10 @@ impl CreateTableProcedure { &self.creator.data.task.table_info } + fn table_id(&self) -> TableId { + self.table_info().ident.table_id + } + pub fn region_routes(&self) -> &Vec { &self.creator.data.region_routes } @@ -99,17 +111,126 @@ impl CreateTableProcedure { return Ok(Status::Done); } - self.creator.data.state = CreateTableState::DatanodeCreateTable; + self.creator.data.state = if expr.engine == MITO2_ENGINE { + CreateTableState::DatanodeCreateRegions + } else { + CreateTableState::DatanodeCreateTable + }; + + Ok(Status::executing(true)) + } + + fn create_region_request_template(&self) -> Result { + let create_table_expr = &self.creator.data.task.create_table; + + let column_defs = create_table_expr + .column_defs + .iter() + .enumerate() + .map(|(i, c)| { + let semantic_type = if create_table_expr.time_index == c.name { + SemanticType::Timestamp + } else if create_table_expr.primary_keys.contains(&c.name) { + SemanticType::Tag + } else { + SemanticType::Field + }; + + ColumnDef { + name: c.name.clone(), + column_id: i as u32, + datatype: c.datatype, + is_nullable: c.is_nullable, + default_constraint: c.default_constraint.clone(), + semantic_type: semantic_type as i32, + } + }) + .collect::>(); + + let primary_key = create_table_expr + .primary_keys + .iter() + .map(|key| { + column_defs + .iter() + .find_map(|c| { + if &c.name == key { + Some(c.column_id) + } else { + None + } + }) + .context(PrimaryKeyNotFoundSnafu { key }) + }) + .collect::>()?; + + Ok(PbCreateRegionRequest { + region_id: 0, + engine: create_table_expr.engine.to_string(), + column_defs, + primary_key, + create_if_not_exists: true, + region_dir: "".to_string(), + options: create_table_expr.table_options.clone(), + }) + } + + async fn on_datanode_create_regions(&mut self) -> Result { + let create_table_data = &self.creator.data; + let region_routes = &create_table_data.region_routes; + + let create_table_expr = &create_table_data.task.create_table; + let catalog = &create_table_expr.catalog_name; + let schema = &create_table_expr.schema_name; + + let request_template = self.create_region_request_template()?; + + let leaders = find_leaders(region_routes); + let mut create_table_tasks = Vec::with_capacity(leaders.len()); + + for datanode in leaders { + let clients = self.context.datanode_clients.clone(); + + let regions = find_leader_regions(region_routes, &datanode); + let requests = regions + .iter() + .map(|region_number| { + let region_id = RegionId::new(self.table_id(), *region_number); + + let mut create_table_request = request_template.clone(); + create_table_request.region_id = region_id.as_u64(); + create_table_request.region_dir = region_dir(catalog, schema, region_id); + + PbRegionRequest::Create(create_table_request) + }) + .collect::>(); + + create_table_tasks.push(common_runtime::spawn_bg(async move { + for request in requests { + let client = clients.get_client(&datanode).await; + let requester = RegionRequester::new(client); + + if let Err(err) = requester.handle(request).await { + return Err(handle_request_datanode_error(datanode)(err)); + } + } + Ok(()) + })); + } + + join_all(create_table_tasks) + .await + .into_iter() + .map(|e| e.context(error::JoinSnafu).flatten()) + .collect::>>()?; + + self.creator.data.state = CreateTableState::CreateMetadata; Ok(Status::executing(true)) } async fn on_create_metadata(&self) -> Result { - let _timer = common_telemetry::timer!( - crate::metrics::METRIC_META_CREATE_TABLE_PROCEDURE_CREATE_META - ); - - let table_id = self.table_info().ident.table_id as TableId; + let table_id = self.table_id(); let manager = &self.context.table_metadata_manager; let raw_table_info = self.table_info().clone(); @@ -124,15 +245,12 @@ impl CreateTableProcedure { } async fn on_datanode_create_table(&mut self) -> Result { - let _timer = common_telemetry::timer!( - crate::metrics::METRIC_META_CREATE_TABLE_PROCEDURE_CREATE_TABLE - ); let region_routes = &self.creator.data.region_routes; let table_name = self.table_name(); let clients = self.context.datanode_clients.clone(); let leaders = find_leaders(region_routes); let mut joins = Vec::with_capacity(leaders.len()); - let table_id = self.table_info().ident.table_id; + let table_id = self.table_id(); for datanode in leaders { let client = clients.get_client(&datanode).await; @@ -172,9 +290,17 @@ impl Procedure for CreateTableProcedure { } async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult { - match self.creator.data.state { + let state = &self.creator.data.state; + + let _timer = common_telemetry::timer!( + metrics::METRIC_META_PROCEDURE_CREATE_TABLE, + &[("step", state.as_ref().to_string())] + ); + + match state { CreateTableState::Prepare => self.on_prepare().await, CreateTableState::DatanodeCreateTable => self.on_datanode_create_table().await, + CreateTableState::DatanodeCreateRegions => self.on_datanode_create_regions().await, CreateTableState::CreateMetadata => self.on_create_metadata().await, } .map_err(handle_retry_error) @@ -213,12 +339,14 @@ impl TableCreator { } } -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize, AsRefStr)] enum CreateTableState { /// Prepares to create the table Prepare, /// Datanode creates the table DatanodeCreateTable, + /// Create regions on the Datanode + DatanodeCreateRegions, /// Creates metadata CreateMetadata, } @@ -236,3 +364,323 @@ impl CreateTableData { self.task.table_ref() } } + +#[cfg(test)] +mod test { + use std::collections::{HashMap, HashSet}; + use std::sync::{Arc, Mutex}; + + use api::v1::region::region_server::RegionServer; + use api::v1::region::RegionResponse; + use api::v1::{ + ColumnDataType, ColumnDef as PbColumnDef, CreateTableExpr, ResponseHeader, + Status as PbStatus, + }; + use chrono::DateTime; + use client::client_manager::DatanodeClients; + use client::Client; + use common_grpc::channel_manager::ChannelManager; + use common_meta::key::TableMetadataManager; + use common_meta::peer::Peer; + use common_runtime::{Builder as RuntimeBuilder, Runtime}; + use datatypes::prelude::ConcreteDataType; + use datatypes::schema::{ColumnSchema, RawSchema}; + use servers::grpc::region_server::{RegionServerHandler, RegionServerRequestHandler}; + use table::metadata::{RawTableMeta, TableIdent, TableType}; + use table::requests::TableOptions; + use tokio::sync::mpsc; + use tonic::transport::Server; + use tower::service_fn; + + use super::*; + use crate::handler::{HeartbeatMailbox, Pushers}; + use crate::sequence::Sequence; + use crate::service::store::kv::KvBackendAdapter; + use crate::service::store::memory::MemStore; + use crate::test_util::new_region_route; + + fn create_table_procedure() -> CreateTableProcedure { + let create_table_expr = CreateTableExpr { + catalog_name: "my_catalog".to_string(), + schema_name: "my_schema".to_string(), + table_name: "my_table".to_string(), + desc: "blabla".to_string(), + column_defs: vec![ + PbColumnDef { + name: "ts".to_string(), + datatype: ColumnDataType::TimestampMillisecond as i32, + is_nullable: false, + default_constraint: vec![], + }, + PbColumnDef { + name: "my_tag1".to_string(), + datatype: ColumnDataType::String as i32, + is_nullable: true, + default_constraint: vec![], + }, + PbColumnDef { + name: "my_tag2".to_string(), + datatype: ColumnDataType::String as i32, + is_nullable: true, + default_constraint: vec![], + }, + PbColumnDef { + name: "my_field_column".to_string(), + datatype: ColumnDataType::Int32 as i32, + is_nullable: true, + default_constraint: vec![], + }, + ], + time_index: "ts".to_string(), + primary_keys: vec!["my_tag2".to_string(), "my_tag1".to_string()], + create_if_not_exists: false, + table_options: HashMap::new(), + table_id: None, + region_numbers: vec![1, 2, 3], + engine: MITO2_ENGINE.to_string(), + }; + + let raw_table_info = RawTableInfo { + ident: TableIdent::new(42), + name: "my_table".to_string(), + desc: Some("blabla".to_string()), + catalog_name: "my_catalog".to_string(), + schema_name: "my_schema".to_string(), + meta: RawTableMeta { + schema: RawSchema { + column_schemas: vec![ + ColumnSchema::new( + "ts".to_string(), + ConcreteDataType::timestamp_millisecond_datatype(), + false, + ), + ColumnSchema::new( + "my_tag1".to_string(), + ConcreteDataType::string_datatype(), + true, + ), + ColumnSchema::new( + "my_tag2".to_string(), + ConcreteDataType::string_datatype(), + true, + ), + ColumnSchema::new( + "my_field_column".to_string(), + ConcreteDataType::int32_datatype(), + true, + ), + ], + timestamp_index: Some(0), + version: 0, + }, + primary_key_indices: vec![1, 2], + value_indices: vec![2], + engine: MITO2_ENGINE.to_string(), + next_column_id: 3, + region_numbers: vec![1, 2, 3], + engine_options: HashMap::new(), + options: TableOptions::default(), + created_on: DateTime::default(), + partition_key_indices: vec![], + }, + table_type: TableType::Base, + }; + + let peers = vec![ + Peer::new(1, "127.0.0.1:4001"), + Peer::new(2, "127.0.0.1:4002"), + Peer::new(3, "127.0.0.1:4003"), + ]; + let region_routes = vec![ + new_region_route(1, &peers, 3), + new_region_route(2, &peers, 2), + new_region_route(3, &peers, 1), + ]; + + let kv_store = Arc::new(MemStore::new()); + + let mailbox_sequence = Sequence::new("test_heartbeat_mailbox", 0, 100, kv_store.clone()); + let mailbox = HeartbeatMailbox::create(Pushers::default(), mailbox_sequence); + + CreateTableProcedure::new( + 1, + CreateTableTask::new(create_table_expr, vec![], raw_table_info), + region_routes, + DdlContext { + datanode_clients: Arc::new(DatanodeClients::default()), + mailbox, + server_addr: "127.0.0.1:4321".to_string(), + table_metadata_manager: Arc::new(TableMetadataManager::new( + KvBackendAdapter::wrap(kv_store), + )), + }, + ) + } + + #[test] + fn test_create_region_request_template() { + let procedure = create_table_procedure(); + + let template = procedure.create_region_request_template().unwrap(); + + let expected = PbCreateRegionRequest { + region_id: 0, + engine: MITO2_ENGINE.to_string(), + column_defs: vec![ + ColumnDef { + name: "ts".to_string(), + column_id: 0, + datatype: ColumnDataType::TimestampMillisecond as i32, + is_nullable: false, + default_constraint: vec![], + semantic_type: SemanticType::Timestamp as i32, + }, + ColumnDef { + name: "my_tag1".to_string(), + column_id: 1, + datatype: ColumnDataType::String as i32, + is_nullable: true, + default_constraint: vec![], + semantic_type: SemanticType::Tag as i32, + }, + ColumnDef { + name: "my_tag2".to_string(), + column_id: 2, + datatype: ColumnDataType::String as i32, + is_nullable: true, + default_constraint: vec![], + semantic_type: SemanticType::Tag as i32, + }, + ColumnDef { + name: "my_field_column".to_string(), + column_id: 3, + datatype: ColumnDataType::Int32 as i32, + is_nullable: true, + default_constraint: vec![], + semantic_type: SemanticType::Field as i32, + }, + ], + primary_key: vec![2, 1], + create_if_not_exists: true, + region_dir: "".to_string(), + options: HashMap::new(), + }; + assert_eq!(template, expected); + } + + #[derive(Clone)] + struct TestingRegionServerHandler { + runtime: Arc, + create_region_notifier: mpsc::Sender, + } + + impl TestingRegionServerHandler { + fn new(create_region_notifier: mpsc::Sender) -> Self { + Self { + runtime: Arc::new(RuntimeBuilder::default().worker_threads(2).build().unwrap()), + create_region_notifier, + } + } + + fn new_client(&self, datanode: &Peer) -> Client { + let (client, server) = tokio::io::duplex(1024); + + let handler = + RegionServerRequestHandler::new(Arc::new(self.clone()), self.runtime.clone()); + + tokio::spawn(async move { + Server::builder() + .add_service(RegionServer::new(handler)) + .serve_with_incoming(futures::stream::iter(vec![Ok::<_, std::io::Error>( + server, + )])) + .await + }); + + let channel_manager = ChannelManager::new(); + let mut client = Some(client); + channel_manager + .reset_with_connector( + datanode.addr.clone(), + service_fn(move |_| { + let client = client.take().unwrap(); + async move { Ok::<_, std::io::Error>(client) } + }), + ) + .unwrap(); + Client::with_manager_and_urls(channel_manager, vec![datanode.addr.clone()]) + } + } + + #[async_trait] + impl RegionServerHandler for TestingRegionServerHandler { + async fn handle(&self, request: PbRegionRequest) -> servers::error::Result { + let PbRegionRequest::Create(request) = request else { + unreachable!() + }; + let region_id = request.region_id.into(); + + self.create_region_notifier.send(region_id).await.unwrap(); + + Ok(RegionResponse { + header: Some(ResponseHeader { + status: Some(PbStatus { + status_code: 0, + err_msg: "".to_string(), + }), + }), + affected_rows: 0, + }) + } + } + + #[tokio::test] + async fn test_on_datanode_create_regions() { + let mut procedure = create_table_procedure(); + + let (tx, mut rx) = mpsc::channel(10); + + let region_server = TestingRegionServerHandler::new(tx); + + let datanodes = find_leaders(&procedure.creator.data.region_routes); + for peer in datanodes { + let client = region_server.new_client(&peer); + procedure + .context + .datanode_clients + .insert_client(peer, client) + .await; + } + + let expected_created_regions = Arc::new(Mutex::new(HashSet::from([ + RegionId::new(42, 1), + RegionId::new(42, 2), + RegionId::new(42, 3), + ]))); + let handle = tokio::spawn({ + let expected_created_regions = expected_created_regions.clone(); + let mut max_recv = expected_created_regions.lock().unwrap().len(); + async move { + while let Some(region_id) = rx.recv().await { + expected_created_regions.lock().unwrap().remove(®ion_id); + + max_recv -= 1; + if max_recv == 0 { + break; + } + } + } + }); + + let status = procedure.on_datanode_create_regions().await.unwrap(); + assert!(matches!(status, Status::Executing { persist: true })); + assert!(matches!( + procedure.creator.data.state, + CreateTableState::CreateMetadata + )); + + handle.await.unwrap(); + + assert!(expected_created_regions.lock().unwrap().is_empty()); + } +} diff --git a/src/meta-srv/src/procedure/region_failover/update_metadata.rs b/src/meta-srv/src/procedure/region_failover/update_metadata.rs index 28922b6a7b..280e3c77d3 100644 --- a/src/meta-srv/src/procedure/region_failover/update_metadata.rs +++ b/src/meta-srv/src/procedure/region_failover/update_metadata.rs @@ -158,7 +158,7 @@ mod tests { use super::super::tests::{TestingEnv, TestingEnvBuilder}; use super::{State, *}; - use crate::table_routes::tests::new_region_route; + use crate::test_util::new_region_route; #[tokio::test] async fn test_next_state() { diff --git a/src/meta-srv/src/table_routes.rs b/src/meta-srv/src/table_routes.rs index 93aff898cc..403ebadaf4 100644 --- a/src/meta-srv/src/table_routes.rs +++ b/src/meta-srv/src/table_routes.rs @@ -72,7 +72,7 @@ pub(crate) async fn fetch_tables( #[cfg(test)] pub(crate) mod tests { - use std::collections::{BTreeMap, HashMap}; + use std::collections::HashMap; use chrono::DateTime; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MITO_ENGINE}; @@ -141,25 +141,4 @@ pub(crate) mod tests { .await .unwrap(); } - - pub(crate) fn new_region_route( - region_number: u64, - peers: &[Peer], - leader_node: u64, - ) -> RegionRoute { - let region = Region { - id: region_number.into(), - name: "".to_string(), - partition: None, - attrs: BTreeMap::new(), - }; - - let leader_peer = peers.iter().find(|peer| peer.id == leader_node).cloned(); - - RegionRoute { - region, - leader_peer, - follower_peers: vec![], - } - } } diff --git a/src/meta-srv/src/test_util.rs b/src/meta-srv/src/test_util.rs index e040fc28bf..d364b042bc 100644 --- a/src/meta-srv/src/test_util.rs +++ b/src/meta-srv/src/test_util.rs @@ -15,6 +15,8 @@ use std::sync::Arc; use common_meta::key::TableMetadataManager; +use common_meta::peer::Peer; +use common_meta::rpc::router::{Region, RegionRoute}; use common_procedure::local::{LocalManager, ManagerConfig}; use crate::cluster::MetaPeerClientBuilder; @@ -28,6 +30,21 @@ use crate::sequence::Sequence; use crate::service::store::kv::KvBackendAdapter; use crate::service::store::memory::MemStore; +pub(crate) fn new_region_route(region_id: u64, peers: &[Peer], leader_node: u64) -> RegionRoute { + let region = Region { + id: region_id.into(), + ..Default::default() + }; + + let leader_peer = peers.iter().find(|peer| peer.id == leader_node).cloned(); + + RegionRoute { + region, + leader_peer, + follower_peers: vec![], + } +} + pub(crate) fn create_region_failover_manager() -> Arc { let kv_store = Arc::new(MemStore::new()); diff --git a/src/mito2/Cargo.toml b/src/mito2/Cargo.toml index 48aa8ade17..0b9f1c8857 100644 --- a/src/mito2/Cargo.toml +++ b/src/mito2/Cargo.toml @@ -49,7 +49,7 @@ serde_json = "1.0" snafu.workspace = true storage = { workspace = true } store-api = { workspace = true } -strum = "0.21" +strum.workspace = true table = { workspace = true } tokio-util.workspace = true tokio.workspace = true diff --git a/src/servers/Cargo.toml b/src/servers/Cargo.toml index 7ad63ea38a..d7fd06c4f9 100644 --- a/src/servers/Cargo.toml +++ b/src/servers/Cargo.toml @@ -83,7 +83,7 @@ sha1 = "0.10" snafu.workspace = true snap = "1" sql = { workspace = true } -strum = { version = "0.24", features = ["derive"] } +strum.workspace = true table = { workspace = true } tokio-rustls = "0.24" tokio-stream = { version = "0.1", features = ["net"] } diff --git a/src/servers/src/grpc.rs b/src/servers/src/grpc.rs index 1e71b15b71..ff0ff5173a 100644 --- a/src/servers/src/grpc.rs +++ b/src/servers/src/grpc.rs @@ -88,9 +88,8 @@ impl GrpcServer { ) -> Self { let database_handler = GreptimeRequestHandler::new(query_handler, user_provider.clone(), runtime.clone()); - let region_server_handler = region_server_handler.map(|handler| { - RegionServerRequestHandler::new(handler, user_provider.clone(), runtime.clone()) - }); + let region_server_handler = + region_server_handler.map(|handler| RegionServerRequestHandler::new(handler, runtime)); Self { shutdown_tx: Mutex::new(None), user_provider, diff --git a/src/servers/src/grpc/greptime_handler.rs b/src/servers/src/grpc/greptime_handler.rs index 873a6293fb..d2c1463110 100644 --- a/src/servers/src/grpc/greptime_handler.rs +++ b/src/servers/src/grpc/greptime_handler.rs @@ -71,7 +71,7 @@ impl GreptimeRequestHandler { query_ctx.set_current_user(user_info); let handler = self.handler.clone(); - let request_type = request_type(&query); + let request_type = request_type(&query).to_string(); let db = query_ctx.get_db_string(); let timer = RequestTimer::new(db.clone(), request_type); @@ -180,13 +180,13 @@ pub(crate) fn create_query_context(header: Option<&RequestHeader>) -> QueryConte pub(crate) struct RequestTimer { start: Instant, db: String, - request_type: &'static str, + request_type: String, status_code: StatusCode, } impl RequestTimer { /// Returns a new timer. - pub fn new(db: String, request_type: &'static str) -> RequestTimer { + pub fn new(db: String, request_type: String) -> RequestTimer { RequestTimer { start: Instant::now(), db, @@ -208,7 +208,7 @@ impl Drop for RequestTimer { self.start.elapsed(), &[ (METRIC_DB_LABEL, std::mem::take(&mut self.db)), - (METRIC_TYPE_LABEL, self.request_type.to_string()), + (METRIC_TYPE_LABEL, std::mem::take(&mut self.request_type)), (METRIC_CODE_LABEL, self.status_code.to_string()) ] ); diff --git a/src/servers/src/grpc/region_server.rs b/src/servers/src/grpc/region_server.rs index 1bcccc6c30..a93686b026 100644 --- a/src/servers/src/grpc/region_server.rs +++ b/src/servers/src/grpc/region_server.rs @@ -14,30 +14,17 @@ use std::sync::Arc; -use api::helper::region_request_type; -use api::v1::auth_header::AuthScheme; use api::v1::region::region_server::Region as RegionServer; use api::v1::region::{region_request, RegionRequest, RegionResponse}; -use api::v1::{Basic, RequestHeader}; use async_trait::async_trait; -use auth::{Identity, Password, UserInfoRef, UserProviderRef}; -use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; -use common_catalog::parse_catalog_and_schema_from_db_string; use common_error::ext::ErrorExt; use common_runtime::Runtime; use common_telemetry::{debug, error}; -use metrics::increment_counter; -use session::context::{QueryContextBuilder, QueryContextRef}; use snafu::{OptionExt, ResultExt}; use tonic::{Request, Response}; -use crate::error::{ - AuthSnafu, InvalidQuerySnafu, JoinTaskSnafu, NotFoundAuthHeaderSnafu, Result, - UnsupportedAuthSchemeSnafu, -}; -use crate::grpc::greptime_handler::RequestTimer; +use crate::error::{InvalidQuerySnafu, JoinTaskSnafu, Result}; use crate::grpc::TonicResult; -use crate::metrics::{METRIC_AUTH_FAILURE, METRIC_CODE_LABEL}; #[async_trait] pub trait RegionServerHandler: Send + Sync { @@ -49,21 +36,12 @@ pub type RegionServerHandlerRef = Arc; #[derive(Clone)] pub struct RegionServerRequestHandler { handler: Arc, - user_provider: Option, runtime: Arc, } impl RegionServerRequestHandler { - pub fn new( - handler: Arc, - user_provider: Option, - runtime: Arc, - ) -> Self { - Self { - handler, - user_provider, - runtime, - } + pub fn new(handler: Arc, runtime: Arc) -> Self { + Self { handler, runtime } } async fn handle(&self, request: RegionRequest) -> Result { @@ -71,15 +49,7 @@ impl RegionServerRequestHandler { reason: "Expecting non-empty GreptimeRequest.", })?; - let header = request.header.as_ref(); - let query_ctx = create_query_context(header); - let user_info = self.auth(header, &query_ctx).await?; - query_ctx.set_current_user(user_info); - let handler = self.handler.clone(); - let request_type = region_request_type(&query); - let db = query_ctx.get_db_string(); - let timer = RequestTimer::new(db.clone(), request_type); // Executes requests in another runtime to // 1. prevent the execution from being cancelled unexpected by Tonic runtime; @@ -100,85 +70,8 @@ impl RegionServerRequestHandler { }) }); - handle.await.context(JoinTaskSnafu).map_err(|e| { - timer.record(e.status_code()); - e - })? + handle.await.context(JoinTaskSnafu)? } - - async fn auth( - &self, - header: Option<&RequestHeader>, - query_ctx: &QueryContextRef, - ) -> Result> { - let Some(user_provider) = self.user_provider.as_ref() else { - return Ok(None); - }; - - let auth_scheme = header - .and_then(|header| { - header - .authorization - .as_ref() - .and_then(|x| x.auth_scheme.clone()) - }) - .context(NotFoundAuthHeaderSnafu)?; - - match auth_scheme { - AuthScheme::Basic(Basic { username, password }) => user_provider - .auth( - Identity::UserId(&username, None), - Password::PlainText(password.into()), - query_ctx.current_catalog(), - query_ctx.current_schema(), - ) - .await - .context(AuthSnafu), - AuthScheme::Token(_) => UnsupportedAuthSchemeSnafu { - name: "Token AuthScheme".to_string(), - } - .fail(), - } - .map(Some) - .map_err(|e| { - increment_counter!( - METRIC_AUTH_FAILURE, - &[(METRIC_CODE_LABEL, format!("{}", e.status_code()))] - ); - e - }) - } -} - -pub(crate) fn create_query_context(header: Option<&RequestHeader>) -> QueryContextRef { - let (catalog, schema) = header - .map(|header| { - // We provide dbname field in newer versions of protos/sdks - // parse dbname from header in priority - if !header.dbname.is_empty() { - parse_catalog_and_schema_from_db_string(&header.dbname) - } else { - ( - if !header.catalog.is_empty() { - &header.catalog - } else { - DEFAULT_CATALOG_NAME - }, - if !header.schema.is_empty() { - &header.schema - } else { - DEFAULT_SCHEMA_NAME - }, - ) - } - }) - .unwrap_or((DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME)); - - QueryContextBuilder::default() - .current_catalog(catalog.to_string()) - .current_schema(schema.to_string()) - .try_trace_id(header.and_then(|h: &RequestHeader| h.trace_id)) - .build() } #[async_trait] diff --git a/src/sql/src/parsers/create_parser.rs b/src/sql/src/parsers/create_parser.rs index 528ebe0e0f..f9cc7ce953 100644 --- a/src/sql/src/parsers/create_parser.rs +++ b/src/sql/src/parsers/create_parser.rs @@ -14,6 +14,7 @@ use std::cmp::Ordering; +use common_catalog::consts::default_engine; use itertools::Itertools; use once_cell::sync::Lazy; use snafu::{ensure, OptionExt, ResultExt}; @@ -143,7 +144,7 @@ impl<'a> ParserContext<'a> { let partitions = self.parse_partitions()?; - let engine = self.parse_table_engine(common_catalog::consts::MITO_ENGINE)?; + let engine = self.parse_table_engine(default_engine())?; let options = self .parser .parse_options(Keyword::WITH) diff --git a/src/table/src/engine.rs b/src/table/src/engine.rs index f4937bfb60..7d3fb5809f 100644 --- a/src/table/src/engine.rs +++ b/src/table/src/engine.rs @@ -18,7 +18,7 @@ use std::sync::Arc; use common_base::paths::DATA_DIR; use common_procedure::BoxedProcedure; use datafusion_common::TableReference as DfTableReference; -use store_api::storage::RegionNumber; +use store_api::storage::{RegionId, RegionNumber}; use crate::error::{self, Result}; use crate::metadata::TableId; @@ -198,6 +198,14 @@ pub fn table_dir(catalog_name: &str, schema_name: &str, table_id: TableId) -> St format!("{DATA_DIR}{catalog_name}/{schema_name}/{table_id}/") } +pub fn region_dir(catalog_name: &str, schema_name: &str, region_id: RegionId) -> String { + format!( + "{}{}", + table_dir(catalog_name, schema_name, region_id.table_id()), + region_name(region_id.table_id(), region_id.region_number()) + ) +} + #[cfg(test)] mod tests { use super::*; @@ -212,4 +220,13 @@ mod tests { assert_eq!("greptime.public.test", table_ref.to_string()); } + + #[test] + fn test_region_dir() { + let region_id = RegionId::new(42, 1); + assert_eq!( + region_dir("my_catalog", "my_schema", region_id), + "data/my_catalog/my_schema/42/42_0000000001" + ); + } }