From fa5e3b94d3751a9c19b85840512e1138443b4989 Mon Sep 17 00:00:00 2001 From: Weny Xu Date: Sat, 2 Sep 2023 13:04:18 +0800 Subject: [PATCH] refactor: refactor ddl procedure (#2304) --- Cargo.lock | 2 + src/common/meta/Cargo.toml | 3 + src/common/meta/src/ddl.rs | 29 +++ .../meta/src/ddl}/alter_table.rs | 87 ++----- .../meta/src/ddl}/create_table.rs | 209 ++------------- .../meta/src/ddl}/drop_table.rs | 132 +++------- src/common/meta/src/ddl/utils.rs | 44 ++++ src/common/meta/src/error.rs | 63 ++++- src/common/meta/src/lib.rs | 1 + src/common/meta/src/metrics.rs | 3 + src/meta-srv/src/ddl.rs | 20 +- src/meta-srv/src/error.rs | 32 +-- src/meta-srv/src/metrics.rs | 3 - src/meta-srv/src/procedure.rs | 5 +- src/meta-srv/src/procedure/tests.rs | 240 ++++++++++++++++++ src/meta-srv/src/procedure/utils.rs | 54 +--- 16 files changed, 458 insertions(+), 469 deletions(-) create mode 100644 src/common/meta/src/ddl.rs rename src/{meta-srv/src/procedure => common/meta/src/ddl}/alter_table.rs (79%) rename src/{meta-srv/src/procedure => common/meta/src/ddl}/create_table.rs (56%) rename src/{meta-srv/src/procedure => common/meta/src/ddl}/drop_table.rs (66%) create mode 100644 src/common/meta/src/ddl/utils.rs create mode 100644 src/meta-srv/src/procedure/tests.rs diff --git a/Cargo.lock b/Cargo.lock index fa2584dab2..b9ba55d339 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1833,6 +1833,7 @@ dependencies = [ "chrono", "common-catalog", "common-error", + "common-procedure", "common-runtime", "common-telemetry", "common-time", @@ -1848,6 +1849,7 @@ dependencies = [ "serde_json", "snafu", "store-api", + "strum 0.25.0", "table", "tokio", ] diff --git a/src/common/meta/Cargo.toml b/src/common/meta/Cargo.toml index f978d73745..8e9ae24a1d 100644 --- a/src/common/meta/Cargo.toml +++ b/src/common/meta/Cargo.toml @@ -10,9 +10,11 @@ async-stream.workspace = true async-trait.workspace = true common-catalog = { workspace = true } common-error = { workspace = true } +common-procedure = { workspace = true } common-runtime = { workspace = true } common-telemetry = { workspace = true } common-time = { workspace = true } +datatypes = { workspace = true } etcd-client.workspace = true futures.workspace = true humantime-serde.workspace = true @@ -23,6 +25,7 @@ serde.workspace = true serde_json.workspace = true snafu.workspace = true store-api = { workspace = true } +strum.workspace = true table = { workspace = true } tokio.workspace = true diff --git a/src/common/meta/src/ddl.rs b/src/common/meta/src/ddl.rs new file mode 100644 index 0000000000..5f8e147af8 --- /dev/null +++ b/src/common/meta/src/ddl.rs @@ -0,0 +1,29 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use crate::cache_invalidator::CacheInvalidatorRef; +use crate::datanode_manager::DatanodeManagerRef; +use crate::key::TableMetadataManagerRef; + +pub mod alter_table; +pub mod create_table; +pub mod drop_table; +pub mod utils; + +#[derive(Clone)] +pub struct DdlContext { + pub datanode_manager: DatanodeManagerRef, + pub cache_invalidator: CacheInvalidatorRef, + pub table_metadata_manager: TableMetadataManagerRef, +} diff --git a/src/meta-srv/src/procedure/alter_table.rs b/src/common/meta/src/ddl/alter_table.rs similarity index 79% rename from src/meta-srv/src/procedure/alter_table.rs rename to src/common/meta/src/ddl/alter_table.rs index 916f1ea551..1bc95b56c5 100644 --- a/src/meta-srv/src/procedure/alter_table.rs +++ b/src/common/meta/src/ddl/alter_table.rs @@ -15,30 +15,27 @@ use std::vec; use async_trait::async_trait; -use client::Database; -use common_meta::cache_invalidator::Context; -use common_meta::ident::TableIdent; -use common_meta::key::table_info::TableInfoValue; -use common_meta::key::table_name::TableNameKey; -use common_meta::key::table_route::TableRouteValue; -use common_meta::rpc::ddl::AlterTableTask; -use common_meta::rpc::router::{find_leaders, RegionRoute}; -use common_meta::table_name::TableName; use common_procedure::error::{FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu}; use common_procedure::{ Context as ProcedureContext, Error as ProcedureError, LockKey, Procedure, Status, }; use common_telemetry::{debug, info}; -use futures::future::join_all; use serde::{Deserialize, Serialize}; use snafu::{ensure, OptionExt, ResultExt}; use table::engine::TableReference; use table::metadata::{RawTableInfo, TableId, TableInfo}; use table::requests::{AlterKind, AlterTableRequest}; +use crate::cache_invalidator::Context; use crate::ddl::DdlContext; -use crate::error::{self, Result, TableMetadataManagerSnafu}; -use crate::procedure::utils::handle_request_datanode_error; +use crate::error::{self, Result}; +use crate::ident::TableIdent; +use crate::key::table_info::TableInfoValue; +use crate::key::table_name::TableNameKey; +use crate::key::table_route::TableRouteValue; +use crate::rpc::ddl::AlterTableTask; +use crate::rpc::router::RegionRoute; +use crate::table_name::TableName; // TODO(weny): removes in following PRs. #[allow(dead_code)] @@ -50,9 +47,9 @@ pub struct AlterTableProcedure { // TODO(weny): removes in following PRs. #[allow(dead_code)] impl AlterTableProcedure { - pub(crate) const TYPE_NAME: &'static str = "metasrv-procedure::AlterTable"; + pub const TYPE_NAME: &'static str = "metasrv-procedure::AlterTable"; - pub(crate) fn new( + pub fn new( cluster_id: u64, task: AlterTableTask, alter_table_request: AlterTableRequest, @@ -65,7 +62,7 @@ impl AlterTableProcedure { } } - pub(crate) fn from_json(json: &str, context: DdlContext) -> ProcedureResult { + pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult { let data = serde_json::from_str(json).context(FromJsonSnafu)?; Ok(AlterTableProcedure { context, data }) @@ -85,8 +82,7 @@ impl AlterTableProcedure { &request.schema_name, new_table_name, )) - .await - .context(TableMetadataManagerSnafu)?; + .await?; ensure!( !exist, @@ -107,13 +103,12 @@ impl AlterTableProcedure { &request.schema_name, &request.table_name, )) - .await - .context(TableMetadataManagerSnafu)?; + .await?; ensure!( exist, error::TableNotFoundSnafu { - name: request.table_ref().to_string() + table_name: request.table_ref().to_string() } ); @@ -122,44 +117,6 @@ impl AlterTableProcedure { Ok(Status::executing(true)) } - /// Alters table on datanode. - async fn on_datanode_alter_table(&mut self) -> Result { - let region_routes = self - .data - .region_routes - .as_ref() - .context(error::UnexpectedSnafu { - violated: "expected table_route", - })?; - - let table_ref = self.data.table_ref(); - - let clients = self.context.datanode_clients.clone(); - let leaders = find_leaders(region_routes); - let mut joins = Vec::with_capacity(leaders.len()); - - for datanode in leaders { - let client = clients.get_client(&datanode).await; - let client = Database::new(table_ref.catalog, table_ref.schema, client); - let expr = self.data.task.alter_table.clone(); - joins.push(common_runtime::spawn_bg(async move { - debug!("Sending {:?} to {:?}", expr, client); - client - .alter(expr) - .await - .map_err(handle_request_datanode_error(datanode)) - })); - } - - let _ = join_all(joins) - .await - .into_iter() - .map(|e| e.context(error::JoinSnafu).flatten()) - .collect::>>()?; - - Ok(Status::Done) - } - /// Update table metadata for rename table operation. async fn on_update_metadata_for_rename(&self, new_table_name: String) -> Result<()> { let table_metadata_manager = &self.context.table_metadata_manager; @@ -168,8 +125,7 @@ impl AlterTableProcedure { table_metadata_manager .rename_table(current_table_info_value, new_table_name) - .await - .context(error::TableMetadataManagerSnafu)?; + .await?; Ok(()) } @@ -180,8 +136,7 @@ impl AlterTableProcedure { table_metadata_manager .update_table_info(current_table_info_value, new_table_info) - .await - .context(error::TableMetadataManagerSnafu)?; + .await?; Ok(()) } @@ -241,8 +196,7 @@ impl AlterTableProcedure { let TableRouteValue { region_routes, .. } = table_metadata_manager .table_route_manager() .get(table_id) - .await - .context(error::TableMetadataManagerSnafu)? + .await? .with_context(|| error::TableRouteNotFoundSnafu { table_name: table_ref.to_string(), })?; @@ -272,8 +226,7 @@ impl AlterTableProcedure { }, table_ident, ) - .await - .context(error::InvalidateTableCacheSnafu)?; + .await?; self.data.state = AlterTableState::DatanodeAlterTable; Ok(Status::executing(true)) @@ -320,7 +273,7 @@ impl Procedure for AlterTableProcedure { AlterTableState::Prepare => self.on_prepare().await, AlterTableState::UpdateMetadata => self.on_update_metadata().await, AlterTableState::InvalidateTableCache => self.on_broadcast().await, - AlterTableState::DatanodeAlterTable => self.on_datanode_alter_table().await, + AlterTableState::DatanodeAlterTable => todo!(), } .map_err(error_handler) } diff --git a/src/meta-srv/src/procedure/create_table.rs b/src/common/meta/src/ddl/create_table.rs similarity index 56% rename from src/meta-srv/src/procedure/create_table.rs rename to src/common/meta/src/ddl/create_table.rs index 3b536428da..e4c2c68258 100644 --- a/src/meta-srv/src/procedure/create_table.rs +++ b/src/common/meta/src/ddl/create_table.rs @@ -16,9 +16,6 @@ use api::v1::region::region_request::Body as PbRegionRequest; use api::v1::region::{ColumnDef, CreateRequest as PbCreateRegionRequest}; use api::v1::SemanticType; use async_trait::async_trait; -use common_meta::key::table_name::TableNameKey; -use common_meta::rpc::ddl::CreateTableTask; -use common_meta::rpc::router::{find_leader_regions, find_leaders, RegionRoute}; use common_procedure::error::{FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu}; use common_procedure::{Context as ProcedureContext, LockKey, Procedure, Status}; use common_telemetry::info; @@ -30,20 +27,24 @@ use strum::AsRefStr; use table::engine::TableReference; use table::metadata::{RawTableInfo, TableId}; +use crate::ddl::utils::{handle_operate_region_error, handle_retry_error}; use crate::ddl::DdlContext; -use crate::error::{self, PrimaryKeyNotFoundSnafu, Result, TableMetadataManagerSnafu}; +use crate::error::{self, Result}; +use crate::key::table_name::TableNameKey; use crate::metrics; -use crate::procedure::utils::{handle_operate_region_error, handle_retry_error}; +use crate::rpc::ddl::CreateTableTask; +use crate::rpc::router::{find_leader_regions, find_leaders, RegionRoute}; pub struct CreateTableProcedure { - context: DdlContext, - creator: TableCreator, + pub context: DdlContext, + pub creator: TableCreator, } +#[allow(dead_code)] impl CreateTableProcedure { - pub(crate) const TYPE_NAME: &'static str = "metasrv-procedure::CreateTable"; + pub const TYPE_NAME: &'static str = "metasrv-procedure::CreateTable"; - pub(crate) fn new( + pub fn new( cluster_id: u64, task: CreateTableTask, region_routes: Vec, @@ -55,7 +56,7 @@ impl CreateTableProcedure { } } - pub(crate) fn from_json(json: &str, context: DdlContext) -> ProcedureResult { + pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult { let data = serde_json::from_str(json).context(FromJsonSnafu)?; Ok(CreateTableProcedure { context, @@ -87,8 +88,7 @@ impl CreateTableProcedure { &expr.schema_name, &expr.table_name, )) - .await - .context(TableMetadataManagerSnafu)?; + .await?; if exist { ensure!( @@ -106,7 +106,7 @@ impl CreateTableProcedure { Ok(Status::executing(true)) } - fn create_region_request_template(&self) -> Result { + pub fn create_region_request_template(&self) -> Result { let create_table_expr = &self.creator.data.task.create_table; let column_defs = create_table_expr @@ -146,7 +146,7 @@ impl CreateTableProcedure { None } }) - .context(PrimaryKeyNotFoundSnafu { key }) + .context(error::PrimaryKeyNotFoundSnafu { key }) }) .collect::>()?; @@ -162,7 +162,7 @@ impl CreateTableProcedure { }) } - async fn on_datanode_create_regions(&mut self) -> Result { + pub async fn on_datanode_create_regions(&mut self) -> Result { let create_table_data = &self.creator.data; let region_routes = &create_table_data.region_routes; @@ -223,8 +223,7 @@ impl CreateTableProcedure { let region_routes = self.region_routes().clone(); manager .create_table_metadata(raw_table_info, region_routes) - .await - .context(TableMetadataManagerSnafu)?; + .await?; info!("Created table metadata for table {table_id}"); Ok(Status::Done) @@ -270,7 +269,7 @@ impl Procedure for CreateTableProcedure { } pub struct TableCreator { - data: CreateTableData, + pub data: CreateTableData, } impl TableCreator { @@ -287,7 +286,7 @@ impl TableCreator { } #[derive(Debug, Clone, Serialize, Deserialize, AsRefStr)] -enum CreateTableState { +pub enum CreateTableState { /// Prepares to create the table Prepare, /// Creates regions on the Datanode @@ -298,10 +297,10 @@ enum CreateTableState { #[derive(Debug, Serialize, Deserialize)] pub struct CreateTableData { - state: CreateTableState, - task: CreateTableTask, - region_routes: Vec, - cluster_id: u64, + pub state: CreateTableState, + pub task: CreateTableTask, + pub region_routes: Vec, + pub cluster_id: u64, } impl CreateTableData { @@ -309,167 +308,3 @@ impl CreateTableData { self.task.table_ref() } } - -#[cfg(test)] -mod test { - use std::collections::{HashMap, HashSet}; - use std::sync::{Arc, Mutex}; - - use api::v1::{ColumnDataType, ColumnDef as PbColumnDef, CreateTableExpr}; - use common_catalog::consts::MITO2_ENGINE; - - use super::*; - use crate::procedure::utils::mock::EchoRegionServer; - use crate::procedure::utils::test_data; - - fn create_table_procedure() -> CreateTableProcedure { - let create_table_expr = CreateTableExpr { - catalog_name: "my_catalog".to_string(), - schema_name: "my_schema".to_string(), - table_name: "my_table".to_string(), - desc: "blabla".to_string(), - column_defs: vec![ - PbColumnDef { - name: "ts".to_string(), - datatype: ColumnDataType::TimestampMillisecond as i32, - is_nullable: false, - default_constraint: vec![], - }, - PbColumnDef { - name: "my_tag1".to_string(), - datatype: ColumnDataType::String as i32, - is_nullable: true, - default_constraint: vec![], - }, - PbColumnDef { - name: "my_tag2".to_string(), - datatype: ColumnDataType::String as i32, - is_nullable: true, - default_constraint: vec![], - }, - PbColumnDef { - name: "my_field_column".to_string(), - datatype: ColumnDataType::Int32 as i32, - is_nullable: true, - default_constraint: vec![], - }, - ], - time_index: "ts".to_string(), - primary_keys: vec!["my_tag2".to_string(), "my_tag1".to_string()], - create_if_not_exists: false, - table_options: HashMap::new(), - table_id: None, - region_numbers: vec![1, 2, 3], - engine: MITO2_ENGINE.to_string(), - }; - - CreateTableProcedure::new( - 1, - CreateTableTask::new(create_table_expr, vec![], test_data::new_table_info()), - test_data::new_region_routes(), - test_data::new_ddl_context(), - ) - } - - #[test] - fn test_create_region_request_template() { - let procedure = create_table_procedure(); - - let template = procedure.create_region_request_template().unwrap(); - - let expected = PbCreateRegionRequest { - region_id: 0, - engine: MITO2_ENGINE.to_string(), - column_defs: vec![ - ColumnDef { - name: "ts".to_string(), - column_id: 0, - datatype: ColumnDataType::TimestampMillisecond as i32, - is_nullable: false, - default_constraint: vec![], - semantic_type: SemanticType::Timestamp as i32, - }, - ColumnDef { - name: "my_tag1".to_string(), - column_id: 1, - datatype: ColumnDataType::String as i32, - is_nullable: true, - default_constraint: vec![], - semantic_type: SemanticType::Tag as i32, - }, - ColumnDef { - name: "my_tag2".to_string(), - column_id: 2, - datatype: ColumnDataType::String as i32, - is_nullable: true, - default_constraint: vec![], - semantic_type: SemanticType::Tag as i32, - }, - ColumnDef { - name: "my_field_column".to_string(), - column_id: 3, - datatype: ColumnDataType::Int32 as i32, - is_nullable: true, - default_constraint: vec![], - semantic_type: SemanticType::Field as i32, - }, - ], - primary_key: vec![2, 1], - create_if_not_exists: true, - catalog: String::new(), - schema: String::new(), - options: HashMap::new(), - }; - assert_eq!(template, expected); - } - - #[tokio::test] - async fn test_on_datanode_create_regions() { - let mut procedure = create_table_procedure(); - - let (region_server, mut rx) = EchoRegionServer::new(); - - let datanodes = find_leaders(&procedure.creator.data.region_routes); - for peer in datanodes { - let client = region_server.new_client(&peer); - procedure - .context - .datanode_clients - .insert_client(peer, client) - .await; - } - - let expected_created_regions = Arc::new(Mutex::new(HashSet::from([ - RegionId::new(42, 1), - RegionId::new(42, 2), - RegionId::new(42, 3), - ]))); - let handle = tokio::spawn({ - let expected_created_regions = expected_created_regions.clone(); - let mut max_recv = expected_created_regions.lock().unwrap().len(); - async move { - while let Some(PbRegionRequest::Create(request)) = rx.recv().await { - let region_id = RegionId::from_u64(request.region_id); - - expected_created_regions.lock().unwrap().remove(®ion_id); - - max_recv -= 1; - if max_recv == 0 { - break; - } - } - } - }); - - let status = procedure.on_datanode_create_regions().await.unwrap(); - assert!(matches!(status, Status::Executing { persist: true })); - assert!(matches!( - procedure.creator.data.state, - CreateTableState::CreateMetadata - )); - - handle.await.unwrap(); - - assert!(expected_created_regions.lock().unwrap().is_empty()); - } -} diff --git a/src/meta-srv/src/procedure/drop_table.rs b/src/common/meta/src/ddl/drop_table.rs similarity index 66% rename from src/meta-srv/src/procedure/drop_table.rs rename to src/common/meta/src/ddl/drop_table.rs index 29428fd31b..d0f38617d4 100644 --- a/src/meta-srv/src/procedure/drop_table.rs +++ b/src/common/meta/src/ddl/drop_table.rs @@ -14,17 +14,8 @@ use api::v1::region::{region_request, DropRequest as PbDropRegionRequest}; use async_trait::async_trait; -use client::region::RegionRequester; use common_error::ext::ErrorExt; use common_error::status_code::StatusCode; -use common_meta::cache_invalidator::Context; -use common_meta::ident::TableIdent; -use common_meta::key::table_info::TableInfoValue; -use common_meta::key::table_name::TableNameKey; -use common_meta::key::table_route::TableRouteValue; -use common_meta::rpc::ddl::DropTableTask; -use common_meta::rpc::router::{find_leader_regions, find_leaders, RegionRoute}; -use common_meta::table_name::TableName; use common_procedure::error::{FromJsonSnafu, ToJsonSnafu}; use common_procedure::{ Context as ProcedureContext, LockKey, Procedure, Result as ProcedureResult, Status, @@ -39,20 +30,29 @@ use table::engine::TableReference; use table::metadata::{RawTableInfo, TableId}; use super::utils::handle_retry_error; +use crate::cache_invalidator::Context; +use crate::ddl::utils::handle_operate_region_error; use crate::ddl::DdlContext; -use crate::error::{self, Result, TableMetadataManagerSnafu}; +use crate::error::{self, Result}; +use crate::ident::TableIdent; +use crate::key::table_info::TableInfoValue; +use crate::key::table_name::TableNameKey; +use crate::key::table_route::TableRouteValue; use crate::metrics; -use crate::procedure::utils::handle_request_datanode_error; +use crate::rpc::ddl::DropTableTask; +use crate::rpc::router::{find_leader_regions, find_leaders, RegionRoute}; +use crate::table_name::TableName; pub struct DropTableProcedure { - context: DdlContext, - data: DropTableData, + pub context: DdlContext, + pub data: DropTableData, } +#[allow(dead_code)] impl DropTableProcedure { - pub(crate) const TYPE_NAME: &'static str = "metasrv-procedure::DropTable"; + pub const TYPE_NAME: &'static str = "metasrv-procedure::DropTable"; - pub(crate) fn new( + pub fn new( cluster_id: u64, task: DropTableTask, table_route_value: TableRouteValue, @@ -65,7 +65,7 @@ impl DropTableProcedure { } } - pub(crate) fn from_json(json: &str, context: DdlContext) -> ProcedureResult { + pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult { let data = serde_json::from_str(json).context(FromJsonSnafu)?; Ok(Self { context, data }) } @@ -82,13 +82,12 @@ impl DropTableProcedure { table_ref.schema, table_ref.table, )) - .await - .context(TableMetadataManagerSnafu)?; + .await?; ensure!( exist, error::TableNotFoundSnafu { - name: table_ref.to_string() + table_name: table_ref.to_string() } ); @@ -106,8 +105,7 @@ impl DropTableProcedure { table_metadata_manager .delete_table_metadata(table_info_value, table_route_value) - .await - .context(error::TableMetadataManagerSnafu)?; + .await?; info!("Deleted table metadata for table {table_id}"); @@ -137,15 +135,14 @@ impl DropTableProcedure { }, table_ident, ) - .await - .context(error::InvalidateTableCacheSnafu)?; + .await?; self.data.state = DropTableState::DatanodeDropRegions; Ok(Status::executing(true)) } - async fn on_datanode_drop_regions(&self) -> Result { + pub async fn on_datanode_drop_regions(&self) -> Result { let table_id = self.data.table_id(); let region_routes = &self.data.region_routes(); @@ -153,7 +150,7 @@ impl DropTableProcedure { let mut drop_region_tasks = Vec::with_capacity(leaders.len()); for datanode in leaders { - let clients = self.context.datanode_clients.clone(); + let clients = self.context.datanode_manager.clone(); let regions = find_leader_regions(region_routes, &datanode); let region_ids = regions @@ -169,12 +166,11 @@ impl DropTableProcedure { region_id: region_id.as_u64(), }); - let client = clients.get_client(&datanode).await; - let requester = RegionRequester::new(client); + let requester = clients.datanode(&datanode).await; if let Err(err) = requester.handle(request).await { if err.status_code() != StatusCode::RegionNotFound { - return Err(handle_request_datanode_error(datanode)(err)); + return Err(handle_operate_region_error(datanode)(err)); } } } @@ -232,11 +228,11 @@ impl Procedure for DropTableProcedure { #[derive(Debug, Serialize, Deserialize)] pub struct DropTableData { - state: DropTableState, - cluster_id: u64, - task: DropTableTask, - table_route_value: TableRouteValue, - table_info_value: TableInfoValue, + pub state: DropTableState, + pub cluster_id: u64, + pub task: DropTableTask, + pub table_route_value: TableRouteValue, + pub table_info_value: TableInfoValue, } impl DropTableData { @@ -277,7 +273,7 @@ impl DropTableData { } #[derive(Debug, Serialize, Deserialize, AsRefStr)] -enum DropTableState { +pub enum DropTableState { /// Prepares to drop the table Prepare, /// Removes metadata @@ -287,71 +283,3 @@ enum DropTableState { /// Drops regions on Datanode DatanodeDropRegions, } - -#[cfg(test)] -mod test { - use std::collections::HashSet; - use std::sync::{Arc, Mutex}; - - use super::*; - use crate::procedure::utils::mock::EchoRegionServer; - use crate::procedure::utils::test_data; - - #[tokio::test] - async fn test_on_datanode_drop_regions() { - let drop_table_task = DropTableTask { - catalog: "my_catalog".to_string(), - schema: "my_schema".to_string(), - table: "my_table".to_string(), - table_id: 42, - }; - let procedure = DropTableProcedure::new( - 1, - drop_table_task, - TableRouteValue::new(test_data::new_region_routes()), - TableInfoValue::new(test_data::new_table_info()), - test_data::new_ddl_context(), - ); - - let (region_server, mut rx) = EchoRegionServer::new(); - - let datanodes = find_leaders(&procedure.data.table_route_value.region_routes); - for peer in datanodes { - let client = region_server.new_client(&peer); - procedure - .context - .datanode_clients - .insert_client(peer, client) - .await; - } - - let expected_dropped_regions = Arc::new(Mutex::new(HashSet::from([ - RegionId::new(42, 1), - RegionId::new(42, 2), - RegionId::new(42, 3), - ]))); - let handle = tokio::spawn({ - let expected_dropped_regions = expected_dropped_regions.clone(); - let mut max_recv = expected_dropped_regions.lock().unwrap().len(); - async move { - while let Some(region_request::Body::Drop(request)) = rx.recv().await { - let region_id = RegionId::from_u64(request.region_id); - - expected_dropped_regions.lock().unwrap().remove(®ion_id); - - max_recv -= 1; - if max_recv == 0 { - break; - } - } - } - }); - - let status = procedure.on_datanode_drop_regions().await.unwrap(); - assert!(matches!(status, Status::Done)); - - handle.await.unwrap(); - - assert!(expected_dropped_regions.lock().unwrap().is_empty()); - } -} diff --git a/src/common/meta/src/ddl/utils.rs b/src/common/meta/src/ddl/utils.rs new file mode 100644 index 0000000000..785f9d26e1 --- /dev/null +++ b/src/common/meta/src/ddl/utils.rs @@ -0,0 +1,44 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use common_error::ext::BoxedError; +use common_procedure::error::Error as ProcedureError; +use snafu::{location, Location}; + +use crate::error::{self, Error}; +use crate::peer::Peer; + +pub fn handle_operate_region_error(datanode: Peer) -> impl FnOnce(crate::error::Error) -> Error { + move |err| { + if matches!(err, crate::error::Error::RetryLater { .. }) { + error::Error::RetryLater { + source: BoxedError::new(err), + } + } else { + error::Error::OperateDatanode { + location: location!(), + peer: datanode, + source: BoxedError::new(err), + } + } + } +} + +pub fn handle_retry_error(e: Error) -> ProcedureError { + if matches!(e, error::Error::RetryLater { .. }) { + ProcedureError::retry_later(e) + } else { + ProcedureError::external(e) + } +} diff --git a/src/common/meta/src/error.rs b/src/common/meta/src/error.rs index 13aba5eabc..06f51857c1 100644 --- a/src/common/meta/src/error.rs +++ b/src/common/meta/src/error.rs @@ -21,9 +21,43 @@ use snafu::{Location, Snafu}; use store_api::storage::RegionNumber; use table::metadata::TableId; +use crate::peer::Peer; + #[derive(Debug, Snafu)] #[snafu(visibility(pub))] pub enum Error { + #[snafu(display("Failed to convert RawTableInfo into TableInfo: {}", source))] + ConvertRawTableInfo { + location: Location, + source: datatypes::Error, + }, + + #[snafu(display("Primary key '{key}' not found when creating region request, at {location}"))] + PrimaryKeyNotFound { key: String, location: Location }, + + #[snafu(display( + "Failed to build table meta for table: {}, source: {}", + table_name, + source + ))] + BuildTableMeta { + table_name: String, + source: table::metadata::TableMetaBuilderError, + location: Location, + }, + + #[snafu(display("Table occurs error, source: {}", source))] + Table { + location: Location, + source: table::error::Error, + }, + + #[snafu(display("Table route not found: {}", table_name))] + TableRouteNotFound { + table_name: String, + location: Location, + }, + #[snafu(display("Failed to decode protobuf, source: {}", source))] DecodeProto { location: Location, @@ -77,9 +111,9 @@ pub enum Error { #[snafu(display("Unexpected: {err_msg}"))] Unexpected { err_msg: String, location: Location }, - #[snafu(display("Table already exists, table_id: {}", table_id))] + #[snafu(display("Table already exists, table: {}", table_name))] TableAlreadyExists { - table_id: TableId, + table_name: String, location: Location, }, @@ -99,8 +133,8 @@ pub enum Error { source: Utf8Error, }, - #[snafu(display("Table does not exist, table_name: {}", table_name))] - TableNotExist { + #[snafu(display("Table nod found, table: {}", table_name))] + TableNotFound { table_name: String, location: Location, }, @@ -163,6 +197,13 @@ pub enum Error { source: BoxedError, }, + #[snafu(display("Failed to operate on datanode: {}, source: {}", peer, source))] + OperateDatanode { + location: Location, + peer: Peer, + source: BoxedError, + }, + #[snafu(display("Retry later, source: {}", source))] RetryLater { source: BoxedError }, } @@ -188,18 +229,26 @@ impl ErrorExt for Error { SendMessage { .. } | GetKvCache { .. } | CacheNotGet { .. } - | TableAlreadyExists { .. } | CatalogAlreadyExists { .. } | SchemaAlreadyExists { .. } - | TableNotExist { .. } | RenameTable { .. } => StatusCode::Internal, + PrimaryKeyNotFound { .. } => StatusCode::InvalidArguments, + + TableNotFound { .. } => StatusCode::TableNotFound, + TableAlreadyExists { .. } => StatusCode::TableAlreadyExists, + EncodeJson { .. } | DecodeJson { .. } | PayloadNotExist { .. } | ConvertRawKey { .. } - | DecodeProto { .. } => StatusCode::Unexpected, + | DecodeProto { .. } + | BuildTableMeta { .. } + | TableRouteNotFound { .. } + | ConvertRawTableInfo { .. } => StatusCode::Unexpected, + OperateDatanode { source, .. } => source.status_code(), + Table { source, .. } => source.status_code(), RetryLater { source, .. } => source.status_code(), OperateRegion { source, .. } => source.status_code(), ExecuteDdl { source, .. } => source.status_code(), diff --git a/src/common/meta/src/lib.rs b/src/common/meta/src/lib.rs index e6fb8fd018..ecf640ea21 100644 --- a/src/common/meta/src/lib.rs +++ b/src/common/meta/src/lib.rs @@ -16,6 +16,7 @@ pub mod cache_invalidator; pub mod datanode_manager; +pub mod ddl; pub mod ddl_manager; pub mod error; pub mod heartbeat; diff --git a/src/common/meta/src/metrics.rs b/src/common/meta/src/metrics.rs index 01e3e3859f..2f54e15a77 100644 --- a/src/common/meta/src/metrics.rs +++ b/src/common/meta/src/metrics.rs @@ -13,3 +13,6 @@ // limitations under the License. pub const METRIC_META_TXN_REQUEST: &str = "meta.txn_request"; + +pub(crate) const METRIC_META_PROCEDURE_CREATE_TABLE: &str = "meta.procedure.create_table"; +pub(crate) const METRIC_META_PROCEDURE_DROP_TABLE: &str = "meta.procedure.drop_table"; diff --git a/src/meta-srv/src/ddl.rs b/src/meta-srv/src/ddl.rs index 6ca38790aa..ddb20883ba 100644 --- a/src/meta-srv/src/ddl.rs +++ b/src/meta-srv/src/ddl.rs @@ -16,7 +16,10 @@ use std::sync::Arc; use client::client_manager::DatanodeClients; use common_meta::cache_invalidator::CacheInvalidatorRef; -use common_meta::datanode_manager::DatanodeManagerRef; +use common_meta::ddl::alter_table::AlterTableProcedure; +use common_meta::ddl::create_table::CreateTableProcedure; +use common_meta::ddl::drop_table::DropTableProcedure; +use common_meta::ddl::DdlContext; use common_meta::key::table_info::TableInfoValue; use common_meta::key::table_route::TableRouteValue; use common_meta::key::TableMetadataManagerRef; @@ -31,9 +34,6 @@ use crate::error::{ RegisterProcedureLoaderSnafu, Result, SubmitProcedureSnafu, UnsupportedSnafu, WaitProcedureSnafu, }; -use crate::procedure::alter_table::AlterTableProcedure; -use crate::procedure::create_table::CreateTableProcedure; -use crate::procedure::drop_table::DropTableProcedure; pub type DdlManagerRef = Arc; @@ -45,16 +45,6 @@ pub struct DdlManager { pub(crate) table_metadata_manager: TableMetadataManagerRef, } -#[derive(Clone)] -pub(crate) struct DdlContext { - // TODO(weny): removes it - pub(crate) datanode_clients: Arc, - - pub(crate) datanode_manager: DatanodeManagerRef, - pub(crate) cache_invalidator: CacheInvalidatorRef, - pub(crate) table_metadata_manager: TableMetadataManagerRef, -} - impl DdlManager { pub(crate) fn new( procedure_manager: ProcedureManagerRef, @@ -73,9 +63,7 @@ impl DdlManager { pub(crate) fn create_context(&self) -> DdlContext { DdlContext { datanode_manager: self.datanode_clients.clone(), - datanode_clients: self.datanode_clients.clone(), cache_invalidator: self.cache_invalidator.clone(), - table_metadata_manager: self.table_metadata_manager.clone(), } } diff --git a/src/meta-srv/src/error.rs b/src/meta-srv/src/error.rs index 03d815c1d6..750b979d22 100644 --- a/src/meta-srv/src/error.rs +++ b/src/meta-srv/src/error.rs @@ -64,29 +64,6 @@ pub enum Error { source: common_grpc_expr::error::Error, }, - #[snafu(display( - "Failed to build table meta for table: {}, source: {}", - table_name, - source - ))] - BuildTableMeta { - table_name: String, - source: table::metadata::TableMetaBuilderError, - location: Location, - }, - - #[snafu(display("Table occurs error, source: {}", source))] - Table { - location: Location, - source: table::error::Error, - }, - - #[snafu(display("Failed to convert RawTableInfo into TableInfo: {}", source))] - ConvertRawTableInfo { - location: Location, - source: datatypes::Error, - }, - #[snafu(display("Failed to execute transaction: {}", msg))] Txn { location: Location, msg: String }, @@ -529,9 +506,6 @@ pub enum Error { operation: String, location: Location, }, - - #[snafu(display("Primary key '{key}' not found when creating region request, at {location}"))] - PrimaryKeyNotFound { key: String, location: Location }, } pub type Result = std::result::Result; @@ -586,7 +560,6 @@ impl ErrorExt for Error { | Error::UnsupportedSelectorType { .. } | Error::InvalidArguments { .. } | Error::InvalidHeartbeatRequest { .. } - | Error::PrimaryKeyNotFound { .. } | Error::TooManyPartitions { .. } => StatusCode::InvalidArguments, Error::LeaseKeyFromUtf8 { .. } | Error::LeaseValueFromUtf8 { .. } @@ -604,12 +577,9 @@ impl ErrorExt for Error { | Error::UnexpectedInstructionReply { .. } | Error::Unexpected { .. } | Error::Txn { .. } - | Error::TableIdChanged { .. } - | Error::ConvertRawTableInfo { .. } - | Error::BuildTableMeta { .. } => StatusCode::Unexpected, + | Error::TableIdChanged { .. } => StatusCode::Unexpected, Error::TableNotFound { .. } => StatusCode::TableNotFound, Error::InvalidateTableCache { source, .. } => source.status_code(), - Error::Table { source, .. } => source.status_code(), Error::RequestDatanode { source, .. } => source.status_code(), Error::InvalidCatalogValue { source, .. } => source.status_code(), Error::RecoverProcedure { source, .. } diff --git a/src/meta-srv/src/metrics.rs b/src/meta-srv/src/metrics.rs index 3ea14807fd..cac6598991 100644 --- a/src/meta-srv/src/metrics.rs +++ b/src/meta-srv/src/metrics.rs @@ -18,6 +18,3 @@ pub(crate) const METRIC_META_KV_REQUEST: &str = "meta.kv_request"; pub(crate) const METRIC_META_ROUTE_REQUEST: &str = "meta.route_request"; pub(crate) const METRIC_META_HEARTBEAT_CONNECTION_NUM: &str = "meta.heartbeat_connection_num"; pub(crate) const METRIC_META_HANDLER_EXECUTE: &str = "meta.handler_execute"; - -pub(crate) const METRIC_META_PROCEDURE_CREATE_TABLE: &str = "meta.procedure.create_table"; -pub(crate) const METRIC_META_PROCEDURE_DROP_TABLE: &str = "meta.procedure.drop_table"; diff --git a/src/meta-srv/src/procedure.rs b/src/meta-srv/src/procedure.rs index 643d7ffdde..358fe3c46c 100644 --- a/src/meta-srv/src/procedure.rs +++ b/src/meta-srv/src/procedure.rs @@ -12,9 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -pub mod alter_table; -pub mod create_table; -pub mod drop_table; pub mod region_failover; pub(crate) mod state_store; +#[cfg(test)] +mod tests; mod utils; diff --git a/src/meta-srv/src/procedure/tests.rs b/src/meta-srv/src/procedure/tests.rs new file mode 100644 index 0000000000..bea5138549 --- /dev/null +++ b/src/meta-srv/src/procedure/tests.rs @@ -0,0 +1,240 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::{HashMap, HashSet}; +use std::sync::{Arc, Mutex}; + +use api::v1::region::region_request::{self, Body as PbRegionRequest}; +use api::v1::region::{ColumnDef, CreateRequest as PbCreateRegionRequest}; +use api::v1::{ColumnDataType, ColumnDef as PbColumnDef, CreateTableExpr, SemanticType}; +use client::client_manager::DatanodeClients; +use common_catalog::consts::MITO2_ENGINE; +use common_meta::ddl::create_table::*; +use common_meta::ddl::drop_table::DropTableProcedure; +use common_meta::key::table_info::TableInfoValue; +use common_meta::key::table_route::TableRouteValue; +use common_meta::rpc::ddl::{CreateTableTask, DropTableTask}; +use common_meta::rpc::router::find_leaders; +use common_procedure::Status; +use store_api::storage::RegionId; + +use crate::procedure::utils::mock::EchoRegionServer; +use crate::procedure::utils::test_data; + +fn create_table_procedure() -> CreateTableProcedure { + let create_table_expr = CreateTableExpr { + catalog_name: "my_catalog".to_string(), + schema_name: "my_schema".to_string(), + table_name: "my_table".to_string(), + desc: "blabla".to_string(), + column_defs: vec![ + PbColumnDef { + name: "ts".to_string(), + datatype: ColumnDataType::TimestampMillisecond as i32, + is_nullable: false, + default_constraint: vec![], + }, + PbColumnDef { + name: "my_tag1".to_string(), + datatype: ColumnDataType::String as i32, + is_nullable: true, + default_constraint: vec![], + }, + PbColumnDef { + name: "my_tag2".to_string(), + datatype: ColumnDataType::String as i32, + is_nullable: true, + default_constraint: vec![], + }, + PbColumnDef { + name: "my_field_column".to_string(), + datatype: ColumnDataType::Int32 as i32, + is_nullable: true, + default_constraint: vec![], + }, + ], + time_index: "ts".to_string(), + primary_keys: vec!["my_tag2".to_string(), "my_tag1".to_string()], + create_if_not_exists: false, + table_options: HashMap::new(), + table_id: None, + region_numbers: vec![1, 2, 3], + engine: MITO2_ENGINE.to_string(), + }; + + CreateTableProcedure::new( + 1, + CreateTableTask::new(create_table_expr, vec![], test_data::new_table_info()), + test_data::new_region_routes(), + test_data::new_ddl_context(), + ) +} + +#[test] +fn test_create_region_request_template() { + let procedure = create_table_procedure(); + + let template = procedure.create_region_request_template().unwrap(); + + let expected = PbCreateRegionRequest { + region_id: 0, + engine: MITO2_ENGINE.to_string(), + column_defs: vec![ + ColumnDef { + name: "ts".to_string(), + column_id: 0, + datatype: ColumnDataType::TimestampMillisecond as i32, + is_nullable: false, + default_constraint: vec![], + semantic_type: SemanticType::Timestamp as i32, + }, + ColumnDef { + name: "my_tag1".to_string(), + column_id: 1, + datatype: ColumnDataType::String as i32, + is_nullable: true, + default_constraint: vec![], + semantic_type: SemanticType::Tag as i32, + }, + ColumnDef { + name: "my_tag2".to_string(), + column_id: 2, + datatype: ColumnDataType::String as i32, + is_nullable: true, + default_constraint: vec![], + semantic_type: SemanticType::Tag as i32, + }, + ColumnDef { + name: "my_field_column".to_string(), + column_id: 3, + datatype: ColumnDataType::Int32 as i32, + is_nullable: true, + default_constraint: vec![], + semantic_type: SemanticType::Field as i32, + }, + ], + primary_key: vec![2, 1], + create_if_not_exists: true, + catalog: String::new(), + schema: String::new(), + options: HashMap::new(), + }; + assert_eq!(template, expected); +} + +#[tokio::test] +async fn test_on_datanode_create_regions() { + let mut procedure = create_table_procedure(); + + let (region_server, mut rx) = EchoRegionServer::new(); + + let datanodes = find_leaders(&procedure.creator.data.region_routes); + let datanode_clients = DatanodeClients::default(); + for peer in datanodes { + let client = region_server.new_client(&peer); + datanode_clients.insert_client(peer, client).await; + } + + procedure.context.datanode_manager = Arc::new(datanode_clients); + + let expected_created_regions = Arc::new(Mutex::new(HashSet::from([ + RegionId::new(42, 1), + RegionId::new(42, 2), + RegionId::new(42, 3), + ]))); + let handle = tokio::spawn({ + let expected_created_regions = expected_created_regions.clone(); + let mut max_recv = expected_created_regions.lock().unwrap().len(); + async move { + while let Some(PbRegionRequest::Create(request)) = rx.recv().await { + let region_id = RegionId::from_u64(request.region_id); + + expected_created_regions.lock().unwrap().remove(®ion_id); + + max_recv -= 1; + if max_recv == 0 { + break; + } + } + } + }); + + let status = procedure.on_datanode_create_regions().await.unwrap(); + assert!(matches!(status, Status::Executing { persist: true })); + assert!(matches!( + procedure.creator.data.state, + CreateTableState::CreateMetadata + )); + + handle.await.unwrap(); + + assert!(expected_created_regions.lock().unwrap().is_empty()); +} + +#[tokio::test] +async fn test_on_datanode_drop_regions() { + let drop_table_task = DropTableTask { + catalog: "my_catalog".to_string(), + schema: "my_schema".to_string(), + table: "my_table".to_string(), + table_id: 42, + }; + let mut procedure = DropTableProcedure::new( + 1, + drop_table_task, + TableRouteValue::new(test_data::new_region_routes()), + TableInfoValue::new(test_data::new_table_info()), + test_data::new_ddl_context(), + ); + + let (region_server, mut rx) = EchoRegionServer::new(); + + let datanodes = find_leaders(&procedure.data.table_route_value.region_routes); + let datanode_clients = DatanodeClients::default(); + for peer in datanodes { + let client = region_server.new_client(&peer); + datanode_clients.insert_client(peer, client).await; + } + + procedure.context.datanode_manager = Arc::new(datanode_clients); + + let expected_dropped_regions = Arc::new(Mutex::new(HashSet::from([ + RegionId::new(42, 1), + RegionId::new(42, 2), + RegionId::new(42, 3), + ]))); + let handle = tokio::spawn({ + let expected_dropped_regions = expected_dropped_regions.clone(); + let mut max_recv = expected_dropped_regions.lock().unwrap().len(); + async move { + while let Some(region_request::Body::Drop(request)) = rx.recv().await { + let region_id = RegionId::from_u64(request.region_id); + + expected_dropped_regions.lock().unwrap().remove(®ion_id); + + max_recv -= 1; + if max_recv == 0 { + break; + } + } + } + }); + + let status = procedure.on_datanode_drop_regions().await.unwrap(); + assert!(matches!(status, Status::Done)); + + handle.await.unwrap(); + + assert!(expected_dropped_regions.lock().unwrap().is_empty()); +} diff --git a/src/meta-srv/src/procedure/utils.rs b/src/meta-srv/src/procedure/utils.rs index 1ebc611254..90608516c2 100644 --- a/src/meta-srv/src/procedure/utils.rs +++ b/src/meta-srv/src/procedure/utils.rs @@ -12,57 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use common_error::ext::BoxedError; -use common_meta::peer::Peer; -use common_procedure::error::Error as ProcedureError; -use snafu::{location, Location}; - -use crate::error::{self, Error}; - -pub fn handle_request_datanode_error(datanode: Peer) -> impl FnOnce(client::error::Error) -> Error { - move |err| { - if matches!(err, client::error::Error::FlightGet { .. }) { - error::RetryLaterSnafu { - reason: format!("Failed to execute operation on datanode, source: {}", err), - } - .build() - } else { - error::Error::RequestDatanode { - location: location!(), - peer: datanode, - source: err, - } - } - } -} - -pub fn handle_operate_region_error( - datanode: Peer, -) -> impl FnOnce(common_meta::error::Error) -> Error { - move |err| { - if matches!(err, common_meta::error::Error::RetryLater { .. }) { - error::RetryLaterSnafu { - reason: format!("Failed to execute operation on datanode, source: {}", err), - } - .build() - } else { - error::Error::OperateRegion { - location: location!(), - peer: datanode, - source: BoxedError::new(err), - } - } - } -} - -pub fn handle_retry_error(e: Error) -> ProcedureError { - if matches!(e, error::Error::RetryLater { .. }) { - ProcedureError::retry_later(e) - } else { - ProcedureError::external(e) - } -} - #[cfg(test)] pub mod mock { use std::io::Error; @@ -157,6 +106,7 @@ pub mod test_data { use chrono::DateTime; use client::client_manager::DatanodeClients; use common_catalog::consts::MITO2_ENGINE; + use common_meta::ddl::DdlContext; use common_meta::key::TableMetadataManager; use common_meta::peer::Peer; use common_meta::rpc::router::RegionRoute; @@ -166,7 +116,6 @@ pub mod test_data { use table::requests::TableOptions; use crate::cache_invalidator::MetasrvCacheInvalidator; - use crate::ddl::DdlContext; use crate::handler::{HeartbeatMailbox, Pushers}; use crate::metasrv::MetasrvInfo; use crate::sequence::Sequence; @@ -244,7 +193,6 @@ pub mod test_data { let kv_backend = KvBackendAdapter::wrap(kv_store); let clients = Arc::new(DatanodeClients::default()); DdlContext { - datanode_clients: clients.clone(), datanode_manager: clients, cache_invalidator: Arc::new(MetasrvCacheInvalidator::new( mailbox,