diff --git a/src/common/meta/src/ddl/drop_table.rs b/src/common/meta/src/ddl/drop_table.rs index d2ca945903..e00d1571e2 100644 --- a/src/common/meta/src/ddl/drop_table.rs +++ b/src/common/meta/src/ddl/drop_table.rs @@ -12,7 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -pub mod executor; +pub(crate) mod executor; +mod metadata; use async_trait::async_trait; use common_procedure::error::{FromJsonSnafu, ToJsonSnafu}; @@ -23,7 +24,7 @@ use common_telemetry::info; use serde::{Deserialize, Serialize}; use snafu::{OptionExt, ResultExt}; use strum::AsRefStr; -use table::metadata::{RawTableInfo, TableId}; +use table::metadata::TableId; use table::table_reference::TableReference; use self::executor::DropTableExecutor; @@ -51,16 +52,10 @@ pub struct DropTableProcedure { impl DropTableProcedure { pub const TYPE_NAME: &'static str = "metasrv-procedure::DropTable"; - pub fn new( - cluster_id: u64, - task: DropTableTask, - table_route_value: DeserializedValueWithBytes, - table_info_value: DeserializedValueWithBytes, - context: DdlContext, - ) -> Self { + pub fn new(cluster_id: u64, task: DropTableTask, context: DdlContext) -> Self { Self { context, - data: DropTableData::new(cluster_id, task, table_route_value, table_info_value), + data: DropTableData::new(cluster_id, task), dropping_regions: vec![], } } @@ -74,10 +69,11 @@ impl DropTableProcedure { }) } - async fn on_prepare<'a>(&mut self, executor: &DropTableExecutor) -> Result { + pub(crate) async fn on_prepare<'a>(&mut self, executor: &DropTableExecutor) -> Result { if executor.on_prepare(&self.context).await?.stop() { return Ok(Status::done()); } + self.fill_table_metadata().await?; self.data.state = DropTableState::RemoveMetadata; Ok(Status::executing(true)) @@ -85,7 +81,8 @@ impl DropTableProcedure { /// Register dropping regions if doesn't exist. fn register_dropping_regions(&mut self) -> Result<()> { - let region_routes = self.data.region_routes()?; + // Safety: filled in `on_prepare`. + let region_routes = self.data.region_routes().unwrap()?; let dropping_regions = operating_leader_regions(region_routes); @@ -121,7 +118,11 @@ impl DropTableProcedure { // TODO(weny): Considers introducing a RegionStatus to indicate the region is dropping. let table_id = self.data.table_id(); executor - .on_remove_metadata(&self.context, self.data.region_routes()?) + .on_remove_metadata( + &self.context, + // Safety: filled in `on_prepare`. + self.data.region_routes().unwrap()?, + ) .await?; info!("Deleted table metadata for table {table_id}"); self.data.state = DropTableState::InvalidateTableCache; @@ -138,10 +139,22 @@ impl DropTableProcedure { pub async fn on_datanode_drop_regions(&self, executor: &DropTableExecutor) -> Result { executor - .on_drop_regions(&self.context, self.data.region_routes()?) + .on_drop_regions( + &self.context, + // Safety: filled in `on_prepare`. + self.data.region_routes().unwrap()?, + ) .await?; Ok(Status::done()) } + + pub(crate) fn executor(&self) -> DropTableExecutor { + DropTableExecutor::new( + self.data.task.table_name(), + self.data.table_id(), + self.data.task.drop_if_exists, + ) + } } #[async_trait] @@ -151,11 +164,7 @@ impl Procedure for DropTableProcedure { } async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult { - let executor = DropTableExecutor::new( - self.data.task.table_name(), - self.data.table_id(), - self.data.task.drop_if_exists, - ); + let executor = self.executor(); let state = &self.data.state; let _timer = metrics::METRIC_META_PROCEDURE_DROP_TABLE .with_label_values(&[state.as_ref()]) @@ -188,28 +197,22 @@ impl Procedure for DropTableProcedure { } #[derive(Debug, Serialize, Deserialize)] -/// TODO(weny): simplify the table data. pub struct DropTableData { pub state: DropTableState, pub cluster_id: u64, pub task: DropTableTask, - pub table_route_value: DeserializedValueWithBytes, - pub table_info_value: DeserializedValueWithBytes, + pub table_route_value: Option>, + pub table_info_value: Option>, } impl DropTableData { - pub fn new( - cluster_id: u64, - task: DropTableTask, - table_route_value: DeserializedValueWithBytes, - table_info_value: DeserializedValueWithBytes, - ) -> Self { + pub fn new(cluster_id: u64, task: DropTableTask) -> Self { Self { state: DropTableState::Prepare, cluster_id, task, - table_info_value, - table_route_value, + table_route_value: None, + table_info_value: None, } } @@ -217,19 +220,16 @@ impl DropTableData { self.task.table_ref() } - fn region_routes(&self) -> Result<&Vec> { - self.table_route_value.region_routes() - } - - fn table_info(&self) -> &RawTableInfo { - &self.table_info_value.table_info + fn region_routes(&self) -> Option>> { + self.table_route_value.as_ref().map(|v| v.region_routes()) } fn table_id(&self) -> TableId { - self.table_info().ident.table_id + self.task.table_id } } +/// The state of drop table. #[derive(Debug, Serialize, Deserialize, AsRefStr)] pub enum DropTableState { /// Prepares to drop the table diff --git a/src/common/meta/src/ddl/drop_table/metadata.rs b/src/common/meta/src/ddl/drop_table/metadata.rs new file mode 100644 index 0000000000..e3596eb583 --- /dev/null +++ b/src/common/meta/src/ddl/drop_table/metadata.rs @@ -0,0 +1,46 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use common_catalog::format_full_table_name; +use snafu::OptionExt; + +use crate::ddl::drop_table::DropTableProcedure; +use crate::error::{self, Result}; + +impl DropTableProcedure { + /// Fetches the table info and table route. + pub(crate) async fn fill_table_metadata(&mut self) -> Result<()> { + let task = &self.data.task; + let table_info_value = self + .context + .table_metadata_manager + .table_info_manager() + .get(task.table_id) + .await? + .with_context(|| error::TableInfoNotFoundSnafu { + table: format_full_table_name(&task.catalog, &task.schema, &task.table), + })?; + let (_, table_route_value) = self + .context + .table_metadata_manager + .table_route_manager() + .table_route_storage() + .get_raw_physical_table_route(task.table_id) + .await?; + + self.data.table_info_value = Some(table_info_value); + self.data.table_route_value = Some(table_route_value); + Ok(()) + } +} diff --git a/src/common/meta/src/ddl/test_util.rs b/src/common/meta/src/ddl/test_util.rs index 2d62b11440..eadc4e42e6 100644 --- a/src/common/meta/src/ddl/test_util.rs +++ b/src/common/meta/src/ddl/test_util.rs @@ -15,6 +15,7 @@ pub mod alter_table; pub mod columns; pub mod create_table; +pub mod datanode_handler; use std::collections::HashMap; diff --git a/src/common/meta/src/ddl/test_util/create_table.rs b/src/common/meta/src/ddl/test_util/create_table.rs index 2e6829e1d1..15f55dca2f 100644 --- a/src/common/meta/src/ddl/test_util/create_table.rs +++ b/src/common/meta/src/ddl/test_util/create_table.rs @@ -15,7 +15,8 @@ use std::collections::HashMap; use api::v1::column_def::try_as_column_schema; -use api::v1::{ColumnDef, CreateTableExpr, SemanticType}; +use api::v1::meta::Partition; +use api::v1::{ColumnDataType, ColumnDef, CreateTableExpr, SemanticType}; use chrono::DateTime; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MITO2_ENGINE}; use datatypes::schema::RawSchema; @@ -24,6 +25,9 @@ use store_api::storage::TableId; use table::metadata::{RawTableInfo, RawTableMeta, TableIdent, TableType}; use table::requests::TableOptions; +use crate::ddl::test_util::columns::TestColumnDefBuilder; +use crate::rpc::ddl::CreateTableTask; + #[derive(Default, Builder)] #[builder(default)] pub struct TestCreateTableExpr { @@ -130,3 +134,47 @@ pub fn build_raw_table_info_from_expr(expr: &CreateTableExpr) -> RawTableInfo { table_type: TableType::Base, } } + +pub fn test_create_table_task(name: &str, table_id: TableId) -> CreateTableTask { + let create_table = TestCreateTableExprBuilder::default() + .column_defs([ + TestColumnDefBuilder::default() + .name("ts") + .data_type(ColumnDataType::TimestampMillisecond) + .semantic_type(SemanticType::Timestamp) + .build() + .unwrap() + .into(), + TestColumnDefBuilder::default() + .name("host") + .data_type(ColumnDataType::String) + .semantic_type(SemanticType::Tag) + .build() + .unwrap() + .into(), + TestColumnDefBuilder::default() + .name("cpu") + .data_type(ColumnDataType::Float64) + .semantic_type(SemanticType::Field) + .build() + .unwrap() + .into(), + ]) + .table_id(table_id) + .time_index("ts") + .primary_keys(["host".into()]) + .table_name(name) + .build() + .unwrap() + .into(); + let table_info = build_raw_table_info_from_expr(&create_table); + CreateTableTask { + create_table, + // Single region + partitions: vec![Partition { + column_list: vec![], + value_list: vec![], + }], + table_info, + } +} diff --git a/src/common/meta/src/ddl/test_util/datanode_handler.rs b/src/common/meta/src/ddl/test_util/datanode_handler.rs new file mode 100644 index 0000000000..530705f7d3 --- /dev/null +++ b/src/common/meta/src/ddl/test_util/datanode_handler.rs @@ -0,0 +1,126 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use api::v1::region::{QueryRequest, RegionRequest}; +use common_error::ext::BoxedError; +use common_recordbatch::SendableRecordBatchStream; +use common_telemetry::debug; +use tokio::sync::mpsc; + +use crate::datanode_manager::HandleResponse; +use crate::error::{self, Error, Result}; +use crate::peer::Peer; +use crate::test_util::MockDatanodeHandler; + +#[async_trait::async_trait] +impl MockDatanodeHandler for () { + async fn handle(&self, _peer: &Peer, _request: RegionRequest) -> Result { + unreachable!() + } + + async fn handle_query( + &self, + _peer: &Peer, + _request: QueryRequest, + ) -> Result { + unreachable!() + } +} + +#[derive(Clone)] +pub struct DatanodeWatcher(pub mpsc::Sender<(Peer, RegionRequest)>); + +#[async_trait::async_trait] +impl MockDatanodeHandler for DatanodeWatcher { + async fn handle(&self, peer: &Peer, request: RegionRequest) -> Result { + debug!("Returning Ok(0) for request: {request:?}, peer: {peer:?}"); + self.0.send((peer.clone(), request)).await.unwrap(); + Ok(HandleResponse::new(0)) + } + + async fn handle_query( + &self, + _peer: &Peer, + _request: QueryRequest, + ) -> Result { + unreachable!() + } +} + +#[derive(Clone)] +pub struct RetryErrorDatanodeHandler; + +#[async_trait::async_trait] +impl MockDatanodeHandler for RetryErrorDatanodeHandler { + async fn handle(&self, peer: &Peer, request: RegionRequest) -> Result { + debug!("Returning retry later for request: {request:?}, peer: {peer:?}"); + Err(Error::RetryLater { + source: BoxedError::new( + error::UnexpectedSnafu { + err_msg: "retry later", + } + .build(), + ), + }) + } + + async fn handle_query( + &self, + _peer: &Peer, + _request: QueryRequest, + ) -> Result { + unreachable!() + } +} + +#[derive(Clone)] +pub struct UnexpectedErrorDatanodeHandler; + +#[async_trait::async_trait] +impl MockDatanodeHandler for UnexpectedErrorDatanodeHandler { + async fn handle(&self, peer: &Peer, request: RegionRequest) -> Result { + debug!("Returning mock error for request: {request:?}, peer: {peer:?}"); + error::UnexpectedSnafu { + err_msg: "mock error", + } + .fail() + } + + async fn handle_query( + &self, + _peer: &Peer, + _request: QueryRequest, + ) -> Result { + unreachable!() + } +} + +#[derive(Clone)] +pub struct NaiveDatanodeHandler; + +#[async_trait::async_trait] +impl MockDatanodeHandler for NaiveDatanodeHandler { + async fn handle(&self, peer: &Peer, request: RegionRequest) -> Result { + debug!("Returning Ok(0) for request: {request:?}, peer: {peer:?}"); + Ok(HandleResponse::new(0)) + } + + async fn handle_query( + &self, + _peer: &Peer, + _request: QueryRequest, + ) -> Result { + unreachable!() + } +} diff --git a/src/common/meta/src/ddl/tests.rs b/src/common/meta/src/ddl/tests.rs index 84a3b49849..ff0261037c 100644 --- a/src/common/meta/src/ddl/tests.rs +++ b/src/common/meta/src/ddl/tests.rs @@ -17,3 +17,4 @@ mod alter_table; mod create_logical_tables; mod create_table; mod drop_database; +mod drop_table; diff --git a/src/common/meta/src/ddl/tests/alter_logical_tables.rs b/src/common/meta/src/ddl/tests/alter_logical_tables.rs index 2c8c62cf52..3ff121b09a 100644 --- a/src/common/meta/src/ddl/tests/alter_logical_tables.rs +++ b/src/common/meta/src/ddl/tests/alter_logical_tables.rs @@ -23,8 +23,8 @@ use common_procedure_test::MockContextProvider; use crate::ddl::alter_logical_tables::AlterLogicalTablesProcedure; use crate::ddl::test_util::alter_table::TestAlterTableExprBuilder; use crate::ddl::test_util::columns::TestColumnDefBuilder; +use crate::ddl::test_util::datanode_handler::NaiveDatanodeHandler; use crate::ddl::test_util::{create_logical_table, create_physical_table}; -use crate::ddl::tests::create_logical_tables::NaiveDatanodeHandler; use crate::error::Error::{AlterLogicalTablesInvalidArguments, TableNotFound}; use crate::key::table_name::TableNameKey; use crate::rpc::ddl::AlterTableTask; diff --git a/src/common/meta/src/ddl/tests/alter_table.rs b/src/common/meta/src/ddl/tests/alter_table.rs index 062f1552b7..1a85ec5766 100644 --- a/src/common/meta/src/ddl/tests/alter_table.rs +++ b/src/common/meta/src/ddl/tests/alter_table.rs @@ -17,8 +17,7 @@ use std::collections::HashMap; use std::sync::Arc; use api::v1::alter_expr::Kind; -use api::v1::meta::Partition; -use api::v1::region::{region_request, QueryRequest, RegionRequest}; +use api::v1::region::{region_request, RegionRequest}; use api::v1::{ AddColumn, AddColumns, AlterExpr, ColumnDataType, ColumnDef as PbColumnDef, DropColumn, DropColumns, SemanticType, @@ -26,70 +25,19 @@ use api::v1::{ use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_error::ext::ErrorExt; use common_error::status_code::StatusCode; -use common_recordbatch::SendableRecordBatchStream; -use common_telemetry::debug; use store_api::storage::RegionId; -use table::metadata::TableId; use tokio::sync::mpsc::{self}; -use crate::datanode_manager::HandleResponse; use crate::ddl::alter_table::AlterTableProcedure; use crate::ddl::test_util::alter_table::TestAlterTableExprBuilder; -use crate::ddl::test_util::columns::TestColumnDefBuilder; -use crate::ddl::test_util::create_table::{ - build_raw_table_info_from_expr, TestCreateTableExprBuilder, -}; -use crate::error::Result; +use crate::ddl::test_util::create_table::test_create_table_task; +use crate::ddl::test_util::datanode_handler::DatanodeWatcher; use crate::key::table_name::TableNameKey; use crate::key::table_route::TableRouteValue; use crate::peer::Peer; -use crate::rpc::ddl::{AlterTableTask, CreateTableTask}; +use crate::rpc::ddl::AlterTableTask; use crate::rpc::router::{Region, RegionRoute}; -use crate::test_util::{new_ddl_context, MockDatanodeHandler, MockDatanodeManager}; - -fn test_create_table_task(name: &str, table_id: TableId) -> CreateTableTask { - let create_table = TestCreateTableExprBuilder::default() - .column_defs([ - TestColumnDefBuilder::default() - .name("ts") - .data_type(ColumnDataType::TimestampMillisecond) - .semantic_type(SemanticType::Timestamp) - .build() - .unwrap() - .into(), - TestColumnDefBuilder::default() - .name("host") - .data_type(ColumnDataType::String) - .semantic_type(SemanticType::Tag) - .build() - .unwrap() - .into(), - TestColumnDefBuilder::default() - .name("cpu") - .data_type(ColumnDataType::Float64) - .semantic_type(SemanticType::Field) - .build() - .unwrap() - .into(), - ]) - .table_id(table_id) - .time_index("ts") - .primary_keys(["host".into()]) - .table_name(name) - .build() - .unwrap() - .into(); - let table_info = build_raw_table_info_from_expr(&create_table); - CreateTableTask { - create_table, - // Single region - partitions: vec![Partition { - column_list: vec![], - value_list: vec![], - }], - table_info, - } -} +use crate::test_util::{new_ddl_context, MockDatanodeManager}; fn test_rename_alter_table_task(table_name: &str, new_table_name: &str) -> AlterTableTask { let builder = TestAlterTableExprBuilder::default() @@ -137,26 +85,6 @@ async fn test_on_prepare_table_not_exists_err() { assert_matches!(err.status_code(), StatusCode::TableNotFound); } -#[derive(Clone)] -pub struct DatanodeWatcher(mpsc::Sender<(Peer, RegionRequest)>); - -#[async_trait::async_trait] -impl MockDatanodeHandler for DatanodeWatcher { - async fn handle(&self, peer: &Peer, request: RegionRequest) -> Result { - debug!("Returning Ok(0) for request: {request:?}, peer: {peer:?}"); - self.0.send((peer.clone(), request)).await.unwrap(); - Ok(HandleResponse::new(0)) - } - - async fn handle_query( - &self, - _peer: &Peer, - _request: QueryRequest, - ) -> Result { - unreachable!() - } -} - #[tokio::test] async fn test_on_submit_alter_request() { let (tx, mut rx) = mpsc::channel(8); diff --git a/src/common/meta/src/ddl/tests/create_logical_tables.rs b/src/common/meta/src/ddl/tests/create_logical_tables.rs index fdadcb75dd..74000cb557 100644 --- a/src/common/meta/src/ddl/tests/create_logical_tables.rs +++ b/src/common/meta/src/ddl/tests/create_logical_tables.rs @@ -15,25 +15,21 @@ use std::assert_matches::assert_matches; use std::sync::Arc; -use api::v1::region::{QueryRequest, RegionRequest}; use common_error::ext::ErrorExt; use common_error::status_code::StatusCode; use common_procedure::{Context as ProcedureContext, Procedure, ProcedureId, Status}; use common_procedure_test::MockContextProvider; -use common_recordbatch::SendableRecordBatchStream; -use common_telemetry::debug; use store_api::storage::RegionId; -use crate::datanode_manager::HandleResponse; use crate::ddl::create_logical_tables::CreateLogicalTablesProcedure; +use crate::ddl::test_util::datanode_handler::NaiveDatanodeHandler; use crate::ddl::test_util::{ create_physical_table_metadata, test_create_logical_table_task, test_create_physical_table_task, }; use crate::ddl::{TableMetadata, TableMetadataAllocatorContext}; -use crate::error::{Error, Result}; +use crate::error::Error; use crate::key::table_route::TableRouteValue; -use crate::peer::Peer; -use crate::test_util::{new_ddl_context, MockDatanodeHandler, MockDatanodeManager}; +use crate::test_util::{new_ddl_context, MockDatanodeManager}; #[tokio::test] async fn test_on_prepare_physical_table_not_found() { @@ -229,25 +225,6 @@ async fn test_on_prepare_part_logical_tables_exist() { assert_matches!(status, Status::Executing { persist: true }); } -#[derive(Clone)] -pub struct NaiveDatanodeHandler; - -#[async_trait::async_trait] -impl MockDatanodeHandler for NaiveDatanodeHandler { - async fn handle(&self, peer: &Peer, request: RegionRequest) -> Result { - debug!("Returning Ok(0) for request: {request:?}, peer: {peer:?}"); - Ok(HandleResponse::new(0)) - } - - async fn handle_query( - &self, - _peer: &Peer, - _request: QueryRequest, - ) -> Result { - unreachable!() - } -} - #[tokio::test] async fn test_on_create_metadata() { let datanode_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler)); diff --git a/src/common/meta/src/ddl/tests/create_table.rs b/src/common/meta/src/ddl/tests/create_table.rs index 2ba289488d..f59be01d7d 100644 --- a/src/common/meta/src/ddl/tests/create_table.rs +++ b/src/common/meta/src/ddl/tests/create_table.rs @@ -17,42 +17,24 @@ use std::collections::HashMap; use std::sync::Arc; use api::v1::meta::Partition; -use api::v1::region::{QueryRequest, RegionRequest}; use api::v1::{ColumnDataType, SemanticType}; -use common_error::ext::{BoxedError, ErrorExt}; +use common_error::ext::ErrorExt; use common_error::status_code::StatusCode; use common_procedure::{Context as ProcedureContext, Procedure, ProcedureId, Status}; use common_procedure_test::MockContextProvider; -use common_recordbatch::SendableRecordBatchStream; -use common_telemetry::debug; -use crate::datanode_manager::HandleResponse; use crate::ddl::create_table::CreateTableProcedure; use crate::ddl::test_util::columns::TestColumnDefBuilder; use crate::ddl::test_util::create_table::{ build_raw_table_info_from_expr, TestCreateTableExprBuilder, }; -use crate::error; -use crate::error::{Error, Result}; +use crate::ddl::test_util::datanode_handler::{ + NaiveDatanodeHandler, RetryErrorDatanodeHandler, UnexpectedErrorDatanodeHandler, +}; +use crate::error::Error; use crate::key::table_route::TableRouteValue; -use crate::peer::Peer; use crate::rpc::ddl::CreateTableTask; -use crate::test_util::{new_ddl_context, MockDatanodeHandler, MockDatanodeManager}; - -#[async_trait::async_trait] -impl MockDatanodeHandler for () { - async fn handle(&self, _peer: &Peer, _request: RegionRequest) -> Result { - unreachable!() - } - - async fn handle_query( - &self, - _peer: &Peer, - _request: QueryRequest, - ) -> Result { - unreachable!() - } -} +use crate::test_util::{new_ddl_context, MockDatanodeManager}; fn test_create_table_task(name: &str) -> CreateTableTask { let create_table = TestCreateTableExprBuilder::default() @@ -174,32 +156,6 @@ async fn test_on_prepare_with_no_partition_err() { .contains("The number of partitions must be greater than 0"),); } -#[derive(Clone)] -pub struct RetryErrorDatanodeHandler; - -#[async_trait::async_trait] -impl MockDatanodeHandler for RetryErrorDatanodeHandler { - async fn handle(&self, peer: &Peer, request: RegionRequest) -> Result { - debug!("Returning retry later for request: {request:?}, peer: {peer:?}"); - Err(Error::RetryLater { - source: BoxedError::new( - error::UnexpectedSnafu { - err_msg: "retry later", - } - .build(), - ), - }) - } - - async fn handle_query( - &self, - _peer: &Peer, - _request: QueryRequest, - ) -> Result { - unreachable!() - } -} - #[tokio::test] async fn test_on_datanode_create_regions_should_retry() { common_telemetry::init_default_ut_logging(); @@ -218,28 +174,6 @@ async fn test_on_datanode_create_regions_should_retry() { assert!(error.is_retry_later()); } -#[derive(Clone)] -pub struct UnexpectedErrorDatanodeHandler; - -#[async_trait::async_trait] -impl MockDatanodeHandler for UnexpectedErrorDatanodeHandler { - async fn handle(&self, peer: &Peer, request: RegionRequest) -> Result { - debug!("Returning mock error for request: {request:?}, peer: {peer:?}"); - error::UnexpectedSnafu { - err_msg: "mock error", - } - .fail() - } - - async fn handle_query( - &self, - _peer: &Peer, - _request: QueryRequest, - ) -> Result { - unreachable!() - } -} - #[tokio::test] async fn test_on_datanode_create_regions_should_not_retry() { common_telemetry::init_default_ut_logging(); @@ -258,25 +192,6 @@ async fn test_on_datanode_create_regions_should_not_retry() { assert!(!error.is_retry_later()); } -#[derive(Clone)] -pub struct NaiveDatanodeHandler; - -#[async_trait::async_trait] -impl MockDatanodeHandler for NaiveDatanodeHandler { - async fn handle(&self, peer: &Peer, request: RegionRequest) -> Result { - debug!("Returning Ok(0) for request: {request:?}, peer: {peer:?}"); - Ok(HandleResponse::new(0)) - } - - async fn handle_query( - &self, - _peer: &Peer, - _request: QueryRequest, - ) -> Result { - unreachable!() - } -} - #[tokio::test] async fn test_on_create_metadata_error() { common_telemetry::init_default_ut_logging(); diff --git a/src/common/meta/src/ddl/tests/drop_database.rs b/src/common/meta/src/ddl/tests/drop_database.rs index 8e5f2fe45a..7bdd83ca1f 100644 --- a/src/common/meta/src/ddl/tests/drop_database.rs +++ b/src/common/meta/src/ddl/tests/drop_database.rs @@ -20,8 +20,8 @@ use common_procedure_test::MockContextProvider; use futures::TryStreamExt; use crate::ddl::drop_database::DropDatabaseProcedure; +use crate::ddl::test_util::datanode_handler::{NaiveDatanodeHandler, RetryErrorDatanodeHandler}; use crate::ddl::test_util::{create_logical_table, create_physical_table}; -use crate::ddl::tests::create_table::{NaiveDatanodeHandler, RetryErrorDatanodeHandler}; use crate::key::schema_name::SchemaNameKey; use crate::test_util::{new_ddl_context, MockDatanodeManager}; diff --git a/src/common/meta/src/ddl/tests/drop_table.rs b/src/common/meta/src/ddl/tests/drop_table.rs new file mode 100644 index 0000000000..9908f604a4 --- /dev/null +++ b/src/common/meta/src/ddl/tests/drop_table.rs @@ -0,0 +1,193 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashMap; +use std::sync::Arc; + +use api::v1::region::{region_request, RegionRequest}; +use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; +use common_error::ext::ErrorExt; +use common_error::status_code::StatusCode; +use store_api::storage::RegionId; +use tokio::sync::mpsc; + +use crate::ddl::drop_table::DropTableProcedure; +use crate::ddl::test_util::create_table::test_create_table_task; +use crate::ddl::test_util::datanode_handler::DatanodeWatcher; +use crate::key::table_route::TableRouteValue; +use crate::peer::Peer; +use crate::rpc::ddl::DropTableTask; +use crate::rpc::router::{Region, RegionRoute}; +use crate::test_util::{new_ddl_context, MockDatanodeManager}; + +#[tokio::test] +async fn test_on_prepare_table_not_exists_err() { + let datanode_manager = Arc::new(MockDatanodeManager::new(())); + let ddl_context = new_ddl_context(datanode_manager); + let cluster_id = 1; + let table_name = "foo"; + let table_id = 1024; + let task = test_create_table_task(table_name, table_id); + // Puts a value to table name key. + ddl_context + .table_metadata_manager + .create_table_metadata( + task.table_info.clone(), + TableRouteValue::physical(vec![]), + HashMap::new(), + ) + .await + .unwrap(); + + let task = DropTableTask { + catalog: DEFAULT_CATALOG_NAME.to_string(), + schema: DEFAULT_SCHEMA_NAME.to_string(), + table: "bar".to_string(), + table_id, + drop_if_exists: false, + }; + + let mut procedure = DropTableProcedure::new(cluster_id, task, ddl_context); + let executor = procedure.executor(); + let err = procedure.on_prepare(&executor).await.unwrap_err(); + assert_eq!(err.status_code(), StatusCode::TableNotFound); +} + +#[tokio::test] +async fn test_on_prepare_table() { + let datanode_manager = Arc::new(MockDatanodeManager::new(())); + let ddl_context = new_ddl_context(datanode_manager); + let cluster_id = 1; + let table_name = "foo"; + let table_id = 1024; + let task = test_create_table_task(table_name, table_id); + // Puts a value to table name key. + ddl_context + .table_metadata_manager + .create_table_metadata( + task.table_info.clone(), + TableRouteValue::physical(vec![]), + HashMap::new(), + ) + .await + .unwrap(); + + let task = DropTableTask { + catalog: DEFAULT_CATALOG_NAME.to_string(), + schema: DEFAULT_SCHEMA_NAME.to_string(), + table: "bar".to_string(), + table_id, + drop_if_exists: true, + }; + + // Drop if exists + let mut procedure = DropTableProcedure::new(cluster_id, task, ddl_context.clone()); + let executor = procedure.executor(); + procedure.on_prepare(&executor).await.unwrap(); + + let task = DropTableTask { + catalog: DEFAULT_CATALOG_NAME.to_string(), + schema: DEFAULT_SCHEMA_NAME.to_string(), + table: table_name.to_string(), + table_id, + drop_if_exists: false, + }; + + // Drop table + let mut procedure = DropTableProcedure::new(cluster_id, task, ddl_context); + let executor = procedure.executor(); + procedure.on_prepare(&executor).await.unwrap(); +} + +#[tokio::test] +async fn test_on_datanode_drop_regions() { + let (tx, mut rx) = mpsc::channel(8); + let datanode_handler = DatanodeWatcher(tx); + let datanode_manager = Arc::new(MockDatanodeManager::new(datanode_handler)); + let ddl_context = new_ddl_context(datanode_manager); + let cluster_id = 1; + let table_id = 1024; + let table_name = "foo"; + let task = test_create_table_task(table_name, table_id); + // Puts a value to table name key. + ddl_context + .table_metadata_manager + .create_table_metadata( + task.table_info.clone(), + TableRouteValue::physical(vec![ + RegionRoute { + region: Region::new_test(RegionId::new(table_id, 1)), + leader_peer: Some(Peer::empty(1)), + follower_peers: vec![Peer::empty(5)], + leader_status: None, + leader_down_since: None, + }, + RegionRoute { + region: Region::new_test(RegionId::new(table_id, 2)), + leader_peer: Some(Peer::empty(2)), + follower_peers: vec![Peer::empty(4)], + leader_status: None, + leader_down_since: None, + }, + RegionRoute { + region: Region::new_test(RegionId::new(table_id, 3)), + leader_peer: Some(Peer::empty(3)), + follower_peers: vec![], + leader_status: None, + leader_down_since: None, + }, + ]), + HashMap::new(), + ) + .await + .unwrap(); + + let task = DropTableTask { + catalog: DEFAULT_CATALOG_NAME.to_string(), + schema: DEFAULT_SCHEMA_NAME.to_string(), + table: table_name.to_string(), + table_id, + drop_if_exists: false, + }; + // Drop table + let mut procedure = DropTableProcedure::new(cluster_id, task, ddl_context); + let executor = procedure.executor(); + procedure.on_prepare(&executor).await.unwrap(); + procedure.on_datanode_drop_regions(&executor).await.unwrap(); + + let check = |peer: Peer, + request: RegionRequest, + expected_peer_id: u64, + expected_region_id: RegionId| { + assert_eq!(peer.id, expected_peer_id); + let Some(region_request::Body::Drop(req)) = request.body else { + unreachable!(); + }; + assert_eq!(req.region_id, expected_region_id); + }; + + let mut results = Vec::new(); + for _ in 0..3 { + let result = rx.try_recv().unwrap(); + results.push(result); + } + results.sort_unstable_by(|(a, _), (b, _)| a.id.cmp(&b.id)); + + let (peer, request) = results.remove(0); + check(peer, request, 1, RegionId::new(table_id, 1)); + let (peer, request) = results.remove(0); + check(peer, request, 2, RegionId::new(table_id, 2)); + let (peer, request) = results.remove(0); + check(peer, request, 3, RegionId::new(table_id, 3)); +} diff --git a/src/common/meta/src/ddl_manager.rs b/src/common/meta/src/ddl_manager.rs index 13460a2b69..1577cab2dc 100644 --- a/src/common/meta/src/ddl_manager.rs +++ b/src/common/meta/src/ddl_manager.rs @@ -42,7 +42,6 @@ use crate::error::{ }; use crate::key::table_info::TableInfoValue; use crate::key::table_name::TableNameKey; -use crate::key::table_route::TableRouteValue; use crate::key::{DeserializedValueWithBytes, TableMetadataManagerRef}; use crate::region_keeper::MemoryRegionKeeperRef; use crate::rpc::ddl::DdlTask::{ @@ -284,18 +283,10 @@ impl DdlManager { &self, cluster_id: ClusterId, drop_table_task: DropTableTask, - table_info_value: DeserializedValueWithBytes, - table_route_value: DeserializedValueWithBytes, ) -> Result<(ProcedureId, Option)> { let context = self.create_context(); - let procedure = DropTableProcedure::new( - cluster_id, - drop_table_task, - table_route_value, - table_info_value, - context, - ); + let procedure = DropTableProcedure::new(cluster_id, drop_table_task, context); let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure)); @@ -475,30 +466,8 @@ async fn handle_drop_table_task( drop_table_task: DropTableTask, ) -> Result { let table_id = drop_table_task.table_id; - let table_metadata_manager = &ddl_manager.table_metadata_manager(); - let table_ref = drop_table_task.table_ref(); - - let table_info_value = table_metadata_manager - .table_info_manager() - .get(table_id) - .await?; - let (_, table_route_value) = table_metadata_manager - .table_route_manager() - .table_route_storage() - .get_raw_physical_table_route(table_id) - .await?; - - let table_info_value = table_info_value.with_context(|| TableInfoNotFoundSnafu { - table: table_ref.to_string(), - })?; - let (id, _) = ddl_manager - .submit_drop_table_task( - cluster_id, - drop_table_task, - table_info_value, - table_route_value, - ) + .submit_drop_table_task(cluster_id, drop_table_task) .await?; info!("Table: {table_id} is dropped via procedure_id {id:?}"); diff --git a/src/meta-srv/src/procedure/tests.rs b/src/meta-srv/src/procedure/tests.rs index 415c2bb505..03ddc2dbde 100644 --- a/src/meta-srv/src/procedure/tests.rs +++ b/src/meta-srv/src/procedure/tests.rs @@ -16,7 +16,7 @@ use std::collections::{HashMap, HashSet}; use std::sync::{Arc, Mutex}; use api::v1::meta::Partition; -use api::v1::region::region_request::{self, Body as PbRegionRequest}; +use api::v1::region::region_request::Body as PbRegionRequest; use api::v1::region::{CreateRequest as PbCreateRegionRequest, RegionColumnDef}; use api::v1::{ColumnDataType, ColumnDef as PbColumnDef, SemanticType}; use client::client_manager::DatanodeClients; @@ -24,18 +24,13 @@ use common_catalog::consts::MITO2_ENGINE; use common_meta::datanode_manager::DatanodeManagerRef; use common_meta::ddl::create_logical_tables::{CreateLogicalTablesProcedure, CreateTablesState}; use common_meta::ddl::create_table::*; -use common_meta::ddl::drop_table::executor::DropTableExecutor; -use common_meta::ddl::drop_table::DropTableProcedure; use common_meta::ddl::test_util::columns::TestColumnDefBuilder; use common_meta::ddl::test_util::create_table::{ build_raw_table_info_from_expr, TestCreateTableExprBuilder, }; -use common_meta::key::table_info::TableInfoValue; use common_meta::key::table_route::{PhysicalTableRouteValue, TableRouteValue}; -use common_meta::key::DeserializedValueWithBytes; -use common_meta::rpc::ddl::{CreateTableTask, DropTableTask}; +use common_meta::rpc::ddl::CreateTableTask; use common_meta::rpc::router::{find_leaders, RegionRoute}; -use common_meta::table_name::TableName; use common_procedure::Status; use store_api::storage::RegionId; @@ -310,62 +305,3 @@ async fn test_on_datanode_create_logical_regions() { let status = procedure.on_create_metadata().await.unwrap(); assert!(status.is_done()); } - -#[tokio::test] -async fn test_on_datanode_drop_regions() { - let drop_table_task = DropTableTask { - catalog: "my_catalog".to_string(), - schema: "my_schema".to_string(), - table: "my_table".to_string(), - table_id: 42, - drop_if_exists: false, - }; - let drop_table_executor = DropTableExecutor::new( - TableName::new("my_catalog", "my_schema", "my_table"), - 42, - false, - ); - let (region_server, mut rx) = EchoRegionServer::new(); - let region_routes = test_data::new_region_routes(); - let datanode_manager = new_datanode_manager(®ion_server, ®ion_routes).await; - - let procedure = DropTableProcedure::new( - 1, - drop_table_task, - DeserializedValueWithBytes::from_inner(TableRouteValue::physical(region_routes)), - DeserializedValueWithBytes::from_inner(TableInfoValue::new(test_data::new_table_info())), - test_data::new_ddl_context(datanode_manager), - ); - - let expected_dropped_regions = Arc::new(Mutex::new(HashSet::from([ - RegionId::new(42, 1), - RegionId::new(42, 2), - RegionId::new(42, 3), - ]))); - let handle = tokio::spawn({ - let expected_dropped_regions = expected_dropped_regions.clone(); - let mut max_recv = expected_dropped_regions.lock().unwrap().len(); - async move { - while let Some(region_request::Body::Drop(request)) = rx.recv().await { - let region_id = RegionId::from_u64(request.region_id); - - expected_dropped_regions.lock().unwrap().remove(®ion_id); - - max_recv -= 1; - if max_recv == 0 { - break; - } - } - } - }); - - let status = procedure - .on_datanode_drop_regions(&drop_table_executor) - .await - .unwrap(); - assert!(status.is_done()); - - handle.await.unwrap(); - - assert!(expected_dropped_regions.lock().unwrap().is_empty()); -}