test: add tests for drop databases (#3594)

* refactor: minimize visibility of drop database steps

* feat: implement as_any

* refactor: move common functions to test_util

* test: add tests for drop databases

* fix: fix deteling physical table route unexpectedly
This commit is contained in:
Weny Xu
2024-03-27 17:18:37 +08:00
committed by GitHub
parent 623c930736
commit 5addb7d75a
11 changed files with 742 additions and 181 deletions

View File

@@ -17,6 +17,7 @@ pub mod end;
pub mod executor;
pub mod metadata;
pub mod start;
use std::any::Any;
use std::fmt::Debug;
use common_procedure::error::{Error as ProcedureError, FromJsonSnafu, ToJsonSnafu};
@@ -43,14 +44,14 @@ pub struct DropDatabaseProcedure {
}
/// Target of dropping tables.
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub enum DropTableTarget {
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
pub(crate) enum DropTableTarget {
Logical,
Physical,
}
/// Context of [DropDatabaseProcedure] execution.
pub struct DropDatabaseContext {
pub(crate) struct DropDatabaseContext {
catalog: String,
schema: String,
drop_if_exists: bool,
@@ -66,6 +67,9 @@ pub(crate) trait State: Send + Debug {
ddl_ctx: &DdlContext,
ctx: &mut DropDatabaseContext,
) -> Result<(Box<dyn State>, Status)>;
/// Returns as [Any](std::any::Any).
fn as_any(&self) -> &dyn Any;
}
impl DropDatabaseProcedure {

View File

@@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::any::Any;
use common_procedure::Status;
use futures::TryStreamExt;
use serde::{Deserialize, Serialize};
@@ -27,8 +29,8 @@ use crate::key::table_route::TableRouteValue;
use crate::table_name::TableName;
#[derive(Debug, Serialize, Deserialize)]
pub struct DropDatabaseCursor {
target: DropTableTarget,
pub(crate) struct DropDatabaseCursor {
pub(crate) target: DropTableTarget,
}
impl DropDatabaseCursor {
@@ -41,16 +43,13 @@ impl DropDatabaseCursor {
&mut self,
ctx: &mut DropDatabaseContext,
) -> Result<(Box<dyn State>, Status)> {
// Consumes the tables stream.
ctx.tables.take();
match self.target {
DropTableTarget::Logical => {
// Consumes the tables stream.
ctx.tables.take();
Ok((
Box::new(DropDatabaseCursor::new(DropTableTarget::Physical)),
Status::executing(true),
))
}
DropTableTarget::Logical => Ok((
Box::new(DropDatabaseCursor::new(DropTableTarget::Physical)),
Status::executing(true),
)),
DropTableTarget::Physical => Ok((
Box::new(DropDatabaseRemoveMetadata),
Status::executing(true),
@@ -68,12 +67,12 @@ impl DropDatabaseCursor {
) -> Result<(Box<dyn State>, Status)> {
match (self.target, table_route_value) {
(DropTableTarget::Logical, TableRouteValue::Logical(route)) => {
let table_id = route.physical_table_id();
let physical_table_id = route.physical_table_id();
let (_, table_route) = ddl_ctx
.table_metadata_manager
.table_route_manager()
.get_physical_table_route(table_id)
.get_physical_table_route(physical_table_id)
.await?;
Ok((
Box::new(DropDatabaseExecutor::new(
@@ -141,4 +140,108 @@ impl State for DropDatabaseCursor {
None => self.handle_reach_end(ctx),
}
}
fn as_any(&self) -> &dyn Any {
self
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use crate::ddl::drop_database::cursor::DropDatabaseCursor;
use crate::ddl::drop_database::executor::DropDatabaseExecutor;
use crate::ddl::drop_database::metadata::DropDatabaseRemoveMetadata;
use crate::ddl::drop_database::{DropDatabaseContext, DropTableTarget, State};
use crate::ddl::test_util::{create_logical_table, create_physical_table};
use crate::test_util::{new_ddl_context, MockDatanodeManager};
#[tokio::test]
async fn test_next_without_logical_tables() {
let datanode_manager = Arc::new(MockDatanodeManager::new(()));
let ddl_context = new_ddl_context(datanode_manager);
create_physical_table(ddl_context.clone(), 0, "phy").await;
// It always starts from Logical
let mut state = DropDatabaseCursor::new(DropTableTarget::Logical);
let mut ctx = DropDatabaseContext {
catalog: DEFAULT_CATALOG_NAME.to_string(),
schema: DEFAULT_SCHEMA_NAME.to_string(),
drop_if_exists: false,
tables: None,
};
// Ticks
let (mut state, status) = state.next(&ddl_context, &mut ctx).await.unwrap();
assert!(!status.need_persist());
let cursor = state.as_any().downcast_ref::<DropDatabaseCursor>().unwrap();
assert_eq!(cursor.target, DropTableTarget::Logical);
// Ticks
let (mut state, status) = state.next(&ddl_context, &mut ctx).await.unwrap();
assert!(status.need_persist());
assert!(ctx.tables.is_none());
let cursor = state.as_any().downcast_ref::<DropDatabaseCursor>().unwrap();
assert_eq!(cursor.target, DropTableTarget::Physical);
// Ticks
let (state, status) = state.next(&ddl_context, &mut ctx).await.unwrap();
assert!(status.need_persist());
let executor = state
.as_any()
.downcast_ref::<DropDatabaseExecutor>()
.unwrap();
assert_eq!(executor.target, DropTableTarget::Physical);
}
#[tokio::test]
async fn test_next_with_logical_tables() {
let datanode_manager = Arc::new(MockDatanodeManager::new(()));
let ddl_context = new_ddl_context(datanode_manager);
let physical_table_id = create_physical_table(ddl_context.clone(), 0, "phy").await;
create_logical_table(ddl_context.clone(), 0, physical_table_id, "metric_0").await;
// It always starts from Logical
let mut state = DropDatabaseCursor::new(DropTableTarget::Logical);
let mut ctx = DropDatabaseContext {
catalog: DEFAULT_CATALOG_NAME.to_string(),
schema: DEFAULT_SCHEMA_NAME.to_string(),
drop_if_exists: false,
tables: None,
};
// Ticks
let (state, status) = state.next(&ddl_context, &mut ctx).await.unwrap();
assert!(status.need_persist());
let executor = state
.as_any()
.downcast_ref::<DropDatabaseExecutor>()
.unwrap();
let (_, table_route) = ddl_context
.table_metadata_manager
.table_route_manager()
.get_physical_table_route(physical_table_id)
.await
.unwrap();
assert_eq!(table_route.region_routes, executor.region_routes);
assert_eq!(executor.target, DropTableTarget::Logical);
}
#[tokio::test]
async fn test_reach_the_end() {
let datanode_manager = Arc::new(MockDatanodeManager::new(()));
let ddl_context = new_ddl_context(datanode_manager);
let mut state = DropDatabaseCursor::new(DropTableTarget::Physical);
let mut ctx = DropDatabaseContext {
catalog: DEFAULT_CATALOG_NAME.to_string(),
schema: DEFAULT_SCHEMA_NAME.to_string(),
drop_if_exists: false,
tables: None,
};
// Ticks
let (state, status) = state.next(&ddl_context, &mut ctx).await.unwrap();
assert!(status.need_persist());
state
.as_any()
.downcast_ref::<DropDatabaseRemoveMetadata>()
.unwrap();
assert!(ctx.tables.is_none());
}
}

View File

@@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::any::Any;
use common_procedure::Status;
use serde::{Deserialize, Serialize};
@@ -20,7 +22,7 @@ use crate::ddl::DdlContext;
use crate::error::Result;
#[derive(Debug, Serialize, Deserialize)]
pub struct DropDatabaseEnd;
pub(crate) struct DropDatabaseEnd;
#[async_trait::async_trait]
#[typetag::serde]
@@ -32,4 +34,8 @@ impl State for DropDatabaseEnd {
) -> Result<(Box<dyn State>, Status)> {
Ok((Box::new(DropDatabaseEnd), Status::done()))
}
fn as_any(&self) -> &dyn Any {
self
}
}

View File

@@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::any::Any;
use common_procedure::Status;
use common_telemetry::info;
use serde::{Deserialize, Serialize};
@@ -29,11 +31,11 @@ use crate::rpc::router::{operating_leader_regions, RegionRoute};
use crate::table_name::TableName;
#[derive(Debug, Serialize, Deserialize)]
pub struct DropDatabaseExecutor {
pub(crate) struct DropDatabaseExecutor {
table_id: TableId,
table_name: TableName,
region_routes: Vec<RegionRoute>,
target: DropTableTarget,
pub(crate) region_routes: Vec<RegionRoute>,
pub(crate) target: DropTableTarget,
#[serde(skip)]
dropping_regions: Vec<OperatingRegionGuard>,
}
@@ -99,4 +101,196 @@ impl State for DropDatabaseExecutor {
Status::executing(false),
))
}
fn as_any(&self) -> &dyn Any {
self
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use api::v1::region::{QueryRequest, RegionRequest};
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_error::ext::BoxedError;
use common_recordbatch::SendableRecordBatchStream;
use crate::datanode_manager::HandleResponse;
use crate::ddl::drop_database::cursor::DropDatabaseCursor;
use crate::ddl::drop_database::executor::DropDatabaseExecutor;
use crate::ddl::drop_database::{DropDatabaseContext, DropTableTarget, State};
use crate::ddl::test_util::{create_logical_table, create_physical_table};
use crate::error::{self, Error, Result};
use crate::peer::Peer;
use crate::table_name::TableName;
use crate::test_util::{new_ddl_context, MockDatanodeHandler, MockDatanodeManager};
#[derive(Clone)]
pub struct NaiveDatanodeHandler;
#[async_trait::async_trait]
impl MockDatanodeHandler for NaiveDatanodeHandler {
async fn handle(&self, _peer: &Peer, _request: RegionRequest) -> Result<HandleResponse> {
Ok(HandleResponse::new(0))
}
async fn handle_query(
&self,
_peer: &Peer,
_request: QueryRequest,
) -> Result<SendableRecordBatchStream> {
unreachable!()
}
}
#[tokio::test]
async fn test_next_with_physical_table() {
let datanode_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler));
let ddl_context = new_ddl_context(datanode_manager);
let physical_table_id = create_physical_table(ddl_context.clone(), 0, "phy").await;
let (_, table_route) = ddl_context
.table_metadata_manager
.table_route_manager()
.get_physical_table_route(physical_table_id)
.await
.unwrap();
{
let mut state = DropDatabaseExecutor::new(
physical_table_id,
TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "phy"),
table_route.region_routes.clone(),
DropTableTarget::Physical,
);
let mut ctx = DropDatabaseContext {
catalog: DEFAULT_CATALOG_NAME.to_string(),
schema: DEFAULT_SCHEMA_NAME.to_string(),
drop_if_exists: false,
tables: None,
};
let (state, status) = state.next(&ddl_context, &mut ctx).await.unwrap();
assert!(!status.need_persist());
let cursor = state.as_any().downcast_ref::<DropDatabaseCursor>().unwrap();
assert_eq!(cursor.target, DropTableTarget::Physical);
}
// Execute again
let mut ctx = DropDatabaseContext {
catalog: DEFAULT_CATALOG_NAME.to_string(),
schema: DEFAULT_SCHEMA_NAME.to_string(),
drop_if_exists: false,
tables: None,
};
let mut state = DropDatabaseExecutor::new(
physical_table_id,
TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "phy"),
table_route.region_routes,
DropTableTarget::Physical,
);
let (state, status) = state.next(&ddl_context, &mut ctx).await.unwrap();
assert!(!status.need_persist());
let cursor = state.as_any().downcast_ref::<DropDatabaseCursor>().unwrap();
assert_eq!(cursor.target, DropTableTarget::Physical);
}
#[tokio::test]
async fn test_next_logical_table() {
let datanode_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler));
let ddl_context = new_ddl_context(datanode_manager);
let physical_table_id = create_physical_table(ddl_context.clone(), 0, "phy").await;
create_logical_table(ddl_context.clone(), 0, physical_table_id, "metric").await;
let logical_table_id = physical_table_id + 1;
let (_, table_route) = ddl_context
.table_metadata_manager
.table_route_manager()
.get_physical_table_route(logical_table_id)
.await
.unwrap();
{
let mut state = DropDatabaseExecutor::new(
physical_table_id,
TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "metric"),
table_route.region_routes.clone(),
DropTableTarget::Logical,
);
let mut ctx = DropDatabaseContext {
catalog: DEFAULT_CATALOG_NAME.to_string(),
schema: DEFAULT_SCHEMA_NAME.to_string(),
drop_if_exists: false,
tables: None,
};
let (state, status) = state.next(&ddl_context, &mut ctx).await.unwrap();
assert!(!status.need_persist());
let cursor = state.as_any().downcast_ref::<DropDatabaseCursor>().unwrap();
assert_eq!(cursor.target, DropTableTarget::Logical);
}
// Execute again
let mut ctx = DropDatabaseContext {
catalog: DEFAULT_CATALOG_NAME.to_string(),
schema: DEFAULT_SCHEMA_NAME.to_string(),
drop_if_exists: false,
tables: None,
};
let mut state = DropDatabaseExecutor::new(
physical_table_id,
TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "phy"),
table_route.region_routes,
DropTableTarget::Logical,
);
let (state, status) = state.next(&ddl_context, &mut ctx).await.unwrap();
assert!(!status.need_persist());
let cursor = state.as_any().downcast_ref::<DropDatabaseCursor>().unwrap();
assert_eq!(cursor.target, DropTableTarget::Logical);
}
#[derive(Clone)]
pub struct RetryErrorDatanodeHandler;
#[async_trait::async_trait]
impl MockDatanodeHandler for RetryErrorDatanodeHandler {
async fn handle(&self, _peer: &Peer, _request: RegionRequest) -> Result<HandleResponse> {
Err(Error::RetryLater {
source: BoxedError::new(
error::UnexpectedSnafu {
err_msg: "retry later",
}
.build(),
),
})
}
async fn handle_query(
&self,
_peer: &Peer,
_request: QueryRequest,
) -> Result<SendableRecordBatchStream> {
unreachable!()
}
}
#[tokio::test]
async fn test_next_retryable_err() {
let datanode_manager = Arc::new(MockDatanodeManager::new(RetryErrorDatanodeHandler));
let ddl_context = new_ddl_context(datanode_manager);
let physical_table_id = create_physical_table(ddl_context.clone(), 0, "phy").await;
let (_, table_route) = ddl_context
.table_metadata_manager
.table_route_manager()
.get_physical_table_route(physical_table_id)
.await
.unwrap();
let mut state = DropDatabaseExecutor::new(
physical_table_id,
TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "phy"),
table_route.region_routes,
DropTableTarget::Physical,
);
let mut ctx = DropDatabaseContext {
catalog: DEFAULT_CATALOG_NAME.to_string(),
schema: DEFAULT_SCHEMA_NAME.to_string(),
drop_if_exists: false,
tables: None,
};
let err = state.next(&ddl_context, &mut ctx).await.unwrap_err();
assert!(err.is_retry_later());
}
}

View File

@@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::any::Any;
use common_procedure::Status;
use serde::{Deserialize, Serialize};
@@ -22,7 +24,7 @@ use crate::error::Result;
use crate::key::schema_name::SchemaNameKey;
#[derive(Debug, Serialize, Deserialize)]
pub struct DropDatabaseRemoveMetadata;
pub(crate) struct DropDatabaseRemoveMetadata;
#[async_trait::async_trait]
#[typetag::serde]
@@ -40,4 +42,58 @@ impl State for DropDatabaseRemoveMetadata {
return Ok((Box::new(DropDatabaseEnd), Status::done()));
}
fn as_any(&self) -> &dyn Any {
self
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use crate::ddl::drop_database::end::DropDatabaseEnd;
use crate::ddl::drop_database::metadata::DropDatabaseRemoveMetadata;
use crate::ddl::drop_database::{DropDatabaseContext, State};
use crate::key::schema_name::SchemaNameKey;
use crate::test_util::{new_ddl_context, MockDatanodeManager};
#[tokio::test]
async fn test_next() {
let datanode_manager = Arc::new(MockDatanodeManager::new(()));
let ddl_context = new_ddl_context(datanode_manager);
ddl_context
.table_metadata_manager
.schema_manager()
.create(SchemaNameKey::new("foo", "bar"), None, true)
.await
.unwrap();
let mut state = DropDatabaseRemoveMetadata;
let mut ctx = DropDatabaseContext {
catalog: "foo".to_string(),
schema: "bar".to_string(),
drop_if_exists: true,
tables: None,
};
let (state, status) = state.next(&ddl_context, &mut ctx).await.unwrap();
state.as_any().downcast_ref::<DropDatabaseEnd>().unwrap();
assert!(status.is_done());
assert!(!ddl_context
.table_metadata_manager
.schema_manager()
.exists(SchemaNameKey::new("foo", "bar"))
.await
.unwrap());
// Schema not exists
let mut state = DropDatabaseRemoveMetadata;
let mut ctx = DropDatabaseContext {
catalog: "foo".to_string(),
schema: "bar".to_string(),
drop_if_exists: true,
tables: None,
};
let (state, status) = state.next(&ddl_context, &mut ctx).await.unwrap();
state.as_any().downcast_ref::<DropDatabaseEnd>().unwrap();
assert!(status.is_done());
}
}

View File

@@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::any::Any;
use common_procedure::Status;
use serde::{Deserialize, Serialize};
use snafu::ensure;
@@ -24,7 +26,7 @@ use crate::error::{self, Result};
use crate::key::schema_name::SchemaNameKey;
#[derive(Debug, Serialize, Deserialize)]
pub struct DropDatabaseStart;
pub(crate) struct DropDatabaseStart;
#[async_trait::async_trait]
#[typetag::serde]
@@ -62,4 +64,75 @@ impl State for DropDatabaseStart {
Status::executing(true),
))
}
fn as_any(&self) -> &dyn Any {
self
}
}
#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;
use std::sync::Arc;
use crate::ddl::drop_database::cursor::DropDatabaseCursor;
use crate::ddl::drop_database::end::DropDatabaseEnd;
use crate::ddl::drop_database::start::DropDatabaseStart;
use crate::ddl::drop_database::{DropDatabaseContext, State};
use crate::error;
use crate::key::schema_name::SchemaNameKey;
use crate::test_util::{new_ddl_context, MockDatanodeManager};
#[tokio::test]
async fn test_schema_not_exists_err() {
let datanode_manager = Arc::new(MockDatanodeManager::new(()));
let ddl_context = new_ddl_context(datanode_manager);
let mut step = DropDatabaseStart;
let mut ctx = DropDatabaseContext {
catalog: "foo".to_string(),
schema: "bar".to_string(),
drop_if_exists: false,
tables: None,
};
let err = step.next(&ddl_context, &mut ctx).await.unwrap_err();
assert_matches!(err, error::Error::SchemaNotFound { .. });
}
#[tokio::test]
async fn test_schema_not_exists() {
let datanode_manager = Arc::new(MockDatanodeManager::new(()));
let ddl_context = new_ddl_context(datanode_manager);
let mut state = DropDatabaseStart;
let mut ctx = DropDatabaseContext {
catalog: "foo".to_string(),
schema: "bar".to_string(),
drop_if_exists: true,
tables: None,
};
let (state, status) = state.next(&ddl_context, &mut ctx).await.unwrap();
state.as_any().downcast_ref::<DropDatabaseEnd>().unwrap();
assert!(status.is_done());
}
#[tokio::test]
async fn test_next() {
let datanode_manager = Arc::new(MockDatanodeManager::new(()));
let ddl_context = new_ddl_context(datanode_manager);
ddl_context
.table_metadata_manager
.schema_manager()
.create(SchemaNameKey::new("foo", "bar"), None, true)
.await
.unwrap();
let mut state = DropDatabaseStart;
let mut ctx = DropDatabaseContext {
catalog: "foo".to_string(),
schema: "bar".to_string(),
drop_if_exists: false,
tables: None,
};
let (state, status) = state.next(&ddl_context, &mut ctx).await.unwrap();
state.as_any().downcast_ref::<DropDatabaseCursor>().unwrap();
assert!(status.need_persist());
}
}

View File

@@ -15,3 +15,158 @@
pub mod alter_table;
pub mod columns;
pub mod create_table;
use std::collections::HashMap;
use api::v1::meta::Partition;
use api::v1::{ColumnDataType, SemanticType};
use common_procedure::Status;
use table::metadata::{RawTableInfo, TableId};
use crate::ddl::create_logical_tables::CreateLogicalTablesProcedure;
use crate::ddl::test_util::columns::TestColumnDefBuilder;
use crate::ddl::test_util::create_table::{
build_raw_table_info_from_expr, TestCreateTableExprBuilder,
};
use crate::ddl::{DdlContext, TableMetadata, TableMetadataAllocatorContext};
use crate::key::table_route::TableRouteValue;
use crate::rpc::ddl::CreateTableTask;
use crate::ClusterId;
pub async fn create_physical_table_metadata(
ddl_context: &DdlContext,
table_info: RawTableInfo,
table_route: TableRouteValue,
) {
ddl_context
.table_metadata_manager
.create_table_metadata(table_info, table_route, HashMap::default())
.await
.unwrap();
}
pub async fn create_physical_table(
ddl_context: DdlContext,
cluster_id: ClusterId,
name: &str,
) -> TableId {
// Prepares physical table metadata.
let mut create_physical_table_task = test_create_physical_table_task(name);
let TableMetadata {
table_id,
table_route,
..
} = ddl_context
.table_metadata_allocator
.create(
&TableMetadataAllocatorContext { cluster_id },
&create_physical_table_task,
)
.await
.unwrap();
create_physical_table_task.set_table_id(table_id);
create_physical_table_metadata(
&ddl_context,
create_physical_table_task.table_info.clone(),
table_route,
)
.await;
table_id
}
pub async fn create_logical_table(
ddl_context: DdlContext,
cluster_id: ClusterId,
physical_table_id: TableId,
table_name: &str,
) {
use std::assert_matches::assert_matches;
let tasks = vec![test_create_logical_table_task(table_name)];
let mut procedure =
CreateLogicalTablesProcedure::new(cluster_id, tasks, physical_table_id, ddl_context);
let status = procedure.on_prepare().await.unwrap();
assert_matches!(status, Status::Executing { persist: true });
let status = procedure.on_create_metadata().await.unwrap();
assert_matches!(status, Status::Done { .. });
}
pub fn test_create_logical_table_task(name: &str) -> CreateTableTask {
let create_table = TestCreateTableExprBuilder::default()
.column_defs([
TestColumnDefBuilder::default()
.name("ts")
.data_type(ColumnDataType::TimestampMillisecond)
.semantic_type(SemanticType::Timestamp)
.build()
.unwrap()
.into(),
TestColumnDefBuilder::default()
.name("host")
.data_type(ColumnDataType::String)
.semantic_type(SemanticType::Tag)
.build()
.unwrap()
.into(),
TestColumnDefBuilder::default()
.name("cpu")
.data_type(ColumnDataType::Float64)
.semantic_type(SemanticType::Field)
.build()
.unwrap()
.into(),
])
.time_index("ts")
.primary_keys(["host".into()])
.table_name(name)
.build()
.unwrap()
.into();
let table_info = build_raw_table_info_from_expr(&create_table);
CreateTableTask {
create_table,
// Single region
partitions: vec![Partition {
column_list: vec![],
value_list: vec![],
}],
table_info,
}
}
pub fn test_create_physical_table_task(name: &str) -> CreateTableTask {
let create_table = TestCreateTableExprBuilder::default()
.column_defs([
TestColumnDefBuilder::default()
.name("ts")
.data_type(ColumnDataType::TimestampMillisecond)
.semantic_type(SemanticType::Timestamp)
.build()
.unwrap()
.into(),
TestColumnDefBuilder::default()
.name("value")
.data_type(ColumnDataType::Float64)
.semantic_type(SemanticType::Field)
.build()
.unwrap()
.into(),
])
.time_index("ts")
.primary_keys(["value".into()])
.table_name(name)
.build()
.unwrap()
.into();
let table_info = build_raw_table_info_from_expr(&create_table);
CreateTableTask {
create_table,
// Single region
partitions: vec![Partition {
column_list: vec![],
value_list: vec![],
}],
table_info,
}
}

View File

@@ -15,3 +15,4 @@
mod alter_logical_tables;
mod create_logical_tables;
mod create_table;
mod drop_database;

View File

@@ -19,22 +19,16 @@ use api::v1::{ColumnDataType, SemanticType};
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_procedure::{Procedure, ProcedureId, Status};
use common_procedure_test::MockContextProvider;
use table::metadata::TableId;
use crate::ddl::alter_logical_tables::AlterLogicalTablesProcedure;
use crate::ddl::create_logical_tables::CreateLogicalTablesProcedure;
use crate::ddl::test_util::alter_table::TestAlterTableExprBuilder;
use crate::ddl::test_util::columns::TestColumnDefBuilder;
use crate::ddl::tests::create_logical_tables;
use crate::ddl::tests::create_logical_tables::{
test_create_physical_table_task, NaiveDatanodeHandler,
};
use crate::ddl::{DdlContext, TableMetadata, TableMetadataAllocatorContext};
use crate::ddl::test_util::{create_logical_table, create_physical_table};
use crate::ddl::tests::create_logical_tables::NaiveDatanodeHandler;
use crate::error::Error::{AlterLogicalTablesInvalidArguments, TableNotFound};
use crate::key::table_name::TableNameKey;
use crate::rpc::ddl::AlterTableTask;
use crate::test_util::{new_ddl_context, MockDatanodeManager};
use crate::ClusterId;
fn make_alter_logical_table_add_column_task(
schema: Option<&str>,
@@ -128,53 +122,6 @@ async fn test_on_prepare_check_alter_kind() {
assert_matches!(err, AlterLogicalTablesInvalidArguments { .. });
}
async fn create_physical_table(
ddl_context: DdlContext,
cluster_id: ClusterId,
name: &str,
) -> TableId {
// Prepares physical table metadata.
let mut create_physical_table_task = test_create_physical_table_task(name);
let TableMetadata {
table_id,
table_route,
..
} = ddl_context
.table_metadata_allocator
.create(
&TableMetadataAllocatorContext { cluster_id },
&create_physical_table_task,
)
.await
.unwrap();
create_physical_table_task.set_table_id(table_id);
create_logical_tables::create_physical_table_metadata(
&ddl_context,
create_physical_table_task.table_info.clone(),
table_route,
)
.await;
table_id
}
async fn create_logical_table(
ddl_context: DdlContext,
cluster_id: ClusterId,
physical_table_id: TableId,
table_name: &str,
) {
let tasks = vec![create_logical_tables::test_create_logical_table_task(
table_name,
)];
let mut procedure =
CreateLogicalTablesProcedure::new(cluster_id, tasks, physical_table_id, ddl_context);
let status = procedure.on_prepare().await.unwrap();
assert_matches!(status, Status::Executing { persist: true });
let status = procedure.on_create_metadata().await.unwrap();
assert_matches!(status, Status::Done { .. });
}
#[tokio::test]
async fn test_on_prepare_different_physical_table() {
let cluster_id = 1;

View File

@@ -13,12 +13,9 @@
// limitations under the License.
use std::assert_matches::assert_matches;
use std::collections::HashMap;
use std::sync::Arc;
use api::v1::meta::Partition;
use api::v1::region::{QueryRequest, RegionRequest};
use api::v1::{ColumnDataType, SemanticType};
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_procedure::{Context as ProcedureContext, Procedure, ProcedureId, Status};
@@ -26,104 +23,18 @@ use common_procedure_test::MockContextProvider;
use common_recordbatch::SendableRecordBatchStream;
use common_telemetry::debug;
use store_api::storage::RegionId;
use table::metadata::RawTableInfo;
use crate::datanode_manager::HandleResponse;
use crate::ddl::create_logical_tables::CreateLogicalTablesProcedure;
use crate::ddl::test_util::columns::TestColumnDefBuilder;
use crate::ddl::test_util::create_table::{
build_raw_table_info_from_expr, TestCreateTableExprBuilder,
use crate::ddl::test_util::{
create_physical_table_metadata, test_create_logical_table_task, test_create_physical_table_task,
};
use crate::ddl::{DdlContext, TableMetadata, TableMetadataAllocatorContext};
use crate::ddl::{TableMetadata, TableMetadataAllocatorContext};
use crate::error::{Error, Result};
use crate::key::table_route::TableRouteValue;
use crate::peer::Peer;
use crate::rpc::ddl::CreateTableTask;
use crate::test_util::{new_ddl_context, MockDatanodeHandler, MockDatanodeManager};
// Note: this code may be duplicated with others.
// However, it's by design, ensures the tests are easy to be modified or added.
pub(crate) fn test_create_logical_table_task(name: &str) -> CreateTableTask {
let create_table = TestCreateTableExprBuilder::default()
.column_defs([
TestColumnDefBuilder::default()
.name("ts")
.data_type(ColumnDataType::TimestampMillisecond)
.semantic_type(SemanticType::Timestamp)
.build()
.unwrap()
.into(),
TestColumnDefBuilder::default()
.name("host")
.data_type(ColumnDataType::String)
.semantic_type(SemanticType::Tag)
.build()
.unwrap()
.into(),
TestColumnDefBuilder::default()
.name("cpu")
.data_type(ColumnDataType::Float64)
.semantic_type(SemanticType::Field)
.build()
.unwrap()
.into(),
])
.time_index("ts")
.primary_keys(["host".into()])
.table_name(name)
.build()
.unwrap()
.into();
let table_info = build_raw_table_info_from_expr(&create_table);
CreateTableTask {
create_table,
// Single region
partitions: vec![Partition {
column_list: vec![],
value_list: vec![],
}],
table_info,
}
}
// Note: this code may be duplicated with others.
// However, it's by design, ensures the tests are easy to be modified or added.
pub(crate) fn test_create_physical_table_task(name: &str) -> CreateTableTask {
let create_table = TestCreateTableExprBuilder::default()
.column_defs([
TestColumnDefBuilder::default()
.name("ts")
.data_type(ColumnDataType::TimestampMillisecond)
.semantic_type(SemanticType::Timestamp)
.build()
.unwrap()
.into(),
TestColumnDefBuilder::default()
.name("value")
.data_type(ColumnDataType::Float64)
.semantic_type(SemanticType::Field)
.build()
.unwrap()
.into(),
])
.time_index("ts")
.primary_keys(["value".into()])
.table_name(name)
.build()
.unwrap()
.into();
let table_info = build_raw_table_info_from_expr(&create_table);
CreateTableTask {
create_table,
// Single region
partitions: vec![Partition {
column_list: vec![],
value_list: vec![],
}],
table_info,
}
}
#[tokio::test]
async fn test_on_prepare_physical_table_not_found() {
let datanode_manager = Arc::new(MockDatanodeManager::new(()));
@@ -137,18 +48,6 @@ async fn test_on_prepare_physical_table_not_found() {
assert_matches!(err, Error::TableRouteNotFound { .. });
}
pub(crate) async fn create_physical_table_metadata(
ddl_context: &DdlContext,
table_info: RawTableInfo,
table_route: TableRouteValue,
) {
ddl_context
.table_metadata_manager
.create_table_metadata(table_info, table_route, HashMap::default())
.await
.unwrap();
}
#[tokio::test]
async fn test_on_prepare() {
let datanode_manager = Arc::new(MockDatanodeManager::new(()));

View File

@@ -0,0 +1,123 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_procedure::{Context as ProcedureContext, Procedure, ProcedureId};
use common_procedure_test::MockContextProvider;
use futures::TryStreamExt;
use crate::ddl::drop_database::DropDatabaseProcedure;
use crate::ddl::test_util::{create_logical_table, create_physical_table};
use crate::ddl::tests::create_table::{NaiveDatanodeHandler, RetryErrorDatanodeHandler};
use crate::key::schema_name::SchemaNameKey;
use crate::test_util::{new_ddl_context, MockDatanodeManager};
#[tokio::test]
async fn test_drop_database_with_logical_tables() {
common_telemetry::init_default_ut_logging();
let cluster_id = 1;
let datanode_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler));
let ddl_context = new_ddl_context(datanode_manager);
ddl_context
.table_metadata_manager
.schema_manager()
.create(
SchemaNameKey::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME),
None,
false,
)
.await
.unwrap();
// Creates physical table
let phy_id = create_physical_table(ddl_context.clone(), cluster_id, "phy").await;
// Creates 3 logical tables
create_logical_table(ddl_context.clone(), cluster_id, phy_id, "table1").await;
create_logical_table(ddl_context.clone(), cluster_id, phy_id, "table2").await;
create_logical_table(ddl_context.clone(), cluster_id, phy_id, "table3").await;
let mut procedure = DropDatabaseProcedure::new(
DEFAULT_CATALOG_NAME.to_string(),
DEFAULT_SCHEMA_NAME.to_string(),
false,
ddl_context.clone(),
);
let ctx = ProcedureContext {
procedure_id: ProcedureId::random(),
provider: Arc::new(MockContextProvider::default()),
};
while !procedure.execute(&ctx).await.unwrap().is_done() {
procedure.execute(&ctx).await.unwrap();
}
let tables = ddl_context
.table_metadata_manager
.table_name_manager()
.tables(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME)
.try_collect::<Vec<_>>()
.await
.unwrap();
assert!(tables.is_empty());
}
#[tokio::test]
async fn test_drop_database_retryable_error() {
common_telemetry::init_default_ut_logging();
let cluster_id = 1;
let datanode_manager = Arc::new(MockDatanodeManager::new(RetryErrorDatanodeHandler));
let ddl_context = new_ddl_context(datanode_manager);
ddl_context
.table_metadata_manager
.schema_manager()
.create(
SchemaNameKey::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME),
None,
false,
)
.await
.unwrap();
// Creates physical table
let phy_id = create_physical_table(ddl_context.clone(), cluster_id, "phy").await;
// Creates 3 logical tables
create_logical_table(ddl_context.clone(), cluster_id, phy_id, "table1").await;
create_logical_table(ddl_context.clone(), cluster_id, phy_id, "table2").await;
create_logical_table(ddl_context.clone(), cluster_id, phy_id, "table3").await;
let mut procedure = DropDatabaseProcedure::new(
DEFAULT_CATALOG_NAME.to_string(),
DEFAULT_SCHEMA_NAME.to_string(),
false,
ddl_context.clone(),
);
let ctx = ProcedureContext {
procedure_id: ProcedureId::random(),
provider: Arc::new(MockContextProvider::default()),
};
loop {
match procedure.execute(&ctx).await {
Ok(_) => {
// go next
}
Err(err) => {
assert!(err.is_retry_later());
break;
}
}
}
}