diff --git a/src/common/meta/src/ddl.rs b/src/common/meta/src/ddl.rs index 7af538f785..aeab51190c 100644 --- a/src/common/meta/src/ddl.rs +++ b/src/common/meta/src/ddl.rs @@ -36,8 +36,7 @@ pub mod create_database; pub mod create_flow; pub mod create_logical_tables; pub mod create_table; -mod create_table_template; -pub(crate) use create_table_template::{CreateRequestBuilder, build_template_from_raw_table_info}; +pub(crate) use create_table::{CreateRequestBuilder, build_template_from_raw_table_info}; pub mod create_view; pub mod drop_database; pub mod drop_flow; diff --git a/src/common/meta/src/ddl/create_logical_tables/region_request.rs b/src/common/meta/src/ddl/create_logical_tables/region_request.rs index c9ee183a17..1c53bc2e4e 100644 --- a/src/common/meta/src/ddl/create_logical_tables/region_request.rs +++ b/src/common/meta/src/ddl/create_logical_tables/region_request.rs @@ -22,7 +22,7 @@ use store_api::storage::{RegionId, TableId}; use table::metadata::RawTableInfo; use crate::ddl::create_logical_tables::CreateLogicalTablesProcedure; -use crate::ddl::create_table_template::{ +use crate::ddl::create_table::template::{ CreateRequestBuilder, build_template, build_template_from_raw_table_info, }; use crate::ddl::utils::region_storage_path; diff --git a/src/common/meta/src/ddl/create_table.rs b/src/common/meta/src/ddl/create_table.rs index 6bb01f584a..48c677aa5b 100644 --- a/src/common/meta/src/ddl/create_table.rs +++ b/src/common/meta/src/ddl/create_table.rs @@ -12,74 +12,99 @@ // See the License for the specific language governing permissions and // limitations under the License. +pub(crate) mod executor; +pub(crate) mod template; + use std::collections::HashMap; -use api::v1::region::region_request::Body as PbRegionRequest; -use api::v1::region::{RegionRequest, RegionRequestHeader}; +use api::v1::CreateTableExpr; use async_trait::async_trait; use common_error::ext::BoxedError; use common_procedure::error::{ ExternalSnafu, FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu, }; use common_procedure::{Context as ProcedureContext, LockKey, Procedure, ProcedureId, Status}; -use common_telemetry::tracing_context::TracingContext; -use common_telemetry::{info, warn}; -use futures::future::join_all; +use common_telemetry::info; use serde::{Deserialize, Serialize}; -use snafu::{OptionExt, ResultExt, ensure}; +use snafu::{OptionExt, ResultExt}; use store_api::metadata::ColumnMetadata; -use store_api::metric_engine_consts::TABLE_COLUMN_METADATA_EXTENSION_KEY; -use store_api::storage::{RegionId, RegionNumber}; +use store_api::storage::RegionNumber; use strum::AsRefStr; use table::metadata::{RawTableInfo, TableId}; +use table::table_name::TableName; use table::table_reference::TableReference; +pub(crate) use template::{CreateRequestBuilder, build_template_from_raw_table_info}; -use crate::ddl::create_table_template::{CreateRequestBuilder, build_template}; -use crate::ddl::utils::raw_table_info::update_table_info_column_ids; -use crate::ddl::utils::{ - add_peer_context_if_needed, convert_region_routes_to_detecting_regions, - extract_column_metadatas, map_to_procedure_error, region_storage_path, -}; +use crate::ddl::create_table::executor::CreateTableExecutor; +use crate::ddl::create_table::template::build_template; +use crate::ddl::utils::map_to_procedure_error; use crate::ddl::{DdlContext, TableMetadata}; use crate::error::{self, Result}; -use crate::key::table_name::TableNameKey; -use crate::key::table_route::{PhysicalTableRouteValue, TableRouteValue}; +use crate::key::table_route::PhysicalTableRouteValue; use crate::lock_key::{CatalogLock, SchemaLock, TableNameLock}; use crate::metrics; use crate::region_keeper::OperatingRegionGuard; use crate::rpc::ddl::CreateTableTask; -use crate::rpc::router::{ - RegionRoute, find_leader_regions, find_leaders, operating_leader_regions, -}; +use crate::rpc::router::{RegionRoute, operating_leader_regions}; + pub struct CreateTableProcedure { pub context: DdlContext, - pub creator: TableCreator, + /// The serializable data. + pub data: CreateTableData, + /// The guards of opening. + pub opening_regions: Vec, + /// The executor of the procedure. + pub executor: CreateTableExecutor, +} + +fn build_executor_from_create_table_data( + create_table_expr: &CreateTableExpr, +) -> Result { + let template = build_template(create_table_expr)?; + let builder = CreateRequestBuilder::new(template, None); + let table_name = TableName::new( + create_table_expr.catalog_name.clone(), + create_table_expr.schema_name.clone(), + create_table_expr.table_name.clone(), + ); + let executor = + CreateTableExecutor::new(table_name, create_table_expr.create_if_not_exists, builder); + Ok(executor) } impl CreateTableProcedure { pub const TYPE_NAME: &'static str = "metasrv-procedure::CreateTable"; - pub fn new(task: CreateTableTask, context: DdlContext) -> Self { - Self { + pub fn new(task: CreateTableTask, context: DdlContext) -> Result { + let executor = build_executor_from_create_table_data(&task.create_table)?; + + Ok(Self { context, - creator: TableCreator::new(task), - } + data: CreateTableData::new(task), + opening_regions: vec![], + executor, + }) } pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult { - let data = serde_json::from_str(json).context(FromJsonSnafu)?; + let data: CreateTableData = serde_json::from_str(json).context(FromJsonSnafu)?; + let create_table_expr = &data.task.create_table; + let executor = build_executor_from_create_table_data(create_table_expr) + .map_err(BoxedError::new) + .context(ExternalSnafu { + clean_poisons: false, + })?; Ok(CreateTableProcedure { context, - creator: TableCreator { - data, - opening_regions: vec![], - }, + data, + opening_regions: vec![], + executor, }) } fn table_info(&self) -> &RawTableInfo { - &self.creator.data.task.table_info + &self.data.task.table_info } pub(crate) fn table_id(&self) -> TableId { @@ -87,8 +112,7 @@ impl CreateTableProcedure { } fn region_wal_options(&self) -> Result<&HashMap> { - self.creator - .data + self.data .region_wal_options .as_ref() .context(error::UnexpectedSnafu { @@ -97,8 +121,7 @@ impl CreateTableProcedure { } fn table_route(&self) -> Result<&PhysicalTableRouteValue> { - self.creator - .data + self.data .table_route .as_ref() .context(error::UnexpectedSnafu { @@ -106,17 +129,6 @@ impl CreateTableProcedure { }) } - #[cfg(any(test, feature = "testing"))] - pub fn set_allocated_metadata( - &mut self, - table_id: TableId, - table_route: PhysicalTableRouteValue, - region_wal_options: HashMap, - ) { - self.creator - .set_allocated_metadata(table_id, table_route, region_wal_options) - } - /// On the prepare step, it performs: /// - Checks whether the table exists. /// - Allocates the table id. @@ -125,31 +137,16 @@ impl CreateTableProcedure { /// - TableName exists and `create_if_not_exists` is false. /// - Failed to allocate [TableMetadata]. pub(crate) async fn on_prepare(&mut self) -> Result { - let expr = &self.creator.data.task.create_table; - let table_name_value = self - .context - .table_metadata_manager - .table_name_manager() - .get(TableNameKey::new( - &expr.catalog_name, - &expr.schema_name, - &expr.table_name, - )) + let table_id = self + .executor + .on_prepare(&self.context.table_metadata_manager) .await?; - - if let Some(value) = table_name_value { - ensure!( - expr.create_if_not_exists, - error::TableAlreadyExistsSnafu { - table_name: self.creator.data.table_ref().to_string(), - } - ); - - let table_id = value.table_id(); + // Return the table id if the table already exists. + if let Some(table_id) = table_id { return Ok(Status::done_with_output(table_id)); } - self.creator.data.state = CreateTableState::DatanodeCreateRegions; + self.data.state = CreateTableState::DatanodeCreateRegions; let TableMetadata { table_id, table_route, @@ -157,23 +154,13 @@ impl CreateTableProcedure { } = self .context .table_metadata_allocator - .create(&self.creator.data.task) + .create(&self.data.task) .await?; - self.creator - .set_allocated_metadata(table_id, table_route, region_wal_options); + self.set_allocated_metadata(table_id, table_route, region_wal_options); Ok(Status::executing(true)) } - pub fn new_region_request_builder( - &self, - physical_table_id: Option, - ) -> Result { - let create_table_expr = &self.creator.data.task.create_table; - let template = build_template(create_table_expr)?; - Ok(CreateRequestBuilder::new(template, physical_table_id)) - } - /// Creates regions on datanodes /// /// Abort(non-retry): @@ -187,90 +174,29 @@ impl CreateTableProcedure { /// - [Code::Unavailable](tonic::status::Code::Unavailable) pub async fn on_datanode_create_regions(&mut self) -> Result { let table_route = self.table_route()?.clone(); - let request_builder = self.new_region_request_builder(None)?; // Registers opening regions - let guards = self - .creator - .register_opening_regions(&self.context, &table_route.region_routes)?; + let guards = self.register_opening_regions(&self.context, &table_route.region_routes)?; if !guards.is_empty() { - self.creator.opening_regions = guards; + self.opening_regions = guards; } - self.create_regions(&table_route.region_routes, request_builder) - .await + self.create_regions(&table_route.region_routes).await } - async fn create_regions( - &mut self, - region_routes: &[RegionRoute], - request_builder: CreateRequestBuilder, - ) -> Result { - let create_table_data = &self.creator.data; - // Safety: the region_wal_options must be allocated + async fn create_regions(&mut self, region_routes: &[RegionRoute]) -> Result { + let table_id = self.table_id(); let region_wal_options = self.region_wal_options()?; - let create_table_expr = &create_table_data.task.create_table; - let catalog = &create_table_expr.catalog_name; - let schema = &create_table_expr.schema_name; - let storage_path = region_storage_path(catalog, schema); - let leaders = find_leaders(region_routes); - let mut create_region_tasks = Vec::with_capacity(leaders.len()); + let column_metadatas = self + .executor + .on_create_regions( + &self.context.node_manager, + table_id, + region_routes, + region_wal_options, + ) + .await?; - let partition_exprs = region_routes - .iter() - .map(|r| (r.region.id.region_number(), r.region.partition_expr())) - .collect(); - - for datanode in leaders { - let requester = self.context.node_manager.datanode(&datanode).await; - - let regions = find_leader_regions(region_routes, &datanode); - let mut requests = Vec::with_capacity(regions.len()); - for region_number in regions { - let region_id = RegionId::new(self.table_id(), region_number); - let create_region_request = request_builder.build_one( - region_id, - storage_path.clone(), - region_wal_options, - &partition_exprs, - ); - requests.push(PbRegionRequest::Create(create_region_request)); - } - - for request in requests { - let request = RegionRequest { - header: Some(RegionRequestHeader { - tracing_context: TracingContext::from_current_span().to_w3c(), - ..Default::default() - }), - body: Some(request), - }; - - let datanode = datanode.clone(); - let requester = requester.clone(); - create_region_tasks.push(async move { - requester - .handle(request) - .await - .map_err(add_peer_context_if_needed(datanode)) - }); - } - } - - let mut results = join_all(create_region_tasks) - .await - .into_iter() - .collect::>>()?; - - if let Some(column_metadatas) = - extract_column_metadatas(&mut results, TABLE_COLUMN_METADATA_EXTENSION_KEY)? - { - self.creator.data.column_metadatas = column_metadatas; - } else { - warn!( - "creating table result doesn't contains extension key `{TABLE_COLUMN_METADATA_EXTENSION_KEY}`,leaving the table's column metadata unchanged" - ); - } - - self.creator.data.state = CreateTableState::CreateMetadata; + self.data.column_metadatas = column_metadatas; + self.data.state = CreateTableState::CreateMetadata; Ok(Status::executing(true)) } @@ -280,107 +206,33 @@ impl CreateTableProcedure { /// - Failed to create table metadata. async fn on_create_metadata(&mut self, pid: ProcedureId) -> Result { let table_id = self.table_id(); - let table_ref = self.creator.data.table_ref(); + let table_ref = self.data.table_ref(); let manager = &self.context.table_metadata_manager; - let mut raw_table_info = self.table_info().clone(); - if !self.creator.data.column_metadatas.is_empty() { - update_table_info_column_ids(&mut raw_table_info, &self.creator.data.column_metadatas); - } + let raw_table_info = self.table_info().clone(); // Safety: the region_wal_options must be allocated. let region_wal_options = self.region_wal_options()?.clone(); // Safety: the table_route must be allocated. let physical_table_route = self.table_route()?.clone(); - let detecting_regions = - convert_region_routes_to_detecting_regions(&physical_table_route.region_routes); - let table_route = TableRouteValue::Physical(physical_table_route); - manager - .create_table_metadata(raw_table_info, table_route, region_wal_options) + self.executor + .on_create_metadata( + manager, + &self.context.region_failure_detector_controller, + raw_table_info, + &self.data.column_metadatas, + physical_table_route, + region_wal_options, + ) .await?; - self.context - .register_failure_detectors(detecting_regions) - .await; + info!( "Successfully created table: {}, table_id: {}, procedure_id: {}", table_ref, table_id, pid ); - self.creator.opening_regions.clear(); + self.opening_regions.clear(); Ok(Status::done_with_output(table_id)) } -} - -#[async_trait] -impl Procedure for CreateTableProcedure { - fn type_name(&self) -> &str { - Self::TYPE_NAME - } - - fn recover(&mut self) -> ProcedureResult<()> { - // Only registers regions if the table route is allocated. - if let Some(x) = &self.creator.data.table_route { - self.creator.opening_regions = self - .creator - .register_opening_regions(&self.context, &x.region_routes) - .map_err(BoxedError::new) - .context(ExternalSnafu { - clean_poisons: false, - })?; - } - - Ok(()) - } - - async fn execute(&mut self, ctx: &ProcedureContext) -> ProcedureResult { - let state = &self.creator.data.state; - - let _timer = metrics::METRIC_META_PROCEDURE_CREATE_TABLE - .with_label_values(&[state.as_ref()]) - .start_timer(); - - match state { - CreateTableState::Prepare => self.on_prepare().await, - CreateTableState::DatanodeCreateRegions => self.on_datanode_create_regions().await, - CreateTableState::CreateMetadata => self.on_create_metadata(ctx.procedure_id).await, - } - .map_err(map_to_procedure_error) - } - - fn dump(&self) -> ProcedureResult { - serde_json::to_string(&self.creator.data).context(ToJsonSnafu) - } - - fn lock_key(&self) -> LockKey { - let table_ref = &self.creator.data.table_ref(); - - LockKey::new(vec![ - CatalogLock::Read(table_ref.catalog).into(), - SchemaLock::read(table_ref.catalog, table_ref.schema).into(), - TableNameLock::new(table_ref.catalog, table_ref.schema, table_ref.table).into(), - ]) - } -} - -pub struct TableCreator { - /// The serializable data. - pub data: CreateTableData, - /// The guards of opening. - pub opening_regions: Vec, -} - -impl TableCreator { - pub fn new(task: CreateTableTask) -> Self { - Self { - data: CreateTableData { - state: CreateTableState::Prepare, - column_metadatas: vec![], - task, - table_route: None, - region_wal_options: None, - }, - opening_regions: vec![], - } - } /// Registers and returns the guards of the opening region if they don't exist. fn register_opening_regions( @@ -389,7 +241,6 @@ impl TableCreator { region_routes: &[RegionRoute], ) -> Result> { let opening_regions = operating_leader_regions(region_routes); - if self.opening_regions.len() == opening_regions.len() { return Ok(vec![]); } @@ -409,7 +260,7 @@ impl TableCreator { Ok(opening_region_guards) } - fn set_allocated_metadata( + pub fn set_allocated_metadata( &mut self, table_id: TableId, table_route: PhysicalTableRouteValue, @@ -421,6 +272,56 @@ impl TableCreator { } } +#[async_trait] +impl Procedure for CreateTableProcedure { + fn type_name(&self) -> &str { + Self::TYPE_NAME + } + + fn recover(&mut self) -> ProcedureResult<()> { + // Only registers regions if the table route is allocated. + if let Some(x) = &self.data.table_route { + self.opening_regions = self + .register_opening_regions(&self.context, &x.region_routes) + .map_err(BoxedError::new) + .context(ExternalSnafu { + clean_poisons: false, + })?; + } + + Ok(()) + } + + async fn execute(&mut self, ctx: &ProcedureContext) -> ProcedureResult { + let state = &self.data.state; + + let _timer = metrics::METRIC_META_PROCEDURE_CREATE_TABLE + .with_label_values(&[state.as_ref()]) + .start_timer(); + + match state { + CreateTableState::Prepare => self.on_prepare().await, + CreateTableState::DatanodeCreateRegions => self.on_datanode_create_regions().await, + CreateTableState::CreateMetadata => self.on_create_metadata(ctx.procedure_id).await, + } + .map_err(map_to_procedure_error) + } + + fn dump(&self) -> ProcedureResult { + serde_json::to_string(&self.data).context(ToJsonSnafu) + } + + fn lock_key(&self) -> LockKey { + let table_ref = &self.data.table_ref(); + + LockKey::new(vec![ + CatalogLock::Read(table_ref.catalog).into(), + SchemaLock::read(table_ref.catalog, table_ref.schema).into(), + TableNameLock::new(table_ref.catalog, table_ref.schema, table_ref.table).into(), + ]) + } +} + #[derive(Debug, Clone, Serialize, Deserialize, AsRefStr, PartialEq)] pub enum CreateTableState { /// Prepares to create the table @@ -444,6 +345,16 @@ pub struct CreateTableData { } impl CreateTableData { + pub fn new(task: CreateTableTask) -> Self { + CreateTableData { + state: CreateTableState::Prepare, + column_metadatas: vec![], + task, + table_route: None, + region_wal_options: None, + } + } + fn table_ref(&self) -> TableReference<'_> { self.task.table_ref() } diff --git a/src/common/meta/src/ddl/create_table/executor.rs b/src/common/meta/src/ddl/create_table/executor.rs new file mode 100644 index 0000000000..2d58085f41 --- /dev/null +++ b/src/common/meta/src/ddl/create_table/executor.rs @@ -0,0 +1,203 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashMap; + +use api::v1::region::region_request::Body as PbRegionRequest; +use api::v1::region::{RegionRequest, RegionRequestHeader}; +use common_telemetry::tracing_context::TracingContext; +use common_telemetry::warn; +use futures::future::join_all; +use snafu::ensure; +use store_api::metadata::ColumnMetadata; +use store_api::metric_engine_consts::TABLE_COLUMN_METADATA_EXTENSION_KEY; +use store_api::storage::{RegionId, RegionNumber}; +use table::metadata::{RawTableInfo, TableId}; +use table::table_name::TableName; + +use crate::ddl::utils::raw_table_info::update_table_info_column_ids; +use crate::ddl::utils::{ + add_peer_context_if_needed, convert_region_routes_to_detecting_regions, + extract_column_metadatas, region_storage_path, +}; +use crate::ddl::{CreateRequestBuilder, RegionFailureDetectorControllerRef}; +use crate::error::{self, Result}; +use crate::key::TableMetadataManagerRef; +use crate::key::table_name::TableNameKey; +use crate::key::table_route::{PhysicalTableRouteValue, TableRouteValue}; +use crate::node_manager::NodeManagerRef; +use crate::rpc::router::{RegionRoute, find_leader_regions, find_leaders}; + +/// [CreateTableExecutor] performs: +/// - Creates the metadata of the table. +/// - Creates the regions on the Datanode nodes. +pub struct CreateTableExecutor { + create_if_not_exists: bool, + table_name: TableName, + builder: CreateRequestBuilder, +} + +impl CreateTableExecutor { + /// Creates a new [`CreateTableExecutor`]. + pub fn new( + table_name: TableName, + create_if_not_exists: bool, + builder: CreateRequestBuilder, + ) -> Self { + Self { + create_if_not_exists, + table_name, + builder, + } + } + + /// On the prepare step, it performs: + /// - Checks whether the table exists. + /// - Returns the table id if the table exists. + /// + /// Abort(non-retry): + /// - Table exists and `create_if_not_exists` is `false`. + /// - Failed to get the table name value. + pub async fn on_prepare( + &self, + table_metadata_manager: &TableMetadataManagerRef, + ) -> Result> { + let table_name_value = table_metadata_manager + .table_name_manager() + .get(TableNameKey::new( + &self.table_name.catalog_name, + &self.table_name.schema_name, + &self.table_name.table_name, + )) + .await?; + + if let Some(value) = table_name_value { + ensure!( + self.create_if_not_exists, + error::TableAlreadyExistsSnafu { + table_name: self.table_name.to_string(), + } + ); + + return Ok(Some(value.table_id())); + } + + Ok(None) + } + + pub async fn on_create_regions( + &self, + node_manager: &NodeManagerRef, + table_id: TableId, + region_routes: &[RegionRoute], + region_wal_options: &HashMap, + ) -> Result> { + let storage_path = + region_storage_path(&self.table_name.catalog_name, &self.table_name.schema_name); + let leaders = find_leaders(region_routes); + let mut create_region_tasks = Vec::with_capacity(leaders.len()); + let partition_exprs = region_routes + .iter() + .map(|r| (r.region.id.region_number(), r.region.partition_expr())) + .collect::>(); + + for datanode in leaders { + let requester = node_manager.datanode(&datanode).await; + + let regions = find_leader_regions(region_routes, &datanode); + let mut requests = Vec::with_capacity(regions.len()); + for region_number in regions { + let region_id = RegionId::new(table_id, region_number); + let create_region_request = self.builder.build_one( + region_id, + storage_path.clone(), + region_wal_options, + &partition_exprs, + ); + requests.push(PbRegionRequest::Create(create_region_request)); + } + + for request in requests { + let request = RegionRequest { + header: Some(RegionRequestHeader { + tracing_context: TracingContext::from_current_span().to_w3c(), + ..Default::default() + }), + body: Some(request), + }; + + let datanode = datanode.clone(); + let requester = requester.clone(); + create_region_tasks.push(async move { + requester + .handle(request) + .await + .map_err(add_peer_context_if_needed(datanode)) + }); + } + } + + let mut results = join_all(create_region_tasks) + .await + .into_iter() + .collect::>>()?; + + let column_metadatas = if let Some(column_metadatas) = + extract_column_metadatas(&mut results, TABLE_COLUMN_METADATA_EXTENSION_KEY)? + { + column_metadatas + } else { + warn!( + "creating table result doesn't contains extension key `{TABLE_COLUMN_METADATA_EXTENSION_KEY}`,leaving the table's column metadata unchanged" + ); + vec![] + }; + + Ok(column_metadatas) + } + + /// Creates table metadata + /// + /// Abort(non-retry): + /// - Failed to create table metadata. + pub async fn on_create_metadata( + &self, + table_metadata_manager: &TableMetadataManagerRef, + region_failure_detector_controller: &RegionFailureDetectorControllerRef, + mut raw_table_info: RawTableInfo, + column_metadatas: &[ColumnMetadata], + table_route: PhysicalTableRouteValue, + region_wal_options: HashMap, + ) -> Result<()> { + if !column_metadatas.is_empty() { + update_table_info_column_ids(&mut raw_table_info, column_metadatas); + } + let detecting_regions = + convert_region_routes_to_detecting_regions(&table_route.region_routes); + let table_route = TableRouteValue::Physical(table_route); + table_metadata_manager + .create_table_metadata(raw_table_info, table_route, region_wal_options) + .await?; + region_failure_detector_controller + .register_failure_detectors(detecting_regions) + .await; + + Ok(()) + } + + /// Returns the builder of the executor. + pub fn builder(&self) -> &CreateRequestBuilder { + &self.builder + } +} diff --git a/src/common/meta/src/ddl/create_table_template.rs b/src/common/meta/src/ddl/create_table/template.rs similarity index 100% rename from src/common/meta/src/ddl/create_table_template.rs rename to src/common/meta/src/ddl/create_table/template.rs diff --git a/src/common/meta/src/ddl/tests/create_table.rs b/src/common/meta/src/ddl/tests/create_table.rs index 5f0a5d8f48..5cc1db71cb 100644 --- a/src/common/meta/src/ddl/tests/create_table.rs +++ b/src/common/meta/src/ddl/tests/create_table.rs @@ -162,7 +162,7 @@ async fn test_on_prepare_table_exists_err() { ) .await .unwrap(); - let mut procedure = CreateTableProcedure::new(task, ddl_context); + let mut procedure = CreateTableProcedure::new(task, ddl_context).unwrap(); let err = procedure.on_prepare().await.unwrap_err(); assert_matches!(err, Error::TableAlreadyExists { .. }); assert_eq!(err.status_code(), StatusCode::TableAlreadyExists); @@ -185,7 +185,7 @@ async fn test_on_prepare_with_create_if_table_exists() { ) .await .unwrap(); - let mut procedure = CreateTableProcedure::new(task, ddl_context); + let mut procedure = CreateTableProcedure::new(task, ddl_context).unwrap(); let status = procedure.on_prepare().await.unwrap(); assert_matches!(status, Status::Done { output: Some(..) }); let table_id = *status.downcast_output_ref::().unwrap(); @@ -198,7 +198,7 @@ async fn test_on_prepare_without_create_if_table_exists() { let ddl_context = new_ddl_context(node_manager); let mut task = test_create_table_task("foo"); task.create_table.create_if_not_exists = true; - let mut procedure = CreateTableProcedure::new(task, ddl_context); + let mut procedure = CreateTableProcedure::new(task, ddl_context).unwrap(); let status = procedure.on_prepare().await.unwrap(); assert_matches!( status, @@ -217,7 +217,7 @@ async fn test_on_datanode_create_regions_should_retry() { let ddl_context = new_ddl_context(node_manager); let task = test_create_table_task("foo"); assert!(!task.create_table.create_if_not_exists); - let mut procedure = CreateTableProcedure::new(task, ddl_context); + let mut procedure = CreateTableProcedure::new(task, ddl_context).unwrap(); procedure.on_prepare().await.unwrap(); let ctx = ProcedureContext { procedure_id: ProcedureId::random(), @@ -234,7 +234,7 @@ async fn test_on_datanode_create_regions_should_not_retry() { let ddl_context = new_ddl_context(node_manager); let task = test_create_table_task("foo"); assert!(!task.create_table.create_if_not_exists); - let mut procedure = CreateTableProcedure::new(task, ddl_context); + let mut procedure = CreateTableProcedure::new(task, ddl_context).unwrap(); procedure.on_prepare().await.unwrap(); let ctx = ProcedureContext { procedure_id: ProcedureId::random(), @@ -251,7 +251,7 @@ async fn test_on_create_metadata_error() { let ddl_context = new_ddl_context(node_manager); let task = test_create_table_task("foo"); assert!(!task.create_table.create_if_not_exists); - let mut procedure = CreateTableProcedure::new(task.clone(), ddl_context.clone()); + let mut procedure = CreateTableProcedure::new(task.clone(), ddl_context.clone()).unwrap(); procedure.on_prepare().await.unwrap(); let ctx = ProcedureContext { procedure_id: ProcedureId::random(), @@ -284,7 +284,7 @@ async fn test_on_create_metadata() { let ddl_context = new_ddl_context(node_manager); let task = test_create_table_task("foo"); assert!(!task.create_table.create_if_not_exists); - let mut procedure = CreateTableProcedure::new(task, ddl_context.clone()); + let mut procedure = CreateTableProcedure::new(task, ddl_context.clone()).unwrap(); procedure.on_prepare().await.unwrap(); let ctx = ProcedureContext { procedure_id: ProcedureId::random(), @@ -312,16 +312,16 @@ async fn test_memory_region_keeper_guard_dropped_on_procedure_done() { let ddl_context = new_ddl_context_with_kv_backend(node_manager, kv_backend); let task = test_create_table_task("foo"); - let mut procedure = CreateTableProcedure::new(task, ddl_context.clone()); + let mut procedure = CreateTableProcedure::new(task, ddl_context.clone()).unwrap(); execute_procedure_until(&mut procedure, |p| { - p.creator.data.state == CreateTableState::CreateMetadata + p.data.state == CreateTableState::CreateMetadata }) .await; // Ensure that after running to the state `CreateMetadata`(just past `DatanodeCreateRegions`), // the opening regions should be recorded: - let guards = &procedure.creator.opening_regions; + let guards = &procedure.opening_regions; assert_eq!(guards.len(), 1); let (datanode_id, region_id) = (0, RegionId::new(procedure.table_id(), 0)); assert_eq!(guards[0].info(), (datanode_id, region_id)); @@ -334,7 +334,7 @@ async fn test_memory_region_keeper_guard_dropped_on_procedure_done() { execute_procedure_until_done(&mut procedure).await; // Ensure that when run to the end, the opening regions should be cleared: - let guards = &procedure.creator.opening_regions; + let guards = &procedure.opening_regions; assert!(guards.is_empty()); assert!( !ddl_context diff --git a/src/common/meta/src/ddl/tests/create_view.rs b/src/common/meta/src/ddl/tests/create_view.rs index e730176fc6..0ab242cce0 100644 --- a/src/common/meta/src/ddl/tests/create_view.rs +++ b/src/common/meta/src/ddl/tests/create_view.rs @@ -259,7 +259,7 @@ async fn test_replace_table() { { // Create a `foo` table. let task = test_create_table_task("foo"); - let mut procedure = CreateTableProcedure::new(task, ddl_context.clone()); + let mut procedure = CreateTableProcedure::new(task, ddl_context.clone()).unwrap(); procedure.on_prepare().await.unwrap(); let ctx = ProcedureContext { procedure_id: ProcedureId::random(), diff --git a/src/common/meta/src/ddl_manager.rs b/src/common/meta/src/ddl_manager.rs index ff8583b6b9..74493d4d47 100644 --- a/src/common/meta/src/ddl_manager.rs +++ b/src/common/meta/src/ddl_manager.rs @@ -231,7 +231,7 @@ impl DdlManager { ) -> Result<(ProcedureId, Option)> { let context = self.create_context(); - let procedure = CreateTableProcedure::new(create_table_task, context); + let procedure = CreateTableProcedure::new(create_table_task, context)?; let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure)); diff --git a/src/meta-srv/src/procedure/tests.rs b/src/meta-srv/src/procedure/tests.rs index 8b3f7cc852..105bfb7bc4 100644 --- a/src/meta-srv/src/procedure/tests.rs +++ b/src/meta-srv/src/procedure/tests.rs @@ -92,7 +92,8 @@ fn test_region_request_builder() { let mut procedure = CreateTableProcedure::new( create_table_task(None), test_data::new_ddl_context(Arc::new(NodeClients::default())), - ); + ) + .unwrap(); procedure.set_allocated_metadata( 1024, @@ -100,7 +101,7 @@ fn test_region_request_builder() { HashMap::default(), ); - let template = procedure.new_region_request_builder(None).unwrap(); + let template = procedure.executor.builder(); let expected = PbCreateRegionRequest { region_id: 0, @@ -187,7 +188,8 @@ async fn test_on_datanode_create_regions() { let mut procedure = CreateTableProcedure::new( create_table_task(None), test_data::new_ddl_context(node_manager), - ); + ) + .unwrap(); procedure.set_allocated_metadata( 42, @@ -226,7 +228,7 @@ async fn test_on_datanode_create_regions() { } )); assert!(matches!( - procedure.creator.data.state, + procedure.data.state, CreateTableState::CreateMetadata ));