refactor: drop table procedure (#3688)

* refactor: rework the drop table procedure

* refactor: consolidate the test utils
Weny Xu authored 2024-04-10 20:22:10 +08:00, committed by GitHub
parent c00c1d95ee
commit c1e005b148
14 changed files with 473 additions and 333 deletions
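
In short: `DropTableProcedure::new` no longer takes pre-fetched table metadata. The `TableRouteValue` and `TableInfoValue` become `Option`s on `DropTableData` and are loaded inside the procedure by the new `fill_table_metadata` step during `on_prepare`. A minimal before/after sketch of the call site, using only names that appear in this diff:

    // Before: callers fetched the metadata up front and passed it in.
    let procedure = DropTableProcedure::new(
        cluster_id,
        task,
        table_route_value, // DeserializedValueWithBytes<TableRouteValue>
        table_info_value,  // DeserializedValueWithBytes<TableInfoValue>
        context,
    );

    // After: the procedure fetches its own metadata in `on_prepare`.
    let procedure = DropTableProcedure::new(cluster_id, task, context);

The test utilities are deduplicated along the way: the per-test `DatanodeWatcher`, `NaiveDatanodeHandler`, `RetryErrorDatanodeHandler`, and `UnexpectedErrorDatanodeHandler` mocks move into a shared `ddl::test_util::datanode_handler` module.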

View File

@@ -12,7 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-pub mod executor;
+pub(crate) mod executor;
+mod metadata;
use async_trait::async_trait;
use common_procedure::error::{FromJsonSnafu, ToJsonSnafu};
@@ -23,7 +24,7 @@ use common_telemetry::info;
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
use strum::AsRefStr;
-use table::metadata::{RawTableInfo, TableId};
+use table::metadata::TableId;
use table::table_reference::TableReference;
use self::executor::DropTableExecutor;
@@ -51,16 +52,10 @@ pub struct DropTableProcedure {
impl DropTableProcedure {
pub const TYPE_NAME: &'static str = "metasrv-procedure::DropTable";
-pub fn new(
-cluster_id: u64,
-task: DropTableTask,
-table_route_value: DeserializedValueWithBytes<TableRouteValue>,
-table_info_value: DeserializedValueWithBytes<TableInfoValue>,
-context: DdlContext,
-) -> Self {
+pub fn new(cluster_id: u64, task: DropTableTask, context: DdlContext) -> Self {
Self {
context,
-data: DropTableData::new(cluster_id, task, table_route_value, table_info_value),
+data: DropTableData::new(cluster_id, task),
dropping_regions: vec![],
}
}
@@ -74,10 +69,11 @@ impl DropTableProcedure {
})
}
-async fn on_prepare<'a>(&mut self, executor: &DropTableExecutor) -> Result<Status> {
+pub(crate) async fn on_prepare<'a>(&mut self, executor: &DropTableExecutor) -> Result<Status> {
if executor.on_prepare(&self.context).await?.stop() {
return Ok(Status::done());
}
+self.fill_table_metadata().await?;
self.data.state = DropTableState::RemoveMetadata;
Ok(Status::executing(true))
@@ -85,7 +81,8 @@ impl DropTableProcedure {
/// Register dropping regions if doesn't exist.
fn register_dropping_regions(&mut self) -> Result<()> {
-let region_routes = self.data.region_routes()?;
+// Safety: filled in `on_prepare`.
+let region_routes = self.data.region_routes().unwrap()?;
let dropping_regions = operating_leader_regions(region_routes);
@@ -121,7 +118,11 @@ impl DropTableProcedure {
// TODO(weny): Considers introducing a RegionStatus to indicate the region is dropping.
let table_id = self.data.table_id();
executor
-.on_remove_metadata(&self.context, self.data.region_routes()?)
+.on_remove_metadata(
+&self.context,
+// Safety: filled in `on_prepare`.
+self.data.region_routes().unwrap()?,
+)
.await?;
info!("Deleted table metadata for table {table_id}");
self.data.state = DropTableState::InvalidateTableCache;
@@ -138,10 +139,22 @@ impl DropTableProcedure {
pub async fn on_datanode_drop_regions(&self, executor: &DropTableExecutor) -> Result<Status> {
executor
-.on_drop_regions(&self.context, self.data.region_routes()?)
+.on_drop_regions(
+&self.context,
+// Safety: filled in `on_prepare`.
+self.data.region_routes().unwrap()?,
+)
.await?;
Ok(Status::done())
}
+pub(crate) fn executor(&self) -> DropTableExecutor {
+DropTableExecutor::new(
+self.data.task.table_name(),
+self.data.table_id(),
+self.data.task.drop_if_exists,
+)
+}
}
#[async_trait]
@@ -151,11 +164,7 @@ impl Procedure for DropTableProcedure {
}
async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
-let executor = DropTableExecutor::new(
-self.data.task.table_name(),
-self.data.table_id(),
-self.data.task.drop_if_exists,
-);
+let executor = self.executor();
let state = &self.data.state;
let _timer = metrics::METRIC_META_PROCEDURE_DROP_TABLE
.with_label_values(&[state.as_ref()])
@@ -188,28 +197,22 @@ impl Procedure for DropTableProcedure {
}
#[derive(Debug, Serialize, Deserialize)]
+/// TODO(weny): simplify the table data.
pub struct DropTableData {
pub state: DropTableState,
pub cluster_id: u64,
pub task: DropTableTask,
-pub table_route_value: DeserializedValueWithBytes<TableRouteValue>,
-pub table_info_value: DeserializedValueWithBytes<TableInfoValue>,
+pub table_route_value: Option<DeserializedValueWithBytes<TableRouteValue>>,
+pub table_info_value: Option<DeserializedValueWithBytes<TableInfoValue>>,
}
impl DropTableData {
-pub fn new(
-cluster_id: u64,
-task: DropTableTask,
-table_route_value: DeserializedValueWithBytes<TableRouteValue>,
-table_info_value: DeserializedValueWithBytes<TableInfoValue>,
-) -> Self {
+pub fn new(cluster_id: u64, task: DropTableTask) -> Self {
Self {
state: DropTableState::Prepare,
cluster_id,
task,
-table_info_value,
-table_route_value,
+table_route_value: None,
+table_info_value: None,
}
}
@@ -217,19 +220,16 @@ impl DropTableData {
self.task.table_ref()
}
-fn region_routes(&self) -> Result<&Vec<RegionRoute>> {
-self.table_route_value.region_routes()
-}
-fn table_info(&self) -> &RawTableInfo {
-&self.table_info_value.table_info
+fn region_routes(&self) -> Option<Result<&Vec<RegionRoute>>> {
+self.table_route_value.as_ref().map(|v| v.region_routes())
}
fn table_id(&self) -> TableId {
-self.table_info().ident.table_id
+self.task.table_id
}
}
/// The state of drop table.
#[derive(Debug, Serialize, Deserialize, AsRefStr)]
pub enum DropTableState {
/// Prepares to drop the table
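
Note the new shape of `region_routes`: it returns `Option<Result<&Vec<RegionRoute>>>`, where the outer `Option` tracks whether `fill_table_metadata` has run yet and the inner `Result` still carries route-lookup errors. A sketch of how the post-`Prepare` states consume it, mirroring the `unwrap()?` call sites above:

    // Sketch only: valid after `on_prepare` has filled the metadata.
    let region_routes = self
        .data
        .region_routes()
        .unwrap() // Option layer; "Safety: filled in `on_prepare`"
        ?; // Result layer; the stored route value may still be invalid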

View File

@@ -0,0 +1,46 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use common_catalog::format_full_table_name;
use snafu::OptionExt;
use crate::ddl::drop_table::DropTableProcedure;
use crate::error::{self, Result};
impl DropTableProcedure {
/// Fetches the table info and table route.
pub(crate) async fn fill_table_metadata(&mut self) -> Result<()> {
let task = &self.data.task;
let table_info_value = self
.context
.table_metadata_manager
.table_info_manager()
.get(task.table_id)
.await?
.with_context(|| error::TableInfoNotFoundSnafu {
table: format_full_table_name(&task.catalog, &task.schema, &task.table),
})?;
let (_, table_route_value) = self
.context
.table_metadata_manager
.table_route_manager()
.table_route_storage()
.get_raw_physical_table_route(task.table_id)
.await?;
self.data.table_info_value = Some(table_info_value);
self.data.table_route_value = Some(table_route_value);
Ok(())
}
}

View File

@@ -15,6 +15,7 @@
pub mod alter_table;
pub mod columns;
pub mod create_table;
+pub mod datanode_handler;
use std::collections::HashMap;

View File

@@ -15,7 +15,8 @@
use std::collections::HashMap;
use api::v1::column_def::try_as_column_schema;
-use api::v1::{ColumnDef, CreateTableExpr, SemanticType};
+use api::v1::meta::Partition;
+use api::v1::{ColumnDataType, ColumnDef, CreateTableExpr, SemanticType};
use chrono::DateTime;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MITO2_ENGINE};
use datatypes::schema::RawSchema;
@@ -24,6 +25,9 @@ use store_api::storage::TableId;
use table::metadata::{RawTableInfo, RawTableMeta, TableIdent, TableType};
use table::requests::TableOptions;
+use crate::ddl::test_util::columns::TestColumnDefBuilder;
+use crate::rpc::ddl::CreateTableTask;
#[derive(Default, Builder)]
#[builder(default)]
pub struct TestCreateTableExpr {
@@ -130,3 +134,47 @@ pub fn build_raw_table_info_from_expr(expr: &CreateTableExpr) -> RawTableInfo {
table_type: TableType::Base,
}
}
+pub fn test_create_table_task(name: &str, table_id: TableId) -> CreateTableTask {
+let create_table = TestCreateTableExprBuilder::default()
+.column_defs([
+TestColumnDefBuilder::default()
+.name("ts")
+.data_type(ColumnDataType::TimestampMillisecond)
+.semantic_type(SemanticType::Timestamp)
+.build()
+.unwrap()
+.into(),
+TestColumnDefBuilder::default()
+.name("host")
+.data_type(ColumnDataType::String)
+.semantic_type(SemanticType::Tag)
+.build()
+.unwrap()
+.into(),
+TestColumnDefBuilder::default()
+.name("cpu")
+.data_type(ColumnDataType::Float64)
+.semantic_type(SemanticType::Field)
+.build()
+.unwrap()
+.into(),
+])
+.table_id(table_id)
+.time_index("ts")
+.primary_keys(["host".into()])
+.table_name(name)
+.build()
+.unwrap()
+.into();
+let table_info = build_raw_table_info_from_expr(&create_table);
+CreateTableTask {
+create_table,
+// Single region
+partitions: vec![Partition {
+column_list: vec![],
+value_list: vec![],
+}],
+table_info,
+}
+}
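
Tests now seed table metadata through this shared helper instead of local copies, e.g. (as the drop_table tests later in this diff do):

    let task = test_create_table_task("foo", 1024);
    ddl_context
        .table_metadata_manager
        .create_table_metadata(
            task.table_info.clone(),
            TableRouteValue::physical(vec![]),
            HashMap::new(),
        )
        .await
        .unwrap();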

View File

@@ -0,0 +1,126 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use api::v1::region::{QueryRequest, RegionRequest};
use common_error::ext::BoxedError;
use common_recordbatch::SendableRecordBatchStream;
use common_telemetry::debug;
use tokio::sync::mpsc;
use crate::datanode_manager::HandleResponse;
use crate::error::{self, Error, Result};
use crate::peer::Peer;
use crate::test_util::MockDatanodeHandler;
#[async_trait::async_trait]
impl MockDatanodeHandler for () {
async fn handle(&self, _peer: &Peer, _request: RegionRequest) -> Result<HandleResponse> {
unreachable!()
}
async fn handle_query(
&self,
_peer: &Peer,
_request: QueryRequest,
) -> Result<SendableRecordBatchStream> {
unreachable!()
}
}
#[derive(Clone)]
pub struct DatanodeWatcher(pub mpsc::Sender<(Peer, RegionRequest)>);
#[async_trait::async_trait]
impl MockDatanodeHandler for DatanodeWatcher {
async fn handle(&self, peer: &Peer, request: RegionRequest) -> Result<HandleResponse> {
debug!("Returning Ok(0) for request: {request:?}, peer: {peer:?}");
self.0.send((peer.clone(), request)).await.unwrap();
Ok(HandleResponse::new(0))
}
async fn handle_query(
&self,
_peer: &Peer,
_request: QueryRequest,
) -> Result<SendableRecordBatchStream> {
unreachable!()
}
}
#[derive(Clone)]
pub struct RetryErrorDatanodeHandler;
#[async_trait::async_trait]
impl MockDatanodeHandler for RetryErrorDatanodeHandler {
async fn handle(&self, peer: &Peer, request: RegionRequest) -> Result<HandleResponse> {
debug!("Returning retry later for request: {request:?}, peer: {peer:?}");
Err(Error::RetryLater {
source: BoxedError::new(
error::UnexpectedSnafu {
err_msg: "retry later",
}
.build(),
),
})
}
async fn handle_query(
&self,
_peer: &Peer,
_request: QueryRequest,
) -> Result<SendableRecordBatchStream> {
unreachable!()
}
}
#[derive(Clone)]
pub struct UnexpectedErrorDatanodeHandler;
#[async_trait::async_trait]
impl MockDatanodeHandler for UnexpectedErrorDatanodeHandler {
async fn handle(&self, peer: &Peer, request: RegionRequest) -> Result<HandleResponse> {
debug!("Returning mock error for request: {request:?}, peer: {peer:?}");
error::UnexpectedSnafu {
err_msg: "mock error",
}
.fail()
}
async fn handle_query(
&self,
_peer: &Peer,
_request: QueryRequest,
) -> Result<SendableRecordBatchStream> {
unreachable!()
}
}
#[derive(Clone)]
pub struct NaiveDatanodeHandler;
#[async_trait::async_trait]
impl MockDatanodeHandler for NaiveDatanodeHandler {
async fn handle(&self, peer: &Peer, request: RegionRequest) -> Result<HandleResponse> {
debug!("Returning Ok(0) for request: {request:?}, peer: {peer:?}");
Ok(HandleResponse::new(0))
}
async fn handle_query(
&self,
_peer: &Peer,
_request: QueryRequest,
) -> Result<SendableRecordBatchStream> {
unreachable!()
}
}
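
A test wires one of these handlers into a `MockDatanodeManager` and builds a DDL context from it; for example, the drop_table tests below use `DatanodeWatcher` to capture every region request the procedure sends:

    let (tx, mut rx) = mpsc::channel(8);
    let datanode_manager = Arc::new(MockDatanodeManager::new(DatanodeWatcher(tx)));
    let ddl_context = new_ddl_context(datanode_manager);
    // ... run the procedure, then assert on the (Peer, RegionRequest)
    // pairs received via `rx`.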

View File

@@ -17,3 +17,4 @@ mod alter_table;
mod create_logical_tables;
mod create_table;
mod drop_database;
+mod drop_table;

View File

@@ -23,8 +23,8 @@ use common_procedure_test::MockContextProvider;
use crate::ddl::alter_logical_tables::AlterLogicalTablesProcedure;
use crate::ddl::test_util::alter_table::TestAlterTableExprBuilder;
use crate::ddl::test_util::columns::TestColumnDefBuilder;
+use crate::ddl::test_util::datanode_handler::NaiveDatanodeHandler;
use crate::ddl::test_util::{create_logical_table, create_physical_table};
-use crate::ddl::tests::create_logical_tables::NaiveDatanodeHandler;
use crate::error::Error::{AlterLogicalTablesInvalidArguments, TableNotFound};
use crate::key::table_name::TableNameKey;
use crate::rpc::ddl::AlterTableTask;

View File

@@ -17,8 +17,7 @@ use std::collections::HashMap;
use std::sync::Arc;
use api::v1::alter_expr::Kind;
use api::v1::meta::Partition;
-use api::v1::region::{region_request, QueryRequest, RegionRequest};
+use api::v1::region::{region_request, RegionRequest};
use api::v1::{
AddColumn, AddColumns, AlterExpr, ColumnDataType, ColumnDef as PbColumnDef, DropColumn,
DropColumns, SemanticType,
@@ -26,70 +25,19 @@ use api::v1::{
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
-use common_recordbatch::SendableRecordBatchStream;
-use common_telemetry::debug;
use store_api::storage::RegionId;
-use table::metadata::TableId;
use tokio::sync::mpsc::{self};
-use crate::datanode_manager::HandleResponse;
use crate::ddl::alter_table::AlterTableProcedure;
use crate::ddl::test_util::alter_table::TestAlterTableExprBuilder;
use crate::ddl::test_util::columns::TestColumnDefBuilder;
-use crate::ddl::test_util::create_table::{
-build_raw_table_info_from_expr, TestCreateTableExprBuilder,
-};
-use crate::error::Result;
+use crate::ddl::test_util::create_table::test_create_table_task;
+use crate::ddl::test_util::datanode_handler::DatanodeWatcher;
use crate::key::table_name::TableNameKey;
use crate::key::table_route::TableRouteValue;
use crate::peer::Peer;
-use crate::rpc::ddl::{AlterTableTask, CreateTableTask};
+use crate::rpc::ddl::AlterTableTask;
use crate::rpc::router::{Region, RegionRoute};
-use crate::test_util::{new_ddl_context, MockDatanodeHandler, MockDatanodeManager};
-fn test_create_table_task(name: &str, table_id: TableId) -> CreateTableTask {
-let create_table = TestCreateTableExprBuilder::default()
-.column_defs([
-TestColumnDefBuilder::default()
-.name("ts")
-.data_type(ColumnDataType::TimestampMillisecond)
-.semantic_type(SemanticType::Timestamp)
-.build()
-.unwrap()
-.into(),
-TestColumnDefBuilder::default()
-.name("host")
-.data_type(ColumnDataType::String)
-.semantic_type(SemanticType::Tag)
-.build()
-.unwrap()
-.into(),
-TestColumnDefBuilder::default()
-.name("cpu")
-.data_type(ColumnDataType::Float64)
-.semantic_type(SemanticType::Field)
-.build()
-.unwrap()
-.into(),
-])
-.table_id(table_id)
-.time_index("ts")
-.primary_keys(["host".into()])
-.table_name(name)
-.build()
-.unwrap()
-.into();
-let table_info = build_raw_table_info_from_expr(&create_table);
-CreateTableTask {
-create_table,
-// Single region
-partitions: vec![Partition {
-column_list: vec![],
-value_list: vec![],
-}],
-table_info,
-}
-}
+use crate::test_util::{new_ddl_context, MockDatanodeManager};
fn test_rename_alter_table_task(table_name: &str, new_table_name: &str) -> AlterTableTask {
let builder = TestAlterTableExprBuilder::default()
@@ -137,26 +85,6 @@ async fn test_on_prepare_table_not_exists_err() {
assert_matches!(err.status_code(), StatusCode::TableNotFound);
}
-#[derive(Clone)]
-pub struct DatanodeWatcher(mpsc::Sender<(Peer, RegionRequest)>);
-#[async_trait::async_trait]
-impl MockDatanodeHandler for DatanodeWatcher {
-async fn handle(&self, peer: &Peer, request: RegionRequest) -> Result<HandleResponse> {
-debug!("Returning Ok(0) for request: {request:?}, peer: {peer:?}");
-self.0.send((peer.clone(), request)).await.unwrap();
-Ok(HandleResponse::new(0))
-}
-async fn handle_query(
-&self,
-_peer: &Peer,
-_request: QueryRequest,
-) -> Result<SendableRecordBatchStream> {
-unreachable!()
-}
-}
#[tokio::test]
async fn test_on_submit_alter_request() {
let (tx, mut rx) = mpsc::channel(8);

View File

@@ -15,25 +15,21 @@
use std::assert_matches::assert_matches;
use std::sync::Arc;
-use api::v1::region::{QueryRequest, RegionRequest};
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_procedure::{Context as ProcedureContext, Procedure, ProcedureId, Status};
use common_procedure_test::MockContextProvider;
-use common_recordbatch::SendableRecordBatchStream;
-use common_telemetry::debug;
use store_api::storage::RegionId;
-use crate::datanode_manager::HandleResponse;
use crate::ddl::create_logical_tables::CreateLogicalTablesProcedure;
+use crate::ddl::test_util::datanode_handler::NaiveDatanodeHandler;
use crate::ddl::test_util::{
create_physical_table_metadata, test_create_logical_table_task, test_create_physical_table_task,
};
use crate::ddl::{TableMetadata, TableMetadataAllocatorContext};
-use crate::error::{Error, Result};
+use crate::error::Error;
use crate::key::table_route::TableRouteValue;
-use crate::peer::Peer;
-use crate::test_util::{new_ddl_context, MockDatanodeHandler, MockDatanodeManager};
+use crate::test_util::{new_ddl_context, MockDatanodeManager};
#[tokio::test]
async fn test_on_prepare_physical_table_not_found() {
@@ -229,25 +225,6 @@ async fn test_on_prepare_part_logical_tables_exist() {
assert_matches!(status, Status::Executing { persist: true });
}
-#[derive(Clone)]
-pub struct NaiveDatanodeHandler;
-#[async_trait::async_trait]
-impl MockDatanodeHandler for NaiveDatanodeHandler {
-async fn handle(&self, peer: &Peer, request: RegionRequest) -> Result<HandleResponse> {
-debug!("Returning Ok(0) for request: {request:?}, peer: {peer:?}");
-Ok(HandleResponse::new(0))
-}
-async fn handle_query(
-&self,
-_peer: &Peer,
-_request: QueryRequest,
-) -> Result<SendableRecordBatchStream> {
-unreachable!()
-}
-}
#[tokio::test]
async fn test_on_create_metadata() {
let datanode_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler));

View File

@@ -17,42 +17,24 @@ use std::collections::HashMap;
use std::sync::Arc;
use api::v1::meta::Partition;
-use api::v1::region::{QueryRequest, RegionRequest};
use api::v1::{ColumnDataType, SemanticType};
-use common_error::ext::{BoxedError, ErrorExt};
+use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_procedure::{Context as ProcedureContext, Procedure, ProcedureId, Status};
use common_procedure_test::MockContextProvider;
-use common_recordbatch::SendableRecordBatchStream;
-use common_telemetry::debug;
-use crate::datanode_manager::HandleResponse;
use crate::ddl::create_table::CreateTableProcedure;
use crate::ddl::test_util::columns::TestColumnDefBuilder;
use crate::ddl::test_util::create_table::{
build_raw_table_info_from_expr, TestCreateTableExprBuilder,
};
-use crate::error;
-use crate::error::{Error, Result};
+use crate::ddl::test_util::datanode_handler::{
+NaiveDatanodeHandler, RetryErrorDatanodeHandler, UnexpectedErrorDatanodeHandler,
+};
+use crate::error::Error;
use crate::key::table_route::TableRouteValue;
-use crate::peer::Peer;
use crate::rpc::ddl::CreateTableTask;
-use crate::test_util::{new_ddl_context, MockDatanodeHandler, MockDatanodeManager};
-#[async_trait::async_trait]
-impl MockDatanodeHandler for () {
-async fn handle(&self, _peer: &Peer, _request: RegionRequest) -> Result<HandleResponse> {
-unreachable!()
-}
-async fn handle_query(
-&self,
-_peer: &Peer,
-_request: QueryRequest,
-) -> Result<SendableRecordBatchStream> {
-unreachable!()
-}
-}
+use crate::test_util::{new_ddl_context, MockDatanodeManager};
fn test_create_table_task(name: &str) -> CreateTableTask {
let create_table = TestCreateTableExprBuilder::default()
@@ -174,32 +156,6 @@ async fn test_on_prepare_with_no_partition_err() {
.contains("The number of partitions must be greater than 0"),);
}
-#[derive(Clone)]
-pub struct RetryErrorDatanodeHandler;
-#[async_trait::async_trait]
-impl MockDatanodeHandler for RetryErrorDatanodeHandler {
-async fn handle(&self, peer: &Peer, request: RegionRequest) -> Result<HandleResponse> {
-debug!("Returning retry later for request: {request:?}, peer: {peer:?}");
-Err(Error::RetryLater {
-source: BoxedError::new(
-error::UnexpectedSnafu {
-err_msg: "retry later",
-}
-.build(),
-),
-})
-}
-async fn handle_query(
-&self,
-_peer: &Peer,
-_request: QueryRequest,
-) -> Result<SendableRecordBatchStream> {
-unreachable!()
-}
-}
#[tokio::test]
async fn test_on_datanode_create_regions_should_retry() {
common_telemetry::init_default_ut_logging();
@@ -218,28 +174,6 @@ async fn test_on_datanode_create_regions_should_retry() {
assert!(error.is_retry_later());
}
-#[derive(Clone)]
-pub struct UnexpectedErrorDatanodeHandler;
-#[async_trait::async_trait]
-impl MockDatanodeHandler for UnexpectedErrorDatanodeHandler {
-async fn handle(&self, peer: &Peer, request: RegionRequest) -> Result<HandleResponse> {
-debug!("Returning mock error for request: {request:?}, peer: {peer:?}");
-error::UnexpectedSnafu {
-err_msg: "mock error",
-}
-.fail()
-}
-async fn handle_query(
-&self,
-_peer: &Peer,
-_request: QueryRequest,
-) -> Result<SendableRecordBatchStream> {
-unreachable!()
-}
-}
#[tokio::test]
async fn test_on_datanode_create_regions_should_not_retry() {
common_telemetry::init_default_ut_logging();
@@ -258,25 +192,6 @@ async fn test_on_datanode_create_regions_should_not_retry() {
assert!(!error.is_retry_later());
}
-#[derive(Clone)]
-pub struct NaiveDatanodeHandler;
-#[async_trait::async_trait]
-impl MockDatanodeHandler for NaiveDatanodeHandler {
-async fn handle(&self, peer: &Peer, request: RegionRequest) -> Result<HandleResponse> {
-debug!("Returning Ok(0) for request: {request:?}, peer: {peer:?}");
-Ok(HandleResponse::new(0))
-}
-async fn handle_query(
-&self,
-_peer: &Peer,
-_request: QueryRequest,
-) -> Result<SendableRecordBatchStream> {
-unreachable!()
-}
-}
#[tokio::test]
async fn test_on_create_metadata_error() {
common_telemetry::init_default_ut_logging();

View File

@@ -20,8 +20,8 @@ use common_procedure_test::MockContextProvider;
use futures::TryStreamExt;
use crate::ddl::drop_database::DropDatabaseProcedure;
+use crate::ddl::test_util::datanode_handler::{NaiveDatanodeHandler, RetryErrorDatanodeHandler};
use crate::ddl::test_util::{create_logical_table, create_physical_table};
-use crate::ddl::tests::create_table::{NaiveDatanodeHandler, RetryErrorDatanodeHandler};
use crate::key::schema_name::SchemaNameKey;
use crate::test_util::{new_ddl_context, MockDatanodeManager};

View File

@@ -0,0 +1,193 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use std::sync::Arc;
use api::v1::region::{region_request, RegionRequest};
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use store_api::storage::RegionId;
use tokio::sync::mpsc;
use crate::ddl::drop_table::DropTableProcedure;
use crate::ddl::test_util::create_table::test_create_table_task;
use crate::ddl::test_util::datanode_handler::DatanodeWatcher;
use crate::key::table_route::TableRouteValue;
use crate::peer::Peer;
use crate::rpc::ddl::DropTableTask;
use crate::rpc::router::{Region, RegionRoute};
use crate::test_util::{new_ddl_context, MockDatanodeManager};
#[tokio::test]
async fn test_on_prepare_table_not_exists_err() {
let datanode_manager = Arc::new(MockDatanodeManager::new(()));
let ddl_context = new_ddl_context(datanode_manager);
let cluster_id = 1;
let table_name = "foo";
let table_id = 1024;
let task = test_create_table_task(table_name, table_id);
// Puts a value to table name key.
ddl_context
.table_metadata_manager
.create_table_metadata(
task.table_info.clone(),
TableRouteValue::physical(vec![]),
HashMap::new(),
)
.await
.unwrap();
let task = DropTableTask {
catalog: DEFAULT_CATALOG_NAME.to_string(),
schema: DEFAULT_SCHEMA_NAME.to_string(),
table: "bar".to_string(),
table_id,
drop_if_exists: false,
};
let mut procedure = DropTableProcedure::new(cluster_id, task, ddl_context);
let executor = procedure.executor();
let err = procedure.on_prepare(&executor).await.unwrap_err();
assert_eq!(err.status_code(), StatusCode::TableNotFound);
}
#[tokio::test]
async fn test_on_prepare_table() {
let datanode_manager = Arc::new(MockDatanodeManager::new(()));
let ddl_context = new_ddl_context(datanode_manager);
let cluster_id = 1;
let table_name = "foo";
let table_id = 1024;
let task = test_create_table_task(table_name, table_id);
// Puts a value to table name key.
ddl_context
.table_metadata_manager
.create_table_metadata(
task.table_info.clone(),
TableRouteValue::physical(vec![]),
HashMap::new(),
)
.await
.unwrap();
let task = DropTableTask {
catalog: DEFAULT_CATALOG_NAME.to_string(),
schema: DEFAULT_SCHEMA_NAME.to_string(),
table: "bar".to_string(),
table_id,
drop_if_exists: true,
};
// Drop if exists
let mut procedure = DropTableProcedure::new(cluster_id, task, ddl_context.clone());
let executor = procedure.executor();
procedure.on_prepare(&executor).await.unwrap();
let task = DropTableTask {
catalog: DEFAULT_CATALOG_NAME.to_string(),
schema: DEFAULT_SCHEMA_NAME.to_string(),
table: table_name.to_string(),
table_id,
drop_if_exists: false,
};
// Drop table
let mut procedure = DropTableProcedure::new(cluster_id, task, ddl_context);
let executor = procedure.executor();
procedure.on_prepare(&executor).await.unwrap();
}
#[tokio::test]
async fn test_on_datanode_drop_regions() {
let (tx, mut rx) = mpsc::channel(8);
let datanode_handler = DatanodeWatcher(tx);
let datanode_manager = Arc::new(MockDatanodeManager::new(datanode_handler));
let ddl_context = new_ddl_context(datanode_manager);
let cluster_id = 1;
let table_id = 1024;
let table_name = "foo";
let task = test_create_table_task(table_name, table_id);
// Puts a value to table name key.
ddl_context
.table_metadata_manager
.create_table_metadata(
task.table_info.clone(),
TableRouteValue::physical(vec![
RegionRoute {
region: Region::new_test(RegionId::new(table_id, 1)),
leader_peer: Some(Peer::empty(1)),
follower_peers: vec![Peer::empty(5)],
leader_status: None,
leader_down_since: None,
},
RegionRoute {
region: Region::new_test(RegionId::new(table_id, 2)),
leader_peer: Some(Peer::empty(2)),
follower_peers: vec![Peer::empty(4)],
leader_status: None,
leader_down_since: None,
},
RegionRoute {
region: Region::new_test(RegionId::new(table_id, 3)),
leader_peer: Some(Peer::empty(3)),
follower_peers: vec![],
leader_status: None,
leader_down_since: None,
},
]),
HashMap::new(),
)
.await
.unwrap();
let task = DropTableTask {
catalog: DEFAULT_CATALOG_NAME.to_string(),
schema: DEFAULT_SCHEMA_NAME.to_string(),
table: table_name.to_string(),
table_id,
drop_if_exists: false,
};
// Drop table
let mut procedure = DropTableProcedure::new(cluster_id, task, ddl_context);
let executor = procedure.executor();
procedure.on_prepare(&executor).await.unwrap();
procedure.on_datanode_drop_regions(&executor).await.unwrap();
let check = |peer: Peer,
request: RegionRequest,
expected_peer_id: u64,
expected_region_id: RegionId| {
assert_eq!(peer.id, expected_peer_id);
let Some(region_request::Body::Drop(req)) = request.body else {
unreachable!();
};
assert_eq!(req.region_id, expected_region_id);
};
let mut results = Vec::new();
for _ in 0..3 {
let result = rx.try_recv().unwrap();
results.push(result);
}
results.sort_unstable_by(|(a, _), (b, _)| a.id.cmp(&b.id));
let (peer, request) = results.remove(0);
check(peer, request, 1, RegionId::new(table_id, 1));
let (peer, request) = results.remove(0);
check(peer, request, 2, RegionId::new(table_id, 2));
let (peer, request) = results.remove(0);
check(peer, request, 3, RegionId::new(table_id, 3));
}

View File

@@ -42,7 +42,6 @@ use crate::error::{
};
use crate::key::table_info::TableInfoValue;
use crate::key::table_name::TableNameKey;
-use crate::key::table_route::TableRouteValue;
use crate::key::{DeserializedValueWithBytes, TableMetadataManagerRef};
use crate::region_keeper::MemoryRegionKeeperRef;
use crate::rpc::ddl::DdlTask::{
@@ -284,18 +283,10 @@ impl DdlManager {
&self,
cluster_id: ClusterId,
drop_table_task: DropTableTask,
-table_info_value: DeserializedValueWithBytes<TableInfoValue>,
-table_route_value: DeserializedValueWithBytes<TableRouteValue>,
) -> Result<(ProcedureId, Option<Output>)> {
let context = self.create_context();
-let procedure = DropTableProcedure::new(
-cluster_id,
-drop_table_task,
-table_route_value,
-table_info_value,
-context,
-);
+let procedure = DropTableProcedure::new(cluster_id, drop_table_task, context);
let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
@@ -475,30 +466,8 @@ async fn handle_drop_table_task(
drop_table_task: DropTableTask,
) -> Result<SubmitDdlTaskResponse> {
let table_id = drop_table_task.table_id;
-let table_metadata_manager = &ddl_manager.table_metadata_manager();
-let table_ref = drop_table_task.table_ref();
-let table_info_value = table_metadata_manager
-.table_info_manager()
-.get(table_id)
-.await?;
-let (_, table_route_value) = table_metadata_manager
-.table_route_manager()
-.table_route_storage()
-.get_raw_physical_table_route(table_id)
-.await?;
-let table_info_value = table_info_value.with_context(|| TableInfoNotFoundSnafu {
-table: table_ref.to_string(),
-})?;
let (id, _) = ddl_manager
-.submit_drop_table_task(
-cluster_id,
-drop_table_task,
-table_info_value,
-table_route_value,
-)
+.submit_drop_table_task(cluster_id, drop_table_task)
.await?;
info!("Table: {table_id} is dropped via procedure_id {id:?}");

View File

@@ -16,7 +16,7 @@ use std::collections::{HashMap, HashSet};
use std::sync::{Arc, Mutex};
use api::v1::meta::Partition;
-use api::v1::region::region_request::{self, Body as PbRegionRequest};
+use api::v1::region::region_request::Body as PbRegionRequest;
use api::v1::region::{CreateRequest as PbCreateRegionRequest, RegionColumnDef};
use api::v1::{ColumnDataType, ColumnDef as PbColumnDef, SemanticType};
use client::client_manager::DatanodeClients;
@@ -24,18 +24,13 @@ use common_catalog::consts::MITO2_ENGINE;
use common_meta::datanode_manager::DatanodeManagerRef;
use common_meta::ddl::create_logical_tables::{CreateLogicalTablesProcedure, CreateTablesState};
use common_meta::ddl::create_table::*;
-use common_meta::ddl::drop_table::executor::DropTableExecutor;
-use common_meta::ddl::drop_table::DropTableProcedure;
use common_meta::ddl::test_util::columns::TestColumnDefBuilder;
use common_meta::ddl::test_util::create_table::{
build_raw_table_info_from_expr, TestCreateTableExprBuilder,
};
-use common_meta::key::table_info::TableInfoValue;
use common_meta::key::table_route::{PhysicalTableRouteValue, TableRouteValue};
-use common_meta::key::DeserializedValueWithBytes;
-use common_meta::rpc::ddl::{CreateTableTask, DropTableTask};
+use common_meta::rpc::ddl::CreateTableTask;
use common_meta::rpc::router::{find_leaders, RegionRoute};
-use common_meta::table_name::TableName;
use common_procedure::Status;
use store_api::storage::RegionId;
@@ -310,62 +305,3 @@ async fn test_on_datanode_create_logical_regions() {
let status = procedure.on_create_metadata().await.unwrap();
assert!(status.is_done());
}
-#[tokio::test]
-async fn test_on_datanode_drop_regions() {
-let drop_table_task = DropTableTask {
-catalog: "my_catalog".to_string(),
-schema: "my_schema".to_string(),
-table: "my_table".to_string(),
-table_id: 42,
-drop_if_exists: false,
-};
-let drop_table_executor = DropTableExecutor::new(
-TableName::new("my_catalog", "my_schema", "my_table"),
-42,
-false,
-);
-let (region_server, mut rx) = EchoRegionServer::new();
-let region_routes = test_data::new_region_routes();
-let datanode_manager = new_datanode_manager(&region_server, &region_routes).await;
-let procedure = DropTableProcedure::new(
-1,
-drop_table_task,
-DeserializedValueWithBytes::from_inner(TableRouteValue::physical(region_routes)),
-DeserializedValueWithBytes::from_inner(TableInfoValue::new(test_data::new_table_info())),
-test_data::new_ddl_context(datanode_manager),
-);
-let expected_dropped_regions = Arc::new(Mutex::new(HashSet::from([
-RegionId::new(42, 1),
-RegionId::new(42, 2),
-RegionId::new(42, 3),
-])));
-let handle = tokio::spawn({
-let expected_dropped_regions = expected_dropped_regions.clone();
-let mut max_recv = expected_dropped_regions.lock().unwrap().len();
-async move {
-while let Some(region_request::Body::Drop(request)) = rx.recv().await {
-let region_id = RegionId::from_u64(request.region_id);
-expected_dropped_regions.lock().unwrap().remove(&region_id);
-max_recv -= 1;
-if max_recv == 0 {
-break;
-}
-}
-}
-});
-let status = procedure
-.on_datanode_drop_regions(&drop_table_executor)
-.await
-.unwrap();
-assert!(status.is_done());
-handle.await.unwrap();
-assert!(expected_dropped_regions.lock().unwrap().is_empty());
-}